Skip to content

Commit 971fd8f

Browse files
committed
print timestamp as millis; fixup from prior commit
- for formatting the record timestamp, we should format from its millis, because record timestamps are in milliseconds
1 parent dcc8df0 commit 971fd8f

File tree

4 files changed

+12
-11
lines changed

4 files changed

+12
-11
lines changed

commands/consume/transaction_state.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ func (co *consumeOutput) formatTransactionState(out []byte, r *kgo.Record) []byt
3535

3636
var keep bool
3737
switch v := r.Key[1]; v {
38-
case 0, 1:
39-
out, keep = co.formatTransactionStateV01(out, r, v)
38+
case 0:
39+
out, keep = co.formatTransactionStateV0(out, r)
4040
default:
4141
out = append(out, "(unknown transaction state key format version "...)
4242
out = append(out, r.Key[1])
@@ -50,7 +50,7 @@ func (co *consumeOutput) formatTransactionState(out []byte, r *kgo.Record) []byt
5050
return out
5151
}
5252

53-
func (co *consumeOutput) formatTransactionStateV01(dst []byte, r *kgo.Record, version byte) ([]byte, bool) {
53+
func (co *consumeOutput) formatTransactionStateV0(dst []byte, r *kgo.Record) ([]byte, bool) {
5454
{
5555
var k kmsg.TxnMetadataKey
5656
if err := k.ReadFrom(r.Key); err != nil {

commands/produce/produce.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ Unfortunately, with exact sizing, the format string is unavoidably noisy.
151151
out.Die("cannot produce to a specific topic; the parse format specifies that it parses a topic")
152152
}
153153

154-
var verboseFn func([]byte, *kgo.Record) []byte
154+
var verboseFn func([]byte, *kgo.Record, *kgo.FetchPartition) []byte
155155
var verboseBuf []byte
156156
if verboseFormat != "" {
157157
verboseFn, err = format.ParseWriteFormat(verboseFormat, escape)
@@ -196,6 +196,7 @@ Unfortunately, with exact sizing, the format string is unavoidably noisy.
196196
cl.AddOpt(kgo.ProduceRetries(retries))
197197
}
198198

199+
p := &kgo.FetchPartition{}
199200
for {
200201
r, err := reader.Next()
201202
if err != nil {
@@ -207,14 +208,13 @@ Unfortunately, with exact sizing, the format string is unavoidably noisy.
207208
if !reader.ParsesTopic() {
208209
r.Topic = args[0]
209210
}
210-
err = cl.Client().Produce(context.Background(), r, func(r *kgo.Record, err error) {
211+
cl.Client().Produce(context.Background(), r, func(r *kgo.Record, err error) {
211212
out.MaybeDie(err, "unable to produce record: %v", err)
212213
if verboseFn != nil {
213-
verboseBuf = verboseFn(verboseBuf[:0], r)
214+
verboseBuf = verboseFn(verboseBuf[:0], r, p)
214215
os.Stdout.Write(verboseBuf)
215216
}
216217
})
217-
out.MaybeDie(err, "unable to produce record: %v", err)
218218
}
219219

220220
cl.Client().Flush(context.Background())

commands/transact/transact.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func transact(
199199
quitCtx context.Context,
200200
cl *kgo.Client,
201201
txnSess *kgo.GroupTransactSession,
202-
w func([]byte, *kgo.Record) []byte,
202+
w func([]byte, *kgo.Record, *kgo.FetchPartition) []byte,
203203
r *format.Reader,
204204
destTopic string,
205205
verbose bool,
@@ -228,7 +228,7 @@ func transact(
228228
for _, partition := range topic.Partitions {
229229
out.MaybeDie(partition.Err, "fetch partition error: %v", partition.Err)
230230
for _, record := range partition.Records {
231-
buf = w(buf, record)
231+
buf = w(buf, record, &partition)
232232
}
233233
}
234234
}

format/parse_out.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -208,13 +208,14 @@ func ParseWriteFormat(format string, escape rune) (func([]byte, *kgo.Record, *kg
208208
return nil, fmt.Errorf("unknown %sd{ time specification", escstr)
209209
}
210210
format = format[n:]
211+
211212
argFns = append(argFns, func(out []byte, r *kgo.Record, _ *kgo.FetchPartition) []byte {
212-
return numfn(out, int64(r.Timestamp.UnixNano()))
213+
return numfn(out, int64(r.Timestamp.UnixNano())/1e6)
213214
})
214215
}
215216
} else {
216217
argFns = append(argFns, func(out []byte, r *kgo.Record, _ *kgo.FetchPartition) []byte {
217-
return strconv.AppendInt(out, r.Timestamp.UnixNano(), 10)
218+
return strconv.AppendInt(out, r.Timestamp.UnixNano()/1e6, 10)
218219
})
219220
}
220221

0 commit comments

Comments
 (0)