#!/usr/bin/env python3
##
# Copyright 2019 Mentor Graphics
# SPDX-License-Identifier: Apache-2.0
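"""Run static analysis against Tanium sensor content.

Reads the list of updated sensor names from updated_sensors.txt, loads each
sensor definition from sensor/<name>.json, and runs the analyzer configured
for the requested script type (shellcheck, Invoke-ScriptAnalyzer, or pylint)
against every matching query script. Exits non-zero if any script fails
analysis; warnings are appended to a per-script-type log file.
"""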
import json
import sys
import getopt
import os
import subprocess
import tempfile
from pprint import pprint as pp
from time import sleep

import tanrest

config = tanrest.config()

failcount = 0
warncount = 0
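# Per-script-type analyzer configuration. 'command' and 'arguments' are joined
# into a single shell command line, with <%file> replaced by the path of a
# temporary file holding the script body. 'hashbang' is a substring that must
# appear in the script's first line (or False to skip that check), and
# 'suffix' is the temp-file extension the analyzer expects.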
analyze = {
    'UnixShell': {
        'command': '/usr/bin/shellcheck',
        'arguments': config.get('package', 'shellcheck_params') + ' <%file>',
        'hashbang': 'sh',
        'suffix': '.sh'
    },
    'Powershell': {
        'command': 'powershell.exe -command',
        'arguments': "Invoke-ScriptAnalyzer -Path <%file>",
        'hashbang': False,
        'suffix': '.ps1'
    },
    'Python': {
        'command': '/usr/bin/pylint',
        'arguments': '<%file>',
        'hashbang': 'python',
        'suffix': '.py'
    }
}
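# For example, a UnixShell script ends up being analyzed with a command line
# of the form:
#   /usr/bin/shellcheck <shellcheck_params> /tmp/tmpXXXXXXXX.sh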


def usage():
    print("""
    Usage:
      analyze_sensors.py [options]

    Description:
      Runs static analysis against Tanium sensor content.

    Options:
      -h, --help     display this help and exit
      -t, --type     [required] the type of script
      -d, --debug    turn on debugging
    """)


def fail(sensor, script, output):
    """Report a static-analysis failure and count it toward the exit code."""
    global failcount
    if failcount == 0:
        print("\n" + sensor["name"] + ' (' + script["platform"] + " - " + script["script_type"] + ') has failed static analysis.')
    print(" > " + output)
    failcount += 1


def warning(sensor, script, output):
    """Append a static-analysis warning to a per-script-type log file."""
    global warncount
    with open("analyze_sensor_" + script["script_type"] + "_warnings.log", "a") as f:
        f.write("\n" + sensor["name"] + ' (' + script["platform"] + " - " + script["script_type"] + ") has warnings.\n")
        f.write(output)
    warncount += 1
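

# main() parses the command line, then walks every sensor named in
# updated_sensors.txt and analyzes each query script whose script_type
# matches the requested --type.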
def main(argv):
    global failcount
    global warncount
    script_type = None
    loglevel = None
    try:
        opts, args = getopt.getopt(argv, "d:ht:", ["debug=", "help", "type="])
    except getopt.GetoptError:
        usage()
        sys.exit(2)
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit(2)
        if opt in ('-t', '--type'):
            script_type = arg
        if opt in ('-d', '--debug'):
            loglevel = arg
    if script_type is None:
        usage()
        sys.exit(2)
    if not os.path.exists("updated_sensors.txt"):
        print('no sensor updates to analyze (missing updated_sensors.txt).')
        sys.exit(0)
    with open("updated_sensors.txt", "r") as updated_sensors:
        sensornames = updated_sensors.readlines()
    ##
    # load the JSON object for each updated sensor.
    for sensorname in sensornames:
        with open('sensor/' + sensorname.strip() + '.json') as json_data:
            sensor = json.load(json_data)
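        # Each sensor definition carries a list of query scripts; every query
        # has a platform, a script_type, and the script body itself.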
        #pp(sensor)
        for script in sensor["queries"]:
            #pp(script)
            if script["script_type"] == script_type:
                if script_type not in analyze:
                    output = "WARNING: No static analysis available for " + script_type
                    warning(sensor, script, output)
                    continue
                output = ""
                # Write the script body to a temp file with the analyzer's
                # expected suffix; delete=False so the external tool can open it.
                f = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=analyze[script_type]["suffix"])
                f.write(script["script"])
                f.flush()
                f.close()
                if analyze[script_type]["hashbang"]:
                    if analyze[script_type]["hashbang"] not in script["script"].split("\n")[0]:
                        fail(sensor, script, 'Bad hashbang for ' + script_type + ': ' + script["script"].split("\n")[0] + "\n")
                command = analyze[script_type]["command"] + " " + analyze[script_type]["arguments"]
                command = command.replace('<%file>', f.name)
                analysis = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
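                # Result handling differs per analyzer: shellcheck signals
                # problems through a non-zero exit status, Invoke-ScriptAnalyzer
                # reports severities ('Error'/'Warning') in its output, and
                # pylint prefixes messages with E/F (errors) or W/C/R (warnings).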
                if script_type == 'UnixShell':
                    result = analysis.communicate()[0].decode()
                    os.remove(f.name)
                    if analysis.returncode != 0:
                        fail(sensor, script, output + "\n" + result)
                elif script_type == 'Powershell':
                    ## Temporarily drop the wait on the child process and instead just sleep for 30 seconds.
                    ## Not an optimal solution, but we suspect MS Defender is preventing child threads from
                    ## cleaning up correctly, which causes wait() to hang indefinitely.
                    # analysis.wait()
                    sleep(30)
                    output = analysis.communicate()[0].decode()
                    os.remove(f.name)
                    if 'Error' in output:
                        fail(sensor, script, output)
                    if 'Warning' in output:
                        warning(sensor, script, output)
                elif script_type == 'Python':
                    output += analysis.communicate()[0].decode()
                    os.remove(f.name)
                    if '\nE' in output or '\nF' in output:
                        fail(sensor, script, output)
                    if '\nW' in output or '\nC' in output or '\nR' in output:
                        warning(sensor, script, output)
                else:
                    output = "WARNING: No static analysis available for " + script_type
                    warning(sensor, script, output)
    if failcount > 0:
        sys.exit(failcount)


if __name__ == "__main__":
    main(sys.argv[1:])
    if failcount > 0:
        sys.exit(1)
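# Example invocation (assuming shellcheck is installed at /usr/bin/shellcheck
# and updated_sensors.txt lists one sensor name per line):
#   ./analyze_sensors.py --type UnixShell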