Skip to content

Commit

Permalink
Merge pull request #6192 from zeusoo001/event-22
Browse files Browse the repository at this point in the history
feat(event): optimize the event service
  • Loading branch information
CodeNinjaEvan authored Feb 17, 2025
2 parents a82baeb + ff44e09 commit d70cb52
Show file tree
Hide file tree
Showing 26 changed files with 1,848 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ public class EventPluginConfig {
public static final String SOLIDITY_EVENT_NAME = "solidityevent";
public static final String SOLIDITY_LOG_NAME = "soliditylog";

@Getter
@Setter
private int version;

@Getter
@Setter
private long startSyncBlockNum;

@Getter
@Setter
private String pluginPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public BlockLogTrigger() {
@Override
public String toString() {
return new StringBuilder().append("triggerName: ").append(getTriggerName())
.append("timestamp: ")
.append(", timestamp: ")
.append(timeStamp)
.append(", blockNumber: ")
.append(blockNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public SolidityTrigger() {
@Override
public String toString() {
return new StringBuilder().append("triggerName: ").append(getTriggerName())
.append("timestamp: ")
.append(", timestamp: ")
.append(timeStamp)
.append(", latestSolidifiedBlockNumber: ")
.append(latestSolidifiedBlockNumber).toString();
Expand Down
2 changes: 2 additions & 0 deletions common/src/main/java/org/tron/core/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ public class Constant {

public static final String NATIVE_QUEUE_SEND_LENGTH = "event.subscribe.native.sendqueuelength";

public static final String EVENT_SUBSCRIBE_VERSION = "event.subscribe.version";
public static final String EVENT_SUBSCRIBE_START_SYNC_BLOCK_NUM = "event.subscribe.startSyncBlockNum";
public static final String EVENT_SUBSCRIBE_PATH = "event.subscribe.path";
public static final String EVENT_SUBSCRIBE_SERVER = "event.subscribe.server";
public static final String EVENT_SUBSCRIBE_DB_CONFIG = "event.subscribe.dbconfig";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.tron.core.db.Manager;
import org.tron.core.metrics.MetricsUtil;
import org.tron.core.net.TronNetService;
import org.tron.core.services.event.EventService;

@Slf4j(topic = "app")
@Component
Expand All @@ -18,6 +19,9 @@ public class ApplicationImpl implements Application {
@Autowired
private ServiceContainer services;

@Autowired
private EventService eventService;

@Autowired
private TronNetService tronNetService;

Expand All @@ -37,6 +41,7 @@ public class ApplicationImpl implements Application {
*/
public void startup() {
this.startServices();
eventService.init();
if ((!Args.getInstance().isSolidityNode()) && (!Args.getInstance().isP2pDisable())) {
tronNetService.start();
}
Expand All @@ -47,6 +52,7 @@ public void startup() {
@Override
public void shutdown() {
this.shutdownServices();
eventService.close();
consensusService.stop();
if (!Args.getInstance().isSolidityNode() && (!Args.getInstance().p2pDisable)) {
tronNetService.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.tron.common.logsfilter.trigger.SolidityTrigger;
import org.tron.common.logsfilter.trigger.TransactionLogTrigger;
import org.tron.common.logsfilter.trigger.Trigger;
import org.tron.common.utils.JsonUtil;

@Slf4j
public class EventPluginLoader {
Expand All @@ -42,6 +43,10 @@ public class EventPluginLoader {

private List<TriggerConfig> triggerConfigList;

private int version = 0;

private long startSyncBlockNum = 0;

private boolean blockLogTriggerEnable = false;

private boolean blockLogTriggerSolidified = false;
Expand Down Expand Up @@ -219,6 +224,10 @@ public boolean start(EventPluginConfig config) {
return false;
}

this.version = config.getVersion();

this.startSyncBlockNum = config.getStartSyncBlockNum();

this.triggerConfigList = config.getTriggerConfigList();

useNativeQueue = config.isUseNativeQueue();
Expand Down Expand Up @@ -358,6 +367,14 @@ public void postSolidityTrigger(SolidityTrigger trigger) {
}
}

public synchronized int getVersion() {
return version;
}

public synchronized long getStartSyncBlockNum() {
return startSyncBlockNum;
}

public synchronized boolean isBlockLogTriggerEnable() {
return blockLogTriggerEnable;
}
Expand Down
9 changes: 9 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,15 @@ private static EventPluginConfig getEventPluginConfig(
final com.typesafe.config.Config config) {
EventPluginConfig eventPluginConfig = new EventPluginConfig();

if (config.hasPath(Constant.EVENT_SUBSCRIBE_VERSION)) {
eventPluginConfig.setVersion(config.getInt(Constant.EVENT_SUBSCRIBE_VERSION));
}

if (config.hasPath(Constant.EVENT_SUBSCRIBE_START_SYNC_BLOCK_NUM)) {
eventPluginConfig.setStartSyncBlockNum(config
.getLong(Constant.EVENT_SUBSCRIBE_START_SYNC_BLOCK_NUM));
}

boolean useNativeQueue = false;
int bindPort = 0;
int sendQueueLength = 0;
Expand Down
37 changes: 23 additions & 14 deletions framework/src/main/java/org/tron/core/db/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ public class Manager {
Collections.synchronizedList(Lists.newArrayList());
// the capacity is equal to Integer.MAX_VALUE default
private BlockingQueue<TransactionCapsule> rePushTransactions;
@Getter
private BlockingQueue<TriggerCapsule> triggerCapsuleQueue;
// log filter
private boolean isRunFilterProcessThread = true;
Expand Down Expand Up @@ -1111,7 +1112,9 @@ private void switchFork(BlockCapsule newHead)
while (!getDynamicPropertiesStore()
.getLatestBlockHeaderHash()
.equals(binaryTree.getValue().peekLast().getParentHash())) {
reOrgContractTrigger();
if (EventPluginLoader.getInstance().getVersion() == 0) {
reOrgContractTrigger();
}
reOrgLogsFilter();
eraseBlock();
}
Expand Down Expand Up @@ -1373,11 +1376,26 @@ public void pushBlock(final BlockCapsule block)
}

void blockTrigger(final BlockCapsule block, long oldSolid, long newSolid) {
// post block and logs for jsonrpc
try {
if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
postBlockFilter(block, false);
postLogsFilter(block, false, false);
}

if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {
postSolidityFilter(oldSolid, newSolid);
}

if (EventPluginLoader.getInstance().getVersion() != 0) {
lastUsedSolidityNum = newSolid;
return;
}

// if event subscribe is enabled, post block trigger to queue
postBlockTrigger(block);
// if event subscribe is enabled, post solidity trigger to queue
postSolidityTrigger(oldSolid, newSolid);
postSolidityTrigger(newSolid);
} catch (Exception e) {
logger.error("Block trigger failed. head: {}, oldSolid: {}, newSolid: {}",
block.getNum(), oldSolid, newSolid, e);
Expand Down Expand Up @@ -1517,7 +1535,8 @@ public TransactionInfo processTransaction(final TransactionCapsule trxCap, Block

// if event subscribe is enabled, post contract triggers to queue
// only trigger when process block
if (Objects.nonNull(blockCap) && !blockCap.isMerkleRootEmpty()) {
if (Objects.nonNull(blockCap) && !blockCap.isMerkleRootEmpty()
&& EventPluginLoader.getInstance().getVersion() == 0) {
String blockHash = blockCap.getBlockId().toString();
postContractTrigger(trace, false, blockHash);
}
Expand Down Expand Up @@ -2096,7 +2115,7 @@ private void postSolidityFilter(final long oldSolidNum, final long latestSolidif
}
}

private void postSolidityTrigger(final long oldSolidNum, final long latestSolidifiedBlockNumber) {
private void postSolidityTrigger(final long latestSolidifiedBlockNumber) {
if (eventPluginLoaded && EventPluginLoader.getInstance().isSolidityLogTriggerEnable()) {
for (Long i : Args.getSolidityContractLogTriggerMap().keySet()) {
postSolidityLogContractTrigger(i, latestSolidifiedBlockNumber);
Expand All @@ -2122,10 +2141,6 @@ private void postSolidityTrigger(final long oldSolidNum, final long latestSolidi
}
}
}

if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {
postSolidityFilter(oldSolidNum, latestSolidifiedBlockNumber);
}
lastUsedSolidityNum = latestSolidifiedBlockNumber;
}

Expand Down Expand Up @@ -2237,12 +2252,6 @@ private void postLogsFilter(final BlockCapsule blockCapsule, boolean solidified,
}

void postBlockTrigger(final BlockCapsule blockCapsule) {
// post block and logs for jsonrpc
if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
postBlockFilter(blockCapsule, false);
postLogsFilter(blockCapsule, false, false);
}

// process block trigger
long solidityBlkNum = getDynamicPropertiesStore().getLatestSolidifiedBlockNum();
if (eventPluginLoaded && EventPluginLoader.getInstance().isBlockLogTriggerEnable()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package org.tron.core.services.event;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.services.event.bo.BlockEvent;
import org.tron.core.services.event.exception.EventException;

@Slf4j(topic = "event")
public class BlockEventCache {
@Getter
private static volatile long solidNum;

@Getter
private static volatile BlockEvent head;

@Getter
private static volatile BlockCapsule.BlockId solidId;

private static Map<BlockCapsule.BlockId, BlockEvent> blockEventMap = new ConcurrentHashMap<>();

private static Map<Long, List<BlockEvent>> numMap = new ConcurrentHashMap<>();

public static BlockEvent getBlockEvent(BlockCapsule.BlockId blockId) {
return blockEventMap.get(blockId);
}

public static void init(BlockCapsule.BlockId blockId) {
blockEventMap.clear();
numMap.clear();
solidNum = blockId.getNum();
head = new BlockEvent(blockId);
solidId = blockId;
List<BlockEvent> list = new ArrayList<>();
list.add(head);
numMap.put(blockId.getNum(), list);
blockEventMap.put(blockId, head);
}

public static void add(BlockEvent blockEvent) throws EventException {
logger.info("Add block event, {}", blockEvent.getBlockId().getString(),
blockEvent.getParentId().getString());
if (blockEventMap.get(blockEvent.getParentId()) == null) {
throw new EventException("unlink BlockEvent, "
+ blockEvent.getBlockId().getString() + ", "
+ blockEvent.getParentId().getString());
}

long num = blockEvent.getBlockId().getNum();
List<BlockEvent> list = numMap.get(num);
if (list == null) {
list = new ArrayList<>();
numMap.put(num, list);
}
list.add(blockEvent);

blockEventMap.put(blockEvent.getBlockId(), blockEvent);

if (num > head.getBlockId().getNum()) {
head = blockEvent;
}

if (blockEvent.getSolidId().getNum() > solidId.getNum()) {
solidId = blockEvent.getSolidId();
}
}

public static void remove(BlockCapsule.BlockId solidId) {
logger.info("Remove solidId {}, solidNum {}, {}, {}",
solidId.getString(), solidNum, numMap.size(), blockEventMap.size());
numMap.forEach((k, v) -> {
if (k < solidId.getNum()) {
v.forEach(value -> blockEventMap.remove(value.getBlockId()));
numMap.remove(k);
}
});
solidNum = solidId.getNum();
}

public static List<BlockEvent> getSolidBlockEvents(BlockCapsule.BlockId solidId) {
List<BlockEvent> blockEvents = new ArrayList<>();
BlockCapsule.BlockId tmp = solidId;
while (tmp.getNum() > solidNum) {
BlockEvent blockEvent = blockEventMap.get(tmp);
blockEvents.add(blockEvent);
tmp = blockEvent.getParentId();
}

return Lists.reverse(blockEvents);
}
}
Loading

0 comments on commit d70cb52

Please sign in to comment.