feat: influx inspect export parquet #25047

Open
wants to merge 16 commits into
base: master-1.x
30 changes: 27 additions & 3 deletions cmd/influx_inspect/README.md
@@ -49,17 +49,26 @@ WAL storage path.
`default` = "$HOME/.influxdb/wal"

#### `-out` string
Destination file to export to
Destination file to export to.
In case of export to Parquet, destination should be existing directory.

`default` = "$HOME/.influxdb/export"

#### `-database` string (optional)
#### `-database` string
Database to export.
Mandatory for export to Parquet, optional otherwise (default).

`default` = ""

#### `-retention` string (optional)
#### `-retention` string
Retention policy to export.
Mandatory for export to Parquet, optional otherwise (default).

`default` = ""

#### `-measurement` string
Name of the measurement to export.
Mandatory for export to Parquet, optional otherwise (default).

`default` = ""

@@ -74,6 +83,16 @@ Compress the output.

`default` = false

#### `-parquet` bool (optional)
Export to Parquet.

`default` = false

#### `-chunk-size` int (optional)
Size to partition Parquet files, in bytes.

`default` = 100000000

#### Sample Commands

Export entire database and compress output:
@@ -86,6 +105,11 @@ Export specific retention policy:
influx_inspect export --database mydb --retention autogen
```

Export specific measurement to Parquet:
```
influx_inspect export --database mydb --retention autogen --measurement cpu --parquet
```

##### Sample Data
This is a sample of what the output will look like.

55 changes: 52 additions & 3 deletions cmd/influx_inspect/export/export.go
Contributor

Data in WAL files is not exported to Parquet, because the WAL export path never calls exportDone.
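
The concern above can be illustrated with a minimal sketch. It assumes, since the diff does not show `writeValuesParquet`, that the Parquet writer buffers rows and only persists them when `exportDone` runs. The `exporter` type, its fields, and `exportFiles` are hypothetical stand-ins, not code from the PR.

```go
package main

import "fmt"

// exporter is a hypothetical stand-in for the PR's Command type.
// Assumption: rows are buffered by writeValues and only persisted by exportDone.
type exporter struct {
	pending []string // rows buffered but not yet flushed
	flushed []string // rows that reached the output
}

// writeValues buffers a row, mimicking a writer that batches before flushing.
func (e *exporter) writeValues(row string) {
	e.pending = append(e.pending, row)
}

// exportDone flushes everything buffered so far.
func (e *exporter) exportDone(_ string) error {
	e.flushed = append(e.flushed, e.pending...)
	e.pending = nil
	return nil
}

// exportFiles processes each file and calls exportDone only when callDone is set.
func (e *exporter) exportFiles(files []string, callDone bool) error {
	for _, f := range files {
		e.writeValues("row from " + f)
		if callDone {
			if err := e.exportDone(f); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	e := &exporter{}
	_ = e.exportFiles([]string{"000001.tsm"}, true)  // TSM path: exportDone is called
	_ = e.exportFiles([]string{"_00001.wal"}, false) // WAL path: exportDone is skipped
	fmt.Println("flushed:", e.flushed)
	fmt.Println("still pending:", e.pending)
}
```

Under that assumption, the WAL row stays in the buffer and never reaches the output, which is the behaviour the comment describes.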

@@ -22,6 +22,11 @@ import (
"github.com/influxdata/influxql"
)

const (
DefaultParquetPartitionSize = 100_000_000
MinParquetPartitionSize = 1_000_000
)

// Command represents the program execution for "influx_inspect export".
type Command struct {
// Standard input/output, overridden for testing.
@@ -33,14 +38,20 @@ type Command struct {
out string
database string
retentionPolicy string
measurement string
startTime int64
endTime int64
compress bool
lponly bool
parquet bool
pqChunkSize int

manifest map[string]struct{}
tsmFiles map[string][]string
walFiles map[string][]string

writeValues func(io.Writer, []byte, string, []tsm1.Value) error
exportDone func(string) error
}

const stdoutMark = "-"
@@ -68,17 +79,20 @@ func (cmd *Command) Run(args ...string) error {
fs := flag.NewFlagSet("export", flag.ExitOnError)
fs.StringVar(&cmd.dataDir, "datadir", os.Getenv("HOME")+"/.influxdb/data", "Data storage path")
fs.StringVar(&cmd.walDir, "waldir", os.Getenv("HOME")+"/.influxdb/wal", "WAL storage path")
fs.StringVar(&cmd.out, "out", os.Getenv("HOME")+"/.influxdb/export", "'-' for standard out or the destination file to export to")
fs.StringVar(&cmd.out, "out", os.Getenv("HOME")+"/.influxdb/export", "'-' for standard out or the destination file to export to (line protocol) | directory to write Parquet files")
fs.StringVar(&cmd.database, "database", "", "Optional: the database to export")
fs.StringVar(&cmd.retentionPolicy, "retention", "", "Optional: the retention policy to export (requires -database)")
fs.StringVar(&cmd.measurement, "measurement", "", "Name of measurement to export")
fs.StringVar(&start, "start", "", "Optional: the start time to export (RFC3339 format)")
fs.StringVar(&end, "end", "", "Optional: the end time to export (RFC3339 format)")
fs.BoolVar(&cmd.lponly, "lponly", false, "Only export line protocol")
fs.BoolVar(&cmd.compress, "compress", false, "Compress the output")
fs.BoolVar(&cmd.parquet, "parquet", false, "Export to Parquet format (requires -database -retention -measurement)")
fs.IntVar(&cmd.pqChunkSize, "chunk-size", DefaultParquetPartitionSize, "Size to partition Parquet files (in bytes)")

fs.SetOutput(cmd.Stdout)
fs.Usage = func() {
fmt.Fprintf(cmd.Stdout, "Exports TSM files into InfluxDB line protocol format.\n\n")
fmt.Fprintf(cmd.Stdout, "Exports TSM files into InfluxDB line protocol or Parquet format.\n\n")
fmt.Fprintf(cmd.Stdout, "Usage: %s export [flags]\n\n", filepath.Base(os.Args[0]))
fs.PrintDefaults()
}
@@ -112,6 +126,14 @@ func (cmd *Command) Run(args ...string) error {
return err
}

if cmd.parquet {
cmd.writeValues = cmd.writeValuesParquet
cmd.exportDone = cmd.exportDoneParquet
} else {
cmd.writeValues = cmd.writeValuesLp
cmd.exportDone = func(_ string) error { return nil }
}

return cmd.export()
}

@@ -122,6 +144,17 @@ func (cmd *Command) validate() error {
if cmd.startTime != 0 && cmd.endTime != 0 && cmd.endTime < cmd.startTime {
return fmt.Errorf("end time before start time")
}
if cmd.parquet {
if cmd.database == "" || cmd.retentionPolicy == "" || cmd.measurement == "" {
return fmt.Errorf("must specify database, retention and measurement when exporting to Parquet")
}
if cmd.out == "-" {
return fmt.Errorf("-out must point to a folder for Parquet files")
}
if cmd.pqChunkSize < MinParquetPartitionSize {
return fmt.Errorf("minimum Parquet partition size is %d bytes", MinParquetPartitionSize)
}
}
return nil
}

@@ -133,6 +166,9 @@ func (cmd *Command) export() error {
return err
}

if cmd.parquet {
return cmd.writeDML(io.Discard, io.Discard)
}
return cmd.write()
}

@@ -331,6 +367,9 @@ func (cmd *Command) writeTsmFiles(mw io.Writer, w io.Writer, files []string) err
if err := cmd.exportTSMFile(f, w); err != nil {
return err
}
if err := cmd.exportDone(f); err != nil {
return err
}
}

return nil
@@ -368,11 +407,16 @@ func (cmd *Command) exportTSMFile(tsmFilePath string, w io.Writer) error {
measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key)
field = escape.Bytes(field)

if cmd.measurement != "" && cmd.measurement != strings.Split(string(measurement), ",")[0] {
continue
}
Comment on lines +410 to +412
Contributor

@stuartcarnie, Jul 24, 2024

Measurements can contain escaped commas, so splitting the series key on "," does not isolate the measurement name, and such measurements cannot be exported with this function. Example:

> create database measurement_with_comma
> insert cols\,bad,tag0=tag0_0,tag1=tag1_0 fieldF=3.2
> insert cols\,bad,tag0=tag0_1,tag1=tag1_0 fieldF=1.2
> select * from "cols,bad"
name: cols,bad
time                fieldF tag0   tag1
----                ------ ----   ----
1721793491299269000 3.2    tag0_0 tag1_0
1721793494708317000 1.2    tag0_1 tag1_0

Use models.ParseName to extract the measurement correctly.


if err := cmd.writeValues(w, measurement, string(field), values); err != nil {
// An error from writeValues indicates an IO error, which should be returned.
return err
}
}

return nil
}

@@ -436,19 +480,24 @@ func (cmd *Command) exportWALFile(walFilePath string, w io.Writer, warnDelete fu
// measurements are stored escaped, field names are not
field = escape.Bytes(field)

if cmd.measurement != "" && cmd.measurement != strings.Split(string(measurement), ",")[0] {
continue
}

Comment on lines +483 to +486
Contributor

As noted previously, measurements can contain escaped commas, so such measurements cannot be exported with this function.

if err := cmd.writeValues(w, measurement, string(field), values); err != nil {
// An error from writeValues indicates an IO error, which should be returned.
return err
}
}
}
}

return nil
}

// writeValues writes every value in values to w, using the given series key and field name.
// If any call to w.Write fails, that error is returned.
func (cmd *Command) writeValues(w io.Writer, seriesKey []byte, field string, values []tsm1.Value) error {
func (cmd *Command) writeValuesLp(w io.Writer, seriesKey []byte, field string, values []tsm1.Value) error {
buf := []byte(string(seriesKey) + " " + field + "=")
prefixLen := len(buf)
