forked from bcurts/agentchattr
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathregistry.py
More file actions
600 lines (515 loc) · 24.1 KB
/
registry.py
File metadata and controls
600 lines (515 loc) · 24.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
"""Runtime Registry — single source of truth for all live agent state.
Seeded from config.toml base definitions. All systems read from the registry
at runtime, never from config.toml directly.
Thread-safe: a single threading.Lock guards all mutations.
"""
import colorsys
import json
import secrets
import threading
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class Instance:
"""A live agent instance."""
name: str # canonical ID: "gemini", "gemini-2"
base: str # base family: "gemini"
slot: int # 1, 2, 3...
label: str # "Gemini", "Gemini 2", or human-set custom
color: str # hex color (derived from base + slot)
identity_id: str = field(default_factory=lambda: uuid.uuid4().hex)
token: str = field(default_factory=lambda: secrets.token_hex(16))
epoch: int = 1
state: str = "pending" # "pending" | "active"
registered_at: float = field(default_factory=time.time)
class RuntimeRegistry:
GRACE_PERIOD = 30 # seconds — name reserved after deregister
def __init__(self, data_dir: str = "./data"):
self._lock = threading.Lock()
self._bases: dict[str, dict] = {} # base name → config template
self._instances: dict[str, Instance] = {} # canonical name → Instance
self._reserved: dict[str, float] = {} # name → deregister timestamp
self._renames: dict[str, str] = {} # old name → new name (for heartbeat redirect)
self._on_change_cbs: list = []
self._data_dir = Path(data_dir)
self._load_renames()
# --- Setup ---
def seed(self, agents_config: dict):
"""Load base templates from config.toml [agents.*] section."""
with self._lock:
for name, cfg in agents_config.items():
self._bases[name] = dict(cfg)
def on_change(self, cb):
"""Register a callback fired after any registry mutation."""
self._on_change_cbs.append(cb)
def _notify(self):
for cb in self._on_change_cbs:
try:
cb()
except Exception:
pass
# --- Rename persistence ---
def _renames_path(self) -> Path:
return self._data_dir / "renames.json"
def _load_renames(self):
p = self._renames_path()
if p.exists():
try:
self._renames = json.loads(p.read_text("utf-8"))
except Exception:
self._renames = {}
def _save_renames(self):
"""Persist renames to disk. Must be called outside the lock."""
try:
self._data_dir.mkdir(parents=True, exist_ok=True)
tmp = self._renames_path().with_suffix(".tmp")
with self._lock:
data = dict(self._renames)
tmp.write_text(json.dumps(data), "utf-8")
tmp.replace(self._renames_path())
except Exception:
pass
# --- Registration ---
def register(self, base: str, label: str | None = None) -> dict | None:
"""Register a new instance of `base`. Returns slot info or None if unknown base.
When a 2nd instance registers, slot 1 is renamed from 'base' to 'base-1'
to prevent identity ambiguity. The rename info is returned as '_renamed_slot1'.
"""
with self._lock:
if base not in self._bases:
return None
self._expire_reserved()
# Find next free slot
taken = {i.slot for i in self._instances.values() if i.base == base}
reserved = set()
for rn in self._reserved:
rb, rs = self._parse_name(rn)
if rb == base:
reserved.add(rs)
slot = 1
while slot in taken or slot in reserved:
slot += 1
# When a 2nd instance registers, rename slot-1 from "base" to "base-1"
# so that no instance shares a name with the base family. This prevents
# a second instance from sending messages as "base" (identity theft).
renamed_slot1 = None
if slot >= 2 and base in self._instances:
slot1 = self._instances[base]
if slot1.base == base and slot1.slot == 1:
new_s1_name = f"{base}-1"
del self._instances[base]
slot1.name = new_s1_name
base_cfg = self._bases[base]
slot1.label = f"{base_cfg.get('label', base.capitalize())} 1"
# Color stays the same (slot 1 = base color)
self._instances[new_s1_name] = slot1
self._renames[base] = new_s1_name
renamed_slot1 = {"old": base, "new": new_s1_name}
name = base if slot == 1 else f"{base}-{slot}"
base_cfg = self._bases[base]
color = _derive_color(base_cfg.get("color", "#888"), slot)
if label:
lbl = label
elif slot == 1:
lbl = base_cfg.get("label", base.capitalize())
else:
lbl = f"{base_cfg.get('label', base.capitalize())} {slot}"
# Fresh registrations are immediately authoritative. Identity
# recovery/reclaim still uses chat_claim, but normal startup should
# not block on a manual confirmation step.
state = "active"
inst = Instance(name=name, base=base, slot=slot, label=lbl, color=color, state=state)
self._instances[name] = inst
result = _inst_dict(inst, include_token=True)
if renamed_slot1:
result["_renamed_slot1"] = renamed_slot1
self._notify()
self._save_renames()
return result
def deregister(self, name: str) -> dict | None:
"""Remove an instance. Name is reserved for GRACE_PERIOD seconds.
Returns result dict with 'ok' and optional '_renamed_back' info,
or None if instance not found.
"""
with self._lock:
if name not in self._instances:
return None
base = self._instances[name].base
del self._instances[name]
self._reserved[name] = time.time()
# If family drops to 1 instance with a numbered name, rename back to base
renamed_back = None
family = [i for i in self._instances.values() if i.base == base]
if len(family) == 1:
remaining = family[0]
r_base, r_slot = self._parse_name(remaining.name)
if r_base == base and remaining.name != base:
old_name = remaining.name
del self._instances[old_name]
remaining.name = base
remaining.slot = 1
base_cfg = self._bases.get(base, {})
remaining.label = base_cfg.get("label", base.capitalize())
remaining.color = _derive_color(base_cfg.get("color", "#888"), 1)
self._instances[base] = remaining
self._renames[old_name] = base
renamed_back = {"old": old_name, "new": base}
self._notify()
self._save_renames()
result = {"ok": True}
if renamed_back:
result["_renamed_back"] = renamed_back
return result
# --- Identity Claim ---
def claim(self, sender: str, target_name: str | None = None) -> dict | str:
"""Claim an identity. Returns instance dict on success, error string on failure.
Family-based matching: sender can be a base family name (e.g. 'claude')
and the server assigns the next unclaimed instance of that family.
- sender='claude', no target: assign first unclaimed claude instance
- sender='claude', target='claude-music': assign unclaimed instance AND rename
- sender='claude-2' (exact match): confirm that specific instance
"""
error = None
result = None
with self._lock:
inst = None
# If sender is a base family name, use family-based matching
# (don't exact-match the slot-1 instance — that causes both
# callers to claim the same identity)
if sender in self._bases:
# Find first unclaimed (pending) instance in this family
for candidate in self._instances.values():
if candidate.base == sender and candidate.state == "pending":
inst = candidate
break
# If no pending, fall back to any instance in the family
if not inst:
for candidate in self._instances.values():
if candidate.base == sender:
inst = candidate
break
else:
# Exact match for specific instance names (e.g. 'claude-2')
inst = self._instances.get(sender)
if not inst:
error = f"No available {sender} instance. Is a wrapper registered?"
elif target_name is None or target_name == inst.name:
# Accept current name — but don't auto-activate pending instances.
# Pending instances must be named by human (lightbox) or reclaimed
# with an explicit target name (breadcrumb resume).
if inst.state == "pending" and target_name is None:
result = _inst_dict(inst) # return info but stay pending
else:
inst.state = "active"
result = _inst_dict(inst)
else:
# Rename/reclaim — check collision and family guard
if target_name in self._instances and target_name != inst.name:
error = f"Already claimed: {target_name}"
elif (family_err := self._conflicts_with_other_family(target_name, inst.base)):
error = family_err
else:
# Check slot collision within same family
t_base, t_slot = self._parse_name(target_name)
if t_base == inst.base:
slot_taken = any(
i.slot == t_slot and i.name != inst.name
for i in self._instances.values() if i.base == inst.base
)
if slot_taken:
error = f"Slot {t_slot} already occupied in {inst.base} family"
if not error:
# Swap identity to target name
self._reserved.pop(target_name, None)
old_name = inst.name
del self._instances[old_name]
inst.name = target_name
inst.state = "active"
# Recalculate slot, color, and label from target name
base_cfg = self._bases.get(inst.base, {})
if t_base == inst.base:
# Target parses as same family (e.g. 'claude' or 'claude-3')
inst.slot = t_slot
inst.color = _derive_color(base_cfg.get("color", "#888"), t_slot)
if t_slot == 1:
inst.label = base_cfg.get("label", inst.base.capitalize())
else:
inst.label = f"{base_cfg.get('label', inst.base.capitalize())} {t_slot}"
else:
# Custom name (e.g. 'claude-music') — keep slot color, use name as label
inst.label = target_name
self._instances[target_name] = inst
# Track rename so wrapper can discover it via heartbeat
self._renames[old_name] = target_name
result = _inst_dict(inst)
if error:
return error
self._notify()
self._save_renames()
return result
def confirm_pending(self, name: str) -> bool:
"""Auto-confirm a pending instance (10s timeout path)."""
with self._lock:
inst = self._instances.get(name)
if not inst or inst.state != "pending":
return False
inst.state = "active"
self._notify()
self._save_renames()
return True
# --- Rename / Label ---
def rename(self, old_name: str, new_name: str, label: str | None = None) -> dict | str:
"""Full identity rename (human-initiated). Returns instance dict or error string.
Changes the sender ID, label, and tracks the rename for wrapper sync.
If new_name == old_name, falls back to a label-only change.
"""
with self._lock:
inst = self._instances.get(old_name)
if not inst:
return f"Not found: {old_name}"
if new_name == old_name:
# Same identity — just update label
if label:
inst.label = label
result = _inst_dict(inst)
elif new_name in self._instances:
return f"Already taken: {new_name}"
elif (family_err := self._conflicts_with_other_family(new_name, inst.base)):
return family_err
else:
# Check slot collision within same family
t_base, t_slot = self._parse_name(new_name)
if t_base == inst.base:
slot_taken = any(
i.slot == t_slot and i.name != old_name
for i in self._instances.values() if i.base == inst.base
)
if slot_taken:
return f"Slot {t_slot} already occupied in {inst.base} family"
# Move instance to new name
del self._instances[old_name]
inst.name = new_name
# Set label (use provided label, or derive from new_name)
base_cfg = self._bases.get(inst.base, {})
if label:
inst.label = label
elif t_base == inst.base and t_slot != inst.slot:
# Numbered variant (e.g. claude-3) — use "Claude 3"
if t_slot == 1:
inst.label = base_cfg.get("label", inst.base.capitalize())
else:
inst.label = f"{base_cfg.get('label', inst.base.capitalize())} {t_slot}"
else:
inst.label = new_name
# Update slot + color if it's a numbered family name
if t_base == inst.base:
inst.slot = t_slot
inst.color = _derive_color(base_cfg.get("color", "#888"), t_slot)
self._instances[new_name] = inst
self._renames[old_name] = new_name
result = _inst_dict(inst)
self._notify()
self._save_renames()
return result
def set_label(self, name: str, label: str) -> bool:
"""Set display label only (no identity change)."""
with self._lock:
inst = self._instances.get(name)
if not inst:
return False
inst.label = label
self._notify()
self._save_renames()
return True
# --- Queries ---
def get_instance(self, name: str) -> dict | None:
with self._lock:
inst = self._instances.get(name)
return _inst_dict(inst) if inst else None
def get_all(self) -> dict[str, dict]:
"""All registered instances as {name: {name, base, slot, label, color, state}}."""
with self._lock:
return {n: _inst_dict(i) for n, i in self._instances.items()}
def get_agent_config(self) -> dict[str, dict]:
"""For WebSocket 'agents' message: {name: {color, label, base, state}}."""
with self._lock:
return {
n: {"color": i.color, "label": i.label, "base": i.base, "state": i.state}
for n, i in self._instances.items()
}
def get_all_names(self) -> list[str]:
with self._lock:
return list(self._instances.keys())
def get_active_names(self) -> list[str]:
with self._lock:
return [n for n, i in self._instances.items() if i.state == "active"]
def get_instances_for(self, base: str) -> list[dict]:
with self._lock:
return [_inst_dict(i) for i in self._instances.values() if i.base == base]
def get_bases(self) -> dict[str, dict]:
with self._lock:
return dict(self._bases)
def get_base_config(self, base: str) -> dict | None:
with self._lock:
return dict(self._bases[base]) if base in self._bases else None
def is_agent_family(self, name: str) -> bool:
"""Check if a name belongs to any agent family (base, slot, or custom alias)."""
with self._lock:
# Check registered instance first (handles custom names like 'claude-music')
inst = self._instances.get(name)
if inst:
return inst.base in self._bases
# Fall back to name parsing for slot names like 'claude-2'
base, _ = self._parse_name(name)
if base in self._bases:
return True
# Treat unregistered custom aliases like 'claude-prime' as belonging
# to the same family so stale senders are rejected until claimed.
return any(name.startswith(f"{family}-") for family in self._bases)
def family_instance_count(self, name: str) -> int:
"""Count registered instances in the same family as `name`."""
with self._lock:
# Check registered instance first (handles custom names)
inst = self._instances.get(name)
if inst:
base = inst.base
else:
base, _ = self._parse_name(name)
if base not in self._bases:
for family in self._bases:
if name.startswith(f"{family}-"):
base = family
break
return sum(1 for i in self._instances.values() if i.base == base)
def has_claimed_instances(self, base: str) -> bool:
"""Check if any instance in this family has been claimed (state=active)."""
with self._lock:
return any(
i.state == "active" and i.base == base
for i in self._instances.values()
)
def get_family_instance(self, base: str) -> dict | None:
"""Return the instance dict for a family if exactly one exists.
Used by heartbeat to find renamed instances after server restart."""
with self._lock:
matches = [i for i in self._instances.values() if i.base == base]
if len(matches) == 1:
return _inst_dict(matches[0])
return None
def resolve_to_instances(self, name: str) -> list[str]:
"""Resolve a name to actual registered instance names.
If `name` is a registered instance, returns [name].
If `name` is a base family name with no exact match, returns all
active instances in that family (e.g. 'claude' → ['claude-prime']).
Otherwise returns [name] unchanged (for non-agent names like 'ben').
"""
with self._lock:
if name in self._instances:
return [name]
# Check if it's a base name with registered family members
if name in self._bases:
members = [i.name for i in self._instances.values()
if i.base == name and i.state == "active"]
if members:
return members
return [name]
def resolve_name(self, name: str) -> str:
"""Follow rename chain to find current canonical name."""
with self._lock:
# Follow renames (e.g. claude-2 → claude-music)
seen = set()
current = name
while current in self._renames and current not in seen:
seen.add(current)
current = self._renames[current]
return current
def is_registered(self, name: str) -> bool:
with self._lock:
return name in self._instances
def is_pending(self, name: str) -> bool:
with self._lock:
i = self._instances.get(name)
return i is not None and i.state == "pending"
def resolve_token(self, token: str) -> dict | None:
"""Map an instance_token to the current canonical instance dict, or None."""
with self._lock:
for inst in self._instances.values():
if inst.token == token:
return _inst_dict(inst)
return None
def get_pending(self) -> list[dict]:
"""All pending instances (for timeout checks)."""
with self._lock:
return [_inst_dict(i) for i in self._instances.values()
if i.state == "pending"]
# --- Internal ---
def _conflicts_with_other_family(self, name: str, own_base: str) -> str | None:
"""Check if `name` stomps on another family's namespace.
Returns an error string if it conflicts, None if safe.
Blocks: renaming claude to 'gemini', 'gemini-2', 'codex', etc.
Allows: renaming claude to 'cudders', 'claude-prime', etc.
"""
t_base, _ = self._parse_name(name)
# If the parsed base matches a known family that isn't ours, block it
if t_base in self._bases and t_base != own_base:
return f"Name '{name}' conflicts with the {t_base} agent family"
# Also block if the raw name exactly matches another family's base
if name in self._bases and name != own_base:
return f"Name '{name}' is a reserved agent family name"
return None
def _parse_name(self, name: str) -> tuple[str, int]:
"""Parse 'gemini-2' -> ('gemini', 2), 'gemini' -> ('gemini', 1)."""
if "-" in name:
prefix, suffix = name.rsplit("-", 1)
try:
return prefix, int(suffix)
except ValueError:
pass
return name, 1
def clean_renames_for(self, name: str):
"""Remove all rename chain entries pointing to or from `name`."""
with self._lock:
# Remove entries where name is a key (old name → ...)
self._renames.pop(name, None)
# Remove entries where name is a value (... → name)
stale = [k for k, v in self._renames.items() if v == name]
for k in stale:
del self._renames[k]
self._save_renames()
def _expire_reserved(self):
"""Remove expired reservations. Must hold lock."""
now = time.time()
self._reserved = {n: t for n, t in self._reserved.items()
if now - t < self.GRACE_PERIOD}
# --- Module-level helpers ---
def _inst_dict(inst: Instance, include_token: bool = False) -> dict:
d = {
"identity_id": inst.identity_id,
"name": inst.name, "base": inst.base, "slot": inst.slot,
"label": inst.label, "color": inst.color, "state": inst.state,
"epoch": inst.epoch,
"registered_at": inst.registered_at,
}
if include_token:
d["token"] = inst.token
return d
def _derive_color(base_hex: str, slot: int) -> str:
"""Derive variant color: slot 1 = base, slot N = hue/lightness shifted.
Pattern: slot 2 = hue +25 deg, L +5%; slot 3 = hue -25 deg, L -5%; etc.
"""
if slot == 1:
return base_hex
hx = base_hex.lstrip("#")
if len(hx) != 6:
return base_hex
r, g, b = int(hx[0:2], 16), int(hx[2:4], 16), int(hx[4:6], 16)
h, l, s = colorsys.rgb_to_hls(r / 255, g / 255, b / 255)
# Alternating hue shifts with increasing magnitude
magnitude = ((slot - 1 + 1) // 2) * 25
direction = 1 if slot % 2 == 0 else -1
h = (h + direction * magnitude / 360) % 1.0
l = max(0.15, min(0.85, l + direction * 0.05))
r2, g2, b2 = colorsys.hls_to_rgb(h, l, s)
return f"#{int(r2 * 255):02x}{int(g2 * 255):02x}{int(b2 * 255):02x}"