Skip to content

Commit

Permalink
Fixed RobotWebSession and added new functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Schneider committed Sep 23, 2023
1 parent 1ddded2 commit 4202f01
Showing 1 changed file with 81 additions and 30 deletions.
111 changes: 81 additions & 30 deletions franky/robot_web_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import hashlib
import json
import ssl
from http.client import HTTPSConnection
from typing import TYPE_CHECKING
from http.client import HTTPSConnection, HTTPResponse
from typing import TYPE_CHECKING, Dict, Optional, Any, Literal
from urllib.error import HTTPError

if TYPE_CHECKING:
from .robot import Robot
Expand All @@ -17,46 +18,96 @@ def __init__(self, robot: "Robot", username: str, password: str):

self.__client = None
self.__token = None
self.__control_token = None

@staticmethod
def __encode_password(user, password):
def __encode_password(user: str, password: str) -> str:
bs = ",".join([str(b) for b in hashlib.sha256((password + "#" + user + "@franka").encode("utf-8")).digest()])
return base64.encodebytes(bs.encode("utf-8")).decode("utf-8")

def send_api_request(self, target: str, headers: Optional[Dict[str, str]] = None, body: Optional[Any] = None,
method: Literal["GET", "POST", "DELETE"] = "POST"):
_headers = {
"Cookie": f"authorization={self.__token}"
}
if headers is not None:
_headers.update(headers)
self.__client.request(method, target, headers=_headers, body=body)
res: HTTPResponse = self.__client.getresponse()
if res.getcode() != 200:
raise HTTPError(target, res.getcode(), res.reason, res.headers, res.fp)
return res.read()

def send_control_api_request(self, target: str, headers: Optional[Dict[str, str]] = None,
body: Optional[Any] = None,
method: Literal["GET", "POST", "DELETE"] = "POST"):
if self.__control_token is None:
raise ValueError("Client does not have control. Call take_control() first.")
_headers = {
"X-Control-Token": self.__control_token
}
_headers.update(headers)
return self.send_api_request(target, headers=_headers, method=method, body=body)

def __enter__(self):
self.client = HTTPSConnection(self.__robot.fci_hostname, timeout=12, context=ssl._create_unverified_context())
self.client.connect()
self.client.request(
"POST", "/admin/api/login",
body=json.dumps(
{"login": self.__username, "password": self.__encode_password(self.__username, self.__password)}),
headers={"content-type": "application/json"}
)
self.token = self.client.getresponse().read().decode("utf8")
self.__client = HTTPSConnection(self.__robot.fci_hostname, timeout=12, context=ssl._create_unverified_context())
self.__client.connect()
payload = json.dumps(
{"login": self.__username, "password": self.__encode_password(self.__username, self.__password)})
self.__token = self.send_api_request(
"/admin/api/login", headers={"content-type": "application/json"},
body=payload).decode("utf-8")
return self

def __exit__(self, type, value, traceback):
self.client.close()
if self.__control_token is not None:
self.release_control()
self.__token = None
self.__client.close()

def take_control(self):
if self.__control_token is None:
res = self.send_api_request(
"/admin/api/control-token/request", headers={"content-type": "application/json"},
body=json.dumps({"requestedBy": self.__username}))
self.__control_token = json.loads(res)["token"]

def release_control(self):
if self.__control_token is not None:
self.send_control_api_request(
"/admin/api/control-token", headers={"content-type": "application/json"}, method="DELETE",
body=json.dumps({"token": self.__control_token}))
self.__control_token = None

def start_task(self, task):
self.client.request(
"POST", "/desk/api/execution",
body=f"id={task}",
headers={"content-type": "application/x-www-form-urlencoded", "Cookie": f"authorization={self.token}"}
)
return self.client.getresponse().read()
def start_task(self, task: str):
self.send_api_request(
"/desk/api/execution", headers={"content-type": "application/x-www-form-urlencoded"},
body=f"id={task}")

def unlock_brakes(self):
self.client.request(
"POST", "/desk/api/robot/open-brakes",
headers={"content-type": "application/x-www-form-urlencoded", "Cookie": f"authorization={self.token}"}
)
return self.client.getresponse().read()
self.send_control_api_request(
"/desk/api/joints/unlock", headers={"content-type": "application/x-www-form-urlencoded"})

def lock_brakes(self):
self.__robot.stop()
self.client.request(
"POST", "/desk/api/robot/close-brakes",
headers={"content-type": "application/x-www-form-urlencoded", "Cookie": f"authorization={self.token}"}
)
return self.client.getresponse().read()
self.send_control_api_request(
"/desk/api/joints/lock", headers={"content-type": "application/x-www-form-urlencoded"})

def set_mode_programming(self):
self.send_control_api_request(
"/desk/api/operating-mode/programming", headers={"content-type": "application/x-www-form-urlencoded"})

def set_mode_execution(self):
self.send_control_api_request(
"/desk/api/operating-mode/execution", headers={"content-type": "application/x-www-form-urlencoded"})

def get_system_status(self):
return json.loads(self.send_api_request("/admin/api/system-status", method="GET").decode("utf-8"))

@property
def client(self):
return self.__client

@property
def token(self) -> str:
return self.__token

0 comments on commit 4202f01

Please sign in to comment.