From 01dac2faca6f83161ce213fec827314d2fa652e9 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Thu, 21 Dec 2023 20:40:03 +0800 Subject: [PATCH] feat: more quick start --- .../src/main/java/io/greptime/MyMetric1.java | 110 ++++++++++++ .../src/main/java/io/greptime/MyMetric2.java | 81 +++++++++ .../src/main/java/io/greptime/QuickStart.java | 158 ------------------ .../greptime/StreamWritePOJOsQuickStart.java | 95 +++++++++++ .../StreamWriteTableRowsQuickStart.java | 116 +++++++++++++ .../io/greptime/WritePOJOsQuickStart.java | 106 ++++++++++++ .../io/greptime/WriteTableRowsQuickStart.java | 129 ++++++++++++++ .../src/main/java/io/greptime/PojoMapper.java | 22 +-- .../java/io/greptime/models/TableRows.java | 24 ++- .../java/io/greptime/models/TableSchema.java | 57 +++---- .../io/greptime/options/GreptimeOptions.java | 29 ++-- .../io/greptime/options/RouterOptions.java | 6 +- .../src/test/java/io/greptime/TestUtil.java | 13 +- .../java/io/greptime/WriteClientTest.java | 36 ++-- .../io/greptime/models/TableRowsTest.java | 16 +- 15 files changed, 735 insertions(+), 263 deletions(-) create mode 100644 ingester-example/src/main/java/io/greptime/MyMetric1.java create mode 100644 ingester-example/src/main/java/io/greptime/MyMetric2.java delete mode 100644 ingester-example/src/main/java/io/greptime/QuickStart.java create mode 100644 ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java create mode 100644 ingester-example/src/main/java/io/greptime/StreamWriteTableRowsQuickStart.java create mode 100644 ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java create mode 100644 ingester-example/src/main/java/io/greptime/WriteTableRowsQuickStart.java diff --git a/ingester-example/src/main/java/io/greptime/MyMetric1.java b/ingester-example/src/main/java/io/greptime/MyMetric1.java new file mode 100644 index 0000000..b29aaaf --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/MyMetric1.java @@ -0,0 +1,110 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import io.greptime.models.Column; +import io.greptime.models.DataType; +import io.greptime.models.Metric; +import java.math.BigDecimal; + +/** + * @author jiachun.fjc + */ +@Metric(name = "my_metric1") +public class MyMetric1 { + @Column(name = "tag1", tag = true, dataType = DataType.String) + private String tag1; + @Column(name = "tag2", tag = true, dataType = DataType.String) + private String tag2; + @Column(name = "tag3", tag = true, dataType = DataType.String) + private String tag3; + + @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond) + private long ts; + + @Column(name = "field1", dataType = DataType.String) + private String field1; + @Column(name = "field2", dataType = DataType.Float64) + private double field2; + @Column(name = "field3", dataType = DataType.Decimal128) + private BigDecimal field3; + @Column(name = "field4", dataType = DataType.Int32) + private int field4; + + public String getTag1() { + return tag1; + } + + public void setTag1(String tag1) { + this.tag1 = tag1; + } + + public String getTag2() { + return tag2; + } + + public void setTag2(String tag2) { + this.tag2 = tag2; + } + + public String getTag3() { + return tag3; + } + + public void setTag3(String tag3) { + this.tag3 = tag3; + } + + public long getTs() { + return ts; + } + + public void setTs(long ts) { + this.ts = ts; + } + + public String getField1() { + return field1; + } + + public void setField1(String field1) { + this.field1 = field1; + } + + public double getField2() { + return field2; + } + + public void setField2(double field2) { + this.field2 = field2; + } + + public BigDecimal getField3() { + return field3; + } + + public void setField3(BigDecimal field3) { + this.field3 = field3; + } + + public int getField4() { + return field4; + } + + public void setField4(int field4) { + this.field4 = field4; + } +} diff --git a/ingester-example/src/main/java/io/greptime/MyMetric2.java b/ingester-example/src/main/java/io/greptime/MyMetric2.java new file mode 100644 index 0000000..6e9efb7 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/MyMetric2.java @@ -0,0 +1,81 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import io.greptime.models.Column; +import io.greptime.models.DataType; +import io.greptime.models.Metric; +import java.math.BigDecimal; +import java.util.Date; + +/** + * @author jiachun.fjc + */ +@Metric(name = "my_metric2") +public class MyMetric2 { + @Column(name = "tag1", tag = true, dataType = DataType.String) + private String tag1; + @Column(name = "tag2", tag = true, dataType = DataType.String) + private String tag2; + + @Column(name = "ts", timestamp = true, dataType = DataType.TimestampSecond) + private long ts; + + @Column(name = "field1", dataType = DataType.Date) + private Date field1; + @Column(name = "field2", dataType = DataType.Float64) + private double field2; + + public String getTag1() { + return tag1; + } + + public void setTag1(String tag1) { + this.tag1 = tag1; + } + + public String getTag2() { + return tag2; + } + + public void setTag2(String tag2) { + this.tag2 = tag2; + } + + public long getTs() { + return ts; + } + + public void setTs(long ts) { + this.ts = ts; + } + + public Date getField1() { + return field1; + } + + public void setField1(Date field1) { + this.field1 = field1; + } + + public double getField2() { + return field2; + } + + public void setField2(double field2) { + this.field2 = field2; + } +} diff --git a/ingester-example/src/main/java/io/greptime/QuickStart.java b/ingester-example/src/main/java/io/greptime/QuickStart.java deleted file mode 100644 index 996057f..0000000 --- a/ingester-example/src/main/java/io/greptime/QuickStart.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright 2023 Greptime Team - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.greptime; - -import io.greptime.models.Column; -import io.greptime.models.DataType; -import io.greptime.models.Err; -import io.greptime.models.Metric; -import io.greptime.models.Result; -import io.greptime.models.WriteOk; -import io.greptime.options.GreptimeOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -/** - * @author jiachun.fjc - */ -public class QuickStart { - - private static final Logger LOG = LoggerFactory.getLogger(QuickStart.class); - - public static void main(String[] args) throws Exception { - String endpoint = "127.0.0.1:4001"; - GreptimeOptions opts = GreptimeOptions.newBuilder("public", endpoint) // - .writeMaxRetries(1) // - .routeTableRefreshPeriodSeconds(-1) // - .build(); - - GreptimeDB greptimeDB = new GreptimeDB(); - - if (!greptimeDB.init(opts)) { - throw new RuntimeException("Fail to start GreptimeDB client"); - } - - long now = System.currentTimeMillis(); - LOG.info("now = {}", now); - - // normal inset - runInsert(greptimeDB, now); - - // streaming insert - runInsertWithStream(greptimeDB, now); - } - - @Metric(name = "monitor1") - static class Monitor { - @Column(name = "host", tag = true, dataType = DataType.String) - String host; - @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond) - long ts; - @Column(name = "cpu", dataType = DataType.Float64) - double cpu; - @Column(name = "memory", dataType = DataType.Float64) - double memory; - @Column(name = "decimal_value", dataType = DataType.Decimal128) - BigDecimal decimalValue; - } - - @Metric(name = "monitor_cpu1") - static class MonitorCpu { - @Column(name = "host", tag = true, dataType = DataType.String) - String host; - @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond) - long ts; - @Column(name = "cpu", dataType = DataType.Float64) - double cpu; - } - - private static void runInsert(GreptimeDB greptimeDB, long now) throws Exception { - List monitors = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - Monitor monitor = new Monitor(); - monitor.host = "127.0.0." + i; - monitor.ts = now + i; - monitor.cpu = i; - monitor.memory = i * 2; - monitor.decimalValue = new BigDecimal("1111111111111111111.2333333" + i); - monitors.add(monitor); - } - - List monitorCpus = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - MonitorCpu monitor = new MonitorCpu(); - monitor.host = "127.0.0." + i; - monitor.ts = now + i; - monitor.cpu = i; - monitorCpus.add(monitor); - } - - List> pojos = new ArrayList<>(); - pojos.add(monitors); - pojos.add(monitorCpus); - - // For performance reasons, the SDK is designed to be purely asynchronous. - // The return value is a future object. If you want to immediately obtain - // the result, you can call `future.get()`. - CompletableFuture> future = greptimeDB.writePOJOs(pojos); - - Result result = future.get(); - - if (result.isOk()) { - LOG.info("Write result: {}", result.getOk()); - } else { - LOG.error("Failed to write: {}", result.getErr()); - } - } - - private static void runInsertWithStream(GreptimeDB greptimeDB, long now) throws Exception { - StreamWriter, WriteOk> streamWriter = greptimeDB.streamWriterPOJOs(); - - List monitors = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - Monitor monitor = new Monitor(); - monitor.host = "127.0.0." + i; - monitor.ts = now + i; - monitor.cpu = i; - monitor.memory = i * 2; - monitor.decimalValue = new BigDecimal("1111111111111111111.2333333" + i); - monitors.add(monitor); - } - - streamWriter.write(monitors); - - List monitorCpus = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - MonitorCpu monitor = new MonitorCpu(); - monitor.host = "127.0.0." + i; - monitor.ts = now + i; - monitor.cpu = i; - monitorCpus.add(monitor); - } - - streamWriter.write(monitorCpus); - - CompletableFuture future = streamWriter.completed(); - - WriteOk result = future.get(); - - LOG.info("Write result: {}", result); - } -} diff --git a/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java new file mode 100644 index 0000000..2771db2 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/StreamWritePOJOsQuickStart.java @@ -0,0 +1,95 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import io.greptime.models.WriteOk; +import io.greptime.options.GreptimeOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * @author jiachun.fjc + */ +public class StreamWritePOJOsQuickStart { + + private static final Logger LOG = LoggerFactory.getLogger(StreamWritePOJOsQuickStart.class); + + public static void main(String[] args) throws ExecutionException, InterruptedException { + // GreptimeDB has a default database named "public", we can use it as the test database + String database = "public"; + // By default, GreptimeDB listens on port 4001 using the gRPC protocol. + // We can provide multiple endpoints that point to the same GreptimeDB cluster. + // The client will make calls to these endpoints based on a load balancing strategy. + String[] endpoints = {"127.0.0.1:4001"}; + GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // + .build(); + + GreptimeDB greptimeDB = new GreptimeDB(); + + if (!greptimeDB.init(opts)) { + throw new RuntimeException("Failed to start GreptimeDB client"); + } + + List myMetric1s = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + MyMetric1 m = new MyMetric1(); + m.setTag1("tag_value_1_" + i); + m.setTag2("tag_value_2_" + i); + m.setTag3("tag_value_3_" + i); + m.setTs(System.currentTimeMillis()); + m.setField1("field_value_1_" + i); + m.setField2(i); + m.setField3(new BigDecimal(i)); + m.setField4(i); + + myMetric1s.add(m); + } + + List myMetric2s = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + MyMetric2 m = new MyMetric2(); + m.setTag1("tag_value_1_" + i); + m.setTag2("tag_value_2_" + i); + m.setTs(System.currentTimeMillis() / 1000); + m.setField1(Calendar.getInstance().getTime()); + m.setField2(i); + + myMetric2s.add(m); + } + + StreamWriter, WriteOk> writer = greptimeDB.streamWriterPOJOs(); + + // write data into stream + writer.write(myMetric1s); + writer.write(myMetric2s); + + // delete the first 5 rows + writer.write(myMetric1s.subList(0, 5), WriteOp.Delete); + + // complete the stream + CompletableFuture future = writer.completed(); + + WriteOk result = future.get(); + + LOG.info("Write result: {}", result); + } +} diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteTableRowsQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteTableRowsQuickStart.java new file mode 100644 index 0000000..515b3a4 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/StreamWriteTableRowsQuickStart.java @@ -0,0 +1,116 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import io.greptime.models.DataType; +import io.greptime.models.SemanticType; +import io.greptime.models.TableRows; +import io.greptime.models.TableSchema; +import io.greptime.models.WriteOk; +import io.greptime.options.GreptimeOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.math.BigDecimal; +import java.util.Calendar; +import java.util.Date; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * @author jiachun.fjc + */ +public class StreamWriteTableRowsQuickStart { + + private static final Logger LOG = LoggerFactory.getLogger(StreamWriteTableRowsQuickStart.class); + + public static void main(String[] args) throws ExecutionException, InterruptedException { + // GreptimeDB has a default database named "public", we can use it as the test database + String database = "public"; + // By default, GreptimeDB listens on port 4001 using the gRPC protocol. + // We can provide multiple endpoints that point to the same GreptimeDB cluster. + // The client will make calls to these endpoints based on a load balancing strategy. + String[] endpoints = {"127.0.0.1:4001"}; + GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // + .build(); + + GreptimeDB greptimeDB = new GreptimeDB(); + + if (!greptimeDB.init(opts)) { + throw new RuntimeException("Failed to start GreptimeDB client"); + } + + TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") // + .addColumn("tag1", SemanticType.Tag, DataType.String) // + .addColumn("tag2", SemanticType.Tag, DataType.String) // + .addColumn("tag3", SemanticType.Tag, DataType.String) // + .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // + .addColumn("field1", SemanticType.Field, DataType.String) // + .addColumn("field2", SemanticType.Field, DataType.Float64) // + .addColumn("field3", SemanticType.Field, DataType.Decimal128) // + .addColumn("field4", SemanticType.Field, DataType.Int32) // + .build(); + + TableSchema myMetric4Schema = TableSchema.newBuilder("my_metric4") // + .addColumn("tag1", SemanticType.Tag, DataType.String) // + .addColumn("tag2", SemanticType.Tag, DataType.String) // + .addColumn("ts", SemanticType.Timestamp, DataType.TimestampSecond) // + .addColumn("field1", SemanticType.Field, DataType.Date) // + .addColumn("field2", SemanticType.Field, DataType.Float64) // + .build(); + + TableRows myMetric3Rows = TableRows.from(myMetric3Schema); + TableRows myMetric4Rows = TableRows.from(myMetric4Schema); + + for (int i = 0; i < 10; i++) { + String tag1v = "tag_value_1_" + i; + String tag2v = "tag_value_2_" + i; + String tag3v = "tag_value_3_" + i; + long ts = System.currentTimeMillis(); + String field1 = "field_value_1" + i; + double field2 = i + 0.1; + BigDecimal field3 = new BigDecimal(i); + int field4 = i + 1; + + myMetric3Rows.insert(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); + } + + for (int i = 0; i < 10; i++) { + String tag1v = "tag_value_1_" + i; + String tag2v = "tag_value_2_" + i; + long ts = System.currentTimeMillis() / 1000; + Date field1 = Calendar.getInstance().getTime(); + double field2 = i + 0.1; + + myMetric4Rows.insert(tag1v, tag2v, ts, field1, field2); + } + + StreamWriter writer = greptimeDB.streamWriter(); + + // write data into stream + writer.write(myMetric3Rows); + writer.write(myMetric4Rows); + + // delete the first 5 rows + writer.write(myMetric3Rows.subRange(0, 5), WriteOp.Delete); + + // complete the stream + CompletableFuture future = writer.completed(); + + WriteOk result = future.get(); + + LOG.info("Write result: {}", result); + } +} diff --git a/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java new file mode 100644 index 0000000..b5ae877 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/WritePOJOsQuickStart.java @@ -0,0 +1,106 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import io.greptime.models.Err; +import io.greptime.models.Result; +import io.greptime.models.WriteOk; +import io.greptime.options.GreptimeOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * @author jiachun.fjc + */ +public class WritePOJOsQuickStart { + + private static final Logger LOG = LoggerFactory.getLogger(WritePOJOsQuickStart.class); + + public static void main(String[] args) throws ExecutionException, InterruptedException { + // GreptimeDB has a default database named "public", we can use it as the test database + String database = "public"; + // By default, GreptimeDB listens on port 4001 using the gRPC protocol. + // We can provide multiple endpoints that point to the same GreptimeDB cluster. + // The client will make calls to these endpoints based on a load balancing strategy. + String[] endpoints = {"127.0.0.1:4001"}; + GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // + .build(); + + GreptimeDB greptimeDB = new GreptimeDB(); + + if (!greptimeDB.init(opts)) { + throw new RuntimeException("Failed to start GreptimeDB client"); + } + + List myMetric1s = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + MyMetric1 m = new MyMetric1(); + m.setTag1("tag_value_1_" + i); + m.setTag2("tag_value_2_" + i); + m.setTag3("tag_value_3_" + i); + m.setTs(System.currentTimeMillis()); + m.setField1("field_value_1_" + i); + m.setField2(i); + m.setField3(new BigDecimal(i)); + m.setField4(i); + + myMetric1s.add(m); + } + + List myMetric2s = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + MyMetric2 m = new MyMetric2(); + m.setTag1("tag_value_1_" + i); + m.setTag2("tag_value_2_" + i); + m.setTs(System.currentTimeMillis() / 1000); + m.setField1(Calendar.getInstance().getTime()); + m.setField2(i); + + myMetric2s.add(m); + } + + List> pojos = Arrays.asList(myMetric1s, myMetric2s); + + // For performance reasons, the SDK is designed to be purely asynchronous. + // The return value is a future object. If you want to immediately obtain + // the result, you can call `future.get()`. + CompletableFuture> puts = greptimeDB.writePOJOs(pojos); + + Result result = puts.get(); + + if (result.isOk()) { + LOG.info("Write result: {}", result.getOk()); + } else { + LOG.error("Failed to write: {}", result.getErr()); + } + + List> delete_pojos = Arrays.asList(myMetric1s.subList(0, 5), myMetric2s.subList(0, 5)); + Result deletes = greptimeDB.writePOJOs(delete_pojos, WriteOp.Delete).get(); + + if (deletes.isOk()) { + LOG.info("Delete result: {}", result.getOk()); + } else { + LOG.error("Failed to delete: {}", result.getErr()); + } + } +} diff --git a/ingester-example/src/main/java/io/greptime/WriteTableRowsQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteTableRowsQuickStart.java new file mode 100644 index 0000000..9de27a0 --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/WriteTableRowsQuickStart.java @@ -0,0 +1,129 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.greptime; + +import io.greptime.models.DataType; +import io.greptime.models.Err; +import io.greptime.models.Result; +import io.greptime.models.SemanticType; +import io.greptime.models.TableRows; +import io.greptime.models.TableSchema; +import io.greptime.models.WriteOk; +import io.greptime.options.GreptimeOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * @author jiachun.fjc + */ +public class WriteTableRowsQuickStart { + + private static final Logger LOG = LoggerFactory.getLogger(WriteTableRowsQuickStart.class); + + public static void main(String[] args) throws ExecutionException, InterruptedException { + // GreptimeDB has a default database named "public", we can use it as the test database + String database = "public"; + // By default, GreptimeDB listens on port 4001 using the gRPC protocol. + // We can provide multiple endpoints that point to the same GreptimeDB cluster. + // The client will make calls to these endpoints based on a load balancing strategy. + String[] endpoints = {"127.0.0.1:4001"}; + GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // + .build(); + + GreptimeDB greptimeDB = new GreptimeDB(); + + if (!greptimeDB.init(opts)) { + throw new RuntimeException("Failed to start GreptimeDB client"); + } + + TableSchema myMetric3Schema = TableSchema.newBuilder("my_metric3") // + .addColumn("tag1", SemanticType.Tag, DataType.String) // + .addColumn("tag2", SemanticType.Tag, DataType.String) // + .addColumn("tag3", SemanticType.Tag, DataType.String) // + .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // + .addColumn("field1", SemanticType.Field, DataType.String) // + .addColumn("field2", SemanticType.Field, DataType.Float64) // + .addColumn("field3", SemanticType.Field, DataType.Decimal128) // + .addColumn("field4", SemanticType.Field, DataType.Int32) // + .build(); + + TableSchema myMetric4Schema = TableSchema.newBuilder("my_metric4") // + .addColumn("tag1", SemanticType.Tag, DataType.String) // + .addColumn("tag2", SemanticType.Tag, DataType.String) // + .addColumn("ts", SemanticType.Timestamp, DataType.TimestampSecond) // + .addColumn("field1", SemanticType.Field, DataType.Date) // + .addColumn("field2", SemanticType.Field, DataType.Float64) // + .build(); + + TableRows myMetric3Rows = TableRows.from(myMetric3Schema); + TableRows myMetric4Rows = TableRows.from(myMetric4Schema); + + for (int i = 0; i < 10; i++) { + String tag1v = "tag_value_1_" + i; + String tag2v = "tag_value_2_" + i; + String tag3v = "tag_value_3_" + i; + long ts = System.currentTimeMillis(); + String field1 = "field_value_1" + i; + double field2 = i + 0.1; + BigDecimal field3 = new BigDecimal(i); + int field4 = i + 1; + + myMetric3Rows.insert(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); + } + + for (int i = 0; i < 10; i++) { + String tag1v = "tag_value_1_" + i; + String tag2v = "tag_value_2_" + i; + long ts = System.currentTimeMillis() / 1000; + Date field1 = Calendar.getInstance().getTime(); + double field2 = i + 0.1; + + myMetric4Rows.insert(tag1v, tag2v, ts, field1, field2); + } + + Collection rows = Arrays.asList(myMetric3Rows, myMetric4Rows); + + // For performance reasons, the SDK is designed to be purely asynchronous. + // The return value is a future object. If you want to immediately obtain + // the result, you can call `future.get()`. + CompletableFuture> future = greptimeDB.write(rows); + + Result result = future.get(); + + if (result.isOk()) { + LOG.info("Write result: {}", result.getOk()); + } else { + LOG.error("Failed to write: {}", result.getErr()); + } + + List delete_pojos = Arrays.asList(myMetric3Rows.subRange(0, 5), myMetric4Rows.subRange(0, 5)); + Result deletes = greptimeDB.write(delete_pojos, WriteOp.Delete).get(); + + if (deletes.isOk()) { + LOG.info("Delete result: {}", result.getOk()); + } else { + LOG.error("Failed to delete: {}", result.getErr()); + } + } +} diff --git a/ingester-protocol/src/main/java/io/greptime/PojoMapper.java b/ingester-protocol/src/main/java/io/greptime/PojoMapper.java index 0fa037b..390648c 100644 --- a/ingester-protocol/src/main/java/io/greptime/PojoMapper.java +++ b/ingester-protocol/src/main/java/io/greptime/PojoMapper.java @@ -57,10 +57,7 @@ public TableRows toTableRows(List pojos) { String metricName = getMetricName(metricType); - String[] columnNames = new String[fieldMap.size()]; - DataType[] dataTypes = new DataType[fieldMap.size()]; - SemanticType[] semanticTypes = new SemanticType[fieldMap.size()]; - int i = 0; + TableSchema.Builder schemaBuilder = TableSchema.newBuilder(metricName); for (Map.Entry entry : fieldMap.entrySet()) { String name = entry.getKey(); Field field = entry.getValue(); @@ -72,21 +69,10 @@ public TableRows toTableRows(List pojos) { } else if (column.timestamp()) { semanticType = SemanticType.Timestamp; } - - columnNames[i] = name; - dataTypes[i] = dataType; - semanticTypes[i] = semanticType; - - i++; + schemaBuilder.addColumn(name, semanticType, dataType); } - TableSchema schema = TableSchema.newBuilder(metricName) // - .columnNames(columnNames) // - .semanticTypes(semanticTypes) // - .dataTypes(dataTypes) // - .build(); - - TableRows tableRows = TableRows.newBuilder(schema).build(); + TableRows tableRows = TableRows.from(schemaBuilder.build()); for (M pojo : pojos) { Class type = pojo.getClass(); if (!type.equals(metricType)) { @@ -109,7 +95,7 @@ public TableRows toTableRows(List pojos) { } private String getMetricName(Class metricType) { - // From @Metirc annotation + // From @Metric annotation Metric metricAnnotation = metricType.getAnnotation(Metric.class); if (metricAnnotation != null) { return metricAnnotation.name(); diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableRows.java b/ingester-protocol/src/main/java/io/greptime/models/TableRows.java index b696370..994de7b 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/TableRows.java +++ b/ingester-protocol/src/main/java/io/greptime/models/TableRows.java @@ -58,6 +58,8 @@ default int pointCount() { */ TableRows insert(Object... values); + TableRows subRange(int fromIndex, int toIndex); + /** * Convert to {@link Database.RowInsertRequest}. * @@ -77,8 +79,8 @@ default void checkNumValues(int len) { Ensures.ensure(columnCount == len, "Expected values num: %d, actual: %d", columnCount, len); } - static Builder newBuilder(TableSchema tableSchema) { - return new Builder(tableSchema); + static TableRows from(TableSchema tableSchema) { + return new Builder(tableSchema).build(); } class Builder { @@ -137,7 +139,17 @@ class RowBasedTableRows implements TableRows, Into { private String tableName; private List columnSchemas; - private final List rows = new ArrayList<>(); + private final List rows; + + public RowBasedTableRows() { + this.rows = new ArrayList<>(); + } + + private RowBasedTableRows(String tableName, List columnSchemas, List rows) { + this.tableName = tableName; + this.columnSchemas = columnSchemas; + this.rows = rows; + } @Override public String tableName() { @@ -169,6 +181,12 @@ public TableRows insert(Object... values) { return this; } + @Override + public TableRows subRange(int fromIndex, int toIndex) { + List rows = this.rows.subList(fromIndex, toIndex); + return new RowBasedTableRows(this.tableName, this.columnSchemas, rows); + } + @Override public Database.RowInsertRequest intoRowInsertRequest() { return Database.RowInsertRequest.newBuilder() // diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java b/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java index fb895e9..d1af871 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java +++ b/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java @@ -17,11 +17,10 @@ import io.greptime.common.util.Ensures; import io.greptime.v1.Common; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; /** * Table schema for write. @@ -78,49 +77,31 @@ public static Builder newBuilder(String tableName) { public static class Builder { private final String tableName; - private List columnNames; - private List semanticTypes; - private List dataTypes; - private List dataTypeExtensions; + private final List columnNames = new ArrayList<>(); + private final List semanticTypes = new ArrayList<>(); + private final List dataTypes = new ArrayList<>(); + private final List dataTypeExtensions = new ArrayList<>(); public Builder(String tableName) { this.tableName = tableName; } - public Builder columnNames(String... names) { - this.columnNames = Arrays.stream(names).collect(Collectors.toList()); - return this; - } - - public Builder semanticTypes(SemanticType... semanticTypes) { - this.semanticTypes = Arrays.stream(semanticTypes) // - .map(SemanticType::toProtoValue) // - .collect(Collectors.toList()); - return this; - } - - public Builder dataTypes(DataType... dataTypes) { - DataTypeWithExtension[] columnDataTypeWithExtensions = Arrays.stream(dataTypes) - .map(DataTypeWithExtension::of) - .toArray(DataTypeWithExtension[]::new); - return dataTypes(columnDataTypeWithExtensions); + public Builder addColumn(String name, SemanticType semanticType, DataType dataType) { + return addColumn(name, semanticType, dataType, null); } - public Builder dataTypes(DataTypeWithExtension... dataTypes) { - this.dataTypes = Arrays.stream(dataTypes) // - .map((dataType) -> dataType.getColumnDataType().toProtoValue()) // - .collect(Collectors.toList()); - this.dataTypeExtensions = Arrays.stream(dataTypes) // - .map((dataType) -> { - DataType.DecimalTypeExtension decimalTypeExtension = dataType.getDecimalTypeExtension(); - if (decimalTypeExtension == null) { - return Common.ColumnDataTypeExtension.getDefaultInstance(); - } - return Common.ColumnDataTypeExtension.newBuilder() // - .setDecimalType(decimalTypeExtension.into()) - .build(); - }) - .collect(Collectors.toList()); + public Builder addColumn(String name, SemanticType semanticType, DataType dataType, + DataType.DecimalTypeExtension decimalTypeExtension) { + Ensures.ensureNonNull(name, "Null column name"); + Ensures.ensureNonNull(semanticType, "Null semantic type"); + Ensures.ensureNonNull(dataType, "Null data type"); + + this.columnNames.add(name); + this.semanticTypes.add(semanticType.toProtoValue()); + this.dataTypes.add(dataType.toProtoValue()); + this.dataTypeExtensions.add(decimalTypeExtension == null ? Common.ColumnDataTypeExtension + .getDefaultInstance() : Common.ColumnDataTypeExtension.newBuilder() + .setDecimalType(decimalTypeExtension.into()).build()); return this; } diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index 872f6a3..739debf 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -23,6 +23,7 @@ import io.greptime.rpc.RpcOptions; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.Executor; import java.util.stream.Collectors; @@ -139,21 +140,25 @@ public static GreptimeOptions checkSelf(GreptimeOptions opts) { return opts; } - public static Builder newBuilder(String database, List endpoints) { - return new Builder(database, endpoints); + public static Builder newBuilder(String endpoint, String database) { + return newBuilder(Endpoint.parse(endpoint), database); } - public static Builder newBuilder(String database, Endpoint... endpoints) { - return new Builder(database, Arrays.stream(endpoints).collect(Collectors.toList())); + public static Builder newBuilder(Endpoint endpoint, String database) { + return new Builder(Collections.singletonList(endpoint), database); } - public static Builder newBuilder(String database, String... endpoints) { - return new Builder(database, Arrays.stream(endpoints).map(Endpoint::parse).collect(Collectors.toList())); + public static Builder newBuilder(String[] endpoints, String database) { + return new Builder(Arrays.stream(endpoints).map(Endpoint::parse).collect(Collectors.toList()), database); + } + + public static Builder newBuilder(Endpoint[] endpoints, String database) { + return new Builder(Arrays.asList(endpoints), database); } public static final class Builder { - private final String database; private final List endpoints = new ArrayList<>(); + private final String database; // Asynchronous thread pool, which is used to handle various asynchronous tasks in the SDK. private Executor asyncPool; @@ -164,15 +169,15 @@ public static final class Builder { private int maxInFlightWriteRows = 65536; private LimitedPolicy writeLimitedPolicy = LimitedPolicy.defaultWriteLimitedPolicy(); private int defaultStreamMaxWritePointsPerSecond = 10 * 65536; - // Refresh frequency of route tables. The background refreshes all route tables periodically. By default, - // all route tables are refreshed every 30 seconds. - private long routeTableRefreshPeriodSeconds = 30; + // Refresh frequency of route tables. The background refreshes all route tables periodically. + // If the value is less than or equal to 0, the route tables will not be refreshed. + private long routeTableRefreshPeriodSeconds = -1; // Authentication information private AuthInfo authInfo; - public Builder(String database, List endpoints) { - this.database = database; + public Builder(List endpoints, String database) { this.endpoints.addAll(endpoints); + this.database = database; } /** diff --git a/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java b/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java index 27c6610..e978b7e 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java @@ -31,9 +31,9 @@ public class RouterOptions implements Copiable { private List endpoints; // Refresh frequency of route tables. The background refreshes - // all route tables periodically. By default, all route tables are - // refreshed every 30 seconds. - private long refreshPeriodSeconds = 30; + // all route tables periodically. If the value is less than or + // equal to 0, the route tables will not be refreshed. + private long refreshPeriodSeconds = -1; public RpcClient getRpcClient() { return rpcClient; diff --git a/ingester-protocol/src/test/java/io/greptime/TestUtil.java b/ingester-protocol/src/test/java/io/greptime/TestUtil.java index 374e763..58eae65 100644 --- a/ingester-protocol/src/test/java/io/greptime/TestUtil.java +++ b/ingester-protocol/src/test/java/io/greptime/TestUtil.java @@ -28,14 +28,13 @@ public class TestUtil { public static Collection testTableRows(String tableName, int rowCount) { - TableSchema tableSchema = - TableSchema.newBuilder(tableName) - .semanticTypes(SemanticType.Tag, SemanticType.Timestamp, SemanticType.Field) - .dataTypes(DataType.String, DataType.TimestampMillisecond, DataType.Float64) // - .columnNames("host", "ts", "cpu") // - .build(); + TableSchema tableSchema = TableSchema.newBuilder(tableName) // + .addColumn("host", SemanticType.Tag, DataType.String) // + .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // + .addColumn("cpu", SemanticType.Field, DataType.Float64) // + .build(); - TableRows rows = TableRows.newBuilder(tableSchema).build(); + TableRows rows = TableRows.from(tableSchema); for (int i = 0; i < rowCount; i++) { rows.insert("127.0.0.1", System.currentTimeMillis(), i); } diff --git a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java index 00ce84e..a0f6217 100644 --- a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java +++ b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java @@ -85,24 +85,28 @@ public void after() { @Test public void testWriteSuccess() throws ExecutionException, InterruptedException { - String[] columnNames = - new String[] {"test_tag", "test_ts", "field1", "field2", "field3", "field4", "field5", "field6", - "field7", "field8", "field9", "field10", "field11", "field12", "field13", "field14", "field15", - "field16", "field17"}; - SemanticType[] semanticTypes = - new SemanticType[] {Tag, Timestamp, Field, Field, Field, Field, Field, Field, Field, Field, Field, - Field, Field, Field, Field, Field, Field, Field, Field}; - DataType[] dataTypes = - new DataType[] {DataType.String, Int64, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, - Float32, Float64, Bool, Binary, Date, DateTime, TimestampSecond, TimestampMillisecond, - TimestampNanosecond}; - TableSchema schema = TableSchema.newBuilder("test_table") // - .columnNames(columnNames) // - .semanticTypes(semanticTypes) // - .dataTypes(dataTypes) // + .addColumn("test_tag", Tag, DataType.String) // + .addColumn("test_ts", Timestamp, DataType.Int64) // + .addColumn("field1", Field, DataType.Int8) // + .addColumn("field2", Field, DataType.Int16) // + .addColumn("field3", Field, DataType.Int32) // + .addColumn("field4", Field, DataType.Int64) // + .addColumn("field5", Field, DataType.UInt8) // + .addColumn("field6", Field, DataType.UInt16) // + .addColumn("field7", Field, DataType.UInt32) // + .addColumn("field8", Field, DataType.UInt64) // + .addColumn("field9", Field, DataType.Float32) // + .addColumn("field10", Field, DataType.Float64) // + .addColumn("field11", Field, DataType.Bool) // + .addColumn("field12", Field, DataType.Binary) // + .addColumn("field13", Field, DataType.Date) // + .addColumn("field14", Field, DataType.DateTime) // + .addColumn("field15", Field, DataType.TimestampSecond) // + .addColumn("field16", Field, DataType.TimestampMillisecond) // + .addColumn("field17", Field, DataType.TimestampNanosecond) // .build(); - TableRows rows = TableRows.newBuilder(schema).build(); + TableRows rows = TableRows.from(schema); long ts = System.currentTimeMillis(); rows.insert("tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 11, 12L, 13L, 14L, 15L); diff --git a/ingester-protocol/src/test/java/io/greptime/models/TableRowsTest.java b/ingester-protocol/src/test/java/io/greptime/models/TableRowsTest.java index 026af0a..ddd2057 100644 --- a/ingester-protocol/src/test/java/io/greptime/models/TableRowsTest.java +++ b/ingester-protocol/src/test/java/io/greptime/models/TableRowsTest.java @@ -27,12 +27,12 @@ public class TableRowsTest { @Test public void testWriteRowsNonNull() { TableSchema schema = TableSchema.newBuilder("test_table") // - .columnNames("col1", "col2", "col3") // - .semanticTypes(SemanticType.Tag, SemanticType.Tag, SemanticType.Field) // - .dataTypes(DataType.String, DataType.String, DataType.Int32) // + .addColumn("col1", SemanticType.Tag, DataType.String) // + .addColumn("col2", SemanticType.Tag, DataType.String) // + .addColumn("col3", SemanticType.Field, DataType.Int32) // .build(); - TableRows.RowBasedTableRows rows = (TableRows.RowBasedTableRows) TableRows.newBuilder(schema).build(); + TableRows.RowBasedTableRows rows = (TableRows.RowBasedTableRows) TableRows.from(schema); rows.insert("1", "11", 111) // .insert("2", "22", 222) // .insert("3", "33", 333); @@ -47,12 +47,12 @@ public void testWriteRowsNonNull() { @Test public void testWriteRowsSomeNull() { TableSchema schema = TableSchema.newBuilder("test_table") // - .columnNames("col1", "col2", "col3") // - .semanticTypes(SemanticType.Tag, SemanticType.Tag, SemanticType.Field) // - .dataTypes(DataType.String, DataType.String, DataType.Int32) // + .addColumn("col1", SemanticType.Tag, DataType.String) // + .addColumn("col2", SemanticType.Tag, DataType.String) // + .addColumn("col3", SemanticType.Field, DataType.Int32) // .build(); - TableRows.RowBasedTableRows rows = (TableRows.RowBasedTableRows) TableRows.newBuilder(schema).build(); + TableRows.RowBasedTableRows rows = (TableRows.RowBasedTableRows) TableRows.from(schema); rows.insert("1", "11", 111) // .insert("2", null, 222) // .insert("3", "33", null);