-
Notifications
You must be signed in to change notification settings - Fork 518
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mock a core in test mode, refactoring reaper #871
Conversation
This PR seems to do three unrelated things, so I'd like you to break it up into three parts. Changing the influx test is a no-brainer; I'm not sure why they used aphyr.com anyway, since it doesn't actually run an influx server. Mocking a core... not sure yet. The reaper changes I don't really understand--it seems like you've completely gutted the service machinery when you could have the service call riemann.time for scheduling instead--note that riemann.time/cancel can be used to cancel scheduled tasks. That'd save the trouble of a drastic rewrite, and then you can use the same logic to support all kinds of services, instead of having to special-case the reaper. |
21624fd
to
467c79b
Compare
Thanks you for your review @aphyr I just updated my PR. It's probably not perfect yet but i wanted to show you some code. I created a new type of service, So the reaper is now executed in the Riemann scheduler, and it is still a service. We just need to start it in inject!. Regarding the influx test, the test was mocked but it did a dns lookup. I did another PR to fix it. If you want, i can divide this PR into 2 PR:
Or i can do everything in this PR (and will rebase/reorganize commits). |
EveryTaskService renamed ScheduledTaskService ;) |
2d89c13
to
f60317a
Compare
I updated the PR. I modified the services (tcp, udp, websocket servers etc...) to not start in test mode (using the riemann.test/*testing* variable). I now reload/start all services in I will continue to do some tests, but everything seems to work. |
src/riemann/kafka.clj
Outdated
@@ -103,8 +104,9 @@ | |||
(conflict? [this other] | |||
(= opts (:opts other))) | |||
(start! [this] | |||
(info "Starting kafka consumer") | |||
(start-kafka-thread running? core opts)) | |||
(if-not test/*testing* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you need tests for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What a dumb mistake, sorry about that :/ I wrote tests for tcp/udp/ws/sse transports
I am not sure about how write a test for the kafka service, because it does not expose its state (unlike the tcp server when i can use the :killer
keys in my test).
src/riemann/test.clj
Outdated
call (inject! events) without looking up the streams from their core every | ||
time." | ||
(def ^:dynamic *test-core* | ||
"The core used in test mode" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This namespace is already about testing; is there a reason to call this variable out as test-core
instead of just core
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed.
src/riemann/test.clj
Outdated
@@ -173,6 +174,8 @@ | |||
[name & body] | |||
`(test/deftest ~name | |||
(binding [*results* (fresh-results @*taps*)] | |||
(if (:index *test-core*) | |||
(index/clear (:index *test-core*))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's a good idea to leak state out of a test; we should ensure cores are spun up and shut down at the beginning and end of the body, respectively.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this PR, i keep the same core during all tests, i just start/stop all services before/after injecting events (i am forced to do it in inject!
because inject!
call reset-tasks!
and kill the new reaper which is now a task).
I could add index/clear in inject!
, before and after starting and stopping the services. Is it ok for you ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Urrgh, yeah, this brings up the awkward question of what the semantics should be for multiple calls to inject!
in a single deftest. I can see arguments for resetting the core and scheduler in deftest
, but that is a breaking change to inject!
. Maybe best to have a with-test-context
macro introducing taps, a fresh core, and time context, and to have deftest
wrap that around the test body.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it a breaking change ? inject!
already reset time to 0 and reset the Riemann tasks running on the scheduler.
It tried to not use the same core during tests and recreate a new one for each test, but there would be a cyclic dependency between riemann.test and riemann.config so i am not sure how to achieve this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can see arguments for resetting the core and scheduler in deftest, but that is a breaking change to inject!
Moving the core/scheduler/tap setup out to deftest instead of inject! would change the semantics of inject!.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, understood ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe best to have a with-test-context macro introducing taps, a fresh core, and time context, and to have deftest wrap that around the test body.
I could do this (and also remove from inject!
the *result*
binding, the calls to time.controlled
and the core restart), but as you said it will be a breaking change.
If the breaking change is ok for you, i will try to update the PR tomorrow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just pushed a commit when i moved all the code concerning controlled time and core reset from inject!
to deftest (i haven't created a new macro yet, maybe moving the code in deftest is enough). It seems to work, but it's a breaking change
We should now decide, should we break inject! or not ? ;)
res (atom nil) | ||
expired-stream (riemann.streams/expired | ||
(partial reset! res)) | ||
reaper (reaper 0.1 {:keep-keys [:tags]}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the change to the reaper interval here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cf previous answer ;)
res (atom nil) | ||
expired-stream (riemann.streams/expired | ||
(fn [e] (reset! res e))) | ||
reaper (reaper 0.1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the reaper interval changed? Do test users also need to change their use of reaper?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the reaper interval because the reaper was before executed in a threadservice with thread/sleep (which take milliseconds). Now the reaper is executed in the Riemann scheduler which take time in seconds. That's why this test was broken and why i rewrote it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So... this is also a breaking API change to reaper
? How do you think users will feel about that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, no change for the user. before, (periodically-expire 10) will call thread/sleep (* 1000 10), now it will just schedule a task every 10 seconds. So no breaking change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if the API didn't change... why are you changing the tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I modified the test because the test was failing with (for example for the expires
test):
Fail in expires
expected: (= @res {:service 1, :time 0.011, :state "expired"})
actual: (not
(=
{:service 1, :state "expired", :time 0.010000000000000002}
{:service 1, :time 0.011, :state "expired"}))
I think 0.001 was too small for the Riemann scheduler, advance! was causing rounded values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks you for your review aphyr, much appreciated ;)
src/riemann/kafka.clj
Outdated
@@ -103,8 +104,9 @@ | |||
(conflict? [this other] | |||
(= opts (:opts other))) | |||
(start! [this] | |||
(info "Starting kafka consumer") | |||
(start-kafka-thread running? core opts)) | |||
(if-not test/*testing* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What a dumb mistake, sorry about that :/ I wrote tests for tcp/udp/ws/sse transports
I am not sure about how write a test for the kafka service, because it does not expose its state (unlike the tcp server when i can use the :killer
keys in my test).
src/riemann/test.clj
Outdated
call (inject! events) without looking up the streams from their core every | ||
time." | ||
(def ^:dynamic *test-core* | ||
"The core used in test mode" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed.
src/riemann/test.clj
Outdated
@@ -173,6 +174,8 @@ | |||
[name & body] | |||
`(test/deftest ~name | |||
(binding [*results* (fresh-results @*taps*)] | |||
(if (:index *test-core*) | |||
(index/clear (:index *test-core*))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this PR, i keep the same core during all tests, i just start/stop all services before/after injecting events (i am forced to do it in inject!
because inject!
call reset-tasks!
and kill the new reaper which is now a task).
I could add index/clear in inject!
, before and after starting and stopping the services. Is it ok for you ?
res (atom nil) | ||
expired-stream (riemann.streams/expired | ||
(fn [e] (reset! res e))) | ||
reaper (reaper 0.1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the reaper interval because the reaper was before executed in a threadservice with thread/sleep (which take milliseconds). Now the reaper is executed in the Riemann scheduler which take time in seconds. That's why this test was broken and why i rewrote it.
res (atom nil) | ||
expired-stream (riemann.streams/expired | ||
(partial reset! res)) | ||
reaper (reaper 0.1 {:keep-keys [:tags]}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cf previous answer ;)
In general, I advise using (when-not) instead of (if-not) for single-tail conditionals (e.g. side effects); it'll help you avoid this class of bug. |
Yes, i replaced if-not by when-not. |
Should i do something more ? Once we decide if we change the semantics of inject!, i will clean the PR (modify the inject! code or not, make more tests, merge commits together...). |
6a1a484
to
eb3c458
Compare
I made some minor modifications on this PR (docstring, replaced an if by a when). I think we should only reset the scheduler/time in |
Yeah, I think that's a good call, @mcorbin. Looking forward to merging. :) |
- The reaper is now a task, scheduled in the Riemann scheduler. For that, a new kind of service (ScheduledTaskService) has been created. - A full core is used in test mode. The reaper being a task, it will now work in test mode. - The index is now cleared at the beginning of riemann.test/deftest. - Riemann services are started and stopped in riemann.test/deftest - Unused services in test mode (like the tcp/udp/ws servers) are disabled in test mode.
618c951
to
fa0fd25
Compare
I wrote some configs and tests:
(streams
(fixed-time-window 5 (tap :test-tap)))
(tests
(deftest foo-test
(let [result (inject!
[{:time 1}
{:time 4}
{:time 5}
{:time 6}
{:time 11}])]
(is (= (:test-tap result)
[[{:time 1}
{:time 4}
{:time 5}]
[{:time 6}]])))
(let [result (inject! [{:time 21}])]
(is (= (:test-tap result)
[[{:time 1}
{:time 4}
{:time 5}]
[{:time 6}]
[{:time 11}]
[]])))))
(periodically-expire 10)
(streams
(where (and (not (service #"^riemann ")) (not (expired? event)))
#(info %)
(index))
(where (expired? event)
#(info %)
(tap :expired)))
(tests
(deftest test-expired
(let [result (inject! [{:host "foo" :tags ["t1"] :time 0 :ttl 10}
{:host "bar" :time 15 :ttl 10}
{:host "baz" :time 30 :ttl 10}
{:host "foobar" :time 40 :ttl 10}])]
(is (=(:expired result)
[{:host "foo", :state "expired", :time 20}
{:host "bar", :state "expired", :time 30}]))))
(deftest test-expired2
(let [result (inject! [{:host "foo" :tags ["t1"] :time 0 :ttl 10}
{:host "bar" :time 15 :ttl 10}
{:host "baz" :time 30 :ttl 10}
{:host "foobar" :time 40 :ttl 10}])]
(is (=(:expired result)
[{:host "foo", :state "expired", :time 20}
{:host "bar", :state "expired", :time 30}])))))
reinject: (streams
(where (not (service "reinject"))
(with :service "reinject" reinject))
(where (service "reinject")
(tap :reinject)))
(tests
(deftest reinject-test
(let [result (inject! [{:service "foo"}])]
(is (= (:reinject result)
[{:service "reinject"}]))))) All tests are passing. I also tried querying the index in a test, it works (the code is in #838). async queue: (streams
(async-queue! :foo {:queue-size 1000
:core-pool-size 1
:max-pool-size 1}
(tap :queue-tap)))
(tests
(deftest test-queue
(let [result (inject! [{:host "foo" :service "bar" :time 0}])]
(Thread/sleep 1000)
(is (= (:queue-tap
result)
[{:host "foo" :service "bar" :time 0}]))))) This test sometimes fails, i think it's because sometimes events are not yet consumed from the queue so the tap is empty. Users could wrap the async-queue! in (io) (as it must be done actually because sending events to a stopped async queue produces exception). I also tried to think about other potential bugs/side effects, but I did not find anything. So i think we can merge ;) |
It looks like there's some sort of a nondeterministic bug in expiry still? One of the tests didn't pass on one of the Travis runs. |
The failing test is not in expiry, the log in The failing test is not ;; actual
{:service "riemann executor cat completed rate", :metric 2/5, :tags ["riemann"], :state "ok", :time 5}
{:service "riemann executor cat threads active", :metric 1, :tags ["riemann"], :state "ok", :time 5}
;; expected
{:service "riemann executor cat completed rate", :metric 3/5, :tags ["riemann"], :state "ok", :time 5}
{:service "riemann executor cat threads active", :metric 0, :tags ["riemann"], :state "ok", :time 5} I don't think this is related to my PR, i didn't touch this part of the code. |
Okay, let's go ahead and merge that then. I'm... real nervous about that failing test though; don't remember ever seeing that before. Can you take a look? |
Thanks for the merge ;) I created #894 for the failing test. |
cf #838 for PR description
This PR needs more tests/reviews, but i wanted to show the code now to get feedback on it.