forked from joohoi/acme-dns
-
Notifications
You must be signed in to change notification settings - Fork 2
/
acme-dns-auth-ossobv.py
135 lines (118 loc) · 4.64 KB
/
acme-dns-auth-ossobv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
# acme-dns-auth-ossobv -- certbot authentication hook for (ossobv) acme-dns
# Multi-domain ready version of acme-dns-certbot, by Walter Doekes OSSO B.V.
#
# See: https://github.com/ossobv/acme-dns [acme-dns-auth-ossobv]
# Original: https://github.com/joohoi/acme-dns-certbot-joohoi
# Copyright (c) 2018 Joona Hoikkala
#
# Usage, after configuring /etc/acme-dns-ossobv/config.json and
# /etc/acme-dns-ossobv/accounts.json:
#
# certbot certonly --manual \
# --manual-auth-hook /usr/local/bin/acme-dns-auth-ossobv.py \
# --preferred-challenges dns --debug-challenges \
# -d example.com -d *.example.com
#
import json
import os
import requests
import sys
from requests.auth import HTTPBasicAuth
# {"acmedns_url": "https://[BASIC_AUTH@]YOUR_ACME_DNS"}
CONFIG_PATH = '/etc/acme-dns-ossobv/config.json'
# {"example.com": {"username": "aa...", password": "abc.."},
# "*.whatever.nl": {"username": "bb...", password": "abc.."},
# "*": {"username": "cc...", password": "abc.."}}
ACCOUNTS_PATH = '/etc/acme-dns-ossobv/accounts.json'
class AcmeDnsClient(object):
"""
Handles the communication with ACME-DNS API
"""
def __init__(self, acmedns_url, basic_auth):
self.acmedns_url = acmedns_url
self.basic_auth = basic_auth
def update_txt_record(self, account, subdomain, txt):
"""Updates the TXT challenge record to ACME-DNS subdomain."""
update = {'subdomain': subdomain, 'txt': txt}
headers = {'X-Api-User': account['username'],
'X-Api-Key': account['password'],
'Content-Type': 'application/json'}
res = requests.post(
'{}/update'.format(self.acmedns_url),
headers=headers,
data=json.dumps(update),
auth=self.basic_auth)
if res.status_code == 200:
# Successful update
return
msg = ('Encountered an error while trying to update TXT record in '
'acme-dns. \n'
'------- Request headers:\n{}\n'
'------- Request body:\n{}\n'
'------- Response HTTP status: {}\n'
'------- Response body: {}')
s_headers = json.dumps(headers, indent=2, sort_keys=True)
s_update = json.dumps(update, indent=2, sort_keys=True)
try:
s_body = json.dumps(res.json(), indent=2, sort_keys=True)
except ValueError:
s_body = res.text
print(msg.format(s_headers, s_update, res.status_code, s_body))
sys.exit(1)
class Storage(object):
def __init__(self, storagepath):
self.storagepath = storagepath
with open(self.storagepath) as fh:
self._data = json.load(fh)
def fetch(self, key):
"""Gets configuration value from storage. If the (sub)domain is not
found, try parent domains"""
# key = example.com matches 'example.com'
if key in self._data:
return self._data[key]
# key = example.com matches '*.example.com'
# key = example.com matches '*.com'
# key = example.com matches '*'
keyParts = key.split('.')
for i in range(len(keyParts) + 1):
domain = '.'.join(['*'] + keyParts[i:])
try:
return self._data[domain]
except KeyError:
pass
return None
def main(config_path, accounts_path):
# Get certbot params
domain = os.environ['CERTBOT_DOMAIN']
if domain.startswith('*.'):
domain = domain[2:]
validation_token = os.environ['CERTBOT_VALIDATION']
# Open main config
with open(config_path) as fp:
_config = json.load(fp)
acmedns_url = _config['acmedns_url']
if '@' in acmedns_url:
_tmp = acmedns_url.split('@', 1)
acmedns_url = '{}/{}'.format(_tmp[0].rsplit('/', 1)[0], _tmp[1])
basic_auth = _tmp[0].rsplit('/', 1)[1].split(':', 1)
basic_auth = HTTPBasicAuth(
*_tmp[0].rsplit('/', 1)[1].split(':', 1))
else:
basic_auth = None
# Init
client = AcmeDnsClient(acmedns_url, basic_auth)
storage = Storage(accounts_path)
# Check that an account already exists in storage
account = storage.fetch(domain)
assert account
# Update the TXT record in acme-dns instance
client.update_txt_record(account, domain, validation_token)
if __name__ == "__main__":
# CERTBOT_DOMAIN=www.example.com \
# CERTBOT_VALIDATION=$(python -c "print 43*'a'") \
# acme-dns-auth-ossobv.py
#
# dig +short TXT _acme-challenge.www.example.com
# ^-- create CNAME to www.example.com.YOUR_ACME_CHALLENGE_DOMAIN
main(CONFIG_PATH, ACCOUNTS_PATH)