-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathimport_apache_waf.py
124 lines (101 loc) · 3.92 KB
/
import_apache_waf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import os
import subprocess
import logging
from pathlib import Path
import shutil
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()],
)
# Constants (configurable via environment variables)
WAF_DIR = Path(os.getenv("WAF_DIR", "waf_patterns/apache")).resolve() # Source directory for WAF files
APACHE_WAF_DIR = Path(os.getenv("APACHE_WAF_DIR", "/etc/modsecurity.d/")).resolve() # Target directory
APACHE_CONF = Path(os.getenv("APACHE_CONF", "/etc/apache2/apache2.conf")).resolve() # Apache config file
INCLUDE_STATEMENT = "IncludeOptional /etc/modsecurity.d/*.conf" # Include directive
def copy_waf_files():
"""
Copy Apache WAF configuration files to the target directory.
Raises:
Exception: If there is an error copying files.
"""
logging.info("Copying Apache WAF patterns...")
try:
# Ensure the target directory exists
APACHE_WAF_DIR.mkdir(parents=True, exist_ok=True)
logging.info(f"[+] Created or verified directory: {APACHE_WAF_DIR}")
# Copy .conf files from source to target directory
for conf_file in WAF_DIR.glob("*.conf"):
dst_path = APACHE_WAF_DIR / conf_file.name
if dst_path.exists():
logging.warning(f"[!] File already exists: {dst_path}")
continue
try:
shutil.copy2(conf_file, dst_path)
logging.info(f"[+] Copied {conf_file} to {APACHE_WAF_DIR}")
except Exception as e:
logging.error(f"[!] Failed to copy {conf_file}: {e}")
raise
except Exception as e:
logging.error(f"[!] Error copying WAF files: {e}")
raise
def update_apache_conf():
"""
Ensure the WAF include statement is present in the Apache configuration file.
Raises:
Exception: If there is an error updating the Apache configuration.
"""
logging.info("Ensuring WAF patterns are included in apache2.conf...")
try:
# Read the current configuration
with open(APACHE_CONF, "r") as f:
config = f.read()
# Append include statement if not present
if INCLUDE_STATEMENT not in config:
logging.info("Adding WAF include to apache2.conf...")
with open(APACHE_CONF, "a") as f:
f.write(f"\n{INCLUDE_STATEMENT}\n")
logging.info("[+] WAF include statement added to apache2.conf.")
else:
logging.info("WAF patterns already included in apache2.conf.")
except Exception as e:
logging.error(f"[!] Error updating Apache configuration: {e}")
raise
def reload_apache():
"""
Reload Apache to apply the new WAF rules.
Raises:
Exception: If there is an error reloading Apache.
"""
logging.info("Reloading Apache to apply new WAF rules...")
try:
# Test Apache configuration
subprocess.run(["apachectl", "configtest"], check=True)
logging.info("[+] Apache configuration test passed.")
# Reload Apache
subprocess.run(["systemctl", "reload", "apache2"], check=True)
logging.info("[+] Apache reloaded successfully.")
except subprocess.CalledProcessError as e:
logging.error(f"[!] Apache configuration test failed: {e}")
raise
except FileNotFoundError:
logging.error("[!] 'apachectl' or 'systemctl' command not found. Are you on a supported system?")
raise
except Exception as e:
logging.error(f"[!] Error reloading Apache: {e}")
raise
def main():
"""
Main function to execute the script.
"""
try:
copy_waf_files()
update_apache_conf()
reload_apache()
logging.info("[✔] Apache configured with latest WAF rules.")
except Exception as e:
logging.critical(f"[!] Script failed: {e}")
exit(1)
if __name__ == "__main__":
main()