Skip to content

Commit

Permalink
add -c -n params for fft to set frame size and cache count
Browse files Browse the repository at this point in the history
  • Loading branch information
fatedier committed Mar 18, 2019
1 parent 1d70eff commit f3a8ff0
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 43 deletions.
3 changes: 2 additions & 1 deletion client/recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func (svc *Service) recvFile(id string, filePath string) error {
defer conn.Close()

msg.WriteMsg(conn, &msg.ReceiveFile{
ID: id,
ID: id,
CacheCount: int64(svc.cacheCount),
})

conn.SetReadDeadline(time.Now().Add(10 * time.Second))
Expand Down
10 changes: 6 additions & 4 deletions client/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ func (svc *Service) sendFile(id string, filePath string) error {
}

msg.WriteMsg(conn, &msg.SendFile{
ID: id,
Name: finfo.Name(),
Fsize: finfo.Size(),
ID: id,
Name: finfo.Name(),
Fsize: finfo.Size(),
CacheCount: int64(svc.cacheCount),
})

fmt.Printf("Wait receiver...\n")
Expand All @@ -61,6 +62,7 @@ func (svc *Service) sendFile(id string, filePath string) error {
if len(m.Workers) == 0 {
return fmt.Errorf("no available workers")
}
svc.cacheCount = int(m.CacheCount)
fmt.Printf("ID: %s\n", m.ID)
if svc.debugMode {
fmt.Printf("Workers: %v\n", m.Workers)
Expand All @@ -79,7 +81,7 @@ func (svc *Service) sendFile(id string, filePath string) error {
bar.Add(n)
}

s, err := sender.NewSender(0, fio.NewCallbackReader(f, callback), 5*1024, 500)
s, err := sender.NewSender(0, fio.NewCallbackReader(f, callback), svc.frameSize, svc.cacheCount)
if err != nil {
return err
}
Expand Down
16 changes: 16 additions & 0 deletions client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ type Options struct {
ServerAddr string
ID string
SendFile string
FrameSize int
CacheCount int
RecvFile string
DebugMode bool
}
Expand All @@ -16,12 +18,24 @@ func (op *Options) Check() error {
if op.SendFile == "" && op.RecvFile == "" {
return fmt.Errorf("send_file or recv_file is required")
}

if op.SendFile != "" {
if op.FrameSize <= 0 {
return fmt.Errorf("frame_size should be greater than 0")
}
}

if op.CacheCount <= 0 {
return fmt.Errorf("cache_count should be greater than 0")
}
return nil
}

type Service struct {
debugMode bool
serverAddr string
frameSize int
cacheCount int

runHandler func() error
}
Expand All @@ -34,6 +48,8 @@ func NewService(options Options) (*Service, error) {
svc := &Service{
debugMode: options.DebugMode,
serverAddr: options.ServerAddr,
frameSize: options.FrameSize,
cacheCount: options.CacheCount,
}

if options.SendFile != "" {
Expand Down
2 changes: 2 additions & 0 deletions cmd/fft/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&options.ServerAddr, "server_addr", "s", version.DefaultServerAddr(), "remote fft server address")
rootCmd.PersistentFlags().StringVarP(&options.ID, "id", "i", "", "specify a special id to transfer file")
rootCmd.PersistentFlags().StringVarP(&options.SendFile, "send_file", "l", "", "specify which file to send to another client")
rootCmd.PersistentFlags().IntVarP(&options.FrameSize, "frame_size", "n", 5*1024, "each frame size, it's only for sender, default(5*1024 B)")
rootCmd.PersistentFlags().IntVarP(&options.CacheCount, "cache_count", "c", 512, "how many frames be cached, it will be set to the min value between sender and receiver")
rootCmd.PersistentFlags().StringVarP(&options.RecvFile, "recv_file", "t", "", "specify local file path to store received file")
rootCmd.PersistentFlags().BoolVarP(&options.DebugMode, "debug", "g", false, "print more debug info")
}
Expand Down
26 changes: 15 additions & 11 deletions pkg/msg/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,30 @@ type RegisterWorkerResp struct {
}

type SendFile struct {
ID string `json:"id"`
Fsize int64 `json:"fsize"`
Name string `json:"name"`
ID string `json:"id"`
Fsize int64 `json:"fsize"`
Name string `json:"name"`
CacheCount int64 `json:"cache_count"`
}

type SendFileResp struct {
ID string `json:"id"`
Workers []string `json:"workers"`
Error string `json:"error"`
ID string `json:"id"`
Workers []string `json:"workers"`
CacheCount int64 `json:"cache_count"`
Error string `json:"error"`
}

type ReceiveFile struct {
ID string `json:"id"`
ID string `json:"id"`
CacheCount int64 `json:"cache_count"`
}

type ReceiveFileResp struct {
Name string `json:"name"`
Fsize int64 `json:"fsize"`
Workers []string `json:"workers"`
Error string `json:"error"`
Name string `json:"name"`
Fsize int64 `json:"fsize"`
Workers []string `json:"workers"`
CacheCount int64 `json:"cache_count"`
Error string `json:"error"`
}

type NewSendFileStream struct {
Expand Down
6 changes: 5 additions & 1 deletion pkg/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ func (sender *Sender) HandleStream(s *stream.FrameStream) {
sender.mu.Unlock()

id := atomic.AddUint32(&sender.count, 1)
tr := NewTransfer(int(id), 100, s, sender.frameCh, sender.ackCh)
trBufferCount := sender.maxBufferCount / 2
if trBufferCount <= 0 {
trBufferCount = 1
}
tr := NewTransfer(int(id), trBufferCount, s, sender.frameCh, sender.ackCh)

// block until transfer exit
noAckFrames := tr.Run()
Expand Down
40 changes: 24 additions & 16 deletions server/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,37 @@ import (
)

type SendConn struct {
id string
conn net.Conn
filename string
fsize int64
id string
conn net.Conn
filename string
fsize int64
cacheCount int64

recvConnCh chan *RecvConn
}

func NewSendConn(id string, conn net.Conn, filename string, fsize int64) *SendConn {
func NewSendConn(id string, conn net.Conn, filename string, fsize int64, cacheCount int64) *SendConn {
return &SendConn{
id: id,
conn: conn,
filename: filename,
fsize: fsize,
cacheCount: cacheCount,
recvConnCh: make(chan *RecvConn),
}
}

type RecvConn struct {
id string
conn net.Conn
id string
conn net.Conn
cacheCount int64
}

func NewRecvConn(id string, conn net.Conn) *RecvConn {
func NewRecvConn(id string, conn net.Conn, cacheCount int64) *RecvConn {
return &RecvConn{
id: id,
conn: conn,
id: id,
conn: conn,
cacheCount: cacheCount,
}
}

Expand All @@ -51,29 +55,32 @@ func NewMatchController() *MatchController {
}

// block until there is a same ID recv conn or timeout
func (mc *MatchController) DealSendConn(sc *SendConn, timeout time.Duration) error {
func (mc *MatchController) DealSendConn(sc *SendConn, timeout time.Duration) (cacheCount int64, err error) {
mc.mu.Lock()
if _, ok := mc.senders[sc.id]; ok {
mc.mu.Unlock()
return fmt.Errorf("id is repeated")
err = fmt.Errorf("id is repeated")
return
}
mc.senders[sc.id] = sc
mc.mu.Unlock()

select {
case <-sc.recvConnCh:
case rc := <-sc.recvConnCh:
cacheCount = rc.cacheCount
case <-time.After(timeout):
mc.mu.Lock()
if tmp, ok := mc.senders[sc.id]; ok && tmp == sc {
delete(mc.senders, sc.id)
}
mc.mu.Unlock()
return fmt.Errorf("timeout waiting recv conn")
err = fmt.Errorf("timeout waiting recv conn")
return
}
return nil
return
}

func (mc *MatchController) DealRecvConn(rc *RecvConn) (filename string, fsize int64, err error) {
func (mc *MatchController) DealRecvConn(rc *RecvConn) (filename string, fsize int64, cacheCount int64, err error) {
mc.mu.Lock()
sc, ok := mc.senders[rc.id]
if ok {
Expand All @@ -87,6 +94,7 @@ func (mc *MatchController) DealRecvConn(rc *RecvConn) (filename string, fsize in
}
filename = sc.filename
fsize = sc.fsize
cacheCount = sc.cacheCount

select {
case sc.recvConnCh <- rc:
Expand Down
22 changes: 12 additions & 10 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (svc *Service) Run() error {
// Debug ========
go func() {
for {
time.Sleep(5 * time.Second)
time.Sleep(10 * time.Second)
log.Info("worker addrs: %v", svc.workerGroup.GetAvailableWorkerAddrs())
}
}()
Expand Down Expand Up @@ -139,16 +139,17 @@ func (svc *Service) handleSendFile(conn net.Conn, m *msg.SendFile) error {
}
log.Debug("new SendFile id [%s], filename [%s] size [%d]", m.ID, m.Name, m.Fsize)

sc := NewSendConn(m.ID, conn, m.Name, m.Fsize)
err := svc.matchController.DealSendConn(sc, 60*time.Second)
sc := NewSendConn(m.ID, conn, m.Name, m.Fsize, m.CacheCount)
cacheCount, err := svc.matchController.DealSendConn(sc, 120*time.Second)
if err != nil {
log.Warn("deal send conn error: %v", err)
return err
}

msg.WriteMsg(conn, &msg.SendFileResp{
ID: m.ID,
Workers: svc.workerGroup.GetAvailableWorkerAddrs(),
ID: m.ID,
Workers: svc.workerGroup.GetAvailableWorkerAddrs(),
CacheCount: cacheCount,
})
return nil
}
Expand All @@ -159,17 +160,18 @@ func (svc *Service) handleRecvFile(conn net.Conn, m *msg.ReceiveFile) error {
}
log.Debug("new ReceiveFile id [%s]", m.ID)

rc := NewRecvConn(m.ID, conn)
filename, fsize, err := svc.matchController.DealRecvConn(rc)
rc := NewRecvConn(m.ID, conn, m.CacheCount)
filename, fsize, cacheCount, err := svc.matchController.DealRecvConn(rc)
if err != nil {
log.Warn("deal recv conn error: %v", err)
return err
}

msg.WriteMsg(conn, &msg.ReceiveFileResp{
Name: filename,
Fsize: fsize,
Workers: svc.workerGroup.GetAvailableWorkerAddrs(),
Name: filename,
Fsize: fsize,
Workers: svc.workerGroup.GetAvailableWorkerAddrs(),
CacheCount: cacheCount,
})
return nil
}

0 comments on commit f3a8ff0

Please sign in to comment.