diff --git a/conf/config.properties b/conf/config.properties index 7d9d3b20b..4136b626c 100644 --- a/conf/config.properties +++ b/conf/config.properties @@ -64,6 +64,9 @@ disorderMargin=10 # rest 异步执行并发数 asyncRestThreadPool=100 +# rest 请求拆分数量 +restReqSplitNum=1 + #################### ### 元数据配置 #################### diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java b/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java index f93cd40f7..eb126995a 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java @@ -94,6 +94,8 @@ public class Config { private int instancesNumPerClient = 0; + private int restReqSplitNum = 10; + public int getMaxTimeseriesLength() { return maxTimeseriesLength; } @@ -389,4 +391,13 @@ public int getInstancesNumPerClient() { public void setInstancesNumPerClient(int instancesNumPerClient) { this.instancesNumPerClient = instancesNumPerClient; } + + public int getRestReqSplitNum() { + return restReqSplitNum; + } + + public void setRestReqSplitNum(int restReqSplitNum) { + this.restReqSplitNum = restReqSplitNum; + } + } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java b/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java index 1bfd9ab5a..cf5f621bf 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java @@ -100,6 +100,9 @@ private void loadPropsFromFile() { config.setClients(properties.getProperty("clients", "")); config.setInstancesNumPerClient(Integer.parseInt(properties.getProperty("instancesNumPerClient", "0"))); + config.setRestReqSplitNum(Integer.parseInt(properties.getProperty("restReqSplitNum", "10"))); + + } catch (IOException e) { logger.error("Fail to load properties: ", e); } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/rest/RestSession.java b/core/src/main/java/cn/edu/tsinghua/iginx/rest/RestSession.java index 990ce1c5e..130842494 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/rest/RestSession.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/rest/RestSession.java @@ -26,7 +26,27 @@ import cn.edu.tsinghua.iginx.exceptions.SessionException; import cn.edu.tsinghua.iginx.session.SessionAggregateQueryDataSet; import cn.edu.tsinghua.iginx.session.SessionQueryDataSet; -import cn.edu.tsinghua.iginx.thrift.*; +import cn.edu.tsinghua.iginx.thrift.AddStorageEnginesReq; +import cn.edu.tsinghua.iginx.thrift.AggregateQueryReq; +import cn.edu.tsinghua.iginx.thrift.AggregateQueryResp; +import cn.edu.tsinghua.iginx.thrift.AggregateType; +import cn.edu.tsinghua.iginx.thrift.CloseSessionReq; +import cn.edu.tsinghua.iginx.thrift.DataType; +import cn.edu.tsinghua.iginx.thrift.DeleteColumnsReq; +import cn.edu.tsinghua.iginx.thrift.DeleteDataInColumnsReq; +import cn.edu.tsinghua.iginx.thrift.DownsampleQueryReq; +import cn.edu.tsinghua.iginx.thrift.DownsampleQueryResp; +import cn.edu.tsinghua.iginx.thrift.InsertNonAlignedColumnRecordsReq; +import cn.edu.tsinghua.iginx.thrift.InsertNonAlignedRowRecordsReq; +import cn.edu.tsinghua.iginx.thrift.InsertRowRecordsReq; +import cn.edu.tsinghua.iginx.thrift.OpenSessionReq; +import cn.edu.tsinghua.iginx.thrift.OpenSessionResp; +import cn.edu.tsinghua.iginx.thrift.QueryDataReq; +import cn.edu.tsinghua.iginx.thrift.QueryDataResp; +import cn.edu.tsinghua.iginx.thrift.Status; +import cn.edu.tsinghua.iginx.thrift.StorageEngine; +import cn.edu.tsinghua.iginx.thrift.ValueFilterQueryReq; +import cn.edu.tsinghua.iginx.thrift.ValueFilterQueryResp; import cn.edu.tsinghua.iginx.utils.Bitmap; import cn.edu.tsinghua.iginx.utils.ByteUtils; import cn.edu.tsinghua.iginx.utils.RpcUtils; @@ -35,7 +55,12 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -44,18 +69,18 @@ public class RestSession { private static final Logger logger = LoggerFactory.getLogger(RestSession.class); private static final Config config = ConfigDescriptor.getInstance().getConfig(); - private final ReadWriteLock lock; + //private final ReadWriteLock lock; private IginxWorker client; private long sessionId; private boolean isClosed; private int redirectTimes; - private final String username; - private final String password; + private String username; + private String password; public RestSession() { this.isClosed = true; this.redirectTimes = 0; - this.lock = new ReentrantReadWriteLock(); + //this.lock = new ReentrantReadWriteLock(); this.username = config.getUsername(); this.password = config.getPassword(); } @@ -86,8 +111,6 @@ public synchronized void openSession() throws SessionException { sessionId = resp.getSessionId(); break; } - - String[] targetAddress = resp.status.getMessage().split(":"); if (targetAddress.length != 2) { throw new SessionException("unexpected redirect address " + resp.status.getMessage()); @@ -95,7 +118,7 @@ public synchronized void openSession() throws SessionException { logger.info("当前请求将被重定向到:" + resp.status.getMessage()); redirectTimes += 1; - } while (redirectTimes <= Constants.MAX_REDIRECT_TIME); + } while(redirectTimes <= Constants.MAX_REDIRECT_TIME); if (redirectTimes > Constants.MAX_REDIRECT_TIME) { throw new SessionException("重定向次数过多!"); @@ -126,13 +149,13 @@ public void addStorageEngine(String ip, int port, String type, Map paths) throws ExecutionException { Status status; do { - lock.readLock().lock(); - try { - status = client.deleteColumns(req); - } finally { + /*lock.readLock().lock(); + try {*/ + status = client.deleteColumns(req); + /*} finally { lock.readLock().unlock(); - } - } while (checkRedirect(status)); + }*/ + } while(checkRedirect(status)); RpcUtils.verifySuccess(status); } @@ -215,17 +238,17 @@ public void insertNonAlignedColumnRecords(List paths, long[] timestamps, Status status; do { - lock.readLock().lock(); - try { - status = client.insertNonAlignedColumnRecords(req); - } finally { + /* lock.readLock().lock(); + try {*/ + status = client.insertNonAlignedColumnRecords(req); + /*} finally { lock.readLock().unlock(); - } + }*/ } while (checkRedirect(status)); RpcUtils.verifySuccess(status); } - public void insertNonAlignedRowRecords(List paths, long[] timestamps, Object[] valuesList, + /*public void insertNonAlignedRowRecords(List paths, long[] timestamps, Object[] valuesList, List dataTypeList, List> attributesList) throws ExecutionException { if (paths.isEmpty() || timestamps.length == 0 || valuesList.length == 0 || dataTypeList.isEmpty()) { logger.error("Invalid insert request!"); @@ -308,13 +331,106 @@ public void insertNonAlignedRowRecords(List paths, long[] timestamps, Ob Status status; do { - lock.readLock().lock(); - try { + *//*lock.readLock().lock(); + try {*//* status = client.insertNonAlignedRowRecords(req); - } finally { + *//*} finally { lock.readLock().unlock(); + }*//* + } while(checkRedirect(status)); + RpcUtils.verifySuccess(status); + }*/ + + public void insertRowRecords(List paths, long[] timestamps, Object[] valuesList, + List dataTypeList, List> attributesList) throws ExecutionException { + if (paths.isEmpty() || timestamps.length == 0 || valuesList.length == 0 || dataTypeList.isEmpty()) { + logger.error("Invalid insert request!"); + return; + } + if (paths.size() != dataTypeList.size()) { + logger.error("The sizes of paths and dataTypeList should be equal."); + return; + } + if (timestamps.length != valuesList.length) { + logger.error("The sizes of timestamps and valuesList should be equal."); + return; + } + if (attributesList != null && paths.size() != attributesList.size()) { + logger.error("The sizes of paths, valuesList, dataTypeList and attributesList should be equal."); + return; + } + + Integer[] index = new Integer[timestamps.length]; + for (int i = 0; i < timestamps.length; i++) { + index[i] = i; + } + Arrays.sort(index, Comparator.comparingLong(Arrays.asList(ArrayUtils.toObject(timestamps))::get)); + Arrays.sort(timestamps); + Object[] sortedValuesList = new Object[valuesList.length]; + for (int i = 0; i < valuesList.length; i++) { + sortedValuesList[i] = valuesList[index[i]]; + } + + index = new Integer[paths.size()]; + for (int i = 0; i < paths.size(); i++) { + index[i] = i; + } + Arrays.sort(index, Comparator.comparing(paths::get)); + Collections.sort(paths); + List sortedDataTypeList = new ArrayList<>(); + List> sortedAttributesList = new ArrayList<>(); + for (int i = 0; i < sortedValuesList.length; i++) { + Object[] values = new Object[index.length]; + for (int j = 0; j < index.length; j++) { + values[j] = ((Object[]) sortedValuesList[i])[index[j]]; } - } while (checkRedirect(status)); + sortedValuesList[i] = values; + } + for (Integer i : index) { + sortedDataTypeList.add(dataTypeList.get(i)); + } + if (attributesList != null) { + for (Integer i : index) { + sortedAttributesList.add(attributesList.get(i)); + } + } + + List valueBufferList = new ArrayList<>(); + List bitmapBufferList = new ArrayList<>(); + for (int i = 0; i < timestamps.length; i++) { + Object[] values = (Object[]) sortedValuesList[i]; + if (values.length != paths.size()) { + logger.error("The sizes of paths and the element of valuesList should be equal."); + return; + } + valueBufferList.add(ByteUtils.getRowByteBuffer(values, sortedDataTypeList)); + Bitmap bitmap = new Bitmap(values.length); + for (int j = 0; j < values.length; j++) { + if (values[j] != null) { + bitmap.mark(j); + } + } + bitmapBufferList.add(ByteBuffer.wrap(bitmap.getBytes())); + } + + InsertRowRecordsReq req = new InsertRowRecordsReq(); + req.setSessionId(sessionId); + req.setPaths(paths); + req.setTimestamps(getByteArrayFromLongArray(timestamps)); + req.setValuesList(valueBufferList); + req.setBitmapList(bitmapBufferList); + req.setDataTypeList(sortedDataTypeList); + req.setAttributesList(sortedAttributesList); + + Status status; + do { + /*lock.readLock().lock(); + try {*/ + status = client.insertRowRecords(req); + /*} finally { + lock.readLock().unlock(); + }*/ + } while(checkRedirect(status)); RpcUtils.verifySuccess(status); } @@ -329,13 +445,13 @@ public void deleteDataInColumns(List paths, long startTime, long endTime Status status; do { - lock.readLock().lock(); - try { - status = client.deleteDataInColumns(req); - } finally { + /*lock.readLock().lock(); + try {*/ + status = client.deleteDataInColumns(req); + /*} finally { lock.readLock().unlock(); - } - } while (checkRedirect(status)); + }*/ + } while(checkRedirect(status)); } public SessionQueryDataSet queryData(List paths, long startTime, long endTime) { @@ -348,13 +464,13 @@ public SessionQueryDataSet queryData(List paths, long startTime, long en QueryDataResp resp; do { - lock.readLock().lock(); - try { - resp = client.queryData(req); - } finally { + /*lock.readLock().lock(); + try {*/ + resp = client.queryData(req); + /*} finally { lock.readLock().unlock(); - } - } while (checkRedirect(resp.status)); + }*/ + } while(checkRedirect(resp.status)); return new SessionQueryDataSet(resp); } @@ -371,13 +487,13 @@ public SessionQueryDataSet valueFilterQuery(List paths, long startTime, try { do { - lock.readLock().lock(); - try { - resp = client.valueFilterQuery(req); - } finally { + /*lock.readLock().lock(); + try {*/ + resp = client.valueFilterQuery(req); + /*} finally { lock.readLock().unlock(); - } - } while (checkRedirect(resp.status)); + }*/ + } while(checkRedirect(resp.status)); } catch (Exception e) { throw new SessionException(e); } @@ -390,13 +506,13 @@ public SessionAggregateQueryDataSet aggregateQuery(List paths, long star AggregateQueryResp resp; do { - lock.readLock().lock(); - try { - resp = client.aggregateQuery(req); - } finally { + /*lock.readLock().lock(); + try {*/ + resp = client.aggregateQuery(req); + /*} finally { lock.readLock().unlock(); - } - } while (checkRedirect(resp.status)); + }*/ + } while(checkRedirect(resp.status)); return new SessionAggregateQueryDataSet(resp, aggregateType); } @@ -408,13 +524,13 @@ public SessionQueryDataSet downsampleQuery(List paths, long startTime, l DownsampleQueryResp resp; do { - lock.readLock().lock(); - try { - resp = client.downsampleQuery(req); - } finally { + /*lock.readLock().lock(); + try {*/ + resp = client.downsampleQuery(req); + /*} finally { lock.readLock().unlock(); - } - } while (checkRedirect(resp.status)); + }*/ + } while(checkRedirect(resp.status)); return new SessionQueryDataSet(resp); } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/DataPointsParser.java b/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/DataPointsParser.java index d91e69154..34290054a 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/DataPointsParser.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/DataPointsParser.java @@ -18,6 +18,8 @@ */ package cn.edu.tsinghua.iginx.rest.insert; +import cn.edu.tsinghua.iginx.conf.Config; +import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; import cn.edu.tsinghua.iginx.exceptions.ExecutionException; import cn.edu.tsinghua.iginx.exceptions.SessionException; import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager; @@ -33,15 +35,22 @@ import java.io.Reader; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class DataPointsParser { public static final String ANNOTATION_SPLIT_STRING = "@@annotation"; private static final Logger LOGGER = LoggerFactory.getLogger(DataPointsParser.class); private final IMetaManager metaManager = DefaultMetaManager.getInstance(); + private static Config config = ConfigDescriptor.getInstance().getConfig(); private Reader inputStream = null; private final ObjectMapper mapper = new ObjectMapper(); private List metricList = new ArrayList<>(); private final RestSession session = new RestSession(); + private Map> batchMap = new HashMap<>(); + private int restReqSplitNum = config.getRestReqSplitNum(); + + public DataPointsParser() { @@ -72,17 +81,36 @@ public void parse(boolean isAnnotation) throws Exception { LOGGER.error("Error occurred during parsing data ", e); throw e; } - try { - if (isAnnotation) { - sendAnnotationMetricsData(); - } else { - sendMetricsData(); + // sub tread execute and await. + LOGGER.info(String.format("restReqSplitNum: %s", restReqSplitNum)); + + if (restReqSplitNum > 1) { + long batchInsertStartTime = System.currentTimeMillis(); + List> splitMetricList = averageAssign(metricList, restReqSplitNum); + CountDownLatch latch = new CountDownLatch(restReqSplitNum); + for (List list : splitMetricList) { + SenderManager.getInstance().addSender(new Sender(latch, list)); + } + try { + latch.await(5, TimeUnit.MINUTES); + } catch (InterruptedException e) { + LOGGER.error("Request partial sub threads time out"); + } + long batchInsertEndTime = System.currentTimeMillis(); + LOGGER.info(String.format("Batch insert cost time: %s ms", batchInsertEndTime - batchInsertStartTime)); + } else { + try { + if (isAnnotation) { + sendAnnotationMetricsData(); + } else { + sendMetricsDataInBatch(); + } + } catch (Exception e) { + LOGGER.debug("Exception occur for create and send ", e); + throw e; + } finally { + session.closeSession(); } - } catch (Exception e) { - LOGGER.debug("Exception occur for create and send ", e); - throw e; - } finally { - session.closeSession(); } } @@ -123,7 +151,7 @@ private Metric getMetricObject(JsonNode node, boolean isAnnotation) { public void sendData() { try { session.openSession(); - sendMetricsData(); + sendMetricsDataInBatch(); } catch (Exception e) { LOGGER.error("Error occurred during sending data ", e); } @@ -138,7 +166,8 @@ public void setMetricList(List metricList) { this.metricList = metricList; } - private void sendMetricsData() throws Exception { + + private void sendAnnotationMetricsData() throws Exception { for (Metric metric : metricList) { boolean needUpdate = false; Map metricschema = metaManager.getSchemaMapping(metric.getName()); @@ -146,11 +175,13 @@ private void sendMetricsData() throws Exception { needUpdate = true; metricschema = new ConcurrentHashMap<>(); } - for (Map.Entry entry : metric.getTags().entrySet()) { + Iterator iter = metric.getTags().entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = (Map.Entry) iter.next(); if (metricschema.get(entry.getKey()) == null) { needUpdate = true; int pos = metricschema.size() + 1; - metricschema.put(entry.getKey(), pos); + metricschema.put((String) entry.getKey(), pos); } } if (needUpdate) { @@ -161,47 +192,122 @@ private void sendMetricsData() throws Exception { pos2path.put(entry.getValue(), entry.getKey()); } StringBuilder path = new StringBuilder(); - for (Map.Entry entry : pos2path.entrySet()) { + iter = pos2path.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = (Map.Entry) iter.next(); String ins = metric.getTags().get(entry.getValue()); if (ins != null) { path.append(ins).append("."); - } else { + } + else { path.append("null."); } } path.append(metric.getName()); + path.append(ANNOTATION_SPLIT_STRING); List paths = new ArrayList<>(); paths.add(path.toString()); - int size = metric.getTimestamps().size(); List type = new ArrayList<>(); - type.add(findType(metric.getValues())); + type.add(DataType.BINARY); + int size = metric.getTimestamps().size(); Object[] valuesList = new Object[1]; Object[] values = new Object[size]; for (int i = 0; i < size; i++) { - values[i] = getType(metric.getValues().get(i), type.get(0)); + values[i] = metric.getAnnotation().getBytes(); } valuesList[0] = values; + session.insertNonAlignedColumnRecords(paths, metric.getTimestamps().stream().mapToLong(Long::longValue).toArray(), valuesList, type, null); + } + } + + + Object getType(String str, DataType tp) { + switch (tp) { + case BINARY: + return str.getBytes(); + case DOUBLE: + return Double.parseDouble(str); + default: + return null; + } + } + + DataType findType(List values) { + for (String value : values) { try { - session.insertNonAlignedColumnRecords(paths, metric.getTimestamps().stream().mapToLong(Long::longValue).toArray(), valuesList, type, null); - if (metric.getAnnotation() != null) { - for (int i = 0; i < size; i++) { - values[i] = metric.getAnnotation().getBytes(); - } - valuesList[0] = values; - path.append(ANNOTATION_SPLIT_STRING); - paths.set(0, path.toString()); - type.set(0, DataType.BINARY); - session.insertNonAlignedColumnRecords(paths, metric.getTimestamps().stream().mapToLong(Long::longValue).toArray(), valuesList, type, null); - } - } catch (ExecutionException e) { + Double.parseDouble(value); + } catch (NumberFormatException e) { + return DataType.BINARY; + } + } + return DataType.DOUBLE; + } + + private static List> averageAssign(List source, int n) { + List> result = new ArrayList<>(); + + int remainder = source.size() % n; + int number = source.size() / n; + int offset = 0; + + for (int i = 0; i < n; i++) { + List value; + if (remainder > 0) { + value = source.subList(i * number + offset, (i + 1) * number + offset + 1); + remainder--; + offset++; + } else { + value = source.subList(i * number + offset, (i + 1) * number + offset); + } + result.add(value); + } + return result; + } + + private void sendMetricsDataInBatch() { + long umamdTime = System.currentTimeMillis(); + LOGGER.info(String.format("Going in to meta data updates")); + updateMetaAndMergeData(); + LOGGER.info(String.format("MetaData cost time: %s ms", System.currentTimeMillis() - umamdTime)); + + for (Map.Entry> entry : batchMap.entrySet()) { + List paths = new ArrayList<>(); + List types = new ArrayList<>(); + Object[] values = new Object[1]; + long[] timestamps = new long[1]; + + String prefixPath = entry.getKey().getPrefixPath(); + long timestamp = entry.getKey().getTimestamp(); + List valueList = new ArrayList<>(); + timestamps[0] = timestamp; + + for (Map.Entry subEntry : entry.getValue().entrySet()) { + String suffixPath = subEntry.getKey(); + String value = subEntry.getValue(); + + DataType type = findType(new ArrayList<>(Collections.singletonList(value))); + types.add(type); + paths.add(prefixPath + suffixPath); + valueList.add(getType(value, type)); + } + + values[0] = valueList.toArray(); + + try { + long sessionInsertStartTime = System.currentTimeMillis(); +// session.insertNonAlignedRowRecords(paths, timestamps, values, types, null); + session.insertRowRecords(paths, timestamps, values, types, null); + long sessionInsertEndTime = System.currentTimeMillis(); + LOGGER.info(String.format("Session insert cost time: %s ms", sessionInsertEndTime - sessionInsertStartTime)); + } catch (Exception e) { LOGGER.error("Error occurred during insert ", e); - throw e; } } } - private void sendAnnotationMetricsData() throws Exception { + private void updateMetaAndMergeData() { for (Metric metric : metricList) { + // update meta boolean needUpdate = false; Map metricschema = metaManager.getSchemaMapping(metric.getName()); if (metricschema == null) { @@ -224,55 +330,43 @@ private void sendAnnotationMetricsData() throws Exception { for (Map.Entry entry : metricschema.entrySet()) { pos2path.put(entry.getValue(), entry.getKey()); } - StringBuilder path = new StringBuilder(); + StringBuilder path = new StringBuilder(""); iter = pos2path.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = (Map.Entry) iter.next(); String ins = metric.getTags().get(entry.getValue()); if (ins != null) { - path.append(ins).append("."); + path.append(ins + "."); } else { path.append("null."); } } - path.append(metric.getName()); - path.append(ANNOTATION_SPLIT_STRING); - List paths = new ArrayList<>(); - paths.add(path.toString()); - List type = new ArrayList<>(); - type.add(DataType.BINARY); - int size = metric.getTimestamps().size(); - Object[] valuesList = new Object[1]; - Object[] values = new Object[size]; - for (int i = 0; i < size; i++) { - values[i] = metric.getAnnotation().getBytes(); - } - valuesList[0] = values; - session.insertNonAlignedColumnRecords(paths, metric.getTimestamps().stream().mapToLong(Long::longValue).toArray(), valuesList, type, null); - } - } - - - Object getType(String str, DataType tp) { - switch (tp) { - case BINARY: - return str.getBytes(); - case DOUBLE: - return Double.parseDouble(str); - default: - return null; - } - } + // merge data in time and prefix path + String prefixPath = path.toString(); + for (int i = 0; i < metric.getTimestamps().size(); i++) { + long timestamp = metric.getTimestamps().get(i); + String value = metric.getValues().get(i); + TimeAndPrefixPath tpKey = new TimeAndPrefixPath(timestamp, prefixPath); + if (batchMap.containsKey(tpKey)) { + batchMap.get(tpKey).put(metric.getName(), value); + } else { + Map metricValueMap = new HashMap<>(); + metricValueMap.put(metric.getName(), value); + batchMap.put(tpKey, metricValueMap); + } - DataType findType(List values) { - for (String value : values) { - try { - Double.parseDouble(value); - } catch (NumberFormatException e) { - return DataType.BINARY; + if (metric.getAnnotation() != null) { + if (batchMap.containsKey(tpKey)) { + batchMap.get(tpKey).put(metric.getName() + ANNOTATION_SPLIT_STRING, + Arrays.toString(metric.getAnnotation().getBytes())); + } else { + Map metricValueMap = new HashMap<>(); + metricValueMap.put(metric.getName() + ANNOTATION_SPLIT_STRING, value); + batchMap.put(tpKey, metricValueMap); + } + } } } - return DataType.DOUBLE; } } \ No newline at end of file diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/Sender.java b/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/Sender.java new file mode 100644 index 000000000..ec5572b87 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/Sender.java @@ -0,0 +1,142 @@ +package cn.edu.tsinghua.iginx.rest.insert; + +import cn.edu.tsinghua.iginx.exceptions.SessionException; +import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager; +import cn.edu.tsinghua.iginx.metadata.IMetaManager; +import cn.edu.tsinghua.iginx.rest.RestSession; +import cn.edu.tsinghua.iginx.rest.bean.Metric; +import cn.edu.tsinghua.iginx.thrift.DataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.CountDownLatch; + +public class Sender extends Thread { + + private CountDownLatch latch; + public static final String ANNOTATION_SPLIT_STRING = "@@annotation"; + private static final Logger LOGGER = LoggerFactory.getLogger(DataPointsParser.class); + private final IMetaManager metaManager = DefaultMetaManager.getInstance(); + private RestSession session; + private List metricList = new ArrayList<>(); + private Map> batchMap = new HashMap<>(); + + public Sender(CountDownLatch latch, List list) { + this.latch = latch; + this.metricList.addAll(list); + this.session = new RestSession(); + } + + @Override + public void run() { + try { + session.openSession(); + } catch (SessionException e) { + LOGGER.error("Error occurred during opening session", e); + } + try { + sendMetricsDataInBatch(); + } catch (Exception e) { + LOGGER.error("Error occurred during sending data", e); + } finally { + latch.countDown(); +// session.closeSession(); + } + } + + public void sendMetricsDataInBatch() throws Exception { + updateMetaAndMergeData(); + for (Map.Entry> entry : batchMap.entrySet()) { + List paths = new ArrayList<>(); + List types = new ArrayList<>(); + Object[] values = new Object[1]; + long[] timestamps = new long[1]; + + String prefixPath = entry.getKey().getPrefixPath(); + long timestamp = entry.getKey().getTimestamp(); + List valueList = new ArrayList<>(); + timestamps[0] = timestamp; + + for (Map.Entry subEntry : entry.getValue().entrySet()) { + String suffixPath = subEntry.getKey(); + String value = subEntry.getValue(); + + DataType type = findType(new ArrayList<>(Collections.singletonList(value))); + types.add(type); + paths.add(prefixPath + suffixPath); + valueList.add(getType(value, type)); + } + + values[0] = valueList.toArray(); + + try { + long sessionInsertStartTime = System.currentTimeMillis(); + //session.insertNonAlignedRowRecords(paths, timestamps, values, types, null); + session.insertRowRecords(paths, timestamps, values, types, null); + long sessionInsertEndTime = System.currentTimeMillis(); + LOGGER.info(String.format("Session insert cost time: %s ms", sessionInsertEndTime - sessionInsertStartTime)); + } catch (Exception e) { + LOGGER.error("Error occurred during insert ", e); + } + } + } + + private void updateMetaAndMergeData() { + for (Metric metric : metricList) { + StringBuilder path = new StringBuilder(""); + Iterator iter = metric.getTags().entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = (Map.Entry) iter.next(); + path.append(entry.getKey() + "."); + path.append(entry.getValue() + "."); + } + // merge data in time and prefix path + String prefixPath = path.toString(); + for (int i = 0; i < metric.getTimestamps().size(); i++) { + long timestamp = metric.getTimestamps().get(i); + String value = metric.getValues().get(i); + TimeAndPrefixPath tpKey = new TimeAndPrefixPath(timestamp, prefixPath); + if (batchMap.containsKey(tpKey)) { + batchMap.get(tpKey).put(metric.getName(), value); + } else { + Map metricValueMap = new HashMap<>(); + metricValueMap.put(metric.getName(), value); + batchMap.put(tpKey, metricValueMap); + } + + if (metric.getAnnotation() != null) { + if (batchMap.containsKey(tpKey)) { + batchMap.get(tpKey).put(metric.getName() + ANNOTATION_SPLIT_STRING, + Arrays.toString(metric.getAnnotation().getBytes())); + } else { + Map metricValueMap = new HashMap<>(); + metricValueMap.put(metric.getName() + ANNOTATION_SPLIT_STRING, value); + batchMap.put(tpKey, metricValueMap); + } + } + } + } + } + + Object getType(String str, DataType tp) { + switch (tp) { + case BINARY: + return str.getBytes(); + case DOUBLE: + return Double.parseDouble(str); + } + return null; + } + + DataType findType(List values) { + for (int i = 0; i < values.size(); i++) { + try { + Double.parseDouble(values.get(i)); + } catch (NumberFormatException e) { + return DataType.BINARY; + } + } + return DataType.DOUBLE; + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/SenderManager.java b/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/SenderManager.java new file mode 100644 index 000000000..394b0a427 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/SenderManager.java @@ -0,0 +1,25 @@ +package cn.edu.tsinghua.iginx.rest.insert; + +import cn.edu.tsinghua.iginx.conf.Config; +import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class SenderManager { + private static SenderManager instance = new SenderManager(); + private static final Config config = ConfigDescriptor.getInstance().getConfig(); + private static final ExecutorService senderPool = Executors.newFixedThreadPool(config.getRestReqSplitNum() * 15); + + private SenderManager() { + + } + + public static SenderManager getInstance() { + return instance; + } + + public void addSender(Sender sender) { + senderPool.execute(sender); + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/TimeAndPrefixPath.java b/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/TimeAndPrefixPath.java new file mode 100644 index 000000000..f81a883cf --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/rest/insert/TimeAndPrefixPath.java @@ -0,0 +1,38 @@ +package cn.edu.tsinghua.iginx.rest.insert; + +public class TimeAndPrefixPath { + private long timestamp; + private String prefixPath; + private String key; + + public TimeAndPrefixPath(long timestamp, String prefixPath) { + this.timestamp = timestamp; + this.prefixPath = prefixPath; + this.key = timestamp + prefixPath; + } + + public long getTimestamp() { + return timestamp; + } + + public String getPrefixPath() { + return prefixPath; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TimeAndPrefixPath)) { + return false; + } + TimeAndPrefixPath other = (TimeAndPrefixPath) o; + return other.timestamp == timestamp && other.prefixPath.equals(prefixPath); + } + + @Override + public int hashCode() { + return key.hashCode(); + } +}