-
Notifications
You must be signed in to change notification settings - Fork 53
/
file_reader.go
361 lines (309 loc) · 11.9 KB
/
file_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
package goparquet
import (
"context"
"fmt"
"io"
"runtime"
"github.com/fraugster/parquet-go/parquet"
"github.com/fraugster/parquet-go/parquetschema"
)
// FileReader is used to read data from a parquet file. Always use NewFileReader or a related
// function to create such an object.
type FileReader struct {
	// meta is the parquet file metadata, either read from the file or supplied via WithFileMetaData.
	meta *parquet.FileMetaData
	// schemaReader is the schema built from meta; it also tracks the selected columns and
	// produces the rows returned by NextRow.
	schemaReader *schema
	// reader is the underlying seekable source of the parquet file.
	reader io.ReadSeeker
	// rowGroupPosition is the 1-based position of the current row group in meta.RowGroups;
	// 0 means no row group has been loaded yet (see CurrentRowGroup and advanceIfNeeded).
	rowGroupPosition int
	// currentRecord is the number of rows already returned from the current row group.
	currentRecord int64
	// skipRowGroup, when true, makes the next advanceIfNeeded call load the following row
	// group. It is set by SkipRowGroup and after a failed row group load.
	skipRowGroup bool
	// ctx is the default context used by the methods that do not take an explicit context.
	ctx context.Context
	// allocTracker is the tracker configured via WithMaximumMemorySize (nil if not set).
	allocTracker *allocTracker
}
// NewFileReaderWithOptions creates a new FileReader. You can provide a list of FileReaderOptions to configure
// aspects of its behaviour, such as limiting the columns to read, the file metadata to use, or the
// context to use. For a full list of options, please see the type FileReaderOption.
//
// If no metadata is supplied via WithFileMetaData, it is read from r. On success r is left
// positioned at byte offset 4, just past the 4-byte magic that starts a parquet file.
// Errors returned by an option are passed through unchanged; all other errors are wrapped
// with a description of the failing step.
func NewFileReaderWithOptions(r io.ReadSeeker, readerOptions ...FileReaderOption) (*FileReader, error) {
	opts := newFileReaderOptions()
	if err := opts.apply(readerOptions); err != nil {
		return nil, err
	}

	if opts.metaData == nil {
		meta, err := ReadFileMetaData(r, true)
		if err != nil {
			return nil, fmt.Errorf("reading file meta data failed: %w", err)
		}
		opts.metaData = meta
	}

	// Named sch so the local does not shadow the package's schema type.
	sch, err := makeSchema(opts.metaData, opts.validateCRC, opts.allocTracker)
	if err != nil {
		return nil, fmt.Errorf("creating schema failed: %w", err)
	}
	sch.SetSelectedColumns(opts.columns...)

	// Position the reader right after the leading 4-byte "PAR1" magic, not at offset 0.
	if _, err := r.Seek(4, io.SeekStart); err != nil {
		return nil, fmt.Errorf("seeking past file magic failed: %w", err)
	}

	return &FileReader{
		meta:         opts.metaData,
		schemaReader: sch,
		reader:       r,
		ctx:          opts.ctx,
		allocTracker: opts.allocTracker,
	}, nil
}
// FileReaderOption is an option that can be passed on to NewFileReaderWithOptions when
// creating a new parquet file reader. Each option updates the reader's option set and may
// return an error, in which case NewFileReaderWithOptions fails with that error.
type FileReaderOption func(*fileReaderOptions) error
// fileReaderOptions collects the settings applied by FileReaderOption functions before a
// FileReader is constructed by NewFileReaderWithOptions.
type fileReaderOptions struct {
	// metaData, when non-nil, is used instead of reading the metadata from the file.
	metaData *parquet.FileMetaData
	// ctx becomes the FileReader's default context.
	ctx context.Context
	// columns limits which columns are read; when empty, all columns are read.
	columns []ColumnPath
	// validateCRC is passed to makeSchema to enable CRC32 page checksum validation.
	validateCRC bool
	// allocTracker is set by WithMaximumMemorySize; nil when no limit was configured.
	allocTracker *allocTracker
}
// newFileReaderOptions returns the default option set: context.Background() as the
// context and every other setting left at its zero value.
func newFileReaderOptions() *fileReaderOptions {
	defaults := new(fileReaderOptions)
	defaults.ctx = context.Background()
	return defaults
}
// apply runs every option against o in order and stops at the first option that
// reports an error, returning that error unchanged.
func (o *fileReaderOptions) apply(opts []FileReaderOption) error {
	for _, opt := range opts {
		err := opt(o)
		if err != nil {
			return err
		}
	}
	return nil
}
// WithReaderContext configures a custom context for the file reader. If none
// is provided, or if ctx is nil, context.Background() is used as a default.
func WithReaderContext(ctx context.Context) FileReaderOption {
	// The stored context is later handed to every method without an explicit context,
	// so a nil context must never be stored; fall back to the documented default.
	if ctx == nil {
		ctx = context.Background()
	}
	return func(opts *fileReaderOptions) error {
		opts.ctx = ctx
		return nil
	}
}
// WithFileMetaData allows you to provide your own file metadata. When this option
// is not used (or metaData is nil), the file reader reads the metadata from the
// parquet file itself.
func WithFileMetaData(metaData *parquet.FileMetaData) FileReaderOption {
	setMetaData := func(opts *fileReaderOptions) error {
		opts.metaData = metaData
		return nil
	}
	return setMetaData
}
// WithColumns limits the columns which are read. Each column is given in dotted
// notation and parsed into a ColumnPath when the option is applied. If none are set,
// then all columns will be read by the parquet file reader.
//
// Deprecated: use WithColumnPaths instead.
func WithColumns(columns ...string) FileReaderOption {
	return func(opts *fileReaderOptions) error {
		paths := make([]ColumnPath, len(columns))
		for i, name := range columns {
			paths[i] = parseColumnPath(name)
		}
		opts.columns = paths
		return nil
	}
}
// WithColumnPaths limits the columns which are read to the given column paths.
// If none are set, then all columns will be read by the parquet file reader.
func WithColumnPaths(columns ...ColumnPath) FileReaderOption {
	setColumns := func(opts *fileReaderOptions) error {
		opts.columns = columns
		return nil
	}
	return setColumns
}
// WithCRC32Validation allows you to configure whether CRC32 page checksums will
// be validated when they're read. By default, checksum validation is disabled.
func WithCRC32Validation(enable bool) FileReaderOption {
	setValidation := func(opts *fileReaderOptions) error {
		opts.validateCRC = enable
		return nil
	}
	return setValidation
}
// WithMaximumMemorySize allows you to configure a maximum limit in terms of memory
// that shall be allocated when reading this file. If the amount of memory gets over
// this limit, further function calls will fail. A fresh allocation tracker with the
// given limit is created each time the option is applied.
func WithMaximumMemorySize(maxSizeBytes uint64) FileReaderOption {
	setLimit := func(opts *fileReaderOptions) error {
		tracker := newAllocTracker(maxSizeBytes)
		opts.allocTracker = tracker
		return nil
	}
	return setLimit
}
// NewFileReader creates a new FileReader. You can limit the columns that are read by providing
// the names of the specific columns to read using dotted notation. If no columns are provided,
// then all columns are read. It is shorthand for NewFileReaderWithOptions(r, WithColumns(columns...)).
func NewFileReader(r io.ReadSeeker, columns ...string) (*FileReader, error) {
	columnOpt := WithColumns(columns...)
	return NewFileReaderWithOptions(r, columnOpt)
}
// NewFileReaderWithContext creates a new FileReader. You can limit the columns that are read by providing
// the names of the specific columns to read using dotted notation. If no columns are provided,
// then all columns are read. The provided context.Context overrides the default context (which is a context.Background())
// for use in other methods of the *FileReader type.
//
// Deprecated: use the function NewFileReaderWithOptions and the option WithReaderContext instead.
func NewFileReaderWithContext(ctx context.Context, r io.ReadSeeker, columns ...string) (*FileReader, error) {
	return NewFileReaderWithOptions(r, WithReaderContext(ctx), WithColumns(columns...))
}
// NewFileReaderWithMetaData creates a new FileReader that uses the given file meta data instead of
// reading it from r. You can limit the columns that are read by providing the names of the specific
// columns to read using dotted notation. If no columns are provided, then all columns are read.
//
// Deprecated: use the function NewFileReaderWithOptions and the option WithFileMetaData instead.
func NewFileReaderWithMetaData(r io.ReadSeeker, meta *parquet.FileMetaData, columns ...string) (*FileReader, error) {
	opts := []FileReaderOption{
		WithFileMetaData(meta),
		WithColumns(columns...),
	}
	return NewFileReaderWithOptions(r, opts...)
}
// recover is meant to be deferred directly by methods with a named error result
// (defer f.recover(&err)). If the method panicked with an error value, that error is
// stored in *errp and the panic is stopped. Panics carrying a runtime.Error, and panics
// whose value is not an error at all, are re-raised with their original value so the
// diagnostic is not replaced by an interface-conversion panic.
func (*FileReader) recover(errp *error) {
	e := recover()
	if e == nil {
		return
	}
	if _, isRuntime := e.(runtime.Error); isRuntime {
		panic(e)
	}
	err, isErr := e.(error)
	if !isErr {
		panic(e)
	}
	*errp = err
}
// SeekToRowGroup seeks to a particular row group, identified by its index, using the
// reader's default context. See SeekToRowGroupWithContext for the details.
func (f *FileReader) SeekToRowGroup(rowGroupPosition int) (err error) {
	defer f.recover(&err)
	ctx := f.ctx
	return f.SeekToRowGroupWithContext(ctx, rowGroupPosition)
}
// SeekToRowGroupWithContext seeks to a particular row group, identified by its index,
// and loads it. It returns io.EOF if rowGroupPosition is greater than RowGroupCount().
//
// After a successful seek, the row counter is reset and any pending SkipRowGroup request,
// or the skip state left behind by an earlier EOF, is cleared. This ensures the next
// NextRow call reads from the row group that was just loaded. On failure the reader is
// flagged to move on, mirroring advanceIfNeeded.
//
// NOTE(review): after the call, CurrentRowGroup reports meta.RowGroups[rowGroupPosition-1];
// confirm whether callers are expected to pass a 0- or 1-based index.
func (f *FileReader) SeekToRowGroupWithContext(ctx context.Context, rowGroupPosition int) (err error) {
	defer f.recover(&err)
	f.rowGroupPosition = rowGroupPosition - 1
	f.currentRecord = 0
	if err = f.readRowGroup(ctx); err != nil {
		f.skipRowGroup = true
		return err
	}
	f.skipRowGroup = false
	return nil
}
// readRowGroup advances to the row group following the current one and loads it via
// readRowGroupData. It returns io.EOF, without changing the position, when the current
// position is already at or past the last row group.
func (f *FileReader) readRowGroup(ctx context.Context) error {
	if f.rowGroupPosition >= len(f.meta.RowGroups) {
		return io.EOF
	}
	f.rowGroupPosition++
	return f.readRowGroupData(ctx)
}
// CurrentRowGroup returns information about the current row group. It returns nil if
// the reader or its metadata is nil, if no row group has been loaded yet, or if the
// current position lies outside the file's row groups.
func (f *FileReader) CurrentRowGroup() *parquet.RowGroup {
	if f == nil || f.meta == nil {
		return nil
	}
	// rowGroupPosition is 1-based; 0 means nothing has been loaded yet, which previously
	// caused an index of -1 and a runtime panic.
	idx := f.rowGroupPosition - 1
	if idx < 0 || idx >= len(f.meta.RowGroups) {
		return nil
	}
	return f.meta.RowGroups[idx]
}
// RowGroupCount reports how many row groups the file's metadata lists.
func (f *FileReader) RowGroupCount() int {
	groups := f.meta.RowGroups
	return len(groups)
}
// NumRows returns the total number of rows in the parquet file, as recorded in the
// file's meta data; no row data is read to compute it.
func (f *FileReader) NumRows() int64 {
	meta := f.meta
	return meta.NumRows
}
// advanceIfNeeded loads the next row group in three cases: no row group has been
// loaded yet, every row of the current row group has been returned, or a skip was
// requested. On failure it leaves skipRowGroup set, so the next call attempts another
// load. On success it resets the row counter and clears the skip flag.
func (f *FileReader) advanceIfNeeded(ctx context.Context) error {
	mustLoad := f.rowGroupPosition == 0 ||
		f.currentRecord >= f.schemaReader.rowGroupNumRecords() ||
		f.skipRowGroup
	if !mustLoad {
		return nil
	}

	if err := f.readRowGroup(ctx); err != nil {
		f.skipRowGroup = true
		return err
	}
	f.currentRecord, f.skipRowGroup = 0, false
	return nil
}
// RowGroupNumRows returns the number of rows in the current RowGroup, using the
// reader's default context if a row group has to be loaded first.
func (f *FileReader) RowGroupNumRows() (int64, error) {
	ctx := f.ctx
	return f.RowGroupNumRowsWithContext(ctx)
}
// RowGroupNumRowsWithContext returns the number of rows in the current RowGroup.
// If no row group is loaded, the current one is exhausted, or a skip was requested,
// the next row group is loaded first using ctx.
func (f *FileReader) RowGroupNumRowsWithContext(ctx context.Context) (numRecords int64, err error) {
	defer f.recover(&err)
	if err = f.advanceIfNeeded(ctx); err != nil {
		return 0, err
	}
	numRecords = f.schemaReader.rowGroupNumRecords()
	return numRecords, nil
}
// NextRow reads the next row from the parquet file using the reader's default
// context. If required, it will load the next row group.
func (f *FileReader) NextRow() (map[string]interface{}, error) {
	ctx := f.ctx
	return f.NextRowWithContext(ctx)
}
// NextRowWithContext reads the next row from the parquet file. If required, it will
// load the next row group using ctx; an io.EOF from that load is returned once all
// row groups have been consumed.
func (f *FileReader) NextRowWithContext(ctx context.Context) (row map[string]interface{}, err error) {
	defer f.recover(&err)
	if err = f.advanceIfNeeded(ctx); err != nil {
		return nil, err
	}
	f.currentRecord++
	row, err = f.schemaReader.getData()
	return row, err
}
// SkipRowGroup marks the currently loaded row group as finished. No I/O happens here.
// The next call to NextRow, PreLoad or RowGroupNumRows (or their WithContext variants)
// discards the remaining rows of the current row group and loads the next one.
func (f *FileReader) SkipRowGroup() {
	f.skipRowGroup = true
}
// PreLoad loads the row group if required, using the reader's default context.
// It does nothing if the row group is already loaded.
func (f *FileReader) PreLoad() error {
	ctx := f.ctx
	return f.PreLoadWithContext(ctx)
}
// PreLoadWithContext loads the row group if required, using ctx. It does nothing
// if the row group is already loaded and neither exhausted nor marked for skipping.
func (f *FileReader) PreLoadWithContext(ctx context.Context) (err error) {
	defer f.recover(&err)
	err = f.advanceIfNeeded(ctx)
	return err
}
// MetaData returns the file-level key-value metadata stored in the parquet file as a
// map. Entries without a value are omitted.
func (f *FileReader) MetaData() map[string]string {
	pairs := f.meta.KeyValueMetadata
	return keyValueMetaDataToMap(pairs)
}
// ColumnMetaData returns a map of metadata key-value pairs for the provided column in the current
// row group. The column name has to be provided in its dotted notation.
//
// Deprecated: use ColumnMetaDataByPath instead.
func (f *FileReader) ColumnMetaData(colName string) (map[string]string, error) {
	return f.ColumnMetaDataByPath(parseColumnPath(colName))
}
// ColumnMetaDataByPath returns a map of metadata key-value pairs for the provided column in the
// current row group. The column is provided as ColumnPath. An error is returned when there is
// no current row group (CurrentRowGroup returns nil) or when no column chunk of the current
// row group has that path. Column chunks without metadata are ignored.
func (f *FileReader) ColumnMetaDataByPath(path ColumnPath) (metaData map[string]string, err error) {
	defer f.recover(&err)

	rowGroup := f.CurrentRowGroup()
	if rowGroup == nil {
		// Previously this dereferenced a nil *RowGroup and panicked.
		return nil, fmt.Errorf("no current row group; cannot look up column %q", path.flatName())
	}
	for _, col := range rowGroup.Columns {
		if col == nil || col.MetaData == nil {
			continue
		}
		if path.Equal(ColumnPath(col.MetaData.PathInSchema)) {
			return keyValueMetaDataToMap(col.MetaData.KeyValueMetadata), nil
		}
	}
	return nil, fmt.Errorf("column %q not found", path.flatName())
}
// SetSelectedColumns sets the columns which are read, given in dotted notation.
// By default, all columns will be read.
//
// Deprecated: use SetSelectedColumnsByPath instead.
func (f *FileReader) SetSelectedColumns(cols ...string) {
	paths := make([]ColumnPath, len(cols))
	for i, name := range cols {
		paths[i] = parseColumnPath(name)
	}
	f.schemaReader.SetSelectedColumns(paths...)
}
// SetSelectedColumnsByPath sets the columns which are read, identified by their
// column paths. By default, all columns will be read.
func (f *FileReader) SetSelectedColumnsByPath(cols ...ColumnPath) {
	sr := f.schemaReader
	sr.SetSelectedColumns(cols...)
}
// Columns returns the list of columns known to the reader's schema.
func (f *FileReader) Columns() []*Column {
	sr := f.schemaReader
	return sr.Columns()
}
// GetColumnByName returns a column identified by name. If the column doesn't exist,
// the method returns nil.
func (f *FileReader) GetColumnByName(name string) *Column {
	sr := f.schemaReader
	return sr.GetColumnByName(name)
}
// GetColumnByPath returns a column identified by its path. If the column doesn't exist,
// nil is returned.
func (f *FileReader) GetColumnByPath(path ColumnPath) *Column {
	sr := f.schemaReader
	return sr.GetColumnByPath(path)
}
// GetSchemaDefinition returns the current schema definition, as provided by the
// reader's schema.
func (f *FileReader) GetSchemaDefinition() *parquetschema.SchemaDefinition {
	sr := f.schemaReader
	return sr.GetSchemaDefinition()
}
// keyValueMetaDataToMap converts a list of parquet key-value pairs into a map.
//
// Nil entries and entries whose Value is nil are skipped. Metadata supplied via
// WithFileMetaData may contain nil entries, which would otherwise cause a nil
// dereference. For duplicate keys the last entry wins. The returned map is never
// nil, even for an empty input.
func keyValueMetaDataToMap(kvMetaData []*parquet.KeyValue) map[string]string {
	data := make(map[string]string, len(kvMetaData))
	for _, kv := range kvMetaData {
		if kv == nil || kv.Value == nil {
			continue
		}
		data[kv.Key] = *kv.Value
	}
	return data
}