diff --git a/src/isc/rabbitmq/API.java b/src/isc/rabbitmq/API.java index e57eb02..de57d04 100644 --- a/src/isc/rabbitmq/API.java +++ b/src/isc/rabbitmq/API.java @@ -5,6 +5,10 @@ import java.io.IOException; import java.nio.file.*; + +import java.util.Date; +import java.util.HashMap; + /** * Created by eduard on 06.10.2017. */ @@ -30,6 +34,10 @@ public API(String host, int port, String user, String pass, String virtualHost, _queue = queue; } + public void sendMessage(byte[] msg, String correlationId, String messageId) throws Exception { + sendMessageToQueue(_queue, msg, correlationId, messageId); + } + public void sendMessage(byte[] msg) throws Exception { sendMessageToQueue(_queue, msg); } @@ -39,6 +47,11 @@ public void sendMessageToQueue(String queue, byte[] msg) throws Exception { _channel.basicPublish("", queue, null, msg); } + public void sendMessageToQueue(String queue, byte[] msg, String correlationId, String messageId) throws Exception { + AMQP.BasicProperties props = createProperties(correlationId, messageId); + _channel.basicPublish("", queue, props, msg); + } + public byte[] readMessageStream(String[] result) throws Exception { GetResponse response = readMessage(result); if (response == null) { @@ -102,4 +115,24 @@ public void close()throws Exception { _connection.close(); } + private AMQP.BasicProperties createProperties(String correlationId, String messageId) throws Exception + { + String contentType = null; + String contentEncoding = null; + HashMap headers = null; + Integer deliveryMode = null; + Integer priority = null; + //String correlationId= null; + String replyTo = null; + String expiration= null; + //String messageId= null; + Date timestamp= null; + String type = null; + String userId= null; + String appId = null; + String clusterId= null; + AMQP.BasicProperties props = new AMQP.BasicProperties(contentType, contentEncoding, headers, deliveryMode, priority, correlationId, replyTo, expiration, messageId, timestamp, type, userId, appId, clusterId); + return props; + } + }