Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: SessionExpired issue #1057

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 129 additions & 57 deletions cartography/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from collections import OrderedDict
from typing import Callable
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

Expand Down Expand Up @@ -33,6 +34,111 @@
logger = logging.getLogger(__name__)


class SessionWrapper:
"""
A wrapper around neo4j.Session that provides a context manager interface and a few convenience methods.
"""

_session: Optional[neo4j.Session] = None
_driver: Optional[neo4j.Driver] = None
_config: Union[Config, argparse.Namespace]

def __init__(self, config: Union[Config, argparse.Namespace]):
self._config = config

def new_driver(self):
neo4j_auth = None
if self._config.neo4j_user or self._config.neo4j_password:
neo4j_auth = (self._config.neo4j_user, self._config.neo4j_password)
try:
return GraphDatabase.driver(
self._config.neo4j_uri,
auth=neo4j_auth,
max_connection_lifetime=self._config.neo4j_max_connection_lifetime,
)
except neo4j.exceptions.ServiceUnavailable as e:
logger.debug("Error occurred during Neo4j connect.", exc_info=True)
logger.error(
(
"Unable to connect to Neo4j using the provided URI '%s', an error occurred: '%s'."
"Make sure the Neo4j server is running and accessible from your network."
),
self._config.neo4j_uri,
e,
)
raise
except neo4j.exceptions.AuthError as e:
logger.debug("Error occurred during Neo4j auth.", exc_info=True)
if not neo4j_auth:
logger.error(
(
"Unable to auth to Neo4j, an error occurred: '%s'. cartography attempted to connect to Neo4j "
"without any auth. Check your Neo4j server settings to see if auth is required and, if it is, "
"provide cartography with a valid username and password."
),
e,
)
else:
logger.error(
(
"Unable to auth to Neo4j, an error occurred: '%s'. "
"cartography attempted to connect to Neo4j with a username and password. Check your Neo4j "
"server settings to see if the username and password provided to cartography are valid "
"credentials."
),
e,
)
raise

def get_session(self):
if self._driver is None:
self._driver = self.new_driver()
if self._session is None:
self._session = self._driver.session(database=self._config.neo4j_database)

return self._session

def _wrap(self, func: str, *args, **kwargs):
for i in range(0, 3):
try:
sess = self.get_session()
return sess.__getattribute__(func)(*args, **kwargs)
except neo4j.exceptions.SessionExpired:
logger.debug("Error occurred during Neo4j session run.", exc_info=True)
self._session = None
self._driver = None
continue

def run(self, *args, **kwargs):
return self._wrap("run", *args, **kwargs)

def read_transaction(self, *args, **kwargs):
return self._wrap("read_transaction", *args, **kwargs)

def write_transaction(self, *args, **kwargs):
return self._wrap("write_transaction", *args, **kwargs)

def begin_transaction(self, *args, **kwargs):
return self._wrap("begin_transaction", *args, **kwargs)

def close(self):
if self._session is not None:
try:
self._session.close()
except Exception:
logger.exception("Error occurred during Neo4j session close.", exc_info=True)
raise

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def __del__(self):
self.close()


class Sync:
"""
A cartography sync task.
Expand Down Expand Up @@ -68,28 +174,28 @@ def add_stages(self, stages: List[Tuple[str, Callable]]) -> None:
for name, func in stages:
self.add_stage(name, func)

def run(self, neo4j_driver: neo4j.Driver, config: Union[Config, argparse.Namespace]) -> int:
def run(self, sw: SessionWrapper, config: Union[Config, argparse.Namespace]) -> int:
"""
Execute all stages in the sync task in sequence.

:type neo4j_driver: neo4j.Driver
:param neo4j_driver: Neo4j driver object.
:type sw: SessionWrapper
:param sw: A SessionWrapper object to use for Neo4j transactions.
:type config: cartography.config.Config
:param config: Configuration for the sync run.
"""

logger.info("Starting sync with update tag '%d'", config.update_tag)
with neo4j_driver.session(database=config.neo4j_database) as neo4j_session:
for stage_name, stage_func in self._stages.items():
logger.info("Starting sync stage '%s'", stage_name)
try:
stage_func(neo4j_session, config)
except (KeyboardInterrupt, SystemExit):
logger.warning("Sync interrupted during stage '%s'.", stage_name)
raise
except Exception:
logger.exception("Unhandled exception during sync stage '%s'", stage_name)
raise # TODO this should be configurable
logger.info("Finishing sync stage '%s'", stage_name)
for stage_name, stage_func in self._stages.items():
logger.info("Starting sync stage '%s'", stage_name)
try:
stage_func(sw, config)
except (KeyboardInterrupt, SystemExit):
logger.warning("Sync interrupted during stage '%s'.", stage_name)
raise
except Exception:
logger.exception("Unhandled exception during sync stage '%s'", stage_name)
raise # TODO this should be configurable
logger.info("Finishing sync stage '%s'", stage_name)
logger.info("Finishing sync with update tag '%d'", config.update_tag)
return STATUS_SUCCESS

Expand All @@ -116,51 +222,17 @@ def run_with_config(sync: Sync, config: Union[Config, argparse.Namespace]) -> in
),
)

neo4j_auth = None
if config.neo4j_user or config.neo4j_password:
neo4j_auth = (config.neo4j_user, config.neo4j_password)
try:
neo4j_driver = GraphDatabase.driver(
config.neo4j_uri,
auth=neo4j_auth,
max_connection_lifetime=config.neo4j_max_connection_lifetime,
)
except neo4j.exceptions.ServiceUnavailable as e:
logger.debug("Error occurred during Neo4j connect.", exc_info=True)
logger.error(
(
"Unable to connect to Neo4j using the provided URI '%s', an error occurred: '%s'. Make sure the Neo4j "
"server is running and accessible from your network."
),
config.neo4j_uri,
e,
)
return STATUS_FAILURE
except neo4j.exceptions.AuthError as e:
logger.debug("Error occurred during Neo4j auth.", exc_info=True)
if not neo4j_auth:
logger.error(
(
"Unable to auth to Neo4j, an error occurred: '%s'. cartography attempted to connect to Neo4j "
"without any auth. Check your Neo4j server settings to see if auth is required and, if it is, "
"provide cartography with a valid username and password."
),
e,
)
else:
logger.error(
(
"Unable to auth to Neo4j, an error occurred: '%s'. cartography attempted to connect to Neo4j with "
"a username and password. Check your Neo4j server settings to see if the username and password "
"provided to cartography are valid credentials."
),
e,
)
return STATUS_FAILURE
default_update_tag = int(time.time())
if not config.update_tag:
config.update_tag = default_update_tag
return sync.run(neo4j_driver, config)

sw = SessionWrapper(config)
try:
sw.run("RETURN 1")
except neo4j.exceptions.Neo4jError:
return STATUS_FAILURE

return sync.run(sw, config)


def build_default_sync() -> Sync:
Expand Down