Merge pull request #63 from dgarros/timeout_fix
add connect/command timeout options
mayuresh82 authored Apr 22, 2020
2 parents (00b5b9c + 96019d6), commit 4beb87c
Showing 5 changed files with 31 additions and 30 deletions.
lib/metric_collector/cli.py (27 changes: 10 additions & 17 deletions)
@@ -189,9 +189,7 @@ def main():
full_parser.add_argument("--tag", nargs='+', help="Collect data from hosts that matches the tag")
full_parser.add_argument("--cmd-tag", nargs='+', help="Collect data from command that matches the tag")

full_parser.add_argument( "--test", action='store_true', help="Use emulated Junos device")
full_parser.add_argument("-s", "--start", action='store_true', help="Start collecting (default 'no')")
full_parser.add_argument("-i", "--input", default=BASE_DIR, help="Directory where to find input files")

full_parser.add_argument("--loglvl", default=20, help="Logs verbosity, 10-debug, 50 Critical")

@@ -201,11 +199,9 @@ def main():
full_parser.add_argument("--sharding-offset", default=True, help="Define an offset needs to be applied to the shard_id")

full_parser.add_argument("--parserdir", default="parsers", help="Directory where to find parsers")
full_parser.add_argument("--timeout", default=600, help="Default Timeout for Netconf session")
full_parser.add_argument("--delay", default=3, help="Delay Between Commands")
full_parser.add_argument("--connect-timeout", default=15, help="Timeout for collector device connect")
full_parser.add_argument("--command-timeout", default=30, help="Timeout for collector device rpc calls")
full_parser.add_argument("--retry", default=5, help="Max retry")
full_parser.add_argument("--usehostname", default=True, help="Use hostname from device instead of IP")
full_parser.add_argument("--dbschema", default=2, help="Format of the output data")

full_parser.add_argument("--host", default=None, help="Host DNS or IP")
full_parser.add_argument("--hosts", default="hosts.yaml", help="Hosts file in yaml")
@@ -223,7 +219,7 @@ def main():
full_parser.add_argument("--max-worker-threads", type=int, default=1, help="Maximum number of worker threads per interval for scheduler")
full_parser.add_argument("--use-scheduler", action='store_true', help="Use scheduler")
full_parser.add_argument("--hosts-refresh-interval", type=int, default=3*60*60, help="Interval to periodically refresh dynamic host inventory")
full_parser.add_argument("--allow_zero_hosts", action='store_true', help="Allow scheduler to run even with 0 hosts")
full_parser.add_argument("--allow-zero-hosts", action='store_true', help="Allow scheduler to run even with 0 hosts")

dynamic_args = vars(full_parser.parse_args())

@@ -232,19 +228,11 @@ def main():
full_parser.print_help()
sys.exit(1)

## Change BASE_DIR_INPUT if we are in "test" mode
if dynamic_args['test']:
BASE_DIR_INPUT = dynamic_args['input']

### ------------------------------------------------------------------------------
# Loading YAML Default Variables
### ------------------------------------------------------------------------------
db_schema = dynamic_args['dbschema']
max_connection_retries = dynamic_args['retry']
delay_between_commands = dynamic_args['delay']
logging_level = int(dynamic_args['loglvl'])
default_junos_rpc_timeout = dynamic_args['timeout']
use_hostname = dynamic_args['usehostname']

### ------------------------------------------------------------------------------
### Validate Arguments
@@ -339,7 +327,9 @@ def main():
credentials, general_commands, dynamic_args['parserdir'],
dynamic_args['output_type'], dynamic_args['output_addr'],
max_worker_threads=max_worker_threads,
use_threads=use_threads, num_threads_per_worker=max_collector_threads
use_threads=use_threads, num_threads_per_worker=max_collector_threads,
connect_timeout=dynamic_args['connect_timeout'],
command_timeout=dynamic_args['command_timeout']
)
hri = dynamic_args.get('hosts_refresh_interval', 6 * 60 * 60)
select_hosts(
@@ -366,7 +356,10 @@ def main():
parser_manager=parsers_manager,
output_type=dynamic_args['output_type'],
output_addr=dynamic_args['output_addr'],
collect_facts=dynamic_args.get('no_facts', True))
collect_facts=dynamic_args.get('no_facts', True),
connect_timeout=dynamic_args['connect_timeout'],
command_timeout=dynamic_args['command_timeout']
)
target_hosts = hosts_manager.get_target_hosts(tags=tag_list)

if use_threads:
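
Illustration (not part of the commit): a minimal, self-contained sketch of how the two new flags could be parsed and handed onward, assuming only the argparse wiring shown above. run_collection is a hypothetical stand-in for the Scheduler/Collector setup in main(), and type=int is an addition of the sketch; the commit itself relies on the integer defaults.

import argparse

parser = argparse.ArgumentParser()
# Mirrors the options added in cli.py above
parser.add_argument("--connect-timeout", type=int, default=15,
                    help="Timeout for collector device connect")
parser.add_argument("--command-timeout", type=int, default=30,
                    help="Timeout for collector device rpc calls")
dynamic_args = vars(parser.parse_args(["--connect-timeout", "20"]))

def run_collection(connect_timeout, command_timeout):
    # hypothetical stand-in for building Scheduler/Collector in main()
    print("connect=%ss command=%ss" % (connect_timeout, command_timeout))

run_collection(connect_timeout=dynamic_args["connect_timeout"],
               command_timeout=dynamic_args["command_timeout"])
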
lib/metric_collector/collector.py (11 changes: 7 additions & 4 deletions)
@@ -12,12 +12,15 @@

class Collector:

def __init__(self, hosts_manager, parser_manager, output_type, output_addr, collect_facts=True):
def __init__(self, hosts_manager, parser_manager, output_type, output_addr,
collect_facts=True, connect_timeout=15, command_timeout=30):
self.hosts_manager = hosts_manager
self.parser_manager = parser_manager
self.output_type = output_type
self.output_addr = output_addr
self.collect_facts = collect_facts
self.connect_timeout = connect_timeout
self.command_timeout = command_timeout

def collect(self, worker_name, hosts=None, host_cmds=None, cmd_tags=None):
if not hosts and not host_cmds:
@@ -47,11 +50,11 @@ def collect(self, worker_name, hosts=None, host_cmds=None, cmd_tags=None):
if device_type == 'juniper':
dev = netconf_collector.NetconfCollector(
host=host, address=host_address, credential=credential,
parsers=self.parser_manager, context=host_context, collect_facts=self.collect_facts)
parsers=self.parser_manager, context=host_context, collect_facts=self.collect_facts, timeout=self.connect_timeout)
elif device_type == 'f5':
dev = f5_rest_collector.F5Collector(
host=host, address=host_address, credential=credential,
parsers=self.parser_manager, context=host_context)
parsers=self.parser_manager, context=host_context, timeout=self.connect_timeout)
dev.connect()

if dev.is_connected():
@@ -73,7 +76,7 @@ def collect(self, worker_name, hosts=None, host_cmds=None, cmd_tags=None):
for command in target_commands:
try:
logger.info('[%s] Collecting > %s' % (host,command))
data = dev.collect(command) # returns a generator
data = dev.collect(command, timeout=self.command_timeout) # returns a generator
if data:
values.append(data)
cmd_successful += 1
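
Illustration (not part of the commit): the shape of the change in Collector.collect(), with FakeDevice as a hypothetical stand-in for NetconfCollector / F5Collector. The connect timeout is fixed when the device object is built; the command timeout is passed on every collect() call.

class FakeDevice:
    """Hypothetical stand-in for NetconfCollector / F5Collector."""
    def __init__(self, timeout=15):
        self.timeout = timeout                      # connect timeout, set once

    def connect(self):
        return True

    def collect(self, command, timeout=None):
        return {"command": command, "rpc_timeout": timeout}

class MiniCollector:
    def __init__(self, connect_timeout=15, command_timeout=30):
        self.connect_timeout = connect_timeout
        self.command_timeout = command_timeout

    def collect(self, commands):
        dev = FakeDevice(timeout=self.connect_timeout)   # as in Collector.collect()
        dev.connect()
        return [dev.collect(c, timeout=self.command_timeout) for c in commands]

print(MiniCollector(connect_timeout=10, command_timeout=60).collect(["show version"]))
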
lib/metric_collector/f5_rest_collector.py (9 changes: 5 additions & 4 deletions)
@@ -64,25 +64,26 @@ def collect_facts(self):

# TODO(Mayuresh) Collect any other relevant facts here

def execute_query(self, query):
def execute_query(self, query, timeout=None):

base_url = 'https://{}/'.format(self.host)
try:
query = base_url + query
logger.debug('[%s]: execute : %s', self.hostname, query)
result = self.mgmt.icrs.get(query)
timeout = timeout or self.__timeout
result = self.mgmt.icrs.get(query, timeout=timeout)
return result.json()
except Exception as ex:
logger.error('Failed to execute query: %s on %s: %s', query, self.hostname, str(ex))
return

def collect(self, command):
def collect(self, command, timeout=None):

# find the command/query to execute
logger.debug('[%s]: parsing : %s', self.hostname, command)
parser = self.parsers.get_parser_for(command)
try:
raw_data = self.execute_query(parser['data']['parser']['query'])
raw_data = self.execute_query(parser['data']['parser']['query'], timeout=timeout)
except TypeError as e:
logger.error('Parser returned no data. Message: {}'.format(e))
raw_data = None
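
Illustration (not part of the commit): the per-call/instance-default fallback used in execute_query above, rewritten against plain requests instead of the F5 iControl client so the sketch stands alone; the host and URL path are placeholders.

import requests

class RestClient:
    def __init__(self, host, default_timeout=30):
        self.host = host
        self._timeout = default_timeout             # plays the role of self.__timeout

    def execute_query(self, query, timeout=None):
        # per-call timeout wins, otherwise fall back to the instance default,
        # mirroring `timeout = timeout or self.__timeout` in the diff above
        timeout = timeout or self._timeout
        url = "https://{}/{}".format(self.host, query)
        try:
            return requests.get(url, timeout=timeout).json()
        except Exception as ex:
            print("query failed: %s" % ex)
            return None

# client = RestClient("bigip.example.net", default_timeout=30)
# client.execute_query("mgmt/tm/sys/version", timeout=5)

One property of the `or` fallback: a caller passing 0 gets the instance default rather than a disabled timeout.
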
lib/metric_collector/netconf_collector.py (8 changes: 5 additions & 3 deletions)
@@ -143,10 +143,12 @@ def collect_facts(self):

return True

def execute_command(self,command=None):
def execute_command(self,command=None, timeout=None):

try:
logger.debug('[%s]: execute : %s', self.hostname, command)
if timeout:
self.pyez.timeout = timeout
# the data returned is already in etree format
command_result = self.pyez.rpc.cli(command, format="xml")
except RpcError as err:
@@ -156,11 +158,11 @@ def execute_command(self,command=None, timeout=None):

return command_result

def collect( self, command=None ):
def collect(self, command=None, timeout=None):

# find the command to execute from the parser directly
parser = self.parsers.get_parser_for(command)
data = self.execute_command(parser['data']['parser']['command'])
data = self.execute_command(parser['data']['parser']['command'], timeout=timeout)

if data is None:
return None
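
Illustration (not part of the commit): the per-command override in execute_command above works by raising the session's RPC timeout before the call. StubPyez is a hypothetical stand-in for the PyEZ Device object held in self.pyez; only the timeout attribute is used here.

class StubPyez:
    """Hypothetical stand-in for the jnpr.junos Device held in self.pyez."""
    def __init__(self):
        self.timeout = 30                           # session RPC timeout in seconds

    def rpc_cli(self, command):
        return "<reply command='%s' timeout='%s'/>" % (command, self.timeout)

class MiniNetconfCollector:
    def __init__(self):
        self.pyez = StubPyez()

    def execute_command(self, command=None, timeout=None):
        # same idea as the diff: only touch the session timeout when the caller
        # asked for one; it then stays in effect for later commands as well
        if timeout:
            self.pyez.timeout = timeout
        return self.pyez.rpc_cli(command)

print(MiniNetconfCollector().execute_command("show system uptime", timeout=120))
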
lib/metric_collector/scheduler.py (6 changes: 4 additions & 2 deletions)
@@ -10,12 +10,14 @@
class Scheduler:

def __init__(self, creds_conf, cmds_conf, parsers_dir, output_type, output_addr,
max_worker_threads=1, use_threads=True, num_threads_per_worker=10):
max_worker_threads=1, use_threads=True, num_threads_per_worker=10,
connect_timeout=15, command_timeout=30):
self.workers = {}
self.working = set()
self.host_mgr = host_manager.HostManager(credentials=creds_conf, commands=cmds_conf)
self.parser_mgr = parser_manager.ParserManager(parser_dirs=parsers_dir)
self.collector = collector.Collector(self.host_mgr, self.parser_mgr, output_type, output_addr)
self.collector = collector.Collector(self.host_mgr, self.parser_mgr, output_type, output_addr,
connect_timeout=connect_timeout, command_timeout=command_timeout)
self.max_worker_threads = max_worker_threads
self.output_type = output_type
self.output_addr = output_addr
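
Illustration (not part of the commit): the Scheduler change is pure pass-through. The sketch below uses stripped-down stand-ins for the repository's Scheduler and Collector classes, and the output arguments are placeholders.

class MiniCollector:
    def __init__(self, output_type, output_addr, connect_timeout=15, command_timeout=30):
        self.connect_timeout = connect_timeout
        self.command_timeout = command_timeout

class MiniScheduler:
    # same forwarding as the diff: the scheduler just carries the two values
    # through to the Collector it owns
    def __init__(self, output_type, output_addr,
                 connect_timeout=15, command_timeout=30):
        self.collector = MiniCollector(output_type, output_addr,
                                       connect_timeout=connect_timeout,
                                       command_timeout=command_timeout)

sched = MiniScheduler("stdout", None, connect_timeout=20, command_timeout=90)
print(sched.collector.connect_timeout, sched.collector.command_timeout)
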
