forked from jk-ethz/franka_lock_unlock
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
executable file
·229 lines (205 loc) · 11.8 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
#!/usr/bin/env python3
# Copyright jk-ethz
# Released under GNU AGPL-3.0
# Contact us for other licensing options.
# Developed and tested on system version
# 4.2.1
# Inspired by
# https://github.com/frankaemika/libfranka/issues/63
# https://github.com/ib101/DVK/blob/master/Code/DVK.py
import requests
from urllib.parse import urljoin
import hashlib
import base64
import argparse
from time import sleep
from itertools import count
import atexit
from threading import Event
import constant
class FrankaLockUnlock:
def __init__(self, hostname: str, username: str, password: str, protocol: str = 'https', relock: bool = False):
requests.packages.urllib3.disable_warnings()
self._session = requests.Session()
self._session.verify = False
self._hostname = f'{protocol}://{hostname}'
self._username = username
self._password = password
self._logged_in = False
self._token = None
self._token_id = None
self._relock = relock
atexit.register(self._cleanup)
def _cleanup(self):
print("Cleaning up...")
if self._relock:
self.run(unlock=False)
if self._token is not None or self._token_id is not None:
self._release_token()
if self._logged_in:
self._logout()
print("Successfully cleaned up.")
@staticmethod
def _encode_password(username, password):
bs = ','.join([str(b) for b in hashlib.sha256((f'{password}#{username}@franka').encode('utf-8')).digest()])
return base64.encodebytes(bs.encode('utf-8')).decode('utf-8')
def _login(self):
print("Logging in...")
if self._logged_in:
print("Already logged in.")
return
login = self._session.post(urljoin(self._hostname, '/admin/api/login'), \
json={'login': self._username, \
'password': self._encode_password(self._username, self._password)})
assert login.status_code == 200, "Error logging in."
self._session.cookies.set('authorization', login.text)
self._logged_in = True
print("Successfully logged in.")
def _logout(self):
print("Logging out...")
assert self._logged_in
logout = self._session.post(urljoin(self._hostname, '/admin/api/logout'))
assert logout.status_code == 200, "Error logging out"
self._session.cookies.clear()
self._logged_in = False
print("Successfully logged out.")
def _shutdown(self):
print("Shutting down...")
assert self._logged_in
shoutdown = self._session.post(urljoin(self._hostname, constant.shoutdown))
assert shoutdown.status_code == 200, "Error shutting down"
self._session.cookies.clear()
self._logged_in = False
sleep(180)
print("Successfully shuthed down.")
def _get_active_token_id(self):
token_query = self._session.get(urljoin(self._hostname, '/admin/api/control-token'))
assert token_query.status_code == 200, "Error getting control token status."
json = token_query.json()
return None if json['activeToken'] is None else json['activeToken']['id']
def _is_active_token(self):
active_token_id = self._get_active_token_id()
return active_token_id is None or active_token_id == self._token_id
def _request_token(self, physically=False):
print("Requesting a control token...")
if self._token is not None:
assert self._token_id is not None
print("Already having a control token.")
return
token_request = self._session.post(urljoin(self._hostname, f'/admin/api/control-token/request{"?force" if physically else ""}'), \
json={'requestedBy': self._username})
assert token_request.status_code == 200, "Error requesting control token."
json = token_request.json()
self._token = json['token']
self._token_id = json['id']
print(f'Received control token is {self._token} with id {self._token_id}.')
def _release_token(self):
print("Releasing control token...")
token_delete = self._session.delete(urljoin(self._hostname, '/admin/api/control-token'), \
json={'token': self._token})
assert token_delete.status_code == 200, "Error releasing control token."
self._token = None
self._token_id = None
print("Successfully released control token.")
def _activate_fci(self):
print("Activating FCI...")
fci_request = self._session.post(urljoin(self._hostname, f'/admin/api/control-token/fci'), \
json={'token': self._token})
assert fci_request.status_code == 200, "Error activating FCI."
print("Successfully activated FCI.")
def _home_gripper(self):
print("Homing the gripper...")
action = self._session.post(urljoin(self._hostname, f'/desk/api/gripper/homing'), \
headers={'X-Control-Token': self._token})
assert action.status_code == 200, "Error homing gripper."
print(f'Successfully homed the gripper.')
def _lock_unlock(self, unlock: bool, force: bool = False):
print(f'{"Unlocking" if unlock else "Locking"} the robot...')
action = self._session.post(urljoin(self._hostname, f'{constant.deskApiJoints}{constant.unlock if unlock else constant.lock}'), \
files={'force': force},
headers={'X-Control-Token': self._token})
assert action.status_code == 200, "Error requesting brake open/close action."
print(f'Successfully {"unlocked" if unlock else "locked"} the robot.')
def _execution_mode(self):
print(f'Switch to "Execution Mode" the robot...')
action = self._session.post(urljoin(self._hostname, f'{constant.executionMode}'), \
headers={'X-Control-Token': self._token})
assert action.status_code == 200, "Error switching to execution mode."
print(f'Successfully switched to execution mode.')
def _programming_mode(self):
activated = "Programming Mode"
print(f'Switch to "{activated}" the robot...')
action = self._session.post(urljoin(self._hostname, constant.programmingMode), \
headers={'X-Control-Token': self._token})
assert action.status_code == 200, f"Error switching to '{activated}'."
print(f'Successfully switched to "{activated}"')
def run(self, unlock: bool = False, force: bool = False, wait: bool = False, request: bool = False, persistent: bool = False, fci: bool = False, home: bool = False,shutdown: bool = False, execution_mode: bool = False,programming_mode: bool = False) -> None:
assert not request or wait, "Requesting control without waiting for obtaining control is not supported."
assert not fci or unlock, "Activating FCI without unlocking is not possible."
assert not fci or execution_mode, "Activating FCI without execution mode is not possible."
assert not fci or persistent, "Activating FCI without persistence is not possible."
assert not home or unlock, "Homing the gripper without unlocking is not possible."
assert not programming_mode or fci, "Switch the robot in programming mode for fci is not possible"
self._login()
if shutdown:
self._shutdown()
return
try:
assert self._token is not None or self._get_active_token_id() is None or wait, "Error requesting control, the robot is currently in use."
while True:
self._request_token(physically=request)
try:
# Consider the timeout of 30 s for requesting physical access to the robot
for _ in range(20) if request else count():
if (not wait and not request) or self._is_active_token():
print('Successfully acquired control over the robot.')
self._lock_unlock(unlock=unlock)
if home:
self._home_gripper()
if execution_mode:
self._execution_mode()
if programming_mode:
self._programming_mode()
if fci:
self._activate_fci()
return
if request:
print('Please press the button with the (blue) circle on the robot to confirm physical access.')
elif wait:
print('Please confirm the request message in the web interface on the logged in user.')
sleep(1)
# In case physical access was not confirmed, try again
self._release_token()
finally:
if not persistent:
self._release_token()
finally:
if not persistent:
self._logout()
if __name__ == '__main__':
parser = argparse.ArgumentParser(
prog = 'FrankaLockUnlock',
description = 'Lock or unlock the Franka Emika Panda joint brakes programmatically.',
epilog = '(c) jk-ethz, https://github.com/jk-ethz'
)
parser.add_argument('hostname', help='The Franka Desk IP address or hostname, for example "1.2.3.4".')
parser.add_argument('username', help='The Franka Desk username, usually "admin".')
parser.add_argument('password', help='The Franka Desk password.')
parser.add_argument('-u', '--unlock', action='store_true', help='Unlock the brakes. Otherwise, lock them.')
parser.add_argument('-l', '--relock', action='store_true', help='Relock the brakes on exit.')
parser.add_argument('-w', '--wait', action='store_true', help='Wait in case the robot web UI is currently in use.')
parser.add_argument('-r', '--request', action='store_true', help='Request control by confirming physical access to the robot in case the robot web UI is currently in use.')
parser.add_argument('-p', '--persistent', action='store_true', help='Keep the connection to the robot open persistently.')
parser.add_argument('-c', '--fci', action='store_true', help='Activate the FCI.')
parser.add_argument('-i', '--home', action='store_true', help='Home the gripper.')
parser.add_argument('-s', '--shutdown', action='store_true', help='Shutdown the robot.WORKING IN PROGRESS FOR A FEEDBACK SHUTDOWN[wait at least 3 min to be sure that the robots is shutted down]')
parser.add_argument('-e', '--execution_mode', action='store_true', help='Activate the execution mode')
parser.add_argument('-m', '--programming_mode', action='store_true', help='Activate the programming mode')
args, _ = parser.parse_known_args()
assert not args.relock or args.unlock, "Relocking without prior unlocking is not possible."
assert not args.relock or args.persistent, "Relocking without persistence would cause an immediate unlock-lock cycle."
franka_lock_unlock = FrankaLockUnlock(hostname=args.hostname, username=args.username, password=args.password, relock=args.relock)
franka_lock_unlock.run(unlock=args.unlock, wait=args.wait, request=args.request, persistent=args.persistent, fci=args.fci, home=args.home, shutdown=args.shutdown, execution_mode=args.execution_mode,programming_mode=args.programming_mode)
if args.persistent:
print("Keeping persistent connection...")
Event().wait()