-
Notifications
You must be signed in to change notification settings - Fork 13
/
parse.py
382 lines (342 loc) Β· 15.5 KB
/
parse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
parses the source html for each group where a parser exists & contributed to the post dictionary
always remember..... https://stackoverflow.com/questions/1732348/regex-match-open-tags-except-xhtml-self-contained-tags/1732454#1732454
'''
import os, hashlib
import json,re, html, time, requests, random
from sys import platform
from datetime import datetime
from bs4 import BeautifulSoup # type: ignore
from sharedutils import openjson
from sharedutils import runshellcmd
# from sharedutils import todiscord, totwitter, toteams
from sharedutils import toMastodon, toPushover, tobluesky, tomattermost
from sharedutils import stdlog, dbglog, errlog # , honk
# For screenshot
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
# For watermark on screenshot
from PIL import Image
from PIL import ImageDraw
from PIL import ImageEnhance
from PIL.PngImagePlugin import PngInfo
from datetime import datetime
# For Notification
import http.client, urllib
from dotenv import load_dotenv
## Detect ID
import cv2
# Load the pre-trained Haar Cascade classifier for face detection (as an illustrative example)
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
# mail
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.base import MIMEBase
from email import encoders
# on macOS we use GNU grep ('ggrep -oP') because BSD grep lacks -P
if platform == 'darwin':
fancygrep = 'ggrep -oP'
else:
fancygrep = 'grep -oP'
def add_watermark(image_path, watermark_image_path='./docs/ransomwarelive.png'):
    """
    Overlay the site watermark (at 10% opacity) onto the center of the
    input image and overwrite the file in place as PNG.

    :param image_path: Path to the image to be watermarked.
    :param watermark_image_path: Path to the watermark image.
    :return: None
    """
    # Screenshots can be extremely tall; disable Pillow's decompression-bomb guard.
    Image.MAX_IMAGE_PIXELS = None
    # Open the image to be watermarked
    stdlog('open image ' + image_path)
    original = Image.open(image_path)
    if original.mode != 'RGBA':
        original = original.convert('RGBA')
    # Open the watermark image
    watermark = Image.open(watermark_image_path)
    if watermark.mode != 'RGBA':
        watermark = watermark.convert('RGBA')
    # Scale the alpha channel down to 10% in one C-level pass
    # (replaces the original per-pixel Python double loop).
    alpha = watermark.getchannel('A').point(lambda a: int(a * 0.1))
    watermark.putalpha(alpha)
    # Position watermark in the center
    x = (original.width - watermark.width) // 2
    y = (original.height - watermark.height) // 2
    # Overlay the watermark onto the screenshot
    original.paste(watermark, (x, y), watermark)
    # Overwrite the original screenshot
    stdlog('save watermarked image ' + image_path)
    original.save(image_path, 'PNG')
def posttemplate(victim, group_name, timestamp, description, website, published, post_url, country):
    '''
    Build the dictionary that becomes the new entry in posts.json for a
    freshly discovered victim post.
    '''
    entry = {
        'post_title': victim,
        'group_name': group_name,
        'discovered': timestamp,
        'description': description,
        'website': website,
        'published': published,
        'post_url': post_url,
        'country': country,
    }
    dbglog(entry)
    return entry
def send_email(subject, body, to_email, attachment_path=None):
    '''
    Send a plain-text email through the local SMTP relay, optionally
    with one file attached.

    :param subject: Subject line of the message.
    :param body: Plain-text message body.
    :param to_email: Recipient address.
    :param attachment_path: Optional path of a file to attach.
    :return: None
    '''
    # Local relay on port 25; no TLS and no authentication by default.
    smtp_server = 'localhost'
    smtp_port = 25
    smtp_tls = False
    smtp_username = ''
    smtp_password = ''
    # Create the MIME object
    msg = MIMEMultipart()
    msg['From'] = "[email protected]"
    msg['To'] = to_email
    msg['Subject'] = subject
    # Attach the body text
    msg.attach(MIMEText(body, 'plain'))
    # Attach the file if specified
    if attachment_path:
        # 'with' guarantees the handle is closed (the original leaked it).
        with open(attachment_path, 'rb') as attachment:
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(attachment.read())
        encoders.encode_base64(part)
        # Use only the basename so the header does not expose local paths.
        part.add_header('Content-Disposition',
                        "attachment; filename= %s" % os.path.basename(attachment_path))
        msg.attach(part)
    # Connect to the SMTP server
    server = smtplib.SMTP(smtp_server, smtp_port)
    if smtp_tls:
        server.starttls()
        server.login(smtp_username, smtp_password)
    # Send the email and quit
    server.sendmail(smtp_username, to_email, msg.as_string())
    server.quit()
    stdlog('Mail sent')
# Function to check a single image for an ID-like object
def check_image_for_id(image_path):
    '''
    Return True when the Haar-cascade detector finds at least one
    face-like object in the image (used as a proxy for ID documents),
    False when nothing is found or the file cannot be read.
    '''
    frame = cv2.imread(image_path)
    # cv2.imread returns None instead of raising on unreadable files
    if frame is None:
        print(f"Error loading image {image_path}. Check if the file exists and is a valid image.")
        return False
    # Detection runs on the grayscale version of the image
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    detections = face_cascade.detectMultiScale(grayscale, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
    return len(detections) > 0
def screenshot(webpage, fqdn, delay=15000, output=None):
    '''
    Capture a full-page screenshot of `webpage` with Playwright, embed
    Ransomware.live provenance metadata in the PNG, email a reviewer when
    the shot appears to contain an ID-like object, then add the watermark.

    :param webpage: URL to capture (most go through the local Tor SOCKS
        proxy on 127.0.0.1:9050; a few clearweb hosts skip it).
    :param fqdn: group FQDN, used to build the blog screenshot filename.
    :param delay: milliseconds to wait after the page load (default 15000).
    :param output: when set, save as docs/screenshots/posts/<output>.png
        ("post" mode) instead of the blog path.
    :return: None
    '''
    stdlog('webshot: {}'.format(webpage))
    if output is None:
        name = 'docs/screenshots/' + fqdn.replace('.', '-') + '.png'
        stdlog("Mode : blog")
    else:
        stdlog('Post Screenshot --> ' + output)
        name = 'docs/screenshots/posts/' + output + '.png'
        stdlog("Mode : post")
    with sync_playwright() as play:
        # Guard so cleanup only closes a browser that actually launched.
        browser = None
        try:
            # Sites that only render correctly in Firefox over Tor.
            tor_prefixes = ["http://stniiomy", "http://noescape", "http://medusa", "http://cactus", "http://hl666"]
            if any(webpage.startswith(prefix) for prefix in tor_prefixes):
                browser = play.firefox.launch(proxy={"server": "socks5://127.0.0.1:9050"},
                                              args=[''])
                stdlog('(!) exception')
            elif webpage.startswith(("https://ransomed.vc/", "https://handala", "https://t.me/", "https://dispossessor")):
                # Clearweb hosts reachable without the Tor proxy.
                browser = play.firefox.launch()
                stdlog('(!) not via tor')
            elif webpage.startswith("http://knight"):
                browser = play.chromium.launch(proxy={"server": "socks5://127.0.0.1:9050"},
                                               args=["--headless=new"])
                stdlog('(!) exception')
            else:
                # Default: Chromium through the local Tor SOCKS proxy.
                browser = play.chromium.launch(proxy={"server": "socks5://127.0.0.1:9050"},
                                               args=[''])
            context = browser.new_context(ignore_https_errors=True)
            Image.MAX_IMAGE_PIXELS = None
            page = context.new_page()
            page.goto(webpage, wait_until='load', timeout=120000)
            page.bring_to_front()
            page.wait_for_timeout(delay)
            # Nudge the mouse and scroll so lazy-loaded content renders.
            page.mouse.move(x=500, y=400)
            page.wait_for_load_state('networkidle')
            page.mouse.wheel(delta_y=2000, delta_x=0)
            page.wait_for_load_state('networkidle')
            page.wait_for_timeout(5000)
            page.screenshot(path=name, full_page=True)
            # Tag the PNG with provenance metadata and a visible timestamp.
            image = Image.open(name)
            metadata = PngInfo()
            metadata.add_text("Source", "Ransomware.live")
            metadata.add_text("Copyright", "Ransomware.live")
            metadata.add_text("Description", webpage)
            metadata.add_text("Author", "Julien Mousqueton")
            current_datetime = datetime.now()
            iso_formatted = current_datetime.isoformat()
            current_date = current_datetime.strftime('%Y:%m:%d %H:%M:%S')
            metadata.add_text("Creation Time", current_date)
            draw = ImageDraw.Draw(image)
            draw.text((10, 10), iso_formatted, fill=(0, 0, 0))
            image.save(name, pnginfo=metadata)
            # Ask a human to review shots that may expose an ID document.
            if check_image_for_id(name):
                body = "A new screenshot must be analysed : \n\n https://www.ransomware.live/screenshots/posts/" + os.path.basename(name)
                send_email("[Action Required] Check this screenshot for any ID", body, "[email protected]", name)
            add_watermark(name)
        except PlaywrightTimeoutError:
            stdlog('Timeout!')
        except Exception as exception:
            errlog(exception)
        finally:
            # Original called browser.close() unconditionally and crashed
            # with NameError whenever the launch itself failed.
            if browser is not None:
                browser.close()
def existingpost(post_title, group_name):
    '''
    Return True when posts.json already holds a post with the same title
    (case-insensitive) for the same group, False otherwise.
    '''
    wanted = post_title.lower()
    known = openjson('posts.json')
    if any(entry['post_title'].lower() == wanted and entry['group_name'] == group_name
           for entry in known):
        return True
    dbglog('post does not exist: ' + post_title)
    return False
def gettitlefromURL(website_url):
    '''
    Best-effort retrieval of a website's <title> text, used as a post
    description. Returns '' on any failure (timeout, network error,
    missing <title>).

    :param website_url: bare domain ("example.com") or full URL.
    :return: cleaned title string, or '' when it could not be fetched.
    '''
    # Normalise bare domains. Check for a scheme FIRST: the original
    # prepended "www." before this test, mangling inputs that were
    # already full URLs (https://x -> https://www.https://x).
    if not website_url.startswith("http"):
        if not website_url.startswith("www"):
            website_url = "www." + website_url
        website_url = "https://" + website_url
    description = ""
    try:
        # Rotate through a pool of user agents to look less like a bot.
        with open("assets/useragents.txt", "r") as f:
            user_agents = [ua.strip() for ua in f.readlines()]
        headers = {'User-Agent': random.choice(user_agents)}
        page = requests.get(website_url, headers=headers, timeout=10)
        soup = BeautifulSoup(page.content, 'html.parser')
        website_title = soup.find('title').get_text()
        # Strip control whitespace; '|' would clash with downstream markdown.
        description = re.sub(r'[\r\n\t]', '', website_title).replace('|', '-')
    except requests.exceptions.Timeout:
        stdlog('Website did not respond, timeout')
        description = ""
    except Exception:
        # Narrowed from a bare except: still best-effort, but no longer
        # swallows SystemExit/KeyboardInterrupt.
        stdlog('Website did not respond')
        description = ""
    return description
def replace_http_slash(text):
    '''
    Repair URLs whose scheme separator lost a slash: "https:/x" becomes
    "https://x" and "http:/x" becomes "http://x". Correct "http://" /
    "https://" sequences are left untouched.
    '''
    repaired = re.sub(r'https:/([^/])', r'https://\1', text)
    repaired = re.sub(r'http:/([^/])', r'http://\1', repaired)
    return repaired
def appender(post_title, group_name, description="", website="", published="", post_url="", country=""):
    '''
    append a new post to posts.json

    Side effects when the post is new: rewrites posts.json, fires every
    configured social-media/push notification, and screenshots post_url.

    :param post_title: victim name; truncated to 90 chars and HTML-unescaped
    :param group_name: ransomware group the post belongs to
    :param description: free text; the sentinel "_URL_" means "derive the
        description from the victim website's <title>"
    :param website: victim website (overwritten when description == "_URL_")
    :param published: publication date; defaults to now when empty
    :param post_url: direct URL of the post; screenshotted when non-empty
    :param country: country code, may be empty
    '''
    if len(post_title) == 0:
        stdlog('post_title is empty')
        return
    # Check exclusion
    with open('exceptions.txt', 'r') as f:
        # Read the contents of the file
        exceptions = f.read()
    # NOTE(review): substring match against the whole file — a title that
    # appears inside any exception line is skipped; confirm intended.
    if post_title in exceptions:
        stdlog('(!) '+ post_title + ' is in exceptions')
        return
    # limit length of post_title to 90 chars
    #country=''
    if len(post_title) > 90:
        post_title = post_title[:90]
    post_title=html.unescape(post_title)
    if existingpost(post_title, group_name) is False:
        #print('==> ' + post_title)
        posts = openjson('posts.json')
        if description == "_URL_":
            # Sentinel case: the title IS the victim domain; fetch its <title>
            # and build the website field from the title.
            description = gettitlefromURL(post_title)
            print(post_title)
            # if not post_title.lower.startswith("www"):
            website = "www." + post_title
            website = "https://" + website
        if published == "":
            published = str(datetime.today())
        newpost = posttemplate(post_title, group_name, str(datetime.today()),description,replace_http_slash(website),published,post_url,country)
        stdlog('adding new post - ' + 'group:' + group_name + ' title:' + post_title)
        posts.append(newpost)
        with open('posts.json', 'w', encoding='utf-8') as outfile:
            '''
            use ensure_ascii to mandate utf-8 in the case the post contains cyrillic π·πΊ
            https://pynative.com/python-json-encode-unicode-and-non-ascii-characters-as-is/
            '''
            dbglog('writing changes to posts.json')
            json.dump(posts, outfile, indent=4, ensure_ascii=False)
        # Load .env so the notification tokens below are available.
        load_dotenv()
        # if socials are set try post
        #if os.environ.get('DISCORD_WEBHOOK_1') is not None:
        #    todiscord(newpost['post_title'], newpost['group_name'], os.environ.get('DISCORD_WEBHOOK_1'))
        #if os.environ.get('DISCORD_WEBHOOK_2') is not None:
        #    todiscord(newpost['post_title'], newpost['group_name'], os.environ.get('DISCORD_WEBHOOK_2'))
        #if os.environ.get('TWITTER_ACCESS_TOKEN') is not None:
        #    totwitter(newpost['post_title'], newpost['group_name'])
        #if os.environ.get('MS_TEAMS_WEBHOOK') is not None:
        #    toteams(newpost['post_title'], newpost['group_name'])
        # Mastodon notification
        if os.environ.get('MASTODON_TOKEN') is not None:
            toMastodon(post_title,group_name)
            print("")
        # Pushover notification
        if os.environ.get('PUSH_API') is not None:
            toPushover(post_title, group_name)
            pass
        if os.environ.get('BLUESKY_APP_PASSWORD') is not None:
            tobluesky(post_title, group_name)
        if os.environ.get('MATTERMOST_WEBHOOK') is not None:
            tomattermost(post_title, group_name)
        ### Post screenshot
        # Screenshot filename is the MD5 of the post URL ("post" mode).
        if post_url !="":
            hash_object = hashlib.md5()
            hash_object.update(post_url.encode('utf-8'))
            hex_digest = hash_object.hexdigest()
            screenshot(post_url,None,15000,hex_digest)
        ### Screenshot blog
        #groups = openjson('groups.json')
        #for group in groups:
        #    if group["name"] == group_name:
        #        for webpage in group['locations']:
        #            delay = webpage['delay']*1000 if ( 'delay' in webpage and webpage['delay'] is not None ) \
        #                else 15000
        #            screenshot('http://'+webpage['fqdn'],webpage['fqdn'],delay)
    #else:
    #    stdlog(post_title + ' already exists')
#else:
# stdlog(post_title + ' already exists')