diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/PartitionedSessionWindowPipe.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/PartitionedSessionWindowPipe.cs index 1fdf85f03..13acf6c4f 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/PartitionedSessionWindowPipe.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/PartitionedSessionWindowPipe.cs @@ -249,15 +249,23 @@ public override int CurrentlyBufferedInputCount protected override void UpdatePointers() { + // Restore orderedKeysDictionary, as it is not Serialized/Deserialized int iter = FastDictionary.IteratorStart; var temp = new List>(); while (this.lastDataTimeDictionary.Iterate(ref iter)) { + var partitionKey = this.getPartitionKey(this.lastDataTimeDictionary.entries[iter].key); + if (this.stateDictionary.entries[iter].value.Any()) { temp.Add(Tuple.Create( this.lastDataTimeDictionary.entries[iter].key, - Math.Min(this.lastDataTimeDictionary.entries[iter].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[iter].value), this.getPartitionKey(this.lastDataTimeDictionary.entries[iter].key))); + Math.Min(this.lastDataTimeDictionary.entries[iter].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[iter].value), + partitionKey)); + } + else if (!this.orderedKeysDictionary.ContainsKey(partitionKey)) + { + this.orderedKeysDictionary.Add(partitionKey, new LinkedList()); } } foreach (var item in temp.OrderBy(o => o.Item2)) diff --git a/Sources/Test/SimpleTesting/Partitioned/PartitionedStreamCheckpointTests.cs b/Sources/Test/SimpleTesting/Partitioned/PartitionedStreamCheckpointTests.cs new file mode 100644 index 000000000..8f94b9f63 --- /dev/null +++ b/Sources/Test/SimpleTesting/Partitioned/PartitionedStreamCheckpointTests.cs @@ -0,0 +1,101 @@ +// ********************************************************************* +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License +// ********************************************************************* +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using Microsoft.StreamProcessing; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace SimpleTesting +{ + /* This test case verifies fix for + * Bug 944724: Exception in Trill SessionWindow when Partition (substreams) is used and restored from checkpoint + * + * Cause of the Bug: + * The PartitionedSessionWindowPipe keeps multiple dictionary states. + * One of the dictionary does not have a [DataMember] attribute, Because the value type is a LinkedList which does not support serialization. + * On checkpoint and then Restore, this dictionary is re-created using other data members in UpdatePointers callback. + * During this, the scenario of empty LinkedList value is missed and not restored. + * When next data event appears for the partition, the partitionKey is indexed on the dictionary resulting in KeyNotFoundException + */ + [TestClass] + public class PartitionedStreamCheckpointTests : TestWithConfigSettingsWithoutMemoryLeakDetection + { + [TestMethod, TestCategory("Gated")] + public void CheckpointPartitionedSessionWindow() + { + Config.DataBatchSize = 1; + + var data = new PartitionedStreamEvent[] + { + PartitionedStreamEvent.CreatePoint(0, 5, 1.0), + PartitionedStreamEvent.CreatePunctuation(0, 8), + PartitionedStreamEvent.CreatePunctuation(0, 11), + PartitionedStreamEvent.CreatePunctuation(0, 14), + PartitionedStreamEvent.CreatePunctuation(0, 17), + PartitionedStreamEvent.CreatePunctuation(0, 21), + PartitionedStreamEvent.CreatePoint(0, 24, 1.0), + PartitionedStreamEvent.CreatePunctuation(0, 100), + }; + + var expected = new PartitionedStreamEvent[] + { + PartitionedStreamEvent.CreateStart(0, 5, 1.0), + PartitionedStreamEvent.CreatePunctuation(0, 8), + PartitionedStreamEvent.CreateEnd(0, 9, 5, 1.0), + PartitionedStreamEvent.CreatePunctuation(0, 11), + PartitionedStreamEvent.CreatePunctuation(0, 14), + PartitionedStreamEvent.CreatePunctuation(0, 17), + PartitionedStreamEvent.CreatePunctuation(0, 21), + PartitionedStreamEvent.CreateStart(0, 24, 1.0), + PartitionedStreamEvent.CreateEnd(0, 28, 24, 1.0), + PartitionedStreamEvent.CreatePunctuation(0, 100), + }; + + // This index represents the point when the checkpoint restore needs to happen to trigger the bug. + const int checkpointIndex = 6; + + var subject = new Subject>(); + var output = new List>(); + var process = CreateQueryContainerForPartitionedStream(subject, output); + + for (int i = 0; i < data.Length; i++) + { + if (i == checkpointIndex) + { + using (var ms = new MemoryStream()) + { + process.Checkpoint(ms); + ms.Seek(0, SeekOrigin.Begin); + + subject = new Subject>(); + process = CreateQueryContainerForPartitionedStream(subject, output, ms); + } + } + + subject.OnNext(data[i]); + } + + Assert.IsTrue(expected.SequenceEqual(output)); + } + + private Process CreateQueryContainerForPartitionedStream( + Subject> subject, + List> output, + Stream stream = null) + { + var qc = new QueryContainer(); + var input = qc.RegisterInput(subject); + var streamableOutput = input.SessionTimeoutWindow(4, 5).Sum(o => o); + var egress = qc.RegisterOutput(streamableOutput).ForEachAsync(o => output.Add(o)); + var process = qc.Restore(stream); + + return process; + } + } +}