diff --git a/RabbitMQ/InboundAdapter.cls b/RabbitMQ/InboundAdapter.cls index 65c31b8..35bb3dc 100644 --- a/RabbitMQ/InboundAdapter.cls +++ b/RabbitMQ/InboundAdapter.cls @@ -6,7 +6,7 @@ Property BodyClass As %Dictionary.CacheClassname; Parameter SETTINGS = "BodyClass:Basic"; -/// Establish gateway connectionand init java API +/// Establish gateway connection and init java API. Method OnInit() As %Status { Set sc = $$$OK @@ -18,14 +18,14 @@ Method OnInit() As %Status Quit sc } -/// Close connection +/// Close connection. Method OnTearDown() As %Status { Do ..API.close() Quit $$$OK } -/// default InboundAdapter behavior: always call ProcessInput on CallInterval +/// Get Messages from RabbitMQ queue. Method OnTask() As %Status { Set sc = $$$OK @@ -33,13 +33,16 @@ Method OnTask() As %Status Set messageCount = 1 While messageCount > 0 { + // List containing metainformation and possibly body (in the case of string interaction) of the RabbitMQ message #Dim messageList As %ListOfDataTypes If ..BodyClass = "" { Set messageList = ..API.readMessageString() } Else { - Set tempStream = ..GetTempStream() - Set messageList = ..API.readMessageStream(.tempStream) + #Dim tempStream As %Library.GlobalBinaryStream + Set messageList = ##class(%ListOfDataTypes).%New() + For i=1:1:15 Do messageList.Insert("") + Set tempStream = ..API.readMessageStream(.messageList) } Set messageLength = messageList.GetAt(1) @@ -49,11 +52,13 @@ Method OnTask() As %Status #Dim message As RabbitMQ.Message Set message = ..ListToMessage(messageList) If ..BodyClass = "" { - Set message.Body = ..DecodeMessageBody(messageList.GetAt(16)) + Set message.BodyString = ..DecodeMessageBody(messageList.GetAt(16)) } Else { - Set message.Body = $classmethod(..BodyClass, "%New") - Do message.Body.Write(..DecodeMessageBody(tempStream.Read(messageLength))) - Do message.Body.Rewind() + Set message.BodyStream = $classmethod(..BodyClass, "%New") + While 'tempStream.AtEnd { + Do message.BodyStream.Write(..DecodeMessageBody(tempStream.Read($$$MaxStringLength))) + } + Do message.BodyStream.Rewind() } Set sc = ..BusinessHost.ProcessInput(message) } Else { @@ -65,6 +70,7 @@ Method OnTask() As %Status Quit sc } +/// Convert list containing metainformation into RabbitMQ message ClassMethod ListToMessage(list As %ListOfDataTypes) As RabbitMQ.Message { Set message = ##class(RabbitMQ.Message).%New() @@ -86,27 +92,12 @@ ClassMethod ListToMessage(list As %ListOfDataTypes) As RabbitMQ.Message Quit message } +/// Decode message body. May be full body or only a piece. Method DecodeMessageBody(body As %String) As %String { - If ..Encoding '= "" { - If $isObject(body) { - // TODO streams - } Else { - Set body = $zcvt(body, "O", ..Encoding) - } - } + Set:..Encoding'="" body = $zcvt(body, "O", ..Encoding) Quit body } -ClassMethod GetTempStream() As %GlobalBinaryStream -{ - Set stream=##class(%GlobalBinaryStream).%New() - // TODO - work around that - // we need to 'reserve' a number of bytes since we are passing the stream - // by reference (Java's equivalent is byte[] ba = new byte[max];) - For i=1:1:32000 Do stream.Write("0") - Quit stream -} - } diff --git a/RabbitMQ/Message.cls b/RabbitMQ/Message.cls index c02c95f..a6f6bb1 100644 --- a/RabbitMQ/Message.cls +++ b/RabbitMQ/Message.cls @@ -28,7 +28,18 @@ Property Priority As %String; Property Timestamp As %String; /// Could be either string or stream -Property Body As %String; +Property Body(MAXLEN = "") [ Transient ]; + +Method BodyGet() [ CodeMode = expression ] +{ +$select(..BodyString'="":..BodyString, 1:..BodyStream) +} + +/// Body if it's a string +Property BodyString As %String(MAXLEN = ""); + +/// Body if it's a stream +Property BodyStream As %Stream.GlobalCharacter; Storage Default { @@ -78,6 +89,12 @@ Storage Default Body + +BodyString + + +BodyStream + ^RabbitMQ.MessageD MessageDefaultData diff --git a/RabbitMQ/Operation.cls b/RabbitMQ/Operation.cls index 241f233..e1c8c19 100644 --- a/RabbitMQ/Operation.cls +++ b/RabbitMQ/Operation.cls @@ -9,7 +9,8 @@ Method OnMessage(request As Ens.StringRequest, response As Ens.Response) As %Sta { #Dim sc As %Status = $$$OK Set response = ##class(Ens.Response).%New() - quit ..Adapter.SendMessage(request.StringValue) + Set sc = ..Adapter.SendMessage(request.StringValue) + Quit sc } }