Skip to content

Commit 4307fa6

Browse files
committed
8253505: JFR: onFlush invoked out of order with a sorted event stream
Reviewed-by: mgronlun
1 parent 0148adf commit 4307fa6

File tree

6 files changed

+54
-50
lines changed

6 files changed

+54
-50
lines changed

src/jdk.jfr/share/classes/jdk/jfr/consumer/RecordingFile.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ public RecordedEvent readEvent() throws IOException {
103103
isLastEventInChunk = false;
104104
RecordedEvent event = nextEvent;
105105
nextEvent = chunkParser.readEvent();
106+
while (nextEvent == ChunkParser.FLUSH_MARKER) {
107+
nextEvent = chunkParser.readEvent();
108+
}
106109
if (nextEvent == null) {
107110
isLastEventInChunk = true;
108111
findNext();
@@ -251,6 +254,9 @@ private void findNext() throws IOException {
251254
return;
252255
}
253256
nextEvent = chunkParser.readEvent();
257+
while (nextEvent == ChunkParser.FLUSH_MARKER) {
258+
nextEvent = chunkParser.readEvent();
259+
}
254260
}
255261
}
256262

src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,14 @@ protected final Runnable getFlushOperation() {
233233
return flushOperation;
234234
}
235235

236+
237+
final protected void onFlush() {
238+
Runnable r = getFlushOperation();
239+
if (r != null) {
240+
r.run();
241+
}
242+
}
243+
236244
private void startInternal(long startNanos) {
237245
synchronized (streamConfiguration) {
238246
if (streamConfiguration.started) {
@@ -291,7 +299,7 @@ public void onMetadata(Consumer<MetadataEvent> action) {
291299
streamConfiguration.addMetadataAction(action);
292300
}
293301

294-
protected final void emitMetadataEvent(ChunkParser parser) {
302+
protected final void onMetadata(ChunkParser parser) {
295303
if (parser.hasStaleMetadata()) {
296304
if (dispatcher.hasMetadataHandler()) {
297305
List<EventType> ce = parser.getEventTypes();

src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/ChunkParser.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ private boolean is(int flags) {
9191
return (mask & flags) != 0;
9292
}
9393
}
94+
public final static RecordedEvent FLUSH_MARKER = JdkJfrConsumer.instance().newRecordedEvent(null, null, 0L, 0L);
9495

9596
private static final long CONSTANT_POOL_TYPE_ID = 1;
9697
private static final String CHUNKHEADER = "jdk.types.ChunkHeader";
@@ -104,7 +105,6 @@ private boolean is(int flags) {
104105
private LongMap<Parser> parsers;
105106
private boolean chunkFinished;
106107

107-
private Runnable flushOperation;
108108
private ParserConfiguration configuration;
109109
private volatile boolean closed;
110110
private MetadataDescriptor previousMetadata;
@@ -194,6 +194,9 @@ void updateConfiguration(ParserConfiguration configuration, boolean resetEventCa
194194
RecordedEvent readStreamingEvent() throws IOException {
195195
long absoluteChunkEnd = chunkHeader.getEnd();
196196
RecordedEvent event = readEvent();
197+
if (event == ChunkParser.FLUSH_MARKER) {
198+
return null;
199+
}
197200
if (event != null) {
198201
return event;
199202
}
@@ -253,8 +256,9 @@ public RecordedEvent readEvent() throws IOException {
253256
// Not accepted by filter
254257
} else {
255258
if (typeId == 1) { // checkpoint event
256-
if (flushOperation != null) {
257-
parseCheckpoint();
259+
if (CheckPointType.FLUSH.is(parseCheckpointType())) {
260+
input.position(pos + size);
261+
return FLUSH_MARKER;
258262
}
259263
} else {
260264
if (typeId != 0) { // Not metadata event
@@ -267,16 +271,11 @@ public RecordedEvent readEvent() throws IOException {
267271
return null;
268272
}
269273

270-
private void parseCheckpoint() throws IOException {
271-
// Content has been parsed previously. This
272-
// is to trigger flush
274+
private byte parseCheckpointType() throws IOException {
273275
input.readLong(); // timestamp
274276
input.readLong(); // duration
275277
input.readLong(); // delta
276-
byte typeFlags = input.readByte();
277-
if (CheckPointType.FLUSH.is(typeFlags)) {
278-
flushOperation.run();
279-
}
278+
return input.readByte();
280279
}
281280

282281
private boolean awaitUpdatedHeader(long absoluteChunkEnd, long filterEnd) throws IOException {
@@ -451,10 +450,6 @@ public boolean isChunkFinished() {
451450
return chunkFinished;
452451
}
453452

454-
public void setFlushOperation(Runnable flushOperation) {
455-
this.flushOperation = flushOperation;
456-
}
457-
458453
public long getChunkDuration() {
459454
return chunkHeader.getDurationNanos();
460455
}

src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/Dispatcher.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@
3838

3939
final class Dispatcher {
4040

41-
public final static RecordedEvent FLUSH_MARKER = JdkJfrConsumer.instance().newRecordedEvent(null, null, 0L, 0L);
42-
4341
final static class EventDispatcher {
4442
private final static EventDispatcher[] NO_DISPATCHERS = new EventDispatcher[0];
4543

src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,14 @@ protected void processRecursionSafe() throws IOException {
143143
long filterEnd = disp.endTime != null ? disp.endNanos : Long.MAX_VALUE;
144144

145145
while (!isClosed()) {
146-
emitMetadataEvent(currentParser);
146+
onMetadata(currentParser);
147147
while (!isClosed() && !currentParser.isChunkFinished()) {
148148
disp = dispatcher();
149149
if (disp != lastDisp) {
150150
ParserConfiguration pc = disp.parserConfiguration;
151151
pc.filterStart = filterStart;
152152
pc.filterEnd = filterEnd;
153153
currentParser.updateConfiguration(pc, true);
154-
currentParser.setFlushOperation(getFlushOperation());
155154
lastDisp = disp;
156155
}
157156
if (disp.parserConfiguration.isOrdered()) {
@@ -221,9 +220,10 @@ private void processOrdered(Dispatcher c) throws IOException {
221220
}
222221
sortedCache[index++] = e;
223222
}
224-
emitMetadataEvent(currentParser);
223+
onMetadata(currentParser);
225224
// no events found
226225
if (index == 0 && currentParser.isChunkFinished()) {
226+
onFlush();
227227
return;
228228
}
229229
// at least 2 events, sort them
@@ -233,18 +233,19 @@ private void processOrdered(Dispatcher c) throws IOException {
233233
for (int i = 0; i < index; i++) {
234234
c.dispatch(sortedCache[i]);
235235
}
236+
onFlush();
236237
return;
237238
}
238239

239240
private boolean processUnordered(Dispatcher c) throws IOException {
240241
while (true) {
241242
RecordedEvent e = currentParser.readStreamingEvent();
242243
if (e == null) {
243-
emitMetadataEvent(currentParser);
244+
onFlush();
244245
return true;
245-
} else {
246-
c.dispatch(e);
247246
}
247+
onMetadata(currentParser);
248+
c.dispatch(e);
248249
}
249250
}
250251

src/jdk.jfr/share/classes/jdk/jfr/internal/consumer/EventFileStream.java

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.io.IOException;
2929
import java.nio.file.Path;
3030
import java.security.AccessControlContext;
31+
import java.time.Duration;
3132
import java.util.Arrays;
3233
import java.util.Collections;
3334
import java.util.Comparator;
@@ -87,7 +88,7 @@ protected void process() throws IOException {
8788

8889
currentParser = new ChunkParser(input, disp.parserConfiguration);
8990
while (!isClosed()) {
90-
emitMetadataEvent(currentParser);
91+
onMetadata(currentParser);
9192
if (currentParser.getStartNanos() > end) {
9293
close();
9394
return;
@@ -96,7 +97,6 @@ protected void process() throws IOException {
9697
disp.parserConfiguration.filterStart = start;
9798
disp.parserConfiguration.filterEnd = end;
9899
currentParser.updateConfiguration(disp.parserConfiguration, true);
99-
currentParser.setFlushOperation(getFlushOperation());
100100
if (disp.parserConfiguration.isOrdered()) {
101101
processOrdered(disp);
102102
} else {
@@ -116,46 +116,42 @@ private void processOrdered(Dispatcher c) throws IOException {
116116
}
117117
RecordedEvent event;
118118
int index = 0;
119-
while (true) {
120-
event = currentParser.readEvent();
121-
if (event == Dispatcher.FLUSH_MARKER) {
122-
emitMetadataEvent(currentParser);
123-
dispatchOrdered(c, index);
124-
index = 0;
125-
continue;
126-
}
127-
128-
if (event == null) {
129-
emitMetadataEvent(currentParser);
130-
dispatchOrdered(c, index);
131-
return;
119+
while (!currentParser.isChunkFinished()) {
120+
while ((event = currentParser.readStreamingEvent()) != null) {
121+
if (index == cacheSorted.length) {
122+
RecordedEvent[] tmp = cacheSorted;
123+
cacheSorted = new RecordedEvent[2 * tmp.length];
124+
System.arraycopy(tmp, 0, cacheSorted, 0, tmp.length);
125+
}
126+
cacheSorted[index++] = event;
132127
}
133-
if (index == cacheSorted.length) {
134-
RecordedEvent[] tmp = cacheSorted;
135-
cacheSorted = new RecordedEvent[2 * tmp.length];
136-
System.arraycopy(tmp, 0, cacheSorted, 0, tmp.length);
137-
}
138-
cacheSorted[index++] = event;
128+
dispatchOrdered(c, index);
129+
index = 0;
139130
}
140131
}
141132

142133
private void dispatchOrdered(Dispatcher c, int index) {
134+
onMetadata(currentParser);
143135
Arrays.sort(cacheSorted, 0, index, EVENT_COMPARATOR);
144136
for (int i = 0; i < index; i++) {
145137
c.dispatch(cacheSorted[i]);
146138
}
139+
onFlush();
147140
}
148141

149142
private void processUnordered(Dispatcher c) throws IOException {
143+
onMetadata(currentParser);
150144
while (!isClosed()) {
151-
RecordedEvent event = currentParser.readEvent();
145+
RecordedEvent event = currentParser.readStreamingEvent();
152146
if (event == null) {
153-
emitMetadataEvent(currentParser);
154-
return;
155-
}
156-
if (event != Dispatcher.FLUSH_MARKER) {
157-
c.dispatch(event);
147+
onFlush();
148+
if (currentParser.isChunkFinished()) {
149+
return;
150+
}
151+
continue;
158152
}
153+
onMetadata(currentParser);
154+
c.dispatch(event);
159155
}
160156
}
161157
}

0 commit comments

Comments
 (0)