@@ -167,6 +167,37 @@ def close(self):
167
167
168
168
169
169
170
+
171
+
172
+ def split_by_newline (s ):
173
+ """ Find split position simply by looking for newlines. """
174
+ p = s .find ('\n ' )
175
+ return p + 1 if p >= 0 else None
176
+
177
+ def split_by_json (s ):
178
+ """ Find split position by looking at first (nested) JSON structure. """
179
+ l = 0
180
+ in_string = False
181
+
182
+ i = 0
183
+ while i < len (s ):
184
+ c = s [i ]
185
+
186
+ if c == '"' :
187
+ in_string = not in_string
188
+ elif in_string :
189
+ if c == '\\ ' :
190
+ i += 1
191
+ else :
192
+ if c in "[{" :
193
+ l += 1
194
+ if c in "]}" :
195
+ l -= 1
196
+ if l <= 0 :
197
+ return i + 1
198
+ i += 1
199
+ return None
200
+
170
201
class Connection (object ): # TODO: Split this class in simple ones
171
202
"""
172
203
Represents a communiation tunnel between two parties.
@@ -254,7 +285,9 @@ def getmaxtimeout(cls, operation):
254
285
return cls ._maxtimeout [operation ]
255
286
256
287
257
- def __init__ (self , sck , address = None , handler_factory = None , http = False ):
288
+ def __init__ (self , sck , address = None , handler_factory = None ,
289
+ http = False ):
290
+ self .split = split_by_json
258
291
self ._debug_socket = False
259
292
self ._debug_dispatch = False
260
293
self ._buffer = b''
@@ -520,13 +553,14 @@ def dispatch_until_empty(self):
520
553
521
554
if not ready_to_read : return 0
522
555
523
- newline_idx = 0
524
556
count = 0
525
- while newline_idx != - 1 :
557
+ while True :
526
558
if not self .read_and_dispatch (timeout = 0 ):
527
559
break
528
560
count += 1
529
- newline_idx = self ._buffer .find (b'\n ' )
561
+ if self .split (self ._buffer )== None :
562
+ break
563
+
530
564
return count
531
565
532
566
def read_and_dispatch (self , timeout = None , thread = True , condition = None ):
@@ -877,13 +911,20 @@ def read(self, timeout = None):
877
911
878
912
def _readn (self ):
879
913
"""
880
- Internal function which reads from socket waiting for a newline
914
+ Internal function which reads from socket and splits into
915
+ individual JSON messages
881
916
"""
917
+
882
918
streambuffer = self ._buffer
883
- pos = streambuffer .find (b'\n ' )
884
919
#_log.debug("read...")
885
920
#retry = 0
886
- while pos == - 1 :
921
+ pos = 0
922
+
923
+ while True :
924
+ pos = self .split (streambuffer )
925
+ if pos != None :
926
+ break
927
+
887
928
data = b''
888
929
try :
889
930
data = self ._sck .recv (2048 )
@@ -915,12 +956,10 @@ def _readn(self):
915
956
raise EofError (len (streambuffer ))
916
957
#_log.debug("readbuf+: %r", data)
917
958
streambuffer += data
918
- pos = streambuffer .find (b'\n ' )
919
959
920
- self ._buffer = streambuffer [pos + 1 :]
921
- streambuffer = streambuffer [:pos ]
960
+ self ._buffer = streambuffer [pos :]
922
961
#_log.debug("read: %r", buffer)
923
- return streambuffer
962
+ return streambuffer [: pos ]
924
963
925
964
def serve (self ):
926
965
"""
0 commit comments