Skip to content

Commit

Permalink
RTR Robustness Updates (#205)
Browse files Browse the repository at this point in the history
* Resolves #187 and some other general RTR snags. Also bumps dependencies.

* Resolves linting issues

* Caracara 0.9.0: Preparation for Python 3.13 (and huge code cleanup) (#206)

* Major code formatting cleanup, deprecation of Python 3.8, and removal of setuptools requirement

* Add note about Python version compatibility

* Temporarily set Python 3.13 CI version to 3.13 RC2

* Changes to sate isort and black together

* Unified development dependencies

* Bumps py7zr to 0.22.0 as we no longer support Python 3.7. Also, bumps caracara-filters to 1.0.0+.

* Bumps dependencies

* Resolves #187 and some other general RTR snags. Also bumps dependencies.

* Resolves linting issues

* Resolves linting issues after merge
  • Loading branch information
ChristopherHammond13 authored Oct 17, 2024
1 parent abf35eb commit e3b3be6
Showing 1 changed file with 81 additions and 30 deletions.
111 changes: 81 additions & 30 deletions caracara/modules/rtr/batch_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import partial, wraps
from itertools import repeat
from threading import current_thread
from typing import Dict, List
from typing import Dict, List, Optional, Tuple

from falconpy import RealTimeResponse, RealTimeResponseAdmin

Expand Down Expand Up @@ -44,11 +45,6 @@ class InnerRTRBatchSession: # pylint: disable=too-few-public-methods
a list of InnerRTRBatchSession objects.
"""

batch_id: str = None
devices: Dict = None
expiry: datetime = None
logger: logging.Logger = None

def __init__(self, batch_id: str, devices: Dict, expiry: datetime, logger: logging.Logger):
"""Configure an inner batch of RTR sessions."""
self.batch_id = batch_id
Expand All @@ -74,7 +70,7 @@ def generic_rtr_worker(
func: partial,
logger: logging.Logger,
device_ids: List[str] = None,
):
) -> Tuple[str, Dict]:
"""
Execute a partial function against an RTR batch session.
Expand All @@ -96,7 +92,7 @@ def generic_rtr_worker(
func.keywords["optional_hosts"] = device_ids_in_batch
response = func(batch_id=session.batch_id)["body"]
logger.debug("%s | %s", thread_name, response)
return response
return thread_name, response


class RTRBatchSession:
Expand All @@ -121,14 +117,14 @@ class RTRBatchSession:
@_batch_session_required
def device_ids(self) -> List[str]:
"""Return a list of device IDs from all inner batch sessions."""
return [x.devices.keys() for x in self.batch_sessions]
return [x.devices for x in self.batch_sessions]

def connect(
self,
device_ids: List[str],
queueing: bool = False,
timeout: int = default_timeout,
):
) -> bool:
"""
Establish a connection to one or more hosts.
Expand All @@ -142,24 +138,45 @@ def connect(
batches.append(device_ids[i : i + MAX_BATCH_SESSION_HOSTS])
self.logger.info("Divided up devices into %d batches", len(batches))

def worker(batch_device_ids: List[str], batch_func: partial):
def worker(
batch_device_ids: List[str],
batch_func: partial,
) -> Tuple[str, Optional[InnerRTRBatchSession]]:
thread_name = current_thread().name
self.logger.info(
"%s | Batch worker started with a list of %d devices",
thread_name,
len(batch_device_ids),
)
response = batch_func(host_ids=batch_device_ids)["body"]
self.logger.debug("%s | %s", thread_name, str(response))
resources = response["resources"]
self.logger.info("%s | Connected to %s systems", thread_name, len(resources))

# Identify devices that failed to connect and/or returned an error
# Resolves GitHub issue #187
if not resources:
self.logger.info("%s | Resource list is empty", thread_name)
return thread_name, None

successful_devices = {
device_id: device_data
for device_id, device_data in resources.items()
if not device_data.get("errors")
}

if not successful_devices:
self.logger.info("%s | Successful device list is empty", thread_name)
return thread_name, None

self.logger.info("%s | Connected to %s systems", thread_name, len(successful_devices))
self.logger.debug("%s | %s", thread_name, response)
batch_data = InnerRTRBatchSession(
batch_id=response["batch_id"],
devices=resources,
devices=successful_devices,
expiry=datetime.now() + timedelta(seconds=SESSION_EXPIRY),
logger=self.logger,
)
return batch_data
return thread_name, batch_data

batch_func = partial(
self.api.batch_init_sessions,
Expand All @@ -171,17 +188,27 @@ def worker(batch_device_ids: List[str], batch_func: partial):
with concurrent.futures.ThreadPoolExecutor(
max_workers=MAX_BATCH_SESSION_THREADS
) as executor:
completed = executor.map(worker, batches, [batch_func])
completed = executor.map(worker, batches, repeat(batch_func))

self.batch_sessions = []
for complete in completed:
self.logger.info("Completed a batch of RTR connections")
self.batch_sessions.append(complete)
for thread_name, thread_data in completed:
self.logger.info("%s | Completed a batch of RTR connections", thread_name)
if thread_data is None:
self.logger.info("%s | Batch contained no successful connections", thread_name)
else:
self.logger.info(
"%s | Batch contained %d successful connections",
thread_name,
len(thread_data.devices),
)
self.batch_sessions.append(thread_data)

device_count = sum(len(d.devices) for d in self.batch_sessions)
self.logger.info("Connected to %d devices", device_count)
self.logger.debug(self.batch_sessions)

return len(self.batch_sessions) > 0

@_batch_session_required
def disconnect(self):
"""Disconnect the RTR batch session."""
Expand Down Expand Up @@ -218,15 +245,29 @@ def get(
completed: List[Dict] = executor.map(
partial_worker,
self.batch_sessions,
[partial_get_func],
repeat(partial_get_func),
)

batch_get_cmd_reqs: List[BatchGetCmdRequest] = []
for complete in completed:
self.logger.info("Executed commands on a batch of %d hosts", len(complete))
for thread_name, response in completed:
try:
devices = response["combined"]["resources"]
except KeyError:
self.logger.warning(
"%s | Batch contained no successful get command executions",
thread_name,
)
continue

self.logger.info(
"%s | Executed get command on a batch of %d devices",
thread_name,
len(devices),
)

batch_get_cmd_req = BatchGetCmdRequest(
batch_get_cmd_req_id=complete["batch_get_cmd_req_id"],
devices=complete["combined"]["resources"],
batch_get_cmd_req_id=response["batch_get_cmd_req_id"],
devices=devices,
)
batch_get_cmd_reqs.append(batch_get_cmd_req)

Expand Down Expand Up @@ -278,7 +319,7 @@ def worker(batch_get_cmd_req: BatchGetCmdRequest, func: partial) -> List[GetFile
timeout_duration=f"{timeout}s",
)
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
completed = executor.map(worker, batch_get_cmd_reqs, [partial_func])
completed = executor.map(worker, batch_get_cmd_reqs, repeat(partial_func))

all_get_files: List[GetFile] = []

Expand Down Expand Up @@ -345,13 +386,13 @@ def worker(session: InnerRTRBatchSession, func: partial):
with concurrent.futures.ThreadPoolExecutor(
max_workers=MAX_BATCH_SESSION_THREADS
) as executor:
completed = executor.map(worker, self.batch_sessions, [batch_func])
completed = executor.map(worker, self.batch_sessions, repeat(batch_func))

for complete in completed:
self.logger.info("Refreshed session %s", complete)

@_batch_session_required
def run_generic_command(
def run_generic_command( # pylint: disable=too-many-locals
self,
command_string: str,
device_ids: List[str] = None,
Expand Down Expand Up @@ -399,13 +440,23 @@ def run_generic_command(
completed: List[Dict] = executor.map(
partial_worker,
self.batch_sessions,
[partial_cmd_func],
repeat(partial_cmd_func),
)

all_responses: Dict = {}
for complete in completed:
self.logger.info("Executed commands on a batch of %d hosts", len(complete))
all_responses.update(complete["combined"]["resources"])
for thread_name, response in completed:
try:
devices = response["combined"]["resources"]
except KeyError:
self.logger.warning(
"%s | Batch contained no successful command executions",
thread_name,
)
continue
self.logger.info(
"%s | Executed commands on a batch of %d hosts", thread_name, len(devices)
)
all_responses.update(devices)

return all_responses

Expand Down

0 comments on commit e3b3be6

Please sign in to comment.