diff --git a/lib/metric_collector/cli.py b/lib/metric_collector/cli.py index 4d380e2..663a9ca 100644 --- a/lib/metric_collector/cli.py +++ b/lib/metric_collector/cli.py @@ -189,9 +189,7 @@ def main(): full_parser.add_argument("--tag", nargs='+', help="Collect data from hosts that matches the tag") full_parser.add_argument("--cmd-tag", nargs='+', help="Collect data from command that matches the tag") - full_parser.add_argument( "--test", action='store_true', help="Use emulated Junos device") full_parser.add_argument("-s", "--start", action='store_true', help="Start collecting (default 'no')") - full_parser.add_argument("-i", "--input", default=BASE_DIR, help="Directory where to find input files") full_parser.add_argument("--loglvl", default=20, help="Logs verbosity, 10-debug, 50 Critical") @@ -201,11 +199,9 @@ def main(): full_parser.add_argument("--sharding-offset", default=True, help="Define an offset needs to be applied to the shard_id") full_parser.add_argument("--parserdir", default="parsers", help="Directory where to find parsers") - full_parser.add_argument("--timeout", default=600, help="Default Timeout for Netconf session") - full_parser.add_argument("--delay", default=3, help="Delay Between Commands") + full_parser.add_argument("--connect-timeout", default=15, help="Timeout for collector device connect") + full_parser.add_argument("--command-timeout", default=30, help="Timeout for collector device rpc calls") full_parser.add_argument("--retry", default=5, help="Max retry") - full_parser.add_argument("--usehostname", default=True, help="Use hostname from device instead of IP") - full_parser.add_argument("--dbschema", default=2, help="Format of the output data") full_parser.add_argument("--host", default=None, help="Host DNS or IP") full_parser.add_argument("--hosts", default="hosts.yaml", help="Hosts file in yaml") @@ -223,7 +219,7 @@ def main(): full_parser.add_argument("--max-worker-threads", type=int, default=1, help="Maximum number of worker threads per interval for scheduler") full_parser.add_argument("--use-scheduler", action='store_true', help="Use scheduler") full_parser.add_argument("--hosts-refresh-interval", type=int, default=3*60*60, help="Interval to periodically refresh dynamic host inventory") - full_parser.add_argument("--allow_zero_hosts", action='store_true', help="Allow scheduler to run even with 0 hosts") + full_parser.add_argument("--allow-zero-hosts", action='store_true', help="Allow scheduler to run even with 0 hosts") dynamic_args = vars(full_parser.parse_args()) @@ -232,19 +228,11 @@ def main(): full_parser.print_help() sys.exit(1) - ## Change BASE_DIR_INPUT if we are in "test" mode - if dynamic_args['test']: - BASE_DIR_INPUT = dynamic_args['input'] - ### ------------------------------------------------------------------------------ # Loading YAML Default Variables ### ------------------------------------------------------------------------------ - db_schema = dynamic_args['dbschema'] max_connection_retries = dynamic_args['retry'] - delay_between_commands = dynamic_args['delay'] logging_level = int(dynamic_args['loglvl']) - default_junos_rpc_timeout = dynamic_args['timeout'] - use_hostname = dynamic_args['usehostname'] ### ------------------------------------------------------------------------------ ### Validate Arguments @@ -339,7 +327,9 @@ def main(): credentials, general_commands, dynamic_args['parserdir'], dynamic_args['output_type'], dynamic_args['output_addr'], max_worker_threads=max_worker_threads, - use_threads=use_threads, num_threads_per_worker=max_collector_threads + use_threads=use_threads, num_threads_per_worker=max_collector_threads, + connect_timeout=dynamic_args['connect_timeout'], + command_timeout=dynamic_args['command_timeout'] ) hri = dynamic_args.get('hosts_refresh_interval', 6 * 60 * 60) select_hosts( @@ -366,7 +356,10 @@ def main(): parser_manager=parsers_manager, output_type=dynamic_args['output_type'], output_addr=dynamic_args['output_addr'], - collect_facts=dynamic_args.get('no_facts', True)) + collect_facts=dynamic_args.get('no_facts', True), + connect_timeout=dynamic_args['connect_timeout'], + command_timeout=dynamic_args['command_timeout'] + ) target_hosts = hosts_manager.get_target_hosts(tags=tag_list) if use_threads: diff --git a/lib/metric_collector/collector.py b/lib/metric_collector/collector.py index d9eb747..696ab9e 100644 --- a/lib/metric_collector/collector.py +++ b/lib/metric_collector/collector.py @@ -12,12 +12,15 @@ class Collector: - def __init__(self, hosts_manager, parser_manager, output_type, output_addr, collect_facts=True): + def __init__(self, hosts_manager, parser_manager, output_type, output_addr, + collect_facts=True, connect_timeout=15, command_timeout=30): self.hosts_manager = hosts_manager self.parser_manager = parser_manager self.output_type = output_type self.output_addr = output_addr self.collect_facts = collect_facts + self.connect_timeout = connect_timeout + self.command_timeout = command_timeout def collect(self, worker_name, hosts=None, host_cmds=None, cmd_tags=None): if not hosts and not host_cmds: @@ -47,11 +50,11 @@ def collect(self, worker_name, hosts=None, host_cmds=None, cmd_tags=None): if device_type == 'juniper': dev = netconf_collector.NetconfCollector( host=host, address=host_address, credential=credential, - parsers=self.parser_manager, context=host_context, collect_facts=self.collect_facts) + parsers=self.parser_manager, context=host_context, collect_facts=self.collect_facts, timeout=self.connect_timeout) elif device_type == 'f5': dev = f5_rest_collector.F5Collector( host=host, address=host_address, credential=credential, - parsers=self.parser_manager, context=host_context) + parsers=self.parser_manager, context=host_context, timeout=self.connect_timeout) dev.connect() if dev.is_connected(): @@ -73,7 +76,7 @@ def collect(self, worker_name, hosts=None, host_cmds=None, cmd_tags=None): for command in target_commands: try: logger.info('[%s] Collecting > %s' % (host,command)) - data = dev.collect(command) # returns a generator + data = dev.collect(command, timeout=self.command_timeout) # returns a generator if data: values.append(data) cmd_successful += 1 diff --git a/lib/metric_collector/f5_rest_collector.py b/lib/metric_collector/f5_rest_collector.py index c161218..6c54eec 100644 --- a/lib/metric_collector/f5_rest_collector.py +++ b/lib/metric_collector/f5_rest_collector.py @@ -64,25 +64,26 @@ def collect_facts(self): # TODO(Mayuresh) Collect any other relevant facts here - def execute_query(self, query): + def execute_query(self, query, timeout=None): base_url = 'https://{}/'.format(self.host) try: query = base_url + query logger.debug('[%s]: execute : %s', self.hostname, query) - result = self.mgmt.icrs.get(query) + timeout = timeout or self.__timeout + result = self.mgmt.icrs.get(query, timeout=timeout) return result.json() except Exception as ex: logger.error('Failed to execute query: %s on %s: %s', query, self.hostname, str(ex)) return - def collect(self, command): + def collect(self, command, timeout=None): # find the command/query to execute logger.debug('[%s]: parsing : %s', self.hostname, command) parser = self.parsers.get_parser_for(command) try: - raw_data = self.execute_query(parser['data']['parser']['query']) + raw_data = self.execute_query(parser['data']['parser']['query'], timeout=timeout) except TypeError as e: logger.error('Parser returned no data. Message: {}'.format(e)) raw_data = None diff --git a/lib/metric_collector/netconf_collector.py b/lib/metric_collector/netconf_collector.py index 69c695a..e02dd3e 100644 --- a/lib/metric_collector/netconf_collector.py +++ b/lib/metric_collector/netconf_collector.py @@ -143,10 +143,12 @@ def collect_facts(self): return True - def execute_command(self,command=None): + def execute_command(self,command=None, timeout=None): try: logger.debug('[%s]: execute : %s', self.hostname, command) + if timeout: + self.pyez.timeout = timeout # the data returned is already in etree format command_result = self.pyez.rpc.cli(command, format="xml") except RpcError as err: @@ -156,11 +158,11 @@ def execute_command(self,command=None): return command_result - def collect( self, command=None ): + def collect(self, command=None, timeout=None): # find the command to execute from the parser directly parser = self.parsers.get_parser_for(command) - data = self.execute_command(parser['data']['parser']['command']) + data = self.execute_command(parser['data']['parser']['command'], timeout=timeout) if data is None: return None diff --git a/lib/metric_collector/scheduler.py b/lib/metric_collector/scheduler.py index 4387d78..51ef087 100644 --- a/lib/metric_collector/scheduler.py +++ b/lib/metric_collector/scheduler.py @@ -10,12 +10,14 @@ class Scheduler: def __init__(self, creds_conf, cmds_conf, parsers_dir, output_type, output_addr, - max_worker_threads=1, use_threads=True, num_threads_per_worker=10): + max_worker_threads=1, use_threads=True, num_threads_per_worker=10, + connect_timeout=15, command_timeout=30): self.workers = {} self.working = set() self.host_mgr = host_manager.HostManager(credentials=creds_conf, commands=cmds_conf) self.parser_mgr = parser_manager.ParserManager(parser_dirs=parsers_dir) - self.collector = collector.Collector(self.host_mgr, self.parser_mgr, output_type, output_addr) + self.collector = collector.Collector(self.host_mgr, self.parser_mgr, output_type, output_addr, + connect_timeout=connect_timeout, command_timeout=command_timeout) self.max_worker_threads = max_worker_threads self.output_type = output_type self.output_addr = output_addr