Skip to content

Commit 17e9cd9

Browse files
XXX: Implement test case with artificial time delay.
1 parent cdf5f3c commit 17e9cd9

File tree

1 file changed

+45
-0
lines changed
  • experimental/iterators/test/python/dialects/iterators

1 file changed

+45
-0
lines changed

experimental/iterators/test/python/dialects/iterators/arrow.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import ctypes
44
import os
5+
import sys
56
from tempfile import NamedTemporaryFile
7+
import time
68

79
import numpy as np
810
import pandas as pd
@@ -249,3 +251,46 @@ def testArrowCSVInput():
249251
# CHECK-NEXT: (10, 510, 1010, 1510, 2510, 3010)
250252
# CHECK-NEXT: (35, 535, 1035, 1535, 2535, 3035)
251253
sum_batches_elementwise_with_iterators(reader)
254+
255+
256+
# Test case: Read from a sequence of Arrow arrays/record batches (produced by a
257+
# Python generator).
258+
259+
260+
# Create a generator that produces single-row record batches with increasing
261+
# numbers with an artificial delay of one second after each of them. Since each
262+
# generated record batch immediately produces output, this visually demonstrate
263+
# that the consumption by the MLIR-based iterators interleaves with the
264+
# Python-based production of the record batches in the stream.
265+
def generate_batches_with_delay(schema: pa.Schema) -> None:
266+
for i in range(5):
267+
arrays = [
268+
pa.array(np.array([i], field.type.to_pandas_dtype()))
269+
for field in schema
270+
]
271+
batch = pa.RecordBatch.from_arrays(arrays, schema=schema)
272+
yield batch
273+
# Sleep only when a TTY is attached (in order not to delay unit tests).
274+
if sys.stdout.isatty():
275+
time.sleep(1)
276+
277+
278+
# CHECK-LABEL: TEST: testGeneratorInput
279+
@run
280+
def testGeneratorInput():
281+
# Use pyarrow to create an Arrow table in memory.
282+
table = create_test_input()
283+
284+
# Make physically separate batches from the table. (This ensures offset=0).
285+
generator = generate_batches_with_delay(table.schema)
286+
287+
# Create a RecordBatchReader and export it as a C struct.
288+
reader = pa.RecordBatchReader.from_batches(table.schema, generator)
289+
290+
# Hand the reader as an Arrow array stream to the Iterators test program.
291+
# CHECK-NEXT: (0, 0, 0, 0, 0, 0, 0)
292+
# CHECK-NEXT: (1, 1, 1, 1, 1, 1, 1)
293+
# CHECK-NEXT: (2, 2, 2, 2, 2, 2, 2)
294+
# CHECK-NEXT: (3, 3, 3, 3, 3, 3, 3)
295+
# CHECK-NEXT: (4, 4, 4, 4, 4, 4, 4)
296+
sum_batches_elementwise_with_iterators(reader)

0 commit comments

Comments
 (0)