Skip to content

Commit

Permalink
feat(forward): support dry run
Browse files Browse the repository at this point in the history
  • Loading branch information
iyear committed Nov 16, 2023
1 parent 4128065 commit 0944b07
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 13 deletions.
2 changes: 2 additions & 0 deletions app/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Options struct {
To string
Mode forwarder.Mode
Silent bool
DryRun bool
}

func Run(ctx context.Context, opts Options) error {
Expand Down Expand Up @@ -78,6 +79,7 @@ func Run(ctx context.Context, opts Options) error {
Pool: pool,
Iter: newIter(manager, pool, to, dialogs),
Silent: opts.Silent,
DryRun: opts.DryRun,
Mode: opts.Mode,
Progress: newProgress(fwProgress),
})
Expand Down
1 change: 1 addition & 0 deletions cmd/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func NewForward() *cobra.Command {
cmd.Flags().StringVar(&opts.To, "to", "", "destination peer, can be a CHAT or router based on expression engine")
cmd.Flags().Var(&opts.Mode, "mode", fmt.Sprintf("forward mode: [%s]", strings.Join(forwarder.ModeNames(), ", ")))
cmd.Flags().BoolVar(&opts.Silent, "silent", false, "send messages silently")
cmd.Flags().BoolVar(&opts.DryRun, "dry-run", false, "do not actually send messages, just show how they would be sent")

return cmd
}
21 changes: 16 additions & 5 deletions pkg/forwarder/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,29 @@ import (
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"

"github.com/iyear/tdl/pkg/dcpool"
"github.com/iyear/tdl/pkg/tmedia"
)

type CloneOptions struct {
Pool dcpool.Pool
Media *tmedia.Media
PartSize int
Progress uploader.Progress
}

func CloneMedia(ctx context.Context, opts CloneOptions) (tg.InputFileClass, error) {
func (f *Forwarder) CloneMedia(ctx context.Context, opts CloneOptions) (tg.InputFileClass, error) {
// if dry run, just return empty input file
if f.opts.DryRun {
// directly call progress callback
if err := opts.Progress.Chunk(ctx, uploader.ProgressState{
Uploaded: opts.Media.Size,
Total: opts.Media.Size,
}); err != nil {
return nil, errors.Wrap(err, "dry run chunk")
}

return &tg.InputFile{}, nil
}

r, w := io.Pipe()

wg, errctx := errgroup.WithContext(ctx)
Expand All @@ -32,7 +43,7 @@ func CloneMedia(ctx context.Context, opts CloneOptions) (tg.InputFileClass, erro

_, err := downloader.NewDownloader().
WithPartSize(opts.PartSize).
Download(opts.Pool.Client(ctx, opts.Media.DC), opts.Media.InputFileLoc).
Download(f.opts.Pool.Client(ctx, opts.Media.DC), opts.Media.InputFileLoc).
Stream(errctx, w)
if err != nil {
return errors.Wrap(err, "download")
Expand All @@ -46,7 +57,7 @@ func CloneMedia(ctx context.Context, opts CloneOptions) (tg.InputFileClass, erro

var err error
upload := uploader.NewUpload(opts.Media.Name, r, opts.Media.Size)
file, err = uploader.NewUploader(opts.Pool.Default(ctx)).
file, err = uploader.NewUploader(f.opts.Pool.Default(ctx)).
WithPartSize(opts.PartSize).
WithProgress(opts.Progress).
Upload(errctx, upload)
Expand Down
30 changes: 22 additions & 8 deletions pkg/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/go-faster/errors"
"github.com/gotd/td/bin"
"github.com/gotd/td/telegram/message"
"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/tg"
Expand Down Expand Up @@ -42,6 +43,7 @@ type Options struct {
Pool dcpool.Pool
Iter Iter
Silent bool
DryRun bool
Mode Mode
Progress Progress
}
Expand Down Expand Up @@ -134,7 +136,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
}
req.SetFlags()

if _, err := f.opts.Pool.Default(ctx).MessagesSendMessage(ctx, req); err != nil {
if _, err := f.forwardClient(ctx).MessagesSendMessage(ctx, req); err != nil {
return errors.Wrap(err, "send message")
}
return nil
Expand Down Expand Up @@ -170,8 +172,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
return nil, errors.Errorf("unsupported media %T", msg.Media)
}

mediaFile, err := CloneMedia(ctx, CloneOptions{
Pool: f.opts.Pool,
mediaFile, err := f.CloneMedia(ctx, CloneOptions{
Media: media,
PartSize: viper.GetInt(consts.FlagPartSize),
Progress: uploadProgress{
Expand Down Expand Up @@ -204,8 +205,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
return nil, errors.Errorf("empty document thumb %d", msg.ID)
}

thumbFile, err := CloneMedia(ctx, CloneOptions{
Pool: f.opts.Pool,
thumbFile, err := f.CloneMedia(ctx, CloneOptions{
Media: thumb,
PartSize: viper.GetInt(consts.FlagPartSize),
Progress: nopProgress{},
Expand Down Expand Up @@ -237,7 +237,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
case ModeDirect:
// it can be forwarded via API
if !protectedDialog(from) && !protectedMessage(msg) {
builder := message.NewSender(f.opts.Pool.Default(ctx)).
builder := message.NewSender(f.forwardClient(ctx)).
To(to.InputPeer()).CloneBuilder()
if f.opts.Silent {
builder = builder.Silent()
Expand Down Expand Up @@ -297,7 +297,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
SendAs: nil,
}
req.SetFlags()
if _, err := f.opts.Pool.Default(ctx).MessagesSendMultiMedia(ctx, req); err != nil {
if _, err := f.forwardClient(ctx).MessagesSendMultiMedia(ctx, req); err != nil {
return errors.Wrap(err, "send multi media")
}
return nil
Expand Down Expand Up @@ -330,7 +330,7 @@ func (f *Forwarder) forwardMessage(ctx context.Context, from, to peers.Peer, msg
}
req.SetFlags()

if _, err := f.opts.Pool.Default(ctx).MessagesSendMedia(ctx, req); err != nil {
if _, err := f.forwardClient(ctx).MessagesSendMedia(ctx, req); err != nil {
return errors.Wrap(err, "send single media")
}
return nil
Expand All @@ -343,6 +343,20 @@ func (f *Forwarder) sentTuple(peer peers.Peer, msg *tg.Message) [2]int64 {
return [2]int64{peer.ID(), int64(msg.ID)}
}

type nopInvoker struct{}

func (n nopInvoker) Invoke(_ context.Context, _ bin.Encoder, _ bin.Decoder) error {
return nil
}

func (f *Forwarder) forwardClient(ctx context.Context) *tg.Client {
if f.opts.DryRun {
return tg.NewClient(nopInvoker{})
}

return f.opts.Pool.Default(ctx)
}

func protectedDialog(peer peers.Peer) bool {
switch p := peer.(type) {
case peers.Chat:
Expand Down

0 comments on commit 0944b07

Please sign in to comment.