From 9e91c2bcc2d477a03c0e7893bc7693f222b980f9 Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Fri, 10 Jun 2022 19:59:28 -0700 Subject: [PATCH 01/13] Add thread affinity code and tcp change code --- deploy/configs/nonclustered.yaml | 5 ++- .../commons2/network/NetworkClient.java | 15 +++++++- memq-commons/pom.xml | 5 +++ .../client/examples/ExampleMemqProducer.java | 21 ++++++++--- .../client/examples/KafkaProducerTest.java | 37 +++++++++++++++++++ .../memq/client/examples/PingPong.java | 34 +++++++++++++++++ .../memq/core/rpc/MemqNettyServer.java | 8 +++- 7 files changed, 113 insertions(+), 12 deletions(-) create mode 100644 memq-examples/src/main/java/com/pinterest/memq/client/examples/KafkaProducerTest.java create mode 100644 memq-examples/src/main/java/com/pinterest/memq/client/examples/PingPong.java diff --git a/deploy/configs/nonclustered.yaml b/deploy/configs/nonclustered.yaml index 62900a3..8822dc0 100644 --- a/deploy/configs/nonclustered.yaml +++ b/deploy/configs/nonclustered.yaml @@ -8,10 +8,11 @@ topicConfig: ringBufferSize: 1024 outputParallelism: 4 batchSizeMB: 1 - batchMilliSeconds: 50 - storageHandlerName: devnull + batchMilliSeconds: 1 + storageHandlerName: filesystem storageHandlerConfig: retryTimeoutMillis: 5000 + storageDirs: /Volumes/RAMDisk/val maxAttempts: 2 disableNotifications: true bucket: diff --git a/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java b/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java index e145834..cac77eb 100644 --- a/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java +++ b/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java @@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -57,9 +58,12 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.ReferenceCountUtil; +import net.openhft.affinity.AffinityStrategies; +import net.openhft.affinity.AffinityThreadFactory; // No thread-safety guarantees public class NetworkClient implements Closeable { + private static final int DEFAULT_EVENT_LOOP_THREADS = 2; private static final Logger logger = LoggerFactory.getLogger(NetworkClient.class); public static final String CONFIG_INITIAL_RETRY_INTERVAL_MS = "initialRetryIntervalMs"; public static final String CONFIG_MAX_RETRY_COUNT = "maxRetryCount"; @@ -107,15 +111,17 @@ public NetworkClient(Properties properties, SSLConfig sslConfig) { } this.responseHandler = new ResponseHandler(); bootstrap = new Bootstrap(); + ThreadFactory threadFactory = new AffinityThreadFactory("atf_wrk", true, AffinityStrategies.DIFFERENT_CORE); if (Epoll.isAvailable()) { - eventLoopGroup = new EpollEventLoopGroup(1, new DaemonThreadFactory("MemqCommonClientNettyGroup")); + eventLoopGroup = new EpollEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS, threadFactory); bootstrap.channel(EpollSocketChannel.class); } else { - eventLoopGroup = new NioEventLoopGroup(1, new DaemonThreadFactory("MemqCommonClientNettyGroup")); + eventLoopGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS, threadFactory); bootstrap.channel(NioSocketChannel.class); } bootstrap.group(eventLoopGroup); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs); + bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.handler(new ClientChannelInitializer(responseHandler, sslConfig, idleTimeoutMs)); ScheduledThreadPoolExecutor tmpScheduler = new ScheduledThreadPoolExecutor(1); tmpScheduler.setRemoveOnCancelPolicy(true); @@ -159,7 +165,12 @@ public CompletableFuture send(InetSocketAddress socketAddress, R try { buffer = PooledByteBufAllocator.DEFAULT.buffer(request.getSize(RequestType.PROTOCOL_VERSION)); request.write(buffer, RequestType.PROTOCOL_VERSION); + long ts = System.currentTimeMillis(); channelFuture.channel().writeAndFlush(buffer); + ts = System.currentTimeMillis() - ts; + if (ts>2) { + System.out.println(ts+"ms size:"+request.getSize(RequestType.PROTOCOL_VERSION)); + } } catch (Exception e) { logger.warn("Failed to write request " + request.getClientRequestId(), e); ReferenceCountUtil.release(buffer); diff --git a/memq-commons/pom.xml b/memq-commons/pom.xml index cc4d4f0..fe2f353 100644 --- a/memq-commons/pom.xml +++ b/memq-commons/pom.xml @@ -82,6 +82,11 @@ metrics-core 4.1.17 + + net.openhft + affinity + 3.0.6 + io.netty netty-codec diff --git a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java index ced7265..85dbfba 100644 --- a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java +++ b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java @@ -62,17 +62,19 @@ public Thread newThread(Runnable r) { try { String topicName = "test"; MemqProducer instance = new MemqProducer.Builder() - .disableAcks(false).keySerializer(new ByteArraySerializer()) + .disableAcks(true).keySerializer(new ByteArraySerializer()) .valueSerializer(new ByteArraySerializer()).topic(topicName).cluster("local") - .compression(Compression.ZSTD).maxPayloadBytes(1024 * 150) + .compression(Compression.NONE).maxPayloadBytes(1024 * 10).maxInflightRequests(60) .bootstrapServers("127.0.0.1:9092").build(); StringBuilder builder = new StringBuilder(); - while (builder.length() < 1024 * 100) { + while (builder.length() < 1024 * 5) { builder.append(UUID.randomUUID().toString()); } byte[] bytes = builder.toString().getBytes("utf-8"); - for (int i = 0; i < 5000; i++) { + for (int i = 0; i < 500000; i++) { + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; long ts = System.currentTimeMillis(); List> result = new ArrayList<>(); for (int k = 0; k < 30; k++) { @@ -81,10 +83,17 @@ public Thread newThread(Runnable r) { } instance.flush(); for (Future future : result) { - future.get(); + MemqWriteResult memqWriteResult = future.get(); + int ackLatency = memqWriteResult.getAckLatency(); + if (min > ackLatency) { + min = ackLatency; + } + if (max < ackLatency) { + max = ackLatency; + } } ts = System.currentTimeMillis() - ts; - System.out.println(ts + "ms"); + System.out.println(ts + "ms " + min + "ms " + max + "ms"); } } catch (Exception e) { e.printStackTrace(); diff --git a/memq-examples/src/main/java/com/pinterest/memq/client/examples/KafkaProducerTest.java b/memq-examples/src/main/java/com/pinterest/memq/client/examples/KafkaProducerTest.java new file mode 100644 index 0000000..300b8c7 --- /dev/null +++ b/memq-examples/src/main/java/com/pinterest/memq/client/examples/KafkaProducerTest.java @@ -0,0 +1,37 @@ +package com.pinterest.memq.client.examples; + +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; + +public class KafkaProducerTest { + + public static void main(String[] args) throws InterruptedException, ExecutionException { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + props.put(ProducerConfig.ACKS_CONFIG, "-1"); + props.put(ProducerConfig.RETRIES_CONFIG, "3"); + props.put(ProducerConfig.LINGER_MS_CONFIG, "0"); + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); + KafkaProducer producer = new KafkaProducer(props); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < 50; i++) { + builder.append(UUID.randomUUID().toString()); + } + byte[] bytes = builder.toString().getBytes(); + System.out.println(bytes.length); + while (true) { + long ts = System.nanoTime(); + producer.send(new ProducerRecord("test", bytes)).get(); + System.out.println((System.nanoTime() - ts) / (1000)+"us"); + } + } + +} diff --git a/memq-examples/src/main/java/com/pinterest/memq/client/examples/PingPong.java b/memq-examples/src/main/java/com/pinterest/memq/client/examples/PingPong.java new file mode 100644 index 0000000..77f86d8 --- /dev/null +++ b/memq-examples/src/main/java/com/pinterest/memq/client/examples/PingPong.java @@ -0,0 +1,34 @@ +package com.pinterest.memq.client.examples; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +public class PingPong { + + public static void main(String[] args) throws IOException { + if (args[0].equalsIgnoreCase("server")) { + ServerSocket sc = new ServerSocket(9096); + Socket socket = sc.accept(); + DataInputStream inputStream = new DataInputStream(socket.getInputStream()); + DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream()); + while (true) { + outputStream.writeLong(inputStream.readLong()); + outputStream.flush(); + } + } else { + Socket socket = new Socket(args[1], 9096); + DataInputStream inputStream = new DataInputStream(socket.getInputStream()); + DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream()); + while (true) { + outputStream.writeLong(System.nanoTime()); + outputStream.flush(); + long readLong = inputStream.readLong(); + System.out.println(System.nanoTime() - readLong); + } + } + } + +} diff --git a/memq/src/main/java/com/pinterest/memq/core/rpc/MemqNettyServer.java b/memq/src/main/java/com/pinterest/memq/core/rpc/MemqNettyServer.java index 4d467c1..c6ba49d 100644 --- a/memq/src/main/java/com/pinterest/memq/core/rpc/MemqNettyServer.java +++ b/memq/src/main/java/com/pinterest/memq/core/rpc/MemqNettyServer.java @@ -18,6 +18,7 @@ import java.net.UnknownHostException; import java.nio.ByteOrder; import java.util.Map; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -61,6 +62,8 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; +import net.openhft.affinity.AffinityStrategies; +import net.openhft.affinity.AffinityThreadFactory; public class MemqNettyServer { @@ -186,11 +189,12 @@ private Authorizer enableAuthenticationAuthorizationAuditing(MemqConfig configur } private EventLoopGroup getEventLoopGroup(int nThreads) { + ThreadFactory threadFactory = new AffinityThreadFactory("atf_wrk", true, AffinityStrategies.DIFFERENT_CORE); if (useEpoll) { logger.info("Epoll is available and will be used"); - return new EpollEventLoopGroup(nThreads, new DaemonThreadFactory()); + return new EpollEventLoopGroup(nThreads, threadFactory); } else { - return new NioEventLoopGroup(nThreads, new DaemonThreadFactory()); + return new NioEventLoopGroup(nThreads, threadFactory); } } From b6f08427dd7a1a5779e75cd7ba18a9637f629e82 Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Fri, 10 Jun 2022 20:14:15 -0700 Subject: [PATCH 02/13] Add messages --- .../pinterest/memq/client/commons2/network/NetworkClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java b/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java index cac77eb..b86ad8d 100644 --- a/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java +++ b/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java @@ -169,7 +169,7 @@ public CompletableFuture send(InetSocketAddress socketAddress, R channelFuture.channel().writeAndFlush(buffer); ts = System.currentTimeMillis() - ts; if (ts>2) { - System.out.println(ts+"ms size:"+request.getSize(RequestType.PROTOCOL_VERSION)); + System.out.println("Network dispatch delay:"+ts+"ms size:"+request.getSize(RequestType.PROTOCOL_VERSION)); } } catch (Exception e) { logger.warn("Failed to write request " + request.getClientRequestId(), e); From e25b301ba34e240d85510a3d6ce799b5afdd59b6 Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Wed, 15 Jun 2022 12:32:53 -0700 Subject: [PATCH 03/13] Upgrade affinity version --- memq-commons/pom.xml | 10 ++++------ .../memq/client/examples/ExampleMemqProducer.java | 7 ++++++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/memq-commons/pom.xml b/memq-commons/pom.xml index fe2f353..6c57509 100644 --- a/memq-commons/pom.xml +++ b/memq-commons/pom.xml @@ -1,6 +1,4 @@ - + 4.0.0 com.pinterest.memq @@ -83,9 +81,9 @@ 4.1.17 - net.openhft - affinity - 3.0.6 + net.openhft + affinity + 3.21ea83 io.netty diff --git a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java index 85dbfba..ba8bd5d 100644 --- a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java +++ b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java @@ -41,6 +41,11 @@ public static void main(String[] args) throws IOException, InterruptedException, if (args.length > 0) { nThreads = Integer.parseInt(args[0]); } + String hostport = "127.0.0.1:9092"; + if (args.length > 1) { + hostport = args[1]; + } + final String conn = hostport; ExecutorService es = Executors.newFixedThreadPool(nThreads, new ThreadFactory() { @Override @@ -65,7 +70,7 @@ public Thread newThread(Runnable r) { .disableAcks(true).keySerializer(new ByteArraySerializer()) .valueSerializer(new ByteArraySerializer()).topic(topicName).cluster("local") .compression(Compression.NONE).maxPayloadBytes(1024 * 10).maxInflightRequests(60) - .bootstrapServers("127.0.0.1:9092").build(); + .bootstrapServers(conn).build(); StringBuilder builder = new StringBuilder(); while (builder.length() < 1024 * 5) { builder.append(UUID.randomUUID().toString()); From cc7dd31a3c43ac71670bb4b92fc2ee2f76ee07d6 Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Wed, 15 Jun 2022 12:38:12 -0700 Subject: [PATCH 04/13] Add fix for ip/host change --- .../com/pinterest/memq/client/examples/ExampleMemqProducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java index ba8bd5d..6f08489 100644 --- a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java +++ b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java @@ -58,7 +58,7 @@ public Thread newThread(Runnable r) { String pathname = "/tmp/memq_serverset"; PrintWriter pr = new PrintWriter(new File(pathname)); - String s = "{\"az\": \"us-east-1a\", \"ip\": \"127.0.0.1\", \"port\": \"8080\", \"stage_name\": \"prototype\", \"version\": \"none\", \"weight\": 1}"; + String s = "{\"az\": \"us-east-1a\", \"ip\": \""+hostport.split(":")[0]+"\", \"port\": \"8080\", \"stage_name\": \"prototype\", \"version\": \"none\", \"weight\": 1}"; pr.println(s); pr.close(); for (int x = 0; x < nThreads; x++) { From 45a550bd75f18d1f069180b9bf45ba2b43fc47fb Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Wed, 15 Jun 2022 12:57:53 -0700 Subject: [PATCH 05/13] Add periodic latency monitoring --- .../client/examples/ExampleMemqProducer.java | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java index 6f08489..ea285f4 100644 --- a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java +++ b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java @@ -25,9 +25,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.codahale.metrics.Timer.Context; import com.pinterest.memq.client.commons.Compression; import com.pinterest.memq.client.commons.serde.ByteArraySerializer; import com.pinterest.memq.client.producer.MemqWriteResult; @@ -37,6 +41,9 @@ public class ExampleMemqProducer { public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { + MetricRegistry reg = new MetricRegistry(); + Timer totalLatency = reg.timer("totalLatency"); + Timer messageLatency = reg.timer("messageLatency"); int nThreads = 1; if (args.length > 0) { nThreads = Integer.parseInt(args[0]); @@ -46,7 +53,7 @@ public static void main(String[] args) throws IOException, InterruptedException, hostport = args[1]; } final String conn = hostport; - ExecutorService es = Executors.newFixedThreadPool(nThreads, new ThreadFactory() { + ThreadFactory tf = new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -54,11 +61,17 @@ public Thread newThread(Runnable r) { th.setDaemon(true); return th; } - }); + }; + ExecutorService es = Executors.newFixedThreadPool(nThreads, tf); + ScheduledExecutorService bg = Executors.newScheduledThreadPool(1, tf); + bg.schedule(() -> System.out.print("\r" + messageLatency.getSnapshot().getMax() + " " + + messageLatency.getSnapshot().get99thPercentile() + " " + + messageLatency.getSnapshot().getMean()), 1, TimeUnit.SECONDS); String pathname = "/tmp/memq_serverset"; PrintWriter pr = new PrintWriter(new File(pathname)); - String s = "{\"az\": \"us-east-1a\", \"ip\": \""+hostport.split(":")[0]+"\", \"port\": \"8080\", \"stage_name\": \"prototype\", \"version\": \"none\", \"weight\": 1}"; + String s = "{\"az\": \"us-east-1a\", \"ip\": \"" + hostport.split(":")[0] + + "\", \"port\": \"8080\", \"stage_name\": \"prototype\", \"version\": \"none\", \"weight\": 1}"; pr.println(s); pr.close(); for (int x = 0; x < nThreads; x++) { @@ -67,7 +80,7 @@ public Thread newThread(Runnable r) { try { String topicName = "test"; MemqProducer instance = new MemqProducer.Builder() - .disableAcks(true).keySerializer(new ByteArraySerializer()) + .disableAcks(false).keySerializer(new ByteArraySerializer()) .valueSerializer(new ByteArraySerializer()).topic(topicName).cluster("local") .compression(Compression.NONE).maxPayloadBytes(1024 * 10).maxInflightRequests(60) .bootstrapServers(conn).build(); @@ -78,9 +91,7 @@ public Thread newThread(Runnable r) { byte[] bytes = builder.toString().getBytes("utf-8"); for (int i = 0; i < 500000; i++) { - long min = Long.MAX_VALUE; - long max = Long.MIN_VALUE; - long ts = System.currentTimeMillis(); + Context time = totalLatency.time(); List> result = new ArrayList<>(); for (int k = 0; k < 30; k++) { Future writeToTopic = instance.write(null, bytes, System.nanoTime()); @@ -90,15 +101,9 @@ public Thread newThread(Runnable r) { for (Future future : result) { MemqWriteResult memqWriteResult = future.get(); int ackLatency = memqWriteResult.getAckLatency(); - if (min > ackLatency) { - min = ackLatency; - } - if (max < ackLatency) { - max = ackLatency; - } + messageLatency.update(ackLatency, TimeUnit.MILLISECONDS); } - ts = System.currentTimeMillis() - ts; - System.out.println(ts + "ms " + min + "ms " + max + "ms"); + time.stop(); } } catch (Exception e) { e.printStackTrace(); From 0951e52dd022c2e06df890a944cdbfed19265aa7 Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Wed, 15 Jun 2022 13:02:58 -0700 Subject: [PATCH 06/13] Convert latencies to ms --- .../pinterest/memq/client/examples/ExampleMemqProducer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java index ea285f4..5d75561 100644 --- a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java +++ b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java @@ -64,9 +64,9 @@ public Thread newThread(Runnable r) { }; ExecutorService es = Executors.newFixedThreadPool(nThreads, tf); ScheduledExecutorService bg = Executors.newScheduledThreadPool(1, tf); - bg.schedule(() -> System.out.print("\r" + messageLatency.getSnapshot().getMax() + " " - + messageLatency.getSnapshot().get99thPercentile() + " " - + messageLatency.getSnapshot().getMean()), 1, TimeUnit.SECONDS); + bg.schedule(() -> System.out.print("\r" + messageLatency.getSnapshot().getMax() / 1000_000 + " " + + messageLatency.getSnapshot().get99thPercentile() / 1000_000 + " " + + messageLatency.getSnapshot().getMean() / 1000_000), 1, TimeUnit.SECONDS); String pathname = "/tmp/memq_serverset"; PrintWriter pr = new PrintWriter(new File(pathname)); From cc4a0bc990fcfa497721d47e73bbd5173cd4c1cc Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Wed, 15 Jun 2022 13:05:24 -0700 Subject: [PATCH 07/13] Add millis latency --- .../com/pinterest/memq/client/examples/ExampleMemqProducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java index 5d75561..a6e897a 100644 --- a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java +++ b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java @@ -66,7 +66,7 @@ public Thread newThread(Runnable r) { ScheduledExecutorService bg = Executors.newScheduledThreadPool(1, tf); bg.schedule(() -> System.out.print("\r" + messageLatency.getSnapshot().getMax() / 1000_000 + " " + messageLatency.getSnapshot().get99thPercentile() / 1000_000 + " " - + messageLatency.getSnapshot().getMean() / 1000_000), 1, TimeUnit.SECONDS); + + messageLatency.getSnapshot().getMean() / 1000_000), 10, TimeUnit.MILLISECONDS); String pathname = "/tmp/memq_serverset"; PrintWriter pr = new PrintWriter(new File(pathname)); From a73adbc63e71def8c8b4e65d3f30f9b06ca694b9 Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Wed, 15 Jun 2022 13:17:54 -0700 Subject: [PATCH 08/13] Update seconds --- .../pinterest/memq/client/examples/ExampleMemqProducer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java index a6e897a..0bfcf83 100644 --- a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java +++ b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java @@ -64,9 +64,9 @@ public Thread newThread(Runnable r) { }; ExecutorService es = Executors.newFixedThreadPool(nThreads, tf); ScheduledExecutorService bg = Executors.newScheduledThreadPool(1, tf); - bg.schedule(() -> System.out.print("\r" + messageLatency.getSnapshot().getMax() / 1000_000 + " " + bg.schedule(() -> System.out.println("" + messageLatency.getSnapshot().getMax() / 1000_000 + " " + messageLatency.getSnapshot().get99thPercentile() / 1000_000 + " " - + messageLatency.getSnapshot().getMean() / 1000_000), 10, TimeUnit.MILLISECONDS); + + messageLatency.getSnapshot().getMean() / 1000_000), 1, TimeUnit.SECONDS); String pathname = "/tmp/memq_serverset"; PrintWriter pr = new PrintWriter(new File(pathname)); From 88dc1566a2ab566442ff59fd232311ed2da8bafe Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Wed, 15 Jun 2022 13:20:13 -0700 Subject: [PATCH 09/13] Change to scheduled rate --- .../pinterest/memq/client/examples/ExampleMemqProducer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java index 0bfcf83..dea8c67 100644 --- a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java +++ b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java @@ -64,9 +64,9 @@ public Thread newThread(Runnable r) { }; ExecutorService es = Executors.newFixedThreadPool(nThreads, tf); ScheduledExecutorService bg = Executors.newScheduledThreadPool(1, tf); - bg.schedule(() -> System.out.println("" + messageLatency.getSnapshot().getMax() / 1000_000 + " " + bg.scheduleAtFixedRate(() -> System.out.print("\r" + messageLatency.getSnapshot().getMax() / 1000_000 + " " + messageLatency.getSnapshot().get99thPercentile() / 1000_000 + " " - + messageLatency.getSnapshot().getMean() / 1000_000), 1, TimeUnit.SECONDS); + + messageLatency.getSnapshot().getMean() / 1000_000), 1, 1, TimeUnit.SECONDS); String pathname = "/tmp/memq_serverset"; PrintWriter pr = new PrintWriter(new File(pathname)); From 3fa0916c78f59247c48abb97bee49feeda4304f9 Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Wed, 15 Jun 2022 13:47:07 -0700 Subject: [PATCH 10/13] Add tcpnodelay for responses from server --- .../memq/client/examples/ExampleMemqProducer.java | 8 +++++--- .../java/com/pinterest/memq/core/rpc/MemqNettyServer.java | 2 ++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java index dea8c67..fa310f9 100644 --- a/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java +++ b/memq-examples/src/main/java/com/pinterest/memq/client/examples/ExampleMemqProducer.java @@ -64,9 +64,11 @@ public Thread newThread(Runnable r) { }; ExecutorService es = Executors.newFixedThreadPool(nThreads, tf); ScheduledExecutorService bg = Executors.newScheduledThreadPool(1, tf); - bg.scheduleAtFixedRate(() -> System.out.print("\r" + messageLatency.getSnapshot().getMax() / 1000_000 + " " - + messageLatency.getSnapshot().get99thPercentile() / 1000_000 + " " - + messageLatency.getSnapshot().getMean() / 1000_000), 1, 1, TimeUnit.SECONDS); + bg.scheduleAtFixedRate(() -> { + System.out.print("\r" + messageLatency.getSnapshot().getMax() / 1000_000 + " " + + messageLatency.getSnapshot().get99thPercentile() / 1000_000 + " " + + messageLatency.getSnapshot().getMean() / 1000_000); + }, 1, 1, TimeUnit.SECONDS); String pathname = "/tmp/memq_serverset"; PrintWriter pr = new PrintWriter(new File(pathname)); diff --git a/memq/src/main/java/com/pinterest/memq/core/rpc/MemqNettyServer.java b/memq/src/main/java/com/pinterest/memq/core/rpc/MemqNettyServer.java index c6ba49d..e968bc3 100644 --- a/memq/src/main/java/com/pinterest/memq/core/rpc/MemqNettyServer.java +++ b/memq/src/main/java/com/pinterest/memq/core/rpc/MemqNettyServer.java @@ -49,6 +49,7 @@ import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.Epoll; @@ -112,6 +113,7 @@ public void initialize() throws Exception { } else { serverBootstrap.channel(NioServerSocketChannel.class); } + serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true); serverBootstrap.localAddress(nettyServerConfig.getPort()); serverBootstrap.childHandler(new ChannelInitializer() { From 324022175555588fd8e6ff2d091ffb2c068c1730 Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Wed, 15 Jun 2022 14:15:30 -0700 Subject: [PATCH 11/13] Revert to single event loop thread --- .../pinterest/memq/client/commons2/network/NetworkClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java b/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java index b86ad8d..b9fe383 100644 --- a/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java +++ b/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java @@ -63,7 +63,7 @@ // No thread-safety guarantees public class NetworkClient implements Closeable { - private static final int DEFAULT_EVENT_LOOP_THREADS = 2; + private static final int DEFAULT_EVENT_LOOP_THREADS = 1; private static final Logger logger = LoggerFactory.getLogger(NetworkClient.class); public static final String CONFIG_INITIAL_RETRY_INTERVAL_MS = "initialRetryIntervalMs"; public static final String CONFIG_MAX_RETRY_COUNT = "maxRetryCount"; From 8fdf657a9e132655b9a869eb14fcb382a9cc3910 Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Mon, 27 Jun 2022 16:15:53 -0700 Subject: [PATCH 12/13] Reduce number of metrics reporting threads --- .../memq/client/commons/MemqCommonClient.java | 2 +- .../commons2/network/NetworkClient.java | 1 - .../consumer/KafkaNotificationSource.java | 2 +- .../producer/http/DaemonThreadFactory.java | 38 ------------------- .../producer/netty/MemqNettyProducer.java | 2 +- .../memq/commons/mon/OpenTSDBReporter.java | 6 ++- .../memq/core/utils/DaemonThreadFactory.java | 14 +++++++ 7 files changed, 22 insertions(+), 43 deletions(-) delete mode 100644 memq-client/src/main/java/com/pinterest/memq/client/producer/http/DaemonThreadFactory.java diff --git a/memq-client/src/main/java/com/pinterest/memq/client/commons/MemqCommonClient.java b/memq-client/src/main/java/com/pinterest/memq/client/commons/MemqCommonClient.java index 39c785f..7a8aedb 100644 --- a/memq-client/src/main/java/com/pinterest/memq/client/commons/MemqCommonClient.java +++ b/memq-client/src/main/java/com/pinterest/memq/client/commons/MemqCommonClient.java @@ -41,7 +41,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.pinterest.memq.client.producer.http.DaemonThreadFactory; import com.pinterest.memq.client.producer.netty.MemqNettyProducer; import com.pinterest.memq.commons.config.SSLConfig; import com.pinterest.memq.commons.protocol.Broker; @@ -53,6 +52,7 @@ import com.pinterest.memq.commons.protocol.TopicMetadata; import com.pinterest.memq.commons.protocol.TopicMetadataRequestPacket; import com.pinterest.memq.commons.protocol.TopicMetadataResponsePacket; +import com.pinterest.memq.core.utils.DaemonThreadFactory; import com.pinterest.memq.core.utils.MemqUtils; import io.netty.bootstrap.Bootstrap; diff --git a/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java b/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java index b9fe383..ecc1a88 100644 --- a/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java +++ b/memq-client/src/main/java/com/pinterest/memq/client/commons2/network/NetworkClient.java @@ -39,7 +39,6 @@ import com.pinterest.memq.client.commons2.network.netty.ClientChannelInitializer; import com.pinterest.memq.client.commons2.retry.ExponentialBackoffRetryStrategy; import com.pinterest.memq.client.commons2.retry.RetryStrategy; -import com.pinterest.memq.client.producer.http.DaemonThreadFactory; import com.pinterest.memq.commons.config.SSLConfig; import com.pinterest.memq.commons.protocol.RequestPacket; import com.pinterest.memq.commons.protocol.RequestType; diff --git a/memq-client/src/main/java/com/pinterest/memq/client/consumer/KafkaNotificationSource.java b/memq-client/src/main/java/com/pinterest/memq/client/consumer/KafkaNotificationSource.java index ef66df5..b61edac 100644 --- a/memq-client/src/main/java/com/pinterest/memq/client/consumer/KafkaNotificationSource.java +++ b/memq-client/src/main/java/com/pinterest/memq/client/consumer/KafkaNotificationSource.java @@ -52,8 +52,8 @@ import com.google.common.collect.ImmutableList; import com.google.gson.Gson; import com.google.gson.JsonObject; -import com.pinterest.memq.client.producer.http.DaemonThreadFactory; import com.pinterest.memq.commons.MemqLogMessage; +import com.pinterest.memq.core.utils.DaemonThreadFactory; public class KafkaNotificationSource { diff --git a/memq-client/src/main/java/com/pinterest/memq/client/producer/http/DaemonThreadFactory.java b/memq-client/src/main/java/com/pinterest/memq/client/producer/http/DaemonThreadFactory.java deleted file mode 100644 index d809154..0000000 --- a/memq-client/src/main/java/com/pinterest/memq/client/producer/http/DaemonThreadFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright 2022 Pinterest, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pinterest.memq.client.producer.http; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -public final class DaemonThreadFactory implements ThreadFactory { - - private String basename; - private AtomicInteger counter; - - public DaemonThreadFactory(String basename) { - this.basename = basename; - this.counter = new AtomicInteger(); - } - - @Override - public Thread newThread(Runnable r) { - Thread th = new Thread(r); - th.setDaemon(true); - th.setName(basename + "-" + counter.getAndIncrement()); - return th; - } -} \ No newline at end of file diff --git a/memq-client/src/main/java/com/pinterest/memq/client/producer/netty/MemqNettyProducer.java b/memq-client/src/main/java/com/pinterest/memq/client/producer/netty/MemqNettyProducer.java index c96f3fb..457a6a7 100644 --- a/memq-client/src/main/java/com/pinterest/memq/client/producer/netty/MemqNettyProducer.java +++ b/memq-client/src/main/java/com/pinterest/memq/client/producer/netty/MemqNettyProducer.java @@ -40,10 +40,10 @@ import com.pinterest.memq.client.commons2.MemqCommonClient; import com.pinterest.memq.client.producer.MemqProducer; import com.pinterest.memq.client.producer.TaskRequest; -import com.pinterest.memq.client.producer.http.DaemonThreadFactory; import com.pinterest.memq.commons.config.SSLConfig; import com.pinterest.memq.commons.protocol.Broker; import com.pinterest.memq.commons.protocol.TopicMetadata; +import com.pinterest.memq.core.utils.DaemonThreadFactory; public class MemqNettyProducer extends MemqProducer { diff --git a/memq-commons/src/main/java/com/pinterest/memq/commons/mon/OpenTSDBReporter.java b/memq-commons/src/main/java/com/pinterest/memq/commons/mon/OpenTSDBReporter.java index 8fbadf9..483d136 100644 --- a/memq-commons/src/main/java/com/pinterest/memq/commons/mon/OpenTSDBReporter.java +++ b/memq-commons/src/main/java/com/pinterest/memq/commons/mon/OpenTSDBReporter.java @@ -20,6 +20,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.SortedMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -34,10 +36,12 @@ import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; import com.pinterest.memq.commons.mon.OpenTSDBClient.MetricsBuffer; +import com.pinterest.memq.core.utils.DaemonThreadFactory; public class OpenTSDBReporter extends ScheduledReporter { private static final Logger logger = Logger.getLogger(OpenTSDBClient.class.getName()); + private static ScheduledExecutorService ES = Executors.newScheduledThreadPool(1, new DaemonThreadFactory()); private OpenTSDBClient client; private String[] tags; private String baseName; @@ -52,7 +56,7 @@ protected OpenTSDBReporter(String baseName, OpenTSDBClient client, String localHostAddress, Map tags) throws UnknownHostException { - super(registry, registryName, filter, rateUnit, durationUnit); + super(registry, registryName, filter, rateUnit, durationUnit, ES); if (baseName == null || baseName.isEmpty()) { this.baseName = ""; } else { diff --git a/memq-commons/src/main/java/com/pinterest/memq/core/utils/DaemonThreadFactory.java b/memq-commons/src/main/java/com/pinterest/memq/core/utils/DaemonThreadFactory.java index 0486582..2f975a9 100644 --- a/memq-commons/src/main/java/com/pinterest/memq/core/utils/DaemonThreadFactory.java +++ b/memq-commons/src/main/java/com/pinterest/memq/core/utils/DaemonThreadFactory.java @@ -16,15 +16,29 @@ package com.pinterest.memq.core.utils; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; public class DaemonThreadFactory implements ThreadFactory { public static DaemonThreadFactory INSTANCE = new DaemonThreadFactory(); + private String basename; + private AtomicInteger counter; + + public DaemonThreadFactory() { + this("common"); + } + + public DaemonThreadFactory(String basename) { + this.basename = basename; + this.counter = new AtomicInteger(); + } + @Override public Thread newThread(Runnable r) { Thread th = new Thread(r); th.setDaemon(true); + th.setName(basename + "-" + counter.getAndIncrement()); return th; } } \ No newline at end of file From 2dbad0e1edbd6d62740899f9d40c819010f4e7ca Mon Sep 17 00:00:00 2001 From: ambudsharma Date: Mon, 27 Jun 2022 16:52:07 -0700 Subject: [PATCH 13/13] Remove dropwizard from MemQ to reduce unnecessary threads --- memq/pom.xml | 27 ++---- .../com/pinterest/memq/core/MemqMain.java | 51 +++++------ .../com/pinterest/memq/core/MemqManager.java | 12 +-- .../memq/core/config/MemqConfig.java | 17 ++-- .../memq/core/mon/MemqMgrHealthCheck.java | 38 -------- .../memq/core/mon/MonitorEndpoint.java | 89 ------------------- .../processing/bucketing/BatchManager.java | 28 +++--- .../bucketing/BucketingTopicProcessor.java | 9 +- .../memq/core/rpc/MemqRequestDecoder.java | 12 +-- .../memq/core/rpc/PacketSwitchingHandler.java | 14 ++- .../rpc/exceptions/BadRequestException.java | 38 ++++++++ .../InternalServerErrorException.java | 38 ++++++++ .../exceptions/NotAuthorizedException.java | 38 ++++++++ .../rpc/exceptions/NotFoundException.java | 38 ++++++++ .../rpc/exceptions/RedirectionException.java | 38 ++++++++ .../ServiceUnavailableException.java | 38 ++++++++ 16 files changed, 294 insertions(+), 231 deletions(-) delete mode 100644 memq/src/main/java/com/pinterest/memq/core/mon/MemqMgrHealthCheck.java delete mode 100644 memq/src/main/java/com/pinterest/memq/core/mon/MonitorEndpoint.java create mode 100644 memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/BadRequestException.java create mode 100644 memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/InternalServerErrorException.java create mode 100644 memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/NotAuthorizedException.java create mode 100644 memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/NotFoundException.java create mode 100644 memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/RedirectionException.java create mode 100644 memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/ServiceUnavailableException.java diff --git a/memq/pom.xml b/memq/pom.xml index e676a9f..c3d1186 100644 --- a/memq/pom.xml +++ b/memq/pom.xml @@ -13,8 +13,6 @@ memq A hyperscale PubSub System - 1.3.27 - 2.9.7 3.1.8 @@ -29,11 +27,6 @@ - - com.fasterxml.jackson.core - jackson-core - ${jackson.version} - com.pinterest.memq memq-commons @@ -45,21 +38,6 @@ - - com.fasterxml.jackson.core - jackson-databind - ${jackson.version} - - - io.dropwizard - dropwizard-assets - ${dropwizard.version} - - - io.dropwizard - dropwizard-core - ${dropwizard.version} - io.dropwizard.metrics metrics-core @@ -69,6 +47,11 @@ junit junit + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + 2.10.0 + com.pinterest.memq memq-client diff --git a/memq/src/main/java/com/pinterest/memq/core/MemqMain.java b/memq/src/main/java/com/pinterest/memq/core/MemqMain.java index eeb4ceb..329d1da 100644 --- a/memq/src/main/java/com/pinterest/memq/core/MemqMain.java +++ b/memq/src/main/java/com/pinterest/memq/core/MemqMain.java @@ -15,6 +15,7 @@ */ package com.pinterest.memq.core; +import java.io.FileInputStream; import java.net.UnknownHostException; import java.util.Collections; import java.util.HashMap; @@ -28,41 +29,34 @@ import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.ScheduledReporter; -import com.codahale.metrics.jvm.CachedThreadStatesGaugeSet; -import com.codahale.metrics.jvm.GarbageCollectorMetricSet; -import com.codahale.metrics.jvm.MemoryUsageGaugeSet; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +//import com.codahale.metrics.jvm.CachedThreadStatesGaugeSet; +//import com.codahale.metrics.jvm.GarbageCollectorMetricSet; +//import com.codahale.metrics.jvm.MemoryUsageGaugeSet; import com.pinterest.memq.commons.mon.OpenTSDBClient; import com.pinterest.memq.commons.mon.OpenTSDBReporter; import com.pinterest.memq.core.clustering.MemqGovernor; import com.pinterest.memq.core.config.EnvironmentProvider; import com.pinterest.memq.core.config.MemqConfig; -import com.pinterest.memq.core.mon.MemqMgrHealthCheck; -import com.pinterest.memq.core.mon.MonitorEndpoint; import com.pinterest.memq.core.rpc.MemqNettyServer; import com.pinterest.memq.core.utils.DaemonThreadFactory; import com.pinterest.memq.core.utils.MiscUtils; -import io.dropwizard.Application; -import io.dropwizard.setup.Environment; - -public class MemqMain extends Application { +public class MemqMain { private static final Logger logger = Logger.getLogger(MemqMain.class.getName()); - @Override - public void run(MemqConfig configuration, Environment environment) throws Exception { + public void run(MemqConfig configuration) throws Exception { MiscUtils.printAllLines(ClassLoader.getSystemResourceAsStream("logo.txt")); Map metricsRegistryMap = new HashMap<>(); enableJVMMetrics(metricsRegistryMap); MetricRegistry misc = new MetricRegistry(); emitStartMetrics(misc); metricsRegistryMap.put("_misc", misc); - OpenTSDBClient client = initializeMetricsTransmitter(configuration, environment, - metricsRegistryMap); + OpenTSDBClient client = initializeMetricsTransmitter(configuration, metricsRegistryMap); MemqManager memqManager = new MemqManager(client, configuration, metricsRegistryMap); memqManager.init(); - environment.lifecycle().manage(memqManager); - addAPIs(environment, memqManager); if (configuration.isResetEnabled()) { Executors.newScheduledThreadPool(1, new DaemonThreadFactory()).schedule(() -> { @@ -72,7 +66,7 @@ public void run(MemqConfig configuration, Environment environment) throws Except }, 1, TimeUnit.HOURS); } - environment.healthChecks().register("base", new MemqMgrHealthCheck(memqManager)); +// environment.healthChecks().register("base", new MemqMgrHealthCheck(memqManager)); MemqGovernor memqGovernor = initializeGovernor(configuration, memqManager); MemqNettyServer nettyServer = initializeNettyServer(configuration, memqManager, memqGovernor, @@ -80,6 +74,8 @@ public void run(MemqConfig configuration, Environment environment) throws Except logger.info("Memq started"); initializeShutdownHooks(memqManager, memqGovernor, nettyServer); + // block until server is stopped + nettyServer.getServerChannelFuture().channel().closeFuture().sync(); } private void initializeShutdownHooks(MemqManager memqManager, @@ -100,11 +96,11 @@ public void run() { } private void enableJVMMetrics(Map metricsRegistryMap) { - MetricRegistry registry = new MetricRegistry(); - registry.register("gc", new GarbageCollectorMetricSet()); - registry.register("threads", new CachedThreadStatesGaugeSet(10, TimeUnit.SECONDS)); - registry.register("memory", new MemoryUsageGaugeSet()); - metricsRegistryMap.put("_jvm", registry); +// MetricRegistry registry = new MetricRegistry(); +// registry.register("gc", new GarbageCollectorMetricSet()); +// registry.register("threads", new CachedThreadStatesGaugeSet(10, TimeUnit.SECONDS)); +// registry.register("memory", new MemoryUsageGaugeSet()); +// metricsRegistryMap.put("_jvm", registry); } public MemqGovernor initializeGovernor(MemqConfig configuration, @@ -135,7 +131,6 @@ public MemqNettyServer initializeNettyServer(MemqConfig configuration, } private OpenTSDBClient initializeMetricsTransmitter(MemqConfig configuration, - Environment environment, Map metricsRegistryMap) throws UnknownHostException { if (configuration.getOpenTsdbConfig() != null) { OpenTSDBClient client = new OpenTSDBClient(configuration.getOpenTsdbConfig().getHost(), @@ -159,18 +154,16 @@ private OpenTSDBClient initializeMetricsTransmitter(MemqConfig configuration, return null; } - private void addAPIs(Environment environment, MemqManager memqManager) { - environment.jersey().setUrlPattern("/api/*"); - environment.jersey().register(new MonitorEndpoint(memqManager.getRegistry())); - } - private void emitStartMetrics(MetricRegistry registry) { final long startTs = System.currentTimeMillis(); - registry.gauge("start", () -> (Gauge) () -> System.currentTimeMillis() - startTs < 3 * 60_000 ? 1 : 0); + registry.gauge("start", + () -> (Gauge) () -> System.currentTimeMillis() - startTs < 3 * 60_000 ? 1 : 0); } public static void main(String[] args) throws Exception { - new MemqMain().run(args); + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + MemqConfig config = mapper.readValue(new FileInputStream(args[1]), MemqConfig.class); + new MemqMain().run(config); } } \ No newline at end of file diff --git a/memq/src/main/java/com/pinterest/memq/core/MemqManager.java b/memq/src/main/java/com/pinterest/memq/core/MemqManager.java index d3e7305..0be36a7 100644 --- a/memq/src/main/java/com/pinterest/memq/core/MemqManager.java +++ b/memq/src/main/java/com/pinterest/memq/core/MemqManager.java @@ -34,10 +34,6 @@ import java.util.logging.Level; import java.util.logging.Logger; -import javax.ws.rs.BadRequestException; -import javax.ws.rs.InternalServerErrorException; -import javax.ws.rs.NotFoundException; - import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.ScheduledReporter; import com.google.gson.Gson; @@ -51,12 +47,14 @@ import com.pinterest.memq.core.processing.TopicProcessor; import com.pinterest.memq.core.processing.TopicProcessorState; import com.pinterest.memq.core.processing.bucketing.BucketingTopicProcessor; +import com.pinterest.memq.core.rpc.exceptions.BadRequestException; +import com.pinterest.memq.core.rpc.exceptions.InternalServerErrorException; +import com.pinterest.memq.core.rpc.exceptions.NotFoundException; import com.pinterest.memq.core.utils.DaemonThreadFactory; import com.pinterest.memq.core.utils.MiscUtils; -import io.dropwizard.lifecycle.Managed; -public class MemqManager implements Managed { +public class MemqManager { private static final Logger logger = Logger.getLogger(MemqManager.class.getName()); private static final Gson gson = new Gson(); @@ -211,12 +209,10 @@ public Map getRegistry() { return metricsRegistryMap; } - @Override public void start() throws Exception { logger.info("Memq manager started"); } - @Override public void stop() throws Exception { try { for (Iterator> iterator = processorMap.entrySet() diff --git a/memq/src/main/java/com/pinterest/memq/core/config/MemqConfig.java b/memq/src/main/java/com/pinterest/memq/core/config/MemqConfig.java index 9ddf727..1e71a86 100644 --- a/memq/src/main/java/com/pinterest/memq/core/config/MemqConfig.java +++ b/memq/src/main/java/com/pinterest/memq/core/config/MemqConfig.java @@ -15,26 +15,21 @@ */ package com.pinterest.memq.core.config; -import com.google.common.collect.ImmutableList; -import com.pinterest.memq.commons.protocol.TopicConfig; import com.pinterest.memq.commons.protocol.Broker.BrokerType; +import com.pinterest.memq.commons.protocol.TopicConfig; -import io.dropwizard.Configuration; -import io.dropwizard.request.logging.LogbackAccessRequestLogFactory; -import io.dropwizard.server.DefaultServerFactory; - -public class MemqConfig extends Configuration { +public class MemqConfig { public MemqConfig() { - DefaultServerFactory defaultServerFactory = (DefaultServerFactory) getServerFactory(); +// DefaultServerFactory defaultServerFactory = (DefaultServerFactory) getServerFactory(); // Note that if someone explicitly enables gzip in the Dropwizard config YAML // then settings will be over-ruled causing the UI to stop working // Disable HTTP request logging - LogbackAccessRequestLogFactory accessRequestLogFactory = new LogbackAccessRequestLogFactory(); - accessRequestLogFactory.setAppenders(ImmutableList.of()); - defaultServerFactory.setRequestLogFactory(accessRequestLogFactory); +// LogbackAccessRequestLogFactory accessRequestLogFactory = new LogbackAccessRequestLogFactory(); +// accessRequestLogFactory.setAppenders(ImmutableList.of()); +// defaultServerFactory.setRequestLogFactory(accessRequestLogFactory); } private int defaultBufferSize = 1024 * 1024; diff --git a/memq/src/main/java/com/pinterest/memq/core/mon/MemqMgrHealthCheck.java b/memq/src/main/java/com/pinterest/memq/core/mon/MemqMgrHealthCheck.java deleted file mode 100644 index 7976d4a..0000000 --- a/memq/src/main/java/com/pinterest/memq/core/mon/MemqMgrHealthCheck.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright 2022 Pinterest, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pinterest.memq.core.mon; - -import com.codahale.metrics.health.HealthCheck; -import com.pinterest.memq.core.MemqManager; - -public class MemqMgrHealthCheck extends HealthCheck { - - private MemqManager mgr; - - public MemqMgrHealthCheck(MemqManager mgr) { - this.mgr = mgr; - } - - @Override - protected Result check() throws Exception { - if (mgr.isRunning()) { - return Result.healthy(); - } else { - return Result.unhealthy("Memq Manager is not healthy"); - } - } - -} diff --git a/memq/src/main/java/com/pinterest/memq/core/mon/MonitorEndpoint.java b/memq/src/main/java/com/pinterest/memq/core/mon/MonitorEndpoint.java deleted file mode 100644 index fdf17dc..0000000 --- a/memq/src/main/java/com/pinterest/memq/core/mon/MonitorEndpoint.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Copyright 2022 Pinterest, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pinterest.memq.core.mon; - -import java.util.Map; -import java.util.Map.Entry; - -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Snapshot; -import com.codahale.metrics.Timer; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonObject; - -@Path("/metrics") -@Produces({ MediaType.APPLICATION_JSON }) -public class MonitorEndpoint { - - private Map registryMap; - - public MonitorEndpoint(Map map) { - this.registryMap = map; - } - - @SuppressWarnings("rawtypes") - @GET - public String getMetrics() { - JsonObject topicMetrics = new JsonObject(); - for (Entry entry2 : registryMap.entrySet()) { - MetricRegistry registry = entry2.getValue(); - JsonObject metrics = new JsonObject(); - topicMetrics.add(entry2.getKey(), metrics); - - JsonObject allCounters = new JsonObject(); - for (Entry entry : registry.getCounters().entrySet()) { - allCounters.addProperty(entry.getKey(), entry.getValue().getCount()); - } - JsonObject allLatencies = new JsonObject(); - for (Entry entry : registry.getTimers().entrySet()) { - JsonObject obj = new JsonObject(); - Snapshot snapshot = entry.getValue().getSnapshot(); - obj.addProperty("max", snapshot.getMax()); - obj.addProperty("min", snapshot.getMin()); - obj.addProperty("mean", snapshot.getMean()); - allLatencies.add(entry.getKey(), obj); - } - - JsonObject allGauges = new JsonObject(); - for (Entry entry : registry.getGauges().entrySet()) { - Gauge gauge = entry.getValue(); - if (gauge.getValue() instanceof Long) { - allGauges.addProperty(entry.getKey(), (Long) gauge.getValue()); - } else if (gauge.getValue() instanceof Double) { - allGauges.addProperty(entry.getKey(), (Double) gauge.getValue()); - } else { - String val = entry.getValue().getValue().toString(); - if (!val.contains("[")) { - allGauges.addProperty(entry.getKey(), Double.parseDouble(val)); - } - } - } - - metrics.add("timers", allLatencies); - metrics.add("counters", allCounters); - metrics.add("gauges", allGauges); - } - return new GsonBuilder().setPrettyPrinting().create().toJson(topicMetrics); - } - -} \ No newline at end of file diff --git a/memq/src/main/java/com/pinterest/memq/core/processing/bucketing/BatchManager.java b/memq/src/main/java/com/pinterest/memq/core/processing/bucketing/BatchManager.java index 9a504f1..9fc444e 100644 --- a/memq/src/main/java/com/pinterest/memq/core/processing/bucketing/BatchManager.java +++ b/memq/src/main/java/com/pinterest/memq/core/processing/bucketing/BatchManager.java @@ -15,19 +15,6 @@ */ package com.pinterest.memq.core.processing.bucketing; -import com.pinterest.memq.commons.protocol.WriteRequestPacket; -import com.pinterest.memq.commons.storage.StorageHandler; -import com.pinterest.memq.core.commons.MemqProcessingThreadFactory; -import com.pinterest.memq.core.utils.MiscUtils; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; - import java.nio.ByteBuffer; import java.time.Duration; import java.util.Queue; @@ -37,7 +24,20 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.zip.CRC32; -import javax.ws.rs.BadRequestException; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.pinterest.memq.commons.protocol.WriteRequestPacket; +import com.pinterest.memq.commons.storage.StorageHandler; +import com.pinterest.memq.core.commons.MemqProcessingThreadFactory; +import com.pinterest.memq.core.rpc.exceptions.BadRequestException; +import com.pinterest.memq.core.utils.MiscUtils; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; public class BatchManager { private volatile Batch currentBatch; diff --git a/memq/src/main/java/com/pinterest/memq/core/processing/bucketing/BucketingTopicProcessor.java b/memq/src/main/java/com/pinterest/memq/core/processing/bucketing/BucketingTopicProcessor.java index b17f1dd..9e439ca 100644 --- a/memq/src/main/java/com/pinterest/memq/core/processing/bucketing/BucketingTopicProcessor.java +++ b/memq/src/main/java/com/pinterest/memq/core/processing/bucketing/BucketingTopicProcessor.java @@ -25,11 +25,6 @@ import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; -import javax.ws.rs.BadRequestException; -import javax.ws.rs.InternalServerErrorException; -import javax.ws.rs.core.Response; - import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; @@ -51,6 +46,8 @@ import com.pinterest.memq.commons.storage.StorageHandler; import com.pinterest.memq.core.processing.Ackable; import com.pinterest.memq.core.processing.TopicProcessor; +import com.pinterest.memq.core.rpc.exceptions.BadRequestException; +import com.pinterest.memq.core.rpc.exceptions.InternalServerErrorException; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -274,7 +271,7 @@ public void read(RequestPacket requestPacket, requestPacket.getClientRequestId(), requestPacket.getRequestType(), ResponseCodes.NO_DATA, new ReadResponsePacket(EMPTY_BATCH_DATA))); } catch (IOException e) { - throw new InternalServerErrorException(Response.serverError().build(), e); + throw new InternalServerErrorException(e); } } diff --git a/memq/src/main/java/com/pinterest/memq/core/rpc/MemqRequestDecoder.java b/memq/src/main/java/com/pinterest/memq/core/rpc/MemqRequestDecoder.java index a86f7eb..9e2fc41 100644 --- a/memq/src/main/java/com/pinterest/memq/core/rpc/MemqRequestDecoder.java +++ b/memq/src/main/java/com/pinterest/memq/core/rpc/MemqRequestDecoder.java @@ -20,12 +20,6 @@ import java.util.logging.Logger; import javax.net.ssl.SSLSession; -import javax.ws.rs.BadRequestException; -import javax.ws.rs.InternalServerErrorException; -import javax.ws.rs.NotAuthorizedException; -import javax.ws.rs.NotFoundException; -import javax.ws.rs.RedirectionException; -import javax.ws.rs.ServiceUnavailableException; import com.codahale.metrics.Counter; import com.codahale.metrics.MetricRegistry; @@ -34,6 +28,12 @@ import com.pinterest.memq.commons.protocol.ResponsePacket; import com.pinterest.memq.core.MemqManager; import com.pinterest.memq.core.clustering.MemqGovernor; +import com.pinterest.memq.core.rpc.exceptions.BadRequestException; +import com.pinterest.memq.core.rpc.exceptions.InternalServerErrorException; +import com.pinterest.memq.core.rpc.exceptions.NotAuthorizedException; +import com.pinterest.memq.core.rpc.exceptions.NotFoundException; +import com.pinterest.memq.core.rpc.exceptions.RedirectionException; +import com.pinterest.memq.core.rpc.exceptions.ServiceUnavailableException; import com.pinterest.memq.core.security.Authorizer; import io.netty.buffer.ByteBuf; diff --git a/memq/src/main/java/com/pinterest/memq/core/rpc/PacketSwitchingHandler.java b/memq/src/main/java/com/pinterest/memq/core/rpc/PacketSwitchingHandler.java index 4940cec..e6de9e4 100644 --- a/memq/src/main/java/com/pinterest/memq/core/rpc/PacketSwitchingHandler.java +++ b/memq/src/main/java/com/pinterest/memq/core/rpc/PacketSwitchingHandler.java @@ -18,13 +18,9 @@ import java.security.Principal; import java.util.logging.Logger; -import javax.ws.rs.InternalServerErrorException; -import javax.ws.rs.NotAuthorizedException; -import javax.ws.rs.NotFoundException; -import javax.ws.rs.RedirectionException; - import com.codahale.metrics.Counter; import com.codahale.metrics.MetricRegistry; +import com.pinterest.memq.commons.protocol.Broker.BrokerType; import com.pinterest.memq.commons.protocol.ReadRequestPacket; import com.pinterest.memq.commons.protocol.RequestPacket; import com.pinterest.memq.commons.protocol.ResponseCodes; @@ -33,11 +29,13 @@ import com.pinterest.memq.commons.protocol.TopicMetadataRequestPacket; import com.pinterest.memq.commons.protocol.TopicMetadataResponsePacket; import com.pinterest.memq.commons.protocol.WriteRequestPacket; -import com.pinterest.memq.commons.protocol.WriteResponsePacket; -import com.pinterest.memq.commons.protocol.Broker.BrokerType; import com.pinterest.memq.core.MemqManager; import com.pinterest.memq.core.clustering.MemqGovernor; import com.pinterest.memq.core.processing.TopicProcessor; +import com.pinterest.memq.core.rpc.exceptions.InternalServerErrorException; +import com.pinterest.memq.core.rpc.exceptions.NotAuthorizedException; +import com.pinterest.memq.core.rpc.exceptions.NotFoundException; +import com.pinterest.memq.core.rpc.exceptions.RedirectionException; import com.pinterest.memq.core.security.Authorizer; import io.netty.channel.ChannelHandlerContext; @@ -149,7 +147,7 @@ protected void executeWriteRequest(ChannelHandlerContext ctx, topicProcessor.registerChannel(ctx.channel()); topicProcessor.write(requestPacket, writePacket, ctx); } else if (governor.getTopicMetadataMap().containsKey(writePacket.getTopicName())) { - throw new RedirectionException(301, null); + throw new RedirectionException(); } else { logger.severe("Topic not found:" + writePacket.getTopicName()); throw TOPIC_NOT_FOUND; diff --git a/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/BadRequestException.java b/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/BadRequestException.java new file mode 100644 index 0000000..1149b85 --- /dev/null +++ b/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/BadRequestException.java @@ -0,0 +1,38 @@ +package com.pinterest.memq.core.rpc.exceptions; + +public class BadRequestException extends RuntimeException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public BadRequestException() { + super(); + // TODO Auto-generated constructor stub + } + + public BadRequestException(String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + // TODO Auto-generated constructor stub + } + + public BadRequestException(String message, Throwable cause) { + super(message, cause); + // TODO Auto-generated constructor stub + } + + public BadRequestException(String message) { + super(message); + // TODO Auto-generated constructor stub + } + + public BadRequestException(Throwable cause) { + super(cause); + // TODO Auto-generated constructor stub + } + +} diff --git a/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/InternalServerErrorException.java b/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/InternalServerErrorException.java new file mode 100644 index 0000000..2affb59 --- /dev/null +++ b/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/InternalServerErrorException.java @@ -0,0 +1,38 @@ +package com.pinterest.memq.core.rpc.exceptions; + +public class InternalServerErrorException extends RuntimeException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public InternalServerErrorException() { + super(); + // TODO Auto-generated constructor stub + } + + public InternalServerErrorException(String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + // TODO Auto-generated constructor stub + } + + public InternalServerErrorException(String message, Throwable cause) { + super(message, cause); + // TODO Auto-generated constructor stub + } + + public InternalServerErrorException(String message) { + super(message); + // TODO Auto-generated constructor stub + } + + public InternalServerErrorException(Throwable cause) { + super(cause); + // TODO Auto-generated constructor stub + } + +} diff --git a/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/NotAuthorizedException.java b/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/NotAuthorizedException.java new file mode 100644 index 0000000..5b789c2 --- /dev/null +++ b/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/NotAuthorizedException.java @@ -0,0 +1,38 @@ +package com.pinterest.memq.core.rpc.exceptions; + +public class NotAuthorizedException extends RuntimeException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public NotAuthorizedException() { + super(); + // TODO Auto-generated constructor stub + } + + public NotAuthorizedException(String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + // TODO Auto-generated constructor stub + } + + public NotAuthorizedException(String message, Throwable cause) { + super(message, cause); + // TODO Auto-generated constructor stub + } + + public NotAuthorizedException(String message) { + super(message); + // TODO Auto-generated constructor stub + } + + public NotAuthorizedException(Throwable cause) { + super(cause); + // TODO Auto-generated constructor stub + } + +} diff --git a/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/NotFoundException.java b/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/NotFoundException.java new file mode 100644 index 0000000..03c7812 --- /dev/null +++ b/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/NotFoundException.java @@ -0,0 +1,38 @@ +package com.pinterest.memq.core.rpc.exceptions; + +public class NotFoundException extends RuntimeException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public NotFoundException() { + super(); + // TODO Auto-generated constructor stub + } + + public NotFoundException(String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + // TODO Auto-generated constructor stub + } + + public NotFoundException(String message, Throwable cause) { + super(message, cause); + // TODO Auto-generated constructor stub + } + + public NotFoundException(String message) { + super(message); + // TODO Auto-generated constructor stub + } + + public NotFoundException(Throwable cause) { + super(cause); + // TODO Auto-generated constructor stub + } + +} diff --git a/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/RedirectionException.java b/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/RedirectionException.java new file mode 100644 index 0000000..8bcaaff --- /dev/null +++ b/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/RedirectionException.java @@ -0,0 +1,38 @@ +package com.pinterest.memq.core.rpc.exceptions; + +public class RedirectionException extends RuntimeException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public RedirectionException() { + super(); + // TODO Auto-generated constructor stub + } + + public RedirectionException(String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + // TODO Auto-generated constructor stub + } + + public RedirectionException(String message, Throwable cause) { + super(message, cause); + // TODO Auto-generated constructor stub + } + + public RedirectionException(String message) { + super(message); + // TODO Auto-generated constructor stub + } + + public RedirectionException(Throwable cause) { + super(cause); + // TODO Auto-generated constructor stub + } + +} diff --git a/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/ServiceUnavailableException.java b/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/ServiceUnavailableException.java new file mode 100644 index 0000000..39fa8af --- /dev/null +++ b/memq/src/main/java/com/pinterest/memq/core/rpc/exceptions/ServiceUnavailableException.java @@ -0,0 +1,38 @@ +package com.pinterest.memq.core.rpc.exceptions; + +public class ServiceUnavailableException extends RuntimeException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public ServiceUnavailableException() { + super(); + // TODO Auto-generated constructor stub + } + + public ServiceUnavailableException(String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + // TODO Auto-generated constructor stub + } + + public ServiceUnavailableException(String message, Throwable cause) { + super(message, cause); + // TODO Auto-generated constructor stub + } + + public ServiceUnavailableException(String message) { + super(message); + // TODO Auto-generated constructor stub + } + + public ServiceUnavailableException(Throwable cause) { + super(cause); + // TODO Auto-generated constructor stub + } + +}