-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathcollect.go
105 lines (89 loc) · 2.98 KB
/
collect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/hleb-albau/ethereum-pubkey-collector/crypto"
"github.com/hleb-albau/ethereum-pubkey-collector/storage"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"log"
"math/big"
"sync"
"time"
)
const (
nodeUrlFlag = "node-url"
threads = "threads"
)
// Usage: eth-pub-keys collect --node-url=https://mainnet.infura.io --threads=10
func CollectCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "collect",
Short: "A simple task which connects to web3 provider and pull txes, extracting and collecting pub keys",
Long: `This script connects to a web3 client and pulls transaction data from the blockchain.
In particular, it extracts r,v,s signature components of each transaction and calculates the secp256k1
public key associated with the Ethereum account that created the transaction.
Collected data are stored in LevelDb as current sub-folder "eth-pubkeys".`,
RunE: func(cmd *cobra.Command, args []string) error {
db, err := storage.OpenDb("eth-pubkeys")
if err != nil {
return err
}
lastProcessedBlock := int64(db.GetLastProcessedBlock())
log.Println("Last processed block", lastProcessedBlock)
client, err := ethclient.Dial(viper.GetString(nodeUrlFlag))
if err != nil {
return err
}
header, err := client.HeaderByNumber(ctx, nil)
if err != nil {
return err
}
lastNetworkBlock := header.Number.Int64()
threads := viper.GetInt64(threads)
log.Println("Last network block", lastNetworkBlock)
for blockNum := lastProcessedBlock; blockNum <= lastNetworkBlock; blockNum += threads {
var wg sync.WaitGroup
wg.Add(int(threads))
for i := int64(0); i < threads; i++ {
go func(thread int64) {
downloadAndProcessBlock(blockNum+thread, client, db)
wg.Done()
}(i)
}
wg.Wait()
if blockNum-lastProcessedBlock > 100 {
log.Printf("Processed block %v.", blockNum)
db.SaveLastProcessedBlock(uint64(blockNum))
lastProcessedBlock = blockNum
}
}
log.Printf("Collecting pubkeys till %v block finished", lastNetworkBlock)
return nil
},
}
cmd.Flags().String(nodeUrlFlag, "https://mainnet.infura.io", "web3 endpoint")
cmd.Flags().Int64(threads, 4, "number of concurrent collectors")
_ = viper.BindPFlag(nodeUrlFlag, cmd.Flags().Lookup(nodeUrlFlag))
_ = viper.BindPFlag(threads, cmd.Flags().Lookup(threads))
return cmd
}
func downloadAndProcessBlock(blockNum int64, client *ethclient.Client, db storage.Db) {
// loop for retry
for true {
block, err := client.BlockByNumber(ctx, big.NewInt(blockNum))
if err != nil {
// retry after 5 secs
log.Printf("Could not download block %v:\n %e", blockNum, err)
time.Sleep(time.Second * 10)
continue
}
for _, tx := range block.Transactions() {
// process only first txes for each address
if tx.Nonce() == 0 {
address, pubkey := crypto.GetPubKey(tx)
db.SaveAddressPublicKey(address, pubkey)
}
}
break
}
}