Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: split the task thread pool for committing and rollbacking statuses #6499

Open
wants to merge 11 commits into
base: 2.x
Choose a base branch
from
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6427](https://github.com/apache/incubator-seata/pull/6427)] support spi、saga、spring module compatible
- [[#6442](https://github.com/apache/incubator-seata/pull/6442)] clarify if conditions
- [[#6487](https://github.com/apache/incubator-seata/pull/6487)] fix typo and package name
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses

### refactor:
- [[#6269](https://github.com/apache/incubator-seata/pull/6269)] standardize Seata Exception
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
- [[#6427](https://github.com/apache/incubator-seata/pull/6427)] 支持spi、saga、spring模块的向下兼容
- [[#6442](https://github.com/apache/incubator-seata/pull/6442)] 阐明 if
- [[#6487](https://github.com/apache/incubator-seata/pull/6487)] 修复错误包名以及单词
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池

### refactor:
- [[#6269](https://github.com/apache/incubator-seata/pull/6269)] 统一Seata异常规范
Expand Down
15 changes: 15 additions & 0 deletions common/src/main/java/org/apache/seata/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ public interface Constants {
*/
String UNDOLOG_DELETE = "UndologDelete";

/**
* The constant SYNC_PROCESSING
*/
String SYNC_PROCESSING = "SyncProcessing";

/**
* The constant Committing
*/
String COMMITTING = "Committing";

/**
* The constant Rollbacking
*/
String ROLLBACKING = "Rollbacking";

/**
* The constant AUTO_COMMIT
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
*/
package org.apache.seata.server.coordinator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -25,6 +28,7 @@
import java.util.concurrent.TimeUnit;

import io.netty.channel.Channel;
import org.apache.seata.common.DefaultValues;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.config.ConfigurationFactory;
Expand Down Expand Up @@ -73,8 +77,11 @@
import org.slf4j.MDC;

import static org.apache.seata.common.Constants.ASYNC_COMMITTING;
import static org.apache.seata.common.Constants.COMMITTING;
import static org.apache.seata.common.Constants.RETRY_COMMITTING;
import static org.apache.seata.common.Constants.RETRY_ROLLBACKING;
import static org.apache.seata.common.Constants.ROLLBACKING;
import static org.apache.seata.common.Constants.SYNC_PROCESSING;
import static org.apache.seata.common.Constants.TX_TIMEOUT_CHECK;
import static org.apache.seata.common.Constants.UNDOLOG_DELETE;
import static org.apache.seata.common.DefaultValues.DEFAULT_ASYNC_COMMITTING_RETRY_PERIOD;
Expand Down Expand Up @@ -154,6 +161,9 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
private static final boolean ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE, DEFAULT_ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE);

private static final int RETRY_DEAD_THRESHOLD = ConfigurationFactory.getInstance()
.getInt(org.apache.seata.common.ConfigurationKeys.RETRY_DEAD_THRESHOLD, DefaultValues.DEFAULT_RETRY_DEAD_THRESHOLD);

private final ScheduledThreadPoolExecutor retryRollbacking =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_ROLLBACKING, 1));

Expand All @@ -169,10 +179,17 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
private final ScheduledThreadPoolExecutor undoLogDelete =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(UNDOLOG_DELETE, 1));

private final GlobalStatus[] rollbackingStatuses = new GlobalStatus[] {GlobalStatus.TimeoutRollbacking,
GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.RollbackRetrying, GlobalStatus.Rollbacking};
private final ScheduledThreadPoolExecutor syncProcessing =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(SYNC_PROCESSING, 1));

private final GlobalStatus[] retryRollbackingStatuses = new GlobalStatus[] {
GlobalStatus.TimeoutRollbacking,
GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.RollbackRetrying};

private final GlobalStatus[] retryCommittingStatuses = new GlobalStatus[] {GlobalStatus.CommitRetrying, GlobalStatus.Committed};

private final GlobalStatus[] retryCommittingStatuses = new GlobalStatus[] {GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Committed};
private final GlobalStatus[] rollbackingStatuses = new GlobalStatus[] {GlobalStatus.Rollbacking};
private final GlobalStatus[] committingStatuses = new GlobalStatus[] {GlobalStatus.Committing};

private final ThreadPoolExecutor branchRemoveExecutor;

Expand Down Expand Up @@ -366,7 +383,7 @@ protected void timeoutCheck() {
* Handle retry rollbacking.
*/
protected void handleRetryRollbacking() {
SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
SessionCondition sessionCondition = new SessionCondition(retryRollbackingStatuses);
sessionCondition.setLazyLoadBranch(true);
Collection<GlobalSession> rollbackingSessions =
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
Expand All @@ -376,12 +393,6 @@ protected void handleRetryRollbacking() {
long now = System.currentTimeMillis();
SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
try {
// prevent repeated rollback
if (rollbackingSession.getStatus() == GlobalStatus.Rollbacking
&& !rollbackingSession.isDeadSession()) {
// The function of this 'return' is 'continue'.
return;
}
if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackingSession.getBeginTime())) {
if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
rollbackingSession.clean();
Expand Down Expand Up @@ -413,13 +424,6 @@ protected void handleRetryCommitting() {
long now = System.currentTimeMillis();
SessionHelper.forEach(committingSessions, committingSession -> {
try {
// prevent repeated commit
if ((GlobalStatus.Committing.equals(committingSession.getStatus())
|| GlobalStatus.Committed.equals(committingSession.getStatus()))
&& !committingSession.isDeadSession()) {
// The function of this 'return' is 'continue'.
return;
}
if (isRetryTimeout(now, MAX_COMMIT_RETRY_TIMEOUT, committingSession.getBeginTime())) {

// commit retry timeout event
Expand Down Expand Up @@ -488,6 +492,116 @@ private boolean isRetryTimeout(long now, long timeout, long beginTime) {
return timeout >= ALWAYS_RETRY_BOUNDARY && now - beginTime > timeout;
}

/**
* Handle rollbacking by scheduled.
*/
protected void handleRollbackingByScheduled() {
SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
sessionCondition.setLazyLoadBranch(true);
List<GlobalSession> rollbackingSessions =
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
if (CollectionUtils.isEmpty(rollbackingSessions)) {
rollbackingSchedule(RETRY_DEAD_THRESHOLD);
return;
}
long delay = ROLLBACKING_RETRY_PERIOD;
rollbackingSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime));
List<GlobalSession> needDoRollbackingSessions = new ArrayList<>();
for (GlobalSession rollbackingSession : rollbackingSessions) {
long time = rollbackingSession.timeToDeadSession();
if (time <= 0) {
needDoRollbackingSessions.add(rollbackingSession);
} else {
delay = Math.max(time, ROLLBACKING_RETRY_PERIOD);
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
break;
}
}
long now = System.currentTimeMillis();
SessionHelper.forEach(needDoRollbackingSessions, rollbackingSession -> {
try {
if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackingSession.getBeginTime())) {
if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
rollbackingSession.clean();
}

SessionHelper.endRollbackFailed(rollbackingSession, true, true);

//The function of this 'return' is 'continue'.
return;
}
core.doGlobalRollback(rollbackingSession, true);
} catch (TransactionException ex) {
LOGGER.error("Failed to handle rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
}
});
rollbackingSchedule(delay);
}

private void rollbackingSchedule(long delay) {
syncProcessing.schedule(
liuqiufeng marked this conversation as resolved.
Show resolved Hide resolved
() -> {
boolean called = SessionHolder.distributedLockAndExecute(ROLLBACKING, this::handleRollbackingByScheduled);
if (!called) {
rollbackingSchedule(ROLLBACKING_RETRY_PERIOD);
}
},
delay, TimeUnit.MILLISECONDS);
}

/**
* Handle committing by scheduled.
*/
protected void handleCommittingByScheduled() {
SessionCondition sessionCondition = new SessionCondition(committingStatuses);
sessionCondition.setLazyLoadBranch(true);
List<GlobalSession> committingSessions =
SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
if (CollectionUtils.isEmpty(committingSessions)) {
committingSchedule(RETRY_DEAD_THRESHOLD);
return;
}
long delay = COMMITTING_RETRY_PERIOD;
committingSessions.sort(Comparator.comparingLong(GlobalSession::getBeginTime));
List<GlobalSession> needDoCommittingSessions = new ArrayList<>();
for (GlobalSession committingSession : committingSessions) {
long time = committingSession.timeToDeadSession();
if (time <= 0) {
needDoCommittingSessions.add(committingSession);
} else {
delay = Math.max(time, COMMITTING_RETRY_PERIOD);
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
break;
}
}
long now = System.currentTimeMillis();
SessionHelper.forEach(needDoCommittingSessions, committingSession -> {
try {
if (isRetryTimeout(now, MAX_COMMIT_RETRY_TIMEOUT, committingSession.getBeginTime())) {

// commit retry timeout event
SessionHelper.endCommitFailed(committingSession, true, true);

//The function of this 'return' is 'continue'.
return;
}
core.doGlobalCommit(committingSession, true);
} catch (TransactionException ex) {
LOGGER.error("Failed to handle committing [{}] {} {}", committingSession.getXid(), ex.getCode(), ex.getMessage());
}
});
committingSchedule(delay);
}

private void committingSchedule(long delay) {
syncProcessing.schedule(
() -> {
boolean called = SessionHolder.distributedLockAndExecute(COMMITTING, this::handleCommittingByScheduled);
if (!called) {
committingSchedule(COMMITTING_RETRY_PERIOD);
}
},
delay, TimeUnit.MILLISECONDS);
}

/**
* Init.
*/
Expand All @@ -511,6 +625,10 @@ public void init() {
undoLogDelete.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);

rollbackingSchedule(0);

committingSchedule(0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ public boolean isDeadSession() {
return (System.currentTimeMillis() - beginTime) > RETRY_DEAD_THRESHOLD;
}

/**
* prevent could not handle committing and rollbacking transaction
* @return time to dead session. if not greater than 0, then deadSession
*/
public long timeToDeadSession() {
return beginTime + RETRY_DEAD_THRESHOLD - System.currentTimeMillis();
}

@Override
public void begin() throws TransactionException {
this.status = GlobalStatus.Begin;
Expand Down