Skip to content

Commit

Permalink
Unify protocol port configuration refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
qqeasonchen committed Nov 19, 2024
1 parent 3bc459b commit 5dbf4fc
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 27 deletions.
6 changes: 6 additions & 0 deletions eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ eventMesh.server.provide.protocols=HTTP,TCP,GRPC
eventMesh.server.cluster=COMMON
eventMesh.server.name=EVENTMESH-runtime
eventMesh.sysid=0000
eventMesh.server.protocol.unified.port=10000
eventMesh.server.protocol.http.enabled=true
eventMesh.server.protocol.grpc.enabled=true
eventMesh.server.protocol.tcp.enabled=true

# Legacy port configurations - to be deprecated
eventMesh.server.tcp.port=10000
eventMesh.server.http.port=10105
eventMesh.server.grpc.port=10205
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -175,9 +177,10 @@ public void start() throws Exception {
.childHandler(new HttpsServerInitializer(useTLS ? SSLContextFactory.getSslContext(eventMeshHttpConfiguration) : null))
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);

log.info("HTTPServer[port={}] started.", this.getPort());
int port = eventMeshHttpConfiguration.getProtocolConfiguration().getUnifiedPort();
log.info("HTTPServer[port={}] started.", port);

bootstrap.bind(this.getPort())
bootstrap.bind(port)
.channel()
.closeFuture()
.sync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,26 @@

package org.apache.eventmesh.runtime.boot;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.SimpleChannelInboundHandler;
import io.opentelemetry.api.trace.Span;

import org.apache.eventmesh.common.Pair;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.protocol.tcp.Command;
Expand Down Expand Up @@ -52,26 +72,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.opentelemetry.api.trace.Span;

import lombok.extern.slf4j.Slf4j;

/**
Expand Down Expand Up @@ -143,7 +143,7 @@ public void start() throws Exception {
.childHandler(new TcpServerInitializer());

try {
int port = eventMeshTCPConfiguration.getEventMeshTcpServerPort();
int port = eventMeshTCPConfiguration.getProtocolConfiguration().getUnifiedPort();
ChannelFuture f = bootstrap.bind(port).sync();
log.info("EventMeshTCPServer[port={}] started.....", port);
f.channel().closeFuture().sync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void init() throws Exception {

grpcRetryer = new GrpcRetryer(this);

int serverPort = eventMeshGrpcConfiguration.getGrpcServerPort();
int serverPort = eventMeshGrpcConfiguration.getProtocolConfiguration().getUnifiedPort();

server = ServerBuilder.forPort(serverPort)
.addService(new ConsumerService(this, sendMsgExecutor, replyMsgExecutor))
Expand Down Expand Up @@ -175,7 +175,7 @@ public boolean register() {
boolean registerResult = false;
try {
String endPoints = IPUtils.getLocalAddress()
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.getGrpcServerPort();
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.getProtocolConfiguration().getUnifiedPort();
EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
eventMeshRegisterInfo.setEventMeshClusterName(eventMeshGrpcConfiguration.getEventMeshCluster());
eventMeshRegisterInfo.setEventMeshName(eventMeshGrpcConfiguration.getEventMeshName() + "-"
Expand All @@ -192,7 +192,7 @@ public boolean register() {

private void unRegister() throws Exception {
String endPoints = IPUtils.getLocalAddress()
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.getGrpcServerPort();
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.getProtocolConfiguration().getUnifiedPort();
EventMeshUnRegisterInfo eventMeshUnRegisterInfo = new EventMeshUnRegisterInfo();
eventMeshUnRegisterInfo.setEventMeshClusterName(eventMeshGrpcConfiguration.getEventMeshCluster());
eventMeshUnRegisterInfo.setEventMeshName(eventMeshGrpcConfiguration.getEventMeshName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,25 @@
public class EventMeshGrpcConfiguration extends CommonConfiguration {

@ConfigField(field = "grpc.port", notNull = true, beNumber = true)
@Deprecated
private int grpcServerPort = 10205;

@ConfigField(field = "protocol")
private ProtocolConfiguration protocolConfiguration = new ProtocolConfiguration();

public int getGrpcServerPort() {
return protocolConfiguration.getGrpcPort();
}

public void setGrpcServerPort(int port) {
this.grpcServerPort = port;
this.protocolConfiguration.setGrpcPort(port);
}

public boolean isGrpcEnabled() {
return protocolConfiguration.isGrpcEnabled();
}

@ConfigField(field = "session.expiredInMills")
private int eventMeshSessionExpiredInMills = 60000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import java.util.Collections;
import java.util.List;

import inet.ipaddr.IPAddress;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import inet.ipaddr.IPAddress;
import org.apache.eventmesh.runtime.configuration.ProtocolConfiguration;

@Data
@EqualsAndHashCode(callSuper = true)
Expand All @@ -37,8 +39,28 @@
public class EventMeshHTTPConfiguration extends CommonConfiguration {

@ConfigField(field = "http.port", notNull = true, beNumber = true)
@Deprecated
private int httpServerPort = 10105;

@ConfigField(field = "protocol")
private ProtocolConfiguration protocolConfiguration = new ProtocolConfiguration();

public int getHttpServerPort() {
return protocolConfiguration.getHttpPort();
}

public void setHttpServerPort(int port) {
this.httpServerPort = port;
this.protocolConfiguration.setHttpPort(port);
}

public boolean isHttpEnabled() {
return protocolConfiguration.isHttpEnabled();
}

@ConfigField(field = "http.path")
private String eventMeshServerHttpPath = "/eventmesh";

@ConfigField(field = "batchmsg.batch.enabled")
private boolean eventMeshServerBatchMsgBatchEnabled = Boolean.TRUE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,25 @@
public class EventMeshTCPConfiguration extends CommonConfiguration {

@ConfigField(field = "tcp.port")
@Deprecated
private int eventMeshTcpServerPort = 10000;

@ConfigField(field = "protocol")
private ProtocolConfiguration protocolConfiguration = new ProtocolConfiguration();

public int getEventMeshTcpServerPort() {
return protocolConfiguration.getTcpPort();
}

public void setEventMeshTcpServerPort(int port) {
this.eventMeshTcpServerPort = port;
this.protocolConfiguration.setTcpPort(port);
}

public boolean isTcpEnabled() {
return protocolConfiguration.isTcpEnabled();
}

@ConfigField(field = "tcp.allIdleSeconds")
private int eventMeshTcpIdleAllSeconds = 60;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.configuration;

import org.apache.eventmesh.common.config.Config;
import org.apache.eventmesh.common.config.ConfigField;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@Config(prefix = "eventMesh.server.protocol")
public class ProtocolConfiguration {

@ConfigField(field = "unified.port", notNull = true, beNumber = true)
private int unifiedPort = 10000;

@ConfigField(field = "http.enabled")
private boolean httpEnabled = true;

@ConfigField(field = "grpc.enabled")
private boolean grpcEnabled = true;

@ConfigField(field = "tcp.enabled")
private boolean tcpEnabled = true;

public int getHttpPort() {
return unifiedPort;
}

public int getGrpcPort() {
return unifiedPort;
}

public int getTcpPort() {
return unifiedPort;
}

public void setHttpPort(int port) {
this.unifiedPort = port;
}

public void setGrpcPort(int port) {
this.unifiedPort = port;
}

public void setTcpPort(int port) {
this.unifiedPort = port;
}
}

0 comments on commit 5dbf4fc

Please sign in to comment.