Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist client connection session #1552

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ more performant and the import times are shorter.
New Features in 24.1
~~~~~~~~~~~~~~~~~~~~
- All components can be installed into the same virtualenv again.
- Run monitor, acquire and reserve commands with session parameter.
From now on you can run client monitor
command with parameter 'session' - like in
`labgrid-client monitor --session <SESSION_NAME>`
Then if commands 'acquire' ('lock') and/or 'reserve' run with same session name as in
`labgrid-client --place <PLACE> acquire --session <SESSION_NAME>` or
`labgrid-client reserve <FILTER> --session <SESSION_NAME>` those places and reservations
will be registered in coordinator with the same <SESSION_NAME> and then in case this
monitor will failed or cancelled, this will start procedure of releasing places and
cancellation of reservations with the same (as in monitor) SESSION_NAME. Places and
reservations with other session will be rest intact.

Bug fixes in 24.1
~~~~~~~~~~~~~~~~~
Expand Down
29 changes: 22 additions & 7 deletions labgrid/remote/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from datetime import datetime
from pprint import pformat
from typing import Any, Dict

import attr
import grpc

Expand Down Expand Up @@ -77,6 +76,7 @@ class ClientSession:
prog = attr.ib(default=None, validator=attr.validators.optional(attr.validators.instance_of(str)))
args = attr.ib(default=None, validator=attr.validators.optional(attr.validators.instance_of(argparse.Namespace)))
monitor = attr.ib(default=False, validator=attr.validators.instance_of(bool))
session = None

def gethostname(self):
return os.environ.get("LG_HOSTNAME", gethostname())
Expand Down Expand Up @@ -136,6 +136,8 @@ async def stop(self):
self.out_queue.put_nowait(None) # let the sender side exit gracefully
if self.stream_call:
self.stream_call.cancel()
if self.pump_task:
self.pump_task.cancel()
try:
await self.pump_task
except asyncio.CancelledError:
Expand Down Expand Up @@ -266,6 +268,15 @@ async def on_place_deleted(self, name: str):

async def do_monitor(self):
self.monitor = True
session = self.session = self.args.session
print(f"Session name: {self.session}")
if not session is None and len(session) > 0:
request = labgrid_coordinator_pb2.MonitorRequest(session=session)
try:
await self.stub.Monitor(request)
await self.sync_with_coordinator()
except grpc.aio.AioRpcError as e:
raise ServerError(e.details())
await self.stopping.wait()

async def complete(self):
Expand Down Expand Up @@ -668,8 +679,8 @@ async def acquire(self):
place = self.get_idle_place()
if not self.args.allow_unmatched:
self.check_matches(place)

request = labgrid_coordinator_pb2.AcquirePlaceRequest(placename=place.name)
session = self.session = self.args.session
request = labgrid_coordinator_pb2.AcquirePlaceRequest(placename=place.name, session=session)

try:
await self.stub.AcquirePlace(request)
Expand Down Expand Up @@ -707,7 +718,6 @@ async def release(self):
f"place {place.name} is acquired by a different user ({place.acquired}), use --kick if you are sure"
) # pylint: disable=line-too-long
print(f"warning: kicking user ({place.acquired})")

request = labgrid_coordinator_pb2.ReleasePlaceRequest(placename=place.name)

try:
Expand Down Expand Up @@ -1416,7 +1426,7 @@ def write_image(self):

async def create_reservation(self):
prio = self.args.prio

session = self.args.session
fltr = {}
for pair in self.args.filters:
try:
Expand All @@ -1433,7 +1443,7 @@ async def create_reservation(self):
"main": labgrid_coordinator_pb2.Reservation.Filter(filter=fltr),
}

request = labgrid_coordinator_pb2.CreateReservationRequest(filters=fltrs, prio=prio)
request = labgrid_coordinator_pb2.CreateReservationRequest(filters=fltrs, prio=prio, session=session)

try:
response: labgrid_coordinator_pb2.CreateReservationResponse = await self.stub.CreateReservation(request)
Expand All @@ -1453,7 +1463,6 @@ async def create_reservation(self):

async def cancel_reservation(self):
token: str = self.args.token

request = labgrid_coordinator_pb2.CancelReservationRequest(token=token)

try:
Expand Down Expand Up @@ -1712,6 +1721,7 @@ def main():
subparser.set_defaults(func=ClientSession.complete)

subparser = subparsers.add_parser("monitor", help="monitor events from the coordinator")
subparser.add_argument("--session", default=None, help="add to identify session to release places and cancel reservations")
subparser.set_defaults(func=ClientSession.do_monitor)

subparser = subparsers.add_parser("resources", aliases=("r",), help="list available resources")
Expand Down Expand Up @@ -1778,12 +1788,15 @@ def main():
subparser.add_argument(
"--allow-unmatched", action="store_true", help="allow missing resources for matches when locking the place"
)
subparser.add_argument("--session", help="add to identify session to release places and cancel reservations")

subparser.set_defaults(func=ClientSession.acquire)

subparser = subparsers.add_parser("release", aliases=("unlock",), help="release a place")
subparser.add_argument(
"-k", "--kick", action="store_true", help="release a place even if it is acquired by a different user"
)
subparser.add_argument("--session", help="add to identify session to release places and cancel reservations")
subparser.set_defaults(func=ClientSession.release)

subparser = subparsers.add_parser(
Expand Down Expand Up @@ -2000,11 +2013,13 @@ def main():
subparser.add_argument(
"--prio", type=float, default=0.0, help="priority relative to other reservations (default 0)"
)
subparser.add_argument("--session", help="add to identify session to release places and cancel reservations")
subparser.add_argument("filters", metavar="KEY=VALUE", nargs="+", help="required tags")
subparser.set_defaults(func=ClientSession.create_reservation)

subparser = subparsers.add_parser("cancel-reservation", help="cancel a reservation")
subparser.add_argument("token", type=str, default=token, nargs="?" if token else None)
subparser.add_argument("--session", help="add to identify session to release places and cancel reservations")
subparser.set_defaults(func=ClientSession.cancel_reservation)

subparser = subparsers.add_parser("wait", help="wait for a reservation to be allocated")
Expand Down
11 changes: 10 additions & 1 deletion labgrid/remote/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ class Place:
created = attr.ib(default=attr.Factory(time.time))
changed = attr.ib(default=attr.Factory(time.time))
reservation = attr.ib(default=None)
session = attr.ib(default="")

def asdict(self):
# in the coordinator, we have resource objects, otherwise just a path
Expand All @@ -251,6 +252,7 @@ def asdict(self):
"created": self.created,
"changed": self.changed,
"reservation": self.reservation,
"session": self.session,
}

def update_from_pb2(self, place_pb2):
Expand Down Expand Up @@ -296,6 +298,7 @@ def show(self, level=0):
print(indent + f"allowed: {', '.join(self.allowed)}")
print(indent + f"created: {datetime.fromtimestamp(self.created)}")
print(indent + f"changed: {datetime.fromtimestamp(self.changed)}")
print(indent + f"session: {self.session}")
if self.reservation:
print(indent + f"reservation: {self.reservation}")

Expand Down Expand Up @@ -348,6 +351,7 @@ def as_pb2(self):
place.allowed.extend(self.allowed)
place.changed = self.changed
place.created = self.created
place.session = self.session
if self.reservation:
place.reservation = self.reservation
for key, value in self.tags.items():
Expand Down Expand Up @@ -377,6 +381,7 @@ def from_pb2(cls, pb2):
created=pb2.created,
changed=pb2.changed,
reservation=pb2.reservation if pb2.HasField("reservation") else None,
session=pb2.session
)


Expand Down Expand Up @@ -406,7 +411,7 @@ class Reservation:
allocations = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict))
created = attr.ib(default=attr.Factory(time.time))
timeout = attr.ib(default=attr.Factory(lambda: time.time() + 60))

session = attr.ib(default="", validator=attr.validators.instance_of(str))
def asdict(self):
return {
"owner": self.owner,
Expand All @@ -416,6 +421,7 @@ def asdict(self):
"allocations": self.allocations,
"created": self.created,
"timeout": self.timeout,
"session": self.session,
}

def refresh(self, delta=60):
Expand All @@ -441,6 +447,7 @@ def show(self, level=0):
print(indent + f" {name}: {', '.join(allocation)}")
print(indent + f"created: {datetime.fromtimestamp(self.created)}")
print(indent + f"timeout: {datetime.fromtimestamp(self.timeout)}")
print(indent + f"session: {self.session}")

def as_pb2(self):
res = labgrid_coordinator_pb2.Reservation()
Expand All @@ -459,6 +466,7 @@ def as_pb2(self):
res.allocations.update({"main": allocation[0]})
res.created = self.created
res.timeout = self.timeout
res.session = self.session
return res

@classmethod
Expand All @@ -478,6 +486,7 @@ def from_pb2(cls, pb2: labgrid_coordinator_pb2.Reservation):
allocations=allocations,
created=pb2.created,
timeout=pb2.timeout,
session = pb2.session
)


Expand Down
Loading