
bug: weird behavior with collect continuity #648

Open
jordanrfrazier opened this issue Aug 10, 2023 · 3 comments
Labels: bug (Something isn't working), sparrow


jordanrfrazier commented Aug 10, 2023

Description
A test that splits out each index (since we can't print structs in the CSV results):

#[tokio::test]
async fn test_collect_struct_since_hourly() {
    // TODO: The results here are weird, because `collect` is latched. I don't think I'd expect
    // the results we have here, but it's possible they're technically in line with what we expect
    // given our continuity rules. We should revisit this.
    insta::assert_snapshot!(QueryFixture::new("{ 
        b: Collect.b,
        f0: ({b: Collect.b} | collect(max=10, window=since(hourly())) | index(0)).b | when(is_valid($input)),
        f1: ({b: Collect.b} | collect(max=10, window=since(hourly())) | index(1)).b | when(is_valid($input)),
        f2: ({b: Collect.b} | collect(max=10, window=since(hourly())) | index(2)).b | when(is_valid($input)),
        f3: ({b: Collect.b} | collect(max=10, window=since(hourly())) | index(3)).b | when(is_valid($input)),
        f4: ({b: Collect.b} | collect(max=10, window=since(hourly())) | index(4)).b | when(is_valid($input))
    }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###"
    _time,_subsort,_key_hash,_key,b,f0,f1,f2,f3,f4
    1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,true,true,,,,
    1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,false,true,false,,,
    1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,,true,false,,,
    1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A,false,true,false,,false,
    1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,true,true,false,,false,true
    1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,true,true,false,,false,true
    1996-12-20T01:00:00.000000000,18446744073709551615,12960666915911099378,A,,true,false,,false,true
    1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,false,false,,,,
    1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,,false,,,,
    1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B,true,false,,true,,
    1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,false,false,,true,false,
    1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,true,false,,true,false,true
    1996-12-21T01:00:00.000000000,18446744073709551615,2867199309159137213,B,,false,,true,false,true
    1996-12-21T01:44:57.000000000,9223372036854775808,2867199309159137213,B,true,true,,true,false,true
    1996-12-21T02:00:00.000000000,18446744073709551615,2867199309159137213,B,,true,,true,false,true
    1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C,true,true,,,,
    1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,true,true,true,,,
    1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C,true,true,true,true,,
    1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,true,true,true,true,true,
    "###);
}

But the problem is that now that collect is as-of/continuous, we're storing the last non-null value for each individual index.

It may be “technically correct” based on our continuity rules. See the compute plan — each b field ref is going to a separate select and merge because of the is_valid, so each merge is keeping the latched state.

Problem: specifically, these two lines:

    1996-12-21T01:44:57.000000000,9223372036854775808,2867199309159137213,B,true,true,,true,false,true
    1996-12-21T02:00:00.000000000,18446744073709551615,2867199309159137213,B,,true,,true,false,true

should have nulls in the last three columns, I'd expect. The problem is that the last non-null value for each column is being saved, as if it had been pushed through a last aggregation (which it almost technically is, since it's going through latched spread).
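A minimal plain-Python model of the latching described above (hypothetical sketch, not the actual sparrow implementation): a latched merge carries the last non-null value for each column forward, so once an index has produced a value it keeps reappearing in later rows even after the window resets.

```python
def latched_merge(rows):
    """Simulate a latched merge: carry the last non-None value of each
    column forward into later rows (hypothetical model of latched spread)."""
    state = {}
    out = []
    for row in rows:
        for key, value in row.items():
            if value is not None:
                state[key] = value
        out.append(dict(state))
    return out

# Index columns produced by the per-index selects for key B (illustrative):
rows = [
    {"f2": True, "f3": None, "f4": None},   # 00:42:57
    {"f2": None, "f3": False, "f4": None},  # 00:43:57
    {"f2": None, "f3": None, "f4": True},   # 00:44:57
    {"f2": None, "f3": None, "f4": None},   # 01:44:57 (new hour; window reset)
]
print(latched_merge(rows)[-1])
# The 01:44:57 row still carries the stale values from the previous hour,
# instead of the nulls an empty post-reset collection would imply.
```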

@jordanrfrazier jordanrfrazier added bug Something isn't working sparrow labels Aug 10, 2023

bjchambers commented Aug 10, 2023

I agree it's weird. I'm somewhat suspicious that it may be caused by the multiple merges. Some thoughts / questions:

  1. Why would you expect the last 3 columns to be null? I'd expect the last 4 or 5 to be null. Rationale: the collect is "since(hourly())", which means it should reset at 2:00, right? Or is this because the output is inclusive of 2:00? In that case, I guess it makes sense -- the 3 values since (and including) 1:00 were [null, true, null] (although it raises the question of why 1:00 would be included in the collection up to 1:00 if 2:00 is included in the collection starting at 2:00 -- seems like every hour would be included twice).
  2. Does the weirdness go away if you just do b and f0? What about b and f3 or f4? Basically, trying to determine if the issue goes away if there is only one merge.
  3. Similarly, can you reproduce with last and/or some combination of shifts and lasts to get different domains (multiple merges)? Trying to see if we can rule out collect here.

@jordanrfrazier (Collaborator, Author)

[attached image: final_with_filter compute plan]

This issue arose from attempting to hack a fix for trailing windows by shifting the input forward, merging that with the original, feeding that to the collect, then filtering on the original non-null input. The above plan shows the query with the hack:

        kd.record(
            {
                "m": m,
                "collect_m": m.collect(
                    max=None, window=kd.windows.Trailing(timedelta(seconds=1))
                ),
            }
        )
-------------------------------------------------------------------------------------------

        is_input = input.is_not_null()
        shift_by = window.duration
        input_shift = input.shift_by(shift_by)
        trailing_ns = int(window.duration.total_seconds() * 1e9)
        input = record({"input": input, "input_shift": input_shift}).col("input")

        # HACK: Use null predicate and number of nanoseconds to encode trailing windows.
        collect = Timestream._call(op, input, *args, None, trailing_ns)
        return collect.filter(is_input)
     

And the weird behavior we see is that the collect in operation 2 is correctly clearing the buffers and producing an empty, non-null list. That output is sent to a select, then to the final merge (operation 4), where it is correctly latching values. However, the select after the collect is filtering on is_valid(input). And because the input was null for several rows, we're not sending the empty-list rows to the final merge, so it isn't latching the correct new state (an empty list).
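The failure mode can be sketched in plain Python (hypothetical model of the semantics described above): the collect emits an empty list at the window reset, but because the input is null on that row, the select filters it out, and the downstream merge keeps latching the stale list.

```python
def filter_then_latch(events):
    """events: list of (input, collected) pairs per row. The select keeps
    only rows where input is non-None; the downstream merge latches the
    last value it actually receives (hypothetical model)."""
    latched = None
    out = []
    for inp, collected in events:
        if inp is not None:          # select: filter on is_valid(input)
            latched = collected      # merge receives and latches this row
        out.append(latched)
    return out

events = [
    (1, [1]),        # real input, buffer [1]
    (2, [1, 2]),     # real input, buffer [1, 2]
    (None, []),      # window reset: collect emits an empty list, but the
                     # input is null, so the select drops this row
    (3, [3]),        # next real input after the reset
]
print(filter_then_latch(events))
# The third row still shows [1, 2]: the reset to [] was never latched.
```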

This behavior is strange in a sense because at first glance it may seem like adding a filter after any aggregation would cause it to not obey interpolation rules further downstream. However, the reason this is okay and expected is that when (or select / filter) always produces discrete values.

Thus, given the (pseudocode) example:

Foo.sum() -> [Hour 1: 10, Hour 2: 20, Hour 3: 30, Hour 4: 40]
Foo.sum(window=Since(hourly())).filter(on_odd_hours) -> [Hour 1: 10, Hour 3: 30]

in { sum, filtered }

->  
[ 
  { sum: 10, filtered: 10}
  { sum: 20, filtered: null}
  { sum: 30, filtered: 30}
  { sum: 40, filtered: null}
]
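The pseudocode example above can be written out in plain Python (a hypothetical model of the semantics, not sparrow itself): the continuous sum is defined every hour, while the filtered stream is discrete, so it is null on the hours the predicate drops.

```python
# Hourly sums and an "odd hours only" filter, mirroring the example above.
hours = [1, 2, 3, 4]
hourly_sum = {1: 10, 2: 20, 3: 30, 4: 40}

result = [
    {
        # Continuous aggregation: a value exists every hour.
        "sum": hourly_sum[h],
        # Discrete filtered stream: null where the predicate drops the row.
        "filtered": hourly_sum[h] if h % 2 == 1 else None,
    }
    for h in hours
]
print(result)
```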


jordanrfrazier commented Aug 15, 2023

This behavior is likely correct, but it could have a subtle and meaningful impact on complex queries. We should discuss ways to alleviate this risk.
