Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYLIN-5514][feature] support batch build and refresh, support batch build by nature month. #2126

Open
wants to merge 1 commit into
base: kylin4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,9 @@ public boolean isJobAutoReadyCubeEnabled() {
return Boolean.parseBoolean(getOptional("kylin.job.cube-auto-ready-enabled", TRUE));
}

public boolean isBatchBuildCubeByMonthEnabled(){
return Boolean.parseBoolean(getOptional("kylin.job.batch-build-by-month-enabled", TRUE));
}
public int getJobOutputMaxSize() {
return Integer.parseInt(getOptional("kylin.job.execute-output.max-size", "10485760"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,10 @@ public enum JobTypeEnum {
/**
* job of sampling table
*/
TABLE_SAMPLING
TABLE_SAMPLING,

/**
* batch build or refresh, refresh if segment exists, build if not
*/
BUILD_OR_REFRESH
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private TimeUtil() {
throw new IllegalStateException("Class TimeUtil is an utility class !");
}

private static TimeZone gmt = TimeZone.getTimeZone("GMT");
public static TimeZone gmt = TimeZone.getTimeZone("GMT");
public static final long ONE_MINUTE_TS = 60 * 1000L;
public static final long ONE_HOUR_TS = 60 * ONE_MINUTE_TS;
public static final long ONE_DAY_TS = 24 * ONE_HOUR_TS;
Expand Down Expand Up @@ -83,6 +83,21 @@ public static long getMonthStartWithTimeZone(TimeZone timeZone, long ts){
return calendar.getTimeInMillis();
}

public static long getNextMonthStart(long ts) {
return getNextMonthStartWithTimeZone(gmt, ts);
}

public static long getNextMonthStartWithTimeZone(TimeZone timeZone, long ts) {
Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
calendar.setTimeInMillis(ts);
calendar.add(Calendar.MONTH, 1);
int year = calendar.get(Calendar.YEAR);
int month = calendar.get(Calendar.MONTH);
calendar.clear();
calendar.set(year, month, 1);
return calendar.getTimeInMillis();
}

public static long getQuarterStart(long ts) {
return getQuarterStartWithTimeZone(gmt, ts);
}
Expand Down
86 changes: 86 additions & 0 deletions core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
Expand All @@ -40,6 +42,7 @@
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.kv.RowConstants;
Expand Down Expand Up @@ -202,6 +205,89 @@ public static Pair<Long, Long> parseSegmentName(String segmentName) {
}
}

public static List<TSRange> splitRangeByMergeInterval(long startTime, long endTime, List<Long> mergeInterval) {
List<SegmentRange.TSRange> batchRange = new ArrayList<>();
long indexTime = startTime;
while (indexTime < endTime) {
long curMaxInterval = Long.MIN_VALUE;
for (Long splitRange : mergeInterval) {
if (splitRange <= (endTime - indexTime)) {
curMaxInterval = Math.max(splitRange, curMaxInterval);
}
}
if (curMaxInterval == Long.MIN_VALUE) {
batchRange.add(new SegmentRange.TSRange(indexTime, endTime));
break;
}
batchRange.add(new SegmentRange.TSRange(indexTime, indexTime + curMaxInterval));
indexTime = indexTime + curMaxInterval;
}
return batchRange;
}

public static List<TSRange> splitRangeByMonth(long startTime, long endTime) {
List<TSRange> result = new ArrayList<>();

long startOfNextMonth = TimeUtil.getNextMonthStart(startTime);
while (startOfNextMonth < endTime) {
result.add(new TSRange(startTime, startOfNextMonth));
startTime = startOfNextMonth;
startOfNextMonth = TimeUtil.getNextMonthStart(startTime);
}
result.add(new TSRange(startTime, endTime));
return result;
}

/**
* Find overlapping segment TSRange in the specified time range
*/
public static List<TSRange> getOverlapsRange(Segments<CubeSegment> readySegments, Long startTime, Long endTime, boolean refreshOverlaps) {
List<TSRange> batchRange = new ArrayList<>();
TSRange needRefreshTsRange = new TSRange(startTime, endTime);
for (CubeSegment readySegment : readySegments) {
TSRange tsRange = readySegment.getTSRange();
if (refreshOverlaps && needRefreshTsRange.overlaps(tsRange)) {
batchRange.add(new TSRange(tsRange.startValue(), tsRange.endValue()));
} else if (!refreshOverlaps && needRefreshTsRange.contains(tsRange)) {
batchRange.add(new TSRange(tsRange.startValue(), tsRange.endValue()));
}
}
return batchRange;
}

/**
* Get the segment TSRange that does not overlap within the specified time range
*/
public static List<TSRange> getNotOverlapsRange(Long startTime, Long endTime, List<TSRange> overlapsRange) {
List<TSRange> missingRanges = new ArrayList<>();
overlapsRange.sort(Comparator.comparing(TSRange::startValue));
if (overlapsRange.isEmpty()) {
// no overlapping ranges
missingRanges.add(new TSRange(startTime, endTime));
} else {
// handle missing ranges preceding the first range
TSRange firstRange = overlapsRange.get(0);
if (startTime < firstRange.startValue()) {
missingRanges.add(new TSRange(startTime, firstRange.startValue()));
}
// Handle missing ranges between two adjacent ranges in a range list
for (int i = 0; i < overlapsRange.size() - 1; i++) {
TSRange currRange = overlapsRange.get(i);
TSRange nextRange = overlapsRange.get(i + 1);
if (currRange.endValue() < nextRange.startValue()) {
missingRanges.add(new TSRange(currRange.endValue(), nextRange.startValue()));
}
}

// handle missing ranges after the last range
TSRange lastRange = overlapsRange.get(overlapsRange.size() - 1);
if (endTime > lastRange.endValue()) {
missingRanges.add(new TSRange(lastRange.endValue(), endTime));
}
}
return missingRanges;
}

// ============================================================================

public KylinConfig getConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.metadata.model.PartitionDesc;
Expand Down Expand Up @@ -156,6 +159,76 @@ public void testPartitioned() throws IOException {
assertEquals(new TSRange(0L, 2000L), merge2.getSegRange());
}

@Test
public void testSplitRangeByMergeRange(){
long startTime = 1672588800000L; // 2023-01-02 00:00:00
long endTime = 1675094400000L; // 2023-01-31 00:00:00
long endTime2 = 1673280000000L; // 2023-01-10 00:00:00

List<Long> mergeInternal = Arrays.asList(86400000L, 86400000L * 7, 86400000L * 28);

List<SegmentRange.TSRange> expected = Arrays.asList(
new SegmentRange.TSRange(1672588800000L, 1675008000000L),
new SegmentRange.TSRange(1675008000000L, 1675094400000L)
);
List<SegmentRange.TSRange> expected2 = Arrays.asList(
new SegmentRange.TSRange(1672588800000L, 1673193600000L),
new SegmentRange.TSRange(1673193600000L, 1673280000000L)
);

List<SegmentRange.TSRange> actual = CubeSegment.splitRangeByMergeInterval(startTime, endTime, mergeInternal);
List<SegmentRange.TSRange> actual2 = CubeSegment.splitRangeByMergeInterval(startTime, endTime2, mergeInternal);

assertEquals(expected, actual);
assertEquals(expected2, actual2);
}

@Test
public void testSplitRangeByMonth(){
long startTime = 1667347200000L; // 2022-11-02 00:00:00 (GMT)
long endTime = 1675987200000L; // 2023-02-10 00:00:00 (GMT)

List<SegmentRange.TSRange> expected = Arrays.asList(
new SegmentRange.TSRange(1667347200000L, 1669852800000L),
new SegmentRange.TSRange(1669852800000L, 1672531200000L),
new SegmentRange.TSRange(1672531200000L, 1675209600000L),
new SegmentRange.TSRange(1675209600000L, 1675987200000L)
);

List<SegmentRange.TSRange> actual = CubeSegment.splitRangeByMonth(startTime, endTime);
assertEquals(expected, actual);

long startTime2 = 1667347200000L; // 2022-11-02 00:00:00 (GMT)
long endTime2 = 1669852800000L; // 2022-12-01 00:00:00 (GMT)
List<SegmentRange.TSRange> expected2 = Collections.singletonList(
new TSRange(1667347200000L, 1669852800000L)
);

List<SegmentRange.TSRange> actual2 = CubeSegment.splitRangeByMonth(startTime2, endTime2);
assertEquals(expected2, actual2);
}

@Test
public void testGetNotOverlapsRange() {
Long startTime = 0L;
Long endTime = 100L;
List<TSRange> overlapsRange = Arrays.asList(
new TSRange(20L, 30L),
new TSRange(40L, 50L),
new TSRange(70L, 80L)
);

List<TSRange> expectedMissingRanges = Arrays.asList(
new TSRange(0L, 20L),
new TSRange(30L, 40L),
new TSRange(50L, 70L),
new TSRange(80L, 100L)
);

List<TSRange> missingRanges = CubeSegment.getNotOverlapsRange(startTime, endTime, overlapsRange);
assertEquals(expectedMissingRanges, missingRanges);
}

@Test
public void testAllowGap() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,24 @@ public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildR
req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment(), req.getPriorityOffset());
}


@RequestMapping(value = "/{cubeName}/batchRebuild", method = { RequestMethod.PUT }, produces = { "application/json" })
@ResponseBody
public List<JobInstance> batchBuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req,
@RequestParam(defaultValue = "false") boolean refreshOverlaps) {
try {
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
CubeInstance cube = jobService.getCubeManager().getCube(cubeName);

checkBuildingSegment(cube);
return jobService.batchSubmitJob(cube, req.getStartTime(), req.getEndTime(), submitter, req.getPriorityOffset(),
JobTypeEnum.valueOf(req.getBuildType()), req.isForce(), refreshOverlaps);
} catch (Throwable e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e.getLocalizedMessage(), e);
}
}

/**
* Build/Rebuild a cube segment by source offset
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
Expand Down Expand Up @@ -229,6 +230,105 @@ public JobInstance submitJob(CubeInstance cube, TSRange tsRange, SegmentRange se
return jobInstance;
}

public List<JobInstance> batchSubmitJob(CubeInstance cube, Long startTime, Long endTime, String submitter, Integer priorityOffset,
JobTypeEnum buildType, boolean force, boolean refreshOverlaps) throws IOException {
logger.info("batchSubmitJob, cube:{}, startTime:{}, endTime:{}, submitter:{}, priorityOffset:{}, buildType:{}, force:{}, refreshOverlaps:{}",
cube, startTime, endTime, submitter, priorityOffset, buildType, force, refreshOverlaps);
aclEvaluate.checkProjectOperationPermission(cube);
List<JobInstance> jobInstances = new ArrayList<>();
if (!cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
try {
jobInstances.add(submitJobInternal(cube, null, new SegmentRange(startTime, endTime),
null, null, buildType, force, submitter, priorityOffset));
return jobInstances;
} catch (Exception e) {
logger.error("Job submission might failed, cube:{}, start:{}, end:{}", cube, startTime, endTime);
throw e;
}
}

Segments<CubeSegment> segments = cube.getSegments();
Segments<CubeSegment> readySegments = segments.getSegments(SegmentStatusEnum.READY);

Map<JobTypeEnum, List<TSRange>> jobsMap = new HashMap<>();

if (buildType == JobTypeEnum.BUILD) {

jobsMap.put(JobTypeEnum.BUILD, getBatchBuildCubeRange(cube, startTime, endTime));
} else if (buildType == JobTypeEnum.REFRESH) {

jobsMap.put(JobTypeEnum.REFRESH, CubeSegment.getOverlapsRange(readySegments, startTime, endTime, refreshOverlaps));
} else if (buildType == JobTypeEnum.BUILD_OR_REFRESH) {
List<TSRange> overlapsRange;
List<TSRange> notOverlapRange;

if (refreshOverlaps) {
// 1. Find all existing ready segments within the time range
overlapsRange = CubeSegment.getOverlapsRange(readySegments, startTime, endTime, true);
// 2. Find all missing segments
notOverlapRange = CubeSegment.getNotOverlapsRange(startTime, endTime, overlapsRange);
} else {
List<TSRange> containsRange = CubeSegment.getOverlapsRange(readySegments, startTime, endTime, false);
overlapsRange = CubeSegment.getOverlapsRange(readySegments, startTime, endTime, true);
//1. fina all missing segments
notOverlapRange = CubeSegment.getNotOverlapsRange(startTime, endTime, overlapsRange);
//2. only refresh contains segments
overlapsRange = containsRange;
}

//Divide the missing segments according to the natural month
List<TSRange> needBuildRange = new ArrayList<>();
notOverlapRange.forEach(x -> needBuildRange.addAll(CubeSegment.splitRangeByMonth(x.startValue(), x.endValue())));
//3. Cube build for missing ones, and refresh for existing ones
jobsMap.put(JobTypeEnum.BUILD, needBuildRange);
jobsMap.put(JobTypeEnum.REFRESH, overlapsRange);
}
Segments<CubeSegment> buildingSegments = segments.getBuildingSegments();

for (JobTypeEnum cubeBuildType : jobsMap.keySet()) {
// Exclude the range being built from the list of jobs ready to be built
List<TSRange> batchRange = jobsMap.get(cubeBuildType);
List<TSRange> invalidRange = buildingSegments.stream()
.flatMap(segment -> batchRange.stream().filter(segment.getSegRange()::contains))
.collect(Collectors.toList());

batchRange.removeAll(invalidRange);

logger.info("batch buildType: {} , batchRange:{}", cubeBuildType.name(), batchRange);
for (TSRange tsRange : batchRange) {
try {
jobInstances.add(submitJobInternal(cube, tsRange, null, null, null,
cubeBuildType, force, submitter, priorityOffset));
} catch (IOException e) {
logger.error("Job submission might failed, cube:{}, start:{}, end:{}", cube, tsRange.start, tsRange.end);
throw e;
}
}
}
return jobInstances;
}


/**
* Divide the specified time range into segment TSRange
* @param cube
* @param startTime
* @param endTime
* @return
*/
private List<TSRange> getBatchBuildCubeRange(CubeInstance cube, Long startTime, Long endTime) {
List<TSRange> batchRange = new ArrayList<>();
if (cube.getDescriptor().getConfig().isBatchBuildCubeByMonthEnabled()) {
batchRange.addAll(CubeSegment.splitRangeByMonth(startTime, endTime));
} else {
List<Long> mergeInternal = Arrays.stream(cube.getDescriptor().getAutoMergeTimeRanges()).boxed().collect(Collectors.toList());
// Add minimum time range, day level
mergeInternal.add(86400000L);
batchRange.addAll(CubeSegment.splitRangeByMergeInterval(startTime, endTime, mergeInternal));
}
return batchRange;
}

public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, SegmentRange segRange, //
Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, //
JobTypeEnum buildType, boolean force, String submitter, Integer priorityOffset) throws IOException {
Expand Down
Loading