Skip to content

Commit

Permalink
Implemented RobotWebSession.has_control and made take_control wait fo…
Browse files Browse the repository at this point in the history
…r control to be granted
  • Loading branch information
Tim Schneider committed Sep 24, 2023
1 parent 4202f01 commit 9f0d72e
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions franky/robot_web_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import hashlib
import json
import ssl
import time
from http.client import HTTPSConnection, HTTPResponse
from typing import TYPE_CHECKING, Dict, Optional, Any, Literal
from urllib.error import HTTPError
Expand All @@ -19,6 +20,7 @@ def __init__(self, robot: "Robot", username: str, password: str):
self.__client = None
self.__token = None
self.__control_token = None
self.__control_token_id = None

@staticmethod
def __encode_password(user: str, password: str) -> str:
Expand Down Expand Up @@ -65,19 +67,31 @@ def __exit__(self, type, value, traceback):
self.__token = None
self.__client.close()

def take_control(self):
def take_control(self, wait_timeout: float = 10.0):
if self.__control_token is None:
res = self.send_api_request(
"/admin/api/control-token/request", headers={"content-type": "application/json"},
body=json.dumps({"requestedBy": self.__username}))
self.__control_token = json.loads(res)["token"]
response_dict = json.loads(res)
self.__control_token = response_dict["token"]
self.__control_token_id = response_dict["id"]
# One should probably use websockets here but that would introduce another dependency
start = time.time()
while time.time() - start < wait_timeout and not self.has_control():
time.sleep(1.0)

def release_control(self):
if self.__control_token is not None:
self.send_control_api_request(
"/admin/api/control-token", headers={"content-type": "application/json"}, method="DELETE",
body=json.dumps({"token": self.__control_token}))
self.__control_token = None
self.__control_token_id = None

def has_control(self):
if self.__control_token_id is not None:
status = self.get_system_status()
return status["controlToken"]["activeToken"]["id"] == self.__control_token_id

def start_task(self, task: str):
self.send_api_request(
Expand Down

0 comments on commit 9f0d72e

Please sign in to comment.