This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Find a way to manipulate ProcessingTime #29

Open
rami-alisawi opened this issue Sep 25, 2016 · 4 comments

@rami-alisawi

I have a GlobalWindow with a simple trigger class, TimedTrigger extends Trigger<Value,GlobalWindow>, in which I register a processing-time timer:
ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timer);

But it never fires (unless I set timer to the unrealistically small value of 2 milliseconds). I have tried increasing the timeout value, with no effect.

@lofifnc
Contributor

lofifnc commented Sep 25, 2016 via email

@rami-alisawi
Author

Thanks for the fast and detailed answer. However, regarding this statement:

... actually wait the required amount of time to trigger the window, this could potentially lead to very long running tests.

My timer is passed in as a parameter, so for testing I can pass a much shorter value. I realise this means all records have to be emitted at once, but that's a limitation I can live with.
If I want to achieve this, I doubt it is as simple as removing
testEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Any pointers on what else I would have to modify?

  • Adding a sleep to keep the pipeline running sounds like a good workaround, but I could not get it to work. I added a Thread.sleep longer than the window timer but shorter than the flink-spector timeout, and the timer still did not fire. Any ideas?
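A toy model of the failure mode discussed in this thread, assuming (as the sleep workaround implies) that pending processing-time timers are discarded rather than fired when a bounded test job shuts down. It uses a plain java.util.concurrent scheduler, not Flink; TimerShutdownDemo and timerFired are hypothetical names. The timer only fires if the "job" stays alive past the timer's deadline.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy model (not Flink internals): a pending timer is lost if the "job" is torn down first. */
final class TimerShutdownDemo {
    /** Returns true if a timer due in timerMillis fired before tear-down after keepAliveMillis. */
    static boolean timerFired(long timerMillis, long keepAliveMillis) {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean fired = new AtomicBoolean(false);
        // Plays the role of ctx.registerProcessingTimeTimer(now + timer).
        timerService.schedule(() -> fired.set(true), timerMillis, TimeUnit.MILLISECONDS);
        try {
            // Plays the role of the pipeline staying alive after its input is exhausted.
            Thread.sleep(keepAliveMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Tear-down: shutdownNow() cancels delayed tasks that have not started yet.
        timerService.shutdownNow();
        return fired.get();
    }
}
```

In this model the keep-alive only helps if it delays the tear-down of the timer service itself; a delay anywhere else leaves the outcome unchanged.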

@eppdot

eppdot commented Nov 14, 2016

It might be useful to have a dedicated clock object inside Flink or flink-spector to simulate time.
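One possible shape for such a clock, sketched with the JDK's java.time.Clock: a hypothetical ManualClock whose time only moves when the test calls advance(). This is not a Flink or flink-spector API; it only illustrates the idea.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical test clock: time only moves when the test calls advance(). */
final class ManualClock extends Clock {
    private Instant now;
    private final ZoneId zone;

    ManualClock(Instant start) {
        this(start, ZoneOffset.UTC);
    }

    private ManualClock(Instant start, ZoneId zone) {
        this.now = start;
        this.zone = zone;
    }

    /** Moves the clock forward; moving it backwards is rejected. */
    synchronized void advance(Duration d) {
        if (d.isNegative()) {
            throw new IllegalArgumentException("cannot move time backwards");
        }
        now = now.plus(d);
    }

    @Override
    public synchronized Instant instant() {
        return now;
    }

    @Override
    public ZoneId getZone() {
        return zone;
    }

    /** Returns an independent snapshot at the current instant in the given zone. */
    @Override
    public synchronized Clock withZone(ZoneId newZone) {
        return new ManualClock(now, newZone);
    }
}
```

Code that calls clock.millis() or Instant.now(clock) instead of System.currentTimeMillis() then behaves deterministically under test.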

@lofifnc
Contributor

lofifnc commented Nov 15, 2016

A clock object inside Flink that represents processing time and can be manipulated from the outside would be a really neat feature for testing. In a way, event time already provides such a mechanism, which is why flink-spector uses it. The downside is that you cannot test the relationship between event time and processing time, which would be nice to have.
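Why event time acts as a controllable clock can be shown with a toy model: an event-time timer fires only once the watermark reaches its timestamp, so whoever emits the watermarks decides when time advances. WatermarkTimers is a hypothetical, stdlib-only class, not Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model: event-time timers fire only when the watermark reaches them. */
final class WatermarkTimers {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long watermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the watermark and returns the timers that fire, in timestamp order. */
    List<Long> advanceWatermark(long newWatermark) {
        List<Long> fired = new ArrayList<>();
        if (newWatermark <= watermark) {
            return fired; // watermarks never move backwards, so nothing new can fire
        }
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```

No wall-clock time is involved: a test can jump the watermark forward by an hour in one call.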

Spark Streaming and Storm have a clock object that can be manipulated during local execution and testing. This is done by running the pipelines in a specialized local runtime environment. When Flink runs locally, on the other hand, it starts up a cluster and runs the job inside that cluster, completely isolated from the user.
The processing-time windows in Flink directly access the system time of the machine they run on. Implementing a clock that can be manipulated would mean replacing the System.currentTimeMillis() call inside Flink with an object that can somehow be swapped out during tests. But that object would live inside the cluster and would not be accessible from the test, so you would have to implement a messaging system that lets you change its time. This would be a pretty big intrusion into one of Flink's core functionalities.
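A sketch of the kind of indirection described above: a hypothetical timer service that reads time from an injected LongSupplier instead of calling System.currentTimeMillis() directly. The method names echo Flink's trigger context, but this is not Flink code, and it sidesteps the hard part (reaching an object inside the cluster from the test) entirely.

```java
import java.util.PriorityQueue;
import java.util.function.LongSupplier;

/** Hypothetical processing-time timer service with an injectable time source. */
final class ProcessingTimeTimers {
    private final LongSupplier timeSource;
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    ProcessingTimeTimers(LongSupplier timeSource) {
        this.timeSource = timeSource;
    }

    /** What production code would use: the machine's wall clock. */
    static ProcessingTimeTimers systemTime() {
        return new ProcessingTimeTimers(System::currentTimeMillis);
    }

    long getCurrentProcessingTime() {
        return timeSource.getAsLong();
    }

    void registerProcessingTimeTimer(long time) {
        timers.add(time);
    }

    /** Removes every timer whose time has been reached and returns how many fired. */
    int fireDueTimers() {
        long now = getCurrentProcessingTime();
        int fired = 0;
        while (!timers.isEmpty() && timers.peek() <= now) {
            timers.poll();
            fired++;
        }
        return fired;
    }
}
```

With a fake time source, a one-minute timer can be tested without waiting a minute.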

What I have been thinking about is writing a custom TimeWindow implementation, containing a clock you can manipulate, which you would use during tests. The downside is that you would have to replace the normal timeWindow() calls in your code during tests. Also, any other user code that relies on system time would still not be testable.
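A rough, stdlib-only sketch of the custom-window idea, assuming tumbling windows and a test-controlled LongSupplier clock. TestableTumblingWindow is hypothetical and is not a Flink WindowAssigner. It fires a window once the clock reaches the window's last millisecond (end - 1), which is what we understand Flink's TimeWindow.maxTimestamp() to return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.LongSupplier;

/** Hypothetical tumbling processing-time window driven by a test-controlled clock. */
final class TestableTumblingWindow<T> {
    private final long size;
    private final LongSupplier clock;
    // window start -> elements assigned to that window, ordered by start
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();

    TestableTumblingWindow(long sizeMillis, LongSupplier clock) {
        if (sizeMillis <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.size = sizeMillis;
        this.clock = clock;
    }

    /** Assigns the element to the tumbling window containing the current clock time. */
    void add(T element) {
        long now = clock.getAsLong();
        long start = now - Math.floorMod(now, size);
        pending.computeIfAbsent(start, k -> new ArrayList<>()).add(element);
    }

    /** Removes and returns every window whose last millisecond (end - 1) has been reached. */
    List<List<T>> fireReady() {
        long now = clock.getAsLong();
        List<List<T>> fired = new ArrayList<>();
        while (!pending.isEmpty() && pending.firstKey() + size - 1 <= now) {
            fired.add(pending.pollFirstEntry().getValue());
        }
        return fired;
    }
}
```

As the comment above notes, the catch is that production code would have to be rewired to use this class during tests.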

I'm not sure there is a better way of doing this. Personally, I dropped the idea of simply waiting for the clock to progress: there is no way to synchronize the moment a record is emitted with the moment the window processes it.

lofifnc changed the title from "registerProcessingTimeTimer not working" to "Find a way to manipulate ProcessingTime" on Jul 17, 2017