-
Notifications
You must be signed in to change notification settings - Fork 0
/
sokqlovia.py
167 lines (140 loc) · 6.02 KB
/
sokqlovia.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import azure.mgmt.resourcegraph as arg
import json
import math
from pandas import json_normalize
from tqdm import tqdm
from azure.mgmt.resource import SubscriptionClient
from azure.identity import AzureCliCredential
from colorama import init, Fore, Back, Style
import pyfiglet
from argparse import ArgumentParser
from prettytable import PrettyTable
init(autoreset=True)
outputfilename="sokqlovia_output.csv"
def getresources( strQuery ):
# Get your credentials from Azure CLI (development only!) and get your subscription list
print("\nLaunching Query")
credential = AzureCliCredential()
subsClient = SubscriptionClient(credential)
subsRaw = []
for sub in subsClient.subscriptions.list():
subsRaw.append(sub.as_dict())
subsList = []
for sub in subsRaw:
subsList.append(sub.get('subscription_id'))
# Create Azure Resource Graph client and set options
argClient = arg.ResourceGraphClient(credential)
argQueryOptions = arg.models.QueryRequestOptions(result_format="objectArray")
# Create query
argQuery = arg.models.QueryRequest(subscriptions=subsList, query=strQuery, options=argQueryOptions)
# Run query
argResults = argClient.resources(argQuery)
print("Results Found: " + str(argResults.total_records))
print("Return Count: " + str(argResults.count))
print("Results Trucated:" + str(argResults.result_truncated))
print("Skip Token: " + str(argResults.skip_token))
query_num = (math.ceil(argResults.total_records/argResults.count))
print("Queries Needed: " + str(query_num))
orderstring = "order by"
if orderstring in strQuery or query_num==1:
sf = 1
else:
if input((Style.BRIGHT + Back.YELLOW + Fore.RED + "[WARNING] Query will be executed in multiple parts. 'order by' operator not found in query. Recommend terminating script (Y/N)")) == "y":
exit()
print("###################################\n")
# pandas test
json_a = json.dumps(argResults.data)
json_b = json.loads(json_a)
df_full = json_normalize(json_b)
d = 1
#print(d)
newval=1000
while argResults.skip_token:
for d in tqdm(range(query_num)):
argQueryOptions = arg.models.QueryRequestOptions(result_format="objectArray", skip_token=argResults.skip_token, skip=newval)
argQuery = arg.models.QueryRequest(subscriptions=subsList, query=strQuery, options=argQueryOptions)
argResults = argClient.resources(argQuery)
json_a = json.dumps(argResults.data)
json_b = json.loads(json_a)
df = json_normalize(json_b)
df_full = df_full.append(df)
d += 1
newval+= 1000
#print(d)
print("Data Preview")
print(df_full)
df_full.to_csv(outputfilename, index=False)
def compulsary_ascii():
ascii_banner = pyfiglet.figlet_format("Sokqlovia")
print(ascii_banner)
def testquery():
getresources(
"Resources | where type =~ 'Microsoft.Compute/virtualMachines' | project id, name, location, resourceGroup, properties.storageProfile.osDisk.osType | order by id asc")
def display_stored():
x = PrettyTable()
x.field_names = ["QueryID","QueryName","ShortDescription","PositionalArg","KQL"]
f = open("stored_queries.json")
data = json.load(f)
for i in data['queries']:
x.add_row([i['QueryID'], i['QueryName'], i['ShortDescription'],i['PositionalArg'], i['KQL']])
# #print(i)
f.close()
print(x)
def savedqueryexec(id,param):
f = open("stored_queries.json")
data = json.load(f)
curr = 0
found = 0
for i in data['queries']:
if i['QueryID'] == id:
found = 1
#i['QueryID'], i['QueryName'], i['ShortDescription'],i['PositionalArg'], i['KQL']
# if it has a positional argument - need to check if one was provided
if i['PositionalArg'] == "Y":
if param == "zttz":
print(Style.BRIGHT + Back.YELLOW + Fore.RED + "[Error] Positional Argument Expected with -p")
exit()
else:
#replace holder with arg <sokqlovia>
query = i['KQL']
print(query)
query = query.replace("<sokqlovia>",param)
print(query)
getresources(query)
# since no positional arg needed - kick of normal query
else:
getresources(i['KQL'])
else:
if (curr+1) == len(i) and found=="0":
print(Style.BRIGHT + Back.YELLOW + Fore.RED + "[Error] Unable to find matching template (run -d)")
curr += 1
def argparsing():
compulsary_ascii()
parser = ArgumentParser()
parser.add_argument("-x", help="Demo Query. i.e. outputs results of query saved in testquery()",
action="store_true",required=False)
parser.add_argument("-o", dest="outfile", help="Override default output file/path. Default is sokqlovia_output.csv in cwd",
required=False)
parser.add_argument("-c", dest="custom_query", help="Quote encapsulated KQL query to be executed", required=False)
parser.add_argument("-d", help="Displays List of Pre-define templates for use with -t flag",
action="store_true", required=False)
parser.add_argument("-s", dest="saved_query", help="Select Saved Query by ID, provide param -p for positional argument", required=False)
parser.add_argument("-p", help="Parameter needed. Used in conjuction with -S", required=False)
args = parser.parse_args()
if args.outfile:
outputfilename = args.outfile
if args.x:
testquery()
if args.custom_query:
getresources(args.custom_query)
if args.saved_query:
if args.p:
tmp = (args.p)
else:
tmp = "zttz"
savedqueryexec(args.saved_query,tmp)
if args.d:
print("\n Typical Usage: soklovia.py -s <QueryID> -p <positional_arg>\n")
display_stored()
if __name__ == '__main__':
argparsing()