1
1
#!/usr/bin/env python
2
2
# -*- coding: utf-8 -*-
3
3
4
+ from .utils import get_timestamp_ms
4
5
from easyrocks import RocksDB , WriteBatch , utils
5
- import time
6
6
import builtins
7
7
from typing import Generator
8
8
9
-
10
- def get_timestamp_ms ():
11
- return int (round (time .time () * 1000 ))
9
+ UINT_BYTES = 5
10
+ MAX_UINT = 2 ** (UINT_BYTES * 8 ) - 1
12
11
13
12
14
13
class Call :
@@ -82,7 +81,16 @@ def set_index(self, index):
82
81
83
82
84
83
class TxLog :
85
- def __init__ (self , path = './txlog_data' , max_committed_items = 0 , committed_ttl_seconds = None ):
84
+
85
+ CALL_PREFIX = b'\x00 '
86
+ OFFSET_PREFIX = b'\x01 '
87
+ INDEX_PREFIX = b'\x02 '
88
+ META_PREFIX = b'\x03 '
89
+
90
+ def __init__ (self ,
91
+ path = './txlog-data' ,
92
+ max_committed_items = 0 ,
93
+ committed_ttl_seconds = None ):
86
94
self ._batch_index = None
87
95
self ._write_batch = None
88
96
self ._committed_ttl_seconds = committed_ttl_seconds
@@ -101,22 +109,21 @@ def commit(self):
101
109
def rollback (self ):
102
110
self ._write_batch = None
103
111
104
- @staticmethod
105
- def _get_call_key (index : int ):
106
- return f'txlog_{ utils .get_padded_int (index )} '
107
-
108
112
def commit_call (self , call : Call ):
109
- new_write_batch = self ._write_batch is None
110
- self .begin ()
113
+ is_new_write_batch = self ._write_batch is None
114
+ if is_new_write_batch :
115
+ self .begin ()
116
+
111
117
call ._committed = True
112
118
call ._commitment_timestamp = get_timestamp_ms ()
113
119
self ._update_call (call .index , call )
114
120
self ._increment_offset ()
115
- if new_write_batch :
121
+
122
+ if is_new_write_batch :
116
123
self .commit ()
117
124
118
125
def get (self , index : int ) -> Call :
119
- call_key = TxLog ._get_call_key (index )
126
+ call_key = self ._get_call_key (index )
120
127
return self ._db .get (call_key )
121
128
122
129
def exec_uncommitted_calls (self , container_object = None ):
@@ -127,7 +134,11 @@ def exec_uncommitted_calls(self, container_object=None):
127
134
def add (self , call : Call ):
128
135
if not isinstance (call , Call ):
129
136
raise TypeError
137
+
130
138
index = self ._get_next_index ()
139
+ if index > MAX_UINT :
140
+ raise ValueError (index )
141
+
131
142
call .set_index (index )
132
143
self ._put_call (index , call )
133
144
return index
@@ -141,26 +152,27 @@ def print_uncommitted_calls(self):
141
152
print (call ._method_name , call ._args , call ._kwargs )
142
153
143
154
def get_calls (self ) -> Generator [Call , None , None ]:
144
- for _ , call in self ._db .scan (prefix = 'txlog_' ):
155
+ for _ , call in self ._db .scan (prefix = self . CALL_PREFIX ):
145
156
yield call
146
157
147
158
def get_first_uncommitted_call (self ) -> Call :
148
159
next_offset = self ._get_next_offset ()
149
160
if next_offset > self ._get_index ():
150
161
return
151
- call_key = TxLog ._get_call_key (next_offset )
162
+ call_key = self ._get_call_key (next_offset )
152
163
return self ._db .get (call_key )
153
164
154
165
def get_uncommitted_calls (self ) -> Generator [Call , None , None ]:
155
166
first_uncommitted_call = self .get_first_uncommitted_call ()
156
167
if first_uncommitted_call is not None :
157
- for index in range (first_uncommitted_call .index , self ._get_next_index ()):
168
+ for index in range (first_uncommitted_call .index ,
169
+ self ._get_next_index ()):
158
170
yield self .get (index )
159
171
160
172
def truncate (self ):
161
173
if self ._max_committed_items is not None :
162
174
committed_calls_no = self .count_committed_calls ()
163
- for key , call in self ._db .scan (prefix = 'txlog_' ):
175
+ for key , call in self ._db .scan (prefix = self . CALL_PREFIX ):
164
176
if not call .committed :
165
177
break
166
178
@@ -171,47 +183,50 @@ def truncate(self):
171
183
committed_calls_no -= 1
172
184
173
185
if self ._committed_ttl_seconds is not None :
174
- for key , call in self ._db .scan (prefix = 'txlog_' ):
186
+ for key , call in self ._db .scan (prefix = self . CALL_PREFIX ):
175
187
if not call .committed :
176
188
break
177
189
178
- min_timestamp = get_timestamp_ms () - self ._committed_ttl_seconds * 1000
190
+ min_timestamp = get_timestamp_ms () \
191
+ - self ._committed_ttl_seconds * 1000
179
192
if call ._creation_timestamp <= min_timestamp :
180
193
self ._db .delete (key )
181
194
182
195
def count_committed_calls (self ) -> int :
183
196
counter = 0
184
- for _ , call in self ._db .scan (prefix = 'txlog_' ):
197
+ for _ , call in self ._db .scan (prefix = self . CALL_PREFIX ):
185
198
if call .committed :
186
199
counter += 1
187
200
return counter
188
201
189
202
def count_calls (self ) -> int :
190
203
counter = 0
191
- for _ , _ in self ._db .scan (prefix = 'txlog_' ):
204
+ for _ , _ in self ._db .scan (prefix = self . CALL_PREFIX ):
192
205
counter += 1
193
206
return counter
194
207
208
+ def _get_call_key (self , index : int ):
209
+ return self .CALL_PREFIX + utils .int_to_padded_bytes (index , UINT_BYTES )
210
+
195
211
def _update_call (self , index : int , call : Call ):
196
212
if not isinstance (call , Call ):
197
213
raise TypeError
198
- call_key = TxLog ._get_call_key (index )
214
+ call_key = self ._get_call_key (index )
199
215
self ._db .put (call_key , call , write_batch = self ._write_batch )
200
216
201
217
def _put_call (self , index : int , call : Call ):
202
218
if not isinstance (call , Call ):
203
219
raise TypeError
204
220
205
- is_batch_new = False
206
- if self ._write_batch is None :
207
- is_batch_new = True
221
+ is_new_write_batch = self ._write_batch is None
222
+ if is_new_write_batch :
208
223
self .begin ()
209
224
210
- call_key = TxLog ._get_call_key (index )
225
+ call_key = self ._get_call_key (index )
211
226
self ._db .put (call_key , call , write_batch = self ._write_batch )
212
227
self ._increment_index ()
213
228
214
- if is_batch_new :
229
+ if is_new_write_batch :
215
230
self .commit ()
216
231
217
232
def _increment_offset (self ):
@@ -236,16 +251,22 @@ def _get_next_index(self) -> int:
236
251
return self ._batch_index + 1
237
252
return self ._get_int_attribute ('index' ) + 1
238
253
254
+ def _get_meta_key (self , attribute : str ):
255
+ return self .META_PREFIX + bytes (attribute , 'utf-8' )
256
+
239
257
def _increment_int_attribute (self , attribute : str ):
240
258
if attribute == 'index' and self ._write_batch is not None :
241
259
self ._batch_index += 1
242
260
value = self ._batch_index
243
261
else :
244
262
value = self ._get_int_attribute (attribute ) + 1
245
- self ._db .put (f'meta_{ attribute } ' , value , write_batch = self ._write_batch )
263
+
264
+ key = self ._get_meta_key (attribute )
265
+ self ._db .put (key , value , write_batch = self ._write_batch )
246
266
247
267
def _get_int_attribute (self , attribute : str ) -> int :
248
- value = self ._db .get (f'meta_{ attribute } ' )
268
+ key = self ._get_meta_key (attribute )
269
+ value = self ._db .get (key )
249
270
if value is None :
250
271
return - 1
251
- return value
272
+ return value
0 commit comments