Skip to content

Commit

Permalink
Merge pull request #403 from mo-mosh/develop
Browse files Browse the repository at this point in the history
added a timestamp.timezone configuration in SplunkSinkConnectorConfig
  • Loading branch information
VihasMakwana authored Sep 5, 2023
2 parents 2372eb4 + 1c6a738 commit 44c9d9a
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 4 deletions.
1 change: 0 additions & 1 deletion dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,3 @@
<junit.platform.version>1.3.2</junit.platform.version>
</properties>
</project>

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -342,4 +342,4 @@

</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String ENABLE_TIMESTAMP_EXTRACTION_CONF = "enable.timestamp.extraction";
static final String REGEX_CONF = "timestamp.regex";
static final String TIMESTAMP_FORMAT_CONF = "timestamp.format";
static final String TIMESTAMP_TIMEZONE_CONF = "timestamp.timezone";

// Kafka configuration description strings
// Required Parameters
Expand Down Expand Up @@ -206,6 +207,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String ENABLE_TIMESTAMP_EXTRACTION_DOC = "Set to true if you want to extract the timestamp";
static final String REGEX_DOC = "Regex";
static final String TIMESTAMP_FORMAT_DOC = "Timestamp format";
static final String TIMESTAMP_TIMEZONE_DOC = "Timestamp timezone";

final String splunkToken;
final String splunkURI;
Expand Down Expand Up @@ -263,6 +265,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final String timestampFormat;
final int queueCapacity;

final String timeZone;

SplunkSinkConnectorConfig(Map<String, String> taskConfig) {
super(conf(), taskConfig);
splunkToken = getPassword(TOKEN_CONF).value();
Expand Down Expand Up @@ -316,6 +320,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
enableTimestampExtraction = getBoolean(ENABLE_TIMESTAMP_EXTRACTION_CONF);
regex = getString(REGEX_CONF);
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
timeZone = getString(TIMESTAMP_TIMEZONE_CONF);
validateRegexForTimestamp(regex);
queueCapacity = getInt(QUEUE_CAPACITY_CONF);
validateQueueCapacity(queueCapacity);
Expand Down Expand Up @@ -367,7 +372,8 @@ public static ConfigDef conf() {
.define(ENABLE_TIMESTAMP_EXTRACTION_CONF, ConfigDef.Type.BOOLEAN, false , ConfigDef.Importance.MEDIUM, ENABLE_TIMESTAMP_EXTRACTION_DOC)
.define(REGEX_CONF, ConfigDef.Type.STRING, "" , ConfigDef.Importance.MEDIUM, REGEX_DOC)
.define(TIMESTAMP_FORMAT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
.define(QUEUE_CAPACITY_CONF, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW, QUEUE_CAPACITY_DOC);
.define(TIMESTAMP_TIMEZONE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_TIMEZONE_DOC)
.define(QUEUE_CAPACITY_CONF, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW, QUEUE_CAPACITY_DOC);
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,12 @@ private void timestampExtraction(Event event) {
}
} else {
SimpleDateFormat df = new SimpleDateFormat(connectorConfig.timestampFormat);
df.setTimeZone(TimeZone.getTimeZone("UTC"));
Date date;
try {
if(!connectorConfig.timeZone.isEmpty())
df.setTimeZone(TimeZone.getTimeZone(connectorConfig.timeZone));

date = df.parse(timestamp);
event.setTime(date.getTime() / 1000.0);
} catch (ParseException e) {
Expand Down
64 changes: 63 additions & 1 deletion src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,69 @@ public void checkExtractedTimestamp() {
}
task.stop();
}


@Test
public void checkExtractedTimestampWithTimezone() {
SplunkSinkTask task = new SplunkSinkTask();
Collection<SinkRecord> record = createSinkRecords(1,"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname1\",\"CLASS\":\"class1\",\"cust_id\":\"000013934\",\"REQ_TIME\": \"20230904133016993\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}");

UnitUtil uu = new UnitUtil(0);
Map<String, String> config = uu.createTaskConfig();
config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(false));
config.put(SplunkSinkConnectorConfig.ENABLE_TIMESTAMP_EXTRACTION_CONF, String.valueOf(true));
config.put(SplunkSinkConnectorConfig.REGEX_CONF, "\\\"REQ_TIME\\\":\\s*\\\"(?<time>.*?)\"");
config.put(SplunkSinkConnectorConfig.TIMESTAMP_FORMAT_CONF, "yyyyMMddHHmmssSSS");
config.put(SplunkSinkConnectorConfig.TIMESTAMP_TIMEZONE_CONF, "Asia/Seoul");
HecMock hec = new HecMock(task);
hec.setSendReturnResult(HecMock.success);
task.setHec(hec);
task.start(config);
task.put(record);

List<EventBatch> batches = hec.getBatches();
for (Iterator<EventBatch> iter = batches.listIterator(); iter.hasNext();) {
EventBatch batch = iter.next();
List<Event> event_list = batch.getEvents();
Iterator<Event> iterator = event_list.listIterator() ;
Event event = iterator.next();

Assert.assertEquals(1.693801816993E9, event.getTime(), 0);
break;
}
task.stop();
}

@Test
public void checkExtractedTimestampWithoutTimezoneAsUTC() {
SplunkSinkTask task = new SplunkSinkTask();
Collection<SinkRecord> record = createSinkRecords(1,"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname1\",\"CLASS\":\"class1\",\"cust_id\":\"000013934\",\"REQ_TIME\": \"20230904133016993\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}");

UnitUtil uu = new UnitUtil(0);
Map<String, String> config = uu.createTaskConfig();
config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(false));
config.put(SplunkSinkConnectorConfig.ENABLE_TIMESTAMP_EXTRACTION_CONF, String.valueOf(true));
config.put(SplunkSinkConnectorConfig.REGEX_CONF, "\\\"REQ_TIME\\\":\\s*\\\"(?<time>.*?)\"");
config.put(SplunkSinkConnectorConfig.TIMESTAMP_FORMAT_CONF, "yyyyMMddHHmmssSSS");
config.put(SplunkSinkConnectorConfig.TIMESTAMP_TIMEZONE_CONF, "");
HecMock hec = new HecMock(task);
hec.setSendReturnResult(HecMock.success);
task.setHec(hec);
task.start(config);
task.put(record);

List<EventBatch> batches = hec.getBatches();
for (Iterator<EventBatch> iter = batches.listIterator(); iter.hasNext();) {
EventBatch batch = iter.next();
List<Event> event_list = batch.getEvents();
Iterator<Event> iterator = event_list.listIterator() ;
Event event = iterator.next();

Assert.assertEquals(1.693834216993E9, event.getTime(), 0);
break;
}
task.stop();
}

@Test(expected = ConfigException.class)
public void emptyRegex() {
SplunkSinkTask task = new SplunkSinkTask();
Expand Down

0 comments on commit 44c9d9a

Please sign in to comment.