13
13
from __future__ import absolute_import
14
14
import numpy
15
15
import six
16
+ from six .moves import xrange
16
17
from .base import _decode
17
18
from .dataset import Dataset
18
19
from .objectid import DatasetID
21
22
from .h5type import check_dtype
22
23
23
24
25
+ class Cursor ():
26
+ """
27
+ Cursor for retreiving rows from a table
28
+ """
29
+ def __init__ (self , table , query = None , start = None , stop = None ):
30
+ self ._table = table
31
+ self ._query = query
32
+ if start is None :
33
+ self ._start = 0
34
+ else :
35
+ self ._start = start
36
+ if stop is None :
37
+ self ._stop = table .nrows
38
+ else :
39
+ self ._stop = stop
40
+
41
+ def __iter__ (self ):
42
+ """ Iterate over the first axis. TypeError if scalar.
24
43
44
+ BEWARE: Modifications to the yielded data are *NOT* written to file.
45
+ """
46
+ nrows = self ._table .nrows
47
+ # to reduce round trips, grab BUFFER_SIZE items at a time
48
+ # TBD: set buffersize based on size of each row
49
+ BUFFER_SIZE = 1000
50
+
51
+ arr = None
52
+ query_complete = False
53
+
54
+ for indx in xrange (self ._start , self ._stop ):
55
+ if indx % BUFFER_SIZE == 0 :
56
+ # grab another buffer
57
+ read_count = BUFFER_SIZE
58
+ if nrows - indx < read_count :
59
+ read_count = nrows - indx
60
+ if self ._query is None :
61
+
62
+ arr = self ._table [indx :read_count + indx ]
63
+ else :
64
+ # call table to return query result
65
+ if query_complete :
66
+ arr = None # nothing more to fetch
67
+ else :
68
+ arr = self ._table .read_where (self ._query , start = indx , limit = read_count )
69
+ if arr is not None and arr .shape [0 ] < read_count :
70
+ query_complete = True # we've gotten all the rows
71
+ if arr is not None and indx % BUFFER_SIZE < arr .shape [0 ]:
72
+ yield arr [indx % BUFFER_SIZE ]
25
73
26
74
class Table (Dataset ):
27
75
@@ -76,7 +124,7 @@ def read(self, start=None, stop=None, step=None, field=None, out=None):
76
124
77
125
78
126
79
- def read_where (self , condition , condvars = None , field = None , start = None , stop = None , step = None ):
127
+ def read_where (self , condition , condvars = None , field = None , start = None , stop = None , step = None , limit = None ):
80
128
"""Read rows from table using pytable-style condition
81
129
"""
82
130
names = () # todo
@@ -148,10 +196,19 @@ def readtime_dtype(basetype, names):
148
196
try :
149
197
self .log .debug ("params: {}" .format (params ))
150
198
rsp = self .GET (req , params = params )
151
- count = len (rsp ["value" ])
199
+ values = rsp ["value" ]
200
+ count = len (values )
152
201
self .log .info ("got {} rows" .format (count ))
153
202
if count > 0 :
154
- data .extend (rsp ['value' ])
203
+ if limit is None or count + len (data ) <= limit :
204
+ # add in all the data
205
+ data .extend (values )
206
+ else :
207
+ # we've hit the limit for number of rows to return
208
+ add_count = limit - len (data )
209
+ self .log .debug ("adding {} from {} to rrows" .format (add_count , count ))
210
+ data .extend (values [:add_count ])
211
+
155
212
# advance to next page
156
213
cursor += page_size
157
214
except IOError as ioe :
@@ -165,7 +222,7 @@ def readtime_dtype(basetype, names):
165
222
# otherwise, just raise the exception
166
223
self .log .info ("Unexpected exception: {}" .format (ioe .errno ))
167
224
raise ioe
168
- if cursor >= stop :
225
+ if cursor >= stop or limit and len ( data ) == limit :
169
226
self .log .info ("completed iteration, returning: {} rows" .format (len (data )))
170
227
break
171
228
@@ -190,6 +247,13 @@ def readtime_dtype(basetype, names):
190
247
arr = numpy .asscalar (arr )
191
248
192
249
return arr
250
+
251
+ def create_cursor (self , condition = None , start = None , stop = None ):
252
+ """Return a cursor for iteration
253
+ """
254
+ return Cursor (self , query = condition , start = start , stop = stop )
255
+
256
+
193
257
194
258
def append (self , rows ):
195
259
""" Append rows to end of table
0 commit comments