Skip to content

Commit

Permalink
network module add submodule proxy.TransparentProxy
Browse files Browse the repository at this point in the history
  • Loading branch information
amaork committed Jun 25, 2022
1 parent b506e42 commit a9a3b08
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 4 deletions.
2 changes: 1 addition & 1 deletion core/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class DynamicObject(object):
def __init__(self, **kwargs):
try:
for key in self._properties:
if kwargs.get(key) is None:
if key not in kwargs:
raise KeyError("do not found key:{!r}".format(key))

self.__dict__.update(**kwargs)
Expand Down
41 changes: 41 additions & 0 deletions demos/transparent_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
import sys
import time
import getopt
from ..core.datatype import str2float, str2number
from ..network.proxy import ProxyAttribute, TransparentProxy


if __name__ == '__main__':
try:
attr = ProxyAttribute(address='', port=0)
opts, args = getopt.getopt(sys.argv[1:], '', [f'{x}=' for x in ProxyAttribute.properties()])

for option, argument in opts:
option = option[2:]
if option in ProxyAttribute.properties():
if option in ('port', 'recv_buf_size'):
argument = str2number(argument)

if option in ('timeout',):
argument = str2float(argument)

attr.update({option: argument})

if not attr.address:
raise getopt.GetoptError('must specified an address')

if not attr.port:
raise getopt.GetoptError('must specified a port')

print(f'Start transparent proxy: {attr}')
proxy = TransparentProxy(attribute=attr)
while True:
time.sleep(1)
except getopt.GetoptError as e:
print(f'Get option error: {e}\n')
print(f'{sys.argv[0]} usage:\n'
f'\t{"--address":15}set server address\n'
f'\t{"--port":15}set server port\n'
f'\t{"--timeout":15}set timeout\n')
sys.exit()
97 changes: 97 additions & 0 deletions network/proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
import time
import socket
import selectors
import threading
import collections
from ..core.datatype import DynamicObject, ip4_check, port_check
from .utility import create_socket_and_connect, tcp_socket_send_data
__all__ = ['SocketPair', 'ProxyAttribute', 'TransparentProxy']
SocketPair = collections.namedtuple('SocketPair', 'server client')


class ProxyAttribute(DynamicObject):
_properties = {'address', 'port', 'timeout', 'recv_buf_size'}
_check = {
'address': ip4_check,
'port': port_check
}

def __init__(self, **kwargs):
kwargs.setdefault('timeout', None)
kwargs.setdefault('recv_buf_size', 1024)
super(ProxyAttribute, self).__init__(**kwargs)


class TransparentProxy:
def __init__(self, attribute: ProxyAttribute):
self._clients = list()
self.attribute = attribute
self._selector = selectors.DefaultSelector()
threading.Thread(target=self.threadEventHandle, daemon=True).start()
threading.Thread(target=self.threadAcceptHandle, daemon=True).start()

def register(self, sock_pair: SocketPair):
self._clients.append(sock_pair)
for sock in sock_pair:
self._selector.register(sock, selectors.EVENT_READ)

def unregister(self, sock_pair: SocketPair):
self._clients.remove(sock_pair)
for sock in sock_pair:
self._selector.unregister(sock)
sock.close()

def relay(self, rx: socket.socket, tx: socket.socket):
"""Real work recv client/server data send to other side if channel"""
try:
payload = rx.recv(self.attribute.recv_buf_size)
except socket.timeout:
return
except (BrokenPipeError, ConnectionResetError) as e:
print(rx.getsockname(), f'{e}')
payload = b''

if not payload:
print('Closed')
self.unregister(SocketPair(tx, rx))
return

tcp_socket_send_data(tx, payload)

def threadEventHandle(self):
"""Event handle, watch clients socket readable events and handle it"""
while True:
if not self._selector.get_map():
time.sleep(0.01)
continue

for key, mask in self._selector.select():
# Find out which pair of client
for sock_pair in self._clients:
if key.fileobj in sock_pair:
rx, tx = sock_pair if key.fileobj == sock_pair[0] else sock_pair[::-1]
self.relay(rx, tx)

def threadAcceptHandle(self):
"""Create transparent proxy server, accept client connect, make a channel between real server and client"""
listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
listen_socket.bind(('', self.attribute.port))
listen_socket.listen(5)

while True:
print('Before accept')
client = listen_socket.accept()[0]
print(f'Client: {client.getpeername()}')

try:
# Connect server immediately
server = create_socket_and_connect(**self.attribute.dict)
print(f'Server: {server.getpeername()}')
except RuntimeError:
# Connect failed close client too
client.close()
continue

# Register to proxy handle list
self.register(SocketPair(server=server, client=client))
16 changes: 13 additions & 3 deletions network/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,19 +256,29 @@ def tcp_socket_send_data(tcp_socket: socket.socket, data: bytes) -> List[int]:


def tcp_socket_recv_data(tcp_socket: socket.socket, length: int) -> bytes:
"""Receive #length specified bytes data until to timeout or peer closed (raise BrokenPipeError)
:param tcp_socket: tcp socket to receive data
:param length: receive data length
:return: receive data
"""
recv_data = bytes()

while len(recv_data) < length:
data = tcp_socket.recv(length - len(recv_data))
try:
data = tcp_socket.recv(length - len(recv_data))
except socket.timeout:
return recv_data

if not data:
raise ConnectionError('recv timeout')
raise BrokenPipeError('peer closed')

recv_data += data

return recv_data


def create_socket_and_connect(address: str, port: int, timeout: float,
def create_socket_and_connect(address: str, port: int, timeout: Union[float, None] = None,
recv_buf_size: int = 32 * 1024, retry: int = 3, no_delay: bool = True) -> socket.socket:
times = 0
while times < retry:
Expand Down

0 comments on commit a9a3b08

Please sign in to comment.