package producer

import (
	"flow-generator/util"
	"fmt"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"log"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

type WorkStatus int

const (
	WorkStatusStop WorkStatus = iota
	WorkStatusRunning
)

var (
	wg sync.WaitGroup
)

type RandPktGenerator struct {
	workers       []*worker
	sender        *sender
	runningStatus WorkStatus
	defaultPacket *randomPacketLayer
	speed         uint64
}

type randomPacketLayer struct {
	ether *layers.Ethernet
	ipv4  *layers.IPv4
	tcp   *layers.TCP
}

// Run initializes the generator, starts the sender and all workers, and then
// blocks until every worker goroutine has finished.
func (r *RandPktGenerator) Run() {
	r.init()
	r.sender.Run()
	for i := range r.workers {
		r.workers[i].Run()
	}

	wg.Wait()
}
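
// runGeneratorFor is an illustrative sketch, not part of the original file: it
// shows one way a caller might drive the generator, assuming a simple
// duration-based shutdown. The helper name and the fixed run time are
// assumptions for the example only.
func runGeneratorFor(d time.Duration) {
	gen := new(RandPktGenerator)
	go gen.Run()  // Run blocks on the shared WaitGroup, so start it in a goroutine
	time.Sleep(d) // let the workers emit packets for the requested duration
	gen.Stop()    // signal workers and sender; Run returns once they exit
}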

func defaultConfig() map[string]int {
	return map[string]int{
		"default_worker_count": 10,
	}
}

// defaultPacketConfig returns a fixed Ethernet/IPv4/TCP packet template.
func defaultPacketConfig() *randomPacketLayer {
	return &randomPacketLayer{
		ether: &layers.Ethernet{
			EthernetType: layers.EthernetTypeIPv4,
			SrcMAC:       net.HardwareAddr{0xFB, 0xBA, 0xFA, 0xAA, 0xF6, 0xAA},
			DstMAC:       net.HardwareAddr{0x2F, 0xFF, 0x4F, 0xF6, 0x3F, 0xF0},
		},
		ipv4: &layers.IPv4{
			Protocol: layers.IPProtocolTCP,
			Flags:    0x0000,
			Version:  4, // gopacket keeps version and header length in separate fields
			IHL:      5, // header length in 32-bit words
			TTL:      0x80,
			Id:       0x1234,
			Length:   0x014e,
			SrcIP:    net.IP{12, 13, 11, 10},
			DstIP:    net.IP{44, 33, 55, 88},
		},
		tcp: &layers.TCP{
			Seq:        0x1234,
			Ack:        0x1235,
			SrcPort:    layers.TCPPort(68),
			DstPort:    layers.TCPPort(67),
			SYN:        true,
			DataOffset: 0x5,
		},
	}
}

// init sets up the default packet template, the sender, and the workers.
func (r *RandPktGenerator) init() {
	config := defaultConfig()
	var (
		workerCount int
	)
	// set worker count
	if v, ok := config["default_worker_count"]; ok {
		workerCount = v
	} else {
		workerCount = 10
	}
	// init default packet config
	r.defaultPacket = defaultPacketConfig()
	// init sender
	r.sender = new(sender)
	r.sender.Init()
	// init workers
	r.workers = nil
	for i := 0; i < workerCount; i++ {
		w := new(worker)
		w.Init(r.sender.dsChan)
		r.workers = append(r.workers, w)
	}
}

// Stop signals every worker and the sender to stop.
func (r *RandPktGenerator) Stop() {
	for i := range r.workers {
		r.workers[i].Stop()
	}
	r.sender.Stop()
}

// randPktConfig builds a randomized Ethernet/IPv4/TCP layer set for one packet.
func randPktConfig() randomPacketLayer {
	return randomPacketLayer{
		ether: &layers.Ethernet{
			EthernetType: layers.EthernetTypeIPv4,
			SrcMAC:       util.RandomMac(),
			DstMAC:       util.RandomMac(),
		},
		ipv4: &layers.IPv4{
			Protocol: layers.IPProtocolTCP,
			Flags:    0x0000,
			Version:  4, // gopacket keeps version and header length in separate fields
			IHL:      5, // header length in 32-bit words
			TTL:      0x80,
			Id:       0x1234,
			// Length is left unset; SerializeOptions.FixLengths fills it in.
			SrcIP: util.RandomIPv4(),
			DstIP: util.RandomIPv4(),
		},
		tcp: &layers.TCP{
			Seq:        util.RandomSequence(),
			Ack:        util.RandomSequence(),
			SrcPort:    layers.TCPPort(util.RandomPort()),
			DstPort:    layers.TCPPort(util.RandomPort()),
			SYN:        true,
			DataOffset: 0x5,
		},
	}
}
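
// The function above depends on helpers from flow-generator/util that are not
// part of this file. Their implementation is not shown here; the signatures
// below are only an assumption inferred from the call sites, included to make
// the code easier to follow:
//
//	func RandomMac() net.HardwareAddr // random 6-byte MAC address
//	func RandomIPv4() net.IP          // random 4-byte IPv4 address
//	func RandomSequence() uint32      // random TCP sequence/ack number
//	func RandomPort() uint16          // random TCP port (exact integer type assumed)
//	func RandomBytes(n int) []byte    // n random payload bytes
//	func RandomInt(min, max int) int  // random int in [min, max)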

type worker struct {
	runningStatus WorkStatus
	exportChan    chan []byte
}

func (w *worker) Init(exportChan chan []byte) {
	w.runningStatus = WorkStatusStop
	w.exportChan = exportChan
}

// Run starts a goroutine that keeps building random packets, serializes them,
// and pushes the raw bytes into exportChan until Stop is called.
func (w *worker) Run() {
	w.runningStatus = WorkStatusRunning
	wg.Add(1)
	go func() {
		defer wg.Done()
		stopTicker := time.NewTicker(time.Second * 5)
		defer stopTicker.Stop()
		for w.runningStatus == WorkStatusRunning {
			select {
			case <-stopTicker.C: // nothing to do; the loop condition checks runningStatus
			default: // build and serialize a random packet
				lyrs := randPktConfig()
				// The TCP layer needs its network layer to compute the TCP checksum.
				if err := lyrs.tcp.SetNetworkLayerForChecksum(lyrs.ipv4); err != nil {
					log.Println(err)
					continue
				}
				buffer := gopacket.NewSerializeBuffer()
				// FixLengths fills in the IPv4 total length and TCP data offset;
				// ComputeChecksums fills in the IP and TCP checksums.
				err := gopacket.SerializeLayers(buffer,
					gopacket.SerializeOptions{FixLengths: true, ComputeChecksums: true},
					lyrs.ether,
					lyrs.ipv4,
					lyrs.tcp,
					gopacket.Payload(util.RandomBytes(util.RandomInt(0, 1000))),
				)
				if err != nil {
					log.Println(err)
					continue
				}
				w.exportChan <- buffer.Bytes()
			}
		}
	}()
}

func (w *worker) Stop() {
	w.runningStatus = WorkStatusStop
}
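
// Note: runningStatus is written by Stop from a different goroutine than the
// one polling it in Run, so the Go race detector will flag it. A minimal
// alternative sketch (an assumption, not the original design) would use an
// atomic flag instead:
//
//	running int32 // struct field: 1 = running, 0 = stopped
//
//	func (w *worker) Stop()           { atomic.StoreInt32(&w.running, 0) }
//	func (w *worker) isRunning() bool { return atomic.LoadInt32(&w.running) == 1 }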

type sender struct {
	dsChan  chan []byte
	running WorkStatus
	handler *pcap.Handle
	sts     statics
}

// Init creates the packet channel and opens a pcap handle on the hard-coded
// network device.
func (s *sender) Init() {
	s.dsChan = make(chan []byte, 100*1000)
	s.running = WorkStatusStop
	// get handler
	var (
		snapshotLen int32         = 65535
		promiscuous bool          = false
		err         error
		timeout     time.Duration = 30 * time.Second
	)

	devices, err := pcap.FindAllDevs()
	if err != nil {
		log.Fatal(err)
	}

	for _, value := range devices {
		if value.Description == "Realtek Gaming GbE Family Controller" {
			// Open the device for packet injection.
			s.handler, err = pcap.OpenLive(value.Name, snapshotLen, promiscuous, timeout)
			if err != nil {
				log.Fatal(err)
			}
		}
		fmt.Println(value.Description, value.Name)
	}
	if s.handler == nil {
		log.Panic("sender.Init: no matching device found, pcap handle is nil")
	}
}
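
// openDeviceByName is an illustrative sketch, not part of the original file: it
// shows how the hard-coded device description above could be replaced by an
// explicitly named interface (e.g. taken from a flag or config). The function
// name and the idea of a configurable device are assumptions for the example.
func openDeviceByName(name string) (*pcap.Handle, error) {
	// Same capture parameters as Init: full snapshot length, no promiscuous
	// mode, 30-second read timeout.
	return pcap.OpenLive(name, 65535, false, 30*time.Second)
}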

// Run starts a goroutine that drains dsChan and writes each packet to the pcap
// handle, logging the queue length and packets-per-second once a second.
func (s *sender) Run() {
	s.running = WorkStatusRunning
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer s.handler.Close()

		stopTicker := time.NewTicker(time.Second * 5)
		logTicker := time.NewTicker(time.Second * 1)
		defer stopTicker.Stop()
		defer logTicker.Stop()
		for s.running == WorkStatusRunning {
			select {
			case <-stopTicker.C: // nothing to do; the loop condition checks running
			case <-logTicker.C:
				log.Printf("[sender] chan.len()=%d, pps=%d", len(s.dsChan), s.sts.CountLoad())
				s.sts.CountClear()
			case v, ok := <-s.dsChan:
				if !ok {
					continue
				}
				err := s.handler.WritePacketData(v)
				s.sts.CountAdd()
				if err != nil {
					log.Panic(err)
				}
			}
		}
		s.running = WorkStatusStop
	}()
}

func (s *sender) Stop() {
	s.running = WorkStatusStop
}

// statics tracks how many packets were sent in the current one-second window;
// the sender logs and resets it once a second.
type statics struct {
	countPerS uint64
}

func (s *statics) CountAdd() {
	atomic.AddUint64(&s.countPerS, 1)
}

func (s *statics) CountLoad() uint64 {
	return atomic.LoadUint64(&s.countPerS)
}

func (s *statics) CountClear() {
	atomic.StoreUint64(&s.countPerS, 0)
}
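
// CountSwap is an illustrative addition, not part of the original file: it
// reads and resets the counter in a single atomic step, closing the small
// window between CountLoad and CountClear in which packets could go uncounted.
func (s *statics) CountSwap() uint64 {
	return atomic.SwapUint64(&s.countPerS, 0)
}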