Skip to content

Commit 40d4fde

Browse files
committed
Add record ordering
1 parent d23b5c1 commit 40d4fde

File tree

8 files changed

+294
-4
lines changed

8 files changed

+294
-4
lines changed

example/reader/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func main() {
4545
fmt.Printf("nffile:\n%v", nffile)
4646

4747
// Dump flow records
48-
recordChannel, _ := nffile.AllRecords()
48+
recordChannel, _ := nffile.AllRecords().Get()
4949
cnt := 0
5050
for record := range recordChannel {
5151
cnt++

example/sorter/main.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright © 2024 Peter Haag [email protected]
2+
// All rights reserved.
3+
//
4+
// Use of this source code is governed by the license that can be
5+
// found in the LICENSE file.
6+
7+
package main
8+
9+
import (
10+
"flag"
11+
"fmt"
12+
"os"
13+
14+
nfdump "github.com/phaag/go-nfdump"
15+
)
16+
17+
var (
18+
fileName = flag.String("r", "", "nfdump file to read")
19+
)
20+
21+
func main() {
22+
23+
flag.CommandLine.Usage = func() {
24+
fmt.Fprintf(os.Stderr, "Usage of %s [flags]\n", os.Args[0])
25+
flag.PrintDefaults()
26+
}
27+
28+
flag.Parse()
29+
30+
if len(*fileName) == 0 {
31+
fmt.Printf("Filename required\n")
32+
flag.PrintDefaults()
33+
os.Exit(255)
34+
}
35+
36+
// get empty new nffile object
37+
nffile := nfdump.New()
38+
39+
// open the flow file
40+
if err := nffile.Open(*fileName); err != nil {
41+
fmt.Printf("Failed to open nf file: %v\n", err)
42+
os.Exit(255)
43+
}
44+
45+
// Read all flow records and append the OrderBy() processing
46+
// finally get the flows and print them
47+
if recordChannel, err := nffile.AllRecords().OrderBy("bytes", nfdump.DESCENDING).Get(); err != nil {
48+
fmt.Printf("Failed to process flows: %v\n", err)
49+
} else {
50+
for record := range recordChannel {
51+
record.PrintLine()
52+
}
53+
}
54+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ require (
66
github.com/klauspost/compress v1.17.8
77
github.com/pierrec/lz4/v4 v4.1.17
88
github.com/rasky/go-lzo v0.0.0-20200203143853-96a758eda86e
9+
github.com/twotwotwo/sorts v0.0.0-20160814051341-bf5c1f2b8553
910
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,5 @@ github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc
44
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
55
github.com/rasky/go-lzo v0.0.0-20200203143853-96a758eda86e h1:dCWirM5F3wMY+cmRda/B1BiPsFtmzXqV9b0hLWtVBMs=
66
github.com/rasky/go-lzo v0.0.0-20200203143853-96a758eda86e/go.mod h1:9leZcVcItj6m9/CfHY5Em/iBrCz7js8LcRQGTKEEv2M=
7+
github.com/twotwotwo/sorts v0.0.0-20160814051341-bf5c1f2b8553 h1:DRC1ubdb3ZmyyIeCSTxjZIQAnpLPfKVgYrLETQuOPjo=
8+
github.com/twotwotwo/sorts v0.0.0-20160814051341-bf5c1f2b8553/go.mod h1:Rj7Csq/tZ/egz+Ltc2IVpsA5309AmSMEswjkTZmq2Xc=

nffile.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,8 @@ func (nfFile *NfFile) ReadDataBlocks() (chan DataBlock, error) {
265265
// AllRecord takes an NfFile object and returns a channel of FlowRecordV3
266266
// it reads and uncompresses the data blocks with ReadDataBlocks
267267
// Iterating over the channel reads all flow records
268-
func (nfFile *NfFile) AllRecords() (chan *FlowRecordV3, error) {
268+
// returns a record chain type
269+
func (nfFile *NfFile) AllRecords() *RecordChain {
269270
recordChannel := make(chan *FlowRecordV3, 32)
270271
go func() {
271272
blockChannel, _ := nfFile.ReadDataBlocks()
@@ -306,5 +307,9 @@ func (nfFile *NfFile) AllRecords() (chan *FlowRecordV3, error) {
306307
}
307308
close(recordChannel)
308309
}()
309-
return recordChannel, nil
310+
311+
chain := new(RecordChain)
312+
chain.recordChan = recordChannel
313+
chain.err = nil
314+
return chain
310315
}

orderby.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright © 2024 Peter Haag [email protected]
2+
// All rights reserved.
3+
//
4+
// Use of this source code is governed by the license that can be
5+
// found in the LICENSE file.
6+
7+
package nfdump
8+
9+
import (
10+
"fmt"
11+
12+
"github.com/twotwotwo/sorts"
13+
)
14+
15+
type sortRecord struct {
16+
index uint32
17+
value uint64
18+
}
19+
20+
type sortType []sortRecord
21+
22+
var sortArray []sortRecord
23+
var recordArray []*FlowRecordV3
24+
25+
// sort direction ASCENDING
26+
const ASCENDING = 1
27+
28+
// sort direction DESCENDING
29+
const DESCENDING = 2
30+
31+
// implement sorts interface
32+
// return len of sorting array
33+
func (a sortType) Len() int { return len(a) }
34+
35+
// swap 2 elements
36+
func (a sortType) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
37+
38+
// return key of element i
39+
func (a sortType) Key(i int) uint64 {
40+
return a[i].value
41+
}
42+
43+
// compare tw values for less
44+
func (a sortType) Less(i, j int) bool {
45+
return a[i].value < a[j].value
46+
}
47+
48+
// value functions
49+
// return appropriate values
50+
type valueFuncType func(record *FlowRecordV3) uint64
51+
52+
// tstart in msec
53+
func getTstart(record *FlowRecordV3) uint64 {
54+
var value uint64
55+
if genericFlow := record.GenericFlow(); genericFlow != nil {
56+
value = genericFlow.MsecFirst
57+
}
58+
return value
59+
}
60+
61+
// tend in msec
62+
func getTend(record *FlowRecordV3) uint64 {
63+
var value uint64
64+
if genericFlow := record.GenericFlow(); genericFlow != nil {
65+
value = genericFlow.MsecFirst
66+
}
67+
return value
68+
}
69+
70+
// packets
71+
func getPackets(record *FlowRecordV3) uint64 {
72+
var value uint64
73+
if genericFlow := record.GenericFlow(); genericFlow != nil {
74+
value = genericFlow.InPackets
75+
}
76+
return value
77+
}
78+
79+
// bytes
80+
func getBytes(record *FlowRecordV3) uint64 {
81+
var value uint64
82+
if genericFlow := record.GenericFlow(); genericFlow != nil {
83+
value = genericFlow.InBytes
84+
}
85+
return value
86+
}
87+
88+
// order option - name and function
89+
type orderOption struct {
90+
name string
91+
orderFunc valueFuncType
92+
}
93+
94+
// list all possible orderBy options
95+
var orderTable = []orderOption{
96+
orderOption{"tstart", getTstart},
97+
orderOption{"tend", getTend},
98+
orderOption{"packets", getPackets},
99+
orderOption{"bytes", getBytes},
100+
}
101+
102+
// function used recordChain as input, sort the records by orderBy
103+
// accepts and orderBy, defined in the order table as name
104+
// direction is einer ASCENDING or DESCENDING
105+
// returns chain element with channel of sorted records
106+
func (recordChain *RecordChain) OrderBy(orderBy string, direction int) *RecordChain {
107+
// propagate error
108+
if recordChain.err != nil {
109+
return &RecordChain{recordChan: nil, err: recordChain.err}
110+
}
111+
112+
var valueFunc valueFuncType
113+
for i := 0; i < len(orderTable); i++ {
114+
if orderBy == orderTable[i].name {
115+
valueFunc = orderTable[i].orderFunc
116+
}
117+
}
118+
119+
if valueFunc == nil {
120+
return &RecordChain{recordChan: nil, err: fmt.Errorf("Unknown orderBy: %s", orderBy)}
121+
}
122+
123+
writeChan := make(chan *FlowRecordV3, 64)
124+
125+
// store all flow records into an array for later printing
126+
recordArray = make([]*FlowRecordV3, 1024*1024)
127+
128+
// store value to be sorted and index of appropriate flow record of
129+
// recordArray. Keeps sortArray smaller - cache friendly
130+
sortArray = make([]sortRecord, 1024*1024)
131+
132+
go func(readChan chan *FlowRecordV3) {
133+
134+
var arrayLen = len(sortArray)
135+
// use direct access [cnt] to slice to speed up instead of append()
136+
// increase array if needed
137+
var cnt uint32 = 0
138+
for record := range readChan {
139+
if uint32(arrayLen)-cnt == 0 {
140+
// double array, if exhausted
141+
sortArray = append(make([]sortRecord, 2*arrayLen), sortArray...)
142+
recordArray = append(make([]*FlowRecordV3, 2*arrayLen), recordArray...)
143+
144+
// use new len of array. Go may assign more memory than requested
145+
// so use actual len
146+
arrayLen = len(sortArray)
147+
}
148+
recordArray[cnt] = record
149+
value := valueFunc(record)
150+
sortArray[cnt] = sortRecord{cnt, value}
151+
cnt++
152+
}
153+
154+
// sort array
155+
// the interface makes use of len() - therefore cut slice pointer to real size
156+
sorts.ByUint64(sortType(sortArray[0:cnt]))
157+
158+
if direction == ASCENDING {
159+
for i := 0; i < int(cnt); i++ {
160+
index := sortArray[i].index
161+
record := recordArray[index]
162+
writeChan <- record
163+
}
164+
} else {
165+
for i := int(cnt) - 1; i >= 0; i-- {
166+
index := sortArray[i].index
167+
record := recordArray[index]
168+
writeChan <- record
169+
}
170+
}
171+
close(writeChan)
172+
173+
}(recordChain.recordChan)
174+
175+
return &RecordChain{recordChan: writeChan, err: nil}
176+
} // End of OrderBy

printLine.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright © 2024 Peter Haag [email protected]
2+
// All rights reserved.
3+
//
4+
// Use of this source code is governed by the license that can be
5+
// found in the LICENSE file.
6+
7+
package nfdump
8+
9+
import (
10+
"fmt"
11+
"time"
12+
)
13+
14+
func fmtDuration(d uint64) string {
15+
msec := d % 1000
16+
d = (d - msec) / 1000
17+
sec := d % 60
18+
d = (d - sec) / 60
19+
min := d % 60
20+
d = (d - min) / 60
21+
hour := d % 24
22+
days := (d - hour) / 24
23+
if days == 0 {
24+
return fmt.Sprintf(" %02d:%02d:%02d.%03d", hour, min, sec, msec)
25+
} else {
26+
return fmt.Sprintf("%02dd %02d:%02d:%02d.%03d", days, hour, min, sec, msec)
27+
}
28+
}
29+
30+
// Return generic extension
31+
func (flowRecord *FlowRecordV3) PrintLine() {
32+
if genericFlow := flowRecord.GenericFlow(); genericFlow != nil {
33+
tTime := time.UnixMilli(int64(genericFlow.MsecFirst))
34+
duration := genericFlow.MsecLast - genericFlow.MsecFirst
35+
ipAddr := flowRecord.IP()
36+
if ipAddr != nil {
37+
fmt.Printf("%s %s %3d %15v:%-5v -> %15v:%-5v %5d %7d\n", tTime.Format("2006-01-02 15:04:05.000"), fmtDuration(duration), genericFlow.Proto,
38+
ipAddr.SrcIP, genericFlow.SrcPort, ipAddr.DstIP, genericFlow.DstPort, genericFlow.InPackets, genericFlow.InBytes)
39+
}
40+
}
41+
}

record.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,18 @@ type FlowRecordV3 struct {
4040
extOffset [MAXEXTENSIONS]elementParam
4141
}
4242

43-
// Extract next flow record from []byte stream
43+
// type to return/accept in the flow processing record chain
44+
type RecordChain struct {
45+
recordChan chan *FlowRecordV3
46+
err error
47+
}
48+
49+
// at the end of flow processing return a channel of all records
50+
func (recordChain *RecordChain) Get() (chan *FlowRecordV3, error) {
51+
return recordChain.recordChan, recordChain.err
52+
}
53+
54+
// Extract the next flow record from []byte stream
4455
func NewRecord(record []byte) (*FlowRecordV3, error) {
4556

4657
offset := 0

0 commit comments

Comments
 (0)