1
- import hashlib
2
1
import logging
3
2
import os
4
- from base64 import b64decode
5
3
from typing import Any , Dict , List , Optional , Tuple
6
4
from urllib .parse import urlparse
7
5
8
6
import pydantic
9
- from cryptography . exceptions import InvalidSignature
7
+ from joserfc . rfc7515 . model import CompactSignature
10
8
11
9
from irrd .conf import get_setting
12
10
from irrd .mirroring .nrtm4 .jsonseq import jsonseq_decode
29
27
NRTM4ClientDatabaseStatus ,
30
28
)
31
29
from irrd .storage .queries import DatabaseStatusQuery
32
- from irrd .utils .crypto import ed25519_public_key_from_str
30
+ from irrd .utils .crypto import eckey_from_config , eckey_from_str , jws_deserialize
33
31
from irrd .utils .misc import format_pydantic_errors
34
32
35
33
logger = logging .getLogger (__name__ )
@@ -107,7 +105,7 @@ def _run_client(self) -> bool:
107
105
)
108
106
return has_loaded_snapshot
109
107
110
- def _retrieve_unf (self ) -> Tuple [NRTM4UpdateNotificationFile , str ]:
108
+ def _retrieve_unf (self ) -> Tuple [NRTM4UpdateNotificationFile , Optional [ str ] ]:
111
109
"""
112
110
Retrieve, verify and parse the Update Notification File.
113
111
Returns the UNF object and the used key in base64 string.
@@ -116,24 +114,17 @@ def _retrieve_unf(self) -> Tuple[NRTM4UpdateNotificationFile, str]:
116
114
if not notification_file_url : # pragma: no cover
117
115
raise RuntimeError ("NRTM4 client called for a source without a Update Notification File URL" )
118
116
119
- unf_content , _ = retrieve_file (notification_file_url , return_contents = True )
120
- unf_hash = hashlib .sha256 (unf_content .encode ("ascii" )).hexdigest ()
121
- sig_url = notification_file_url .replace (
122
- "update-notification-file.json" , f"update-notification-file-signature-{ unf_hash } .sig"
123
- )
124
- legacy_sig_url = notification_file_url + ".sig"
117
+ unf_signed , _ = retrieve_file (notification_file_url , return_contents = True )
125
118
if "nrtm.db.ripe.net" in notification_file_url : # pragma: no cover
126
- logger . warning (
127
- f"Downloading signature from legacy url { legacy_sig_url } instead of expected { sig_url } "
128
- )
129
- signature , _ = retrieve_file ( legacy_sig_url , return_contents = True )
119
+ # When removing this, also remove Optional[] from return type
120
+ logger . warning ( "Expecting raw UNF as source is RIPE*, signature not checked" )
121
+ unf_payload = unf_signed . encode ( "ascii" )
122
+ used_key = None
130
123
else :
131
- signature , _ = retrieve_file (sig_url , return_contents = True )
132
-
133
- used_key = self ._validate_unf_signature (unf_content , signature )
124
+ unf_payload , used_key = self ._deserialize_unf (unf_signed )
134
125
135
126
unf = NRTM4UpdateNotificationFile .model_validate_json (
136
- unf_content ,
127
+ unf_payload ,
137
128
context = {
138
129
"update_notification_file_scheme" : urlparse (notification_file_url ).scheme ,
139
130
"expected_values" : {
@@ -143,61 +134,64 @@ def _retrieve_unf(self) -> Tuple[NRTM4UpdateNotificationFile, str]:
143
134
)
144
135
return unf , used_key
145
136
146
- def _validate_unf_signature (self , unf_content : str , signature_b64 : str ) -> str :
137
+ def _deserialize_unf (self , unf_content : str ) -> Tuple [ bytes , str ] :
147
138
"""
148
139
Verify the Update Notification File signature,
149
- given the content (before JSON parsing) and a base64 signature .
150
- Returns the used key in base64 string.
140
+ given the content (before JWS deserialize) .
141
+ Returns the deserialized payload and used key in PEM string.
151
142
"""
143
+ compact_signature : Optional [CompactSignature ]
152
144
unf_content_bytes = unf_content .encode ("utf-8" )
153
- signature = b64decode (signature_b64 , validate = True )
154
145
config_key = get_setting (f"sources.{ self .source } .nrtm4_client_initial_public_key" )
155
146
156
147
if self .last_status .current_key :
157
- keys = [
148
+ keys_pem = [
158
149
self .last_status .current_key ,
159
150
self .last_status .next_key ,
160
151
]
161
152
else :
162
- keys = [get_setting (f"sources.{ self .source } .nrtm4_client_initial_public_key" )]
163
-
164
- for key in keys :
165
- if key and self ._validate_ed25519_signature (key , unf_content_bytes , signature ):
166
- return key
167
-
168
- if self .last_status .current_key and self ._validate_ed25519_signature (
169
- config_key , unf_content_bytes , signature
170
- ):
171
- # While technically just a "signature not valid case", it is a rather
172
- # confusing situation for the user, so gets a special message.
173
- msg = (
174
- f"{ self .source } : No valid signature found for the Update Notification File for signature"
175
- f" { signature_b64 } . The signature is valid for public key { config_key } set in the"
176
- " nrtm4_client_initial_public_key setting, but that is only used for initial validation."
177
- f" IRRD is currently expecting the public key { self .last_status .current_key } . If you want to"
178
- " clear IRRDs key information and revert to nrtm4_client_initial_public_key, use the"
179
- " 'irrdctl nrtmv4 client-clear-known-keys' command."
180
- )
181
- if self .last_status .next_key :
182
- msg += f" or { self .last_status .next_key } "
183
- raise NRTM4ClientError (msg )
153
+ keys_pem = [get_setting (f"sources.{ self .source } .nrtm4_client_initial_public_key" )]
154
+
155
+ for key_pem in keys_pem :
156
+ if not key_pem : # pragma: no cover
157
+ continue
158
+ pubkey = eckey_from_str (key_pem )
159
+ try :
160
+ compact_signature = jws_deserialize (unf_content_bytes , pubkey )
161
+ return compact_signature .payload , key_pem
162
+ except ValueError :
163
+ continue
164
+
165
+ if self .last_status .current_key :
166
+ compact_signature = None
167
+
168
+ try :
169
+ ec_key = eckey_from_config (f"sources.{ self .source } .nrtm4_client_initial_public_key" )
170
+ if ec_key :
171
+ compact_signature = jws_deserialize (
172
+ unf_content_bytes ,
173
+ ec_key ,
174
+ )
175
+ except ValueError : # pragma: no cover
176
+ pass
177
+ if compact_signature :
178
+ # While technically just a "signature not valid case", it is a rather
179
+ # confusing situation for the user, so gets a special message.
180
+ msg = (
181
+ f"{ self .source } : No valid signature found for the Update Notification File. The signature"
182
+ f" is valid for public key { config_key } set in the nrtm4_client_initial_public_key"
183
+ " setting, but that is only used for initial validation. IRRD is currently expecting the"
184
+ f" public key { self .last_status .current_key } . If you want to clear IRRDs key information"
185
+ " and revert to nrtm4_client_initial_public_key, use the 'irrdctl nrtmv4"
186
+ " client-clear-known-keys' command."
187
+ )
188
+ if self .last_status .next_key :
189
+ msg += f" or { self .last_status .next_key } "
190
+ raise NRTM4ClientError (msg )
184
191
raise NRTM4ClientError (
185
- f"{ self .source } : No valid signature found for any known keys, signature { signature_b64 } ,"
186
- f" considered public keys: { keys } "
192
+ f"{ self .source } : No valid signature found for any known keys, considered public keys: { keys_pem } "
187
193
)
188
194
189
- def _validate_ed25519_signature (self , key_b64 : str , content : bytes , signature : bytes ) -> bool :
190
- """
191
- Verify an Ed25519 signature, given the key in base64, and the content
192
- and signature in bytes. Returns True or False for validity, raises other
193
- exceptions for things like an invalid key format.
194
- """
195
- try :
196
- ed25519_public_key_from_str (key_b64 ).verify (signature , content )
197
- return True
198
- except InvalidSignature :
199
- return False
200
-
201
195
def _current_db_status (self ) -> Tuple [bool , NRTM4ClientDatabaseStatus ]:
202
196
"""Look up the current status of self.source in the database."""
203
197
query = DatabaseStatusQuery ().source (self .source )
0 commit comments