Fragment compaction #491

Open
wants to merge 7 commits into main
19 changes: 19 additions & 0 deletions conf/config.properties
@@ -56,6 +56,12 @@ maxCachedPhysicalTaskPerStorage=500
policyClassName=cn.edu.tsinghua.iginx.policy.naive.NaivePolicy
#policyClassName=cn.edu.tsinghua.iginx.policy.simple.SimplePolicy

# Whether to enable the monitor; must be enabled for the dynamic load-balancing policy and fragment compaction
enableMonitor=true

# Monitor data synchronization interval in seconds (default: 3)
loadBalanceCheckInterval=3

# Extra margin (in seconds) added to the end time of new fragments when resharding
reshardFragmentTimeMargin=60

@@ -88,6 +94,19 @@ storageGroupValueLimit=200.0
# Whether parameters can be set via environment variables
enableEnvParameter=false

##########################
### Fragment compaction settings
##########################

# Write-load threshold for fragment compaction (fragments below this threshold will be deleted or merged)
fragmentCompactionWriteThreshold=1000

# Read-load threshold for fragment compaction (fragments below this threshold will be merged)
fragmentCompactionReadThreshold=1000

# Read-load ratio threshold for fragment compaction (fragments below this ratio will be merged)
fragmentCompactionReadRatioThreshold=0.1

####################
### REST service settings
####################
4 changes: 4 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/Iginx.java
@@ -20,6 +20,7 @@

import cn.edu.tsinghua.iginx.conf.Config;
import cn.edu.tsinghua.iginx.conf.ConfigDescriptor;
import cn.edu.tsinghua.iginx.monitor.MonitorManager;
import cn.edu.tsinghua.iginx.mqtt.MQTTService;
import cn.edu.tsinghua.iginx.rest.RestServer;
import cn.edu.tsinghua.iginx.thrift.IService;
@@ -45,6 +46,9 @@ public static void main(String[] args) throws Exception {
if (config.isEnableMQTT()) {
new Thread(MQTTService.getInstance()).start();
}
if (config.isEnableMonitor()) {
new Thread(MonitorManager.getInstance()).start();
}
Iginx iginx = new Iginx();
iginx.startServer();
}
124 changes: 124 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/compaction/Compaction.java
@@ -0,0 +1,124 @@
package cn.edu.tsinghua.iginx.compaction;

import cn.edu.tsinghua.iginx.engine.physical.PhysicalEngine;
import cn.edu.tsinghua.iginx.engine.physical.PhysicalEngineImpl;
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.shared.TimeRange;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.operator.Delete;
import cn.edu.tsinghua.iginx.engine.shared.operator.Migration;
import cn.edu.tsinghua.iginx.engine.shared.operator.ShowTimeSeries;
import cn.edu.tsinghua.iginx.engine.shared.source.FragmentSource;
import cn.edu.tsinghua.iginx.engine.shared.source.GlobalSource;
import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
import cn.edu.tsinghua.iginx.metadata.IMetaManager;
import cn.edu.tsinghua.iginx.metadata.entity.FragmentMeta;
import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitMeta;

import java.util.*;

public abstract class Compaction {
protected final static PhysicalEngine physicalEngine = PhysicalEngineImpl.getInstance();
protected final IMetaManager metaManager = DefaultMetaManager.getInstance();

public abstract boolean needCompaction() throws Exception;

public abstract void compact() throws Exception;

protected List<List<FragmentMeta>> packFragmentsByGroup(List<FragmentMeta> fragmentMetas) {
// Sort to reduce the time complexity of the grouping algorithm
fragmentMetas.sort((o1, o2) -> {
// Sort by the time dimension first, then by the series dimension
if (o1.getTimeInterval().getStartTime() == o2.getTimeInterval().getStartTime()) {
return o1.getTsInterval().getStartTimeSeries().compareTo(o2.getTsInterval().getStartTimeSeries());
} else {
// All fragments are aligned in the time dimension, so sorting by start time alone is enough
return Long.compare(o1.getTimeInterval().getStartTime(), o2.getTimeInterval().getStartTime());
}
});

// Group the candidate fragments by adjacency (fragments in the same group can be merged)
List<List<FragmentMeta>> result = new ArrayList<>();
List<FragmentMeta> lastFragmentGroup = new ArrayList<>();
FragmentMeta lastFragment = null;
for (FragmentMeta fragmentMeta : fragmentMetas) {
if (lastFragment == null || isNext(lastFragment, fragmentMeta)) {
lastFragmentGroup.add(fragmentMeta);
} else {
if (lastFragmentGroup.size() > 1) {
result.add(lastFragmentGroup);
}
// Start the new group with the current fragment as its first member
lastFragmentGroup = new ArrayList<>();
lastFragmentGroup.add(fragmentMeta);
}
lastFragment = fragmentMeta;
}
// Flush the final group, which the loop above never adds to the result
if (lastFragmentGroup.size() > 1) {
result.add(lastFragmentGroup);
}
return result;
}

private boolean isNext(FragmentMeta firstFragment, FragmentMeta secondFragment) {
return firstFragment.getTimeInterval().equals(secondFragment.getTimeInterval()) && firstFragment.getTsInterval().getEndTimeSeries().equals(secondFragment.getTsInterval().getStartTimeSeries());
}

protected void compactFragmentGroupToTargetStorageUnit(List<FragmentMeta> fragmentGroup, StorageUnitMeta targetStorageUnit, long totalPoints) throws PhysicalException {
String startTimeseries = fragmentGroup.get(0).getTsInterval().getStartTimeSeries();
// The group is chained in the series dimension, so the merged interval ends at the last fragment
String endTimeseries = fragmentGroup.get(fragmentGroup.size() - 1).getTsInterval().getEndTimeSeries();
long startTime = fragmentGroup.get(0).getTimeInterval().getStartTime();
long endTime = fragmentGroup.get(0).getTimeInterval().getEndTime();

for (FragmentMeta fragmentMeta : fragmentGroup) {
String storageUnitId = fragmentMeta.getMasterStorageUnitId();
if (!storageUnitId.equals(targetStorageUnit.getId())) {
// Rewrite this fragment's data
Set<String> pathRegexSet = new HashSet<>();
ShowTimeSeries showTimeSeries = new ShowTimeSeries(new GlobalSource(), pathRegexSet, null,
Integer.MAX_VALUE, 0);
RowStream rowStream = physicalEngine.execute(showTimeSeries);
SortedSet<String> pathSet = new TreeSet<>();
rowStream.getHeader().getFields().forEach(field -> {
String timeSeries = field.getName();
if (timeSeries.contains("{") && timeSeries.contains("}")) {
timeSeries = timeSeries.split("\\{")[0];
}
if (fragmentMeta.getTsInterval().isContain(timeSeries)) {
pathSet.add(timeSeries);
}
});
Migration migration = new Migration(new GlobalSource(), fragmentMeta, new ArrayList<>(pathSet), targetStorageUnit);
physicalEngine.execute(migration);
// Update the stored point-count information
metaManager.updateFragmentPoints(fragmentMeta, totalPoints);
}
}
// TODO add write lock
// Create the new merged fragment
FragmentMeta newFragment = new FragmentMeta(startTimeseries, endTimeseries, startTime, endTime, targetStorageUnit);
DefaultMetaManager.getInstance().addFragment(newFragment);

for (FragmentMeta fragmentMeta : fragmentGroup) {
String storageUnitId = fragmentMeta.getMasterStorageUnitId();
if (!storageUnitId.equals(targetStorageUnit.getId())) {
// Remove the original fragment's metadata
DefaultMetaManager.getInstance().removeFragment(fragmentMeta);
}
}
// TODO release write lock

for (FragmentMeta fragmentMeta : fragmentGroup) {
String storageUnitId = fragmentMeta.getMasterStorageUnitId();
if (!storageUnitId.equals(targetStorageUnit.getId())) {
// Delete the original fragment's data from the storage node
List<String> paths = new ArrayList<>();
paths.add(fragmentMeta.getMasterStorageUnitId() + "*");
List<TimeRange> timeRanges = new ArrayList<>();
timeRanges.add(new TimeRange(fragmentMeta.getTimeInterval().getStartTime(), true,
fragmentMeta.getTimeInterval().getEndTime(), false));
Delete delete = new Delete(new FragmentSource(fragmentMeta), timeRanges, paths, null);
physicalEngine.execute(delete);
}
}
}
}
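To illustrate the adjacency rule behind packFragmentsByGroup: two fragments are grouped when they share the same time interval and the first fragment's end series equals the second's start series. Below is a minimal standalone sketch (not part of this PR) using simplified stand-in records instead of the real FragmentMeta; all series names and timestamps are invented:

// Toy model of the adjacency check in Compaction.isNext; not the PR's actual types.
record Interval(long start, long end) {}
record Fragment(String startSeries, String endSeries, Interval time) {}

public class GroupingExample {
    static boolean isNext(Fragment a, Fragment b) {
        // Same time interval, and the series ranges touch end-to-start
        return a.time().equals(b.time()) && a.endSeries().equals(b.startSeries());
    }

    public static void main(String[] args) {
        Fragment f1 = new Fragment("root.a", "root.c", new Interval(0, 100));
        Fragment f2 = new Fragment("root.c", "root.f", new Interval(0, 100));
        Fragment f3 = new Fragment("root.f", "root.z", new Interval(100, 200));
        System.out.println(isNext(f1, f2)); // true: mergeable into one group
        System.out.println(isNext(f2, f3)); // false: different time intervals
    }
}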
36 changes: 36 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/compaction/CompactionManager.java
@@ -0,0 +1,36 @@
package cn.edu.tsinghua.iginx.compaction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class CompactionManager {

private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class);

private static final List<Compaction> compactionList = new ArrayList<>();

static {
compactionList.add(new FragmentDeletionCompaction());
compactionList.add(new LowWriteFragmentCompaction());
compactionList.add(new LowAccessFragmentCompaction());
}

private static final CompactionManager instance = new CompactionManager();

public static CompactionManager getInstance() {
return instance;
}

public void clearFragment() throws Exception {
logger.info("start to compact fragments");
for (Compaction compaction : compactionList) {
if (compaction.needCompaction()) {
compaction.compact();
}
}
logger.info("end compact fragments");
}
}
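For context on how this entry point is driven: MonitorManager (started in Iginx.main above) is expected to invoke clearFragment periodically. A minimal sketch of such a driver loop follows; it assumes a simple fixed-rate schedule, the interval mirrors loadBalanceCheckInterval from conf/config.properties, and the wiring is illustrative rather than the PR's actual MonitorManager code:

import cn.edu.tsinghua.iginx.compaction.CompactionManager;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative driver only; the real scheduling lives in MonitorManager.
public class CompactionDriverSketch {
    public static void main(String[] args) {
        long intervalSeconds = 3; // mirrors loadBalanceCheckInterval=3
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // Probes each registered strategy and compacts when needed
                CompactionManager.getInstance().clearFragment();
            } catch (Exception e) {
                // A failed round must not kill the scheduler thread
                e.printStackTrace();
            }
        }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }
}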
61 changes: 61 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/compaction/FragmentDeletionCompaction.java
@@ -0,0 +1,61 @@
package cn.edu.tsinghua.iginx.compaction;

import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.shared.TimeRange;
import cn.edu.tsinghua.iginx.engine.shared.operator.Delete;
import cn.edu.tsinghua.iginx.engine.shared.source.FragmentSource;
import cn.edu.tsinghua.iginx.metadata.entity.FragmentMeta;
import cn.edu.tsinghua.iginx.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class FragmentDeletionCompaction extends Compaction {

private static final Logger logger = LoggerFactory.getLogger(FragmentDeletionCompaction.class);
private List<FragmentMeta> toDeletionFragments;

@Override
public boolean needCompaction() throws Exception {
// Gather statistics (the first version mainly tracks fragment heat)
Pair<Map<FragmentMeta, Long>, Map<FragmentMeta, Long>> fragmentHeatPair = metaManager
.loadFragmentHeat();
Map<FragmentMeta, Long> fragmentHeatWriteMap = fragmentHeatPair.getK();
Map<FragmentMeta, Long> fragmentHeatReadMap = fragmentHeatPair.getV();
if (fragmentHeatWriteMap == null) {
fragmentHeatWriteMap = new HashMap<>();
}
if (fragmentHeatReadMap == null) {
fragmentHeatReadMap = new HashMap<>();
}

long totalHeats = 0;
for (Map.Entry<FragmentMeta, Long> fragmentHeatReadEntry : fragmentHeatReadMap.entrySet()) {
totalHeats += fragmentHeatReadEntry.getValue();
}
double limitReadHeats = totalHeats * 1.0 / fragmentHeatReadMap.size();

// Decide whether redundant fragments generated by customizable replicas should be deleted
// TODO: not implemented yet; initialize the list so the check below cannot throw a NullPointerException
toDeletionFragments = new ArrayList<>();

return !toDeletionFragments.isEmpty();
}

@Override
public void compact() throws PhysicalException {
for (FragmentMeta fragmentMeta : toDeletionFragments) {
// Remove the metadata of the customizable-replica fragment
// TODO

// Delete the fragment's data from the storage node
List<String> paths = new ArrayList<>();
paths.add(fragmentMeta.getMasterStorageUnitId() + "*");
List<TimeRange> timeRanges = new ArrayList<>();
timeRanges.add(new TimeRange(fragmentMeta.getTimeInterval().getStartTime(), true,
fragmentMeta.getTimeInterval().getEndTime(), false));
Delete delete = new Delete(new FragmentSource(fragmentMeta), timeRanges, paths, null);
physicalEngine.execute(delete);
}
}
}
100 changes: 100 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/compaction/LowAccessFragmentCompaction.java
@@ -0,0 +1,100 @@
package cn.edu.tsinghua.iginx.compaction;

import cn.edu.tsinghua.iginx.conf.ConfigDescriptor;
import cn.edu.tsinghua.iginx.metadata.entity.FragmentMeta;
import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitMeta;
import cn.edu.tsinghua.iginx.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LowAccessFragmentCompaction extends Compaction {

private static final Logger logger = LoggerFactory.getLogger(LowAccessFragmentCompaction.class);
private static final long fragmentCompactionWriteThreshold = ConfigDescriptor.getInstance().getConfig().getFragmentCompactionWriteThreshold();
private static final long fragmentCompactionReadThreshold = ConfigDescriptor.getInstance().getConfig().getFragmentCompactionReadThreshold();

private List<List<FragmentMeta>> toCompactFragmentGroups;

@Override
public boolean needCompaction() throws Exception {
// Gather statistics (the first version mainly tracks fragment heat)
Pair<Map<FragmentMeta, Long>, Map<FragmentMeta, Long>> fragmentHeatPair = metaManager
.loadFragmentHeat();
Map<FragmentMeta, Long> fragmentHeatWriteMap = fragmentHeatPair.getK();
Map<FragmentMeta, Long> fragmentHeatReadMap = fragmentHeatPair.getV();
if (fragmentHeatWriteMap == null) {
fragmentHeatWriteMap = new HashMap<>();
}
if (fragmentHeatReadMap == null) {
fragmentHeatReadMap = new HashMap<>();
}

List<FragmentMeta> fragmentMetaSet = metaManager.getFragments();

List<FragmentMeta> candidateFragments = new ArrayList<>();
// Decide whether to merge historical fragments that are no longer being written to
for (FragmentMeta fragmentMeta : fragmentMetaSet) {
long writeLoad = fragmentHeatWriteMap.getOrDefault(fragmentMeta, 0L);
long readLoad = fragmentHeatReadMap.getOrDefault(fragmentMeta, 0L);
if (fragmentMeta.getTimeInterval().getEndTime() != Long.MAX_VALUE && writeLoad < fragmentCompactionWriteThreshold && readLoad <= fragmentCompactionReadThreshold) {
candidateFragments.add(fragmentMeta);
}
}

toCompactFragmentGroups = packFragmentsByGroup(candidateFragments);

return !toCompactFragmentGroups.isEmpty();
}

@Override
public void compact() throws Exception {
logger.info("start to compact low access fragments");
Map<FragmentMeta, Long> fragmentMetaPointsMap = metaManager.loadFragmentPoints();

// Prefer the storage engine holding the fewest points (i.e., with the most free disk space)
Map<Long, Long> storageEnginePointsMap = new HashMap<>();
for (Map.Entry<FragmentMeta, Long> fragmentMetaPointsEntry : fragmentMetaPointsMap.entrySet()) {
FragmentMeta fragmentMeta = fragmentMetaPointsEntry.getKey();
long storageEngineId = fragmentMeta.getMasterStorageUnit().getStorageEngineId();
long points = fragmentMetaPointsEntry.getValue();
long allPoints = storageEnginePointsMap.getOrDefault(storageEngineId, 0L);
allPoints += points;
storageEnginePointsMap.put(storageEngineId, allPoints);
}
long minPoints = Long.MAX_VALUE;
long minStorageEngineId = 0;
for (Map.Entry<Long, Long> storageEnginePointsEntry : storageEnginePointsMap.entrySet()) {
if (minPoints > storageEnginePointsEntry.getValue()) {
minStorageEngineId = storageEnginePointsEntry.getKey();
minPoints = storageEnginePointsEntry.getValue();
}
}

for (List<FragmentMeta> fragmentGroup : toCompactFragmentGroups) {
// Compute the data volume of each storage unit (du) and take the largest as the merge target
StorageUnitMeta maxStorageUnitMeta = fragmentGroup.get(0).getMasterStorageUnit();
long maxStorageUnitPoint = 0;
long totalPoints = 0;
Map<String, Long> storageUnitPointsMap = new HashMap<>();
for (FragmentMeta fragmentMeta : fragmentGroup) {
// Only consider units on the storage engine currently holding the fewest points
if (fragmentMeta.getMasterStorageUnit().getStorageEngineId() == minStorageEngineId) {
long pointsNum = storageUnitPointsMap.getOrDefault(fragmentMeta.getMasterStorageUnitId(), 0L);
pointsNum += fragmentMetaPointsMap.getOrDefault(fragmentMeta, 0L);
if (pointsNum > maxStorageUnitPoint) {
maxStorageUnitPoint = pointsNum;
maxStorageUnitMeta = fragmentMeta.getMasterStorageUnit();
}
storageUnitPointsMap.put(fragmentMeta.getMasterStorageUnitId(), pointsNum);
}
totalPoints += fragmentMetaPointsMap.getOrDefault(fragmentMeta, 0L);
}

compactFragmentGroupToTargetStorageUnit(fragmentGroup, maxStorageUnitMeta, totalPoints);
}
}
}
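To make the two-step target selection in compact() easier to follow: step one picks the storage engine with the fewest total points (most free space), step two picks the largest storage unit on that engine so the least data has to move. A standalone sketch of the same idea with plain maps; the engine and unit IDs are invented, whereas the real code derives them from FragmentMeta:

import java.util.HashMap;
import java.util.Map;

// Standalone illustration of the selection logic in compact(); not the PR's actual code.
public class TargetSelectionExample {
    public static void main(String[] args) {
        // Step 1: total points per storage engine; the minimum wins
        Map<Long, Long> enginePoints = new HashMap<>();
        enginePoints.put(1L, 5_000L);
        enginePoints.put(2L, 1_200L); // fewest points -> most free space

        long minEngine = enginePoints.entrySet().stream()
                .min(Map.Entry.comparingByValue()).orElseThrow().getKey();

        // Step 2: points per storage unit on that engine; the maximum becomes the merge target
        Map<String, Long> unitPointsOnMinEngine = new HashMap<>();
        unitPointsOnMinEngine.put("unit-a", 300L);
        unitPointsOnMinEngine.put("unit-b", 900L); // largest unit -> merge target

        String target = unitPointsOnMinEngine.entrySet().stream()
                .max(Map.Entry.comparingByValue()).orElseThrow().getKey();

        System.out.println("engine=" + minEngine + ", target unit=" + target);
    }
}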