community.py
"""
The community module provides the Community base class that should be used when a new Community is
implemented. It provides a simplified interface between the Dispersy instance and a running
Community instance.
@author: Boudewijn Schoon
@organization: Technical University Delft
@contact: [email protected]
"""
from hashlib import sha1
from itertools import islice, cycle
from math import ceil
from random import random, Random, randint, shuffle
from time import time
try:
# python 2.7 only...
from collections import OrderedDict
except ImportError:
from .python27_ordereddict import OrderedDict
from .bloomfilter import BloomFilter
from .conversion import BinaryConversion, DefaultConversion
from .crypto import ec_generate_key, ec_to_public_bin, ec_to_private_bin
from .decorator import documentation, runtime_duration_warning
from .dispersy import Dispersy
from .distribution import SyncDistribution
from .dprint import dprint
from .member import DummyMember, Member
from .resolution import PublicResolution, LinearResolution, DynamicResolution
from .revision import update_revision_information
from .statistics import CommunityStatistics
from .timeline import Timeline
from .candidate import WalkCandidate
# update version information directly from SVN
update_revision_information("$HeadURL$", "$Revision$")
class SyncCache(object):
def __init__(self, time_low, time_high, modulo, offset, bloom_filter):
self.time_low = time_low
self.time_high = time_high
self.modulo = modulo
self.offset = offset
self.bloom_filter = bloom_filter
self.times_used = 0
self.responses_received = 0
self.candidate = None
class Community(object):
@classmethod
def get_classification(cls):
"""
Describes the community type. Should be the same across compatible versions.
@rtype: unicode
"""
return cls.__name__.decode("UTF-8")
@classmethod
def create_community(cls, my_member, *args, **kargs):
"""
Create a new community owned by my_member.
Each unique community that exists out in the world is identified by a public/private key
pair. When the create_community method is called, such a key pair is generated.
Furthermore, my_member will be granted permission to use all the messages that the community
provides.
@param my_member: The Member that will be granted Permit, Authorize, and Revoke for all
messages.
@type my_member: Member
@param args: optional arguments that are passed to the community constructor.
@type args: tuple
@param kargs: optional keyword arguments that are passed to the community constructor.
@type kargs: dictionary
@return: The created community instance.
@rtype: Community
"""
assert isinstance(my_member, Member), my_member
assert my_member.public_key, my_member.database_id
assert my_member.private_key, my_member.database_id
ec = ec_generate_key(u"high")
master = Member(ec_to_public_bin(ec), ec_to_private_bin(ec))
database = Dispersy.get_instance().database
database.execute(u"INSERT INTO community (master, member, classification) VALUES(?, ?, ?)", (master.database_id, my_member.database_id, cls.get_classification()))
community_database_id = database.last_insert_rowid
try:
# new community instance
community = cls.load_community(master, *args, **kargs)
assert community.database_id == community_database_id
# create the dispersy-identity for the master member
message = community.create_dispersy_identity(sign_with_master=True)
# create my dispersy-identity
message = community.create_dispersy_identity()
# authorize MY_MEMBER
permission_triplets = []
for message in community.get_meta_messages():
# grant all permissions for messages that use LinearResolution or DynamicResolution
if isinstance(message.resolution, (LinearResolution, DynamicResolution)):
for allowed in (u"authorize", u"revoke", u"permit"):
permission_triplets.append((my_member, message, allowed))
# ensure that undo_callback is available
if message.undo_callback:
# we do not support undo permissions for authorize, revoke, undo-own, and
# undo-other (yet)
if not message.name in (u"dispersy-authorize", u"dispersy-revoke", u"dispersy-undo-own", u"dispersy-undo-other"):
permission_triplets.append((my_member, message, u"undo"))
# grant authorize, revoke, and undo permission for messages that use PublicResolution
# and SyncDistribution. Why? The undo permission allows nodes to revoke a specific
# message that was gossiped around. The authorize permission is required to grant other
# nodes the undo permission. The revoke permission is required to remove the undo
# permission. The permit permission is not required as the message uses
# PublicResolution and is hence permitted regardless.
elif isinstance(message.distribution, SyncDistribution) and isinstance(message.resolution, PublicResolution):
# ensure that undo_callback is available
if message.undo_callback:
# we do not support undo permissions for authorize, revoke, undo-own, and
# undo-other (yet)
if not message.name in (u"dispersy-authorize", u"dispersy-revoke", u"dispersy-undo-own", u"dispersy-undo-other"):
for allowed in (u"authorize", u"revoke", u"undo"):
permission_triplets.append((my_member, message, allowed))
if permission_triplets:
community.create_dispersy_authorize(permission_triplets, sign_with_master=True, forward=False)
except:
# undo the insert into the database
# TODO it might still leave unused database entries referring to the community id
database.execute(u"DELETE FROM community WHERE id = ?", (community_database_id,))
# raise the exception because this shouldn't happen
raise
else:
return community
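# Example usage (illustrative sketch, not part of the original module): assuming a
# MyCommunity subclass of Community, a new community could be created with freshly
# generated key material for my_member.  MyCommunity is a hypothetical name.
#
#   ec = ec_generate_key(u"high")
#   my_member = Member(ec_to_public_bin(ec), ec_to_private_bin(ec))
#   community = MyCommunity.create_community(my_member)
#   print community.cid.encode("HEX")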
@classmethod
def join_community(cls, master, my_member, *args, **kargs):
"""
Join an existing community.
Once you have discovered an existing community, i.e. you have obtained the public master key
from a community, you can join this community.
Joining a community does not mean that you obtain permissions in that community, those will
need to be granted by another member who is allowed to do so. However, it will let you
receive, send, and disseminate messages that do not require any permission to use.
@param master: The master member that identified the community that we want to join.
@type master: DummyMember or Member
@param my_member: The member that will be granted Permit, Authorize, and Revoke for all
messages.
@type my_member: Member
@param args: optional arguments that are passed to the community constructor.
@type args: tuple
@param kargs: optional keyword arguments that are passed to the community constructor.
@type kargs: dictionary
@return: The created community instance.
@rtype: Community
"""
assert isinstance(master, DummyMember)
assert isinstance(my_member, Member)
assert my_member.public_key, my_member.database_id
assert my_member.private_key, my_member.database_id
if __debug__: dprint("joining ", cls.get_classification(), " ", master.mid.encode("HEX"))
database = Dispersy.get_instance().database
database.execute(u"INSERT INTO community(master, member, classification) VALUES(?, ?, ?)",
(master.database_id, my_member.database_id, cls.get_classification()))
community_database_id = database.last_insert_rowid
try:
# new community instance
community = cls.load_community(master, *args, **kargs)
assert community.database_id == community_database_id
# create my dispersy-identity
community.create_dispersy_identity()
except:
# undo the insert into the database
# TODO it might still leave unused database entries referring to the community id
database.execute(u"DELETE FROM community WHERE id = ?", (community_database_id,))
# raise the exception because this shouldn't happen
raise
else:
return community
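# Example usage (illustrative sketch, not part of the original module): once the
# 20-byte mid of an existing community has been obtained out-of-band, joining could
# look as follows.  master_mid and MyCommunity are hypothetical names.
#
#   master = DummyMember(master_mid)
#   community = MyCommunity.join_community(master, my_member)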
@classmethod
def get_master_members(cls):
if __debug__: dprint("retrieving all master members owning ", cls.get_classification(), " communities")
execute = Dispersy.get_instance().database.execute
return [Member(str(public_key)) if public_key else DummyMember(str(mid))
for mid, public_key,
in list(execute(u"SELECT m.mid, m.public_key FROM community AS c JOIN member AS m ON m.id = c.master WHERE c.classification = ?",
(cls.get_classification(),)))]
@classmethod
def load_community(cls, master, *args, **kargs):
"""
Load a single community.
Will raise a ValueError when the community associated with master is not found in the database.
@param master: The master member that identifies the community.
@type master: DummyMember or Member
@return: The community identified by master.
@rtype: Community
"""
assert isinstance(master, DummyMember)
if __debug__: dprint("loading ", cls.get_classification(), " ", master.mid.encode("HEX"))
community = cls(master, *args, **kargs)
# tell dispersy that there is a new community
community._dispersy.attach_community(community)
return community
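# Example usage (illustrative sketch, not part of the original module): communities
# that were previously created or joined can be reloaded at startup by combining
# get_master_members and load_community.  MyCommunity is a hypothetical subclass.
#
#   for master in MyCommunity.get_master_members():
#       community = MyCommunity.load_community(master)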
def __init__(self, master):
"""
Initialize a community.
Generally a new community is created using create_community, or an existing community is
loaded using load_community. These two methods prepare and call this __init__ method.
@param master: The master member that identifies the community.
@type master: DummyMember or Member
"""
assert isinstance(master, DummyMember)
if __debug__:
dprint("initializing: ", self.get_classification())
dprint("master member: ", master.mid.encode("HEX"), "" if isinstance(master, Member) else " (using DummyMember)")
# dispersy
self._dispersy = Dispersy.get_instance()
# _pending_callbacks contains all ids for registered calls that should be removed when the
# community is unloaded. Most of the time this contains all the generators that are being
# used by the community
self._pending_callbacks = []
try:
self._database_id, member_public_key, self._database_version = self._dispersy.database.execute(u"SELECT community.id, member.public_key, database_version FROM community JOIN member ON member.id = community.member WHERE master = ?", (master.database_id,)).next()
except StopIteration:
raise ValueError(u"Community not found in database [" + master.mid.encode("HEX") + "]")
if __debug__: dprint("database id: ", self._database_id)
self._cid = master.mid
self._master_member = master
self._my_member = Member(str(member_public_key))
if __debug__: dprint("my member: ", self._my_member.mid.encode("HEX"))
assert self._my_member.public_key, [self._database_id, self._my_member.database_id, self._my_member.public_key]
assert self._my_member.private_key, [self._database_id, self._my_member.database_id, self._my_member.private_key]
if not self._master_member.public_key and self.dispersy_enable_candidate_walker and self.dispersy_auto_download_master_member:
self._pending_callbacks.append(self._dispersy.callback.register(self._download_master_member_identity))
# pre-fetch some values from the database, this allows us to only query the database once
self.meta_message_cache = {}
for database_id, name, cluster, priority, direction in self._dispersy.database.execute(u"SELECT id, name, cluster, priority, direction FROM meta_message WHERE community = ?", (self._database_id,)):
self.meta_message_cache[name] = {"id":database_id, "cluster":cluster, "priority":priority, "direction":direction}
# define all available messages
self._meta_messages = {}
self._initialize_meta_messages()
# cleanup pre-fetched values
self.meta_message_cache = None
# define all available conversions
conversions = self.initiate_conversions()
assert len(conversions) > 0
self._conversions = dict((conversion.prefix, conversion) for conversion in conversions)
# the last conversion in the list will be used as the default conversion
self._conversions[None] = conversions[-1]
# the global time. zero indicates no messages are available, messages must have global
# times that are higher than zero.
self._global_time, = self._dispersy.database.execute(u"SELECT MAX(global_time) FROM sync WHERE community = ?", (self._database_id,)).next()
if self._global_time is None:
self._global_time = 0
assert isinstance(self._global_time, (int, long))
self._acceptable_global_time_cache = self._global_time
self._acceptable_global_time_deadline = 0.0
if __debug__: dprint("global time: ", self._global_time)
# sync range bloom filters
self._sync_cache = None
if __debug__:
b = BloomFilter(self.dispersy_sync_bloom_filter_bits, self.dispersy_sync_bloom_filter_error_rate)
dprint("sync bloom: size: ", int(ceil(b.size // 8)), "; capacity: ", b.get_capacity(self.dispersy_sync_bloom_filter_error_rate), "; error-rate: ", self.dispersy_sync_bloom_filter_error_rate)
# initial timeline. the timeline will keep track of member permissions
self._timeline = Timeline(self)
self._initialize_timeline()
# random seed, used for sync range
self._random = Random(self._cid)
self._nrsyncpackets = 0
#Initialize all the candidate iterators
self._candidates = OrderedDict()
self._walked_candidates = self._iter_category(u'walk')
self._stumbled_candidates = self._iter_category(u'stumble')
self._introduced_candidates = self._iter_category(u'intro')
self._walk_candidates = self._iter_categories([u'walk', u'stumble', u'intro'])
self._bootstrap_candidates = self._iter_bootstrap()
# statistics...
self._statistics = CommunityStatistics(self)
@property
def statistics(self):
"""
The Statistics instance.
"""
return self._statistics
def _download_master_member_identity(self):
assert not self._master_member.public_key
if __debug__: dprint("using dummy master member")
def on_dispersy_identity(message):
if message and not self._master_member:
if __debug__: dprint(self._cid.encode("HEX"), " received master member")
assert message.authentication.member.mid == self._master_member.mid
self._master_member = message.authentication.member
assert self._master_member.public_key
delay = 2.0
while not self._master_member.public_key:
try:
public_key, = self._dispersy.database.execute(u"SELECT public_key FROM member WHERE id = ?", (self._master_member.database_id,)).next()
except StopIteration:
pass
else:
if public_key:
if __debug__: dprint(self._cid.encode("HEX"), " found master member")
self._master_member = Member(str(public_key))
assert self._master_member.public_key
break
for candidate in islice(self.dispersy_yield_random_candidates(), 1):
if candidate:
if __debug__: dprint(self._cid.encode("HEX"), " asking for master member from ", candidate)
self._dispersy.create_missing_identity(self, candidate, self._master_member, on_dispersy_identity)
yield delay
delay = min(300.0, delay * 1.1)
def _initialize_meta_messages(self):
assert isinstance(self._meta_messages, dict)
assert len(self._meta_messages) == 0
# obtain dispersy meta messages
for meta_message in self._dispersy.initiate_meta_messages(self):
assert meta_message.name not in self._meta_messages
self._meta_messages[meta_message.name] = meta_message
# obtain community meta messages
for meta_message in self.initiate_meta_messages():
assert meta_message.name not in self._meta_messages
self._meta_messages[meta_message.name] = meta_message
if __debug__:
sync_interval = 5.0
for meta_message in self._meta_messages.itervalues():
if isinstance(meta_message.distribution, SyncDistribution) and meta_message.batch.max_window >= sync_interval:
dprint("when sync is enabled the interval should be greater than the walking frequency. otherwise you are likely to receive duplicate packets [", meta_message.name, "]", level="warning")
def _initialize_timeline(self):
mapping = {}
for name in [u"dispersy-authorize", u"dispersy-revoke", u"dispersy-dynamic-settings"]:
try:
meta = self.get_meta_message(name)
except KeyError:
if __debug__: dprint("unable to load permissions from database [could not obtain '", name, "']", level="warning")
else:
mapping[meta.database_id] = meta.handle_callback
if mapping:
for packet, in list(self._dispersy.database.execute(u"SELECT packet FROM sync WHERE meta_message IN (" + ", ".join("?" for _ in mapping) + ") ORDER BY global_time, packet",
mapping.keys())):
message = self._dispersy.convert_packet_to_message(str(packet), self, verify=False)
if message:
if __debug__: dprint("processing ", message.name)
mapping[message.database_id]([message], initializing=True)
else:
# TODO: when a packet conversion fails we must drop something, and preferably check
# all messages in the database again...
if __debug__:
dprint("invalid message in database [", self.get_classification(), "; ", self.cid.encode("HEX"), "]", level="error")
dprint(str(packet).encode("HEX"), level="error")
# @property
def __get_dispersy_auto_load(self):
"""
When True, this community will automatically be loaded when a packet is received.
"""
# currently we grab it directly from the database, should become a property for efficiency
return bool(self._dispersy.database.execute(u"SELECT auto_load FROM community WHERE master = ?",
(self._master_member.database_id,)).next()[0])
# @dispersy_auto_load.setter
def __set_dispersy_auto_load(self, auto_load):
"""
Sets the auto_load flag for this community.
"""
assert isinstance(auto_load, bool)
self._dispersy.database.execute(u"UPDATE community SET auto_load = ? WHERE master = ?",
(1 if auto_load else 0, self._master_member.database_id))
# .setter was introduced in Python 2.6
dispersy_auto_load = property(__get_dispersy_auto_load, __set_dispersy_auto_load)
@property
def dispersy_auto_download_master_member(self):
"""
Enable or disable automatic downloading of the dispersy-identity for the master member.
"""
return True
@property
def dispersy_enable_candidate_walker(self):
"""
Enable the candidate walker.
When True is returned, the dispersy_take_step method will be called periodically. Otherwise
it will be ignored. The candidate walker is enabled by default.
"""
return True
@property
def dispersy_enable_candidate_walker_responses(self):
"""
Enable the candidate walker responses.
When True is returned, the community will be able to respond to incoming
dispersy-introduction-request and dispersy-puncture-request messages. Otherwise these
messages are left undefined and will be ignored.
When dispersy_enable_candidate_walker returns True, this property must also return True.
The default value is to mirror self.dispersy_enable_candidate_walker.
"""
return self.dispersy_enable_candidate_walker
@property
def dispersy_sync_bloom_filter_error_rate(self):
"""
The error rate that is allowed within the sync bloom filter.
Having a higher error rate will allow for more items to be stored in the bloom filter,
allowing more items to be synced with each sync interval, though at the cost of more
false positives.
A false positive means that if A sends a dispersy-sync message to B, B will incorrectly
believe that A already has certain messages. Each message has an (error rate) chance of
being a false positive, and hence B will not be able to receive (error rate) percent of
the messages in the system.
This problem can be alleviated by having multiple bloom filters for each sync range, each
with a different prefix. Bloom filters with different prefixes are extremely likely (the
hash functions md5, sha1, shaxxx ensure this) to have false positives for different
packets. Hence, having two or three different bloom filters will ensure you will get all
messages, though it will take more rounds.
@rtype: float
"""
return 0.01
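# Worked example (illustrative, not from the original source): with an error rate
# of 0.01 a single filter makes B skip roughly 1% of A's messages per round.  If a
# message is tested against two filters with independent prefixes, the chance of a
# false positive in both is roughly 0.01 ** 2 = 0.0001, so nearly every message
# gets through eventually, at the cost of extra rounds.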
# @property
# def dispersy_sync_bloom_filter_redundancy(self):
# """
# The number of bloom filters, each with a unique prefix, that are used to represent one sync
# range.
# The effective error rate for a sync range then becomes redundancy * error_rate.
# @rtype: int
# """
# return 3
@property
def dispersy_sync_bloom_filter_bits(self):
"""
The size in bits of this bloom filter.
Note that the amount must be a multiple of eight.
The sync bloom filter is part of the dispersy-introduction-request message and hence must
fit within a single MTU. There are several numbers that need to be taken into account.
- A typical MTU is 1500 bytes
- A typical IP header is 20 bytes. However, the maximum IP header is 60 bytes (this
includes information for VPN, tunnels, etc.)
- The UDP header is 8 bytes
- The dispersy header is 2 + 20 + 1 + 20 + 8 = 51 bytes (version, cid, type, member,
global-time)
- The signature is usually 60 bytes. This depends on what public/private key was chosen.
The current value is: self._my_member.signature_length
- The other payload is 6 + 6 + 6 + 1 + 2 = 21 (destination-address, source-lan-address,
source-wan-address, advice+connection-type+sync flags, identifier)
- The sync payload uses 8 + 8 + 4 + 4 + 1 + 4 + 1 = 30 (time low, time high, modulo, offset,
function, bits, prefix)
"""
return (1500 - 60 - 8 - 51 - self._my_member.signature_length - 21 - 30) * 8
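# Worked example (illustrative): with the typical 60-byte signature mentioned
# above, the budget is 1500 - 60 - 8 - 51 - 60 - 21 - 30 = 1270 bytes, i.e.
# 10160 bits.  Because the byte count is multiplied by eight, the result is
# always a multiple of eight, as required.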
@property
def dispersy_sync_bloom_filter_strategy(self):
return self._dispersy_claim_sync_bloom_filter_largest
def dispersy_store(self, messages):
"""
Called after new MESSAGES have been stored in the database.
"""
if __debug__:
cached = 0
if self._sync_cache:
cache = self._sync_cache
for message in messages:
if (message.distribution.priority > 32 and
cache.time_low <= message.distribution.global_time <= cache.time_high and
(message.distribution.global_time + cache.offset) % cache.modulo == 0):
if __debug__:
cached += 1
# update cached bloomfilter to avoid duplicates
cache.bloom_filter.add(message.packet)
# if this message was received from the candidate we sent the bloomfilter to, increment responses
if (cache.candidate and message.candidate and cache.candidate.sock_addr == message.candidate.sock_addr):
cache.responses_received += 1
if __debug__:
if cached:
dprint(self._cid.encode("HEX"), "] ", cached, " out of ", len(messages), " were part of the cached bloomfilter")
def dispersy_claim_sync_bloom_filter(self, request_cache):
"""
Returns a (time_low, time_high, modulo, offset, bloom_filter) or None.
"""
if (self._sync_cache and
self._sync_cache.responses_received > 0 and
self._sync_cache.times_used < 100):
self._statistics.sync_bloom_reuse += 1
cache = self._sync_cache
cache.times_used += 1
if __debug__: dprint(self._cid.encode("HEX"), " reuse #", cache.times_used, " (packets received: ", cache.responses_received, "; ", hex(cache.bloom_filter._filter), ")")
cache.responses_received = 0
cache.candidate = request_cache.helper_candidate
return cache.time_low, cache.time_high, cache.modulo, cache.offset, cache.bloom_filter
sync = self.dispersy_sync_bloom_filter_strategy()
if sync:
self._sync_cache = SyncCache(*sync)
self._sync_cache.candidate = request_cache.helper_candidate
self._statistics.sync_bloom_new += 1
if __debug__: dprint(self._cid.encode("HEX"), " new sync bloom (", self._statistics.sync_bloom_reuse, "/", self._statistics.sync_bloom_new, "~", round(1.0 * self._statistics.sync_bloom_reuse / self._statistics.sync_bloom_new, 2), ")")
return sync
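# Illustration (sketch derived from the condition above, not from the original
# source): the reuse decision per incoming request is effectively
#
#   responses_received   times_used   action
#   0                    any          build a new filter
#   > 0                  < 100        reuse the cached filter
#   > 0                  >= 100       build a new filter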
@runtime_duration_warning(0.5)
def dispersy_claim_sync_bloom_filter_simple(self):
bloom = BloomFilter(self.dispersy_sync_bloom_filter_bits, self.dispersy_sync_bloom_filter_error_rate, prefix=chr(int(random() * 256)))
capacity = bloom.get_capacity(self.dispersy_sync_bloom_filter_error_rate)
global_time = self.global_time
desired_mean = global_time / 2.0
lambd = 1.0 / desired_mean
time_point = global_time - int(self._random.expovariate(lambd))
if time_point < 1:
time_point = int(self._random.random() * global_time)
time_low = time_point - capacity / 2
time_high = time_low + capacity
if time_low < 1:
time_low = 1
time_high = capacity
db_high = capacity
elif time_high > global_time - capacity:
time_low = max(1, global_time - capacity)
time_high = self.acceptable_global_time
db_high = global_time
else:
db_high = time_high
bloom.add_keys(str(packet) for packet, in self._dispersy.database.execute(u"SELECT sync.packet FROM sync JOIN meta_message ON meta_message.id = sync.meta_message WHERE sync.community = ? AND meta_message.priority > 32 AND NOT sync.undone AND global_time BETWEEN ? AND ?", (self._database_id, time_low, db_high)))
if __debug__:
import sys
print >> sys.stderr, "Syncing %d-%d, capacity = %d, pivot = %d"%(time_low, time_high, capacity, time_point)
return (time_low, time_high, 1, 0, bloom)
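# Worked example (illustrative): with global_time = 1000 the pivot is drawn as
# 1000 - int(expovariate(1 / 500.0)); the exponential has mean 500, so about 63%
# of pivots (1 - 1/e) fall within the most recent 500 global-time units, biasing
# the sync window toward newer messages while still occasionally revisiting old
# ranges.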
#choose a pivot and add up to capacity items to the right of it; if too few are found, add items left of the pivot
@runtime_duration_warning(0.5)
def dispersy_claim_sync_bloom_filter_right(self):
bloom = BloomFilter(self.dispersy_sync_bloom_filter_bits, self.dispersy_sync_bloom_filter_error_rate, prefix=chr(int(random() * 256)))
capacity = bloom.get_capacity(self.dispersy_sync_bloom_filter_error_rate)
desired_mean = self.global_time / 2.0
lambd = 1.0 / desired_mean
from_gbtime = self.global_time - int(self._random.expovariate(lambd))
if from_gbtime < 1:
from_gbtime = int(self._random.random() * self.global_time)
#import sys
#print >> sys.stderr, "Pivot", from_gbtime
mostRecent = False
if from_gbtime > 1:
#use from_gbtime - 1 to include from_gbtime
right,_ = self._select_and_fix(from_gbtime - 1, capacity, True)
#we did not select enough items from right side, increase nr of items for left
if len(right) < capacity:
to_select = capacity - len(right)
mostRecent = True
left,_ = self._select_and_fix(from_gbtime, to_select, False)
data = left + right
else:
data = right
else:
data,_ = self._select_and_fix(0, capacity, True)
if len(data) > 0:
if len(data) >= capacity:
time_low = min(from_gbtime, data[0][0])
if mostRecent:
time_high = self.acceptable_global_time
else:
time_high = max(from_gbtime, data[-1][0])
#we did not fill the complete bloomfilter, assume we selected all items
else:
time_low = 1
time_high = self.acceptable_global_time
bloom.add_keys(str(packet) for _, packet in data)
#print >> sys.stderr, "Syncing %d-%d, nr_packets = %d, capacity = %d, packets %d-%d"%(time_low, time_high, len(data), capacity, data[0][0], data[-1][0])
return (time_low, time_high, 1, 0, bloom)
return (1, self.acceptable_global_time, 1, 0, BloomFilter(8, 0.1, prefix='\x00'))
#instead of pivot + capacity, divide the capacity to get a 50/50 division around the pivot
@runtime_duration_warning(0.5)
def dispersy_claim_sync_bloom_filter_50_50(self):
bloom = BloomFilter(self.dispersy_sync_bloom_filter_bits, self.dispersy_sync_bloom_filter_error_rate, prefix=chr(int(random() * 256)))
capacity = bloom.get_capacity(self.dispersy_sync_bloom_filter_error_rate)
desired_mean = self.global_time / 2.0
lambd = 1.0 / desired_mean
from_gbtime = self.global_time - int(self._random.expovariate(lambd))
if from_gbtime < 1:
from_gbtime = int(self._random.random() * self.global_time)
# import sys
#print >> sys.stderr, "Pivot", from_gbtime
mostRecent = False
leastRecent = False
if from_gbtime > 1:
to_select = capacity / 2
#use from_gbtime - 1 to include from_gbtime
right,_ = self._select_and_fix(from_gbtime - 1, to_select, True)
#we did not select enough items from right side, increase nr of items for left
if len(right) < to_select:
to_select = capacity - len(right)
mostRecent = True
left,_ = self._select_and_fix(from_gbtime, to_select, False)
#we did not select enough items from left side
if len(left) < to_select:
leastRecent = True
#increase nr of items for right if we did select enough items on right side
if len(right) >= to_select:
to_select = capacity - len(right) - len(left)
right = right + self._select_and_fix(right[-1][0], to_select, True)[0]
data = left + right
else:
data,_ = self._select_and_fix(0, capacity, True)
if len(data) > 0:
if len(data) >= capacity:
if leastRecent:
time_low = 1
else:
time_low = min(from_gbtime, data[0][0])
if mostRecent:
time_high = self.acceptable_global_time
else:
time_high = max(from_gbtime, data[-1][0])
#we did not fill the complete bloomfilter, assume we selected all items
else:
time_low = 1
time_high = self.acceptable_global_time
bloom.add_keys(str(packet) for _, packet in data)
#print >> sys.stderr, "Syncing %d-%d, nr_packets = %d, capacity = %d, packets %d-%d"%(time_low, time_high, len(data), capacity, data[0][0], data[-1][0])
return (time_low, time_high, 1, 0, bloom)
return (1, self.acceptable_global_time, 1, 0, BloomFilter(8, 0.1, prefix='\x00'))
#instead of pivot + capacity, compare pivot - capacity and pivot + capacity to see which globaltime range is largest
@runtime_duration_warning(0.5)
def _dispersy_claim_sync_bloom_filter_largest(self):
if __debug__:
t1 = time()
syncable_messages = u", ".join(unicode(meta.database_id) for meta in self._meta_messages.itervalues() if isinstance(meta.distribution, SyncDistribution) and meta.distribution.priority > 32)
if syncable_messages:
if __debug__:
t2 = time()
acceptable_global_time = self.acceptable_global_time
bloom = BloomFilter(self.dispersy_sync_bloom_filter_bits, self.dispersy_sync_bloom_filter_error_rate, prefix=chr(int(random() * 256)))
capacity = bloom.get_capacity(self.dispersy_sync_bloom_filter_error_rate)
desired_mean = self.global_time / 2.0
lambd = 1.0 / desired_mean
from_gbtime = self.global_time - int(self._random.expovariate(lambd))
if from_gbtime < 1:
from_gbtime = int(self._random.random() * self.global_time)
if from_gbtime > 1 and self._nrsyncpackets >= capacity:
#use from_gbtime -1/+1 to include from_gbtime
right, rightdata = self._select_bloomfilter_range(syncable_messages, from_gbtime -1, capacity, True)
#if right did not get to capacity, then we have less than capacity items in the database
#skip left
if right[2] == capacity:
left, leftdata = self._select_bloomfilter_range(syncable_messages, from_gbtime + 1, capacity, False)
left_range = (left[1] or self.global_time) - left[0]
right_range = (right[1] or self.global_time) - right[0]
if left_range > right_range:
bloomfilter_range = left
data = leftdata
else:
bloomfilter_range = right
data = rightdata
if __debug__:
dprint(self.cid.encode("HEX"), " bloomfilterrange left", left, " right", right, "left" if left_range > right_range else "right")
else:
bloomfilter_range = right
data = rightdata
if __debug__:
t3 = time()
else:
if __debug__:
t3 = time()
bloomfilter_range = [1, acceptable_global_time]
data, fixed = self._select_and_fix(syncable_messages, 0, capacity, True)
if len(data) > 0 and fixed:
bloomfilter_range[1] = data[-1][0]
self._nrsyncpackets = capacity + 1
if __debug__:
t4 = time()
if len(data) > 0:
bloom.add_keys(str(packet) for _, packet in data)
if __debug__:
dprint(self.cid.encode("HEX"), " syncing %d-%d, nr_packets = %d, capacity = %d, packets %d-%d, pivot = %d"%(bloomfilter_range[0], bloomfilter_range[1], len(data), capacity, data[0][0], data[-1][0], from_gbtime))
dprint(self.cid.encode("HEX"), " took %f (fakejoin %f, rangeselect %f, dataselect %f, bloomfill, %f"%(time()-t1, t2-t1, t3-t2, t4-t3, time()-t4))
return (min(bloomfilter_range[0], acceptable_global_time), min(bloomfilter_range[1], acceptable_global_time), 1, 0, bloom)
if __debug__:
dprint(self.cid.encode("HEX"), " no messages to sync")
elif __debug__:
dprint(self.cid.encode("HEX"), " NOT syncing no syncable messages")
return (1, self.acceptable_global_time, 1, 0, BloomFilter(8, 0.1, prefix='\x00'))
#select every modulo-th packet using a random offset, so that successive rounds cover the whole database
@runtime_duration_warning(0.5)
def _dispersy_claim_sync_bloom_filter_modulo(self):
syncable_messages = u", ".join(unicode(meta.database_id) for meta in self._meta_messages.itervalues() if isinstance(meta.distribution, SyncDistribution) and meta.distribution.priority > 32)
if syncable_messages:
bloom = BloomFilter(self.dispersy_sync_bloom_filter_bits, self.dispersy_sync_bloom_filter_error_rate, prefix=chr(int(random() * 256)))
capacity = bloom.get_capacity(self.dispersy_sync_bloom_filter_error_rate)
self._nrsyncpackets = list(self._dispersy.database.execute(u"SELECT count(*) FROM sync WHERE meta_message IN (%s) AND undone = 0 LIMIT 1" % (syncable_messages)))[0][0]
modulo = int(ceil(self._nrsyncpackets / float(capacity)))
if modulo > 1:
offset = randint(0, modulo-1)
else:
offset = 0
modulo = 1
packets = list(str(packet) for packet, in self._dispersy.database.execute(u"SELECT sync.packet FROM sync WHERE meta_message IN (%s) AND sync.undone = 0 AND sync.global_time > 0 AND (sync.global_time + ?) %% ? = 0" % syncable_messages, (offset, modulo)))
bloom.add_keys(packets)
if __debug__:
dprint(self.cid.encode("HEX"), " syncing %d-%d, nr_packets = %d, capacity = %d, totalnr = %d"%(modulo, offset, self._nrsyncpackets, capacity, self._nrsyncpackets))
return (1, self.acceptable_global_time, modulo, offset, bloom)
elif __debug__:
dprint(self.cid.encode("HEX"), " NOT syncing no syncable messages")
return (1, self.acceptable_global_time, 1, 0, BloomFilter(8, 0.1, prefix='\x00'))
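# Worked example (illustrative): with 10000 syncable packets and a filter
# capacity of 1000, modulo = ceil(10000 / 1000.0) = 10 and offset is drawn from
# [0, 9], so each round advertises roughly one tenth of the database; successive
# rounds with different offsets eventually cover every packet.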
def _select_and_fix(self, syncable_messages, global_time, to_select, higher = True):
assert isinstance(syncable_messages, unicode)
if higher:
data = list(self._dispersy.database.execute(u"SELECT global_time, packet FROM sync WHERE meta_message IN (%s) AND undone = 0 AND global_time > ? ORDER BY global_time ASC LIMIT ?" % (syncable_messages),
(global_time, to_select + 1)))
else:
data = list(self._dispersy.database.execute(u"SELECT global_time, packet FROM sync WHERE meta_message IN (%s) AND undone = 0 AND global_time < ? ORDER BY global_time DESC LIMIT ?" % (syncable_messages),
(global_time, to_select + 1)))
fixed = False
if len(data) > to_select:
fixed = True
#if the trailing packets share the same global_time, drop them all so the range boundary stays exact
global_time = data[-1][0]
del data[-1]
while data and data[-1][0] == global_time:
del data[-1]
if not higher:
data.reverse()
return data, fixed
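# Worked example (illustrative): with to_select = 3 and ascending global times
# [4, 5, 6, 6] the query returns 4 rows, so fixed = True and the boundary
# global_time is 6; both rows at 6 are dropped (more rows could share that time
# beyond the LIMIT), leaving [4, 5] and an exact range boundary.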
def _select_bloomfilter_range(self, syncable_messages, global_time, to_select, higher = True):
data, fixed = self._select_and_fix(syncable_messages, global_time, to_select, higher)
lowerfixed = True
higherfixed = True
#if we selected less than to_select
if len(data) < to_select:
#calculate how many still remain
to_select = to_select - len(data)
if to_select > 25:
if higher:
lowerdata, lowerfixed = self._select_and_fix(syncable_messages, global_time + 1, to_select, False)
data = lowerdata + data
else:
higherdata, higherfixed = self._select_and_fix(syncable_messages, global_time - 1, to_select, True)
data = data + higherdata
bloomfilter_range = [data[0][0], data[-1][0], len(data)]
#we can use the global_time as a min or max value for lower and upper bound
if higher:
#we selected items higher than global_time, make sure bloomfilter_range[0] is at least as low as global_time + 1
#we select all items higher than global_time, thus all items at global_time + 1 are included
bloomfilter_range[0] = min(bloomfilter_range[0], global_time + 1)
#if not fixed and higher, then we have selected up to all known packets
if not fixed:
bloomfilter_range[1] = self.acceptable_global_time
if not lowerfixed:
bloomfilter_range[0] = 1
else:
#we selected items lower than global_time, make sure bloomfilter_range[1] is at least as high as global_time - 1
#we select all items lower than global_time, thus all items at global_time - 1 are included
bloomfilter_range[1] = max(bloomfilter_range[1], global_time - 1)
if not fixed:
bloomfilter_range[0] = 1
if not higherfixed:
bloomfilter_range[1] = self.acceptable_global_time
return bloomfilter_range, data
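# Illustration (sketch under assumed values): with higher = True and
# global_time = 100, suppose the query returns exactly to_select packets
# spanning global times 101-150, so fixed = False and no left-fill happens; the
# database is then exhausted on the right, and bloomfilter_range[1] is widened
# to acceptable_global_time to claim the whole remaining range.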
# def dispersy_claim_sync_bloom_filter(self, identifier):
# """
# Returns a (time_low, time_high, bloom_filter) tuple or None.
# """
# count, = self._dispersy.database.execute(u"SELECT COUNT(1) FROM sync JOIN meta_message ON meta_message.id = sync.meta_message WHERE sync.community = ? AND meta_message.priority > 32", (self._database_id,)).next()
# if count:
# bloom = BloomFilter(self.dispersy_sync_bloom_filter_bits, self.dispersy_sync_bloom_filter_error_rate, prefix=chr(int(random() * 256)))
# capacity = bloom.get_capacity(self.dispersy_sync_bloom_filter_error_rate)
# ranges = int(ceil(1.0 * count / capacity))
# desired_mean = ranges / 2.0
# lambd = 1.0 / desired_mean
# range_ = ranges - int(ceil(expovariate(lambd)))
# # RANGE_ < 0 is possible when the exponential function returns a very large number (least likely)
# # RANGE_ = 0 is the oldest time bloomfilter_range (less likely)
# # RANGE_ = RANGES - 1 is the highest time bloomfilter_range (more likely)
# if range_ < 0:
# # pick uniform randomly
# range_ = int(random() * ranges)
# if range_ == ranges - 1:
# # the chosen bloomfilter_range is to small to fill an entire bloom filter. adjust the offset
# # accordingly
# offset = max(0, count - capacity + 1)
# else:
# offset = range_ * capacity
# # get the time bloomfilter_range associated to the offset
# try:
# time_low, time_high = self._dispersy.database.execute(u"SELECT MIN(sync.global_time), MAX(sync.global_time) FROM sync JOIN meta_message ON meta_message.id = sync.meta_message WHERE sync.community = ? AND meta_message.priority > 32 ORDER BY sync.global_time LIMIT ? OFFSET ?",
# (self._database_id, capacity, offset)).next()
# except:
# dprint("count: ", count, " capacity: ", capacity, " bloomfilter_range: ", range_, " ranges: ", ranges, " offset: ", offset, force=1)
# assert False
# if __debug__ and self.get_classification() == u"ChannelCommunity":
# low, high = self._dispersy.database.execute(u"SELECT MIN(sync.global_time), MAX(sync.global_time) FROM sync JOIN meta_message ON meta_message.id = sync.meta_message WHERE sync.community = ? AND meta_message.priority > 32",
# (self._database_id,)).next()
# dprint("bloomfilter_range: ", range_, " ranges: ", ranges, " offset: ", offset, " time: [", time_low, ":", time_high, "] in-db: [", low, ":", high, "]", force=1)
# assert isinstance(time_low, (int, long))
# assert isinstance(time_high, (int, long))
# assert 0 < ranges
# assert 0 <= range_ < ranges
# assert ranges == 1 and range_ == 0 or ranges > 1
# assert 0 <= offset
# # get all the data associated to the time bloomfilter_range
# counter = 0
# for packet, in self._dispersy.database.execute(u"SELECT sync.packet FROM sync JOIN meta_message ON meta_message.id = sync.meta_message WHERE sync.community = ? AND meta_message.priority > 32 AND sync.global_time BETWEEN ? AND ?",
# (self._database_id, time_low, time_high)):
# bloom.add(str(packet))
# counter += 1
# if range_ == 0:
# time_low = 1
# if range_ == ranges - 1:
# time_high = 0
# if __debug__ and self.get_classification() == u"ChannelCommunity":
# dprint("off: ", offset, " cap: ", capacity, " count: ", counter, "/", count, " time: [", time_low, ":", time_high, "]", force=1)
# # if __debug__:
# # if len(data) > 1:
# # low, high = self._dispersy.database.execute(u"SELECT MIN(sync.global_time), MAX(sync.global_time) FROM sync JOIN meta_message ON meta_message.id = sync.meta_message WHERE sync.community = ? AND meta_message.priority > 32",
# # (self._database_id,)).next()
# # dprint(self.cid.encode("HEX"), " syncing <<", data[0][0], " <", data[1][0], "-", data[-2][0], "> ", data[-1][0], ">> sync:[", time_low, ":", time_high, "] db:[", low, ":", high, "] len:", len(data), " cap:", capacity)
# return (time_low, time_high, bloom)
# return (1, 0, BloomFilter(8, 0.1, prefix='\x00'))
@property
def dispersy_sync_response_limit(self):
"""
The maximum number of bytes to send back per received dispersy-sync message.
@rtype: int
"""
return 5 * 1025