Skip to content

Commit

Permalink
Merge pull request #18 from pgrabarsky/master
Browse files Browse the repository at this point in the history
Polish
  • Loading branch information
pgrabarsky authored Jan 21, 2020
2 parents c79459d + 4c7ae97 commit 9b5d62e
Show file tree
Hide file tree
Showing 39 changed files with 239 additions and 291 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ public static void main(String... args) throws InterruptedException, FunctionInv
.get();

ReactiveStreamFactory reactiveStreamFactory = instance.select(ReactiveStreamFactory.class)
.get();
.get();

Topology instrumentedTopology = InstrumentedTopologyBuilderVisitor.build("logger",
() -> new LoggerNodeInterceptor(), container.getBeanManager(), topology);
Topology instrumentedTopology = InstrumentedTopologyBuilderVisitor.build("logger", LoggerNodeInterceptor::new,
container.getBeanManager(), topology);

reactiveStreamFactory.build(instrumentedTopology);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void stage4(KafkaIncoming kc, Async<KafkaIncoming> akc, EventMetadata ec,
logger.info("EventMetadata after POJO update direct={},cdi={}", ec, gec);

// Yet another way, without injection, to get the KafkaIncoming
KafkaIncoming kc2 = msg.<KafkaIncoming>getMetadata(KafkaIncoming.KEY);
KafkaIncoming kc2 = msg.<KafkaIncoming>getMetadata(KafkaIncoming.META_KEY);
logger.debug("KafkaIncoming from message={}", kc2);

// Let's start asynchronous processing with the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amadeus.middleware.odyssey.reactive.messaging.core.NodeName;
import com.amadeus.middleware.odyssey.reactive.messaging.kafka.connector.provider.KafkaIncoming;
import com.amadeus.middleware.odyssey.reactive.messaging.kafka.connector.provider.KafkaTarget;

@ApplicationScoped
public class MyKafkaAwareProcessor {
private static final Logger logger = LoggerFactory.getLogger(MyKafkaAwareProcessor.class);

@Incoming("kafka_channel")
@Outgoing("rxin")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void output() {
logger.info("output: acking the message");

// Look for a possible KafkaTarget
KafkaTarget kafkaTarget = message.getMetadata(KafkaTarget.KEY);
KafkaTarget kafkaTarget = message.getMetadata(KafkaTarget.META_KEY);
if (kafkaTarget != null) {
logger.debug("If I where a Kafka connector I would send msgId={} to topic={} with key={}",
ec.getUniqueMessageId(), kafkaTarget.topic(), kafkaTarget.key());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amadeus.middleware.odyssey.reactive.messaging.core.Message;
import com.amadeus.middleware.odyssey.reactive.messaging.core.NodeName;
Expand All @@ -18,7 +16,6 @@

@ApplicationScoped
public class MyRxJavaProcessor {
private static final Logger logger = LoggerFactory.getLogger(MyRxJavaProcessor.class);

@SuppressWarnings("unchecked")
@Incoming("rxin")
Expand All @@ -33,7 +30,7 @@ public Publisher<Message<String>> stage6(Publisher<Message<String>> publisher) {
.payload(message.getPayload())
.build();

KafkaTarget target = child.getMetadata(KafkaTarget.KEY);
KafkaTarget target = child.getMetadata(KafkaTarget.META_KEY);
target.topic(target.topic() + "-child");

return Flowable.fromArray(message, child);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

public class AnnotationUtils {

private AnnotationUtils() {
}

/**
* Limitation: This has not a clean semantic as it will check for inherited annotations through interfaces... However,
* it enables to veto Beans in order to setup our Producer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ <T, X> void registerAllMessageAndAsyncTypes(@Observes ProcessInjectionPoint<T, X
} else if (Method.class.isAssignableFrom(member.getClass())) {
registerAllMessageAndAsyncTypesInMethods(dc, (Method) member);
} else if (Constructor.class.isAssignableFrom(member.getClass())) {
registerAllMessageAndAsyncTypesInConstructors(dc, (Constructor<?>) member);
registerAllMessageAndAsyncTypesInConstructors((Constructor<?>) member);
}
}

Expand All @@ -101,7 +101,7 @@ private void registerAllMessageAndAsyncTypesInFields(Class<?> dc, Field member)
}
}

private void registerAllMessageAndAsyncTypesInConstructors(Class<?> dc, Constructor<?> constructor) {
private void registerAllMessageAndAsyncTypesInConstructors(Constructor<?> constructor) {
for (Parameter parameter : constructor.getParameters()) {
registerMessageAndAsyncType(parameter.getType(), parameter.getParameterizedType());
}
Expand Down Expand Up @@ -149,10 +149,8 @@ <T> void processManagedBean(@Observes ProcessManagedBean<T> event) {
annotatedType.getMethods()
.stream()
.filter(m -> m.isAnnotationPresent(MessageInitializer.class))
.forEach(annotatedMethod -> {
messageInitializerRegistry.add(event.getAnnotatedBeanClass()
.getJavaClass(), annotatedMethod.getJavaMember());
});
.forEach(annotatedMethod -> messageInitializerRegistry.add(event.getAnnotatedBeanClass()
.getJavaClass(), annotatedMethod.getJavaMember()));
}

public void processFlowingMethod(AnnotatedType<?> annotatedType, AnnotatedMethod<?> method) {
Expand All @@ -167,12 +165,12 @@ private void processFlowingProcessor(AnnotatedType<?> annotatedType, AnnotatedMe
CDIFunctionInvoker functionInvoker = new CDIFunctionInvoker(annotatedType.getJavaClass(), method.getJavaMember());
Stream<?> is = method.getAnnotations(Incoming.class)
.stream()
.map(annotation -> ((Incoming) annotation).value());
.map(annotation -> annotation.value());
String[] inputChannels = is.collect(Collectors.toList())
.toArray(new String[] {});
Stream<?> os = method.getAnnotations(Outgoing.class)
.stream()
.map(annotation -> ((Outgoing) annotation).value());
.map(Outgoing::value);
String[] outputChannels = os.collect(Collectors.toList())
.toArray(new String[] {});
builder.addProcessor(getName(annotatedType, method), functionInvoker, inputChannels, outputChannels);
Expand All @@ -183,7 +181,7 @@ private void processFlowingPublisher(AnnotatedType<?> annotatedType, AnnotatedMe
method.getJavaMember());
Stream<String> os = method.getAnnotations(Outgoing.class)
.stream()
.map(annotation -> ((Outgoing) annotation).value());
.map(Outgoing::value);
String[] outputChannels = os.collect(Collectors.toList())
.toArray(new String[] {});
builder.addPublisherNode(getName(annotatedType, method), publisherInvoker, outputChannels);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

@MessageScoped
public interface EventMetadata extends Metadata {
String KEY = "MY_EVENT_METADATA";
String MERGE_KEY = KEY;
String META_KEY = "MY_EVENT_METADATA";
String META_MERGE_KEY = META_KEY;

String getUniqueMessageId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ public boolean isMetadataPropagable() {

@Override
public String getMetadataKey() {
return KEY;
return META_KEY;
}

@Override
public String getMetadataMergeKey() {
return MERGE_KEY;
return META_MERGE_KEY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class EventMetadataMessageInitializer {
public void initialize(KafkaIncoming direcKafkaIncoming) {

// If there is already a kind of EventContext, then do nothing.
if (message.hasMergeableMetadata(EventMetadata.MERGE_KEY)) {
if (message.hasMergeableMetadata(EventMetadata.META_MERGE_KEY)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public static class DummyClass {

@State(Scope.Thread)
public static class MyState {
public PublisherBuilder<Integer> stream;
public Method method;
public SeContainer container;
public int counter = 0;
private PublisherBuilder<Integer> stream;
private Method method;
private SeContainer container;
private int counter = 0;

public MyState() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public class OnlyReactiveStreams {

@State(Scope.Thread)
public static class MyState {
public PublisherBuilder<Integer> stream;
public Method method;
public int counter = 0;
private PublisherBuilder<Integer> stream;
private Method method;
private int counter = 0;

public MyState() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

@MessageScoped
public interface KafkaIncoming extends Metadata {
String KEY = "MY_KAFKA_INCOMING";
String MERGE_KEY = KEY;
String META_KEY = "MY_KAFKA_INCOMING";
String META_MERGE_KEY = META_KEY;

String topic();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ public boolean isMetadataPropagable() {

@Override
public String getMetadataKey() {
return KEY;
return META_KEY;
}

@Override
public String getMetadataMergeKey() {
return MERGE_KEY;
return META_MERGE_KEY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class KafkaMessageInitializer {
public void initialize() {

// If the Metadata is already present, then do nothing.
if (message.hasMergeableMetadata(KafkaIncoming.MERGE_KEY)) {
if (message.hasMergeableMetadata(KafkaIncoming.META_MERGE_KEY)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
*/
@MessageScoped
public interface KafkaTarget extends MutableMetadata {
String KEY = "MY_KAFKATARGET";
String MERGE_KEY = KEY;
String META_KEY = "MY_KAFKATARGET";
String META_MERGE_KEY = META_KEY;

String topic();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ public boolean isMetadataPropagable() {

@Override
public String getMetadataKey() {
return KEY;
return META_KEY;
}

@Override
public String getMetadataMergeKey() {
return MERGE_KEY;
return META_MERGE_KEY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import com.amadeus.middleware.odyssey.reactive.messaging.core.MutableMetadata;

public interface MultiKafkaTarget extends MutableMetadata {
String KEY = "MY_MULTIKAFKATARGET";
String MERGE_KEY = KafkaTarget.MERGE_KEY;
String META_KEY = "MY_MULTIKAFKATARGET";
String META_MERGE_KEY = KafkaTarget.META_MERGE_KEY;

List<KafkaTarget> getTargets();
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ public boolean isMetadataPropagable() {

@Override
public String getMetadataKey() {
return KEY;
return META_KEY;
}

@Override
public String getMetadataMergeKey() {
return MERGE_KEY;
return META_MERGE_KEY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

public class CompletableFutureUtils {

private CompletableFutureUtils() {
}

/**
* Propagate the completion state from a CompletableFuture to Another one.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
// This is a store for global stuff
public class ReactiveMessagingContext {

private ReactiveMessagingContext() {
}

private static MessageInitializerRegistry messageInitializerRegistry;

public static void setMessageInitializerRegistry(MessageInitializerRegistry messageInitializerRegistry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,13 @@ public Object getTargetInstance() {

@Override
public Object invoke(Message<?> message) throws FunctionInvocationException {
MessageScopedContext context = null;
boolean contextAlreadyActive = false;
MessageScopedContext activatedContext = null;
if (contextActivation) {
MessageImpl<?> messageImpl = (MessageImpl<?>) message;
context = MessageScopedContext.getInstance();
contextAlreadyActive = context.isActive();
if (!contextAlreadyActive) {
context.start(messageImpl.getScopeContextId());
MessageScopedContext context = MessageScopedContext.getInstance();
if (!context.isActive()) {
activatedContext = context;
activatedContext.start(messageImpl.getScopeContextId());
}
}
Object[] parameters = buildParameters(message);
Expand All @@ -122,8 +121,8 @@ public Object invoke(Message<?> message) throws FunctionInvocationException {
} catch (Exception e) {
throw new FunctionInvocationException(e);
} finally {
if ((contextActivation) && (!contextAlreadyActive)) {
context.suspend();
if (activatedContext != null) {
activatedContext.suspend();
}
}
}
Expand Down Expand Up @@ -153,8 +152,7 @@ private Object[] buildParameters(Message<?> message) {
}

// Message Scoped object
Class<?> clazz = (Class<?>) param.getType();
Object object = MessageImpl.get(message, clazz);
Object object = MessageImpl.get(message, param.getType());
if (object != null) {
parameters.add(object);
continue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.amadeus.middleware.odyssey.reactive.messaging.core.impl.cdi;

import com.amadeus.middleware.odyssey.reactive.messaging.core.Async;
import com.amadeus.middleware.odyssey.reactive.messaging.core.impl.cdi.MessageScopedContext;

public class CDIAsync<T> implements Async<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class CDIMessageBuilderImpl<T> extends AbstractMessageBuilder<T> {

@Override
public Message<T> build() {
MessageImpl<T> message = new MessageImpl<T>(metadata, payload);
MessageImpl<T> message = new MessageImpl<>(metadata, payload);

AbstractMessageBuilder.setupParentChildLink(parents, message);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@

import javax.enterprise.context.ApplicationScoped;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amadeus.middleware.odyssey.reactive.messaging.core.impl.FunctionInvocationException;
import com.amadeus.middleware.odyssey.reactive.messaging.core.topology.PublisherNode;
import com.amadeus.middleware.odyssey.reactive.messaging.core.topology.Topology;

@ApplicationScoped
public class ReactiveStreamFactory {
private static final Logger logger = LoggerFactory.getLogger(ReactiveStreamFactory.class);

// Simplistic limited build logic
public void build(Topology topology) throws FunctionInvocationException {
Expand Down
Loading

0 comments on commit 9b5d62e

Please sign in to comment.