Skip to content

Commit

Permalink
test: add another test
Browse files Browse the repository at this point in the history
  • Loading branch information
JeanArhancet committed Aug 1, 2024
1 parent 25f05a6 commit ab971ce
Showing 1 changed file with 74 additions and 0 deletions.
74 changes: 74 additions & 0 deletions influxdb3_server/src/http/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,4 +894,78 @@ mod tests {
// Ensure we received exactly 4 responses
assert_eq!(counter, 4, "Expected 4 responses, but received {}", counter);
}

#[tokio::test]
async fn test_partial_flag_one_stream() {
let meta = serde_json::to_string(&json!({
"measurement_column_index": 0,
"tag_key_columns": [],
}))
.unwrap();
let schema = Arc::new(Schema::new_with_metadata(
vec![
Field::new("iox::measurement", DataType::Utf8, false),
Field::new(
"time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("value", DataType::Utf8, true),
],
HashMap::from([("iox::influxql::group_key::metadata".to_owned(), meta)]),
));
let record_batch_0 = Ok(RecordBatch::try_new(
Arc::clone(&schema),
vec![
strs(&[Some("cpu"), Some("cpu")]),
times(&[1157082300000000000, 1157082310000000000]),
strs(&[Some("cpu0"), Some("cpu0")]),
],
)
.unwrap());

let batch = vec![record_batch_0];
let schema = batch[0].as_ref().unwrap().schema();
let input_stream = stream::iter(batch);
let input: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
schema,
Box::pin(input_stream),
));
let chunk_size = Some(1);
let mut query_response_stream =
QueryResponseStream::new(0, input, chunk_size, QueryFormat::Json, None).unwrap();

// Counters for assertions
let mut counter = 0;

while let Some(response) = query_response_stream.next().await {
match response {
Ok(resp) => {
println!("Received response: {:?}", resp);

match counter {
0 => {
assert!(resp.results[0].partial.unwrap());
assert!(resp.results[0].series[0].partial.unwrap());
assert_eq!(resp.results[0].series[0].name, "cpu");
assert_eq!(resp.results[0].series[0].values.len(), 1);
}
1 => {
assert_eq!(resp.results[0].partial, None);
assert_eq!(resp.results[0].series[0].partial, None);
assert_eq!(resp.results[0].series[0].name, "cpu");
assert_eq!(resp.results[0].series[0].values.len(), 1);
}
_ => (),
}

counter += 1;
}
Err(err) => panic!("Error while polling stream: {:?}", err),
}
}

// Ensure we received exactly 4 responses
assert_eq!(counter, 2, "Expected 2 responses, but received {}", counter);
}
}

0 comments on commit ab971ce

Please sign in to comment.