From 54a20aea6aa02ed6cbbcd174c33da065a3171c5b Mon Sep 17 00:00:00 2001 From: foram-splunk Date: Thu, 21 Jul 2022 15:13:06 +0530 Subject: [PATCH 01/37] Add semgrep --- .github/workflows/ci_build_test.yaml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml index f2490c6c..1bd509c9 100644 --- a/.github/workflows/ci_build_test.yaml +++ b/.github/workflows/ci_build_test.yaml @@ -10,6 +10,10 @@ on: FOSSA_API_KEY: description: API token for FOSSA app required: true + + SEMGREP_PUBLISH_TOKEN: + description: Publish token for Semgrep + required: true jobs: fossa-scan: @@ -35,11 +39,24 @@ jobs: env: FOSSA_API_KEY: ${{ secrets.FOSSA_API_KEY }} + semgrep: + runs-on: ubuntu-latest + name: security-sast-semgrep + if: github.actor != 'dependabot[bot]' + steps: + - uses: actions/checkout@v3 + - name: Semgrep + id: semgrep + uses: returntocorp/semgrep-action@v1 + with: + publishToken: ${{ secrets.SEMGREP_PUBLISH_TOKEN }} + build-unit-test: name: build and run unit test runs-on: ubuntu-20.04 needs: - fossa-scan + - semgrep steps: - name: Checkout uses: actions/checkout@v2 From 3c63f70dee269d060e5ea9be91c92c4db0f91bf9 Mon Sep 17 00:00:00 2001 From: foram-splunk Date: Fri, 22 Jul 2022 11:34:24 +0530 Subject: [PATCH 02/37] Add semgrep --- .github/workflows/ci_build_test.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml index 1bd509c9..6a91c920 100644 --- a/.github/workflows/ci_build_test.yaml +++ b/.github/workflows/ci_build_test.yaml @@ -54,9 +54,6 @@ jobs: build-unit-test: name: build and run unit test runs-on: ubuntu-20.04 - needs: - - fossa-scan - - semgrep steps: - name: Checkout uses: actions/checkout@v2 From 42bc2d7de44f0c3ff146820fd0c947494f1d7e6b Mon Sep 17 00:00:00 2001 From: foram-splunk <89519924+foram-splunk@users.noreply.github.com> Date: Wed, 10 Aug 2022 12:47:40 +0530 Subject: [PATCH 03/37] Update ci_build_test.yaml --- .github/workflows/ci_build_test.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml index 6a91c920..dec5ef3e 100644 --- a/.github/workflows/ci_build_test.yaml +++ b/.github/workflows/ci_build_test.yaml @@ -81,6 +81,7 @@ jobs: files: "target/surefire-reports/*.xml" e2e_test: + environment: workflow-approval name: e2e test - kafka version-${{ matrix.kafka_version }} runs-on: ubuntu-20.04 needs: From 7a34d20d45494baef8b9e7735429939456b52ae7 Mon Sep 17 00:00:00 2001 From: foram-splunk Date: Fri, 12 Aug 2022 17:32:28 +0530 Subject: [PATCH 04/37] Added job for approval of functional tests --- .github/workflows/ci_build_test.yaml | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml index dec5ef3e..8976e9f3 100644 --- a/.github/workflows/ci_build_test.yaml +++ b/.github/workflows/ci_build_test.yaml @@ -1,7 +1,7 @@ name: CI Build Test on: - pull_request: + pull_request_target: branches-ignore: - /^release\/.*/ - master @@ -80,12 +80,21 @@ jobs: check_name: Unit Test Results files: "target/surefire-reports/*.xml" - e2e_test: + functional_tests_approval: + name: Approve functional tests + runs-on: ubuntu-20.04 environment: workflow-approval + needs: + - build-unit-test + steps: + - name: Approve functional tests + run: echo For security reasons, all pull requests need to be approved first before running any automated CI. 
+ + e2e_test: name: e2e test - kafka version-${{ matrix.kafka_version }} runs-on: ubuntu-20.04 needs: - - build-unit-test + - functional_tests_approval strategy: fail-fast: false matrix: From 46d83b74dd114e010f9157b2c0a2f108a4e064d4 Mon Sep 17 00:00:00 2001 From: foram-splunk Date: Tue, 16 Aug 2022 17:16:26 +0530 Subject: [PATCH 05/37] Updated workflow jobs --- .github/workflows/ci_build_test.yaml | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml index 8976e9f3..52c5930c 100644 --- a/.github/workflows/ci_build_test.yaml +++ b/.github/workflows/ci_build_test.yaml @@ -16,9 +16,19 @@ on: required: true jobs: + workflow_approval: + name: Approve workflow + runs-on: ubuntu-20.04 + environment: workflow-approval + steps: + - name: Approve workflow + run: echo For security reasons, all pull requests need to be approved first before running any automated CI. + fossa-scan: continue-on-error: true runs-on: ubuntu-latest + needs: + - workflow_approval steps: - uses: actions/checkout@v3 - name: run fossa anlyze and create report @@ -41,6 +51,8 @@ jobs: semgrep: runs-on: ubuntu-latest + needs: + - workflow_approval name: security-sast-semgrep if: github.actor != 'dependabot[bot]' steps: @@ -54,6 +66,8 @@ jobs: build-unit-test: name: build and run unit test runs-on: ubuntu-20.04 + needs: + - workflow_approval steps: - name: Checkout uses: actions/checkout@v2 @@ -80,21 +94,11 @@ jobs: check_name: Unit Test Results files: "target/surefire-reports/*.xml" - functional_tests_approval: - name: Approve functional tests - runs-on: ubuntu-20.04 - environment: workflow-approval - needs: - - build-unit-test - steps: - - name: Approve functional tests - run: echo For security reasons, all pull requests need to be approved first before running any automated CI. 
- e2e_test: name: e2e test - kafka version-${{ matrix.kafka_version }} runs-on: ubuntu-20.04 needs: - - functional_tests_approval + - build-unit-test strategy: fail-fast: false matrix: From 30625e373549b909e589e74aa3cfe637d70b973f Mon Sep 17 00:00:00 2001 From: Alexandre Garnier Date: Mon, 1 Aug 2022 16:26:54 +0200 Subject: [PATCH 06/37] Handle other type of truststore Only the discouraged JKS was available --- src/main/java/com/splunk/hecclient/Hec.java | 7 ++++--- .../java/com/splunk/hecclient/HecConfig.java | 8 ++++++++ .../connect/SplunkSinkConnectorConfig.java | 9 ++++++++- .../com/splunk/hecclient/HecConfigTest.java | 2 ++ .../splunk/hecclient/HttpClientBuilderTest.java | 16 ++++++++++++++-- .../com/splunk/kafka/connect/ConfigProfile.java | 15 +++++++++++++-- .../connect/SplunkSinkConnectorConfigTest.java | 5 ++++- .../java/com/splunk/kafka/connect/UnitUtil.java | 1 + src/test/resources/keystoretest.p12 | Bin 0 -> 2724 bytes 9 files changed, 54 insertions(+), 9 deletions(-) create mode 100644 src/test/resources/keystoretest.p12 diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java index 802804ba..1a965bd3 100644 --- a/src/main/java/com/splunk/hecclient/Hec.java +++ b/src/main/java/com/splunk/hecclient/Hec.java @@ -287,7 +287,7 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) { } // Code block for custom keystore client construction - SSLContext context = loadCustomSSLContext(config.getTrustStorePath(), config.getTrustStorePassword()); + SSLContext context = loadCustomSSLContext(config.getTrustStorePath(), config.getTrustStoreType(), config.getTrustStorePassword()); if (context != null) { return new HttpClientBuilder() @@ -309,6 +309,7 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) { * a Hec Client with custom key store functionality. * * @param path A file path to the custom key store to be used. + * @param type The type of the key store file. * @param pass The password for the key store file. 
* @since 1.1.0 * @throws HecException @@ -316,9 +317,9 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) { * @see KeyStore * @see SSLContext */ - public static SSLContext loadCustomSSLContext(String path, String pass) { + public static SSLContext loadCustomSSLContext(String path, String type, String pass) { try { - KeyStore ks = KeyStore.getInstance("JKS"); + KeyStore ks = KeyStore.getInstance(type); FileInputStream fileInputStream = new FileInputStream(path); ks.load(fileInputStream, pass.toCharArray()); diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java index d6421603..d29eccb5 100644 --- a/src/main/java/com/splunk/hecclient/HecConfig.java +++ b/src/main/java/com/splunk/hecclient/HecConfig.java @@ -34,6 +34,7 @@ public final class HecConfig { private boolean enableChannelTracking = false; private boolean hasCustomTrustStore = false; private String trustStorePath; + private String trustStoreType = "JKS"; private String trustStorePassword; private int lbPollInterval = 120; // in seconds private String kerberosPrincipal; @@ -104,6 +105,8 @@ public int getBackoffThresholdSeconds() { public String getTrustStorePath() { return trustStorePath; } + public String getTrustStoreType() { return trustStoreType; } + public String getTrustStorePassword() { return trustStorePassword; } public HecConfig setDisableSSLCertVerification(boolean disableVerfication) { @@ -161,6 +164,11 @@ public HecConfig setTrustStorePath(String path) { return this; } + public HecConfig setTrustStoreType(String type) { + trustStoreType = type; + return this; + } + public HecConfig setTrustStorePassword(String pass) { trustStorePassword = pass; return this; diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 937c2bea..dddd3291 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -74,6 +74,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String HEC_EVENT_FORMATTED_CONF = "splunk.hec.json.event.formatted"; // Trust store static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path"; + static final String SSL_TRUSTSTORE_TYPE_CONF = "splunk.hec.ssl.trust.store.type"; static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password"; //Headers static final String HEADER_SUPPORT_CONF = "splunk.header.support"; @@ -178,6 +179,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { + "correctly by Splunk."; // TBD static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store."; + static final String SSL_TRUSTSTORE_TYPE_DOC = "Type of the trust store (JKS, PKCS12, ...)."; static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store."; static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override"; @@ -236,6 +238,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { final boolean hasTrustStorePath; final String trustStorePath; + final String trustStoreType; final String trustStorePassword; final boolean headerSupport; @@ -265,6 +268,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { validateCertificates = getBoolean(SSL_VALIDATE_CERTIFICATES_CONF); trustStorePath = 
getString(SSL_TRUSTSTORE_PATH_CONF); hasTrustStorePath = StringUtils.isNotBlank(trustStorePath); + trustStoreType = getString(SSL_TRUSTSTORE_TYPE_CONF); trustStorePassword = getPassword(SSL_TRUSTSTORE_PASSWORD_CONF).value(); validateHttpsConfig(splunkURI); eventBatchTimeout = getInt(EVENT_TIMEOUT_CONF); @@ -318,6 +322,7 @@ public static ConfigDef conf() { .define(HTTP_KEEPALIVE_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, HTTP_KEEPALIVE_DOC) .define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC) .define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC) + .define(SSL_TRUSTSTORE_TYPE_CONF, ConfigDef.Type.STRING, "JKS", ConfigDef.Importance.LOW, SSL_TRUSTSTORE_TYPE_DOC) .define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC) .define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC) .define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC) @@ -368,6 +373,7 @@ public HecConfig getHecConfig() { .setEnableChannelTracking(trackData) .setBackoffThresholdSeconds(backoffThresholdSeconds) .setTrustStorePath(trustStorePath) + .setTrustStoreType(trustStoreType) .setTrustStorePassword(trustStorePassword) .setHasCustomTrustStore(hasTrustStorePath) .setKerberosPrincipal(kerberosUserPrincipal) @@ -393,6 +399,7 @@ public String toString() { + "httpKeepAlive:" + httpKeepAlive + ", " + "validateCertificates:" + validateCertificates + ", " + "trustStorePath:" + trustStorePath + ", " + + "trustStoreType:" + trustStoreType + ", " + "socketTimeout:" + socketTimeout + ", " + "eventBatchTimeout:" + eventBatchTimeout + ", " + "ackPollInterval:" + ackPollInterval + ", " @@ -544,4 +551,4 @@ private static boolean getNamedGroupCandidates(String regex) { } return false; } -} \ No newline at end of file +} diff --git a/src/test/java/com/splunk/hecclient/HecConfigTest.java b/src/test/java/com/splunk/hecclient/HecConfigTest.java index a43d9054..3826bff4 100644 --- a/src/test/java/com/splunk/hecclient/HecConfigTest.java +++ b/src/test/java/com/splunk/hecclient/HecConfigTest.java @@ -44,6 +44,7 @@ public void getterSetter() { .setEnableChannelTracking(true) .setEventBatchTimeout(7) .setTrustStorePath("test") + .setTrustStoreType("PKCS12") .setTrustStorePassword("pass") .setHasCustomTrustStore(true) .setBackoffThresholdSeconds(10) @@ -60,6 +61,7 @@ public void getterSetter() { Assert.assertEquals(6, config.getAckPollThreads()); Assert.assertEquals(7, config.getEventBatchTimeout()); Assert.assertEquals("test", config.getTrustStorePath()); + Assert.assertEquals("PKCS12", config.getTrustStoreType()); Assert.assertEquals("pass", config.getTrustStorePassword()); Assert.assertEquals(10000, config.getBackoffThresholdSeconds()); Assert.assertEquals(120000, config.getlbPollInterval()); diff --git a/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java b/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java index 3d5fbfff..80490f4f 100644 --- a/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java +++ b/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java @@ -52,7 +52,19 @@ public void buildSecureCustomKeystore() { .setSocketSendBufferSize(1024) .setSocketTimeout(120) .setDisableSSLCertVerification(false) - 
.setSslContext(Hec.loadCustomSSLContext("./src/test/resources/keystoretest.jks","Notchangeme")) + .setSslContext(Hec.loadCustomSSLContext("./src/test/resources/keystoretest.jks", "JKS", "Notchangeme")) + .build(); + Assert.assertNotNull(client); + } + @Test + public void buildSecureCustomKeystorePkcs12() { + HttpClientBuilder builder = new HttpClientBuilder(); + CloseableHttpClient client = builder.setMaxConnectionPoolSizePerDestination(1) + .setMaxConnectionPoolSize(2) + .setSocketSendBufferSize(1024) + .setSocketTimeout(120) + .setDisableSSLCertVerification(false) + .setSslContext(Hec.loadCustomSSLContext("./src/test/resources/keystoretest.p12", "PKCS12", "Notchangeme")) .build(); Assert.assertNotNull(client); } @@ -63,4 +75,4 @@ public void buildDefault() { CloseableHttpClient client = builder.build(); Assert.assertNotNull(client); } -} \ No newline at end of file +} diff --git a/src/test/java/com/splunk/kafka/connect/ConfigProfile.java b/src/test/java/com/splunk/kafka/connect/ConfigProfile.java index b8085d4e..f6c75b9f 100644 --- a/src/test/java/com/splunk/kafka/connect/ConfigProfile.java +++ b/src/test/java/com/splunk/kafka/connect/ConfigProfile.java @@ -17,6 +17,7 @@ public class ConfigProfile { private boolean validateCertificates; private boolean hasTrustStorePath; private String trustStorePath; + private String trustStoreType; private String trustStorePassword; private int eventBatchTimeout; private int ackPollInterval; @@ -77,6 +78,7 @@ public ConfigProfile buildProfileDefault() { this.validateCertificates = true; this.hasTrustStorePath = true; this.trustStorePath = "./src/test/resources/keystoretest.jks"; + this.trustStoreType = "JKS"; this.trustStorePassword = "Notchangeme"; this.eventBatchTimeout = 1; this.ackPollInterval = 1; @@ -110,7 +112,8 @@ public ConfigProfile buildProfileOne() { this.httpKeepAlive = true; this.validateCertificates = true; this.hasTrustStorePath = true; - this.trustStorePath = "./src/test/resources/keystoretest.jks"; + this.trustStorePath = "./src/test/resources/keystoretest.p12"; + this.trustStoreType = "PKCS12"; this.trustStorePassword = "Notchangeme"; this.eventBatchTimeout = 1; this.ackPollInterval = 1; @@ -332,6 +335,14 @@ public void setTrustStorePath(String trustStorePath) { this.trustStorePath = trustStorePath; } + public String getTrustStoreType() { + return trustStoreType; + } + + public void setTrustStoreType(String trustStoreType) { + this.trustStoreType = trustStoreType; + } + public String getTrustStorePassword() { return trustStorePassword; } @@ -461,6 +472,6 @@ public void setHeaderHost(String headerHost) { } @Override public String toString() { - return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", 
maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}'; + return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", trustStoreType='" + trustStoreType + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}'; } } diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java index 82f1c997..2113fade 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java @@ -83,6 +83,7 @@ public void getHecConfigCustomKeystore() { HecConfig config = connectorConfig.getHecConfig(); Assert.assertEquals(true, config.getHasCustomTrustStore()); Assert.assertEquals(uu.configProfile.getTrustStorePath(), config.getTrustStorePath()); + Assert.assertEquals(uu.configProfile.getTrustStoreType(), config.getTrustStoreType()); Assert.assertEquals(uu.configProfile.getTrustStorePassword(), config.getTrustStorePassword()); } @@ -95,9 +96,10 @@ public void testCustomKeystore() throws KeyStoreException { HecConfig config = connectorConfig.getHecConfig(); Assert.assertEquals(true, config.getHasCustomTrustStore()); Assert.assertEquals(uu.configProfile.getTrustStorePath(), config.getTrustStorePath()); + Assert.assertEquals(uu.configProfile.getTrustStoreType(), config.getTrustStoreType()); Assert.assertEquals(uu.configProfile.getTrustStorePassword(), config.getTrustStorePassword()); - SSLContext context = Hec.loadCustomSSLContext(config.getTrustStorePath(),config.getTrustStorePassword()); + SSLContext context = Hec.loadCustomSSLContext(config.getTrustStorePath(), config.getTrustStoreType(), config.getTrustStorePassword()); Assert.assertNotNull(context); } @@ -315,6 +317,7 @@ private void commonAssert(final SplunkSinkConnectorConfig connectorConfig) { Assert.assertEquals(uu.configProfile.isHttpKeepAlive(), connectorConfig.httpKeepAlive); Assert.assertEquals(uu.configProfile.isValidateCertificates(), connectorConfig.validateCertificates); Assert.assertEquals(uu.configProfile.getTrustStorePath(), connectorConfig.trustStorePath); + Assert.assertEquals(uu.configProfile.getTrustStoreType(), connectorConfig.trustStoreType); Assert.assertEquals(uu.configProfile.getTrustStorePassword(), connectorConfig.trustStorePassword); Assert.assertEquals(uu.configProfile.getEventBatchTimeout(), connectorConfig.eventBatchTimeout); Assert.assertEquals(uu.configProfile.getAckPollInterval(), connectorConfig.ackPollInterval); diff --git a/src/test/java/com/splunk/kafka/connect/UnitUtil.java b/src/test/java/com/splunk/kafka/connect/UnitUtil.java index 1c2b3296..85ae6e04 100644 --- 
a/src/test/java/com/splunk/kafka/connect/UnitUtil.java +++ b/src/test/java/com/splunk/kafka/connect/UnitUtil.java @@ -45,6 +45,7 @@ public Map createTaskConfig() { if(configProfile.getTrustStorePath() != null ) { config.put(SplunkSinkConnectorConfig.SSL_TRUSTSTORE_PATH_CONF, configProfile.getTrustStorePath()); + config.put(SplunkSinkConnectorConfig.SSL_TRUSTSTORE_TYPE_CONF, configProfile.getTrustStoreType()); config.put(SplunkSinkConnectorConfig.SSL_TRUSTSTORE_PASSWORD_CONF, configProfile.getTrustStorePassword()); } diff --git a/src/test/resources/keystoretest.p12 b/src/test/resources/keystoretest.p12 new file mode 100644 index 0000000000000000000000000000000000000000..5aa24124725ef40fe93e796818e01996318c1d3e GIT binary patch literal 2724 zcma);X*d*$8pmhG%-APs3@6)U%Mxa+$y)Y=jGdDq##$01yBKR^kOmQ=#4wgJ*$P>* zWsc=oj-6~}X^=s-PWQR@={)z-y&vA^ecs>y{l6do&x<0kJp%!mPz1KCP*&-9qxd~e zAT#h8flVDuU{n2x6;T98*?&opTrdHW^%H0OtW+rbe_R}FK+rP+MCT{eK)L^NKsZtM zsEhxM94Hwud|;U`yfM*aIzi<^I2&=y&=#4J4+5R%0s)aI4k*jNFM?Ph0F)q<)j8e> zh+_f+rNLaqe-5?=7*2t6reC{VaGJ9N6Tky#J@f9j9BI=!M!KJ--J1e?y3_k!u_s;b zC9=5OZBVH}#$iDs8ty`;DRUPzTu5Dx^VTUjpCuJ;3_KC~9vMHB(6B#RFR>uXVx@J9 zD07ScRt4%9pvd$^{$aP8=FJh~-?%qh&S2Z+B#b|bt?VL;NbIpw@w)7R;WAc*I+aGc z0<^`1jufLux1XFL$=8#cEmrIL^N!83!|q3muNDdd%6m5}D#l`I>UKK{CV?i+sX1{X zv>NALGkI4v4?Y;JuYZpgf2MQjl=zJV54~|0H+65tRaoS$A#8`*2)?b<*=^(cioUW| z3sW_it*S+((Y_{7tbkSB3lmatrJLATlAtQ}TjV6ILd>ZzSnK4)M3GjB7<`TcqPq=$ zh?E15$~P6*D(!NPT9D|O)V1#SQuT}VXK~m7A$}_bJ7@2d5xzv~IPrSAN+u?GpHk8p zYjBiNadP)<1e+XFtHIol=-Ep=j-n3Y$Kj6;W+}ATNJBkid8K)srQtCjrkKr*+Jwsd z)TBTdv!6q`1rnhiNpDlVtJ_j=KwY{{EHv84U;Cj(0$r)goS2I0s~I1^b~yCy=u4%7 zt@gR4;LRKwI&A|I{!q?o-2uRhVYIL9_NTnrxpj@xw0z-5;NAut$ZLdcr?sW27_pw4 z0psCj>$15%-27~w-`D-_83s?$D`%TQOy0n3s+xQva}$Hg zih8>bs^tvyPdf_9ZTJ}DeXV#V&TuHO$4gzJh_;n>zK2Fm8Vd@S%FmcMnfc-uy(@(` zj@1s^_5J2K$o?pK$$E*A1xIS%`W-3CL+0dN^Wumh+ovHD9SWL#r3b1vS9mwtW~*#d&X|tIYP@-I=Uf^XLB&bo)*^N zB#611`GG%Z$`=jsx3u0mb-p#5b`tDtX*NE#%WOgVUi?l4Tl-;d6-3WpJ$Ha#!j z(%3GX-GWD9FEXf$%mx;s`Rfaqr$nNyR^x^_Sqao>zq55`r#kILJkDm1t?oMS?=(lq zAKJNP$dw!CKg(ee#$khISL`zq3yrm}K2`?IW{)raIj<+husmda6g0DsbMc0qbGfyKZZCBCUqi!ES? z^qkujSsT$l&&Ohub^ZzGL!x=H^n25WU?I_2pQB9~Rg1Y6i$@CKz9x5asv{n_8lO3~ z`ecfT%8|PDackaeT2f}3j8$guXO%S(AI0=F-Y5oXm|jL0Lb=XfzNPdk7vb}zCMP<3 zRKhQWB$JRgofyRrbCP1IifYYf%0)94+e-z;Vis+^mL%pikThUj;K7)Vg@hiy)dr$p8 z5p7wX#X+aNQ*qT3-1#fOyDW;pv?`*>5&dRk?+PZb#j?n=P;a0whJ=g zDGh?udVJJ=GXZ?SQM+W>A(0OW=*rH=&-5ox`FK{(S2sf3*4=Jf~eV@&o%Vs6epbN;!-RcsXRO-Im7hF_~AMW zK;YuWNdAhpkcMr-kdu~v9DTxsmCz7pvGOH`)W$h*>#6eUYaVim8?(M_eEKiz`e76vedw#_; z6hv35+jQEy%BmFQ+q87t3+S2ly*-DBC)Y}mN!}BmgL<8cT$Dm_g>wOtaeQ*N`V)&ikI3l59k*{NR^JP8HfBl zm!1bREhH^gV3#T^eRpjKd1Z>yE4svzppvu6rH&P8@Jr`Y%lpQ5$3h1KqpogbRWHYM z2w2z2?;MRKrou^-))6IZPj_7_d-NF!@8ss=$opBLfZnuP*|5jmx3$*%v*0ouQo4jB zM|Z9JVF6_9QJGEo<^CSES@q?Z8l2*LCs*WXCK0I8i<4E5SQ_@nyUu@yXm5L4M3<~e zFJgtZnJC|8K_&R?wpehJNBWUHj^VO!yp;X&0%vsS=~Ed&BA35yJy$fbvBdUCeDDG2 z#@_MnBta5uVGL^HNrSZ5j?Q5R(h_kO1ny?a8Yzw0|2pSea3SPRL`T98l@f;%1Qrrh zOzfG7d7{kQW3p;2?`L2jBz;ycLdY9Vz9SQtcJ`Z&f1FdS&8Y}%N8Zi*^Pl)~R`jZ! 
zSmq9=o+Rv`s`AQ~r0)fQ)jP)uHQ&`WO;zoJE}}f1+UAbGMwz0}D9&G>8wdyifRWeD z26e(m_kH!y4#f<1%7X!uu&YwYpkVqC!F^A5(F+WJWLrDD%s{c{9 literal 0 HcmV?d00001 From 21152253cedbbf9db04c9a179d46e9d5ddfaa858 Mon Sep 17 00:00:00 2001 From: foram-splunk Date: Tue, 23 Aug 2022 21:24:08 +0530 Subject: [PATCH 07/37] Stabilize Test kafka connect upgrade --- .github/workflows/ci_build_test.yaml | 86 +++++++++++++++++++++ test/config.sh | 18 +++++ test/lib/connector_upgrade.py | 14 ++-- test/lib/eventproducer_connector_upgrade.py | 52 +++++++++++++ 4 files changed, 163 insertions(+), 7 deletions(-) create mode 100644 test/config.sh create mode 100644 test/lib/eventproducer_connector_upgrade.py diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml index 52c5930c..fbc8883e 100644 --- a/.github/workflows/ci_build_test.yaml +++ b/.github/workflows/ci_build_test.yaml @@ -218,7 +218,93 @@ jobs: pip install -r test/requirements.txt export PYTHONWARNINGS="ignore:Unverified HTTPS request" echo "Test kafka connect upgrade ..." + source $GITHUB_WORKSPACE/test/config.sh + test -f $connector_path/$old_connector_name && echo $connector_path /$old_connector_name + sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 & + sleep 20 + curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ + "name": "kafka_connect", + "config": { + "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", + "tasks.max": "1", + "splunk.indexes": "'"$splunk_index"'", + "topics": "kafka_connect_upgrade", + "splunk.hec.ack.enabled": "false", + "splunk.hec.uri": "'"$splunk_hec_url"'", + "splunk.hec.ssl.validate.certs": "false", + "splunk.hec.token": "'"$splunk_token"'" , + "splunk.flush.window": "1", + "splunk.sources": "kafka_connect" + } + }' + sleep 10 + curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ + "name": "kafka_connect_ack", + "config": { + "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", + "tasks.max": "1", + "splunk.indexes": "'"$splunk_index"'", + "topics": "kafka_connect_upgrade", + "splunk.hec.ack.enabled": "true", + "splunk.hec.uri": "'"$splunk_hec_url"'", + "splunk.hec.ssl.validate.certs": "false", + "splunk.hec.token": "'"$splunk_token_ack"'" , + "splunk.flush.window": "1", + "splunk.sources": "kafka_connect_ack" + } + }' + sleep 5 + python test/lib/eventproducer_connector_upgrade.py --log-level=INFO + curl -s -XDELETE "${kafka_connect_url}/connectors/kafka_connect" + curl -s -XDELETE "${kafka_connect_url}/connectors/kafka_connect_ack" + sudo kill $(sudo lsof -t -i:8083) && sleep 2 + sudo rm $connector_path/$old_connector_name && sleep 2 + sudo cp $connector_build_target/splunk-kafka-connect*.jar $connector_path && sleep 2 + sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 & + sleep 10 + curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ + "name": "kafka_connect", + "config": { + "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", + "tasks.max": "1", + "splunk.indexes": "'"$splunk_index"'", + "topics": "kafka_connect_upgrade", + "splunk.hec.ack.enabled": "false", + "splunk.hec.uri": "'"$splunk_hec_url"'", + "splunk.hec.ssl.validate.certs": "false", + "splunk.hec.token": "'"$splunk_token"'" , + "splunk.sources": "kafka_connect", + "splunk.hec.json.event.formatted": "true", + "splunk.flush.window": "1", + 
"splunk.hec.raw": "false" + } + }' + sleep 10 + curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ + "name": "kafka_connect_ack", + "config": { + "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", + "tasks.max": "1", + "splunk.indexes": "'"$splunk_index"'", + "topics": "kafka_connect_upgrade", + "splunk.hec.ack.enabled": "true", + "splunk.hec.uri": "'"$splunk_hec_url"'", + "splunk.hec.ssl.validate.certs": "false", + "splunk.hec.token": "'"$splunk_token_ack"'" , + "splunk.sources": "kafka_connect_ack", + "splunk.hec.json.event.formatted": "true", + "splunk.flush.window": "1", + "splunk.hec.raw": "false" + } + }' + sleep 5 + python test/lib/eventproducer_connector_upgrade.py --log-level=INFO python test/lib/connector_upgrade.py --log-level=INFO + - uses: actions/upload-artifact@v3 + if: always() + with: + name: kafka-connect-logs + path: output.log - name: Install kafka connect run: | diff --git a/test/config.sh b/test/config.sh new file mode 100644 index 00000000..dc52c68a --- /dev/null +++ b/test/config.sh @@ -0,0 +1,18 @@ +export splunkd_url=https://127.0.0.1:8089 +export splunk_hec_url=https://127.0.0.1:8088 +export splunk_user=admin +export splunk_password=helloworld +export splunk_index=main +export splunk_token=a6b5e77f-d5f6-415a-bd43-930cecb12959 +export splunk_token_ack=a6b5e77f-d5f6-415a-bd43-930cecb12950 +export kafka_broker_url=127.0.0.1:9092 +export kafka_connect_url=http://127.0.0.1:8083 +export kafka_topic=test-datagen +export kafka_topic_2=kafka_topic_2 +export kafka_header_topic=kafka_header_topic +export kafka_header_index=kafka +export connector_path=/usr/local/share/kafka/plugins +export connector_build_target=/usr/local/share/kafka-connector +export kafka_home=/usr/local/kafka +export kafka_connect_home=/home/circleci/repo +export old_connector_name=splunk-kafka-connect-v2.0.1.jar \ No newline at end of file diff --git a/test/lib/connector_upgrade.py b/test/lib/connector_upgrade.py index 564ec88e..fdc19463 100644 --- a/test/lib/connector_upgrade.py +++ b/test/lib/connector_upgrade.py @@ -158,21 +158,21 @@ def update_kafka_connectors(): thread_upgrade = threading.Thread(target=upgrade_connector_plugin, daemon=True) thread_upgrade.start() time.sleep(100) - search_query_1 = f"index={config['splunk_index']} | search timestamp=\"{_time_stamp}\" source::{_connector}" + search_query_1 = f"index={config['splunk_index']} | search source::{_connector}" logger.debug(search_query_1) - events_1 = check_events_from_splunk(start_time="-15m@m", + events_1 = check_events_from_splunk(start_time="-24h@h", url=config["splunkd_url"], user=config["splunk_user"], query=[f"search {search_query_1}"], password=config["splunk_password"]) - logger.info("Splunk received %s events in the last 15m", len(events_1)) + logger.info("Splunk received %s events in the last 24h", len(events_1)) assert len(events_1) == 2000 - search_query_2 = f"index={config['splunk_index']} | search timestamp=\"{_time_stamp}\" source::{_connector_ack}" + search_query_2 = f"index={config['splunk_index']} | search source::{_connector_ack}" logger.debug(search_query_2) - events_2 = check_events_from_splunk(start_time="-15m@m", + events_2 = check_events_from_splunk(start_time="-24h@h", url=config["splunkd_url"], user=config["splunk_user"], query=[f"search {search_query_2}"], password=config["splunk_password"]) - logger.info("Splunk received %s events in the last 15m", len(events_2)) - assert len(events_2) == 2000 + logger.info("Splunk received %s events in the last 24h", 
len(events_2)) + assert len(events_2) == 2000 \ No newline at end of file diff --git a/test/lib/eventproducer_connector_upgrade.py b/test/lib/eventproducer_connector_upgrade.py new file mode 100644 index 00000000..f66f9e0f --- /dev/null +++ b/test/lib/eventproducer_connector_upgrade.py @@ -0,0 +1,52 @@ +from kafka.producer import KafkaProducer +import sys +import os +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))) + +from lib.commonsplunk import check_events_from_splunk +from lib.commonkafka import * +from lib.helper import * +from datetime import datetime +import threading +import logging.config +import yaml +import subprocess +import logging +import time + +logging.config.fileConfig(os.path.join(get_test_folder(), "logging.conf")) +logger = logging.getLogger('connector_upgrade') + +_config_path = os.path.join(get_test_folder(), 'config.yaml') +with open(_config_path, 'r') as yaml_file: + config = yaml.load(yaml_file) +now = datetime.now() +_time_stamp = str(datetime.timestamp(now)) +_topic = 'kafka_connect_upgrade' + + +def generate_kafka_events(num): + topics = [_topic] + client = KafkaAdminClient(bootstrap_servers=config["kafka_broker_url"], client_id='test') + broker_topics = client.list_topics() + logger.info(broker_topics) + if _topic not in broker_topics: + create_kafka_topics(config, topics) + producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"], + value_serializer=lambda v: json.dumps(v).encode('utf-8')) + + for _ in range(num): + msg = {"timestamp": _time_stamp} + producer.send(_topic, msg) + time.sleep(0.05) + producer.flush() + + + +if __name__ == '__main__': + + time.sleep(20) + logger.info("Generate Kafka events ...") + thread_gen = threading.Thread(target=generate_kafka_events, args=(1000,), daemon=True) + thread_gen.start() + time.sleep(100) \ No newline at end of file From 61ce47da2da96b6bc873297025224a260a781721 Mon Sep 17 00:00:00 2001 From: foram-splunk Date: Mon, 29 Aug 2022 11:01:09 +0530 Subject: [PATCH 08/37] Updated tasks and stabilizing crud test --- .github/workflows/ci_build_test.yaml | 62 +++++---- test/conftest.py | 15 ++ test/lib/commonkafka.py | 5 +- test/lib/connector_upgrade.py | 144 +------------------- test/lib/eventproducer_connector_upgrade.py | 27 +++- test/testcases/test_crud.py | 2 +- 6 files changed, 83 insertions(+), 172 deletions(-) diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml index fbc8883e..ff347f71 100644 --- a/.github/workflows/ci_build_test.yaml +++ b/.github/workflows/ci_build_test.yaml @@ -205,6 +205,11 @@ jobs: - name: Test kafka connect upgrade run: | echo "Download kafka connect "$CI_OLD_CONNECTOR_VERSION + # Summary for the test + #1)We will deploy old kafka connector and create 2 tasks for that to check ack and without ack functionality + #2)then we will remove that old kafka connector and deploy new kafka connector with updation of two tasks + #3) Here in the updation we will check for the new functionality("splunk.hec.json.event.formatted" and "splunk.hec.raw") so that we can check if we can successfully upgrade the connector + #4)At last we will check if we have recieved 2000 events for both the tasks sudo mkdir -p /usr/local/share/kafka/plugins/ wget https://github.com/splunk/kafka-connect-splunk/releases/download/$CI_OLD_CONNECTOR_VERSION/splunk-kafka-connect-$CI_OLD_CONNECTOR_VERSION.jar sudo cp splunk-kafka-connect-$CI_OLD_CONNECTOR_VERSION.jar /usr/local/share/kafka/plugins/ @@ -220,8 +225,10 @@ jobs: echo "Test kafka connect 
upgrade ..." source $GITHUB_WORKSPACE/test/config.sh test -f $connector_path/$old_connector_name && echo $connector_path /$old_connector_name + # Starting Old Connector sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 & sleep 20 + # Creating the two tasks (with ack and without ack) curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ "name": "kafka_connect", "config": { @@ -233,11 +240,12 @@ jobs: "splunk.hec.uri": "'"$splunk_hec_url"'", "splunk.hec.ssl.validate.certs": "false", "splunk.hec.token": "'"$splunk_token"'" , - "splunk.flush.window": "1", - "splunk.sources": "kafka_connect" + "splunk.sources": "kafka_connect", + "splunk.hec.raw": "true", + "splunk.sourcetypes":"upgraded_test" } }' - sleep 10 + sleep 5 curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ "name": "kafka_connect_ack", "config": { @@ -248,23 +256,23 @@ jobs: "splunk.hec.ack.enabled": "true", "splunk.hec.uri": "'"$splunk_hec_url"'", "splunk.hec.ssl.validate.certs": "false", - "splunk.hec.token": "'"$splunk_token_ack"'" , - "splunk.flush.window": "1", - "splunk.sources": "kafka_connect_ack" + "splunk.hec.token": "'"$splunk_token_ack"'" , + "splunk.sources": "kafka_connect_ack", + "splunk.hec.raw": "true", + "splunk.sourcetypes":"upgraded_test" } }' sleep 5 - python test/lib/eventproducer_connector_upgrade.py --log-level=INFO - curl -s -XDELETE "${kafka_connect_url}/connectors/kafka_connect" - curl -s -XDELETE "${kafka_connect_url}/connectors/kafka_connect_ack" + # Generating 1000 events + python test/lib/eventproducer_connector_upgrade.py 1000 --log-level=INFO sudo kill $(sudo lsof -t -i:8083) && sleep 2 sudo rm $connector_path/$old_connector_name && sleep 2 sudo cp $connector_build_target/splunk-kafka-connect*.jar $connector_path && sleep 2 + # Starting New Connector sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 & + # Updating the two tasks (with ack and without ack) sleep 10 - curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ - "name": "kafka_connect", - "config": { + curl ${kafka_connect_url}/connectors/kafka_connect/config -X PUT -H "Content-Type: application/json" -d '{ "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", "tasks.max": "1", "splunk.indexes": "'"$splunk_index"'", @@ -274,15 +282,12 @@ jobs: "splunk.hec.ssl.validate.certs": "false", "splunk.hec.token": "'"$splunk_token"'" , "splunk.sources": "kafka_connect", + "splunk.hec.raw": "false", "splunk.hec.json.event.formatted": "true", - "splunk.flush.window": "1", - "splunk.hec.raw": "false" - } + "splunk.sourcetypes":"upgraded_test" }' - sleep 10 - curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ - "name": "kafka_connect_ack", - "config": { + sleep 5 + curl ${kafka_connect_url}/connectors/kafka_connect_ack/config -X PUT -H "Content-Type: application/json" -d '{ "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", "tasks.max": "1", "splunk.indexes": "'"$splunk_index"'", @@ -292,18 +297,19 @@ jobs: "splunk.hec.ssl.validate.certs": "false", "splunk.hec.token": "'"$splunk_token_ack"'" , "splunk.sources": "kafka_connect_ack", + "splunk.hec.raw": "false", "splunk.hec.json.event.formatted": "true", - "splunk.flush.window": "1", - "splunk.hec.raw": "false" - } + "splunk.sourcetypes":"upgraded_test" }' sleep 5 - python 
test/lib/eventproducer_connector_upgrade.py --log-level=INFO + # Generating 1000 events + python test/lib/eventproducer_connector_upgrade.py 2000 --log-level=INFO + # Check in splunk that we have recieved 2000 events for with ack and without ack tasks python test/lib/connector_upgrade.py --log-level=INFO - uses: actions/upload-artifact@v3 - if: always() + if: failure() with: - name: kafka-connect-logs + name: kafka-connect-logs-${{ matrix.kafka_version }} path: output.log - name: Install kafka connect @@ -320,3 +326,9 @@ jobs: export PYTHONWARNINGS="ignore:Unverified HTTPS request" echo "Running functional tests....." python -m pytest --log-level=INFO + + - uses: actions/upload-artifact@v3 + if: failure() + with: + name: splunk-events-${{ matrix.kafka_version }} + path: events.txt \ No newline at end of file diff --git a/test/conftest.py b/test/conftest.py index ad3d0ad1..a043e28e 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -17,6 +17,7 @@ from lib.connect_params import * from kafka.producer import KafkaProducer +from lib.commonsplunk import check_events_from_splunk from lib.helper import get_test_folder from lib.data_gen import generate_connector_content import pytest @@ -89,3 +90,17 @@ def pytest_unconfigure(): # Delete launched connectors for param in connect_params: delete_kafka_connector(config, param) + +def pytest_sessionfinish(session, exitstatus): + if exitstatus != 0: + search_query = f"index={setup['splunk_index']}" + logger.info(search_query) + events = check_events_from_splunk(start_time="-24h@h", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=[f"search {search_query}"], + password=setup["splunk_password"]) + myfile = open('events.txt', 'w+') + for i in events: + myfile.write("%s\n" % i) + myfile.close() \ No newline at end of file diff --git a/test/lib/commonkafka.py b/test/lib/commonkafka.py index dc101b2e..d55f82f5 100644 --- a/test/lib/commonkafka.py +++ b/test/lib/commonkafka.py @@ -81,11 +81,11 @@ def delete_kafka_connector(setup, connector): return False -def get_kafka_connector_tasks(setup, params): +def get_kafka_connector_tasks(setup, params, sleepDuration=0): ''' Get kafka connect connector tasks using kafka connect REST API ''' - + time.sleep(sleepDuration) t_end = time.time() + 10 while time.time() < t_end: response = requests.get(url=setup["kafka_connect_url"] + "/connectors/" + params["name"] + "/tasks", @@ -96,7 +96,6 @@ def get_kafka_connector_tasks(setup, params): return 0 - def get_kafka_connector_status(setup, params, action, state): ''' Get kafka connect connector tasks using kafka connect REST API diff --git a/test/lib/connector_upgrade.py b/test/lib/connector_upgrade.py index fdc19463..90cce3e6 100644 --- a/test/lib/connector_upgrade.py +++ b/test/lib/connector_upgrade.py @@ -21,158 +21,28 @@ with open(_config_path, 'r') as yaml_file: config = yaml.load(yaml_file) now = datetime.now() -_time_stamp = str(datetime.timestamp(now)) -_topic = 'kafka_connect_upgrade' _connector = 'kafka_connect' _connector_ack = 'kafka_connect_ack' -def start_old_connector(): - cmds = [f"test -f {config['connector_path']}/{config['old_connector_name']} && echo {config['connector_path']}/{config['old_connector_name']}", - f"cd {config['kafka_home']}", - f"sudo {config['kafka_home']}/bin/connect-distributed.sh {os.environ.get('GITHUB_WORKSPACE')}/config/connect-distributed-quickstart.properties &"] - - cmd = "\n".join(cmds) - try: - proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - proc.wait() - except 
OSError as e: - logger.error(e) - - -def generate_kafka_events(num): - # Generate message data - topics = [_topic] - connector_content = { - "name": _connector, - "config": { - "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", - "tasks.max": "1", - "splunk.indexes": config["splunk_index"], - "topics": _topic, - "splunk.hec.ack.enabled": "false", - "splunk.hec.uri": config["splunk_hec_url"], - "splunk.hec.ssl.validate.certs": "false", - "splunk.hec.token": config["splunk_token"], - "splunk.sources": _connector - } - } - create_kafka_connector(config, connector_content) - connector_content_ack = { - "name": _connector_ack, - "config": { - "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", - "tasks.max": "1", - "splunk.indexes": config["splunk_index"], - "topics": _topic, - "splunk.hec.ack.enabled": "true", - "splunk.hec.uri": config["splunk_hec_url"], - "splunk.hec.ssl.validate.certs": "false", - "splunk.hec.token": config["splunk_token_ack"], - "splunk.sources": _connector_ack - } - } - create_kafka_connector(config, connector_content_ack) - client = KafkaAdminClient(bootstrap_servers=config["kafka_broker_url"], client_id='test') - broker_topics = client.list_topics() - logger.info(broker_topics) - if _topic not in broker_topics: - create_kafka_topics(config, topics) - producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"], - value_serializer=lambda v: json.dumps(v).encode('utf-8')) - - for _ in range(num): - msg = {"timestamp": _time_stamp} - producer.send(_topic, msg) - time.sleep(0.05) - producer.flush() - - -def upgrade_connector_plugin(): - cmds = ["sudo kill $(sudo lsof -t -i:8083) && sleep 2", - f"sudo rm {config['connector_path']}/{config['old_connector_name']} && sleep 2", - f"sudo cp {config['connector_build_target']}/splunk-kafka-connect*.jar {config['connector_path']} && sleep 2", - f"sudo {config['kafka_home']}/bin/connect-distributed.sh {os.environ.get('GITHUB_WORKSPACE')}/config/connect-distributed-quickstart.properties &"] - - cmd = "\n".join(cmds) - try: - proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - output, error = proc.communicate() - logger.debug(output) - time.sleep(2) - update_kafka_connectors() - except OSError as e: - logger.error(e) - - -def update_kafka_connectors(): - logger.info("Update kafka connectors ...") - connector_content = { - "name": _connector, - "config": { - "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", - "tasks.max": "1", - "splunk.indexes": config["splunk_index"], - "topics": _topic, - "splunk.hec.ack.enabled": "false", - "splunk.hec.uri": config["splunk_hec_url"], - "splunk.hec.ssl.validate.certs": "false", - "splunk.hec.token": config["splunk_token"], - "splunk.sources": _connector, - "splunk.hec.json.event.formatted": "true", - "splunk.hec.raw": True - } - } - create_kafka_connector(config, connector_content) - connector_content_ack = { - "name": _connector_ack, - "config": { - "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", - "tasks.max": "1", - "splunk.indexes": config["splunk_index"], - "topics": _topic, - "splunk.hec.ack.enabled": "true", - "splunk.hec.uri": config["splunk_hec_url"], - "splunk.hec.ssl.validate.certs": "false", - "splunk.hec.token": config["splunk_token_ack"], - "splunk.sources": _connector_ack, - "splunk.hec.json.event.formatted": "true", - "splunk.hec.raw": True - } - } - create_kafka_connector(config, connector_content_ack) - if __name__ == '__main__': - logger.info("Start old Kafka 
connector ...") - thread_old_connect = threading.Thread(target=start_old_connector, daemon=True) - thread_old_connect.start() - time.sleep(10) - logger.info("Generate Kafka events ...") - thread_gen = threading.Thread(target=generate_kafka_events, args=(2000,), daemon=True) - thread_gen.start() - time.sleep(50) - logger.info("Upgrade Kafka connector ...") - thread_upgrade = threading.Thread(target=upgrade_connector_plugin, daemon=True) - thread_upgrade.start() time.sleep(100) - search_query_1 = f"index={config['splunk_index']} | search source::{_connector}" + search_query_1 = f"index={config['splunk_index']} | search source::{_connector} sourcetype::upgraded_test" logger.debug(search_query_1) - events_1 = check_events_from_splunk(start_time="-24h@h", + events_1 = check_events_from_splunk(start_time="-48h@h", url=config["splunkd_url"], user=config["splunk_user"], query=[f"search {search_query_1}"], password=config["splunk_password"]) - logger.info("Splunk received %s events in the last 24h", len(events_1)) + logger.info("Splunk received %s events", len(events_1)) assert len(events_1) == 2000 - search_query_2 = f"index={config['splunk_index']} | search source::{_connector_ack}" + search_query_2 = f"index={config['splunk_index']} | search source::{_connector_ack} sourcetype::upgraded_test" logger.debug(search_query_2) - events_2 = check_events_from_splunk(start_time="-24h@h", + events_2 = check_events_from_splunk(start_time="-48h@m", url=config["splunkd_url"], user=config["splunk_user"], query=[f"search {search_query_2}"], password=config["splunk_password"]) - logger.info("Splunk received %s events in the last 24h", len(events_2)) - assert len(events_2) == 2000 \ No newline at end of file + logger.info("Splunk received %s events ", len(events_2)) + assert len(events_2) == 2000 \ No newline at end of file diff --git a/test/lib/eventproducer_connector_upgrade.py b/test/lib/eventproducer_connector_upgrade.py index f66f9e0f..5eede3ba 100644 --- a/test/lib/eventproducer_connector_upgrade.py +++ b/test/lib/eventproducer_connector_upgrade.py @@ -15,7 +15,7 @@ import time logging.config.fileConfig(os.path.join(get_test_folder(), "logging.conf")) -logger = logging.getLogger('connector_upgrade') +logger = logging.getLogger('eventproducer_connector_upgrade') _config_path = os.path.join(get_test_folder(), 'config.yaml') with open(_config_path, 'r') as yaml_file: @@ -25,7 +25,23 @@ _topic = 'kafka_connect_upgrade' +def check_events_from_topic(target): + + t_end = time.time() + 100 + time.sleep(5) + while time.time() < t_end: + output1 = subprocess.getoutput(" echo $(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 'localhost:9092' --topic kafka_connect_upgrade --time -1 | grep -e ':[[:digit:]]*:' | awk -F ':' '{sum += $3} END {print sum}')") + output2 = subprocess.getoutput("echo $(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 'localhost:9092' --topic kafka_connect_upgrade --time -2 | grep -e ':[[:digit:]]*:' | awk -F ':' '{sum += $3} END {print sum}')") + time.sleep(5) + if (int(output1)-int(output2))==target: + logger.info("Events in the topic :" + str(int(output1)-int(output2))) + break + elif (int(output1)-int(output2))>2000: + logger.info("Events in the topic :" + str(int(output1)-int(output2))) + logger.info("Events in the topic :" + str(int(output1)-int(output2))) + def generate_kafka_events(num): + # Generate message data topics = [_topic] client = KafkaAdminClient(bootstrap_servers=config["kafka_broker_url"], client_id='test') 
broker_topics = client.list_topics() @@ -35,18 +51,17 @@ def generate_kafka_events(num): producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"], value_serializer=lambda v: json.dumps(v).encode('utf-8')) - for _ in range(num): - msg = {"timestamp": _time_stamp} + for i in range(num): + msg = f'timestamp={_time_stamp} count={i+1}\n' producer.send(_topic, msg) time.sleep(0.05) producer.flush() - - if __name__ == '__main__': time.sleep(20) logger.info("Generate Kafka events ...") thread_gen = threading.Thread(target=generate_kafka_events, args=(1000,), daemon=True) thread_gen.start() - time.sleep(100) \ No newline at end of file + check_events_from_topic(int(sys.argv[1])) + time.sleep(50) \ No newline at end of file diff --git a/test/testcases/test_crud.py b/test/testcases/test_crud.py index 0ff5e771..59798137 100644 --- a/test/testcases/test_crud.py +++ b/test/testcases/test_crud.py @@ -68,7 +68,7 @@ def test_valid_crud_tasks(self, setup, test_input, expected): assert update_kafka_connector(setup, connector_definition) == expected # Validate get tasks - tasks = get_kafka_connector_tasks(setup, connector_definition) + tasks = get_kafka_connector_tasks(setup, connector_definition,10) assert tasks == int(connector_definition["config"]["tasks.max"]) # Validate pause task From 25436267470dee683668e44e5368044af01d4e6e Mon Sep 17 00:00:00 2001 From: foram-splunk Date: Mon, 29 Aug 2022 14:03:41 +0530 Subject: [PATCH 09/37] Modify search query --- test/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/conftest.py b/test/conftest.py index a043e28e..68061e21 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -93,7 +93,7 @@ def pytest_unconfigure(): def pytest_sessionfinish(session, exitstatus): if exitstatus != 0: - search_query = f"index={setup['splunk_index']}" + search_query = "index=*" logger.info(search_query) events = check_events_from_splunk(start_time="-24h@h", url=setup["splunkd_url"], From e182a40f1636b87d30f3c5bf1cf07a58cec7a41b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 16 Nov 2022 03:16:28 +0000 Subject: [PATCH 10/37] Bump jackson-databind from 2.12.6.1 to 2.12.7.1 Bumps [jackson-databind](https://github.com/FasterXML/jackson) from 2.12.6.1 to 2.12.7.1. - [Release notes](https://github.com/FasterXML/jackson/releases) - [Commits](https://github.com/FasterXML/jackson/commits) --- updated-dependencies: - dependency-name: com.fasterxml.jackson.core:jackson-databind dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d600dae3..aa3be477 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ com.fasterxml.jackson.core jackson-databind - 2.12.6.1 + 2.12.7.1 compile From 22e94e57b7a5972ab1aae82e908729b6d113daf6 Mon Sep 17 00:00:00 2001 From: Pawel Szkamruk Date: Wed, 16 Nov 2022 15:28:15 +0100 Subject: [PATCH 11/37] update kafka testing matrix, update splunk version for tests --- .github/workflows/ci_build_test.yaml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml index ff347f71..b9b0053c 100644 --- a/.github/workflows/ci_build_test.yaml +++ b/.github/workflows/ci_build_test.yaml @@ -119,9 +119,11 @@ jobs: kafka_package: "kafka_2.13-3.0.0.tgz" - kafka_version: "3.1.0" kafka_package: "kafka_2.13-3.1.0.tgz" + - kafka_version: "3.3.1" + kafka_package: "kafka_2.13-3.3.1.tgz" env: - CI_SPLUNK_VERSION: "8.2.2" - CI_SPLUNK_FILENAME: splunk-8.2.2-87344edfcdb4-Linux-x86_64.tgz + CI_SPLUNK_VERSION: "9.0.2" + CI_SPLUNK_FILENAME: splunk-9.0.2-17e00c557dc1-Linux-x86_64.tgz CI_SPLUNK_HOST: 127.0.0.1 CI_SPLUNK_PORT: 8089 CI_SPLUNK_USERNAME: admin From d1d1ace2a8d2ba3ef1538b92d176069dd0cd0380 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cdeep-splunk=E2=80=9D?= <“dkapadia@splunk.com”> Date: Wed, 23 Nov 2022 11:22:24 +0530 Subject: [PATCH 12/37] docs(readme): update definition of splunk.hec.backoff.threshhold.seconds --- README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index eae192e1..6acaf8f8 100644 --- a/README.md +++ b/README.md @@ -173,7 +173,7 @@ Use the below schema to configure Splunk Connect for Kafka | `splunk.hec.json.event.formatted` | Set to `true` for events that are already in HEC format. Valid settings are `true` or `false`. |`false`| | `splunk.hec.max.outstanding.events` | Maximum amount of un-acknowledged events kept in memory by connector. Will trigger back-pressure event to slow down collection if reached. | `1000000` | | `splunk.hec.max.retries` | Amount of times a failed batch will attempt to resend before dropping events completely. Warning: This will result in data loss, default is `-1` which will retry indefinitely | `-1` | -| `splunk.hec.backoff.threshhold.seconds` | The amount of time Splunk Connect for Kafka waits to attempt resending after errors from a HEC endpoint." | `60` | +| `splunk.hec.backoff.threshhold.seconds` | The amount of time the connector stops sending events to an indexer after it returns an error code.
**NOTE:**
Other indexers are not affected by this parameter. | `60` | | `splunk.hec.lb.poll.interval` | Specify this parameter(in seconds) to control the polling interval(increase to do less polling, decrease to do more frequent polling, set `-1` to disable polling) | `120` | | `splunk.hec.enable.compression` | Valid settings are true or false. Used for enable or disable gzip-compression. |`false`| ### Acknowledgement Parameters | `timestamp.regex` | Regex for timestamp extraction.
**NOTE:**
Regex must have a named capture group `"time"`, for example: `\\\"time\\\":\\s*\\\"(?