-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathipset.py
126 lines (89 loc) · 3.2 KB
/
ipset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python
# coding: utf-8
import ipaddress
from .shell_commander import ShellCommander
class IpsetError(Exception):
pass
class IpsetNoRights(Exception):
pass
class IpsetSetNotFound(Exception):
pass
class IpsetAlreadyInSet(Exception):
pass
class Ipset(ShellCommander):
"""
"""
CMD = 'ipset'
def __init__(self):
"""
"""
super().__init__()
async def add(self, setname, ip, timeout=0):
"""
Adds the given IP address to the given ipset.
If a timeout is given, the IP will stay in the ipset for
the given duration. Else it's added forever.
The resulting command looks like this:
``ipset add -exist ellis_blacklist4 192.0.2.10 timeout 14400``
"""
args = ['add', '-exist', setname, ip, 'timeout', timeout]
return await self.start(__class__.CMD, args)
async def list(self, setname=None):
"""
Lists the existing ipsets.
If setname is given, only lists this ipset.
The resulting command looks like one of the following:
* ``ipset list``
* ``ipset list ellis_blacklist4``
"""
args = ['list']
if setname is not None:
args.append(setname)
return await self.start(__class__.CMD, args)
def chose_blacklist(self, ip):
"""
Given an IP address, figure out the ipset we have to use.
If the address is an IPv4, we have to use *ellis_blacklist4*.
If the address is an IPv6, we have to use *ellis_blacklist6*.
Raises ipaddress.AddressValueError if the address is neither
an IPv4 nor an IPv6.
"""
blacklist = 'ellis_blacklist{0}'
try:
address = ipaddress.ip_address(ip)
except ipaddress.AddressValueError:
raise
else:
if address.version is 6:
# We don't ban private IPv6:
if address.is_private:
msg = "We don't ban private addresses ({0} given)." \
.format(address)
raise ipaddress.AddressValueError(msg)
else:
# Do we have an embedded IPv4 ?
if address.ipv4_mapped is not None:
address = address.ipv4_mapped
elif address.sixtofour is not None:
address = address.sixtofour
blacklist = blacklist.format(address.version)
return (address, blacklist)
def handle_error(self, err):
"""
"""
msg = err.decode()
if "Kernel error received: Operation not permitted" in msg:
raise IpsetNoRights(msg)
elif "The set with the given name does not exist" in msg:
raise IpsetSetNotFound(msg)
elif "Element cannot be added to the set: it's already added" in msg:
raise IpsetAlreadyInSet(msg)
else:
raise IpsetError(msg)
async def ban(ip, timeout=0):
"""
"""
ipset = Ipset()
address, ipset_name = ipset.chose_blacklist(ip)
print("Adding {0} to {1}".format(address, ipset_name))
return await ipset.add(ipset_name, address, timeout)