Skip to content

Commit

Permalink
feat: Move out cursor handling
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Sep 19, 2024
1 parent 76a88d1 commit 09a06cd
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 202 deletions.
122 changes: 25 additions & 97 deletions cmd/influx_tools/parquet/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"fmt"
"sort"

"go.uber.org/zap"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
"go.uber.org/zap"
)

type row struct {
Expand Down Expand Up @@ -103,112 +104,39 @@ func (b *batcher) next(ctx context.Context) ([]row, error) {
converter = c
}
fieldEnd := models.MaxNanoTime
switch c := cursor.(type) {
case tsdb.IntegerArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}

data[s.key][t].fields[fname] = v
fieldEnd = t
}
case tsdb.FloatArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}
c, err := newValueCursor(cursor)
if err != nil {
return nil, fmt.Errorf("creating value cursor failed: %w", err)
}

data[s.key][t].fields[fname] = v
fieldEnd = t
for {
// Check if we do still have data
timestamp, ok := c.peek()
if !ok {
break
}
case tsdb.UnsignedArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}

data[s.key][t].fields[fname] = v
fieldEnd = t
timestamp, value := c.next()
v, err := converter(value)
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", value, field, err)
continue
}
case tsdb.BooleanArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
if _, found := data[s.key][timestamp]; !found {
data[s.key][timestamp] = row{
timestamp: timestamp,
tags: tags,
fields: make(map[string]interface{}),
}

data[s.key][t].fields[fname] = v
fieldEnd = t
}
case tsdb.StringArrayCursor:
values := c.Next()
for i, t := range values.Timestamps {
v, err := converter(values.Values[i])
if err != nil {
b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err)
continue
}

if _, found := data[s.key][t]; !found {
data[s.key][t] = row{
timestamp: t,
tags: tags,
fields: make(map[string]interface{}),
}
}

data[s.key][t].fields[fname] = v
fieldEnd = t
}
default:
cursor.Close()
return nil, fmt.Errorf("unexpected type %T", cursor)
data[s.key][timestamp].fields[fname] = v
fieldEnd = timestamp
}
cursor.Close()

c.close()
end = min(end, fieldEnd)
}
}
Expand Down
213 changes: 213 additions & 0 deletions cmd/influx_tools/parquet/cursors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package parquet

import (
"fmt"

"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/cursors"
)

type valueCursor interface {
next() (int64, interface{})
peek() (int64, bool)
close()
}

func newValueCursor(cursor cursors.Cursor) (valueCursor, error) {
switch c := cursor.(type) {
case tsdb.FloatArrayCursor:
return &floatValueCursor{cur: c}, nil
case tsdb.UnsignedArrayCursor:
return &uintValueCursor{cur: c}, nil
case tsdb.IntegerArrayCursor:
return &intValueCursor{cur: c}, nil
case tsdb.BooleanArrayCursor:
return &boolValueCursor{cur: c}, nil
case tsdb.StringArrayCursor:
return &stringValueCursor{cur: c}, nil
}
return nil, fmt.Errorf("unexpected type %T", cursor)

}

type floatValueCursor struct {
cur tsdb.FloatArrayCursor
arr *cursors.FloatArray
idx int
}

func (c *floatValueCursor) next() (int64, interface{}) {
// Initialize the array on first call
if c.arr == nil {
c.idx = 0
c.arr = c.cur.Next()
}
// Indicate no elements early
if c.arr.Len() == 0 || c.idx >= c.arr.Len() {
return 0, nil
}

defer func() { c.idx++ }()
return c.arr.Timestamps[c.idx], c.arr.Values[c.idx]
}

func (c *floatValueCursor) peek() (int64, bool) {
// Initialize the array on first call
if c.arr == nil {
c.idx = 0
c.arr = c.cur.Next()
}
// Indicate no elements early
if c.arr.Len() == 0 || c.idx >= c.arr.Len() {
return 0, false
}
return c.arr.Timestamps[c.idx], true
}

func (c *floatValueCursor) close() {
c.cur.Close()
}

type uintValueCursor struct {
cur tsdb.UnsignedArrayCursor
arr *cursors.UnsignedArray
idx int
}

func (c *uintValueCursor) next() (int64, interface{}) {
// Initialize the array on first call
if c.arr == nil {
c.arr = c.cur.Next()
}
// Indicate no elements early
if c.arr.Len() == 0 || c.idx >= c.arr.Len() {
return 0, nil
}
defer func() { c.idx++ }()
return c.arr.Timestamps[c.idx], c.arr.Values[c.idx]
}

func (c *uintValueCursor) peek() (int64, bool) {
// Initialize the array on first call
if c.arr == nil {
c.idx = 0
c.arr = c.cur.Next()
}
// Indicate no elements early
if c.arr.Len() == 0 || c.idx >= c.arr.Len() {
return 0, false
}
return c.arr.Timestamps[c.idx], true
}

func (c *uintValueCursor) close() {
c.cur.Close()
}

type intValueCursor struct {
cur tsdb.IntegerArrayCursor
arr *cursors.IntegerArray
idx int
}

func (c *intValueCursor) next() (int64, interface{}) {
// Initialize the array on first call
if c.arr == nil {
c.arr = c.cur.Next()
}
// Indicate no elements early
if c.arr.Len() == 0 || c.idx >= c.arr.Len() {
return 0, nil
}
defer func() { c.idx++ }()
return c.arr.Timestamps[c.idx], c.arr.Values[c.idx]
}

func (c *intValueCursor) peek() (int64, bool) {
// Initialize the array on first call
if c.arr == nil {
c.idx = 0
c.arr = c.cur.Next()
}
// Indicate no elements early
if c.arr.Len() == 0 || c.idx >= c.arr.Len() {
return 0, false
}
return c.arr.Timestamps[c.idx], true
}

func (c *intValueCursor) close() {
c.cur.Close()
}

type boolValueCursor struct {
cur tsdb.BooleanArrayCursor
arr *cursors.BooleanArray
idx int
}

func (c *boolValueCursor) next() (int64, interface{}) {
// Initialize the array on first call
if c.arr == nil {
c.arr = c.cur.Next()
}
// Indicate no elements early
if c.arr.Len() == 0 || c.idx >= c.arr.Len() {
return 0, nil
}
defer func() { c.idx++ }()
return c.arr.Timestamps[c.idx], c.arr.Values[c.idx]
}

func (c *boolValueCursor) peek() (int64, bool) {
// Initialize the array on first call
if c.arr == nil {
c.idx = 0
c.arr = c.cur.Next()
}
// Indicate no elements early
if c.arr.Len() == 0 || c.idx >= c.arr.Len() {
return 0, false
}
return c.arr.Timestamps[c.idx], true
}

func (c *boolValueCursor) close() {
c.cur.Close()
}

type stringValueCursor struct {
cur tsdb.StringArrayCursor
arr *cursors.StringArray
idx int
}

func (c *stringValueCursor) next() (int64, interface{}) {
// Initialize the array on first call
if c.arr == nil {
c.arr = c.cur.Next()
}
// Indicate no elements early
if c.arr.Len() == 0 || c.idx >= c.arr.Len() {
return 0, nil
}
defer func() { c.idx++ }()
return c.arr.Timestamps[c.idx], c.arr.Values[c.idx]
}

func (c *stringValueCursor) peek() (int64, bool) {
// Initialize the array on first call
if c.arr == nil {
c.idx = 0
c.arr = c.cur.Next()
}
// Indicate no elements early
if c.arr.Len() == 0 || c.idx >= c.arr.Len() {
return 0, false
}
return c.arr.Timestamps[c.idx], true
}

func (c *stringValueCursor) close() {
c.cur.Close()
}
10 changes: 5 additions & 5 deletions cmd/influx_tools/parquet/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
"text/tabwriter"
"time"

"github.com/apache/arrow/go/v16/arrow"
"github.com/apache/arrow/go/v16/arrow/array"
"github.com/apache/arrow/go/v16/parquet"
"github.com/apache/arrow/go/v16/parquet/pqarrow"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/parquet"
"github.com/apache/arrow/go/v17/parquet/pqarrow"
"go.uber.org/zap"

"github.com/influxdata/flux/memory"
Expand Down Expand Up @@ -457,7 +457,7 @@ func (e *exporter) exportMeasurement(ctx context.Context, shard *tsdb.Shard, mea
record := e.convertData(rows, builder, creator.tags, creator.fieldKeys)

// Write data
if err := writer.WriteBuffered(record); err != nil {
if err := writer.Write(record); err != nil {
return fmt.Errorf("writing parquet file %q failed: %w", filename, err)
}
e.logger.Infof(" exported %d rows in %v", len(rows), time.Since(last))
Expand Down
2 changes: 1 addition & 1 deletion cmd/influx_tools/parquet/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sort"
"strings"

"github.com/apache/arrow/go/v16/arrow"
"github.com/apache/arrow/go/v17/arrow"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/errors"
Expand Down
Loading

0 comments on commit 09a06cd

Please sign in to comment.