Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: influx_tools export parquet #25253

Closed
wants to merge 7 commits into from

Conversation

alespour
Copy link
Contributor

Test run like

go run -ldflags "-X google.golang.org/protobuf/reflect/protoregistry.conflictPolicy=ignore" ./cmd/influx_tools/ export-parquet -config /bigdata/influxdb-copy/influxdb.conf -no-conflict-path -database telegraf -measurement cpu

Closes #

Describe your proposed changes here.

  • I've read the contributing section of the project README.
  • Signed CLA (if not already signed).

Copy link
Contributor

@stuartcarnie stuartcarnie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a suggestion to change how schema is gathered and to change the processing so that it processes a shard at a time. If you make those changes, we can the revisit the next questions.

Comment on lines 262 to 275
func (e *exporter) gatherSchema(start, end time.Time, measurement string, rs *storage.ResultSet) {
fmt.Printf("gather schema start: %s, end: %s\n", start.Format(time.RFC3339), end.Format(time.RFC3339))

for rs.Next() {
measurementName := string(models.ParseName(rs.Name()))
if measurement != "" && measurement != measurementName {
continue
}

t := e.measurements.getTable(measurementName)
t.addTags(rs.Tags())
t.addField(rs.Field(), rs.FieldType())
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend you use the following, more efficient approach to gather the schema in InfluxDB 1.x. As mentioned in this comment, I recommend you process and export each shard separately to 1 or more parquet field, as there can be no schema conflicts within a single shard.

Given that, you will also be able to gather the complete schema very efficiently using existing indices.

For example, the shards returned by your getShards function returns a slice of *tsdb.Shard. Using that, you can get both the exact set of tag keys for that shard, and the set of fields:

	cond := influxql.MustParseExpr("_name = '<measurement name>'")
	shard := shards[0]
	tagKeys, err := e.tsdbStore.TagKeys(context.Background(), query.OpenAuthorizer, []uint64{shard.ID()}, cond)
	fields := shard.MeasurementFields([]byte("<measurement name>"))

You could merge together the full set of tag keys across all shards to ensure the Parquet schema tag keys are consistent, and also perform a check that the field keys are all consistent data types. I would recommend generating a warning of field conflicts, but I would suggest you still export the individual shards, as the field types will be consistent within the shard.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for the feedback. I have refactored the code per your suggestion, now individual shards are exported and schema is retrieved using the code above. The export is now single-pass op therefore.
Unfortunately, I still get the same wrong result (incomplete output).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alespour looking at the code, it seems like you do not create a schema if a measurement has no tags. Even though this might be rare, it's perfectly valid to have a series without any tag... Do you by chance miss data in the output from measurements without tags?

Copy link
Contributor Author

@alespour alespour Sep 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srebhan I believe the database and measurement (telegraf, cpu / disk etc) I 'm using for testing does not contain tag-less data.
But you are right, that needs to be fixed, thank you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm using telegraf to check parquet output like this:
telegraf --config ./telegraf-parquet.conf --once

[[inputs.file]]
  files = ["/tmp/parquet/table-*.parquet"]
  name_override = "cpu"
  data_format = "parquet"
  tag_columns = ["datacenter","hostname","os","rack","region","service","team"]
  timestamp_column = "time"
  timestamp_format = "unix_ns"

[[outputs.file]]
  ## Files to write to, "stdout" is a specially handled file.
  files = ["stdout"]

@alespour alespour requested a review from stuartcarnie August 30, 2024 10:23
@srebhan
Copy link
Member

srebhan commented Oct 2, 2024

Closing this in favor of #25297 which was successfully tested.

@srebhan srebhan closed this Oct 2, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants