Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added a timestamp.timezone configuration in SplunkSinkConnectorConfig #403

Merged
merged 9 commits into from
Sep 5, 2023
1 change: 0 additions & 1 deletion dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,3 @@
<junit.platform.version>1.3.2</junit.platform.version>
</properties>
</project>

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -342,4 +342,4 @@

</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String ENABLE_TIMESTAMP_EXTRACTION_CONF = "enable.timestamp.extraction";
static final String REGEX_CONF = "timestamp.regex";
static final String TIMESTAMP_FORMAT_CONF = "timestamp.format";
static final String TIMESTAMP_TIMEZONE_CONF = "timestamp.timezone";

// Kafka configuration description strings
// Required Parameters
Expand Down Expand Up @@ -206,6 +207,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String ENABLE_TIMESTAMP_EXTRACTION_DOC = "Set to true if you want to extract the timestamp";
static final String REGEX_DOC = "Regex";
static final String TIMESTAMP_FORMAT_DOC = "Timestamp format";
static final String TIMESTAMP_TIMEZONE_DOC = "Timestamp timezone";

final String splunkToken;
final String splunkURI;
Expand Down Expand Up @@ -263,6 +265,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final String timestampFormat;
final int queueCapacity;

final String timeZone;

SplunkSinkConnectorConfig(Map<String, String> taskConfig) {
super(conf(), taskConfig);
splunkToken = getPassword(TOKEN_CONF).value();
Expand Down Expand Up @@ -316,6 +320,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
enableTimestampExtraction = getBoolean(ENABLE_TIMESTAMP_EXTRACTION_CONF);
regex = getString(REGEX_CONF);
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
timeZone = getString(TIMESTAMP_TIMEZONE_CONF);
validateRegexForTimestamp(regex);
queueCapacity = getInt(QUEUE_CAPACITY_CONF);
validateQueueCapacity(queueCapacity);
Expand Down Expand Up @@ -367,7 +372,8 @@ public static ConfigDef conf() {
.define(ENABLE_TIMESTAMP_EXTRACTION_CONF, ConfigDef.Type.BOOLEAN, false , ConfigDef.Importance.MEDIUM, ENABLE_TIMESTAMP_EXTRACTION_DOC)
.define(REGEX_CONF, ConfigDef.Type.STRING, "" , ConfigDef.Importance.MEDIUM, REGEX_DOC)
.define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
.define(QUEUE_CAPACITY_CONF, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW, QUEUE_CAPACITY_DOC);
.define(TIMESTAMP_TIMEZONE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_TIMEZONE_DOC)
.define(QUEUE_CAPACITY_CONF, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW, QUEUE_CAPACITY_DOC);
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,12 @@ private void timestampExtraction(Event event) {
}
} else {
SimpleDateFormat df = new SimpleDateFormat(connectorConfig.timestampFormat);
df.setTimeZone(TimeZone.getTimeZone("UTC"));
Date date;
try {
if(!connectorConfig.timeZone.isEmpty())
df.setTimeZone(TimeZone.getTimeZone(connectorConfig.timeZone));

date = df.parse(timestamp);
event.setTime(date.getTime() / 1000.0);
} catch (ParseException e) {
Expand Down
64 changes: 63 additions & 1 deletion src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,69 @@ public void checkExtractedTimestamp() {
}
task.stop();
}


@Test
public void checkExtractedTimestampWithTimezone() {
SplunkSinkTask task = new SplunkSinkTask();
Collection<SinkRecord> record = createSinkRecords(1,"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname1\",\"CLASS\":\"class1\",\"cust_id\":\"000013934\",\"REQ_TIME\": \"20230904133016993\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}");

UnitUtil uu = new UnitUtil(0);
Map<String, String> config = uu.createTaskConfig();
config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(false));
config.put(SplunkSinkConnectorConfig.ENABLE_TIMESTAMP_EXTRACTION_CONF, String.valueOf(true));
config.put(SplunkSinkConnectorConfig.REGEX_CONF, "\\\"REQ_TIME\\\":\\s*\\\"(?<time>.*?)\"");
config.put(SplunkSinkConnectorConfig.TIMESTAMP_FORMAT_CONF, "yyyyMMddHHmmssSSS");
config.put(SplunkSinkConnectorConfig.TIMESTAMP_TIMEZONE_CONF, "Asia/Seoul");
HecMock hec = new HecMock(task);
hec.setSendReturnResult(HecMock.success);
task.setHec(hec);
task.start(config);
task.put(record);

List<EventBatch> batches = hec.getBatches();
for (Iterator<EventBatch> iter = batches.listIterator(); iter.hasNext();) {
EventBatch batch = iter.next();
List<Event> event_list = batch.getEvents();
Iterator<Event> iterator = event_list.listIterator() ;
Event event = iterator.next();

Assert.assertEquals(1.693801816993E9, event.getTime(), 0);
break;
}
task.stop();
}

@Test
public void checkExtractedTimestampWithoutTimezoneAsUTC() {
SplunkSinkTask task = new SplunkSinkTask();
Collection<SinkRecord> record = createSinkRecords(1,"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname1\",\"CLASS\":\"class1\",\"cust_id\":\"000013934\",\"REQ_TIME\": \"20230904133016993\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}");

UnitUtil uu = new UnitUtil(0);
Map<String, String> config = uu.createTaskConfig();
config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(false));
config.put(SplunkSinkConnectorConfig.ENABLE_TIMESTAMP_EXTRACTION_CONF, String.valueOf(true));
config.put(SplunkSinkConnectorConfig.REGEX_CONF, "\\\"REQ_TIME\\\":\\s*\\\"(?<time>.*?)\"");
config.put(SplunkSinkConnectorConfig.TIMESTAMP_FORMAT_CONF, "yyyyMMddHHmmssSSS");
config.put(SplunkSinkConnectorConfig.TIMESTAMP_TIMEZONE_CONF, "");
HecMock hec = new HecMock(task);
hec.setSendReturnResult(HecMock.success);
task.setHec(hec);
task.start(config);
task.put(record);

List<EventBatch> batches = hec.getBatches();
for (Iterator<EventBatch> iter = batches.listIterator(); iter.hasNext();) {
EventBatch batch = iter.next();
List<Event> event_list = batch.getEvents();
Iterator<Event> iterator = event_list.listIterator() ;
Event event = iterator.next();

Assert.assertEquals(1.693834216993E9, event.getTime(), 0);
break;
}
task.stop();
}

@Test(expected = ConfigException.class)
public void emptyRegex() {
SplunkSinkTask task = new SplunkSinkTask();
Expand Down