scripts/scan-txlog/scan_txlog.py (6 additions, 1 deletion)

@@ -312,7 +312,7 @@ async def balance(self, party, round_number):
             params=params,
             max_retries=30,
             delay_seconds=0.5,
-            statuses={404, 429},
+            statuses={404, 429, 500, 503},
         )
         return (party, DamlDecimal(json["wallet_balance"]))

@@ -4310,6 +4310,11 @@ async def main():
     global file_handler, LOG
     args = _parse_cli_args()
 
+    # Ensure log directory exists
+    log_directory = os.path.dirname(args.log_file_path)
+    if log_directory and not os.path.exists(log_directory):
+        os.makedirs(log_directory)
+
     LOG = _setup_logger("global", args.loglevel.upper(), args.log_file_path)
     _log_uncaught_exceptions()
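A note on the directory-creation pattern added above (and repeated in unclaimed_sv_rewards.py below): the check-then-create sequence can race if two processes start at the same time. A minimal race-free sketch using os.makedirs with exist_ok, shown here as an alternative rather than as this PR's code:

    import os

    # Race-free equivalent of the check-then-create pattern in this diff;
    # exist_ok=True suppresses the error when the directory already exists.
    os.makedirs("log", exist_ok=True)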
scripts/scan-txlog/unclaimed_sv_rewards.py (74 additions, 13 deletions)

@@ -28,22 +28,28 @@
 getcontext().prec = 38
 getcontext().rounding = ROUND_HALF_EVEN
 
+# Ensure log directory exists before logger initialization
+log_directory = "log"
+if not os.path.exists(log_directory):
+    os.makedirs(log_directory)
+
 def _default_logger(name, loglevel):
     cli_handler = colorlog.StreamHandler()
     cli_handler.setFormatter(
         colorlog.ColoredFormatter(
-            "%(log_color)s%(levelname)s:%(name)s:%(message)s",
+            "%(log_color)s%(asctime)s - %(levelname)s:%(name)s:%(message)s",
             log_colors={
                 "DEBUG": "cyan",
                 "INFO": "green",
                 "WARNING": "yellow",
                 "ERROR": "red",
                 "CRITICAL": "red,bg_white",
             },
+            datefmt="%Y-%m-%d %H:%M:%S",
         )
     )
     file_handler = logging.FileHandler("log/scan_txlog.log")
-    file_handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
+    file_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s:%(name)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S"))
 
     logger = colorlog.getLogger(name)
     logger.addHandler(cli_handler)
@@ -72,9 +78,9 @@ def _parse_cli_args() -> argparse.Namespace:
         """
     )
     parser.add_argument(
-        "scan_url",
-        help="Address of the Splice Scan server",
-        default="http://localhost:5012",
+        "scan_urls",
+        nargs="+",
+        help="Address(es) of the Splice Scan server(s). Multiple URLs can be provided for round-robin failover.",
     )
     parser.add_argument("--loglevel", help="Sets the log level", default="INFO")
     parser.add_argument(
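For illustration, a hypothetical invocation with the new scan_urls positional argument; the hostnames are placeholders, not values from this PR:

    python unclaimed_sv_rewards.py \
        https://scan.sv-1.example.com https://scan.sv-2.example.com \
        --loglevel INFO

With more than one URL, the client starts on the first and rotates to the next on retryable failures, as implemented in ScanClient below.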
@@ -177,22 +183,76 @@ def from_json(cls, json):
 @dataclass
 class ScanClient:
     session: aiohttp.ClientSession
-    url: str
+    urls: list[str]
     page_size: int
     call_count: int = 0
+    retry_count: int = 0
+    current_url_index: int = 0
+
+    def _get_current_url(self) -> str:
+        """Get the current URL from the round-robin list."""
+        return self.urls[self.current_url_index]
+
+    def _rotate_to_next_url(self):
+        """Move to the next URL in the round-robin list."""
+        self.current_url_index = (self.current_url_index + 1) % len(self.urls)
+        LOG.info(f"Rotating to next scan URL: {self._get_current_url()}")
 
     async def updates(self, after: Optional[PaginationKey]):
-        self.call_count = self.call_count + 1
         payload = {"page_size": self.page_size}
         if after:
             payload["after"] = after.to_json()
-        response = await self.session.post(
-            f"{self.url}/api/scan/v0/updates", json=payload
+
+        json = await self.__post_with_retry_on_statuses(
+            f"{self._get_current_url()}/api/scan/v0/updates",
+            payload=payload,
+            max_retries=30,
+            delay_seconds=0.5,
+            statuses={404, 429, 500, 503},
         )
-        response.raise_for_status()
-        json = await response.json()
         return json["transactions"]

+    async def __post_with_retry_on_statuses(
+        self, url, payload, max_retries, delay_seconds, statuses
+    ):
+        assert max_retries >= 1
+        retry = 0
+        self.call_count = self.call_count + 1
+        total_attempts = max_retries * len(self.urls)
+
+        while retry < total_attempts:
+            try:
+                response = await self.session.post(url, json=payload)
+                if response.status in statuses:
+                    LOG.debug(
+                        f"Request to {url} with payload {payload} failed with status {response.status}"
+                    )
+                    retry += 1
+                    if retry < total_attempts:
+                        self.retry_count = self.retry_count + 1
+                        self._rotate_to_next_url()
+                        url = f"{self._get_current_url()}/api/scan/v0/updates"
+                        await asyncio.sleep(delay_seconds)
+                else:
+                    response.raise_for_status()
+                    return await response.json()
+            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
+                LOG.debug(
+                    f"Request to {url} with payload {payload} failed with error: {e}"
+                )
+                retry += 1
+                if retry < total_attempts:
+                    self.retry_count = self.retry_count + 1
+                    self._rotate_to_next_url()
+                    url = f"{self._get_current_url()}/api/scan/v0/updates"
+                    await asyncio.sleep(delay_seconds)
+                else:
+                    raise
+
+        LOG.error(f"Exceeded max retries {total_attempts} across all URLs, giving up")
+        response.raise_for_status()
+        return await response.json()


 # Daml Decimals have a precision of 38 and a scale of 10, i.e., 10 digits after the decimal point.
 # Rounding is round_half_even.

Review comment (Contributor), on the /api/scan/v0/updates call in ScanClient.updates above:

    probably best to switch to /v2/updates as a prudent engineering measure:

    /v1/updates/{update_id}:
      get:
        deprecated: true
        tags: [ deprecated ]
        x-jvm-package: scan
        operationId: "getUpdateByIdV1"
        description: |
          Returns the update with the given update_id.
          Unlike /v0/updates/{update_id}, this endpoint returns responses that are consistent across different
          scan instances. Event ids returned by this endpoint are not comparable to event ids returned by /v0/updates.
          The order of items in events_by_id is not defined.

    /v2/updates:
      post:
        tags: [external, scan]
        x-jvm-package: scan
        operationId: "getUpdateHistoryV2"
        description: |
          Returns the update history in ascending order, paged, from ledger begin or optionally starting after a record time.
          Compared to `/v1/updates`, `/v2/updates` removes the `offset` field in responses,
          which was hardcoded to 1 in `/v1/updates` for compatibility.
          `/v2/updates` sorts events lexicographically by `ID` in `events_by_id` for convenience; this should not be confused with the
          order of events in the transaction, for which you should rely on the order of `root_event_ids` and `child_event_ids`.
          Updates are ordered lexicographically by `(migration id, record time)`.
          For a given migration id, each update has a unique record time.
          The record time ranges of different migrations may overlap, i.e.,
          it is not guaranteed that the maximum record time of one migration is smaller than the minimum record time of the next migration,
          and there may be two updates with the same record time but different migration ids.
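Following the review comment, a minimal sketch of ScanClient.updates pointed at /v2/updates. This is a sketch under stated assumptions, not this PR's code: the quoted OpenAPI excerpt specifies ordering semantics but not the request or response schema, so the full path (/api/scan/v2/updates), the payload fields, and the "transactions" response key are assumptions to verify against the Scan OpenAPI spec. Note the retry helper above also hardcodes the v0 path when rotating URLs, so that line would need the same change:

    async def updates(self, after: Optional[PaginationKey]):
        payload = {"page_size": self.page_size}
        if after:
            payload["after"] = after.to_json()

        json = await self.__post_with_retry_on_statuses(
            # Path assumed by analogy with the v0 endpoint; verify against the spec.
            f"{self._get_current_url()}/api/scan/v2/updates",
            payload=payload,
            max_retries=30,
            delay_seconds=0.5,
            statuses={404, 429, 500, 503},
        )
        # "transactions" is the v0 response key; the v2 key must be verified.
        return json["transactions"]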
@@ -919,14 +979,15 @@ async def main():
     _log_uncaught_exceptions()
 
     LOG.info(f"Starting unclaimed_sv_rewards with arguments: {args}")
+    LOG.info(f"Using scan URLs (round-robin): {args.scan_urls}")
 
     app_state = State.create_or_restore_from_cache(args)
 
     begin_t = time.time()
     tx_count = 0
 
     async with aiohttp.ClientSession() as session:
-        scan_client = ScanClient(session, args.scan_url, args.page_size)
+        scan_client = ScanClient(session, args.scan_urls, args.page_size)
 
         while True:
             json_batch = await scan_client.updates(app_state.pagination_key)
@@ -950,7 +1011,7 @@
 
     duration = time.time() - begin_t
     LOG.info(
-        f"End run. ({duration:.2f} sec., {tx_count} transaction(s), {scan_client.call_count} Scan API call(s))"
+        f"End run. ({duration:.2f} sec., {tx_count} transaction(s), {scan_client.call_count} Scan API call(s), {scan_client.retry_count} retries)"
     )
     LOG.debug(
         f"active_mining_rounds count: {len(app_state.active_issuing_rounds)}"
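For a sense of the retry budget in __post_with_retry_on_statuses: the total number of attempts scales with the number of URLs, and a sleep separates consecutive attempts. A worked example with two placeholder URLs and the hardcoded parameters from this diff:

    urls = ["https://scan.sv-1.example.com", "https://scan.sv-2.example.com"]
    max_retries = 30
    delay_seconds = 0.5

    total_attempts = max_retries * len(urls)  # 60 attempts, rotating URLs between retries
    max_sleep = (total_attempts - 1) * delay_seconds  # up to 29.5 s spent in backoff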