@@ -38,10 +38,14 @@ public API(String host, int port, String user, String pass, String virtualHost,
3838 factory .setUsername (user );
3939 factory .setPassword (pass );
4040 factory .setVirtualHost (virtualHost );
41- //factory.setAutomaticRecoveryEnabled(true);
4241 }
4342
43+ //factory.setAutomaticRecoveryEnabled(true);
44+ factory .setRequestedHeartbeat (0 );
45+
46+
4447 _connection = factory .newConnection ();
48+
4549 _channel = _connection .createChannel ();
4650 try {
4751 // Do we need to declare queue?
@@ -50,7 +54,7 @@ public API(String host, int port, String user, String pass, String virtualHost,
5054 // Check that queue exists
5155 // Method throws exception if queue does not exist or is exclusive
5256 // Correct exception text: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'queue'
53- com . rabbitmq . client . AMQP .Queue .DeclareOk declareOk = _channel .queueDeclarePassive (queue );
57+ AMQP .Queue .DeclareOk declareOk = _channel .queueDeclarePassive (queue );
5458 }
5559 } catch (java .io .IOException ex ) {
5660 // Exception closes the channel.
@@ -70,8 +74,34 @@ public API(String host, int port, String user, String pass, String virtualHost,
7074
7175 }
7276
77+ if (exchange != null ) {
78+ _exchange = exchange ;
79+ try {
80+ AMQP .Exchange .DeclareOk declareOk = _channel .exchangeDeclarePassive (exchange );
81+ } catch (java .io .IOException ex ) {
82+ // Exception closes the channel.
83+ // So we need to create new one.
84+ // _channel.basicRecover() doesn't do the trick
85+ _channel = _connection .createChannel ();
86+
87+ Boolean durableBool = (durable != 0 );
88+ Boolean autoDelete = false ;
89+ Boolean passive = false ;
90+ // exchange - name of the exchange
91+ // type - direct, topic, fanout, headers. See https://lostechies.com/derekgreer/2012/03/28/rabbitmq-for-windows-exchange-types/
92+ // passive - if true, works the same as exchangeDeclarePassive
93+ // durable - true if we are declaring a durable exchange (the exchange will survive a server restart)
94+ // autoDelete - true if we are declaring an autodelete exchange (server will delete it when no longer in use)
95+ // arguments - other properties (construction arguments) for the exchange
96+
97+ AMQP .Exchange .DeclareOk declareOk = _channel .exchangeDeclare (exchange , "direct" , passive , durableBool , autoDelete , null ); // , exclusive, autoDelete, null
98+ }
99+ } else {
100+ _exchange = "" ;
101+ }
102+
73103 _queue = queue ;
74- _exchange = exchange != null ? exchange : "" ;
104+ // _exchange = exchange != null ? exchange : "";
75105 }
76106
77107 public void sendMessageId (byte [] msg , String correlationId , String messageId ) throws Exception {
@@ -160,8 +190,13 @@ public Boolean isOpen()
160190 }
161191
162192 public void close ()throws Exception {
163- _channel .close ();
164- _connection .close ();
193+ try {
194+ _channel .close ();
195+ } catch ( Exception ex ) {}
196+
197+ try {
198+ _connection .close ();
199+ } catch ( Exception ex ) {}
165200 }
166201
167202 private AMQP .BasicProperties createProperties (String correlationId , String messageId ) throws Exception
0 commit comments