Skip to content

Commit

Permalink
Introduce new IP address attributes to remove intern_ip
Browse files Browse the repository at this point in the history
  • Loading branch information
TuxPowered42 committed Dec 15, 2023
1 parent f060aa7 commit fe0e76c
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 30 deletions.
2 changes: 1 addition & 1 deletion serveradmin/serverdb/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def clean(self):
# It makes no sense to add inet or supernet attributes to hosts of
# ip_addr_type null because they would have to be empty anyways.
inet_attribute = (
self.cleaned_data['attribute'].type in ('inet', 'supernet') and
self.cleaned_data['attribute'].type in ('inet', 'inet4', 'inet6', 'supernet') and
self.instance.servertype.ip_addr_type == 'null'
)
if inet_attribute:
Expand Down
53 changes: 53 additions & 0 deletions serveradmin/serverdb/migrations/0017_auto_20231206_0908.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Generated by Django 3.2.20 on 2023-12-06 15:08

from django.db import migrations, models
import django.db.models.deletion
import netfields.fields


class Migration(migrations.Migration):

dependencies = [
('serverdb', '0016_optional_servertype_for_relation'),
]

operations = [
migrations.AddField(
model_name='attribute',
name='supernet',
field=models.BooleanField(default=False),
),
migrations.AlterField(
model_name='attribute',
name='type',
field=models.CharField(choices=[('string', 'string'), ('boolean', 'boolean'), ('relation', 'relation'), ('reverse', 'reverse'), ('number', 'number'), ('inet', 'inet'), ('inet4', 'inet4'), ('inet6', 'inet6'), ('macaddr', 'macaddr'), ('date', 'date'), ('datetime', 'datetime'), ('supernet', 'supernet'), ('domain', 'domain')], max_length=32),
),
migrations.CreateModel(
name='ServerInet6Attribute',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('value', netfields.fields.InetAddressField(max_length=39)),
('attribute', models.ForeignKey(db_index=False, limit_choices_to={'type': 'inet6'}, on_delete=django.db.models.deletion.CASCADE, to='serverdb.attribute')),
('server', models.ForeignKey(db_index=False, on_delete=django.db.models.deletion.CASCADE, to='serverdb.server')),
],
options={
'db_table': 'server_inet6_attribute',
'unique_together': {('server', 'attribute', 'value')},
'index_together': {('attribute', 'value')},
},
),
migrations.CreateModel(
name='ServerInet4Attribute',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('value', netfields.fields.InetAddressField(max_length=39)),
('attribute', models.ForeignKey(db_index=False, limit_choices_to={'type': 'inet4'}, on_delete=django.db.models.deletion.CASCADE, to='serverdb.attribute')),
('server', models.ForeignKey(db_index=False, on_delete=django.db.models.deletion.CASCADE, to='serverdb.server')),
],
options={
'db_table': 'server_inet4_attribute',
'unique_together': {('server', 'attribute', 'value')},
'index_together': {('attribute', 'value')},
},
),
]
221 changes: 218 additions & 3 deletions serveradmin/serverdb/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
IPv6Interface,
ip_network,
IPv4Network,
IPv6Network,
IPv6Network, AddressValueError, NetmaskValueError,
)
from typing import Union

Expand All @@ -38,6 +38,8 @@
'reverse': str,
'number': lambda x: float(x) if '.' in str(x) else int(x),
'inet': lambda x: inet_to_python(x),
'inet4': lambda x: inet4_to_python(x),
'inet6': lambda x: inet6_to_python(x),
'macaddr': EUI,
'date': str,
'datetime': str,
Expand Down Expand Up @@ -77,6 +79,46 @@ def get_choices(types):
return zip(*([types] * 2))


def is_supernet_consistent(ip_address, server):
"""Check if requested IP address is consistent with supernets for all other IP address of a server."""

if ip_address.version == 4:
server_attribute_cls = ServerInet4Attribute
else:
server_attribute_cls = ServerInet6Attribute

# A server belongs to different supernets depending on network servertype
supernet_attributes = Attribute.objects.filter(
type='supernet',
servertype_attributes__servertype_id=server.servertype_id,
)
supernet_servertypes = [x.target_servertype for x in supernet_attributes]

for supernet_servertype in supernet_servertypes:
supernet_1, _, _ = server.get_supernet(supernet_servertype) # The result is guaranteed to be consistent

if supernet_1 is None:
continue

# Check the supernet of the new IP address
try:
supernet_2 = server_attribute_cls.objects.get(
value__net_contains_or_equals=ip_address,
server__servertype__ip_addr_type='network',
server__servertype_id=supernet_servertype,
).server
except server_attribute_cls.DoesNotExist:
# There are some provider networks which are valid only for one address family.
# TODO: Enforce validation once the "local" network is gone.
continue

if supernet_1 != supernet_2:
raise ValidationError(
f'Non-matching {supernet_servertype} {supernet_2} for IP address {ip_address}, '
f'other IP addresses of {server.hostname} are in {supernet_1}'
)


# TODO: Make validators out of the methods is_ip_address, is_unique and
# is_network and attach them to the model fields validators.
def is_ip_address(ip_interface: Union[IPv4Interface, IPv6Interface]) -> None:
Expand Down Expand Up @@ -106,16 +148,30 @@ def is_unique_ip(ip_interface: Union[IPv4Interface, IPv6Interface],
:return:
"""

if ip_interface.version == 4:
server_attribute_cls = ServerInet4Attribute
else:
server_attribute_cls = ServerInet6Attribute

# We avoid querying the duplicate hosts here and giving the user
# detailed information because checking with exists is cheaper than
# querying the server and this is a validation and should be fast.
has_duplicates = (
# TODO: Remove "intern_ip" support.
Server.objects.filter(intern_ip=ip_interface).exclude(
Q(servertype__ip_addr_type='network') |
Q(server_id=object_id)
).exists() or
# TODO: Remove "primary_ip6" support.
ServerInetAttribute.objects.filter(value=ip_interface).exclude(
server__servertype__ip_addr_type='network').exists())
Q(server__servertype__ip_addr_type='network') |
Q(server_id=object_id)
).exists() or
server_attribute_cls.objects.filter(value=ip_interface).exclude(
Q(server__servertype__ip_addr_type='network') |
Q(server_id=object_id)
).exists()
)
if has_duplicates:
raise ValidationError(
'An object with {0} already exists'.format(str(ip_interface)))
Expand Down Expand Up @@ -151,6 +207,20 @@ def inet_to_python(obj: object) -> Union[IPv4Interface, IPv6Interface]:
except ValueError as error:
raise ValidationError(str(error))

# WARNING: called only for edit->commit, not for commit!
def inet4_to_python(obj: object) -> IPv4Interface:
try:
return IPv4Interface(obj)
except (AddressValueError, NetmaskValueError):
raise ValidationError(f'{obj} does not appear to be an IPv4 interface')


def inet6_to_python(obj: object) -> IPv6Interface:
try:
return IPv6Interface(obj)
except (AddressValueError, NetmaskValueError):
raise ValidationError(f'{obj} does not appear to be an IPv6 interface')


def network_overlaps(ip_interface: Union[IPv4Interface, IPv6Interface],
servertype_id: str, object_id: int) -> None:
Expand Down Expand Up @@ -226,6 +296,7 @@ def __init__(self, *args, **kwargs):
max_length=32,
choices=get_choices(ATTRIBUTE_TYPES.keys()),
)
supernet = models.BooleanField(null=False, default=False)
multi = models.BooleanField(null=False, default=False)
hovertext = models.TextField(null=False, blank=True, default='')
group = models.CharField(
Expand Down Expand Up @@ -438,10 +509,57 @@ def __str__(self):
return self.hostname

def get_supernet(self, servertype):
return Server.objects.get(
"""Get a supernet of given servertype for the current server.
This function will check all IP addresses of a server which have the "supernet" feature enabled.
If data is inconsistent, an exception is raised.
No matching network for just some of the addresses does not mean inconsistency.
"""

supernet_1 = None
supernet_ip_address = None
supernet_attribute = None

# TODO: Remove "intern_ip" support. Just remove this block of code below.
supernet_1 = Server.objects.get(
servertype=servertype,
# It should probably match on ip_addr_type too, but we will remove this soon anyway.
intern_ip__net_contains_or_equals=self.intern_ip,
)
supernet_ip_address = self.intern_ip
supernet_attribute = None # Magic value for intern_ip

for server_attribute_cls in (ServerInet4Attribute, ServerInet6Attribute):
ip_addresses = server_attribute_cls.objects.filter(
server_id=self.server_id,
attribute__supernet=True,
)

# TODO: How to net_contains_or_equals for iterable?
for ip_address in ip_addresses:
try:
attr = server_attribute_cls.objects.get(
value__net_contains_or_equals=ip_address.value,
server__servertype__ip_addr_type='network',
server__servertype_id=servertype.servertype_id,
)
if not attr.attribute.supernet:
raise ValidationError(f'Not a supernet: {servertype}!')
supernet_2 = attr.server
# TODO: Shouldn't we check that the requested servertype really point
except server_attribute_cls.DoesNotExist:
continue
else:
# Always trust the 1st found network
if supernet_1 is None:
supernet_1 = supernet_2
supernet_ip_address = ip_address.value
supernet_attribute = ip_addresses.attribute
# Verify that all found networks match the 1st found one.
elif supernet_1 != supernet_2:
raise ValidationError(f'Can\'t determine {servertype} for {self.hostname}!')

return (supernet_1, supernet_ip_address, supernet_attribute)

def clean(self):
super(Server, self).clean()
Expand Down Expand Up @@ -474,6 +592,9 @@ def clean(self):
network_overlaps(self.intern_ip, self.servertype.servertype_id,
self.server_id)

if ip_addr_type != 'null':
is_supernet_consistent(self.intern_ip, self.server)

def get_attributes(self, attribute):
model = ServerAttribute.get_model(attribute.type)
return model.objects.filter(server=self, attribute=attribute)
Expand Down Expand Up @@ -520,6 +641,10 @@ def get_model(attribute_type):
return ServerNumberAttribute
if attribute_type == 'inet':
return ServerInetAttribute
if attribute_type == 'inet4':
return ServerInet4Attribute
if attribute_type == 'inet6':
return ServerInet6Attribute
if attribute_type == 'macaddr':
return ServerMACAddressAttribute
if attribute_type == 'date':
Expand Down Expand Up @@ -695,6 +820,96 @@ def clean(self):
network_overlaps(self.value, self.server.servertype_id,
self.server.server_id)

is_supernet_consistent(self.value, self.server)


class ServerInet4Attribute(ServerAttribute):
attribute = models.ForeignKey(
Attribute,
db_index=False,
on_delete=models.CASCADE,
limit_choices_to=dict(type='inet4'),
)
value = netfields.InetAddressField()

class Meta:
app_label = 'serverdb'
db_table = 'server_inet4_attribute'
unique_together = [['server', 'attribute', 'value']]
index_together = [['attribute', 'value']]

def clean(self):
super(ServerAttribute, self).clean()

if type(self.value) != IPv4Interface:
self.value = inet4_to_python(self.value)

# Get the ip_addr_type of the servertype
ip_addr_type = self.server.servertype.ip_addr_type

if ip_addr_type == 'null':
# A Servertype with ip_addr_type "null" and attributes of type
# inet must be denied per configuration. This is just a safety net
# in case e.g. somebody creates them programmatically.
raise ValidationError(
_('%(attribute_id)s must be null'), code='invalid value',
params={'attribute_id': self.attribute_id})
elif ip_addr_type == 'host':
is_ip_address(self.value)
is_unique_ip(self.value, self.server.server_id)
elif ip_addr_type == 'loadbalancer':
is_ip_address(self.value)
elif ip_addr_type == 'network':
is_network(self.value)
network_overlaps(self.value, self.server.servertype_id,
self.server.server_id)

is_supernet_consistent(self.value, self.server)


class ServerInet6Attribute(ServerAttribute):
attribute = models.ForeignKey(
Attribute,
db_index=False,
on_delete=models.CASCADE,
limit_choices_to=dict(type='inet6'),
)
value = netfields.InetAddressField()

class Meta:
app_label = 'serverdb'
db_table = 'server_inet6_attribute'
unique_together = [['server', 'attribute', 'value']]
index_together = [['attribute', 'value']]

def clean(self):
super(ServerAttribute, self).clean()

if type(self.value) != IPv6Interface:
self.value = inet6_to_python(self.value)

# Get the ip_addr_type of the servertype
ip_addr_type = self.server.servertype.ip_addr_type

if ip_addr_type == 'null':
# A Servertype with ip_addr_type "null" and attributes of type
# inet must be denied per configuration. This is just a safety net
# in case e.g. somebody creates them programmatically.
raise ValidationError(
_('%(attribute_id)s must be null'), code='invalid value',
params={'attribute_id': self.attribute_id})
elif ip_addr_type == 'host':
is_ip_address(self.value)
is_unique_ip(self.value, self.server.server_id)
elif ip_addr_type == 'loadbalancer':
is_ip_address(self.value)
elif ip_addr_type == 'network':
is_network(self.value)
network_overlaps(self.value, self.server.servertype_id,
self.server.server_id)

is_supernet_consistent(self.value, self.server)


class ServerMACAddressAttribute(ServerAttribute):
attribute = models.ForeignKey(
Expand Down

0 comments on commit fe0e76c

Please sign in to comment.