Merge branch 'develop' into update-python-to-latest
Vihas Splunk committed Aug 16, 2023
2 parents 26f6b45 + 0d5d3c7 commit 2aa7ab4
Showing 36 changed files with 815 additions and 267 deletions.
173 changes: 165 additions & 8 deletions .github/workflows/ci_build_test.yaml
@@ -1,7 +1,7 @@
name: CI Build Test

on:
pull_request:
pull_request_target:
branches-ignore:
- /^release\/.*/
- master
@@ -10,13 +10,35 @@ on:
FOSSA_API_KEY:
description: API token for FOSSA app
required: true

SEMGREP_PUBLISH_TOKEN:
description: Publish token for Semgrep
required: true

permissions:
checks: write
pull-requests: write

jobs:
workflow_approval:
name: Approve workflow
runs-on: ubuntu-20.04
environment: workflow-approval
steps:
- name: Approve workflow
run: echo For security reasons, all pull requests need to be approved first before running any automated CI.

fossa-scan:
continue-on-error: true
runs-on: ubuntu-latest
needs:
- workflow_approval
steps:
- uses: actions/checkout@v3
- name: Checkout
uses: actions/checkout@v3
with:
ref: ${{github.event.pull_request.head.sha}}
repository: ${{github.event.pull_request.head.repo.full_name}}
- name: run fossa analyze and create report
run: |
curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/fossas/fossa-cli/master/install-latest.sh | bash
@@ -35,14 +57,35 @@ jobs:
env:
FOSSA_API_KEY: ${{ secrets.FOSSA_API_KEY }}

semgrep:
runs-on: ubuntu-latest
needs:
- workflow_approval
name: security-sast-semgrep
if: github.actor != 'dependabot[bot]'
steps:
- name: Checkout
uses: actions/checkout@v3
with:
ref: ${{github.event.pull_request.head.sha}}
repository: ${{github.event.pull_request.head.repo.full_name}}
- name: Semgrep
id: semgrep
uses: returntocorp/semgrep-action@v1
with:
publishToken: ${{ secrets.SEMGREP_PUBLISH_TOKEN }}
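
Both the fossa-scan and semgrep jobs above pin their checkout to the pull request's head commit, because `pull_request_target` otherwise checks out the base branch. A rough shell equivalent of that pinned checkout, as a sketch (HEAD_REPO and HEAD_SHA are hypothetical stand-ins for `github.event.pull_request.head.repo.full_name` and `github.event.pull_request.head.sha`):

# Sketch only: what the explicit ref/repository override amounts to.
git clone "https://github.com/${HEAD_REPO}.git" pr-src   # the fork, not the base repo
cd pr-src
git checkout "${HEAD_SHA}"                                # the exact PR head commit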

build-unit-test:
name: build and run unit test
runs-on: ubuntu-20.04
needs:
- fossa-scan
- workflow_approval
steps:
- name: Checkout
uses: actions/checkout@3
uses: actions/checkout@v3
with:
ref: ${{github.event.pull_request.head.sha}}
repository: ${{github.event.pull_request.head.repo.full_name}}

- name: Get maven dependencies
run: |
@@ -60,7 +103,7 @@ jobs:
path: /tmp/splunk-kafka-connect*.jar

- name: Publish Unit Test Results
uses: EnricoMi/publish-unit-test-result-action/composite@v1
uses: EnricoMi/publish-unit-test-result-action@v2
if: always()
with:
check_name: Unit Test Results
@@ -91,9 +134,11 @@ jobs:
kafka_package: "kafka_2.13-3.0.0.tgz"
- kafka_version: "3.1.0"
kafka_package: "kafka_2.13-3.1.0.tgz"
- kafka_version: "3.3.1"
kafka_package: "kafka_2.13-3.3.1.tgz"
env:
CI_SPLUNK_VERSION: "8.2.2"
CI_SPLUNK_FILENAME: splunk-8.2.2-87344edfcdb4-Linux-x86_64.tgz
CI_SPLUNK_VERSION: "9.0.2"
CI_SPLUNK_FILENAME: splunk-9.0.2-17e00c557dc1-Linux-x86_64.tgz
CI_SPLUNK_HOST: 127.0.0.1
CI_SPLUNK_PORT: 8089
CI_SPLUNK_USERNAME: admin
@@ -104,11 +149,13 @@
CI_KAFKA_HEADER_INDEX: kafka
CI_DATAGEN_IMAGE: rock1017/log-generator:latest
CI_OLD_CONNECTOR_VERSION: v2.0.1
SCHEMA_REGISTRY_URL: ${{ secrets.SCHEMA_REGISTRY_URL }}

steps:
- name: Checkout
uses: actions/checkout@v3
with:
ref: ${{github.event.pull_request.head.sha}}
repository: ${{github.event.pull_request.head.repo.full_name}}

- name: Install Splunk
run: |
@@ -175,9 +222,26 @@ jobs:
name: splunk-kafka-connector
path: /tmp

- name: Start the Schema Registry
run: |
cd /tmp && wget https://packages.confluent.io/archive/7.1/confluent-community-7.1.1.tar.gz
sudo tar xzf confluent-community-7.1.1.tar.gz
cd confluent-7.1.1
bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
- name: Register the protobuf schema
run: |
sleep 10
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schemaType\": \"PROTOBUF\",\"schema\": \"syntax = \\\"proto3\\\";\\npackage com.mycorp.mynamespace;\\n\\nmessage MyRecord {\\n string id = 1;\\n float amount = 2;\\n string customer_id = 3;\\n}\\n\"}" http://localhost:8081/subjects/prototopic-value/versions
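After registration, the subject can be queried back to confirm the schema was accepted; a minimal sketch using the standard Schema Registry REST API against the same local endpoint:

# Sketch: fetch the latest registered version of the subject.
curl -s http://localhost:8081/subjects/prototopic-value/versions/latest
# The response should echo the subject name, a version number, and the escaped PROTOBUF schema.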
- name: Test kafka connect upgrade
run: |
echo "Download kafka connect "$CI_OLD_CONNECTOR_VERSION
# Summary for the test
# 1) Deploy the old Kafka connector and create 2 tasks to exercise the with-ack and without-ack paths
# 2) Remove the old connector and deploy the new connector, updating both tasks
# 3) The updated configs exercise new functionality ("splunk.hec.json.event.formatted" and "splunk.hec.raw") to confirm the connector upgrades cleanly
# 4) Finally, verify that 2000 events were received for both tasks
sudo mkdir -p /usr/local/share/kafka/plugins/
wget https://github.com/splunk/kafka-connect-splunk/releases/download/$CI_OLD_CONNECTOR_VERSION/splunk-kafka-connect-$CI_OLD_CONNECTOR_VERSION.jar
sudo cp splunk-kafka-connect-$CI_OLD_CONNECTOR_VERSION.jar /usr/local/share/kafka/plugins/
@@ -191,7 +255,94 @@ jobs:
pip install -r test/requirements.txt
export PYTHONWARNINGS="ignore:Unverified HTTPS request"
echo "Test kafka connect upgrade ..."
source $GITHUB_WORKSPACE/test/config.sh
test -f $connector_path/$old_connector_name && echo $connector_path/$old_connector_name
# Starting Old Connector
sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 &
sleep 20
# Creating the two tasks (with ack and without ack)
curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
"name": "kafka_connect",
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"splunk.indexes": "'"$splunk_index"'",
"topics": "kafka_connect_upgrade",
"splunk.hec.ack.enabled": "false",
"splunk.hec.uri": "'"$splunk_hec_url"'",
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.token": "'"$splunk_token"'" ,
"splunk.sources": "kafka_connect",
"splunk.hec.raw": "true",
"splunk.sourcetypes":"upgraded_test"
}
}'
sleep 5
curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
"name": "kafka_connect_ack",
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"splunk.indexes": "'"$splunk_index"'",
"topics": "kafka_connect_upgrade",
"splunk.hec.ack.enabled": "true",
"splunk.hec.uri": "'"$splunk_hec_url"'",
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.token": "'"$splunk_token_ack"'" ,
"splunk.sources": "kafka_connect_ack",
"splunk.hec.raw": "true",
"splunk.sourcetypes":"upgraded_test"
}
}'
sleep 5
# Generating 1000 events
python test/lib/eventproducer_connector_upgrade.py 1000 --log-level=INFO
sudo kill $(sudo lsof -t -i:8083) && sleep 2
sudo rm $connector_path/$old_connector_name && sleep 2
sudo cp $connector_build_target/splunk-kafka-connect*.jar $connector_path && sleep 2
# Starting New Connector
sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 &
# Updating the two tasks (with ack and without ack)
sleep 10
curl ${kafka_connect_url}/connectors/kafka_connect/config -X PUT -H "Content-Type: application/json" -d '{
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"splunk.indexes": "'"$splunk_index"'",
"topics": "kafka_connect_upgrade",
"splunk.hec.ack.enabled": "false",
"splunk.hec.uri": "'"$splunk_hec_url"'",
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.token": "'"$splunk_token"'" ,
"splunk.sources": "kafka_connect",
"splunk.hec.raw": "false",
"splunk.hec.json.event.formatted": "true",
"splunk.sourcetypes":"upgraded_test"
}'
sleep 5
curl ${kafka_connect_url}/connectors/kafka_connect_ack/config -X PUT -H "Content-Type: application/json" -d '{
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"splunk.indexes": "'"$splunk_index"'",
"topics": "kafka_connect_upgrade",
"splunk.hec.ack.enabled": "true",
"splunk.hec.uri": "'"$splunk_hec_url"'",
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.token": "'"$splunk_token_ack"'" ,
"splunk.sources": "kafka_connect_ack",
"splunk.hec.raw": "false",
"splunk.hec.json.event.formatted": "true",
"splunk.sourcetypes":"upgraded_test"
}'
sleep 5
# Generating 1000 events
python test/lib/eventproducer_connector_upgrade.py 2000 --log-level=INFO
# Check in Splunk that we have received 2000 events for both the with-ack and without-ack tasks
python test/lib/connector_upgrade.py --log-level=INFO
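Between the upgrade and the Splunk-side verification, the Kafka Connect REST API can confirm that both connectors came back up after the jar swap; a hedged sketch (`$kafka_connect_url` is defined in test/config.sh, which is not shown here):

# Sketch: both connectors and their tasks should report "state": "RUNNING".
curl -s ${kafka_connect_url}/connectors/kafka_connect/status
curl -s ${kafka_connect_url}/connectors/kafka_connect_ack/status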
- uses: actions/upload-artifact@v3
if: failure()
with:
name: kafka-connect-logs-${{ matrix.kafka_version }}
path: output.log

- name: Install kafka connect
run: |
@@ -207,3 +358,9 @@ jobs:
export PYTHONWARNINGS="ignore:Unverified HTTPS request"
echo "Running functional tests....."
python -m pytest --log-level=INFO
- uses: actions/upload-artifact@v3
if: failure()
with:
name: splunk-events-${{ matrix.kafka_version }}
path: events.txt
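
For manual debugging outside the pytest suite, the same event counts can be checked over the Splunk REST search API; a sketch that reuses the workflow's env values where visible (CI_SPLUNK_PASSWORD and the target index are assumptions, the `upgraded_test` sourcetype comes from the connector configs above):

# Sketch: count the upgrade-test events directly on the Splunk management port.
curl -sk -u "$CI_SPLUNK_USERNAME:$CI_SPLUNK_PASSWORD" \
  "https://$CI_SPLUNK_HOST:$CI_SPLUNK_PORT/services/search/jobs/export" \
  --data-urlencode 'search=search sourcetype=upgraded_test | stats count' \
  -d output_mode=json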
17 changes: 13 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -6,9 +6,10 @@ Splunk Connect for Kafka is a Kafka Connect Sink for Splunk with the following f
* In-flight data transformation and enrichment.

## Requirements
1. Kafka version 1.0.0 and above.
* Tested with following versions: 1.1.1, 2.0.0, 2.1.0, 2.6.0, 2.7.1, 2.8.0, 3.0.0, 3.1.0, 3.3.1
2. Java 8 and above.
3. A Splunk environment of version 7.1 and above, configured with valid HTTP Event Collector (HEC) tokens.
3. A Splunk environment of version 8.0.0 and above, configured with valid HTTP Event Collector (HEC) tokens.

* HEC token settings should be the same on all Splunk Indexers and Heavy Forwarders in your environment.
* Task configuration parameters will vary depending on acknowledgement setting (See the [Configuration](#configuration) section for details).
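
A HEC token can be smoke-tested before it is wired into the connector; a minimal sketch, assuming the default HEC port 8088 and a placeholder token:

curl -k https://localhost:8088/services/collector/event \
  -H "Authorization: Splunk <your-hec-token>" \
  -d '{"event": "connector smoke test", "sourcetype": "manual"}'
# A valid token returns {"text":"Success","code":0}.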
@@ -161,6 +162,7 @@ Use the below schema to configure Splunk Connect for Kafka
| `splunk.sources` | Splunk event source metadata for Kafka topic data. The same configuration rules as indexes can be applied. If left unconfigured, the default source binds to the HEC token. | `""` |
| `splunk.sourcetypes` | Splunk event sourcetype metadata for Kafka topic data. The same configuration rules as indexes can be applied here. If left unconfigured, the default sourcetype binds to the HEC token. | `""` |
| `splunk.flush.window` | The interval in seconds at which the events from kafka connect will be flushed to Splunk. | `30` |
| `splunk.validation.disable` | Disable validation of Splunk configurations before creating a task. | `false` |
| `splunk.hec.ssl.validate.certs` | Valid settings are `true` or `false`. Enables or disables HTTPS certification validation. |`true`|
| `splunk.hec.http.keepalive` | Valid settings are `true` or `false`. Enables or disables HTTP connection keep-alive. |`true`|
| `splunk.hec.max.http.connection.per.channel` | Controls how many HTTP connections will be created and cached in the HTTP pool for one HEC channel. |`2`|
@@ -173,7 +175,7 @@ Use the below schema to configure Splunk Connect for Kafka
| `splunk.hec.json.event.formatted` | Set to `true` for events that are already in HEC format. Valid settings are `true` or `false`. |`false`|
| `splunk.hec.max.outstanding.events` | Maximum amount of un-acknowledged events kept in memory by connector. Will trigger back-pressure event to slow down collection if reached. | `1000000` |
| `splunk.hec.max.retries` | Amount of times a failed batch will attempt to resend before dropping events completely. Warning: This will result in data loss, default is `-1` which will retry indefinitely | `-1` |
| `splunk.hec.backoff.threshhold.seconds` | The amount of time Splunk Connect for Kafka waits to attempt resending after errors from a HEC endpoint." | `60` |
| `splunk.hec.backoff.threshhold.seconds` | The duration (in seconds) for which an Indexer object is paused after it returns an error code while posting data.<br/> **NOTE:** <br/> Other Indexer objects are not affected. | `60` |
| `splunk.hec.lb.poll.interval` | Polling interval, in seconds, for re-checking unhealthy HEC channels (increase to poll less often, decrease to poll more often, set `-1` to disable polling). | `120` |
| `splunk.hec.enable.compression` | Valid settings are `true` or `false`. Enables or disables gzip compression. |`false`|
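
When `splunk.hec.json.event.formatted` is set to `true`, each Kafka record must already be a complete HEC event envelope. A minimal sketch of producing one such record with the console producer (broker address, topic, and sourcetype are placeholders):

echo '{"time": 1555209605, "sourcetype": "my_sourcetype", "event": {"id": "42", "message": "hello"}}' | \
  bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my_topic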
### Acknowledgement Parameters
@@ -230,7 +232,14 @@ Use the below schema to configure Splunk Connect for Kafka
|-------- |----------------------------|-----------------------|
| `enable.timestamp.extraction` | To enable timestamp extraction, set the value of this field to `true`. <br/> **NOTE:** <br/> Applicable only if `splunk.hec.raw` is `false` | `false` |
| `timestamp.regex` | Regex for timestamp extraction. <br/> **NOTE:** <br/> The regex must have a named capture group `"time"`, e.g.: `\\\"time\\\":\\s*\\\"(?<time>.*?)\"` | `""` |
| `timestamp.format` | Time-format for timestamp extraction .<br/>For eg.: <br/>If timestamp is `1555209605000` , set `timestamp.format` to `"epoch"` format .<br/> If timestamp is `Jun 13 2010 23:11:52.454 UTC` , set `timestamp.format` to `"MMM dd yyyy HH:mm:ss.SSS zzz"` | `""` |
| `timestamp.format` | Time format for timestamp extraction.<br/>For example:<br/>If the timestamp is `1555209605000`, set `timestamp.format` to `"epoch"`.<br/>If the timestamp is `Jun 13 2010 23:11:52.454 UTC`, set `timestamp.format` to `"MMM dd yyyy HH:mm:ss.SSS zzz"`.<br/>If the timestamp is in ISO8601 format, e.g. `2022-03-29'T'23:11:52.054`, set `timestamp.format` to `"yyyy-MM-dd'\''T'\''HH:mm:ss.SSS"` | `""` |
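
As a worked example of the three settings above, the following connector config fragment would extract the `Jun 13 2010 23:11:52.454 UTC` style timestamp (a sketch: connector name, endpoint, topic, and HEC settings are placeholders, and `splunk.hec.raw` must remain `false`):

curl http://localhost:8083/connectors/my_connector/config -X PUT -H "Content-Type: application/json" -d '{
  "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
  "topics": "my_topic",
  "splunk.hec.uri": "https://my-splunk:8088",
  "splunk.hec.token": "<hec-token>",
  "splunk.hec.raw": "false",
  "enable.timestamp.extraction": "true",
  "timestamp.regex": "\\\"time\\\":\\s*\\\"(?<time>.*?)\"",
  "timestamp.format": "MMM dd yyyy HH:mm:ss.SSS zzz"
}'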

### Out-of-band Health Checks and In-band Health Checks
| Health Checks | Description |
|-------- |----------------------------|
| `Out-of-band health check` | Targets the load balancer and removes unhealthy channels from the pool. Unhealthy channels are re-polled at an interval controlled by `splunk.hec.lb.poll.interval` (120 seconds by default). Even with this check, a request may still receive a 503 from a Splunk indexer; that case is handled by the in-band health check. |
| `In-band health check` | Targets an Indexer object while posting data and is triggered whenever an error code is returned. When it fails, indexing through that particular Indexer object is paused for the time configured by `splunk.hec.backoff.threshhold.seconds` and back-pressure handling is triggered, so events that could not be indexed are retried. |


## Load balancing

2 changes: 1 addition & 1 deletion dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
<groupId>com.github.splunk.kafka.connect</groupId>
<artifactId>splunk-kafka-connect</artifactId>
<name>splunk-kafka-connect</name>
<version>v2.0.9</version>
<version>v2.1.1</version>
<build>
<plugins>
<plugin>
