feat: influx_tools export parquet #25253
I've added a suggestion to change how the schema is gathered and to change the processing so that it handles one shard at a time. If you make those changes, we can then revisit the next questions.
cmd/influx_tools/parquet/exporter.go
func (e *exporter) gatherSchema(start, end time.Time, measurement string, rs *storage.ResultSet) {
	fmt.Printf("gather schema start: %s, end: %s\n", start.Format(time.RFC3339), end.Format(time.RFC3339))

	for rs.Next() {
		measurementName := string(models.ParseName(rs.Name()))
		if measurement != "" && measurement != measurementName {
			continue
		}

		t := e.measurements.getTable(measurementName)
		t.addTags(rs.Tags())
		t.addField(rs.Field(), rs.FieldType())
	}
}
I would recommend the following, more efficient approach to gathering the schema in InfluxDB 1.x. As mentioned in this comment, I recommend you process and export each shard separately to one or more Parquet files, as there can be no schema conflicts within a single shard.
Given that, you will also be able to gather the complete schema very efficiently using existing indices.
For example, your getShards function returns a slice of *tsdb.Shard. Using that, you can get both the exact set of tag keys for that shard and the set of fields:
cond := influxql.MustParseExpr("_name = '<measurement name>'")
shard := shards[0]
tagKeys, err := e.tsdbStore.TagKeys(context.Background(), query.OpenAuthorizer, []uint64{shard.ID()}, cond)
fields := shard.MeasurementFields([]byte("<measurement name>"))
You could merge the full set of tag keys across all shards to ensure the Parquet schema tag keys are consistent, and also check that each field key has a consistent data type. I would recommend generating a warning on field conflicts, but I suggest you still export the individual shards, as the field types will be consistent within each shard.
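The merge-and-warn step described above could look roughly like the sketch below. It is only an illustration: `shardSchema` and `mergeSchemas` are hypothetical names that are not part of InfluxDB, and field types are represented as plain strings. In the real exporter the tag keys and fields would come from the `TagKeys` and `MeasurementFields` calls shown in the snippet above.

```go
package main

import (
	"fmt"
	"sort"
)

// shardSchema is a hypothetical, simplified view of one shard's schema for a
// single measurement: its tag keys and a map of field name to type name.
type shardSchema struct {
	ShardID uint64
	TagKeys []string
	Fields  map[string]string // field name -> type name, e.g. "float"
}

// mergeSchemas returns the sorted union of tag keys across all shards, plus a
// warning for every field whose type differs from the first shard it was seen in.
func mergeSchemas(shards []shardSchema) (tagKeys []string, warnings []string) {
	type firstSeen struct {
		typ     string
		shardID uint64
	}
	seenTags := make(map[string]struct{})
	fieldTypes := make(map[string]firstSeen)

	for _, s := range shards {
		for _, k := range s.TagKeys {
			seenTags[k] = struct{}{}
		}

		// Visit field names in sorted order so warnings are deterministic.
		names := make([]string, 0, len(s.Fields))
		for name := range s.Fields {
			names = append(names, name)
		}
		sort.Strings(names)

		for _, name := range names {
			typ := s.Fields[name]
			prev, ok := fieldTypes[name]
			if !ok {
				fieldTypes[name] = firstSeen{typ: typ, shardID: s.ShardID}
				continue
			}
			if prev.typ != typ {
				warnings = append(warnings, fmt.Sprintf(
					"field %q is %s in shard %d but %s in shard %d",
					name, prev.typ, prev.shardID, typ, s.ShardID))
			}
		}
	}

	tagKeys = make([]string, 0, len(seenTags))
	for k := range seenTags {
		tagKeys = append(tagKeys, k)
	}
	sort.Strings(tagKeys)
	return tagKeys, warnings
}
```

The warnings are informational only. Each shard can still be exported on its own, since the field types within a single shard are consistent.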
Thank you very much for the feedback. I have refactored the code per your suggestion: individual shards are now exported separately, and the schema is retrieved using the code above. As a result, the export is now a single-pass operation.
Unfortunately, I still get the same wrong result (incomplete output).
@alespour looking at the code, it seems like you do not create a schema if a measurement has no tags. Even though this might be rare, it's perfectly valid to have a series without any tag... Do you by chance miss data in the output from measurements without tags?
@srebhan I believe the database and measurements (telegraf, cpu / disk etc.) I'm using for testing do not contain tag-less data.
But you are right, that needs to be fixed, thank you.
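One way to avoid the tag-less case being skipped is to build the column list unconditionally, so that an empty tag set simply contributes no tag columns. The sketch below is an assumption about how such a fix might look, not the PR's actual code; `column` and `buildColumns` are hypothetical names, and types are plain strings for brevity.

```go
package main

import "sort"

// column is a hypothetical description of one output column.
type column struct {
	Name string
	Kind string // "time", "tag", or "field"
	Type string // type name, e.g. "timestamp", "string", "float"
}

// buildColumns always produces a schema: a time column, then zero or more tag
// columns, then the field columns. A measurement without tags is still valid.
func buildColumns(tagKeys []string, fieldTypes map[string]string) []column {
	cols := []column{{Name: "time", Kind: "time", Type: "timestamp"}}

	// Copy before sorting so the caller's slice is not modified.
	tags := append([]string(nil), tagKeys...)
	sort.Strings(tags)
	for _, k := range tags {
		cols = append(cols, column{Name: k, Kind: "tag", Type: "string"})
	}

	fields := make([]string, 0, len(fieldTypes))
	for name := range fieldTypes {
		fields = append(fields, name)
	}
	sort.Strings(fields)
	for _, name := range fields {
		cols = append(cols, column{Name: name, Kind: "field", Type: fieldTypes[name]})
	}
	return cols
}
```

With a nil or empty `tagKeys`, the result contains only the time column and the field columns, so series without tags are still exported.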
I'm using telegraf to check parquet output like this:
telegraf --config ./telegraf-parquet.conf --once
[[inputs.file]]
  files = ["/tmp/parquet/table-*.parquet"]
  name_override = "cpu"
  data_format = "parquet"
  tag_columns = ["datacenter","hostname","os","rack","region","service","team"]
  timestamp_column = "time"
  timestamp_format = "unix_ns"

[[outputs.file]]
  ## Files to write to, "stdout" is a specially handled file.
  files = ["stdout"]
Closing this in favor of #25297, which was successfully tested.
Test run like
go run -ldflags "-X google.golang.org/protobuf/reflect/protoregistry.conflictPolicy=ignore" ./cmd/influx_tools/ export-parquet -config /bigdata/influxdb-copy/influxdb.conf -no-conflict-path -database telegraf -measurement cpu