Skip to content

Commit

Permalink
Delivery mode - persistent
Browse files Browse the repository at this point in the history
sendMessage split into sendMessage and sendMessageId
sendMessageToQueue split into sendMessageToQueue and sendMessageToQueueId

Because of an overload bug in 2019.1.0
  • Loading branch information
eduard93 committed Sep 7, 2019
1 parent a086d29 commit bd1ba37
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions src/isc/rabbitmq/API.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* Created by eduard on 06.10.2017.
*/
public class API {
private Channel _channel;
private com.rabbitmq.client.Channel _channel;

private final String _queue;

Expand All @@ -28,12 +28,18 @@ public API(String host, int port, String user, String pass, String virtualHost,

public API(String host, int port, String user, String pass, String virtualHost, String queue, int durable, String exchange) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(user);
factory.setPassword(pass);
factory.setVirtualHost(virtualHost);
//factory.setAutomaticRecoveryEnabled(true);

if (host.toLowerCase().startsWith("amqp://")) {
// we got URI connection string
factory.setUri(host);
} else{
factory.setHost(host);
factory.setPort(port);
factory.setUsername(user);
factory.setPassword(pass);
factory.setVirtualHost(virtualHost);
//factory.setAutomaticRecoveryEnabled(true);
}

_connection = factory.newConnection();
_channel = _connection.createChannel();
Expand Down Expand Up @@ -68,19 +74,19 @@ public API(String host, int port, String user, String pass, String virtualHost,
_exchange = exchange != null ? exchange : "";
}

public void sendMessage(byte[] msg, String correlationId, String messageId) throws Exception {
sendMessageToQueue(_queue, msg, correlationId, messageId);
public void sendMessageId(byte[] msg, String correlationId, String messageId) throws Exception {
sendMessageToQueueId(_queue, msg, correlationId, messageId);
}

public void sendMessage(byte[] msg) throws Exception {
sendMessageToQueue(_queue, msg);
}

public void sendMessageToQueue(String queue, byte[] msg) throws Exception {
sendMessageToQueue(queue, msg, null, null);
sendMessageToQueueId(queue, msg, null, null);
}

public void sendMessageToQueue(String queue, byte[] msg, String correlationId, String messageId) throws Exception {
public void sendMessageToQueueId(String queue, byte[] msg, String correlationId, String messageId) throws Exception {
AMQP.BasicProperties props = createProperties(correlationId, messageId);
_channel.basicPublish(_exchange, queue, props, msg);
}
Expand Down Expand Up @@ -163,7 +169,7 @@ private AMQP.BasicProperties createProperties(String correlationId, String messa
String contentType = ContentType;
String contentEncoding = null;
HashMap<String, Object> headers = null;
Integer deliveryMode = null;
Integer deliveryMode = Integer.valueOf(2);
Integer priority = null;
//String correlationId= null;
String replyTo = null;
Expand All @@ -174,6 +180,7 @@ private AMQP.BasicProperties createProperties(String correlationId, String messa
String userId= null;
String appId = null;
String clusterId= null;

AMQP.BasicProperties props = new AMQP.BasicProperties(contentType, contentEncoding, headers, deliveryMode, priority, correlationId, replyTo, expiration, messageId, timestamp, type, userId, appId, clusterId);
return props;
}
Expand Down

0 comments on commit bd1ba37

Please sign in to comment.