Skip to content

Commit

Permalink
Initial code
Browse files Browse the repository at this point in the history
  • Loading branch information
eduard93 committed Oct 9, 2017
1 parent cb6f1c6 commit a8765d1
Show file tree
Hide file tree
Showing 8 changed files with 424 additions and 0 deletions.
79 changes: 79 additions & 0 deletions RabbitMQ/Common.cls
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
Class RabbitMQ.Common Extends %RegisteredObject [ Abstract ]
{

/// This is the ID name of the set of credentials values (Username, Password) to be used to access the HTTP server
/// Property Credentials As %String [ InitialExpression = "None" ];
Property Host As %String [ InitialExpression = "localhost" ];

Property Port As %Integer [ InitialExpression = -1 ];

Property VirtualHost As %String [ InitialExpression = "/" ];

Property Queue As %String;

/// Config Name of the Java Gateway service controlling the Java Gateway server this Operation will use.
Property JGService As %String;

/// Gateway connection
Property JGW As %Net.Remote.Gateway;

/// API object
Property API As isc.rabbitmq.API;

/// Encoding to convert message body. Leave empty to get/send as is.
Property Encoding As %String;

/// CLASSPATH containing the files required to be passed as an argument when starting the JVM.
/// The user should typically provide here the files containing the classes used via the Java Gateway.
/// We assume that the user has properly quoted the classpath and supplied the correct separators for the platform
/// in case of multiple files. <br>
/// See property AdditionalPaths in that class.
Property ClassPath As %String(MAXLEN = 32000);

/// These are the production settings for this object
Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGService:Basic:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic";

/// Connect to running JGW
Method Connect() As %Status
{
// connect to current namespace, use 2 second timeout
Set sc = $$$OK
Set timeout = 5
Set classPath = ##class(%ListOfDataTypes).%New()
Do classPath.Insert(..ClassPath)

// get a connection handle and connect
Set gateway = ##class(%Net.Remote.Gateway).%New()
Set host = ##class(Ens.Director).GetHostSettingValue(..JGService, "Address")
Set port = ##class(Ens.Director).GetHostSettingValue(..JGService, "Port")
Set sc = gateway.%Connect(host, port, $namespace, timeout, classPath)

If $$$ISOK(sc) {
Set ..JGW = gateway
}
Quit sc
}

Method ConnectToRabbitMQ() As %Status
{
Set sc = $$$OK

If ..%CredentialsObj.Username'="" {
Set user = ..%CredentialsObj.Username
Set pass = ..%CredentialsObj.Password
} Else {
Set user = "guest"
Set pass = "guest"
}

Try {
Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, ..Port, user, pass, ..VirtualHost, ..Queue)
} Catch ex {
Set sc = $$$ADDSC(ex.AsStatus(),$g(%objlasterror))
}

Quit sc
}

}

112 changes: 112 additions & 0 deletions RabbitMQ/InboundAdapter.cls
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
Class RabbitMQ.InboundAdapter Extends (Ens.InboundAdapter, RabbitMQ.Common)
{

/// Stream class to store message body. Leave empty to use strings.
Property BodyClass As %Dictionary.CacheClassname;

Parameter SETTINGS = "BodyClass:Basic";

/// Establish gateway connectionand init java API
Method OnInit() As %Status
{
Set sc = $$$OK
Quit:..JGService="" $$$ERROR($$$GeneralError,"Specify JGService setting")
Quit:'##class(Ens.Director).IsItemEnabled(..JGService) $$$ERROR($$$GeneralError, $$$FormatText("Java Gateway Service: '%1' is down",..JGService))
Set sc = ..Connect()
Quit:$$$ISERR(sc)
Set sc = ..ConnectToRabbitMQ()
Quit sc
}

/// Close connection
Method OnTearDown() As %Status
{
Do ..API.close()
Quit $$$OK
}

/// default InboundAdapter behavior: always call ProcessInput on CallInterval
Method OnTask() As %Status
{
Set sc = $$$OK

Set messageCount = 1

While messageCount > 0 {
#Dim messageList As %ListOfDataTypes

If ..BodyClass = "" {
Set messageList = ..API.readMessageString()
} Else {
Set tempStream = ..GetTempStream()
Set messageList = ..API.readMessageStream(.tempStream)
}

Set messageLength = messageList.GetAt(1)
Set messageCount = messageList.GetAt(2)

If messageLength>0 {
#Dim message As RabbitMQ.Message
Set message = ..ListToMessage(messageList)
If ..BodyClass = "" {
Set message.Body = ..DecodeMessageBody(messageList.GetAt(16))
} Else {
Set message.Body = $classmethod(..BodyClass, "%New")
Do message.Body.Write(..DecodeMessageBody(tempStream.Read(messageLength)))
Do message.Body.Rewind()
}
Set sc = ..BusinessHost.ProcessInput(message)
} Else {
CONTINUE
}
Quit:$$$ISERR(sc)
}
Set ..BusinessHost.%WaitForNextCallInterval=1
Quit sc
}

ClassMethod ListToMessage(list As %ListOfDataTypes) As RabbitMQ.Message
{
Set message = ##class(RabbitMQ.Message).%New()

Set message.ContentType = list.GetAt(3)
Set message.ContentEncoding = list.GetAt(4)
Set message.CorrelationId = list.GetAt(5)
Set message.ReplyTo = list.GetAt(6)
Set message.Expiration = list.GetAt(7)
Set message.MessageId = list.GetAt(8)
Set message.Type = list.GetAt(9)
Set message.UserId = list.GetAt(10)
Set message.AppId = list.GetAt(11)
Set message.ClusterId = list.GetAt(12)
Set message.DeliveryMode = list.GetAt(13)
Set message.Priority = list.GetAt(14)
Set message.Timestamp = list.GetAt(15)

Quit message
}

Method DecodeMessageBody(body As %String) As %String
{
If ..Encoding '= "" {
If $isObject(body) {
// TODO streams
} Else {
Set body = $zcvt(body, "O", ..Encoding)
}
}
Quit body
}

ClassMethod GetTempStream() As %GlobalBinaryStream
{
Set stream=##class(%GlobalBinaryStream).%New()
// TODO - work around that
// we need to 'reserve' a number of bytes since we are passing the stream
// by reference (Java's equivalent is byte[] ba = new byte[max];)
For i=1:1:32000 Do stream.Write("0")
Quit stream
}

}

91 changes: 91 additions & 0 deletions RabbitMQ/Message.cls
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
Class RabbitMQ.Message Extends %Persistent
{

Property ContentType As %String;

Property ContentEncoding As %String;

Property CorrelationId As %String;

Property ReplyTo As %String;

Property Expiration As %String;

Property MessageId As %String;

Property Type As %String;

Property UserId As %String;

Property AppId As %String;

Property ClusterId As %String;

Property DeliveryMode As %String;

Property Priority As %String;

Property Timestamp As %String;

/// Could be either string or stream
Property Body As %String;

Storage Default
{
<Data name="MessageDefaultData">
<Value name="1">
<Value>%%CLASSNAME</Value>
</Value>
<Value name="2">
<Value>ContentType</Value>
</Value>
<Value name="3">
<Value>ContentEncoding</Value>
</Value>
<Value name="4">
<Value>CorrelationId</Value>
</Value>
<Value name="5">
<Value>ReplyTo</Value>
</Value>
<Value name="6">
<Value>Expiration</Value>
</Value>
<Value name="7">
<Value>MessageId</Value>
</Value>
<Value name="8">
<Value>Type</Value>
</Value>
<Value name="9">
<Value>UserId</Value>
</Value>
<Value name="10">
<Value>AppId</Value>
</Value>
<Value name="11">
<Value>ClusterId</Value>
</Value>
<Value name="12">
<Value>DeliveryMode</Value>
</Value>
<Value name="13">
<Value>Priority</Value>
</Value>
<Value name="14">
<Value>Timestamp</Value>
</Value>
<Value name="15">
<Value>Body</Value>
</Value>
</Data>
<DataLocation>^RabbitMQ.MessageD</DataLocation>
<DefaultData>MessageDefaultData</DefaultData>
<IdLocation>^RabbitMQ.MessageD</IdLocation>
<IndexLocation>^RabbitMQ.MessageI</IndexLocation>
<StreamLocation>^RabbitMQ.MessageS</StreamLocation>
<Type>%Library.CacheStorage</Type>
}

}

16 changes: 16 additions & 0 deletions RabbitMQ/Operation.cls
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
Class RabbitMQ.Operation Extends Ens.BusinessOperation
{

Parameter ADAPTER = "RabbitMQ.OutboundAdapter";

Property Adapter As RabbitMQ.OutboundAdapter;

Method OnMessage(request As Ens.StringRequest, response As Ens.Response) As %Status
{
#Dim sc As %Status = $$$OK
Set response = ##class(Ens.Response).%New()
quit ..Adapter.SendMessageToQueue("Hello", request.StringValue)
}

}

79 changes: 79 additions & 0 deletions RabbitMQ/OutboundAdapter.cls
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
Class RabbitMQ.OutboundAdapter Extends (Ens.OutboundAdapter, RabbitMQ.Common)
{

Method OnInit() As %Status
{
Set sc = $$$OK
Quit:..JGService="" $$$ERROR($$$GeneralError,"Specify JGService setting")
Quit:'##class(Ens.Director).IsItemEnabled(..JGService) $$$ERROR($$$GeneralError, $$$FormatText("Java Gateway Service: '%1' is down",..JGService))
Set sc = ..Connect()
Quit:$$$ISERR(sc) sc
Set sc = ..ConnectToRabbitMQ()
Quit sc
}

/// Close connection
Method OnTearDown() As %Status
{
Do ..API.close()
Quit $$$OK
}

/// Send message. message can be a string or stream.
Method SendMessage(message As %Stream.Object) As %Status
{
Set sc = $$$OK
Set stream = ##class(%Library.GlobalBinaryStream).%New()

If $isObject(message) {
While 'message.AtEnd {
Do stream.Write(..EncodeMessageBody(message.Read($$$MaxStringLength)))
}
} Else {
Do stream.Write(..EncodeMessageBody(message))
}

Try {
Do ..API.sendMessage(stream)
} Catch ex {
Set sc = ex.AsStatus()
}
Quit sc
}

/// Send message. message can be a string or stream.
Method SendMessageToQueue(queue As %String, message As %Stream.Object) As %Status
{
Set sc = $$$OK
Set stream = ##class(%Library.GlobalBinaryStream).%New()

If $isObject(message) {
While 'message.AtEnd {
Do stream.Write(..EncodeMessageBody(message.Read($$$MaxStringLength)))
}
} Else {
Do stream.Write(..EncodeMessageBody(message))
}

Try {
Do ..API.sendMessageToQueue(queue, stream)
} Catch ex {
Set sc = ex.AsStatus()
}
Quit sc
}

Method EncodeMessageBody(body As %String) As %String
{
If ..Encoding '= "" {
If $isObject(body) {
// TODO streams
} Else {
Set body = $zcvt(body, "O", ..Encoding)
}
}
Quit body
}

}

Loading

0 comments on commit a8765d1

Please sign in to comment.