Skip to content

Commit

Permalink
feat(forward): support message forward router
Browse files Browse the repository at this point in the history
  • Loading branch information
iyear committed Nov 14, 2023
1 parent 51c52ec commit 9f9f7a8
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 16 deletions.
49 changes: 43 additions & 6 deletions app/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package forward

import (
"context"
"fmt"
"os"
"reflect"
"strings"

"github.com/fatih/color"
"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"
"github.com/go-faster/errors"
"github.com/gotd/contrib/middleware/floodwait"
"github.com/gotd/td/telegram/peers"
Expand All @@ -19,6 +23,7 @@ import (
"github.com/iyear/tdl/pkg/forwarder"
"github.com/iyear/tdl/pkg/prog"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/texpr"
"github.com/iyear/tdl/pkg/tmessage"
"github.com/iyear/tdl/pkg/utils"
)
Expand All @@ -31,6 +36,18 @@ type Options struct {
}

func Run(ctx context.Context, opts Options) error {
if opts.To == "-" {
fg := texpr.NewFieldsGetter(nil)

fields, err := fg.Walk(exprEnv(nil, nil))
if err != nil {
return fmt.Errorf("failed to walk fields: %w", err)
}

fmt.Print(fg.Sprint(fields, true))
return nil
}

c, kvd, err := tgc.NoLogin(ctx)
if err != nil {
return err
Expand All @@ -50,18 +67,16 @@ func Run(ctx context.Context, opts Options) error {

manager := peers.Options{Storage: storage.NewPeers(kvd)}.Build(pool.Default(ctx))

peerTo, err := utils.Telegram.GetInputPeer(ctx, manager, opts.To)
to, err := resolveDestPeer(ctx, manager, opts.To)
if err != nil {
return errors.Wrap(err, "resolve dest peer")
}

color.Green("All messages will be forwarded to %s(%d)", peerTo.VisibleName(), peerTo.ID())

fwProgress := prog.New(pw.FormatNumber)

fw := forwarder.New(forwarder.Options{
Pool: pool,
Iter: newIter(manager, pool, peerTo, dialogs),
Iter: newIter(manager, pool, to, dialogs),
Silent: opts.Silent,
Mode: opts.Mode,
Progress: newProgress(fwProgress),
Expand Down Expand Up @@ -90,7 +105,7 @@ func collectDialogs(ctx context.Context, input []string) ([]*tmessage.Dialog, er
return nil, errors.Wrap(err, "parse from url")
}
default:
d, err = tmessage.Parse(tmessage.FromFile(ctx, tctx.Pool(ctx), tctx.KV(ctx), []string{p}))
d, err = tmessage.Parse(tmessage.FromFile(ctx, tctx.Pool(ctx), tctx.KV(ctx), []string{p}, false))
if err != nil {
return nil, errors.Wrap(err, "parse from file")
}
Expand All @@ -101,3 +116,25 @@ func collectDialogs(ctx context.Context, input []string) ([]*tmessage.Dialog, er

return dialogs, nil
}

// resolveDestPeer parses the input string and returns a vm.Program. It can be a CHAT, a text or a file based on expression engine.
func resolveDestPeer(ctx context.Context, manager *peers.Manager, input string) (*vm.Program, error) {
compile := func(i string) (*vm.Program, error) {
// we pass empty peer and message to enable type checking
return expr.Compile(i, expr.AsKind(reflect.String), expr.Env(exprEnv(nil, nil)))
}

// file
if exp, err := os.ReadFile(input); err == nil {
return compile(string(exp))
}

// chat
if _, err := utils.Telegram.GetInputPeer(ctx, manager, input); err == nil {
// convert to const string
return compile(fmt.Sprintf(`"%s"`, input))
}

// text
return compile(input)
}
69 changes: 61 additions & 8 deletions app/forward/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,54 @@ package forward
import (
"context"

"github.com/antonmedv/expr/vm"
"github.com/go-faster/errors"
"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/tg"

"github.com/iyear/tdl/pkg/dcpool"
"github.com/iyear/tdl/pkg/forwarder"
"github.com/iyear/tdl/pkg/texpr"
"github.com/iyear/tdl/pkg/tmessage"
"github.com/iyear/tdl/pkg/utils"
)

type iter struct {
manager *peers.Manager
pool dcpool.Pool
to peers.Peer
to *vm.Program
dialogs []*tmessage.Dialog
i, j int
elem *forwarder.Elem
err error
}

func newIter(manager *peers.Manager, pool dcpool.Pool, to peers.Peer, dialogs []*tmessage.Dialog) *iter {
type env struct {
From struct {
ID int64 `comment:"ID of dialog"`
Username string `comment:"Username of dialog"`
VisibleName string `comment:"Title of channel and group, first and last name of user"`
}
Message texpr.EnvMessage
}

func exprEnv(from peers.Peer, msg *tg.Message) env {
e := env{}

if from != nil {
e.From.ID = from.ID()
e.From.Username, _ = from.Username()
e.From.VisibleName = from.VisibleName()
}

if msg != nil {
e.Message = texpr.ConvertEnvMessage(msg)
}

return e
}

func newIter(manager *peers.Manager, pool dcpool.Pool, to *vm.Program, dialogs []*tmessage.Dialog) *iter {
return &iter{
manager: manager,
pool: pool,
Expand All @@ -39,7 +67,8 @@ func (i *iter) Next(ctx context.Context) bool {
default:
}

if i.i >= len(i.dialogs) || i.j >= len(i.dialogs[i.i].Messages) || i.err != nil {
// end of iteration or error occurred
if i.i >= len(i.dialogs) || i.err != nil {
return false
}

Expand All @@ -50,21 +79,45 @@ func (i *iter) Next(ctx context.Context) bool {
i.j = 0
}

peer, err := i.manager.FromInputPeer(ctx, p)
from, err := i.manager.FromInputPeer(ctx, p)
if err != nil {
i.err = errors.Wrap(err, "get peer")
i.err = errors.Wrap(err, "get from peer")
return false
}

msg, err := utils.Telegram.GetSingleMessage(ctx, i.pool.Default(ctx), peer.InputPeer(), m)
msg, err := utils.Telegram.GetSingleMessage(ctx, i.pool.Default(ctx), from.InputPeer(), m)
if err != nil {
i.err = errors.Wrapf(err, "get message: %d", m)
return false
}

// message routing
result, err := texpr.Run(i.to, exprEnv(from, msg))
if err != nil {
i.err = errors.Wrap(err, "message routing")
return false
}
destPeer, ok := result.(string)
if !ok {
i.err = errors.Errorf("message router must return string: %T", result)
return false
}

var to peers.Peer
if destPeer == "" { // self
to, err = i.manager.Self(ctx)
} else {
to, err = utils.Telegram.GetInputPeer(ctx, i.manager, destPeer)
}

if err != nil {
i.err = errors.Wrapf(err, "resolve dest peer: %s", destPeer)
return false
}

i.elem = &forwarder.Elem{
From: peer,
To: i.to,
From: from,
To: to,
Msg: msg,
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/texpr/message.go → pkg/texpr/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ type EnvMessageMedia struct {
DC int `comment:"DC ID"`
}

func ConvertEnvMessage(msg *tg.Message) (m *EnvMessage) {
m = &EnvMessage{}
func ConvertEnvMessage(msg *tg.Message) EnvMessage {
m := EnvMessage{}
if msg == nil {
return m
}

m.Mentioned = msg.Mentioned
m.Silent = msg.Silent
Expand Down

0 comments on commit 9f9f7a8

Please sign in to comment.