Skip to content

Commit

Permalink
refactor(forwarder): extract interface args to meta struct
Browse files Browse the repository at this point in the history
  • Loading branch information
iyear committed Nov 13, 2023
1 parent adda6d6 commit 1b91b9e
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 30 deletions.
2 changes: 1 addition & 1 deletion app/forward/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (i *iter) Next(ctx context.Context) bool {

msg, err := utils.Telegram.GetSingleMessage(ctx, i.pool.Default(ctx), peer.InputPeer(), m)
if err != nil {
i.err = errors.Wrapf(err, "get message %d", msg.ID)
i.err = errors.Wrapf(err, "get message: %d", m)
return false
}

Expand Down
34 changes: 15 additions & 19 deletions app/forward/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ import (
"strings"

"github.com/fatih/color"
"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/tg"
pw "github.com/jedib0t/go-pretty/v6/progress"
"go.uber.org/zap"

"github.com/iyear/tdl/pkg/forwarder"
"github.com/iyear/tdl/pkg/prog"
Expand All @@ -17,43 +14,42 @@ import (

type progress struct {
pw pw.Writer
trackers map[[2]int64]*pw.Tracker
log *zap.Logger
trackers map[[3]int64]*pw.Tracker
}

func newProgress(p pw.Writer) *progress {
return &progress{
pw: p,
trackers: make(map[[2]int64]*pw.Tracker),
trackers: make(map[[3]int64]*pw.Tracker),
}
}

func (p *progress) OnAdd(peer peers.Peer, msg *tg.Message) {
tracker := prog.AppendTracker(p.pw, pw.FormatNumber, p.processMessage(peer, msg, false), 1)
p.trackers[p.tuple(peer, msg)] = tracker
func (p *progress) OnAdd(meta *forwarder.ProgressMeta) {
tracker := prog.AppendTracker(p.pw, pw.FormatNumber, p.processMessage(meta, false), 1)
p.trackers[p.tuple(meta)] = tracker
}

func (p *progress) OnClone(peer peers.Peer, msg *tg.Message, state forwarder.ProgressState) {
tracker, ok := p.trackers[p.tuple(peer, msg)]
func (p *progress) OnClone(meta *forwarder.ProgressMeta, state forwarder.ProgressState) {
tracker, ok := p.trackers[p.tuple(meta)]
if !ok {
return
}

// display re-upload transfer info
tracker.Units.Formatter = utils.Byte.FormatBinaryBytes
tracker.UpdateMessage(p.processMessage(peer, msg, true))
tracker.UpdateMessage(p.processMessage(meta, true))
tracker.UpdateTotal(state.Total)
tracker.SetValue(state.Done)
}

func (p *progress) OnDone(peer peers.Peer, msg *tg.Message, err error) {
tracker, ok := p.trackers[p.tuple(peer, msg)]
func (p *progress) OnDone(meta *forwarder.ProgressMeta, err error) {
tracker, ok := p.trackers[p.tuple(meta)]
if !ok {
return
}

if err != nil {
p.pw.Log(color.RedString("%d-%d error: %s", peer.ID(), msg.ID, err.Error()))
p.pw.Log(color.RedString("%d-%d error: %s", meta.From.ID(), meta.Msg.ID, err.Error()))
tracker.MarkAsErrored()
return
}
Expand All @@ -62,15 +58,15 @@ func (p *progress) OnDone(peer peers.Peer, msg *tg.Message, err error) {
tracker.MarkAsDone()
}

func (p *progress) tuple(peer peers.Peer, msg *tg.Message) [2]int64 {
return [2]int64{peer.ID(), int64(msg.ID)}
func (p *progress) tuple(meta *forwarder.ProgressMeta) [3]int64 {
return [3]int64{meta.From.ID(), int64(meta.Msg.ID), meta.To.ID()}
}

func (p *progress) processMessage(peer peers.Peer, msg *tg.Message, clone bool) string {
func (p *progress) processMessage(meta *forwarder.ProgressMeta, clone bool) string {
b := &strings.Builder{}

// TODO(iyear): display visible name which should be cut to 20 chars
b.WriteString(fmt.Sprintf("%d-%d", peer.ID(), msg.ID))
b.WriteString(fmt.Sprintf("%d-%d-%d", meta.From.ID(), meta.Msg.ID, meta.To.ID()))
if clone {
b.WriteString(" [clone]")
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,21 @@ func (f *Forwarder) Forward(ctx context.Context) error {
}

func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg *tg.Message, grouped ...*tg.Message) (rerr error) {
f.opts.Progress.OnAdd(from, msg)
meta := &ProgressMeta{
From: from,
Msg: msg,
To: to,
}

f.opts.Progress.OnAdd(meta)
defer func() {
f.sent[f.sentTuple(from, msg)] = struct{}{}

// grouped message also should be marked as sent
for _, m := range grouped {
f.sent[f.sentTuple(from, m)] = struct{}{}
}
f.opts.Progress.OnDone(from, msg, rerr)
f.opts.Progress.OnDone(meta, rerr)
}()

log := logger.From(ctx).With(
Expand Down Expand Up @@ -167,8 +173,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
Media: media,
PartSize: viper.GetInt(consts.FlagPartSize),
Progress: uploadProgress{
peer: from,
msg: msg,
meta: meta,
progress: f.opts.Progress,
},
})
Expand Down
17 changes: 11 additions & 6 deletions pkg/forwarder/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ import (
)

type Progress interface {
OnAdd(peer peers.Peer, msg *tg.Message)
OnClone(peer peers.Peer, msg *tg.Message, state ProgressState)
OnDone(peer peers.Peer, msg *tg.Message, err error)
OnAdd(meta *ProgressMeta)
OnClone(meta *ProgressMeta, state ProgressState)
OnDone(meta *ProgressMeta, err error)
}

type ProgressMeta struct {
From peers.Peer
Msg *tg.Message
To peers.Peer
}

type ProgressState struct {
Expand All @@ -20,13 +26,12 @@ type ProgressState struct {
}

type uploadProgress struct {
peer peers.Peer
msg *tg.Message
meta *ProgressMeta
progress Progress
}

func (p uploadProgress) Chunk(_ context.Context, state uploader.ProgressState) error {
p.progress.OnClone(p.peer, p.msg, ProgressState{
p.progress.OnClone(p.meta, ProgressState{
Done: state.Uploaded,
Total: state.Total,
})
Expand Down

0 comments on commit 1b91b9e

Please sign in to comment.