From d016de527a953b54160f11598d8d893771bf88ec Mon Sep 17 00:00:00 2001 From: eduard93 Date: Wed, 14 Nov 2018 23:21:12 +0300 Subject: [PATCH] Connection auto-recovery --- RabbitMQ/Common.cls | 44 ++++++++++++++++++++++++++++++++++-- RabbitMQ/InboundAdapter.cls | 31 ++++++++++++++++++------- RabbitMQ/OutboundAdapter.cls | 42 +++++++++++++++++++++++++++------- 3 files changed, 99 insertions(+), 18 deletions(-) diff --git a/RabbitMQ/Common.cls b/RabbitMQ/Common.cls index dff595b..5315475 100644 --- a/RabbitMQ/Common.cls +++ b/RabbitMQ/Common.cls @@ -45,8 +45,18 @@ Property Encoding As %String; /// See property AdditionalPaths in that class. Property ClassPath As %String(MAXLEN = 32000); +/// How many times have we tried reconnecting +/// empty - do not retry +/// 0 - retry ad infinitum +/// n - retry n times +Property RetryCount As %Integer [ InitialExpression = 5 ]; + +/// How frequently to retry access to the output system. +/// Pause in seconds between retry attempts. +Property RetryInterval As %Numeric(MINVAL = 0) [ InitialExpression = 5 ]; + /// These are the production settings for this object -Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGHost:Java Gateway,JGPort:Java Gateway,JGService:Java Gateway:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic"; +Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGHost:Java Gateway,JGPort:Java Gateway,JGService:Java Gateway:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic,RetryCount:Alerting,RetryInterval:Alerting"; /// Connect to running JGW Method Connect() As %Status @@ -87,8 +97,10 @@ Method ConnectToRabbitMQ() As %Status Set pass = "guest" } + Set port = $select(..Port="":-1, 1:..Port) + Try { - Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, ..Port, user, pass, ..VirtualHost, ..Queue, $$$YES, ..Exchange) + Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, port, user, pass, ..VirtualHost, ..Queue, $$$YES, ..Exchange) } Catch ex { Set sc = $$$ADDSC(ex.AsStatus(),$g(%objlasterror)) } @@ -96,5 +108,33 @@ Method ConnectToRabbitMQ() As %Status Quit sc } +Method IsOpen() As %Status +{ + #Dim sc As %Status = $$$OK + Set retryCount = 1 + Try { + While '..API.isOpen() { + If ..RetryCount = "" { + Set sc = $$$ERROR($$$GeneralError, "Connection problems. Consider specifying RetryCount and RetryInterval settings") + } ElseIf ((..RetryCount = 0) || (retryCount < ..RetryCount)) { + // wait and retry connecting + Set retryCount = retryCount + 1 + Hang ..RetryInterval + + // reconnect happens in isOpen method + } Else { + // we're out of reconnect attempts + Set sc = $$$ERROR($$$GeneralError, $$$FormatText("Connection still closed after %1 attempts at reconnecting.", ..RetryCount)) + } + Quit:$$$ISERR(sc) + } + } Catch ex { + #Dim ex As %Exception.General + Set sc = ex.AsStatus() + } + + Quit sc +} + } diff --git a/RabbitMQ/InboundAdapter.cls b/RabbitMQ/InboundAdapter.cls index 3ce2544..eae0227 100644 --- a/RabbitMQ/InboundAdapter.cls +++ b/RabbitMQ/InboundAdapter.cls @@ -28,21 +28,36 @@ Method OnTearDown() As %Status /// Get Messages from RabbitMQ queue. Method OnTask() As %Status { - Set sc = $$$OK + #Dim sc As %Status = $$$OK Set messageCount = 1 While messageCount > 0 { + Set sc = ..IsOpen() + Quit:$$$ISERR(sc) + // List containing metainformation and possibly body (in the case of string interaction) of the RabbitMQ message #Dim messageList As %ListOfDataTypes - If ..BodyClass = "" { - Set messageList = ..API.readMessageString() - } Else { - #Dim tempStream As %Library.GlobalBinaryStream - Set messageList = ##class(%ListOfDataTypes).%New() - For i=1:1:15 Do messageList.Insert("") - Set tempStream = ..API.readMessageStream(.messageList) + Set messageList = ##class(%ListOfDataTypes).%New() + For i=1:1:15 Do messageList.Insert("") + + Try { + If ..BodyClass = "" { + Set messageList = ..API.readMessageString() + } Else { + #Dim tempStream As %Library.GlobalBinaryStream + Set tempStream = ..API.readMessageStream(.messageList) + } + } Catch ex { + #Dim ex As %Exception.General + If ($ZE["") { + Set sc = ..IsOpen() + Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus()) + } Else { + Set sc = ex.AsStatus() + } + Quit:$$$ISERR(sc) } Set messageLength = messageList.GetAt(1) diff --git a/RabbitMQ/OutboundAdapter.cls b/RabbitMQ/OutboundAdapter.cls index bb53e2e..539e567 100644 --- a/RabbitMQ/OutboundAdapter.cls +++ b/RabbitMQ/OutboundAdapter.cls @@ -36,10 +36,23 @@ Method SendMessage(message As %Stream.Object) As %Status Do stream.Write(..EncodeMessageBody(message)) } - Try { - Do ..API.sendMessage(stream) - } Catch ex { - Set sc = ex.AsStatus() + #Dim attempts As %Integer = 0 + + While attempts<2 { + Set attempts = attempts + 1 + Try { + Do ..API.sendMessage(stream) + Return sc + } Catch ex { + #Dim ex As %Exception.General + If ($ZE["") { + Set sc = ..IsOpen() + Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus()) + } Else { + Set sc = ex.AsStatus() + } + Return:$$$ISERR(sc) sc // quit if reconnect is unsuccessful or we got unknown exception + } } Quit sc } @@ -57,11 +70,24 @@ Method SendMessageToQueue(queue As %String, message As %Stream.Object) As %Statu } Else { Do stream.Write(..EncodeMessageBody(message)) } + + #Dim attempts As %Integer = 0 - Try { - Do ..API.sendMessageToQueue(queue, stream) - } Catch ex { - Set sc = ex.AsStatus() + While attempts<2 { + Set attempts = attempts + 1 + Try { + Do ..API.sendMessageToQueue(queue, stream) + Return sc + } Catch ex { + #Dim ex As %Exception.General + If ($ZE["") { + Set sc = ..IsOpen() + Set:$$$ISERR(sc) sc = $$$ADDSC(sc, ex.AsStatus()) + } Else { + Set sc = ex.AsStatus() + } + Return:$$$ISERR(sc) sc // quit if reconnect is unsuccessful or we got unknown exception + } } Quit sc }