Skip to content

Commit

Permalink
refactor(tmessage): extract message source parser to tmessage
Browse files Browse the repository at this point in the history
  • Loading branch information
iyear committed Oct 30, 2023
1 parent d3946dd commit e15a721
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 94 deletions.
15 changes: 8 additions & 7 deletions app/dl/dl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/iyear/tdl/pkg/key"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/logger"
"github.com/iyear/tdl/pkg/tmessage"
)

type Options struct {
Expand All @@ -45,7 +46,7 @@ type Options struct {

type parser struct {
Data []string
Parser func(ctx context.Context, pool dcpool.Pool, kvd kv.KV, data []string) ([]*dliter.Dialog, error)
Parser tmessage.ParseSource
}

func Run(ctx context.Context, opts *Options) error {
Expand All @@ -59,10 +60,10 @@ func Run(ctx context.Context, opts *Options) error {
defer multierr.AppendInvoke(&rerr, multierr.Close(pool))

parsers := []parser{
{Data: opts.URLs, Parser: parseURLs},
{Data: opts.Files, Parser: parseFiles},
{Data: opts.URLs, Parser: tmessage.FromURL(ctx, pool, kvd, opts.URLs)},
{Data: opts.Files, Parser: tmessage.FromFile(ctx, pool, kvd, opts.Files)},
}
dialogs, err := collectDialogs(ctx, pool, kvd, parsers)
dialogs, err := collectDialogs(parsers)
if err != nil {
return err
}
Expand Down Expand Up @@ -127,10 +128,10 @@ func Run(ctx context.Context, opts *Options) error {
})
}

func collectDialogs(ctx context.Context, pool dcpool.Pool, kvd kv.KV, parsers []parser) ([][]*dliter.Dialog, error) {
var dialogs [][]*dliter.Dialog
func collectDialogs(parsers []parser) ([][]*tmessage.Dialog, error) {
var dialogs [][]*tmessage.Dialog
for _, p := range parsers {
d, err := p.Parser(ctx, pool, kvd, p.Data)
d, err := tmessage.Parse(p.Parser)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions app/dl/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (
"github.com/gotd/td/tg"
"github.com/spf13/viper"

"github.com/iyear/tdl/app/internal/dliter"
"github.com/iyear/tdl/pkg/consts"
"github.com/iyear/tdl/pkg/dcpool"
"github.com/iyear/tdl/pkg/downloader"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/logger"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/tmedia"
"github.com/iyear/tdl/pkg/tmessage"
"github.com/iyear/tdl/pkg/utils"
)

Expand All @@ -44,7 +44,7 @@ var tmpl string
func serve(ctx context.Context,
kvd kv.KV,
pool dcpool.Pool,
dialogs [][]*dliter.Dialog,
dialogs [][]*tmessage.Dialog,
port int,
takeout bool,
) error {
Expand Down
48 changes: 0 additions & 48 deletions app/dl/urls.go

This file was deleted.

16 changes: 9 additions & 7 deletions app/internal/dliter/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,25 @@ package dliter
import (
"reflect"
"testing"

"github.com/iyear/tdl/pkg/tmessage"
)

func TestPreSum(t *testing.T) {
tests := []struct {
dialogs []*Dialog
dialogs []*tmessage.Dialog
want []int
}{
{
dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2}}},
dialogs: []*tmessage.Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2}}},
want: []int{0, 3, 5},
},
{
dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3, 4}}},
dialogs: []*tmessage.Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3, 4}}},
want: []int{0, 3, 6, 10},
},
{
dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3, 4}}, {Messages: []int{1}}},
dialogs: []*tmessage.Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3, 4}}, {Messages: []int{1}}},
want: []int{0, 3, 6, 10, 11},
},
}
Expand All @@ -34,21 +36,21 @@ func TestPreSum(t *testing.T) {

func TestIter_ij2n(t *testing.T) {
tests := []struct {
dialogs []*Dialog
dialogs []*tmessage.Dialog
input []struct {
i, j int
}
want []int
}{
{
dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2}}},
dialogs: []*tmessage.Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2}}},
input: []struct {
i, j int
}{{0, 0}, {0, 1}, {0, 2}, {1, 0}, {1, 1}},
want: []int{0, 1, 2, 3, 4},
},
{
dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3, 4}}},
dialogs: []*tmessage.Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3, 4}}},
input: []struct {
i, j int
}{{0, 0}, {0, 1}, {0, 2}, {1, 0}, {1, 1}, {1, 2}, {2, 0}, {2, 1}, {2, 2}, {2, 3}},
Expand Down
11 changes: 3 additions & 8 deletions app/internal/dliter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"text/template"

"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/tg"

"github.com/iyear/tdl/pkg/dcpool"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/tmessage"
)

type Options struct {
Expand All @@ -17,12 +17,12 @@ type Options struct {
Template string
Include, Exclude []string
Desc bool
Dialogs [][]*Dialog
Dialogs [][]*tmessage.Dialog
}

type Iter struct {
pool dcpool.Pool
dialogs []*Dialog
dialogs []*tmessage.Dialog
include, exclude map[string]struct{}
mu sync.Mutex
curi int
Expand All @@ -34,11 +34,6 @@ type Iter struct {
fingerprint string
}

type Dialog struct {
Peer tg.InputPeerClass
Messages []int
}

type fileTemplate struct {
DialogID int64
MessageID int
Expand Down
11 changes: 6 additions & 5 deletions app/internal/dliter/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"fmt"
"sort"

"github.com/iyear/tdl/pkg/tmessage"
"github.com/iyear/tdl/pkg/utils"
)

func sortDialogs(dialogs []*Dialog, desc bool) {
func sortDialogs(dialogs []*tmessage.Dialog, desc bool) {
sort.Slice(dialogs, func(i, j int) bool {
return utils.Telegram.GetInputPeerID(dialogs[i].Peer) <
utils.Telegram.GetInputPeerID(dialogs[j].Peer) // increasing order
Expand All @@ -26,7 +27,7 @@ func sortDialogs(dialogs []*Dialog, desc bool) {
}
}

func fingerprint(dialogs []*Dialog) string {
func fingerprint(dialogs []*tmessage.Dialog) string {
endian := binary.BigEndian
buf, b := &bytes.Buffer{}, make([]byte, 8)
for _, m := range dialogs {
Expand All @@ -49,8 +50,8 @@ func filterMap(data []string, keyFn func(key string) string) map[string]struct{}
return m
}

func collectDialogs(dialogs [][]*Dialog) []*Dialog {
res := make([]*Dialog, 0)
func collectDialogs(dialogs [][]*tmessage.Dialog) []*tmessage.Dialog {
res := make([]*tmessage.Dialog, 0)
for _, d := range dialogs {
if len(d) == 0 {
continue
Expand All @@ -61,7 +62,7 @@ func collectDialogs(dialogs [][]*Dialog) []*Dialog {
}

// preSum of dialogs
func preSum(dialogs []*Dialog) []int {
func preSum(dialogs []*tmessage.Dialog) []int {
sum := make([]int, len(dialogs)+1)
for i, m := range dialogs {
sum[i+1] = sum[i] + len(m.Messages)
Expand Down
35 changes: 18 additions & 17 deletions app/dl/files.go → pkg/tmessage/files.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dl
package tmessage

import (
"context"
Expand All @@ -13,7 +13,6 @@ import (
"github.com/mitchellh/mapstructure"
"go.uber.org/zap"

"github.com/iyear/tdl/app/internal/dliter"
"github.com/iyear/tdl/pkg/dcpool"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/logger"
Expand All @@ -37,25 +36,27 @@ type fMessage struct {
Text interface{} `mapstructure:"text"`
}

func parseFiles(ctx context.Context, pool dcpool.Pool, kvd kv.KV, files []string) ([]*dliter.Dialog, error) {
dialogs := make([]*dliter.Dialog, 0, len(files))
func FromFile(ctx context.Context, pool dcpool.Pool, kvd kv.KV, files []string) ParseSource {
return func() ([]*Dialog, error) {
dialogs := make([]*Dialog, 0, len(files))

for _, file := range files {
d, err := parseFile(ctx, pool.Default(ctx), kvd, file)
if err != nil {
return nil, err
for _, file := range files {
d, err := parseFile(ctx, pool.Default(ctx), kvd, file)
if err != nil {
return nil, err
}

logger.From(ctx).Debug("Parse file",
zap.String("file", file),
zap.Int("num", len(d.Messages)))
dialogs = append(dialogs, d)
}

logger.From(ctx).Debug("Parse file",
zap.String("file", file),
zap.Int("num", len(d.Messages)))
dialogs = append(dialogs, d)
return dialogs, nil
}

return dialogs, nil
}

func parseFile(ctx context.Context, client *tg.Client, kvd kv.KV, file string) (*dliter.Dialog, error) {
func parseFile(ctx context.Context, client *tg.Client, kvd kv.KV, file string) (*Dialog, error) {
f, err := os.Open(file)
if err != nil {
return nil, err
Expand All @@ -79,10 +80,10 @@ func parseFile(ctx context.Context, client *tg.Client, kvd kv.KV, file string) (
return collect(ctx, f, peer)
}

func collect(ctx context.Context, r io.Reader, peer peers.Peer) (*dliter.Dialog, error) {
func collect(ctx context.Context, r io.Reader, peer peers.Peer) (*Dialog, error) {
d := jstream.NewDecoder(r, 2)

m := &dliter.Dialog{
m := &Dialog{
Peer: peer.InputPeer(),
Messages: make([]int, 0),
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/tmessage/tmessage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package tmessage

import (
"github.com/gotd/td/tg"
)

type Dialog struct {
Peer tg.InputPeerClass
Messages []int
}

type ParseSource func() ([]*Dialog, error)

func Parse(src ParseSource) ([]*Dialog, error) {
return src()
}
49 changes: 49 additions & 0 deletions pkg/tmessage/urls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package tmessage

import (
"context"

"github.com/gotd/td/telegram/peers"
"go.uber.org/zap"

"github.com/iyear/tdl/pkg/dcpool"
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/logger"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/utils"
)

func FromURL(ctx context.Context, pool dcpool.Pool, kvd kv.KV, urls []string) ParseSource {
return func() ([]*Dialog, error) {
manager := peers.Options{Storage: storage.NewPeers(kvd)}.
Build(pool.Default(ctx))
msgMap := make(map[int64]*Dialog)

for _, u := range urls {
ch, msgid, err := utils.Telegram.ParseMessageLink(ctx, manager, u)
if err != nil {
return nil, err
}
logger.From(ctx).Debug("Parse URL",
zap.String("url", u),
zap.Int64("peer_id", ch.ID()),
zap.String("peer_name", ch.VisibleName()),
zap.Int("msg", msgid))

// init map value
if _, ok := msgMap[ch.ID()]; !ok {
msgMap[ch.ID()] = &Dialog{Peer: ch.InputPeer(), Messages: []int{}}
}

msgMap[ch.ID()].Messages = append(msgMap[ch.ID()].Messages, msgid)
}

// cap is at least len of map
msgs := make([]*Dialog, 0, len(msgMap))
for _, m := range msgMap {
msgs = append(msgs, m)
}

return msgs, nil
}
}

0 comments on commit e15a721

Please sign in to comment.