Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor BinaryLogClient into an abstract base class to allow for more flexible use #16

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
Undo the BroadcastEventListener and BroadcastLifecycleListener refact…
…or, since now it only serves to make the code longer.
ldcasillas-progreso committed Apr 18, 2014
commit 71abcc2dc7cdd1fb569564d8be7080ad00410f7e
178 changes: 81 additions & 97 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
@@ -37,8 +37,8 @@
*/
public class BinaryLogClient extends AbstractBinaryLogClient {

private final BroadcastEventListener eventListener = new BroadcastEventListener();
private final BroadcastLifecycleListener lifecycleListener = new BroadcastLifecycleListener();
private final List<EventListener> eventListeners = new LinkedList<EventListener>();
private final List<LifecycleListener> lifecycleListeners = new LinkedList<LifecycleListener>();
private ThreadFactory threadFactory;
private final Logger logger = Logger.getLogger(getClass().getName());

@@ -134,163 +134,147 @@ public void run() {
}
}

@Override
protected void onEvent(Event event) {
eventListener.onEvent(event);
}

/**
* @return registered event listeners
*/
public List<EventListener> getEventListeners() {
return eventListener.getEventListeners();
return Collections.unmodifiableList(eventListeners);
}

/**
* Register event listener. Note that multiple event listeners will be called in order they
* where registered.
*/
public void registerEventListener(EventListener listener) {
eventListener.registerEventListener(listener);
public void registerEventListener(EventListener eventListener) {
synchronized (eventListeners) {
eventListeners.add(eventListener);
}
}

/**
* Unregister all event listener of specific type.
*/
public void unregisterEventListener(Class<? extends EventListener> listenerClass) {
eventListener.unregisterEventListener(listenerClass);
synchronized (eventListeners) {
Iterator<EventListener> iterator = eventListeners.iterator();
while (iterator.hasNext()) {
EventListener eventListener = iterator.next();
if (listenerClass.isInstance(eventListener)) {
iterator.remove();
}
}
}
}

/**
* Unregister single event listener.
*/
public void unregisterEventListener(EventListener listener) {
eventListener.unregisterEventListener(listener);
}

@Override
protected void onConnect() {
lifecycleListener.onConnect(this);
}

@Override
protected void onCommunicationFailure(Exception ex) {
lifecycleListener.onCommunicationFailure(this, ex);
}

@Override
protected void onEventDeserializationFailure(Exception ex) {
lifecycleListener.onEventDeserializationFailure(this, ex);
public void unregisterEventListener(EventListener eventListener) {
synchronized (eventListeners) {
eventListeners.remove(eventListener);
}
}

@Override
protected void onDisconnect() {
lifecycleListener.onDisconnect(this);
protected void onEvent(Event event) {
synchronized (eventListeners) {
for (EventListener eventListener : eventListeners) {
try {
eventListener.onEvent(event);
} catch (Exception e) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, eventListener + " choked on " + event, e);
}
}
}
}
}

/**
* @return registered lifecycle listeners
*/
public List<LifecycleListener> getLifecycleListeners() {
return lifecycleListener.getLifecycleListeners();
return Collections.unmodifiableList(lifecycleListeners);
}

/**
* Register lifecycle listener. Note that multiple lifecycle listeners will be called in order they
* where registered.
*/
public void registerLifecycleListener(LifecycleListener listener) {
lifecycleListener.registerLifecycleListener(listener);
public void registerLifecycleListener(LifecycleListener lifecycleListener) {
synchronized (lifecycleListeners) {
lifecycleListeners.add(lifecycleListener);
}
}

/**
* Unregister all lifecycle listener of specific type.
*/
public synchronized void unregisterLifecycleListener(Class<? extends LifecycleListener> listenerClass) {
lifecycleListener.unregisterLifecycleListener(listenerClass);
synchronized (lifecycleListeners) {
Iterator<LifecycleListener> iterator = lifecycleListeners.iterator();
while (iterator.hasNext()) {
LifecycleListener lifecycleListener = iterator.next();
if (listenerClass.isInstance(lifecycleListener)) {
iterator.remove();
}
}
}
}

/**
* Unregister single lifecycle listener.
*/
public synchronized void unregisterLifecycleListener(LifecycleListener listener) {
lifecycleListener.unregisterLifecycleListener(listener);
public synchronized void unregisterLifecycleListener(LifecycleListener eventListener) {
synchronized (lifecycleListeners) {
lifecycleListeners.remove(eventListener);
}
}

/**
* {@link BinaryLogClient}'s event listener.
*/
public interface EventListener {

void onEvent(Event event);
@Override
protected void onConnect() {
synchronized (lifecycleListeners) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onConnect(this);
}
}
}

/**
* An {@link EventListener} that rebroadcasts events to a dynamically managed list of other event listeners.
*/
public class BroadcastEventListener implements EventListener {
private final List<EventListener> eventListeners = new LinkedList<EventListener>();

/**
* Rebroadcast the event to the child listeners. If any of the children throws an exception, we log it and
* continue with the next one.
*/
@Override
public synchronized void onEvent(Event event) {
synchronized (eventListeners) {
for (BinaryLogClient.EventListener eventListener : eventListeners) {
try {
eventListener.onEvent(event);
} catch (Exception e) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, eventListener + " choked on " + event, e);
}
}
}
@Override
protected void onCommunicationFailure(Exception ex) {
synchronized (lifecycleListeners) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onCommunicationFailure(this, ex);
}
}

/**
* @return registered event listeners
*/
public List<EventListener> getEventListeners() {
return Collections.unmodifiableList(eventListeners);
}
}

/**
* Register event listener. Note that multiple event listeners will be called in order they
* where registered.
*/
public void registerEventListener(EventListener eventListener) {
synchronized (eventListeners) {
eventListeners.add(eventListener);
@Override
protected void onEventDeserializationFailure(Exception ex) {
synchronized (lifecycleListeners) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onEventDeserializationFailure(this, ex);
}
}

/**
* Unregister all event listener of specific type.
*/
public void unregisterEventListener(Class<? extends EventListener> listenerClass) {
synchronized (eventListeners) {
Iterator<EventListener> iterator = eventListeners.iterator();
while (iterator.hasNext()) {
EventListener eventListener = iterator.next();
if (listenerClass.isInstance(eventListener)) {
iterator.remove();
}
}
}
}
}

/**
* Unregister single event listener.
*/
public void unregisterEventListener(EventListener eventListener) {
synchronized (eventListeners) {
eventListeners.remove(eventListener);
@Override
protected void onDisconnect() {
synchronized (lifecycleListeners) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onDisconnect(this);
}
}
}

/**
* {@link BinaryLogClient}'s event listener.
*/
public interface EventListener {

void onEvent(Event event);
}

/**