From 9f9f7a818d3f3872223e91f23706c23d42d06d25 Mon Sep 17 00:00:00 2001 From: iyear Date: Tue, 14 Nov 2023 21:48:34 +0800 Subject: [PATCH] feat(forward): support message forward router --- app/forward/forward.go | 49 ++++++++++++++++++++--- app/forward/iter.go | 69 ++++++++++++++++++++++++++++---- pkg/texpr/{message.go => env.go} | 7 +++- 3 files changed, 109 insertions(+), 16 deletions(-) rename pkg/texpr/{message.go => env.go} (93%) diff --git a/app/forward/forward.go b/app/forward/forward.go index af48babc0c..f4b4cd275c 100644 --- a/app/forward/forward.go +++ b/app/forward/forward.go @@ -2,9 +2,13 @@ package forward import ( "context" + "fmt" + "os" + "reflect" "strings" - "github.com/fatih/color" + "github.com/antonmedv/expr" + "github.com/antonmedv/expr/vm" "github.com/go-faster/errors" "github.com/gotd/contrib/middleware/floodwait" "github.com/gotd/td/telegram/peers" @@ -19,6 +23,7 @@ import ( "github.com/iyear/tdl/pkg/forwarder" "github.com/iyear/tdl/pkg/prog" "github.com/iyear/tdl/pkg/storage" + "github.com/iyear/tdl/pkg/texpr" "github.com/iyear/tdl/pkg/tmessage" "github.com/iyear/tdl/pkg/utils" ) @@ -31,6 +36,18 @@ type Options struct { } func Run(ctx context.Context, opts Options) error { + if opts.To == "-" { + fg := texpr.NewFieldsGetter(nil) + + fields, err := fg.Walk(exprEnv(nil, nil)) + if err != nil { + return fmt.Errorf("failed to walk fields: %w", err) + } + + fmt.Print(fg.Sprint(fields, true)) + return nil + } + c, kvd, err := tgc.NoLogin(ctx) if err != nil { return err @@ -50,18 +67,16 @@ func Run(ctx context.Context, opts Options) error { manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(pool.Default(ctx)) - peerTo, err := utils.Telegram.GetInputPeer(ctx, manager, opts.To) + to, err := resolveDestPeer(ctx, manager, opts.To) if err != nil { return errors.Wrap(err, "resolve dest peer") } - color.Green("All messages will be forwarded to %s(%d)", peerTo.VisibleName(), peerTo.ID()) - fwProgress := prog.New(pw.FormatNumber) fw := forwarder.New(forwarder.Options{ Pool: pool, - Iter: newIter(manager, pool, peerTo, dialogs), + Iter: newIter(manager, pool, to, dialogs), Silent: opts.Silent, Mode: opts.Mode, Progress: newProgress(fwProgress), @@ -90,7 +105,7 @@ func collectDialogs(ctx context.Context, input []string) ([]*tmessage.Dialog, er return nil, errors.Wrap(err, "parse from url") } default: - d, err = tmessage.Parse(tmessage.FromFile(ctx, tctx.Pool(ctx), tctx.KV(ctx), []string{p})) + d, err = tmessage.Parse(tmessage.FromFile(ctx, tctx.Pool(ctx), tctx.KV(ctx), []string{p}, false)) if err != nil { return nil, errors.Wrap(err, "parse from file") } @@ -101,3 +116,25 @@ func collectDialogs(ctx context.Context, input []string) ([]*tmessage.Dialog, er return dialogs, nil } + +// resolveDestPeer parses the input string and returns a vm.Program. It can be a CHAT, a text or a file based on expression engine. +func resolveDestPeer(ctx context.Context, manager *peers.Manager, input string) (*vm.Program, error) { + compile := func(i string) (*vm.Program, error) { + // we pass empty peer and message to enable type checking + return expr.Compile(i, expr.AsKind(reflect.String), expr.Env(exprEnv(nil, nil))) + } + + // file + if exp, err := os.ReadFile(input); err == nil { + return compile(string(exp)) + } + + // chat + if _, err := utils.Telegram.GetInputPeer(ctx, manager, input); err == nil { + // convert to const string + return compile(fmt.Sprintf(`"%s"`, input)) + } + + // text + return compile(input) +} diff --git a/app/forward/iter.go b/app/forward/iter.go index d48ec5fb35..2e022dc703 100644 --- a/app/forward/iter.go +++ b/app/forward/iter.go @@ -3,11 +3,14 @@ package forward import ( "context" + "github.com/antonmedv/expr/vm" "github.com/go-faster/errors" "github.com/gotd/td/telegram/peers" + "github.com/gotd/td/tg" "github.com/iyear/tdl/pkg/dcpool" "github.com/iyear/tdl/pkg/forwarder" + "github.com/iyear/tdl/pkg/texpr" "github.com/iyear/tdl/pkg/tmessage" "github.com/iyear/tdl/pkg/utils" ) @@ -15,14 +18,39 @@ import ( type iter struct { manager *peers.Manager pool dcpool.Pool - to peers.Peer + to *vm.Program dialogs []*tmessage.Dialog i, j int elem *forwarder.Elem err error } -func newIter(manager *peers.Manager, pool dcpool.Pool, to peers.Peer, dialogs []*tmessage.Dialog) *iter { +type env struct { + From struct { + ID int64 `comment:"ID of dialog"` + Username string `comment:"Username of dialog"` + VisibleName string `comment:"Title of channel and group, first and last name of user"` + } + Message texpr.EnvMessage +} + +func exprEnv(from peers.Peer, msg *tg.Message) env { + e := env{} + + if from != nil { + e.From.ID = from.ID() + e.From.Username, _ = from.Username() + e.From.VisibleName = from.VisibleName() + } + + if msg != nil { + e.Message = texpr.ConvertEnvMessage(msg) + } + + return e +} + +func newIter(manager *peers.Manager, pool dcpool.Pool, to *vm.Program, dialogs []*tmessage.Dialog) *iter { return &iter{ manager: manager, pool: pool, @@ -39,7 +67,8 @@ func (i *iter) Next(ctx context.Context) bool { default: } - if i.i >= len(i.dialogs) || i.j >= len(i.dialogs[i.i].Messages) || i.err != nil { + // end of iteration or error occurred + if i.i >= len(i.dialogs) || i.err != nil { return false } @@ -50,21 +79,45 @@ func (i *iter) Next(ctx context.Context) bool { i.j = 0 } - peer, err := i.manager.FromInputPeer(ctx, p) + from, err := i.manager.FromInputPeer(ctx, p) if err != nil { - i.err = errors.Wrap(err, "get peer") + i.err = errors.Wrap(err, "get from peer") return false } - msg, err := utils.Telegram.GetSingleMessage(ctx, i.pool.Default(ctx), peer.InputPeer(), m) + msg, err := utils.Telegram.GetSingleMessage(ctx, i.pool.Default(ctx), from.InputPeer(), m) if err != nil { i.err = errors.Wrapf(err, "get message: %d", m) return false } + // message routing + result, err := texpr.Run(i.to, exprEnv(from, msg)) + if err != nil { + i.err = errors.Wrap(err, "message routing") + return false + } + destPeer, ok := result.(string) + if !ok { + i.err = errors.Errorf("message router must return string: %T", result) + return false + } + + var to peers.Peer + if destPeer == "" { // self + to, err = i.manager.Self(ctx) + } else { + to, err = utils.Telegram.GetInputPeer(ctx, i.manager, destPeer) + } + + if err != nil { + i.err = errors.Wrapf(err, "resolve dest peer: %s", destPeer) + return false + } + i.elem = &forwarder.Elem{ - From: peer, - To: i.to, + From: from, + To: to, Msg: msg, } diff --git a/pkg/texpr/message.go b/pkg/texpr/env.go similarity index 93% rename from pkg/texpr/message.go rename to pkg/texpr/env.go index 6689710c69..c9d3d2c99d 100644 --- a/pkg/texpr/message.go +++ b/pkg/texpr/env.go @@ -27,8 +27,11 @@ type EnvMessageMedia struct { DC int `comment:"DC ID"` } -func ConvertEnvMessage(msg *tg.Message) (m *EnvMessage) { - m = &EnvMessage{} +func ConvertEnvMessage(msg *tg.Message) EnvMessage { + m := EnvMessage{} + if msg == nil { + return m + } m.Mentioned = msg.Mentioned m.Silent = msg.Silent