Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor BinaryLogClient into an abstract base class to allow for more flexible use #16

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Prev Previous commit
Next Next commit
Improved factoring:
1. AbstractBinaryLogClient now no longer depends on EventListener or LifecycleListener.
2. LifecycleListener goes back to accepting BinaryLogClient arguments.

Still haven't run tests.
ldcasillas-progreso committed Apr 18, 2014
commit f39aceb248a64bc7bae53d688a22cffd580a9c17
Original file line number Diff line number Diff line change
@@ -200,7 +200,7 @@ public void connect() throws IOException {
if (logger.isLoggable(Level.INFO)) {
logger.info("Connected to " + hostname + ":" + port + " at " + binlogFilename + "/" + binlogPosition);
}
getLifecycleListener().onConnect(this);
onConnect();
if (keepAlive && !isKeepAliveThreadRunning()) {
spawnKeepAliveThread();
}
@@ -321,7 +321,7 @@ private void listenForEventPackets() throws IOException {
event = eventDeserializer.nextEvent(inputStream);
} catch (Exception e) {
if (isConnected()) {
getLifecycleListener().onEventDeserializationFailure(this, e);
onEventDeserializationFailure(e);
}
continue;
}
@@ -332,7 +332,7 @@ private void listenForEventPackets() throws IOException {
}
} catch (Exception e) {
if (isConnected()) {
getLifecycleListener().onCommunicationFailure(this, e);
onCommunicationFailure(e);
}
} finally {
if (isConnected()) {
@@ -369,7 +369,7 @@ private ResultSetRowPacket[] readResultSet() throws IOException {
}

private void notifyEventListener(Event event) {
getEventListener().onEvent(event);
onEvent(event);
}

/**
@@ -415,12 +415,34 @@ private void disconnectChannel() throws IOException {
channel.close();
}
} finally {
getLifecycleListener().onDisconnect(this);
onDisconnect();
}
}

public abstract BinaryLogClient.LifecycleListener getLifecycleListener();
/**
* Invoked once for each {@link Event}, in the order they are processed.
*/
protected abstract void onEvent(Event event);

public abstract BinaryLogClient.EventListener getEventListener();
/**
* Invoked when a connection is established.
*/
protected abstract void onConnect();

/**
* It's guarantied to be called before {@link #onDisconnect()}) in case of communication failure.
*/
protected abstract void onCommunicationFailure(Exception ex);

/**
* Called in case of failed event deserialization. Note this type of error does NOT cause client to
* disconnect. If you wish to stop receiving events you'll need to fire client.disconnect() manually.
*/
protected abstract void onEventDeserializationFailure(Exception ex);

/**
* Called upon disconnect (regardless of the reason).
*/
protected abstract void onDisconnect();

}
116 changes: 67 additions & 49 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
@@ -16,43 +16,17 @@
package com.github.shyiko.mysql.binlog;

import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.SocketFactory;
import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket;
import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket;
import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;

import java.io.EOFException;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

@@ -126,7 +100,7 @@ public void connect(long timeoutInMilliseconds) throws IOException, TimeoutExcep
final CountDownLatch countDownLatch = new CountDownLatch(1);
BinaryLogClient.AbstractLifecycleListener connectListener = new BinaryLogClient.AbstractLifecycleListener() {
@Override
public void onConnect(AbstractBinaryLogClient client) {
public void onConnect(BinaryLogClient client) {
countDownLatch.countDown();
}
};
@@ -164,8 +138,8 @@ public void run() {
}

@Override
public EventListener getEventListener() {
return eventListener;
protected void onEvent(Event event) {
eventListener.onEvent(event);
}

/**
@@ -197,9 +171,25 @@ public void unregisterEventListener(EventListener listener) {
eventListener.unregisterEventListener(listener);
}


@Override
public LifecycleListener getLifecycleListener() {
return lifecycleListener;
protected void onConnect() {
lifecycleListener.onConnect(this);
}

@Override
protected void onCommunicationFailure(Exception ex) {
lifecycleListener.onCommunicationFailure(this, ex);
}

@Override
protected void onEventDeserializationFailure(Exception ex) {
lifecycleListener.onEventDeserializationFailure(this, ex);
}

@Override
protected void onDisconnect() {
lifecycleListener.onDisconnect(this);
}

/**
@@ -239,9 +229,16 @@ public interface EventListener {
void onEvent(Event event);
}

/**
* An {@link EventListener} that rebroadcasts events to a dynamically managed list of other event listeners.
*/
public class BroadcastEventListener implements EventListener {
private final List<EventListener> eventListeners = new LinkedList<EventListener>();

/**
* Rebroadcast the event to the child listeners. If any of the children throws an exception, we log it and
* continue with the next one.
*/
@Override
public void onEvent(Event event) {
for (BinaryLogClient.EventListener eventListener : eventListeners) {
@@ -305,25 +302,27 @@ public interface LifecycleListener {

/**
* Called once client has successfully logged in but before started to receive binlog events.
* @param client
*/
void onConnect(AbstractBinaryLogClient client);
void onConnect(BinaryLogClient client);

/**
* It's guarantied to be called before {@link #onDisconnect(AbstractBinaryLogClient)}) in case of
* It's guarantied to be called before {@link #onDisconnect(BinaryLogClient)}) in case of
* communication failure.
*/
void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex);
void onCommunicationFailure(BinaryLogClient client, Exception ex);

/**
* Called in case of failed event deserialization. Note this type of error does NOT cause client to
* disconnect. If you wish to stop receiving events you'll need to fire client.disconnect() manually.
*/
void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex);
void onEventDeserializationFailure(BinaryLogClient client, Exception ex);

/**
* Called upon disconnect (regardless of the reason).
* @param client
*/
void onDisconnect(AbstractBinaryLogClient client);
void onDisconnect(BinaryLogClient client);
}

/**
@@ -332,44 +331,63 @@ public interface LifecycleListener {
public static abstract class AbstractLifecycleListener implements LifecycleListener {

@Override
public void onConnect(AbstractBinaryLogClient client) {
public void onConnect(BinaryLogClient client) {
}

@Override
public void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex) {
public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
}

@Override
public void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex) {
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
}

@Override
public void onDisconnect(AbstractBinaryLogClient client) {
public void onDisconnect(BinaryLogClient client) {
}

}

/**
* A {@link LifecycleListener} that rebroadcasts events to a dynamic list of children.
*/
public static class BroadcastLifecycleListener implements LifecycleListener {
final List<LifecycleListener> lifecycleListeners = new LinkedList<LifecycleListener>();

@Override
public void onConnect(AbstractBinaryLogClient client) {
throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO
public void onConnect(BinaryLogClient client) {
synchronized(lifecycleListeners) {
for (LifecycleListener listener : lifecycleListeners) {
listener.onConnect(client);
}
}
}

@Override
public void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex) {
throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO
public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
synchronized(lifecycleListeners) {
for (LifecycleListener listener : lifecycleListeners) {
listener.onCommunicationFailure(client, ex);
}
}
}

@Override
public void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex) {
throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
synchronized(lifecycleListeners) {
for (LifecycleListener listener : lifecycleListeners) {
listener.onEventDeserializationFailure(client, ex);
}
}
}

@Override
public void onDisconnect(AbstractBinaryLogClient client) {
throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO
public void onDisconnect(BinaryLogClient client) {
synchronized(lifecycleListeners) {
for (LifecycleListener listener : lifecycleListeners) {
listener.onDisconnect(client);
}
}
}

/**
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@
*/
package com.github.shyiko.mysql.binlog.jmx;

import com.github.shyiko.mysql.binlog.AbstractBinaryLogClient;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
@@ -100,24 +99,24 @@ public void onEvent(Event event) {
}

@Override
public void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex) {
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
numberOfSkippedEvents.getAndIncrement();
lastEventHeader.set(null);
timestampOfLastEvent.set(getCurrentTimeMillis());
totalNumberOfEventsSeen.getAndIncrement();
}

@Override
public void onDisconnect(AbstractBinaryLogClient client) {
public void onDisconnect(BinaryLogClient client) {
numberOfDisconnects.getAndIncrement();
}

@Override
public void onConnect(AbstractBinaryLogClient client) {
public void onConnect(BinaryLogClient client) {
}

@Override
public void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex) {
public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
}

protected long getCurrentTimeMillis() {