Skip to content

Commit

Permalink
Merge pull request #5 from fatedier/dev
Browse files Browse the repository at this point in the history
bump version to v0.1.0
  • Loading branch information
fatedier authored Mar 18, 2019
2 parents b18901e + 634ffc2 commit eda34ec
Show file tree
Hide file tree
Showing 423 changed files with 230,691 additions and 366 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
### v0.1.0

- Support upload and download process bar.
- Enable TLS between all components.
- Support ACK message and retry missing data.
- Add `-n` to set frame size of sender. Add `-c` to set cache frame count.

### v0.0.1

- Init.
10 changes: 10 additions & 0 deletions CHANGELOG_zh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
### v0.1.0

- 支持上传下载进度条显示。
- 所有组件启用 TLS 加密传输。
- 发送方和接收方支持 ACK 确认消息,支持重传。
- 发送方和接收方支持设置传输帧和缓冲区大小。

### v0.0.1

- 初始可用版本。
141 changes: 94 additions & 47 deletions client/recv.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package client

import (
"crypto/tls"
"fmt"
"net"
"os"
"path/filepath"
"sync"
"time"

fio "github.com/fatedier/fft/pkg/io"
"github.com/fatedier/fft/pkg/msg"
"github.com/fatedier/fft/pkg/receiver"
"github.com/fatedier/fft/pkg/stream"

"github.com/cheggaaa/pb"
)

func (svc *Service) recvFile(id string, filePath string) error {
Expand All @@ -23,10 +28,12 @@ func (svc *Service) recvFile(id string, filePath string) error {
if err != nil {
return err
}
conn = tls.Client(conn, &tls.Config{InsecureSkipVerify: true})
defer conn.Close()

msg.WriteMsg(conn, &msg.ReceiveFile{
ID: id,
ID: id,
CacheCount: int64(svc.cacheCount),
})

conn.SetReadDeadline(time.Now().Add(10 * time.Second))
Expand All @@ -47,7 +54,8 @@ func (svc *Service) recvFile(id string, filePath string) error {
if len(m.Workers) == 0 {
return fmt.Errorf("no available workers")
}
fmt.Printf("Recv filename: %s\n", m.Name)

fmt.Printf("Recv filename: %s Size: %s\n", m.Name, pb.Format(m.Fsize).To(pb.U_BYTES).String())
if svc.debugMode {
fmt.Printf("Workers: %v\n", m.Workers)
}
Expand All @@ -60,64 +68,103 @@ func (svc *Service) recvFile(id string, filePath string) error {
if err != nil {
return err
}
defer f.Close()

var wait sync.WaitGroup
count := m.Fsize
bar := pb.New(int(count))
bar.ShowSpeed = true
bar.SetUnits(pb.U_BYTES)

if !svc.debugMode {
bar.Start()
}

callback := func(n int) {
bar.Add(n)
}

recv := receiver.NewReceiver(0, f)
recv := receiver.NewReceiver(0, fio.NewCallbackWriter(f, callback))
for _, worker := range m.Workers {
addr := worker
go newRecvStream(recv, id, addr, svc.debugMode)
wait.Add(1)
go func(addr string) {
newRecvStream(recv, id, addr, svc.debugMode)
wait.Done()
}(worker)
}

recvDoneCh := make(chan struct{})
streamCloseCh := make(chan struct{})
go func() {
recv.Run()
close(recvDoneCh)
}()
go func() {
wait.Wait()
close(streamCloseCh)
}()

select {
case <-recvDoneCh:
case <-streamCloseCh:
select {
case <-recvDoneCh:
case <-time.After(2 * time.Second):
}
}

if !svc.debugMode {
bar.Finish()
}
recv.Run()
return nil
}

func newRecvStream(recv *receiver.Receiver, id string, addr string, debugMode bool) {
first := true
for {
if !first {
time.Sleep(3 * time.Second)
} else {
first = false
}
conn, err := net.Dial("tcp", addr)
if err != nil {
log(debugMode, "[%s] %v", addr, err)
return
}
conn = tls.Client(conn, &tls.Config{InsecureSkipVerify: true})

msg.WriteMsg(conn, &msg.NewReceiveFileStream{
ID: id,
})

conn, err := net.Dial("tcp", addr)
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
raw, err := msg.ReadMsg(conn)
if err != nil {
conn.Close()
log(debugMode, "[%s] %v", addr, err)
return
}
conn.SetReadDeadline(time.Time{})
m, ok := raw.(*msg.NewReceiveFileStreamResp)
if !ok {
conn.Close()
log(debugMode, "[%s] read NewReceiveFileStreamResp format error", addr)
return
}

if m.Error != "" {
conn.Close()
log(debugMode, "[%s] new recv file stream error: %s", addr, m.Error)
return
}

s := stream.NewFrameStream(conn)
for {
frame, err := s.ReadFrame()
if err != nil {
log(debugMode, "[%s] %v", addr, err)
return
}

msg.WriteMsg(conn, &msg.NewReceiveFileStream{
ID: id,
recv.RecvFrame(frame)
err = s.WriteAck(&stream.Ack{
FileID: frame.FileID,
FrameID: frame.FrameID,
})

conn.SetReadDeadline(time.Now().Add(10 * time.Second))
raw, err := msg.ReadMsg(conn)
if err != nil {
conn.Close()
log(debugMode, "[%s] %v", addr, err)
continue
}
conn.SetReadDeadline(time.Time{})
m, ok := raw.(*msg.NewReceiveFileStreamResp)
if !ok {
conn.Close()
log(debugMode, "[%s] read NewReceiveFileStreamResp format error", addr)
continue
}

if m.Error != "" {
conn.Close()
log(debugMode, "[%s] new recv file stream error: %s", addr, m.Error)
continue
}
fmt.Printf("connect to worker [%s] success\n", addr)

s := stream.NewFrameStream(conn)
for {
frame, err := s.ReadFrame()
if err != nil {
return
}
recv.RecvFrame(frame)
return
}
}
}
122 changes: 66 additions & 56 deletions client/send.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
package client

import (
"crypto/tls"
"fmt"
"net"
"os"
"sync"
"time"

fio "github.com/fatedier/fft/pkg/io"
"github.com/fatedier/fft/pkg/msg"
"github.com/fatedier/fft/pkg/sender"
"github.com/fatedier/fft/pkg/stream"

"github.com/cheggaaa/pb"
)

func (svc *Service) sendFile(id string, filePath string) error {
conn, err := net.Dial("tcp", svc.serverAddr)
if err != nil {
return err
}
conn = tls.Client(conn, &tls.Config{InsecureSkipVerify: true})
defer conn.Close()

f, err := os.Open(filePath)
Expand All @@ -34,10 +39,13 @@ func (svc *Service) sendFile(id string, filePath string) error {
}

msg.WriteMsg(conn, &msg.SendFile{
ID: id,
Name: finfo.Name(),
ID: id,
Name: finfo.Name(),
Fsize: finfo.Size(),
CacheCount: int64(svc.cacheCount),
})

fmt.Printf("Wait receiver...\n")
conn.SetReadDeadline(time.Now().Add(120 * time.Second))
raw, err := msg.ReadMsg(conn)
if err != nil {
Expand All @@ -56,76 +64,78 @@ func (svc *Service) sendFile(id string, filePath string) error {
if len(m.Workers) == 0 {
return fmt.Errorf("no available workers")
}
svc.cacheCount = int(m.CacheCount)
fmt.Printf("ID: %s\n", m.ID)
if svc.debugMode {
fmt.Printf("Workers: %v\n", m.Workers)
}

var wait sync.WaitGroup
doneCh := make(chan struct{})
s := sender.NewSender(0, f)
count := finfo.Size()
bar := pb.New(int(count))
bar.ShowSpeed = true
bar.SetUnits(pb.U_BYTES)
if !svc.debugMode {
bar.Start()
}

callback := func(n int) {
bar.Add(n)
}

s, err := sender.NewSender(0, fio.NewCallbackReader(f, callback), svc.frameSize, svc.cacheCount)
if err != nil {
return err
}

for _, worker := range m.Workers {
wait.Add(1)
go func(addr string) {
newSendStream(doneCh, s, m.ID, addr, svc.debugMode)
newSendStream(s, m.ID, addr, svc.debugMode)
wait.Done()
}(worker)
}
s.Run()
close(doneCh)
go s.Run()
wait.Wait()

if !svc.debugMode {
bar.Finish()
}
return nil
}

func newSendStream(doneCh chan struct{}, s *sender.Sender, id string, addr string, debugMode bool) {
first := true
for {
select {
case <-doneCh:
return
default:
}

if !first {
time.Sleep(3 * time.Second)
} else {
first = false
}

conn, err := net.Dial("tcp", addr)
if err != nil {
log(debugMode, "[%s] %v", addr, err)
continue
}

msg.WriteMsg(conn, &msg.NewSendFileStream{
ID: id,
})

conn.SetReadDeadline(time.Now().Add(10 * time.Second))
raw, err := msg.ReadMsg(conn)
if err != nil {
conn.Close()
log(debugMode, "[%s] %v", addr, err)
continue
}
conn.SetReadDeadline(time.Time{})
m, ok := raw.(*msg.NewSendFileStreamResp)
if !ok {
conn.Close()
log(debugMode, "[%s] read NewSendFileStreamResp format error", addr)
continue
}

if m.Error != "" {
conn.Close()
log(debugMode, "[%s] new send file stream error: %s", addr, m.Error)
continue
}
fmt.Printf("connect to worker [%s] success\n", addr)

s.HandleStream(stream.NewFrameStream(conn))
break
func newSendStream(s *sender.Sender, id string, addr string, debugMode bool) {
conn, err := net.Dial("tcp", addr)
if err != nil {
log(debugMode, "[%s] %v", addr, err)
return
}
conn = tls.Client(conn, &tls.Config{InsecureSkipVerify: true})

msg.WriteMsg(conn, &msg.NewSendFileStream{
ID: id,
})

conn.SetReadDeadline(time.Now().Add(10 * time.Second))
raw, err := msg.ReadMsg(conn)
if err != nil {
conn.Close()
log(debugMode, "[%s] %v", addr, err)
return
}
conn.SetReadDeadline(time.Time{})
m, ok := raw.(*msg.NewSendFileStreamResp)
if !ok {
conn.Close()
log(debugMode, "[%s] read NewSendFileStreamResp format error", addr)
return
}

if m.Error != "" {
conn.Close()
log(debugMode, "[%s] new send file stream error: %s", addr, m.Error)
return
}

s.HandleStream(stream.NewFrameStream(conn))
}
Loading

0 comments on commit eda34ec

Please sign in to comment.