Skip to content

Commit 6769310

Browse files
committed
Refactor: request peer info for more efficient way
1 parent c3e1530 commit 6769310

File tree

1 file changed

+26
-10
lines changed

1 file changed

+26
-10
lines changed

src/aleph/services/p2p/jobs.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import asyncio
22
import logging
33
from dataclasses import dataclass
4-
from typing import Optional
4+
from typing import Optional, Sequence
55

6+
import aiohttp
67
from aleph_p2p_client import AlephP2PServiceClient
78
from configmanager import Config
89

@@ -55,9 +56,13 @@ async def reconnect_p2p_job(
5556
await asyncio.sleep(config.p2p.reconnect_delay.value)
5657

5758

58-
async def check_peer(peer_uri: str, timeout: int = 1) -> PeerStatus:
59+
async def check_peer(
60+
session: aiohttp.ClientSession, peer_uri: str, timeout: int = 1
61+
) -> PeerStatus:
5962
try:
60-
version_info = await api_get_request(peer_uri, "version", timeout=timeout)
63+
version_info = await api_get_request(
64+
session, peer_uri, "version", timeout=timeout
65+
)
6166
if version_info is not None:
6267
return PeerStatus(peer_uri=peer_uri, is_online=True, version=version_info)
6368

@@ -67,6 +72,23 @@ async def check_peer(peer_uri: str, timeout: int = 1) -> PeerStatus:
6772
return PeerStatus(peer_uri=peer_uri, is_online=False, version=None)
6873

6974

75+
async def request_version(peers: Sequence[str], my_ip: str, timeout: int = 1):
76+
jobs = []
77+
connector = aiohttp.TCPConnector(limit_per_host=5)
78+
timeout_conf = aiohttp.ClientTimeout(total=timeout)
79+
80+
async with aiohttp.ClientSession(
81+
connector=connector, timeout=timeout_conf
82+
) as session:
83+
for peer in peers:
84+
if my_ip in peer:
85+
continue
86+
87+
jobs.append(check_peer(session, peer))
88+
89+
return await asyncio.gather(*jobs)
90+
91+
7092
async def tidy_http_peers_job(
7193
config: Config, session_factory: AsyncDbSessionFactory, node_cache: NodeCache
7294
) -> None:
@@ -77,20 +99,14 @@ async def tidy_http_peers_job(
7799
await asyncio.sleep(2)
78100

79101
while True:
80-
jobs = []
81102

82103
try:
83104
async with session_factory() as session:
84105
peers = await get_all_addresses_by_peer_type(
85106
session=session, peer_type=PeerType.HTTP
86107
)
87108

88-
for peer in peers:
89-
if my_ip in peer:
90-
continue
91-
92-
jobs.append(check_peer(peer))
93-
peer_statuses = await asyncio.gather(*jobs)
109+
peer_statuses = await request_version(peers=peers, my_ip=my_ip)
94110

95111
for peer_status in peer_statuses:
96112
peer_in_api_servers = await node_cache.has_api_server(

0 commit comments

Comments
 (0)