Skip to content

Commit c6214bb

Browse files
author
number571
committed
update
1 parent 4806a5a commit c6214bb

File tree

8 files changed

+49
-45
lines changed

8 files changed

+49
-45
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
*??? ??, ????*
88

9+
### CHANGES
10+
11+
- `pkg/anonymity`: delete produce message (relayer mode) in consume function
12+
913
<!-- ... -->
1014

1115
## v1.7.7

pkg/anonymity/anonymity.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,6 @@ func (p *sNode) Run(pCtx context.Context) error {
9898
}
9999

100100
func (p *sNode) runProducer(pCtx context.Context) error {
101-
serviceName := p.fSettings.GetServiceName()
102-
103101
for {
104102
select {
105103
case <-pCtx.Done():
@@ -110,13 +108,8 @@ func (p *sNode) runProducer(pCtx context.Context) error {
110108
// context done
111109
continue
112110
}
113-
114-
// create logger state
115-
logBuilder := p.enrichLogger(anon_logger.NewLogBuilder(serviceName), netMsg).
116-
WithPubKey(p.fQBProcessor.GetClient().GetPrivKey().GetPubKey())
117-
118111
// internal logger
119-
_, _ = p.storeHashWithProduce(pCtx, logBuilder, netMsg)
112+
_, _ = p.storeHashWithProduce(pCtx, netMsg)
120113
}
121114
}
122115
}
@@ -129,6 +122,7 @@ func (p *sNode) runConsumer(pCtx context.Context) error {
129122
default:
130123
netMsg, err := p.fAdapter.Consume(pCtx)
131124
if err != nil {
125+
// context done or error
132126
continue
133127
}
134128
// internal logger
@@ -248,10 +242,10 @@ func (p *sNode) messageHandler(pCtx context.Context, pNetMsg net_message.IMessag
248242
}
249243

250244
// try store hash of message
251-
if ok, err := p.storeHashWithProduce(pCtx, logBuilder, pNetMsg); !ok {
245+
if err := p.storeHashIntoDatabase(logBuilder, pNetMsg.GetHash()); err != nil {
252246
// internal logger
253-
if err != nil {
254-
return errors.Join(ErrStoreHashWithProduce, err)
247+
if !errors.Is(err, ErrHashAlreadyExist) {
248+
return errors.Join(ErrStoreHashIntoDatabase, err)
255249
}
256250
// hash already exist in database
257251
return nil
@@ -397,11 +391,16 @@ func (p *sNode) enrichLogger(pLogBuilder anon_logger.ILogBuilder, pNetMsg net_me
397391

398392
func (p *sNode) storeHashWithProduce(
399393
pCtx context.Context,
400-
pLogBuilder anon_logger.ILogBuilder,
401394
pNetMsg net_message.IMessage,
402395
) (bool, error) {
396+
serviceName := p.fSettings.GetServiceName()
397+
398+
// create logger state
399+
logBuilder := p.enrichLogger(anon_logger.NewLogBuilder(serviceName), pNetMsg).
400+
WithPubKey(p.fQBProcessor.GetClient().GetPrivKey().GetPubKey())
401+
403402
// try push hash into database
404-
if err := p.storeHashIntoDatabase(pLogBuilder, pNetMsg.GetHash()); err != nil {
403+
if err := p.storeHashIntoDatabase(logBuilder, pNetMsg.GetHash()); err != nil {
405404
// internal logger
406405
if errors.Is(err, ErrHashAlreadyExist) {
407406
return false, nil
@@ -412,12 +411,12 @@ func (p *sNode) storeHashWithProduce(
412411
// redirect message to another nodes
413412
if err := p.fAdapter.Produce(pCtx, pNetMsg); err != nil {
414413
// some connections can return errors
415-
p.fLogger.PushWarn(pLogBuilder.WithType(anon_logger.CLogBaseBroadcast))
414+
p.fLogger.PushWarn(logBuilder.WithType(anon_logger.CLogBaseBroadcast))
416415
return true, nil
417416
}
418417

419418
// full success broadcast
420-
p.fLogger.PushInfo(pLogBuilder.WithType(anon_logger.CLogBaseBroadcast))
419+
p.fLogger.PushInfo(logBuilder.WithType(anon_logger.CLogBaseBroadcast))
421420
return true, nil
422421
}
423422

pkg/anonymity/anonymity_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -488,14 +488,11 @@ func TestStoreHashWithBroadcastMessage(t *testing.T) {
488488
})
489489

490490
netMsg := node.testNewNetworkMessage(sett, msg)
491-
logBuilder := anon_logger.NewLogBuilder("_")
492-
493-
if ok, err := node.storeHashWithProduce(ctx, logBuilder, netMsg); !ok || err != nil {
491+
if ok, err := node.storeHashWithProduce(ctx, netMsg); !ok || err != nil {
494492
t.Error(err)
495493
return
496494
}
497-
498-
if ok, err := node.storeHashWithProduce(ctx, logBuilder, netMsg); ok || err != nil {
495+
if ok, err := node.storeHashWithProduce(ctx, netMsg); ok || err != nil {
499496
switch {
500497
case ok:
501498
t.Error("success store one message again")
@@ -723,6 +720,7 @@ func testRunNodeWithDB(ctx context.Context, timeWait time.Duration, addr string,
723720
case <-ctx.Done():
724721
return nil, ctx.Err()
725722
case msg := <-msgChan:
723+
_ = networkNode.BroadcastMessage(ctx, msg)
726724
return msg, nil
727725
}
728726
},

pkg/anonymity/errors.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,21 @@ func (err *SAnonymityError) Error() string {
1313
}
1414

1515
var (
16-
ErrSetHashIntoDB = &SAnonymityError{"set hash into database"}
17-
ErrGetHashFromDB = &SAnonymityError{"get hash from database"}
18-
ErrNilDB = &SAnonymityError{"database is nil"}
19-
ErrRetryLimit = &SAnonymityError{"retry limit"}
20-
ErrEnqueueMessage = &SAnonymityError{"enqueue message"}
21-
ErrUnknownType = &SAnonymityError{"unknown type"}
22-
ErrLoadMessage = &SAnonymityError{"load message"}
23-
ErrStoreHashWithProduce = &SAnonymityError{"store hash with produce"}
24-
ErrActionIsNotFound = &SAnonymityError{"action is not found"}
25-
ErrActionIsClosed = &SAnonymityError{"action is closed"}
26-
ErrActionTimeout = &SAnonymityError{"action timeout"}
27-
ErrEnqueuePayload = &SAnonymityError{"enqueue payload"}
28-
ErrFetchResponse = &SAnonymityError{"fetch response"}
29-
ErrRunning = &SAnonymityError{"node running"}
30-
ErrProcessRun = &SAnonymityError{"process run"}
31-
ErrHashAlreadyExist = &SAnonymityError{"hash already exist"}
16+
ErrSetHashIntoDB = &SAnonymityError{"set hash into database"}
17+
ErrGetHashFromDB = &SAnonymityError{"get hash from database"}
18+
ErrNilDB = &SAnonymityError{"database is nil"}
19+
ErrRetryLimit = &SAnonymityError{"retry limit"}
20+
ErrEnqueueMessage = &SAnonymityError{"enqueue message"}
21+
ErrUnknownType = &SAnonymityError{"unknown type"}
22+
ErrLoadMessage = &SAnonymityError{"load message"}
23+
ErrStoreHashIntoDatabase = &SAnonymityError{"store hash into database"}
24+
ErrStoreHashWithProduce = &SAnonymityError{"store hash with produce"}
25+
ErrActionIsNotFound = &SAnonymityError{"action is not found"}
26+
ErrActionIsClosed = &SAnonymityError{"action is closed"}
27+
ErrActionTimeout = &SAnonymityError{"action timeout"}
28+
ErrEnqueuePayload = &SAnonymityError{"enqueue payload"}
29+
ErrFetchResponse = &SAnonymityError{"fetch response"}
30+
ErrRunning = &SAnonymityError{"node running"}
31+
ErrProcessRun = &SAnonymityError{"process run"}
32+
ErrHashAlreadyExist = &SAnonymityError{"hash already exist"}
3233
)

pkg/anonymity/examples/echo/construct.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ func newNode(serviceName, address string) (network.INode, anonymity.INode) {
9090
case <-ctx.Done():
9191
return nil, ctx.Err()
9292
case msg := <-msgChan:
93+
_ = networkNode.BroadcastMessage(ctx, msg)
9394
return msg, nil
9495
}
9596
},

pkg/anonymity/examples/ping-pong/construct.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ func newNode(serviceName, address string) (network.INode, anonymity.INode) {
9090
case <-ctx.Done():
9191
return nil, ctx.Err()
9292
case msg := <-msgChan:
93+
_ = networkNode.BroadcastMessage(ctx, msg)
9394
return msg, nil
9495
}
9596
},

test/result/badge_codelines.svg

Lines changed: 1 addition & 1 deletion
Loading

test/result/coverage.svg

Lines changed: 8 additions & 8 deletions
Loading

0 commit comments

Comments
 (0)