Skip to content

Commit

Permalink
Add split progress in jdbc and mongo
Browse files Browse the repository at this point in the history
  • Loading branch information
beyond-up committed Mar 3, 2023
1 parent 2ac540c commit 24b3f7c
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,8 @@ public interface JdbcReaderOptions extends ReaderOptions.BaseReaderOptions {
ConfigOption<String> CONNECTION_PARAMETERS =
key(READER_PREFIX + "connection_parameters")
.noDefaultValue(String.class);

ConfigOption<Integer> TASK_PROGRESS_NUM =
key(READER_PREFIX + "task_progress_num")
.defaultValue(1000);
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ void setStatementRange(PreparedStatement statement, SplitRangeInfo splitRangeInf
statement.setString(2, splitRangeInfo.getEndPos().toString());
}

@SuppressWarnings("checkstyle:MagicNumber")
private synchronized SplitRangeInfo getRangeInfo() {
List<SplitRangeInfo> oneTaskGroupSplitRangeInfoList = getTaskRangeInfos();
if (oneTaskGroupSplitRangeInfoList.isEmpty()) {
Expand All @@ -368,6 +369,12 @@ private synchronized SplitRangeInfo getRangeInfo() {
inputSplitId = splitRangeInfo.getRangeId();
messenger.recordSplitProgress();
incCompletedSplits(1);

int taskProgressNum = inputSliceConfig.getUnNecessaryOption(JdbcReaderOptions.TASK_PROGRESS_NUM, 1000);
if (currentTaskId % taskProgressNum == 0) {
LOG.info("Start to process range info: [" + splitRangeInfo.getBeginPos() + "," + splitRangeInfo.getEndPos() + "]\t[" +
currentTaskId + "/" + oneTaskGroupSplitRangeInfoList.size() + "]");
}
return splitRangeInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,8 @@ public interface MongoDBReaderOptions extends ReaderOptions.BaseReaderOptions {
ConfigOption<String> SPLIT_MODE =
key(READER_PREFIX + "split_mode")
.defaultValue("parallelism");

ConfigOption<Integer> TASK_PROGRESS_NUM =
key(READER_PREFIX + "task_progress_num")
.defaultValue(1000);
}
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ private synchronized Range getRangeInfo() {
Range range = currentTaskGroupRangeList.get(currentTaskId);
incCompletedSplits(1);

if (currentTaskId % 1000 == 0) {
int taskProgressNum = inputSliceConfig.getUnNecessaryOption(MongoDBReaderOptions.TASK_PROGRESS_NUM, 1000);
if (currentTaskId % taskProgressNum == 0) {
LOG.info("Start to process range info: " + range + "\t[" + currentTaskId + "/" + currentTaskGroupRangeList.size() + "]");
}
return range;
Expand Down

0 comments on commit 24b3f7c

Please sign in to comment.