Skip to content
30 changes: 4 additions & 26 deletions fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,10 @@ public boolean init(StarManagerServer server) {
return true;
}

protected void prepare() {
if (!serviceId.isEmpty()) {
return;
}

// This method must be called before calling any other method which uses serviceId. Fulfill this prerequisite by calling
// `prepare` as soon as possible after the given StarManagerServer to which the `client` is connected is otherwise fully
// initialized. This is to minimize redundant calls to this method.
public void prepare() {
try (LockCloseable ignored = new LockCloseable(rwLock.readLock())) {
if (!serviceId.isEmpty()) {
return;
Expand Down Expand Up @@ -299,7 +298,6 @@ public long getWorkerTabletNum(String workerIpPort) {
* @param workerGroupId
*/
public void addWorker(long nodeId, String workerIpPort, long workerGroupId) {
prepare();
try (LockCloseable lock = new LockCloseable(rwLock.writeLock())) {
if (serviceId.equals("")) {
LOG.warn("When addWorker serviceId is empty");
Expand Down Expand Up @@ -353,8 +351,6 @@ private void tryRemovePreviousWorker(long nodeId) {
}

public void removeWorker(String workerIpPort, long workerGroupId) throws DdlException {
prepare();

long workerId = getWorker(workerIpPort);

try {
Expand Down Expand Up @@ -414,7 +410,6 @@ private long getWorkerIdByNodeIdInternal(long nodeId) {
}

public long createShardGroup(long dbId, long tableId, long partitionId) throws DdlException {
prepare();
List<ShardGroupInfo> shardGroupInfos = null;
try {
List<CreateShardGroupInfo> createShardGroupInfos = new ArrayList<>();
Expand All @@ -435,7 +430,6 @@ public long createShardGroup(long dbId, long tableId, long partitionId) throws D
}

public void deleteShardGroup(List<Long> groupIds) {
prepare();
try {
client.deleteShardGroup(serviceId, groupIds, true);
} catch (StarClientException e) {
Expand All @@ -444,7 +438,6 @@ public void deleteShardGroup(List<Long> groupIds) {
}

public List<ShardGroupInfo> listShardGroup() {
prepare();
try {
return client.listShardGroup(serviceId);
} catch (StarClientException e) {
Expand All @@ -460,7 +453,6 @@ public List<Long> createShards(int numShards, FilePathInfo pathInfo, FileCacheIn
if (matchShardIds != null) {
Preconditions.checkState(numShards == matchShardIds.size());
}
prepare();
List<ShardInfo> shardInfos = null;
try {
List<CreateShardInfo> createShardInfoList = new ArrayList<>(numShards);
Expand Down Expand Up @@ -497,8 +489,6 @@ public List<Long> createShards(int numShards, FilePathInfo pathInfo, FileCacheIn
}

public List<Long> listShard(long groupId) throws DdlException {
prepare();

List<List<ShardInfo>> shardInfo;
try {
shardInfo = client.listShard(serviceId, Arrays.asList(groupId));
Expand All @@ -512,7 +502,6 @@ public void deleteShards(Set<Long> shardIds) throws DdlException {
if (shardIds.isEmpty()) {
return;
}
prepare();
try {
client.deleteShard(serviceId, shardIds);
} catch (StarClientException e) {
Expand Down Expand Up @@ -628,8 +617,6 @@ public Set<Long> getAllNodeIdsByShard(ShardInfo shardInfo, boolean onlyPrimary)
}

public void createMetaGroup(long metaGroupId, List<Long> shardGroupIds) throws DdlException {
prepare();

try {
CreateMetaGroupInfo createInfo = CreateMetaGroupInfo.newBuilder()
.setMetaGroupId(metaGroupId)
Expand All @@ -643,8 +630,6 @@ public void createMetaGroup(long metaGroupId, List<Long> shardGroupIds) throws D
}

public void updateMetaGroup(long metaGroupId, List<Long> shardGroupIds, boolean isJoin) throws DdlException {
prepare();

try {
UpdateMetaGroupInfo.Builder builder = UpdateMetaGroupInfo.newBuilder();

Expand All @@ -670,8 +655,6 @@ public void updateMetaGroup(long metaGroupId, List<Long> shardGroupIds, boolean
}

public boolean queryMetaGroupStable(long metaGroupId) {
prepare();

try {
return client.queryMetaGroupStable(serviceId, metaGroupId);
} catch (StarClientException e) {
Expand All @@ -682,7 +665,6 @@ public boolean queryMetaGroupStable(long metaGroupId) {

public List<Long> getWorkersByWorkerGroup(long workerGroupId) throws UserException {
List<Long> nodeIds = new ArrayList<>();
prepare();
try {
List<WorkerGroupDetailInfo> workerGroupDetailInfos = client.
listWorkerGroup(serviceId, Collections.singletonList(workerGroupId), true);
Expand All @@ -698,7 +680,6 @@ public List<Long> getWorkersByWorkerGroup(long workerGroupId) throws UserExcepti

public List<String> listWorkerGroupIpPort(long workerGroupId) throws UserException {
List<String> addresses = new ArrayList<>();
prepare();
try {
List<WorkerGroupDetailInfo> workerGroupDetailInfos = client.
listWorkerGroup(serviceId, Collections.singletonList(workerGroupId), true);
Expand All @@ -715,8 +696,6 @@ public List<String> listWorkerGroupIpPort(long workerGroupId) throws UserExcepti

// dump all starmgr meta, for DEBUG purpose
public String dump() {
prepare();

try {
return client.dump();
} catch (StarClientException e) {
Expand All @@ -726,7 +705,6 @@ public String dump() {

@NotNull
public ShardInfo getShardInfo(long shardId, long workerGroupId) throws StarClientException {
prepare();
List<ShardInfo> shardInfos = client.getShardInfo(serviceId, Lists.newArrayList(shardId), workerGroupId);
Preconditions.checkState(shardInfos.size() == 1);
return shardInfos.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1118,11 +1118,8 @@ public void initialize(String[] args) throws Exception {
createTaskCleaner();
createTableKeeper();

// 7. init starosAgent
if (RunMode.isSharedDataMode() && !starOSAgent.init(null)) {
LOG.error("init starOSAgent failed");
System.exit(-1);
}
// There's no point intializing starOsAgent here since we don't have a StarManagerServer available yet.
// It will be called later when StarMgrServer is initialized.
} catch (Exception e) {
try {
if (isFirstTimeStart) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ public void initialize(BDBEnvironment environment, String baseImageDir) throws I

// load meta
loadImage(imageDir);

// Fully set up the starOsAgent so that we don't need to call `StarOSAgent.prepare` elsewhere
LOG.info("Waiting for starMgrServer to start");
starMgrServer.blockUntilStart();
LOG.info("Done waiting for starMgrServer to start");
if (starOsAgent != null) {
starOsAgent.prepare();
}
}

private void becomeLeader() {
Expand Down