1
1
package main
2
2
3
3
import (
4
+ "bufio"
4
5
"errors"
5
- "strings"
6
6
"flag"
7
7
"fmt"
8
+ "github.com/paracrawl/giashard"
8
9
"log"
9
10
"os"
10
- "github.com/paracrawl/giashard "
11
+ "strings "
11
12
)
12
13
13
14
var outdir string
15
+ var dirslist string
14
16
var shards uint
15
17
var batchsize int64
16
18
var fileslist string
@@ -20,22 +22,55 @@ var schema = []string{"url", "mime", "plain_text"}
20
22
21
23
func init () {
22
24
flag .StringVar (& outdir , "o" , "." , "Output location" )
25
+ flag .StringVar (& dirslist , "l" , "" , "Input file listing all input directories" )
23
26
flag .StringVar (& fileslist , "f" , "plain_text,url,mime" , "Files to shard, separated by commas" )
24
27
flag .UintVar (& shards , "n" , 8 , "Number of shards (2^n)" )
25
28
flag .Int64Var (& batchsize , "b" , 100 , "Batch size in MB" )
26
29
flag .StringVar (& domainList , "d" , "" , "Additional public suffix entries" )
27
30
flag .Usage = func () {
28
- fmt .Fprintf (flag .CommandLine .Output (), "Usage: %s [flags] input directories\n " , os .Args [0 ])
31
+ _ , err := fmt .Fprintf (flag .CommandLine .Output (), "Usage: %s [flags] input directories\n " , os .Args [0 ])
32
+ if err != nil {
33
+ return
34
+ }
29
35
flag .PrintDefaults ()
30
- fmt .Fprintf (flag .CommandLine .Output (),
31
- `Shards together the directories give on input. They are assumed to be in the
36
+ _ , err = fmt .Fprintf (flag .CommandLine .Output (),
37
+ `Shards together the directories given on input. They are assumed to be in the
32
38
standard Paracrawl column storage format. The output is a tree of directories
33
39
of the form: outdir/shard/batch where shard is computed as a hash of the
34
40
significant part of the hostname in a url and batch is approximately fixed size.
35
41
` )
42
+ if err != nil {
43
+ return
44
+ }
36
45
}
37
46
}
38
47
48
+ func processfile (source string , schema []string , w * giashard.Shard , hostname string ) {
49
+ log .Printf ("Processing input: %v" , source )
50
+ r , err := giashard .NewColumnReader (source , schema ... )
51
+ if err != nil {
52
+ log .Printf ("Error opening input reader: %v" , err )
53
+ return
54
+ }
55
+
56
+ // provenance data - where is this from
57
+ provdata := []byte (fmt .Sprintf ("%s:%s" , hostname , source ))
58
+ for row := range r .Rows () {
59
+ row ["source" ] = provdata
60
+ if err := w .WriteRow (row ); err != nil {
61
+ if errors .Is (err , giashard .ShardError ) { // not fatal
62
+ log .Print (err )
63
+ continue
64
+ }
65
+ log .Fatalf ("Error writing row: %v" , err )
66
+ }
67
+ }
68
+
69
+ err = r .Close ()
70
+ if err != nil {
71
+ return
72
+ }
73
+ }
39
74
func main () {
40
75
log .SetFlags (log .Ldate | log .Ltime | log .Lshortfile )
41
76
flag .Parse ()
@@ -50,40 +85,47 @@ func main() {
50
85
}
51
86
}
52
87
53
- w , err := giashard .NewShard (outdir , shards , batchsize * 1024 * 1024 , "url" , append (schema , "source" )... )
88
+ w , err := giashard .NewShard (outdir , shards , batchsize * 1024 * 1024 , "url" , append (schema , "source" )... )
54
89
if err != nil {
55
90
log .Fatalf ("Error opening output shards: %v" , err )
56
91
}
57
- defer w .Close ()
92
+ defer func (w * giashard.Shard ) {
93
+ var err = w .Close ()
94
+ if err != nil {
95
+
96
+ }
97
+ }(w )
58
98
59
99
hostname , err := os .Hostname ()
60
100
if err != nil {
61
101
log .Fatalf ("Error getting local hostname: %v" , err )
62
102
}
63
103
64
- for i := 0 ; i < flag .NArg (); i ++ {
104
+ for i := 0 ; i < flag .NArg (); i ++ {
65
105
source := flag .Arg (i )
106
+ processfile (source , schema , w , hostname )
107
+ }
66
108
67
- log . Printf ( "Processing input: %v" , source )
68
- r , err := giashard . NewColumnReader ( source , schema ... )
109
+ if dirslist != "" {
110
+ file , err := os . Open ( dirslist )
69
111
if err != nil {
70
- log .Printf ("Error opening input reader: %v" , err )
71
- continue
112
+ log .Fatal (err )
72
113
}
114
+ defer func (file * os.File ) {
115
+ var err = file .Close ()
116
+ if err != nil {
73
117
74
- // provenance data - where is this from
75
- provdata := []byte (fmt .Sprintf ("%s:%s" , hostname , source ))
76
- for row := range r .Rows () {
77
- row ["source" ] = provdata
78
- if err := w .WriteRow (row ); err != nil {
79
- if errors .Is (err , giashard .ShardError ) { // not fatal
80
- log .Print (err )
81
- continue
82
- }
83
- log .Fatalf ("Error writing row: %v" , err )
84
118
}
119
+ }(file )
120
+
121
+ scanner := bufio .NewScanner (file )
122
+ for scanner .Scan () {
123
+ source := scanner .Text ()
124
+ processfile (source , schema , w , hostname )
85
125
}
86
126
87
- r .Close ()
127
+ if err := scanner .Err (); err != nil {
128
+ log .Fatal (err )
129
+ }
88
130
}
89
131
}
0 commit comments