-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: influx inspect export parquet #25047
base: master-1.x
Are you sure you want to change the base?
Changes from all commits
0c65d78
733e1bb
3b5896c
92f08e2
2544c28
f04fc04
031aae7
50f4511
db4bb2a
05c1e50
6482911
c1e50bb
b48a1ca
54906a5
5b5d6ee
b09c14e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,11 @@ import ( | |
"github.com/influxdata/influxql" | ||
) | ||
|
||
const ( | ||
DefaultParquetPartitionSize = 100_000_000 | ||
MinParquetPartitionSize = 1_000_000 | ||
) | ||
|
||
// Command represents the program execution for "influx_inspect export". | ||
type Command struct { | ||
// Standard input/output, overridden for testing. | ||
|
@@ -33,14 +38,20 @@ type Command struct { | |
out string | ||
database string | ||
retentionPolicy string | ||
measurement string | ||
startTime int64 | ||
endTime int64 | ||
compress bool | ||
lponly bool | ||
parquet bool | ||
pqChunkSize int | ||
|
||
manifest map[string]struct{} | ||
tsmFiles map[string][]string | ||
walFiles map[string][]string | ||
|
||
writeValues func(io.Writer, []byte, string, []tsm1.Value) error | ||
exportDone func(string) error | ||
} | ||
|
||
const stdoutMark = "-" | ||
|
@@ -68,17 +79,20 @@ func (cmd *Command) Run(args ...string) error { | |
fs := flag.NewFlagSet("export", flag.ExitOnError) | ||
fs.StringVar(&cmd.dataDir, "datadir", os.Getenv("HOME")+"/.influxdb/data", "Data storage path") | ||
fs.StringVar(&cmd.walDir, "waldir", os.Getenv("HOME")+"/.influxdb/wal", "WAL storage path") | ||
fs.StringVar(&cmd.out, "out", os.Getenv("HOME")+"/.influxdb/export", "'-' for standard out or the destination file to export to") | ||
fs.StringVar(&cmd.out, "out", os.Getenv("HOME")+"/.influxdb/export", "'-' for standard out or the destination file to export to (line protocol) | directory to write Parquet files") | ||
fs.StringVar(&cmd.database, "database", "", "Optional: the database to export") | ||
fs.StringVar(&cmd.retentionPolicy, "retention", "", "Optional: the retention policy to export (requires -database)") | ||
fs.StringVar(&cmd.measurement, "measurement", "", "Name of measurement to export") | ||
fs.StringVar(&start, "start", "", "Optional: the start time to export (RFC3339 format)") | ||
fs.StringVar(&end, "end", "", "Optional: the end time to export (RFC3339 format)") | ||
fs.BoolVar(&cmd.lponly, "lponly", false, "Only export line protocol") | ||
fs.BoolVar(&cmd.compress, "compress", false, "Compress the output") | ||
fs.BoolVar(&cmd.parquet, "parquet", false, "Export to Parquet format (requires -database -retention -measurement)") | ||
fs.IntVar(&cmd.pqChunkSize, "chunk-size", DefaultParquetPartitionSize, "Size to partition Parquet files (in bytes)") | ||
|
||
fs.SetOutput(cmd.Stdout) | ||
fs.Usage = func() { | ||
fmt.Fprintf(cmd.Stdout, "Exports TSM files into InfluxDB line protocol format.\n\n") | ||
fmt.Fprintf(cmd.Stdout, "Exports TSM files into InfluxDB line protocol or Parquet format.\n\n") | ||
fmt.Fprintf(cmd.Stdout, "Usage: %s export [flags]\n\n", filepath.Base(os.Args[0])) | ||
fs.PrintDefaults() | ||
} | ||
|
@@ -112,6 +126,14 @@ func (cmd *Command) Run(args ...string) error { | |
return err | ||
} | ||
|
||
if cmd.parquet { | ||
cmd.writeValues = cmd.writeValuesParquet | ||
cmd.exportDone = cmd.exportDoneParquet | ||
} else { | ||
cmd.writeValues = cmd.writeValuesLp | ||
cmd.exportDone = func(_ string) error { return nil } | ||
} | ||
|
||
return cmd.export() | ||
} | ||
|
||
|
@@ -122,6 +144,17 @@ func (cmd *Command) validate() error { | |
if cmd.startTime != 0 && cmd.endTime != 0 && cmd.endTime < cmd.startTime { | ||
return fmt.Errorf("end time before start time") | ||
} | ||
if cmd.parquet { | ||
if cmd.database == "" || cmd.retentionPolicy == "" || cmd.measurement == "" { | ||
return fmt.Errorf("must specify database, retention and measurement when exporting to Parquet") | ||
} | ||
if cmd.out == "-" { | ||
return fmt.Errorf("-out must point to a folder for Parquet files") | ||
} | ||
if cmd.pqChunkSize < MinParquetPartitionSize { | ||
return fmt.Errorf("minimum Parquet partition size is %d bytes", MinParquetPartitionSize) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
|
@@ -133,6 +166,9 @@ func (cmd *Command) export() error { | |
return err | ||
} | ||
|
||
if cmd.parquet { | ||
return cmd.writeDML(io.Discard, io.Discard) | ||
} | ||
return cmd.write() | ||
} | ||
|
||
|
@@ -331,6 +367,9 @@ func (cmd *Command) writeTsmFiles(mw io.Writer, w io.Writer, files []string) err | |
if err := cmd.exportTSMFile(f, w); err != nil { | ||
return err | ||
} | ||
if err := cmd.exportDone(f); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
|
@@ -368,11 +407,16 @@ func (cmd *Command) exportTSMFile(tsmFilePath string, w io.Writer) error { | |
measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key) | ||
field = escape.Bytes(field) | ||
|
||
if cmd.measurement != "" && cmd.measurement != strings.Split(string(measurement), ",")[0] { | ||
continue | ||
} | ||
Comment on lines
+410
to
+412
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Measurements can have escaped `,` characters.
Use |
||
|
||
if err := cmd.writeValues(w, measurement, string(field), values); err != nil { | ||
// An error from writeValues indicates an IO error, which should be returned. | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -436,19 +480,24 @@ func (cmd *Command) exportWALFile(walFilePath string, w io.Writer, warnDelete fu | |
// measurements are stored escaped, field names are not | ||
field = escape.Bytes(field) | ||
|
||
if cmd.measurement != "" && cmd.measurement != strings.Split(string(measurement), ",")[0] { | ||
continue | ||
} | ||
|
||
Comment on lines
+483
to
+486
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As noted previously, measurements can have escaped `,` characters. |
||
if err := cmd.writeValues(w, measurement, string(field), values); err != nil { | ||
// An error from writeValues indicates an IO error, which should be returned. | ||
return err | ||
} | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// writeValues writes every value in values to w, using the given series key and field name. | ||
// If any call to w.Write fails, that error is returned. | ||
func (cmd *Command) writeValues(w io.Writer, seriesKey []byte, field string, values []tsm1.Value) error { | ||
func (cmd *Command) writeValuesLp(w io.Writer, seriesKey []byte, field string, values []tsm1.Value) error { | ||
buf := []byte(string(seriesKey) + " " + field + "=") | ||
prefixLen := len(buf) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Data in WAL files is not exported to Parquet, as there is no call to `exportDone`.