Skip to content

Commit 076a188

Browse files
committed
remove dict queue
1 parent fc3666e commit 076a188

File tree

4 files changed

+38
-65
lines changed

4 files changed

+38
-65
lines changed

fastcopy/client.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,10 @@ def new_channel(self, sock, user, pkey, password, num=1):
177177
tp.use_compression(True)
178178
tp.set_keepalive(60)
179179
tp.connect(username=user, pkey=pkey, password=password)
180-
except paramiko.SSHException:
180+
except paramiko.SSHException as e:
181+
logging.error(e)
181182
tp.stop_thread()
182-
return
183+
return []
183184

184185
try:
185186
conns = [tp]
@@ -232,8 +233,8 @@ def handshake(self, channel, remote_path: str):
232233
send_pkt(channel, conn_pkt)
233234
session_pkt = recv_pkt(channel)
234235
session_id, = session_pkt.unpack_body()
235-
logging.debug(f'[bold cyan]fcp[/bold cyan]: '
236-
f'Channel {channel.get_name()} connected')
236+
logging.info(f'[bold cyan]fcp[/bold cyan]: '
237+
f'Channel {channel.get_name()} connected')
237238

238239
return session_id
239240

@@ -249,8 +250,8 @@ def _connect(wait):
249250
for channel in channels:
250251
send_pkt(channel, attach_pkt)
251252
porter.conn_pool.add(channel)
252-
logging.debug(f'[bold cyan]fcp[/bold cyan]: '
253-
f'Channel {channel.get_name()} connected')
253+
logging.info(f'[bold cyan]fcp[/bold cyan]: '
254+
f'Channel {channel.get_name()} connected')
254255

255256
for i in range(self.n_channel - 1):
256257
Thread(target=_connect, args=(0.5 * i,), daemon=True).start()

fastcopy/utils.py

Lines changed: 28 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import os
22
import re
3-
import sys
43
import logging
54
from binascii import crc32
65
from collections import deque
@@ -14,15 +13,15 @@
1413
from socket import socket, MSG_WAITALL, error as SocketError
1514
from struct import pack, unpack
1615
from threading import Event, Semaphore, Thread
17-
from typing import (Any, Deque, Dict, Generator, Iterable, List, NamedTuple,
16+
from typing import (Any, Deque, Dict, Generator, Iterable, List, NamedTuple, Set,
1817
Tuple, Union)
1918

2019
from rich.progress import (BarColumn, Progress, TaskID, TextColumn,
2120
TransferSpeedColumn)
2221

2322

2423
SERVER_ADDR = ('127.0.0.1', 7523)
25-
CHUNK_SIZE = 8192 # 默认数据块大小 (单位: 字节)
24+
CHUNK_SIZE = 1024 * 16 # 默认数据块大小 (单位: 字节)
2625
TIMEOUT = 60 * 5 # 全局超时时间
2726
LEN_HEAD = 7
2827
MAX_SEQ = 0xffffffff
@@ -57,28 +56,6 @@ def contains(cls, member: object) -> bool:
5756
return member in cls.__members__.values()
5857

5958

60-
class DictQueue(dict):
61-
def __init__(self, size=None):
62-
super().__init__()
63-
self.size = size or float('inf')
64-
self.cur_seq = 0
65-
66-
def put(self, value):
67-
while len(self) >= self.size:
68-
self.pop()
69-
70-
self[self.cur_seq] = value
71-
logging.debug(f'[DictQueue] put {self.cur_seq} to {value} into {id(self)}')
72-
self.cur_seq = (self.cur_seq + 1) & MAX_SEQ
73-
74-
def pop(self, seq=None):
75-
if seq is None:
76-
seq = min(self.keys())
77-
value = super().pop(seq)
78-
logging.debug(f'[DictQueue] pop {seq} as {value} from {id(self)}')
79-
return value
80-
81-
8259
class Packet(NamedTuple):
8360
flag: Flag
8461
body: bytes
@@ -251,53 +228,39 @@ def __init__(self, size=16):
251228
self.send_q = Queue(self.size * 16)
252229
self.recv_q = Queue(self.size * 16)
253230
self.done = Event()
254-
self.connections = {}
231+
self.connections: Set[socket] = set()
255232

256233
def send(self, packet: Packet):
257234
self.send_q.put(packet)
258235

259236
def recv(self, timeout=TIMEOUT) -> Packet:
260237
return self.recv_q.get(timeout)
261238

262-
def _send(self, conn: socket, cache: DictQueue):
239+
def _send(self, conn: socket):
263240
conn_name = f'{id(conn):x}'
264241
while not self.done.is_set():
265242
packet: Packet = self.send_q.get()
266243
try:
267244
send_pkt(conn, packet)
268245
except SocketError as e:
269-
logging.error(e)
270-
sys.exit(1)
271-
272-
logging.debug(f'[Send] conn-{conn_name}/{cache.cur_seq}: {packet}')
273-
cache.put(packet)
246+
conn.close()
247+
logging.error(f'[Send] conn-{conn_name}: {e}, conn exit.')
248+
return
274249

275-
def _recv(self, conn: socket, cache: DictQueue):
276-
n_recv = 0
250+
def _recv(self, conn: socket):
277251
conn_name = f'{id(conn):x}'
278252
while not self.done.is_set():
279253
try:
280254
packet = recv_pkt(conn)
255+
self.recv_q.put(packet)
256+
logging.debug(f'[Recv] conn-{conn_name}: {packet}')
281257
except ConnectionResetError:
258+
conn.close()
259+
return
260+
except PacketError as e:
261+
conn.close()
262+
logging.error(f'[Recv] conn-{conn_name}: {e}, conn exit.')
282263
return
283-
except PacketError:
284-
resend_pkt = Packet.load(Flag.RESEND, n_recv)
285-
send_pkt(conn, resend_pkt)
286-
continue
287-
288-
if packet.flag is Flag.RESEND:
289-
seq, = packet.unpack_body()
290-
if seq not in cache:
291-
seqs = ','.join(f'{k}' for k in cache.keys())
292-
logging.error(f"{seq} not in {seqs}")
293-
sys.exit(1)
294-
pkt = cache[seq]
295-
print(f'get {seq} from {id(cache)}')
296-
self.send(pkt)
297-
else:
298-
logging.debug(f'[Recv] conn-{conn_name}/{cache.cur_seq}: {packet}')
299-
self.recv_q.put(packet)
300-
n_recv = (n_recv + 1) & MAX_SEQ
301264

302265
def add(self, conn: socket):
303266
'''添加一个连接'''
@@ -308,22 +271,31 @@ def add(self, conn: socket):
308271
if conn in self.connections:
309272
return True
310273

311-
cache = DictQueue(256)
312-
t_send = Thread(target=self._send, args=(conn, cache), daemon=True)
274+
t_send = Thread(target=self._send, args=(conn,), daemon=True)
313275
t_send.start()
314276

315-
t_recv = Thread(target=self._recv, args=(conn, cache), daemon=True)
277+
t_recv = Thread(target=self._recv, args=(conn,), daemon=True)
316278
t_recv.start()
317279
return True
318280

281+
def all_closed(self):
282+
for conn in self.connections:
283+
if not conn.closed:
284+
return False
285+
return True
286+
319287
def stop(self):
320288
self.done.set()
321289
for conn in self.connections:
322290
conn.close()
323291

324292
def run(self):
325293
self.done.clear()
326-
self.done.wait()
294+
while not self.done.wait(1):
295+
if not self.connections:
296+
continue
297+
elif self.all_closed():
298+
break
327299
self.stop()
328300

329301

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[metadata]
2-
desciption-file = README.md
2+
desciption_file = README.md
33
[bdist_wheel]
44
universal=1

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
long_description = f_readme.read()
55

66
setuptools.setup(
7-
name="fast-copy",
8-
version="0.1.1",
7+
name="fastcopy",
8+
version="0.1.3",
99
python_requires=">=3.6",
1010
author="Seamile",
1111
author_email="[email protected]",

0 commit comments

Comments
 (0)