Skip to content

Commit a7b7c25

Browse files
committed
feat(core): Add support for Container and TTL nodes
Also add support through transactions. Closes #334, #496
1 parent b7b2b15 commit a7b7c25

File tree

4 files changed

+218
-98
lines changed

4 files changed

+218
-98
lines changed

kazoo/client.py

Lines changed: 136 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
CloseInstance,
3131
Create,
3232
Create2,
33+
CreateContainer,
34+
CreateTTL,
3335
Delete,
3436
Exists,
3537
GetChildren,
@@ -917,6 +919,8 @@ def create(
917919
sequence=False,
918920
makepath=False,
919921
include_data=False,
922+
container=False,
923+
ttl=0,
920924
):
921925
"""Create a node with the given value as its data. Optionally
922926
set an ACL on the node.
@@ -994,6 +998,9 @@ def create(
994998
The `makepath` option.
995999
.. versionadded:: 2.7
9961000
The `include_data` option.
1001+
.. versionadded:: 2.9
1002+
The `container` and `ttl` options.
1003+
9971004
"""
9981005
acl = acl or self.default_acl
9991006
return self.create_async(
@@ -1004,6 +1011,8 @@ def create(
10041011
sequence=sequence,
10051012
makepath=makepath,
10061013
include_data=include_data,
1014+
container=container,
1015+
ttl=ttl,
10071016
).get()
10081017

10091018
def create_async(
@@ -1015,6 +1024,8 @@ def create_async(
10151024
sequence=False,
10161025
makepath=False,
10171026
include_data=False,
1027+
container=False,
1028+
ttl=0,
10181029
):
10191030
"""Asynchronously create a ZNode. Takes the same arguments as
10201031
:meth:`create`.
@@ -1025,50 +1036,39 @@ def create_async(
10251036
The makepath option.
10261037
.. versionadded:: 2.7
10271038
The `include_data` option.
1039+
.. versionadded:: 2.9
1040+
The `container` and `ttl` options.
10281041
"""
10291042
if acl is None and self.default_acl:
10301043
acl = self.default_acl
10311044

1032-
if not isinstance(path, str):
1033-
raise TypeError("Invalid type for 'path' (string expected)")
1034-
if acl and (
1035-
isinstance(acl, ACL) or not isinstance(acl, (tuple, list))
1036-
):
1037-
raise TypeError(
1038-
"Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
1039-
)
1040-
if value is not None and not isinstance(value, bytes):
1041-
raise TypeError("Invalid type for 'value' (must be a byte string)")
1042-
if not isinstance(ephemeral, bool):
1043-
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
1044-
if not isinstance(sequence, bool):
1045-
raise TypeError("Invalid type for 'sequence' (bool expected)")
1046-
if not isinstance(makepath, bool):
1047-
raise TypeError("Invalid type for 'makepath' (bool expected)")
1048-
if not isinstance(include_data, bool):
1049-
raise TypeError("Invalid type for 'include_data' (bool expected)")
1050-
1051-
flags = 0
1052-
if ephemeral:
1053-
flags |= 1
1054-
if sequence:
1055-
flags |= 2
1056-
if acl is None:
1057-
acl = OPEN_ACL_UNSAFE
1058-
1045+
opcode = _create_opcode(
1046+
path,
1047+
value,
1048+
acl,
1049+
self.chroot,
1050+
ephemeral,
1051+
sequence,
1052+
include_data,
1053+
container,
1054+
ttl,
1055+
)
10591056
async_result = self.handler.async_result()
10601057

10611058
@capture_exceptions(async_result)
10621059
def do_create():
1063-
result = self._create_async_inner(
1064-
path,
1065-
value,
1066-
acl,
1067-
flags,
1068-
trailing=sequence,
1069-
include_data=include_data,
1070-
)
1071-
result.rawlink(create_completion)
1060+
inner_async_result = self.handler.async_result()
1061+
1062+
call_result = self._call(opcode, inner_async_result)
1063+
if call_result is False:
1064+
# We hit a short-circuit exit on the _call. Because we are
1065+
# not using the original async_result here, we bubble the
1066+
# exception upwards to the do_create function in
1067+
# KazooClient.create so that it gets set on the correct
1068+
# async_result object
1069+
raise inner_async_result.exception
1070+
1071+
inner_async_result.rawlink(create_completion)
10721072

10731073
@capture_exceptions(async_result)
10741074
def retry_completion(result):
@@ -1078,11 +1078,11 @@ def retry_completion(result):
10781078
@wrap(async_result)
10791079
def create_completion(result):
10801080
try:
1081-
if include_data:
1081+
if opcode.type == Create.type:
1082+
return self.unchroot(result.get())
1083+
else:
10821084
new_path, stat = result.get()
10831085
return self.unchroot(new_path), stat
1084-
else:
1085-
return self.unchroot(result.get())
10861086
except NoNodeError:
10871087
if not makepath:
10881088
raise
@@ -1095,33 +1095,6 @@ def create_completion(result):
10951095
do_create()
10961096
return async_result
10971097

1098-
def _create_async_inner(
1099-
self, path, value, acl, flags, trailing=False, include_data=False
1100-
):
1101-
async_result = self.handler.async_result()
1102-
if include_data:
1103-
opcode = Create2
1104-
else:
1105-
opcode = Create
1106-
1107-
call_result = self._call(
1108-
opcode(
1109-
_prefix_root(self.chroot, path, trailing=trailing),
1110-
value,
1111-
acl,
1112-
flags,
1113-
),
1114-
async_result,
1115-
)
1116-
if call_result is False:
1117-
# We hit a short-circuit exit on the _call. Because we are
1118-
# not using the original async_result here, we bubble the
1119-
# exception upwards to the do_create function in
1120-
# KazooClient.create so that it gets set on the correct
1121-
# async_result object
1122-
raise async_result.exception
1123-
return async_result
1124-
11251098
def ensure_path(self, path, acl=None):
11261099
"""Recursively create a path if it doesn't exist.
11271100
@@ -1680,48 +1653,33 @@ def create(
16801653
ephemeral=False,
16811654
sequence=False,
16821655
include_data=False,
1656+
container=False,
1657+
ttl=0,
16831658
):
16841659
"""Add a create ZNode to the transaction. Takes the same
16851660
arguments as :meth:`KazooClient.create`, with the exception
16861661
of `makepath`.
16871662
16881663
:returns: None
16891664
1665+
.. versionadded:: 2.9
1666+
The `include_data`, `container` and `ttl` options.
16901667
"""
16911668
if acl is None and self.client.default_acl:
16921669
acl = self.client.default_acl
16931670

1694-
if not isinstance(path, str):
1695-
raise TypeError("Invalid type for 'path' (string expected)")
1696-
if acl and not isinstance(acl, (tuple, list)):
1697-
raise TypeError(
1698-
"Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
1699-
)
1700-
if not isinstance(value, bytes):
1701-
raise TypeError("Invalid type for 'value' (must be a byte string)")
1702-
if not isinstance(ephemeral, bool):
1703-
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
1704-
if not isinstance(sequence, bool):
1705-
raise TypeError("Invalid type for 'sequence' (bool expected)")
1706-
if not isinstance(include_data, bool):
1707-
raise TypeError("Invalid type for 'include_data' (bool expected)")
1708-
1709-
flags = 0
1710-
if ephemeral:
1711-
flags |= 1
1712-
if sequence:
1713-
flags |= 2
1714-
if acl is None:
1715-
acl = OPEN_ACL_UNSAFE
1716-
if include_data:
1717-
opcode = Create2
1718-
else:
1719-
opcode = Create
1720-
1721-
self._add(
1722-
opcode(_prefix_root(self.client.chroot, path), value, acl, flags),
1723-
None,
1671+
opcode = _create_opcode(
1672+
path,
1673+
value,
1674+
acl,
1675+
self.client.chroot,
1676+
ephemeral,
1677+
sequence,
1678+
include_data,
1679+
container,
1680+
ttl,
17241681
)
1682+
self._add(opcode, None)
17251683

17261684
def delete(self, path, version=-1):
17271685
"""Add a delete ZNode to the transaction. Takes the same
@@ -1802,3 +1760,85 @@ def _add(self, request, post_processor=None):
18021760
self._check_tx_state()
18031761
self.client.logger.log(BLATHER, "Added %r to %r", request, self)
18041762
self.operations.append(request)
1763+
1764+
1765+
def _create_opcode(
1766+
path,
1767+
value,
1768+
acl,
1769+
chroot,
1770+
ephemeral,
1771+
sequence,
1772+
include_data,
1773+
container,
1774+
ttl,
1775+
):
1776+
"""Helper function.
1777+
Creates the create OpCode for regular `client.create()` operations as
1778+
well as in a `client.transaction()` context.
1779+
"""
1780+
if not isinstance(path, string_types):
1781+
raise TypeError("Invalid type for 'path' (string expected)")
1782+
if acl and (isinstance(acl, ACL) or not isinstance(acl, (tuple, list))):
1783+
raise TypeError(
1784+
"Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
1785+
)
1786+
if value is not None and not isinstance(value, bytes_types):
1787+
raise TypeError("Invalid type for 'value' (must be a byte string)")
1788+
if not isinstance(ephemeral, bool):
1789+
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
1790+
if not isinstance(sequence, bool):
1791+
raise TypeError("Invalid type for 'sequence' (bool expected)")
1792+
if not isinstance(include_data, bool):
1793+
raise TypeError("Invalid type for 'include_data' (bool expected)")
1794+
if not isinstance(container, bool):
1795+
raise TypeError("Invalid type for 'container' (bool expected)")
1796+
if not isinstance(ttl, int) or ttl < 0:
1797+
raise TypeError("Invalid 'ttl' (integer >= 0 expected)")
1798+
if ttl and ephemeral:
1799+
raise TypeError("Invalid node creation: ephemeral & ttl")
1800+
if container and (ephemeral or sequence or ttl):
1801+
raise TypeError(
1802+
"Invalid node creation: container & ephemeral/sequence/ttl"
1803+
)
1804+
1805+
# Should match Zookeeper's CreateMode fromFlag
1806+
# https://github.com/apache/zookeeper/blob/master/zookeeper-server/
1807+
# src/main/java/org/apache/zookeeper/CreateMode.java#L112
1808+
flags = 0
1809+
if ephemeral:
1810+
flags |= 1
1811+
if sequence:
1812+
flags |= 2
1813+
if container:
1814+
flags = 4
1815+
if ttl:
1816+
if sequence:
1817+
flags = 6
1818+
else:
1819+
flags = 5
1820+
1821+
if acl is None:
1822+
acl = OPEN_ACL_UNSAFE
1823+
1824+
# Figure out the OpCode we are going to send
1825+
if include_data:
1826+
return Create2(
1827+
_prefix_root(chroot, path, trailing=sequence), value, acl, flags
1828+
)
1829+
elif container:
1830+
return CreateContainer(
1831+
_prefix_root(chroot, path, trailing=False), value, acl, flags
1832+
)
1833+
elif ttl:
1834+
return CreateTTL(
1835+
_prefix_root(chroot, path, trailing=sequence),
1836+
value,
1837+
acl,
1838+
flags,
1839+
ttl,
1840+
)
1841+
else:
1842+
return Create(
1843+
_prefix_root(chroot, path, trailing=sequence), value, acl, flags
1844+
)

kazoo/protocol/serialization.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,55 @@ def deserialize(cls, bytes, offset):
425425
return data, stat
426426

427427

428+
class CreateContainer(namedtuple("CreateContainer", "path data acl flags")):
429+
type = 19
430+
431+
def serialize(self):
432+
b = bytearray()
433+
b.extend(write_string(self.path))
434+
b.extend(write_buffer(self.data))
435+
b.extend(int_struct.pack(len(self.acl)))
436+
for acl in self.acl:
437+
b.extend(
438+
int_struct.pack(acl.perms)
439+
+ write_string(acl.id.scheme)
440+
+ write_string(acl.id.id)
441+
)
442+
b.extend(int_struct.pack(self.flags))
443+
return b
444+
445+
@classmethod
446+
def deserialize(cls, bytes, offset):
447+
path, offset = read_string(bytes, offset)
448+
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
449+
return path, stat
450+
451+
452+
class CreateTTL(namedtuple("CreateTTL", "path data acl flags ttl")):
453+
type = 21
454+
455+
def serialize(self):
456+
b = bytearray()
457+
b.extend(write_string(self.path))
458+
b.extend(write_buffer(self.data))
459+
b.extend(int_struct.pack(len(self.acl)))
460+
for acl in self.acl:
461+
b.extend(
462+
int_struct.pack(acl.perms)
463+
+ write_string(acl.id.scheme)
464+
+ write_string(acl.id.id)
465+
)
466+
b.extend(int_struct.pack(self.flags))
467+
b.extend(long_struct.pack(self.ttl))
468+
return b
469+
470+
@classmethod
471+
def deserialize(cls, bytes, offset):
472+
path, offset = read_string(bytes, offset)
473+
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
474+
return path, stat
475+
476+
428477
class Auth(namedtuple("Auth", "auth_type scheme auth")):
429478
type = 100
430479

kazoo/testing/harness.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,13 @@ def get_global_cluster():
7575
"localSessionsEnabled=" + ZOOKEEPER_LOCAL_SESSION_RO,
7676
"localSessionsUpgradingEnabled=" + ZOOKEEPER_LOCAL_SESSION_RO,
7777
]
78-
# If defined, this sets the superuser password to "test"
7978
additional_java_system_properties = [
79+
# Enable extended types (container & ttl znodes)
80+
"-Dzookeeper.extendedTypesEnabled=true",
81+
"-Dznode.container.checkIntervalMs=100",
82+
# If defined, this sets the superuser password to "test"
8083
"-Dzookeeper.DigestAuthenticationProvider.superDigest="
81-
"super:D/InIHSb7yEEbrWz8b9l71RjZJU="
84+
"super:D/InIHSb7yEEbrWz8b9l71RjZJU=",
8285
]
8386
else:
8487
additional_configuration_entries = []

0 commit comments

Comments
 (0)