Support additional Kafka parameters for ftp and dcap.
This is an alternative to the configs property prefix for cells that are not
implemented in terms of the Spring Framework.
paurkedal authored and mksahakyan committed Oct 6, 2023
1 parent 60dd0ba commit c370ba2
Showing 7 changed files with 66 additions and 0 deletions.
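
For context, the new kafka-config-file option points a door at a plain Java properties file holding additional Kafka producer settings, which the door loads before applying its own options. As a rough illustration only (the file path and the particular keys are not part of this commit; compression.type, acks and security.protocol are standard Kafka producer properties), such a file might look like:

    # /etc/dcache/kafka-producer.properties  (example path)
    compression.type = lz4
    acks = all
    security.protocol = SASL_SSL
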
27 changes: 27 additions & 0 deletions DcapDoorSettings.java
@@ -26,6 +26,9 @@
import dmg.cells.nucleus.CellAddressCore;
import dmg.cells.nucleus.CellEndpoint;
import dmg.cells.nucleus.CellPath;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -42,8 +45,11 @@
import org.dcache.poolmanager.PoolManagerStub;
import org.dcache.services.login.RemoteLoginStrategy;
import org.dcache.util.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DcapDoorSettings {
private static final Logger LOGGER = LoggerFactory.getLogger(DcapDoorSettings.class);

@Option(name = "authorization")
protected String auth;
@@ -99,6 +105,9 @@ public class DcapDoorSettings {
@Option(name = "kafka-clientid")
protected String kafkaclientid;

@Option(name = "kafka-config-file")
protected String kafkaConfigFile;


@Option(name = "hsm",
description = "Cell address of hsm manager",
@@ -332,6 +341,24 @@ public PoolManagerStub createPoolManagerStub(CellEndpoint cellEndpoint, CellAddr

public KafkaProducer createKafkaProducer() {
Properties props = new Properties();
if (kafkaConfigFile != null && !kafkaConfigFile.trim().isEmpty()) {
try {
FileInputStream fis = new FileInputStream(kafkaConfigFile);
try {
props.load(fis);
} catch (IOException ex) {
LOGGER.error("failed to load configuration ", ex);
} finally {
try {
fis.close();
} catch (IOException ex) {
LOGGER.error("failed to close " + kafkaConfigFile, ex);
}
}
} catch (FileNotFoundException ex) {
LOGGER.error(ex.toString());
}
}
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaclientid);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
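
The loading step added to createKafkaProducer uses an explicit FileInputStream with nested try/finally/close blocks. A functionally equivalent sketch of the same guard using try-with-resources (not what the commit does, just an illustration; it reuses the class's existing imports, LOGGER and the props variable of the surrounding method, and FileNotFoundException is an IOException, so one catch suffices):

    if (kafkaConfigFile != null && !kafkaConfigFile.trim().isEmpty()) {
        // the stream is closed automatically, even if props.load fails
        try (FileInputStream fis = new FileInputStream(kafkaConfigFile)) {
            props.load(fis);
        } catch (IOException ex) {
            LOGGER.error("failed to load Kafka configuration from {}", kafkaConfigFile, ex);
        }
    }
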
27 changes: 27 additions & 0 deletions FtpDoorSettings.java
@@ -4,6 +4,9 @@
import dmg.cells.nucleus.CellAddressCore;
import dmg.cells.nucleus.CellEndpoint;
import dmg.cells.nucleus.CellPath;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -14,13 +17,16 @@
import org.dcache.util.Option;
import org.dcache.util.OptionParser;
import org.dcache.util.PortRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Object holding configuration options for FTP doors.
* <p>
* Settings can be injected using {@link OptionParser}.
*/
public class FtpDoorSettings {
private static final Logger LOGGER = LoggerFactory.getLogger(FtpDoorSettings.class);

@Option(name = "poolManager",
description = "Well known name of the pool manager",
@@ -67,6 +73,9 @@ public class FtpDoorSettings {
@Option(name = "kafka-clientid")
protected String kafkaclientid;

@Option(name = "kafka-config-file")
protected String kafkaConfigFile;


@Option(name = "clientDataPortRange",
defaultValue = "0")
@@ -388,6 +397,24 @@ public PoolManagerStub createPoolManagerStub(CellEndpoint cellEndpoint,

public KafkaProducer createKafkaProducer() {
Properties props = new Properties();
if (kafkaConfigFile != null && !kafkaConfigFile.trim().isEmpty()) {
try {
FileInputStream fis = new FileInputStream(kafkaConfigFile);
try {
props.load(fis);
} catch (IOException ex) {
LOGGER.error("failed to load configuration ", ex);
} finally {
try {
fis.close();
} catch (IOException ex) {
LOGGER.error("failed to close " + kafkaConfigFile, ex);
}
}
} catch (FileNotFoundException ex) {
LOGGER.error(ex.toString());
}
}
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaclientid);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
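
Note the ordering in both doors: the properties file is loaded first, and the door's explicit settings (bootstrap.servers, client.id, the serializer classes) are put into the same Properties object afterwards, so they win over identical keys in the file. A self-contained snippet showing that java.util.Properties behaviour (the values here are made up):

    import java.util.Properties;

    public class KafkaPropsOverrideDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("client.id", "from-config-file"); // as if set via props.load(...)
            props.put("client.id", "FTP-door");                 // explicit door setting applied later
            System.out.println(props.getProperty("client.id")); // prints "FTP-door"
        }
    }
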
3 changes: 3 additions & 0 deletions skel/share/defaults/dcap.properties
@@ -221,3 +221,6 @@ dcap.kafka.bootstrap-servers = ${dcache.kafka.bootstrap-servers}

# Kafka topic name
dcap.kafka.topic = ${dcache.kafka.topic}

# File from which to load additional Kafka properties.
dcap.kafka.config-file = ${dcache.kafka.config-file}
3 changes: 3 additions & 0 deletions skel/share/defaults/ftp.properties
@@ -396,3 +396,6 @@ ftp.kafka.bootstrap-servers = ${dcache.kafka.bootstrap-servers}

# Kafka topic name
ftp.kafka.topic = ${dcache.kafka.topic}

# File from which to load additional Kafka properties.
ftp.kafka.config-file = ${dcache.kafka.config-file}
4 changes: 4 additions & 0 deletions skel/share/defaults/kafka.properties
@@ -24,6 +24,10 @@ dcache.kafka.bootstrap-servers = localhost:9092
# Kafka topic name
dcache.kafka.topic = billing

# Optional file from which to load additional Kafka properties for ftp and
# dcap. For other services, use dcache.kafka.configs.
dcache.kafka.config-file =



#--------------------Default Properties------------------------------
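
With these defaults in place, an administrator would enable the feature by pointing the property at a file in dcache.conf or the layout file, either globally or per protocol; the paths below are examples only:

    dcache.kafka.config-file = /etc/dcache/kafka-producer.properties
    # or only for FTP doors:
    ftp.kafka.config-file = /etc/dcache/kafka-ftp-producer.properties
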
1 change: 1 addition & 0 deletions skel/share/services/dcap.batch
@@ -119,6 +119,7 @@ create dmg.cells.services.login.LoginManager ${dcap.cell.name} \
-kafka-max-block-units=${dcap.kafka.maximum-block.unit}\
-retries-kafka=0 \
-kafka-clientid=\"${dcap.cell.name}\" \
-kafka-config-file=\"${dcap.kafka.config-file}\" \
-stageConfigurationFilePath=\"${dcap.authz.staging}\" \
-allowAnonymousStaging=\"${dcap.authz.anonymous-staging}\" \
-io-queue=${dcap.mover.queue} \
1 change: 1 addition & 0 deletions skel/share/services/ftp.batch
@@ -94,6 +94,7 @@ create dmg.cells.services.login.LoginManager ${ftp.cell.name} \
-billing=\"${ftp.service.billing}\" \
-kafka=\"${ftp.enable.kafka}\" \
-kafka-clientid=\"${ftp.cell.name}\" \
-kafka-config-file=\"${ftp.kafka.config-file}\" \
-bootstrap-server-kafka=\"${ftp.kafka.bootstrap-servers}\" \
-kafka-topic=\"${ftp.kafka.topic}\" \
-kafka-max-block=${ftp.kafka.maximum-block}\
