From 24b3f7cbf07f48dc47e26c07eda8bb0e7a22688a Mon Sep 17 00:00:00 2001 From: beyond-up <3451663959@qq.com> Date: Fri, 3 Mar 2023 11:22:48 +0800 Subject: [PATCH] Add split progress in jdbc and mongo https://github.com/bytedance/bitsail/issues/389 --- .../connector/legacy/jdbc/options/JdbcReaderOptions.java | 4 ++++ .../connector/legacy/jdbc/source/JDBCInputFormat.java | 7 +++++++ .../legacy/mongodb/option/MongoDBReaderOptions.java | 4 ++++ .../legacy/mongodb/source/MongoDBInputFormat.java | 3 ++- 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-jdbc/src/main/java/com/bytedance/bitsail/connector/legacy/jdbc/options/JdbcReaderOptions.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-jdbc/src/main/java/com/bytedance/bitsail/connector/legacy/jdbc/options/JdbcReaderOptions.java index 80b4da7d1..28d4c5590 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-jdbc/src/main/java/com/bytedance/bitsail/connector/legacy/jdbc/options/JdbcReaderOptions.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-jdbc/src/main/java/com/bytedance/bitsail/connector/legacy/jdbc/options/JdbcReaderOptions.java @@ -110,4 +110,8 @@ public interface JdbcReaderOptions extends ReaderOptions.BaseReaderOptions { ConfigOption CONNECTION_PARAMETERS = key(READER_PREFIX + "connection_parameters") .noDefaultValue(String.class); + + ConfigOption TASK_PROGRESS_NUM = + key(READER_PREFIX + "task_progress_num") + .defaultValue(1000); } diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-jdbc/src/main/java/com/bytedance/bitsail/connector/legacy/jdbc/source/JDBCInputFormat.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-jdbc/src/main/java/com/bytedance/bitsail/connector/legacy/jdbc/source/JDBCInputFormat.java index 45fa34061..18f3eeff1 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-jdbc/src/main/java/com/bytedance/bitsail/connector/legacy/jdbc/source/JDBCInputFormat.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-jdbc/src/main/java/com/bytedance/bitsail/connector/legacy/jdbc/source/JDBCInputFormat.java @@ -356,6 +356,7 @@ void setStatementRange(PreparedStatement statement, SplitRangeInfo splitRangeInf statement.setString(2, splitRangeInfo.getEndPos().toString()); } + @SuppressWarnings("checkstyle:MagicNumber") private synchronized SplitRangeInfo getRangeInfo() { List oneTaskGroupSplitRangeInfoList = getTaskRangeInfos(); if (oneTaskGroupSplitRangeInfoList.isEmpty()) { @@ -368,6 +369,12 @@ private synchronized SplitRangeInfo getRangeInfo() { inputSplitId = splitRangeInfo.getRangeId(); messenger.recordSplitProgress(); incCompletedSplits(1); + + int taskProgressNum = inputSliceConfig.getUnNecessaryOption(JdbcReaderOptions.TASK_PROGRESS_NUM, 1000); + if (currentTaskId % taskProgressNum == 0) { + LOG.info("Start to process range info: [" + splitRangeInfo.getBeginPos() + "," + splitRangeInfo.getEndPos() + "]\t[" + + currentTaskId + "/" + oneTaskGroupSplitRangeInfoList.size() + "]"); + } return splitRangeInfo; } diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-mongodb/src/main/java/com/bytedance/bitsail/connector/legacy/mongodb/option/MongoDBReaderOptions.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-mongodb/src/main/java/com/bytedance/bitsail/connector/legacy/mongodb/option/MongoDBReaderOptions.java index 05d67b2da..77754bb6a 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-mongodb/src/main/java/com/bytedance/bitsail/connector/legacy/mongodb/option/MongoDBReaderOptions.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-mongodb/src/main/java/com/bytedance/bitsail/connector/legacy/mongodb/option/MongoDBReaderOptions.java @@ -81,4 +81,8 @@ public interface MongoDBReaderOptions extends ReaderOptions.BaseReaderOptions { ConfigOption SPLIT_MODE = key(READER_PREFIX + "split_mode") .defaultValue("parallelism"); + + ConfigOption TASK_PROGRESS_NUM = + key(READER_PREFIX + "task_progress_num") + .defaultValue(1000); } diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-mongodb/src/main/java/com/bytedance/bitsail/connector/legacy/mongodb/source/MongoDBInputFormat.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-mongodb/src/main/java/com/bytedance/bitsail/connector/legacy/mongodb/source/MongoDBInputFormat.java index f3a3f9707..2a7461408 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-mongodb/src/main/java/com/bytedance/bitsail/connector/legacy/mongodb/source/MongoDBInputFormat.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-mongodb/src/main/java/com/bytedance/bitsail/connector/legacy/mongodb/source/MongoDBInputFormat.java @@ -482,7 +482,8 @@ private synchronized Range getRangeInfo() { Range range = currentTaskGroupRangeList.get(currentTaskId); incCompletedSplits(1); - if (currentTaskId % 1000 == 0) { + int taskProgressNum = inputSliceConfig.getUnNecessaryOption(MongoDBReaderOptions.TASK_PROGRESS_NUM, 1000); + if (currentTaskId % taskProgressNum == 0) { LOG.info("Start to process range info: " + range + "\t[" + currentTaskId + "/" + currentTaskGroupRangeList.size() + "]"); } return range;