Skip to content

Commit

Permalink
Merge pull request #313 from XDagger/develop
Browse files Browse the repository at this point in the history
merge to master
  • Loading branch information
LucasMLK authored May 7, 2024
2 parents eb27f11 + a8cb0d4 commit 406caef
Show file tree
Hide file tree
Showing 11 changed files with 514 additions and 52 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>io.xdag</groupId>
<artifactId>xdagj</artifactId>
<version>0.7.0</version>
<version>0.7.1</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/xdag/config/AbstractConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class AbstractConfig implements Config, AdminSpec, NodeSpec, WalletSpec,
protected int netMaxOutboundConnections = 128;
protected int netMaxInboundConnections = 512;
protected int netMaxInboundConnectionsPerIp = 5;
protected int netMaxMessageQueueSize = 4096;
// protected int netMaxMessageQueueSize = 4096;
protected int netMaxFrameBodySize = 128 * 1024;
protected int netMaxPacketSize = 16 * 1024 * 1024;
protected int netRelayRedundancy = 8;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/xdag/config/spec/NodeSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public interface NodeSpec {

int getWaitEpoch();

int getNetMaxMessageQueueSize();
// int getNetMaxMessageQueueSize();

int getNetHandshakeExpiry();

Expand Down
25 changes: 12 additions & 13 deletions src/main/java/io/xdag/consensus/XdagSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public class XdagSync {
private volatile boolean isRunning;

private long lastRequestTime;
private int count;

public XdagSync(Kernel kernel) {
this.kernel = kernel;
Expand All @@ -97,22 +96,19 @@ public void start() {
status = Status.SYNCING;
// TODO: paulochen 开始同步的时间点/快照时间点
// startSyncTime = 1588687929343L; // 1716ffdffff 171e52dffff
sendFuture = sendTask.scheduleAtFixedRate(this::syncLoop, 32, 2, TimeUnit.SECONDS);
sendFuture = sendTask.scheduleAtFixedRate(this::syncLoop, 32, 10, TimeUnit.SECONDS);
}
}

private void syncLoop() {
count++;
try {
if (syncWindow.size() < 32) {
if (syncWindow.isEmpty()) {
log.debug("start finding different time periods");
requestBlocks(0, 1L << 48);
}
if (getLastTime() >= lastRequestTime || count == 128) {
count = 0;
log.debug("start getting blocks");
getBlocks();
}

log.debug("start getting blocks");
getBlocks();
} catch (Throwable e) {
log.error("error when requestBlocks {}", e.getMessage());
}
Expand All @@ -138,7 +134,7 @@ private void getBlocks() {

// Segmented requests, each request for 32 time periods.
int size = syncWindow.size();
for (int i = 0; i < 32; i++) {
for (int i = 0; i < 128; i++) {
if (i >= size) {
break;
}
Expand All @@ -149,8 +145,11 @@ private void getBlocks() {
}

long time = syncWindow.get(i);
sendGetBlocks(xc, time, sf);
if (i == 30) lastRequestTime = time;
if (time >= lastRequestTime) {
sendGetBlocks(xc, time, sf);
lastRequestTime = time;
}

}

}
Expand Down Expand Up @@ -183,7 +182,7 @@ private void requestBlocks(long t, long dt) {
setSyncOld();
}

if ((syncWindow.isEmpty() || t > syncWindow.peekFirst()) && syncWindow.size() < 2048) {
if (t > getLastTime()) {
syncWindow.offerLast(t);
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/io/xdag/core/BlockchainImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,7 @@ public XAmount unApplyBlock(Block block) {
for (Address link : links) {
if (!link.isAddress) {
Block ref = getBlockByHash(link.getAddress(), false);
//even mainBlock duplicate link the TX_block which other mainBlock is handled, we could check the TX ref if this mainBlock.
if (ref.getInfo().getRef() != null
&& equalBytes(ref.getInfo().getRef(), block.getHashLow().toArray())
&& ((ref.getInfo().flags & BI_MAIN_REF) != 0)) {
Expand Down Expand Up @@ -894,7 +895,11 @@ public void setMain(Block block) {
xdagStats.nmain++;

// 递归执行主块引用的区块 并获取手续费
applyBlock(true, block);
XAmount mainBlockFee = applyBlock(true, block); //the mainBlock may have tx, return the fee to itself.
if (!mainBlockFee.equals(XAmount.ZERO)) {// normal mainBlock will not go into this
acceptAmount(block, mainBlockFee); //add the fee
block.getInfo().setFee(mainBlockFee);
}
// 主块REF指向自身
// TODO:补充手续费
updateBlockRef(block, new Address(block));
Expand Down
19 changes: 12 additions & 7 deletions src/main/java/io/xdag/crypto/RandomX.java
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,22 @@ public void randomXPoolUpdateSeed(long memIndex) {
// 分配成功
RandomXJNA.INSTANCE.randomx_init_cache(rx_memory.rxCache, bytesToPointer(rx_memory.seed), new NativeSize(rx_memory.seed.length));

if (rx_memory.rxDataset == null) {
// 分配dataset
rx_memory.rxDataset = RandomXJNA.INSTANCE.randomx_alloc_dataset(flags);
if (config.getRandomxSpec().getRandomxFlag()) {
if (rx_memory.rxDataset == null) {
//分配失败
log.debug("Failed alloc dataset");
return;
// 分配dataset
rx_memory.rxDataset = RandomXJNA.INSTANCE.randomx_alloc_dataset(flags);
if (rx_memory.rxDataset == null) {
//分配失败
log.debug("Failed alloc dataset");
return;
}
}

randomXPoolInitDataset(rx_memory.rxCache, rx_memory.rxDataset);
} else {
rx_memory.rxDataset = null;
}

randomXPoolInitDataset(rx_memory.rxCache, rx_memory.rxDataset);

if (randomXUpdateVm(rx_memory, true) == null) {
// update failed
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/xdag/db/rocksdb/BlockStoreImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -427,14 +427,14 @@ public int loadSum(long starttime, long endtime, MutableBytes sums) {
if ((level & 1) != 0) {
// Arrays.fill(sums, (byte)0);
sums.fill((byte) 0);
for (int i = 0; i < 256; i++) {
for (int i = 1; i <= 256; i++) {
// long totalsum = BytesUtils.bytesToLong(buf, i * 16, true);
long totalsum = buf.getLong(i * 16, ByteOrder.LITTLE_ENDIAN);
long totalsum = buf.getLong((i-1) * 16, ByteOrder.LITTLE_ENDIAN);
sum += totalsum;
// long totalsize = BytesUtils.bytesToLong(buf, i * 16 + 8, true);
long totalsize = buf.getLong(i * 16 + 8, ByteOrder.LITTLE_ENDIAN);
long totalsize = buf.getLong((i-1) * 16 + 8, ByteOrder.LITTLE_ENDIAN);
size += totalsize;
if (i % 16 == 0 && i != 0) {
if (i % 16 == 0) {
// System.arraycopy(BytesUtils.longToBytes(sum, true), 0, sums, i - 16, 8);
sums.set(i - 16, Bytes.wrap(BytesUtils.longToBytes(sum, true)));
// System.arraycopy(BytesUtils.longToBytes(size, true), 0, sums, i - 8, 8);
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/xdag/net/XdagP2pHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,10 @@ private ReasonCode checkPeer(Peer peer, boolean newHandShake) {
// }

// validator can't share IP address
if (channelMgr.isActiveIP(channel.getRemoteIp()) // already connected
&& nodeSpec.getNetwork() == Network.MAINNET) { // on main net
return ReasonCode.VALIDATOR_IP_LIMITED;
}
// if (channelMgr.isActiveIP(channel.getRemoteIp()) // already connected
// && nodeSpec.getNetwork() == Network.MAINNET) { // on main net
// return ReasonCode.VALIDATOR_IP_LIMITED;
// }

return null;
}
Expand Down
29 changes: 13 additions & 16 deletions src/main/java/io/xdag/net/message/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.xdag.config.Config;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import io.xdag.net.message.p2p.DisconnectMessage;
Expand All @@ -50,8 +46,8 @@ public class MessageQueue {
.daemon(true)
.build());
private final Config config;

private final Queue<Message> queue = new ConcurrentLinkedQueue<>();
//'8192' is a value obtained from testing experience, not a standard value.Looking forward to optimization.
private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(8192);
private final Queue<Message> prioritized = new ConcurrentLinkedQueue<>();
private ChannelHandlerContext ctx;
private ScheduledFuture<?> timerTask;
Expand Down Expand Up @@ -95,26 +91,27 @@ public void disconnect(ReasonCode code) {
}
}

public boolean sendMessage(Message msg) {
if (size() >= config.getNodeSpec().getNetMaxMessageQueueSize()) {
disconnect(ReasonCode.MESSAGE_QUEUE_FULL);
return false;
}

public void sendMessage(Message msg) {
//when full message queue, whitelist don't need to disconnect.
if (config.getNodeSpec().getNetPrioritizedMessages().contains(msg.getCode())) {
prioritized.add(msg);
} else {
queue.add(msg);
try {
//update to BlockingQueue, capacity 8192
queue.put(msg);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return true;
}

public int size() {
return queue.size() + prioritized.size();
}

private void nudgeQueue() {
int n = Math.min(5, size());
//Increase bandwidth consumption of a full used single sync thread to 3 Mbps.
int n = Math.min(8, size());
if (n == 0) {
return;
}
Expand Down
26 changes: 26 additions & 0 deletions src/test/java/io/xdag/BlockBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ public static Block generateOldTransactionBlock(Config config, KeyPair key, long
b.signOut(key);
return b;
}

public static Block generateOldTransactionBlock(Config config, KeyPair key, long xdagTime, Address from, XAmount amount,Address to,
XAmount amount1, Address to1, XAmount amount2) {
List<Address> refs = Lists.newArrayList();
List<KeyPair> keys = Lists.newArrayList();
refs.add(new Address(from.getAddress(), XDAG_FIELD_IN, amount,false)); // key1
refs.add(new Address(to.getAddress(), XDAG_FIELD_OUTPUT, amount1,true));
refs.add(new Address(to1.getAddress(), XDAG_FIELD_OUTPUT, amount2,true));
keys.add(key);
Block b = new Block(config, xdagTime, refs, null, false, keys, null, 0,XAmount.of(100,XUnit.MILLI_XDAG)); // orphan
b.signOut(key);
return b;
}

public static Block generateNewTransactionBlock(Config config, KeyPair key, long xdagTime, Address from, Address to,
XAmount amount) {
List<Address> refs = Lists.newArrayList();
Expand All @@ -100,6 +114,18 @@ public static Block generateNewTransactionBlock(Config config, KeyPair key, long
return b;
}

public static Block generateNewTransactionBlock(Config config, KeyPair key, long xdagTime, Address from, Address to,
XAmount amount, XAmount VariableFee) {
List<Address> refs = Lists.newArrayList();
List<KeyPair> keys = Lists.newArrayList();
refs.add(new Address(from.getAddress(), XDAG_FIELD_INPUT, amount,true)); // key1
refs.add(new Address(to.getAddress(), XDAG_FIELD_OUTPUT, amount,true));
keys.add(key);
Block b = new Block(config, xdagTime, refs, null, false, keys, null, 0, VariableFee); // orphan
b.signOut(key);
return b;
}

public static Block generateWalletTransactionBlock(Config config, KeyPair key, long xdagTime, Address from, Address to,
XAmount amount) {
List<Address> refs = Lists.newArrayList();
Expand Down
Loading

0 comments on commit 406caef

Please sign in to comment.