Find a way to manipulate ProcessingTime #29
Comments
Hi,
This is a shortcoming of flinkspector. What fs does is set the time
characteristic of the StreamExecutionEnvironment to EventTime in the
background. When you insert a window that works explicitly with processing
time, flinkspector cannot manipulate the time anymore by injecting
timestamps and watermarks. I think I've addressed this issue briefly in the
documentation.
ProcessingTimeWindows work directly with the system time, so there's no way
for flinkspector to manipulate the triggering from the outside other than
actually waiting the required amount of time for the window to trigger. This
could potentially lead to very long-running tests. Also, the emitting of
records would have to be totally in sync with the processing-time triggers,
which would be very hard to achieve. There is an old branch of flinkspector
where somebody tried this, but I never got a pull request, so I'm assuming it
didn't work as expected.
Long story short: the only workaround I see is to replace the
processing-time windows during testing with "normal" windows. Or, if you just
want to test one window evaluation, you can insert a mapper with a sleep in
it to keep the pipeline running as long as necessary for that window to
trigger. But remember to increase the timeout of flinkspector.
Cheers, Alex
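The sleep workaround mentioned above could look roughly like the sketch below. It is plain Java with no Flink dependency: the class name `SleepingPassThrough` is hypothetical, and the assumption is that in a real job the same body would sit inside the `map` method of a Flink `MapFunction`, placed somewhere before the processing-time window.

```java
import java.util.function.Function;

/**
 * Hypothetical pass-through step that delays every element by a fixed
 * amount of wall-clock time and then returns it unchanged. The delay
 * keeps the pipeline alive long enough for a processing-time window
 * further downstream to fire.
 */
final class SleepingPassThrough<T> implements Function<T, T> {
    private final long delayMillis;

    SleepingPassThrough(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    @Override
    public T apply(T value) {
        try {
            // Block the calling thread; real time keeps advancing meanwhile.
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can react to it.
            Thread.currentThread().interrupt();
        }
        return value;
    }
}
```

Because the delay applies to every element, the total test run time grows with the number of records, which is why the flinkspector timeout has to be raised accordingly.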
Thanks for the fast and detailed answer. However, regarding this statement
I have my timer passed as a parameter, so for testing I can pass a much shorter timer. I realise that this would mean the records need to be emitted all at once, but that's a limitation I can live with.
It might be useful to use a dedicated clock object inside Flink or flink-spector to simulate the time.
A clock object inside Flink that represents processing time and can be manipulated from the outside would be a really neat feature for testing. In a way, EventTime is such a mechanism, which is why flink-spector uses it. The downside is that the relationship between event time and processing time, which would be nice to test, cannot be tested this way.
Spark Streaming and Storm have a clock object which can be played with during local execution and testing. This is realized by running the pipelines in a specialized local runtime environment. When Flink runs locally, it starts up a cluster and runs in that cluster, completely isolated from the user.
What I have been thinking about is writing my own TimeWindow implementation, to be used during tests, containing a clock which you can manipulate. The downside is that you would have to replace the normal timeWindow() calls in your code during tests. Also, any other user code using some form of system time is still not testable. I'm not sure if there is a better way of doing this. I personally dropped the idea of just waiting for the clock to progress: the emitting of a record and the moment it is processed by the window cannot be synced up.
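To make the clock idea more concrete, here is a minimal sketch in plain Java, independent of Flink. All names (`Clock`, `SystemClock`, `ManualClock`, `ClockedWindow`) are hypothetical and not part of Flink or flink-spector; `ClockedWindow` is only a toy stand-in for a window that asks an injected clock for the time instead of the system.

```java
import java.util.ArrayList;
import java.util.List;

/** Source of "processing time" that window code asks instead of the system. */
interface Clock {
    long currentTimeMillis();
}

/** Clock for production use: delegates to the real system time. */
final class SystemClock implements Clock {
    @Override
    public long currentTimeMillis() {
        return System.currentTimeMillis();
    }
}

/** Clock for tests: time only moves when the test advances it. */
final class ManualClock implements Clock {
    private long now;

    ManualClock(long start) {
        this.now = start;
    }

    @Override
    public long currentTimeMillis() {
        return now;
    }

    void advance(long millis) {
        now += millis;
    }
}

/** Toy window: buffers elements and releases them once its length has elapsed. */
final class ClockedWindow<T> {
    private final Clock clock;
    private final long sizeMillis;
    private final List<T> buffer = new ArrayList<>();
    private long windowEnd;

    ClockedWindow(Clock clock, long sizeMillis) {
        this.clock = clock;
        this.sizeMillis = sizeMillis;
        this.windowEnd = clock.currentTimeMillis() + sizeMillis;
    }

    void add(T element) {
        buffer.add(element);
    }

    /** Returns the buffered elements if the window has elapsed, else an empty list. */
    List<T> poll() {
        if (clock.currentTimeMillis() < windowEnd) {
            return new ArrayList<>();
        }
        List<T> fired = new ArrayList<>(buffer);
        buffer.clear();
        // Start the next window at the current clock reading.
        windowEnd = clock.currentTimeMillis() + sizeMillis;
        return fired;
    }
}
```

A test would construct the window with a `ManualClock`, add records, call `advance(...)`, and then check what `poll()` returns, without any real waiting. As noted above, this only helps for code that takes its time from the injected clock; anything that calls the system time directly stays untestable.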
I have a GlobalWindow with a simple class TimedTrigger extends Trigger<Value,GlobalWindow>.
I register a processing-time timer with registerProcessingTimeTimer:
ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime()+timer);
But it never gets triggered (unless I set the 'timer' value to the unrealistic value of 2 milliseconds). I have tried to increase the timeout value, with no effect.