diff --git a/client/pom.xml b/client/pom.xml
index 3193e6729..865bdc297 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -25,6 +25,11 @@
commons-cli
1.4
+
+ commons-io
+ commons-io
+ 2.11.0
+
org.jline
jline
diff --git a/client/src/main/java/cn/edu/tsinghua/iginx/client/CurveMatchClient.java b/client/src/main/java/cn/edu/tsinghua/iginx/client/CurveMatchClient.java
new file mode 100644
index 000000000..9b80a2808
--- /dev/null
+++ b/client/src/main/java/cn/edu/tsinghua/iginx/client/CurveMatchClient.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package cn.edu.tsinghua.iginx.client;
+
+import cn.edu.tsinghua.iginx.exceptions.ExecutionException;
+import cn.edu.tsinghua.iginx.exceptions.SessionException;
+import cn.edu.tsinghua.iginx.session.CurveMatchResult;
+import cn.edu.tsinghua.iginx.session.Session;
+import cn.edu.tsinghua.iginx.session.SessionExecuteSqlResult;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class CurveMatchClient {
+
+ private static Session session;
+
+ public static void main(String[] args) throws SessionException, ExecutionException {
+ session = new Session("127.0.0.1", 6888, "root", "root");
+ // 打开 Session
+ session.openSession();
+
+ // 曲线匹配
+ curveMatch(args);
+
+ // 关闭 Session
+ session.closeSession();
+ }
+
+ private static void curveMatch(String[] args) throws ExecutionException, SessionException {
+ if (args.length != 5) {
+ System.out.println("参数个数必须为5!");
+ return;
+ }
+
+ List paths = Arrays.stream(args[0].split(";")).collect(Collectors.toList());
+
+ long startTime = Long.parseLong(args[1]);
+ long endTime = Long.parseLong(args[2]);
+
+ List queryList = Arrays.stream(args[3].split(",")).map(Double::parseDouble).collect(Collectors.toList());
+
+ if (queryList.size() >= 256) {
+ System.out.println("匹配长度不能超过256!");
+ return;
+ }
+
+ long curveUnit = Long.parseLong(args[4]);
+
+ CurveMatchResult result = session.curveMatch(paths, startTime, endTime, queryList, curveUnit);
+
+ long matchedTimestamp = result.getMatchedTimestamp();
+ String matchedPath = result.getMatchedPath();
+
+ String[] parts = matchedPath.split("\\.");
+ StringBuilder sql = new StringBuilder();
+ sql.append("select first_value(");
+ sql.append(parts[parts.length - 1]);
+ sql.append(") from ");
+ sql.append(matchedPath, 0, matchedPath.lastIndexOf('.'));
+ sql.append(" group (");
+ sql.append(matchedTimestamp);
+ sql.append(", ");
+ sql.append(matchedTimestamp + curveUnit * queryList.size());
+ sql.append(") by ");
+ sql.append(curveUnit);
+ sql.append("ms");
+
+ SessionExecuteSqlResult dataSet = session.executeSql(sql.toString());
+ dataSet.print(false, "ms");
+ }
+}
diff --git a/client/src/main/java/cn/edu/tsinghua/iginx/client/InsertSingleFileClient.java b/client/src/main/java/cn/edu/tsinghua/iginx/client/InsertSingleFileClient.java
new file mode 100644
index 000000000..e0e98a3ae
--- /dev/null
+++ b/client/src/main/java/cn/edu/tsinghua/iginx/client/InsertSingleFileClient.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package cn.edu.tsinghua.iginx.client;
+
+import cn.edu.tsinghua.iginx.exceptions.IginxException;
+import cn.edu.tsinghua.iginx.exceptions.SessionException;
+import cn.edu.tsinghua.iginx.session.Session;
+import cn.edu.tsinghua.iginx.thrift.DataType;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.lang.Double.parseDouble;
+
+public class InsertSingleFileClient {
+
+ private static Session session;
+
+ private static final String separator = "\\s+";
+
+ private static final ThreadLocal format = new ThreadLocal<>();
+
+ // 存储组
+ private static String storageGroupName;
+ // 设备
+ private static String deviceId;
+
+ // 飞机型号
+ private static String planeType;
+ // 架机号
+ private static String planeId;
+ // 任务单号/任务编号
+ private static String taskId;
+ // 试飞地点
+ private static String place;
+ // 试飞日期
+ private static String date;
+ // 试验类型
+ private static String flightType;
+
+ // 采样率
+ private static int frequency;
+
+ // 传感器
+ private static List measurements;
+
+ public static void main(String[] args) throws SessionException {
+ session = new Session("127.0.0.1", 6888, "root", "root");
+ // 打开 Session
+ session.openSession();
+
+ // 写入单个文件
+ insertSingleFile(args);
+
+ // 关闭 Session
+ session.closeSession();
+ }
+
+ private static void insertSingleFile(String[] args) {
+ if (args.length != 1) {
+ System.out.println("参数个数必须为1!");
+ return;
+ }
+
+ format.set(new SimpleDateFormat("yyyyMMddHH:mm:ss:SSS"));
+
+ String filename = args[0];
+ extractInfo(filename);
+ try {
+ LineIterator it = FileUtils.lineIterator(new File(filename));
+
+ if(!it.hasNext()){
+ System.out.println(filename + "是空文件!");
+ it.close();
+ return;
+ }
+
+ String line = it.nextLine();
+ extractMeasurements(line);
+ int lineNum = 1;
+
+ int batchSize = Math.max(100, Math.min(10000, 512 * 1024 / measurements.size()));
+
+ int cnt = 0;
+ int singleCnt = 0;
+ long[] timestamps = new long[batchSize];
+ Object[] valuesList = new Object[batchSize];
+
+ System.out.println("文件 " + filename + " 参数数量 " + measurements.size() + " 批处理大小 " + batchSize);
+
+ while (it.hasNext()) {
+ line = it.nextLine();
+ lineNum++;
+ String[] contents = line.trim().split(separator);
+ long timestamp;
+ if (contents[0].length() == 12) {
+ timestamp = format.get().parse(date + contents[0]).getTime() * 1000;
+ } else if (contents[0].length() == 15) {
+ timestamp = format.get().parse(date + contents[0].substring(0, 12)).getTime() * 1000 + Long.parseLong(contents[0].substring(12));
+ } else {
+ //logger.error("line {} 有非法的时间格式:{}!", lineNum, date + contents[0]);
+ continue;
+ }
+
+ //now check whether we need to read another line
+ while(it.hasNext() && contents.length < measurements.size() + 1){
+// logger.error("Incomplete line: {}", line);
+// logger.error("Incomplete line with {} elements at line {}", contents.length, lineNum);
+ line = it.nextLine();
+ lineNum++;
+ String[] tempContents = line.trim().split(separator);
+ contents = Arrays.copyOf( contents, contents.length+tempContents.length );
+ for(int i=0;i measurements.size() + 1) {
+// logger.error("Skipping illegal line with {} elements: {}", contents.length, line);
+// logger.error("{} : =====================", lineNum);
+// logger.error("Skipping illegal line with {} elements at line {}", contents.length, lineNum);
+ continue;
+ }
+
+ timestamps[singleCnt] = timestamp;
+ if (measurements.size() != contents.length - 1) {
+ contents = line.trim().split("\t");
+ if (measurements.size() != contents.length - 1) {
+// logger.error("文件{}存在某行数据点数目({})与参数数目({})不符。", filename, contents.length - 1, measurements.size());
+ continue;
+ }
+ }
+ Object[] tmpValues = new Object[measurements.size()];
+ for (int i = 1; i < contents.length; i++) {
+ tmpValues[i - 1] = (isValueValid(contents[i]) ? parseDouble(contents[i]) : 0);
+ }
+ valuesList[singleCnt] = tmpValues;
+
+ cnt++;
+ singleCnt++;
+ if (singleCnt % batchSize == 0) {
+ insertBatch(singleCnt, deviceId, measurements, timestamps, valuesList);
+ singleCnt = 0;
+ }
+
+ if (cnt % 1_000_000 == 0) {
+ System.out.println("文件 " + filename + " 已加载 " + cnt + " 行");
+ }
+ }
+ if (singleCnt != 0) {
+ insertBatch(singleCnt, deviceId, measurements, timestamps, valuesList);
+ }
+ System.out.println("文件 " + filename + " 内容 " + lineNum + " 行 " + cnt + " 共加载 {} 行。");
+ it.close();
+ } catch (IOException | ParseException e) {
+ System.out.println(e.getMessage());
+ }
+ }
+
+ private static List extractMeasurements(String line) {
+ String myline = fixIllegalParameterChars(line);
+ //process the list of measurements
+ String[] tempMeasurements = myline.trim().split(separator);
+ return new ArrayList<>(Arrays.asList(tempMeasurements).subList(1, tempMeasurements.length));
+ }
+
+ private static String fixIllegalParameterChars(String line){
+ String ret = line;
+ ret = ret.replace("<", "小");
+ ret = ret.replace(">", "大");
+ ret = ret.replace("=", "等");
+ ret = ret.replace(".", "点");
+ ret = ret.replace("-", "一");
+ return ret;
+ }
+
+ private static void extractInfo(String filename) {
+ String[] parts = filename.split("-");
+ planeType = parts[1];
+ planeId = parts[2];
+ place = parts[3];
+ date = "20" + parts[4];
+ flightType = parts[5]+"-"+parts[6];
+ flightType = flightType.replace('-', '一');
+
+ String taskIDPostfix = "";
+ int pos4TaskId = parts.length - 2;
+ try{
+ frequency = Integer.parseInt(parts[parts.length - 1].substring(0, parts[parts.length - 1].indexOf(".")));
+ }catch(NumberFormatException e){
+ frequency = Integer.parseInt(parts[parts.length - 2]);
+ taskIDPostfix = parts[parts.length - 1].substring(0, parts[parts.length - 1].indexOf("."));
+ pos4TaskId = parts.length - 3;
+ }
+
+ //the rest are in the taskID
+ taskId="";
+ for(int i=7;i<=pos4TaskId;i++)
+ taskId = taskId+parts[i]+"-";
+ taskId = taskIDPostfix.length()==0?taskId.substring( 0, taskId.length()-1 ):taskId+taskIDPostfix;
+ //IoTDB does not support path with these characters
+ taskId = taskId.replace('-', '一');
+ taskId = taskId.replace('&', '二');
+
+ storageGroupName = "root." + planeType + "." + planeId + "." + taskId;
+ deviceId = storageGroupName + "." + place + "." + date + "." + flightType;
+ }
+
+ private static boolean isValueValid(String value) {
+ try {
+ Double.parseDouble(value);
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ return true;
+ }
+
+ private static void insertBatch(int size, String deviceId, List measurements, long[] timestamps, Object[] values) {
+ List pathList = measurements.stream().map(str-> deviceId+"." +str).collect( Collectors.toList() );
+
+ List datatypeList = new ArrayList<>();
+ for (int i = 0; i < measurements.size(); i++) {
+ datatypeList.add(DataType.DOUBLE);
+ }
+
+ timestamps = Arrays.copyOfRange(timestamps, 0, size);
+ values = Arrays.copyOfRange(values, 0, size);
+
+ try {
+ session.insertRowRecords(pathList, timestamps, values, datatypeList, null);
+ } catch (IginxException e) {
+ System.out.println(e.getMessage());
+ }
+ }
+}
diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/transform/data/LogWriter.java b/core/src/main/java/cn/edu/tsinghua/iginx/transform/data/LogWriter.java
index b3b626280..290a947e9 100644
--- a/core/src/main/java/cn/edu/tsinghua/iginx/transform/data/LogWriter.java
+++ b/core/src/main/java/cn/edu/tsinghua/iginx/transform/data/LogWriter.java
@@ -14,11 +14,11 @@ public class LogWriter extends ExportWriter {
@Override
public void write(BatchData batchData) {
Header header = batchData.getHeader();
- logger.info(header.toString());
+// logger.info(header.toString());
List rowList = batchData.getRowList();
- rowList.forEach(row -> {
- logger.info(row.toCSVTypeString());
- });
+// rowList.forEach(row -> {
+// logger.info(row.toCSVTypeString());
+// });
}
}