Skip to content

Commit

Permalink
Merge pull request #15 from BigDataBoutique/feat-log-failures
Browse files Browse the repository at this point in the history
handling failed requests and missing data, documenting verify flag
  • Loading branch information
LiorF-BDBQ authored Oct 7, 2021
2 parents 3d1f288 + 796d1dd commit 1d93f89
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 23 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ In addition you should set ES_METRICS_CLOUD_PROVIDER to 'Amazon Elasticsearch'.

When running, make sure to set --health-flag=False.

You can opt not to verify your cluster's TLS certificate with --verify-flag=False.

## Visualizing with Grafana

You will need to create an Elasticsearch data source that is pointing to the cluster you use for monitoring.
Expand Down
73 changes: 50 additions & 23 deletions elasticsearch.monitoring/fetch_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def fetch_cluster_stats(base_url='http://localhost:9200/',health = True,verify=T
metric_docs = []
try:
response = requests.get(base_url + '_cluster/health', timeout=(5, 5),verify=verify)
if response.status_code != 200:
logger.error("bad response while getting cluster health. response is:\n" + response.json())
return metric_docs,None
cluster_health = response.json()
utc_datetime = datetime.datetime.utcnow()
ts = str(utc_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z')
Expand All @@ -88,13 +91,21 @@ def fetch_cluster_stats(base_url='http://localhost:9200/',health = True,verify=T
if health:
metric_docs.append(cluster_health)
response = requests.get(base_url + '_cluster/stats', timeout=(5, 5),verify=verify)
if response.status_code != 200:
logger.error("bad response while getting cluster stats. response is:\n" + response.json())
return metric_docs,None
cluster_stats = response.json()
# creating cluster stats json
cluster_stats_and_state = {'type':'cluster_stats','cluster_stats': cluster_stats, 'cluster_name': cluster_stats['cluster_name'], 'timestamp': ts, '@timestamp': cluster_stats['timestamp'],'cluster_uuid':cluster_stats['cluster_uuid'],'cloud_provider':cloud_provider}
response = requests.get(base_url + '_cluster/state', timeout=(5, 5),verify=verify)
if response.status_code != 200:
logger.error("bad response while getting cluster state. response is:\n" + response.json())
return metric_docs,None
cluster_state = response.json()
cluster_state_json = {'nodes': cluster_state['nodes'],'cluster_uuid':cluster_state['cluster_uuid'],'state_uuid':cluster_state['state_uuid'],'master_node':cluster_state['master_node'],'version':cluster_state['version'],'status':cluster_health['status']}
routing_table = cluster_state['routing_table']['indices']
if type(routing_table) is not dict:
logger.error("bad routing table from cluster state. response is:\n"+ response.json())
cluster_stats_and_state['cluster_state'] = cluster_state_json
metric_docs.append(cluster_stats_and_state)
return metric_docs,routing_table
Expand All @@ -110,14 +121,21 @@ def fetch_nodes_stats(base_url='http://localhost:9200/',verify=True):
r_json = None
try:
response = requests.get(base_url + '_nodes/stats', timeout=(5, 5),verify=verify)
if response.status_code != 200:
logger.error("bad response while getting node stats. response is:\n" + response.json())
return metric_docs
r_json = response.json()
cluster_name = r_json['cluster_name']

# we are opting to not use the timestamp as reported by the actual node
# to be able to better sync the various metrics collected
utc_datetime = datetime.datetime.utcnow()
ts = str(utc_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z')
for node_id, node in r_json['nodes'].items():
nodes_dict = r_json['nodes']
if type(nodes_dict) is not dict:
logger.error("bad node stats. response is:\n" + response.json())
return metric_docs
for node_id, node in nodes_dict.items():
doc_timestamp = datetime.datetime.fromtimestamp(node['timestamp']/1000.0)
doc_ts = str(doc_timestamp.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z')
node_data = {
Expand Down Expand Up @@ -253,15 +271,21 @@ def fetch_index_stats(routing_table,base_url='http://localhost:9200/',verify=Tru
ts = str(utc_datetime.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z')
# getting index stats for all indices
response = requests.get(base_url + '_stats', timeout=(5, 5),verify=verify)

if response.status_code != 200:
return None
logger.error("bad response while getting index stats. response is:\n"+response.json())
return metric_docs
index_stats = response.json()
# creating index stats json
indices = index_stats['indices']
#AWS doesn't accept /_settings
response = requests.get(base_url + '*/_settings', timeout=(5, 5), verify=verify)
if response.status_code != 200:
logger.error("bad response while getting index settings. response is:\n" + response.json())
return metric_docs
index_settings = response.json()
if type(index_settings) is not dict:
logger.error("bad index settings. response is:\n" + response.json())
return metric_docs
index_settings_ordered = dict(sorted(index_settings.items()))
routing_table_ordered = dict(sorted(routing_table.items()))

Expand All @@ -288,7 +312,7 @@ def fetch_index_stats(routing_table,base_url='http://localhost:9200/',verify=Tru
# except KeyError:
# index_data['index_stats']["missing_previous"] = True
metric_docs.append(index_data)
# creating indices stats json
# creating indices stats json
summary = index_stats["_all"]
summary_data = {
"timestamp": ts,
Expand Down Expand Up @@ -328,7 +352,10 @@ def poll_metrics(cluster_host, monitor, monitor_host,health,verify,auth_token,cl
def get_all_data(cluster_host, health, verify, cloud_provider):
    """Collect cluster-, node-, and index-level metric documents from Elasticsearch.

    Parameters:
        cluster_host: base URL of the cluster to poll (e.g. 'http://localhost:9200/').
        health: whether to include the cluster-health document in the cluster metrics.
        verify: whether to verify the cluster's TLS certificate.
        cloud_provider: provider label stamped onto the emitted cluster documents.

    Returns:
        Tuple of (cluster_stats, node_stats, index_stats). index_stats is an
        empty list when the routing table could not be retrieved.
    """
    cluster_stats, routing_table = fetch_cluster_stats(cluster_host, health, verify, cloud_provider)
    node_stats = fetch_nodes_stats(cluster_host, verify)
    # fetch_cluster_stats returns None for routing_table when any cluster-level
    # request failed; skip index stats rather than pass a bad value through.
    # isinstance is the idiomatic type check (was: `type(x) is not dict`).
    if isinstance(routing_table, dict):
        index_stats = fetch_index_stats(routing_table, cluster_host, verify)
    else:
        index_stats = []
    return cluster_stats, node_stats, index_stats


Expand Down Expand Up @@ -365,24 +392,24 @@ def into_elasticsearch(monitor_host, cluster_stats, node_stats,index_stats,auth_
index_stats_data = ['{"index":{"_index":"' + index_name + '","_type":"_doc"}}\n' + json.dumps(
o) for o in index_stats]
data += index_stats_data

try:
headers = {'Content-Type': 'application/x-ndjson'}
if auth_token is not None:
headers['X-Auth-Token'] = auth_token
bulk_response = requests.post(monitor_host + '_bulk',
data='\n'.join(data),
headers=headers,
timeout=(30, 30))
assert_http_status(bulk_response)
for item in bulk_response.json()["items"]:
if item.get("index") and item.get("index").get("status") != 201:
click.echo(json.dumps(item.get("index").get("error")))
click.echo(cluster_stats_data)
else:
click.echo(json.dumps(item.get("index").get("status")))
except (requests.exceptions.Timeout, socket.timeout):
logger.error("[%s] Timeout received while pushing collected metrics to Elasticsearch" % (time.strftime("%Y-%m-%d %H:%M:%S")))
if len(data) > 0:
try:
headers = {'Content-Type': 'application/x-ndjson'}
if auth_token is not None:
headers['X-Auth-Token'] = auth_token
bulk_response = requests.post(monitor_host + '_bulk',
data='\n'.join(data),
headers=headers,
timeout=(30, 30))
assert_http_status(bulk_response)
for item in bulk_response.json()["items"]:
if item.get("index") and item.get("index").get("status") != 201:
click.echo(json.dumps(item.get("index").get("error")))
click.echo(cluster_stats_data)
else:
click.echo(json.dumps(item.get("index").get("status")))
except (requests.exceptions.Timeout, socket.timeout):
logger.error("[%s] Timeout received while pushing collected metrics to Elasticsearch" % (time.strftime("%Y-%m-%d %H:%M:%S")))


def with_type(o, _type):
Expand Down

0 comments on commit 1d93f89

Please sign in to comment.