Fix: record tracker does not support concurrent event batch removal #315
Workflow file for this run

name: CI Build Test
on:
  workflow_dispatch:
  pull_request_target:
    branches-ignore:
      - /^release\/.*/
      - master
  workflow_call:
    secrets:
      FOSSA_API_KEY:
        description: API token for FOSSA app
        required: true
      SEMGREP_PUBLISH_TOKEN:
        description: Publish token for Semgrep
        required: true

permissions:
  checks: write
  pull-requests: write

jobs:
  workflow_approval:
    name: Approve workflow
    runs-on: ubuntu-20.04
    environment: workflow-approval
    steps:
      - name: Approve workflow
        run: echo For security reasons, all pull requests need to be approved first before running any automated CI.
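  # The job above is gated by the "workflow-approval" environment; assuming that
  # environment is configured with required reviewers (set in repository settings, not
  # in this file), every job that "needs" it waits for a maintainer's approval. That is
  # the usual mitigation for pull_request_target, which runs with access to secrets.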
  fossa-scan:
    continue-on-error: true
    runs-on: ubuntu-latest
    needs:
      - workflow_approval
    steps:
      - name: Checkout
        uses: actions/checkout@v3
        with:
          ref: ${{ github.event.pull_request.head.sha }}
          repository: ${{ github.event.pull_request.head.repo.full_name }}
      - name: Run fossa analyze and create report
        run: |
          curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/fossas/fossa-cli/master/install-latest.sh | bash
          fossa analyze --debug
          fossa report attribution --format text > /tmp/THIRDPARTY
        env:
          FOSSA_API_KEY: ${{ secrets.FOSSA_API_KEY }}
      - name: Upload THIRDPARTY file
        uses: actions/upload-artifact@v3
        with:
          name: THIRDPARTY
          path: /tmp/THIRDPARTY
      - name: Run fossa test
        run: |
          fossa test --debug
        env:
          FOSSA_API_KEY: ${{ secrets.FOSSA_API_KEY }}
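  # "fossa test" exits non-zero when the FOSSA policy check flags issues, so the
  # job-level continue-on-error above keeps a licensing finding from failing the
  # whole pipeline while still surfacing it in the job status.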
  semgrep:
    runs-on: ubuntu-latest
    needs:
      - workflow_approval
    name: security-sast-semgrep
    if: github.actor != 'dependabot[bot]'
    container:
      # A Docker image with Semgrep installed. Do not change this.
      image: returntocorp/semgrep
    steps:
      - name: Checkout
        uses: actions/checkout@v3
        with:
          ref: ${{ github.event.pull_request.head.sha }}
          repository: ${{ github.event.pull_request.head.repo.full_name }}
      - name: Semgrep
        id: semgrep
        run: |
          semgrep login
          semgrep ci
        env:
          # Connect to Semgrep Cloud Platform through your SEMGREP_APP_TOKEN.
          # Generate a token from Semgrep Cloud Platform > Settings
          # and add it to your GitHub secrets.
          SEMGREP_APP_TOKEN: ${{ secrets.SEMGREP_PUBLISH_TOKEN }}
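  # "semgrep ci" reads SEMGREP_APP_TOKEN from the environment, fetches the rules and
  # policies configured for this project in Semgrep Cloud Platform, and fails the step
  # when blocking findings are reported.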
  build-unit-test:
    name: build and run unit test
    runs-on: ubuntu-20.04
    needs:
      - workflow_approval
    steps:
      - name: Checkout
        uses: actions/checkout@v3
        with:
          ref: ${{ github.event.pull_request.head.sha }}
          repository: ${{ github.event.pull_request.head.repo.full_name }}
      - name: Get maven dependencies
        run: |
          mvn dependency:go-offline
      - name: Run Unit tests
        run: |
          mvn package -Dsurefire.useSystemClassLoader=false -q
          cp -R target/splunk-kafka-connect*.jar /tmp
      - name: Upload artifact
        uses: actions/upload-artifact@v3
        with:
          name: splunk-kafka-connector
          path: /tmp/splunk-kafka-connect*.jar
      - name: Publish Unit Test Results
        uses: EnricoMi/publish-unit-test-result-action@v2
        if: always()
        with:
          check_name: Unit Test Results
          files: "target/surefire-reports/*.xml"
  e2e_test:
    name: e2e test - kafka version-${{ matrix.kafka_version }}
    runs-on: ubuntu-20.04
    needs:
      - build-unit-test
    strategy:
      fail-fast: false
      matrix:
        include:
          - kafka_version: "1.1.1"
            kafka_package: "kafka_2.11-1.1.1.tgz"
          - kafka_version: "2.0.0"
            kafka_package: "kafka_2.11-2.0.0.tgz"
          - kafka_version: "2.1.0"
            kafka_package: "kafka_2.12-2.1.0.tgz"
          - kafka_version: "2.6.0"
            kafka_package: "kafka_2.13-2.6.0.tgz"
          - kafka_version: "2.7.1"
            kafka_package: "kafka_2.13-2.7.1.tgz"
          - kafka_version: "2.8.0"
            kafka_package: "kafka_2.13-2.8.0.tgz"
          - kafka_version: "3.0.0"
            kafka_package: "kafka_2.13-3.0.0.tgz"
          - kafka_version: "3.1.0"
            kafka_package: "kafka_2.13-3.1.0.tgz"
          - kafka_version: "3.3.1"
            kafka_package: "kafka_2.13-3.3.1.tgz"
          - kafka_version: "3.4.1"
            kafka_package: "kafka_2.13-3.4.1.tgz"
          - kafka_version: "3.5.1"
            kafka_package: "kafka_2.13-3.5.1.tgz"
    env:
      CI_SPLUNK_VERSION: "9.0.2"
      CI_SPLUNK_FILENAME: splunk-9.0.2-17e00c557dc1-Linux-x86_64.tgz
      CI_SPLUNK_HOST: 127.0.0.1
      CI_SPLUNK_PORT: 8089
      CI_SPLUNK_USERNAME: admin
      CI_SPLUNK_HEC_TOKEN: a6b5e77f-d5f6-415a-bd43-930cecb12959
      CI_SPLUNK_HEC_TOKEN_ACK: a6b5e77f-d5f6-415a-bd43-930cecb12950
      CI_SPLUNK_PASSWORD: helloworld
      CI_INDEX_EVENTS: main
      CI_KAFKA_HEADER_INDEX: kafka
      CI_DATAGEN_IMAGE: rock1017/log-generator:latest
      CI_OLD_CONNECTOR_VERSION: v2.0.1
    steps:
      - name: Checkout
        uses: actions/checkout@v3
        with:
          ref: ${{ github.event.pull_request.head.sha }}
          repository: ${{ github.event.pull_request.head.repo.full_name }}
      - name: Install Splunk
        run: |
          cd /opt && wget -O $CI_SPLUNK_FILENAME 'https://d7wz6hmoaavd0.cloudfront.net/products/splunk/releases/'$CI_SPLUNK_VERSION'/linux/'$CI_SPLUNK_FILENAME''
          sudo tar xzvf $CI_SPLUNK_FILENAME
          # Set the admin user seed
          hashed_pwd=$(sudo /opt/splunk/bin/splunk hash-passwd $CI_SPLUNK_PASSWORD)
          sudo tee /opt/splunk/etc/system/local/user-seed.conf <<EOF >/dev/null
          [user_info]
          USERNAME = $CI_SPLUNK_USERNAME
          HASHED_PASSWORD = $hashed_pwd
          EOF
          # Add the delete capability to the admin role
          sudo tee /opt/splunk/etc/system/local/authorize.conf <<EOF >/dev/null
          [role_admin]
          delete_by_keyword = enabled
          EOF
          sudo /opt/splunk/bin/splunk start --accept-license --answer-yes --no-prompt
          # Enable the HEC service
          curl -X POST -u $CI_SPLUNK_USERNAME:$CI_SPLUNK_PASSWORD -k https://$CI_SPLUNK_HOST:$CI_SPLUNK_PORT/servicesNS/nobody/splunk_httpinput/data/inputs/http/http/enable
          # Create a new HEC token
          curl -X POST -u $CI_SPLUNK_USERNAME:$CI_SPLUNK_PASSWORD -k -d "name=splunk_hec_token&token=$CI_SPLUNK_HEC_TOKEN" https://$CI_SPLUNK_HOST:$CI_SPLUNK_PORT/servicesNS/nobody/splunk_httpinput/data/inputs/http
          # Enable the new HEC token
          sudo /opt/splunk/bin/splunk http-event-collector enable -name splunk_hec_token -uri https://$CI_SPLUNK_HOST:$CI_SPLUNK_PORT -auth $CI_SPLUNK_USERNAME:$CI_SPLUNK_PASSWORD
          # Create a new HEC token with acknowledgements (useACK) enabled
          curl -X POST -u $CI_SPLUNK_USERNAME:$CI_SPLUNK_PASSWORD -k -d "name=splunk_hec_token_ack&token=$CI_SPLUNK_HEC_TOKEN_ACK&useACK=1" https://$CI_SPLUNK_HOST:$CI_SPLUNK_PORT/servicesNS/nobody/splunk_httpinput/data/inputs/http
          # Enable the ack HEC token
          sudo /opt/splunk/bin/splunk http-event-collector enable -name splunk_hec_token_ack -uri https://$CI_SPLUNK_HOST:$CI_SPLUNK_PORT -auth $CI_SPLUNK_USERNAME:$CI_SPLUNK_PASSWORD
          # Set up indexes
          curl -X POST -u $CI_SPLUNK_USERNAME:$CI_SPLUNK_PASSWORD -k -d "name=$CI_INDEX_EVENTS&datatype=event" https://$CI_SPLUNK_HOST:$CI_SPLUNK_PORT/servicesNS/-/search/data/indexes
          curl -X POST -u $CI_SPLUNK_USERNAME:$CI_SPLUNK_PASSWORD -k -d "name=$CI_KAFKA_HEADER_INDEX&datatype=event" https://$CI_SPLUNK_HOST:$CI_SPLUNK_PORT/servicesNS/-/search/data/indexes
          # Restart Splunk
          curl -k -u $CI_SPLUNK_USERNAME:$CI_SPLUNK_PASSWORD https://$CI_SPLUNK_HOST:$CI_SPLUNK_PORT/services/server/control/restart -X POST
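      # Optional HEC smoke test (a sketch; assumes HEC listens on its default port 8088):
      #   curl -k https://$CI_SPLUNK_HOST:8088/services/collector/event \
      #     -H "Authorization: Splunk $CI_SPLUNK_HEC_TOKEN" \
      #     -d '{"event": "hello from CI", "index": "main"}'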
      - name: Install Kafka ${{ matrix.kafka_version }}
        run: |
          cd /tmp && wget https://archive.apache.org/dist/kafka/${{ matrix.kafka_version }}/${{ matrix.kafka_package }}
          sudo tar xzf ${{ matrix.kafka_package }}
          rm ${{ matrix.kafka_package }}
          sudo mv kafka_* /usr/local/kafka
          cd /usr/local/kafka && ls
      - name: Start ZooKeeper Server
        working-directory: /usr/local/kafka
        run: |
          echo "Start ZooKeeper"
          sudo bin/zookeeper-server-start.sh config/zookeeper.properties &
      - name: Start Kafka Server
        working-directory: /usr/local/kafka
        run: |
          echo "Start kafka server"
          sudo bin/kafka-server-start.sh config/server.properties &
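      # To verify the broker is up (a sketch; --bootstrap-server needs Kafka >= 2.2,
      # the older versions in the matrix would use --zookeeper localhost:2181 instead):
      #   bin/kafka-topics.sh --bootstrap-server localhost:9092 --list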
      - uses: actions/setup-python@v4
        with:
          python-version: 3.11
          check-latest: true
      - name: Download artifact
        uses: actions/download-artifact@v3
        with:
          name: splunk-kafka-connector
          path: /tmp
      - name: Start the Schema Registry
        run: |
          cd /tmp && wget https://packages.confluent.io/archive/7.1/confluent-community-7.1.1.tar.gz
          sudo tar xzf confluent-community-7.1.1.tar.gz
          cd confluent-7.1.1
          bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
      - name: Register the protobuf schema
        run: |
          sleep 10
          curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schemaType\": \"PROTOBUF\",\"schema\": \"syntax = \\\"proto3\\\";\\npackage com.mycorp.mynamespace;\\n\\nmessage MyRecord {\\n string id = 1;\\n float amount = 2;\\n string customer_id = 3;\\n}\\n\"}" http://localhost:8081/subjects/prototopic-value/versions
      - name: Test kafka connect upgrade
        run: |
          echo "Download kafka connect "$CI_OLD_CONNECTOR_VERSION
          # Summary of the test:
          # 1) Deploy the old kafka connector and create two tasks, one with ack and one without,
          #    to check both code paths.
          # 2) Remove the old kafka connector and deploy the new one, updating both tasks.
          # 3) The update exercises the new functionality ("splunk.hec.json.event.formatted"
          #    and "splunk.hec.raw") to confirm the connector upgrades successfully.
          # 4) Finally, verify that 2000 events were received for each of the two tasks.
          sudo mkdir -p /usr/local/share/kafka/plugins/
          wget https://github.com/splunk/kafka-connect-splunk/releases/download/$CI_OLD_CONNECTOR_VERSION/splunk-kafka-connect-$CI_OLD_CONNECTOR_VERSION.jar
          sudo cp splunk-kafka-connect-$CI_OLD_CONNECTOR_VERSION.jar /usr/local/share/kafka/plugins/
          sudo mkdir -p /usr/local/share/kafka-connector/
          sudo cp /tmp/splunk-kafka-connect*.jar /usr/local/share/kafka-connector/
          test /usr/local/share/kafka-connector/splunk-kafka-connect*.jar && echo /usr/local/share/kafka-connector/splunk-kafka-connect*.jar
          sed -i 's/plugin\.path\=connectors\//plugin\.path\=\/usr\/local\/share\/kafka\/plugins\//' $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties
          sed -i 's/key\.converter\=org\.apache\.kafka\.connect\.storage\.StringConverter/key\.converter\=org\.apache\.kafka\.connect\.json\.JsonConverter/' $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties
          sed -i 's/value\.converter\=org\.apache\.kafka\.connect\.storage\.StringConverter/value\.converter\=org\.apache\.kafka\.connect\.json\.JsonConverter/' $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties
          pip install --upgrade pip
          pip install -r test/requirements.txt
          export PYTHONWARNINGS="ignore:Unverified HTTPS request"
          echo "Test kafka connect upgrade ..."
          source $GITHUB_WORKSPACE/test/config.sh
          test -f $connector_path/$old_connector_name && echo $connector_path/$old_connector_name
          # Start the old connector
          sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 &
          sleep 20
          # Create the two tasks (with ack and without ack)
          curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
            "name": "kafka_connect",
            "config": {
              "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
              "tasks.max": "1",
              "splunk.indexes": "'"$splunk_index"'",
              "topics": "kafka_connect_upgrade",
              "splunk.hec.ack.enabled": "false",
              "splunk.hec.uri": "'"$splunk_hec_url"'",
              "splunk.hec.ssl.validate.certs": "false",
              "splunk.hec.token": "'"$splunk_token"'",
              "splunk.sources": "kafka_connect",
              "splunk.hec.raw": "true",
              "splunk.sourcetypes": "upgraded_test"
            }
          }'
          sleep 5
          curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
            "name": "kafka_connect_ack",
            "config": {
              "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
              "tasks.max": "1",
              "splunk.indexes": "'"$splunk_index"'",
              "topics": "kafka_connect_upgrade",
              "splunk.hec.ack.enabled": "true",
              "splunk.hec.uri": "'"$splunk_hec_url"'",
              "splunk.hec.ssl.validate.certs": "false",
              "splunk.hec.token": "'"$splunk_token_ack"'",
              "splunk.sources": "kafka_connect_ack",
              "splunk.hec.raw": "true",
              "splunk.sourcetypes": "upgraded_test"
            }
          }'
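          # Connector health can be checked through the standard Connect REST API, e.g.:
          #   curl ${kafka_connect_url}/connectors/kafka_connect/status
          #   curl ${kafka_connect_url}/connectors/kafka_connect_ack/status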
          sleep 5
          # Generate the first 1000 events
          python test/lib/eventproducer_connector_upgrade.py 1000 --log-level=INFO
          sudo kill $(sudo lsof -t -i:8083) && sleep 2
          sudo rm $connector_path/$old_connector_name && sleep 2
          sudo cp $connector_build_target/splunk-kafka-connect*.jar $connector_path && sleep 2
          # Start the new connector
          sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 &
          # Update the two tasks (with ack and without ack)
          sleep 10
          curl ${kafka_connect_url}/connectors/kafka_connect/config -X PUT -H "Content-Type: application/json" -d '{
            "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
            "tasks.max": "1",
            "splunk.indexes": "'"$splunk_index"'",
            "topics": "kafka_connect_upgrade",
            "splunk.hec.ack.enabled": "false",
            "splunk.hec.uri": "'"$splunk_hec_url"'",
            "splunk.hec.ssl.validate.certs": "false",
            "splunk.hec.token": "'"$splunk_token"'",
            "splunk.sources": "kafka_connect",
            "splunk.hec.raw": "false",
            "splunk.hec.json.event.formatted": "true",
            "splunk.sourcetypes": "upgraded_test"
          }'
          sleep 5
          curl ${kafka_connect_url}/connectors/kafka_connect_ack/config -X PUT -H "Content-Type: application/json" -d '{
            "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
            "tasks.max": "1",
            "splunk.indexes": "'"$splunk_index"'",
            "topics": "kafka_connect_upgrade",
            "splunk.hec.ack.enabled": "true",
            "splunk.hec.uri": "'"$splunk_hec_url"'",
            "splunk.hec.ssl.validate.certs": "false",
            "splunk.hec.token": "'"$splunk_token_ack"'",
            "splunk.sources": "kafka_connect_ack",
            "splunk.hec.raw": "false",
            "splunk.hec.json.event.formatted": "true",
            "splunk.sourcetypes": "upgraded_test"
          }'
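          # Note: PUT /connectors/<name>/config is create-or-update in the Kafka Connect
          # REST API, so these calls would also work if the connectors did not survive
          # the restart above.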
          sleep 5
          # Generate another 1000 events (2000 in total)
          python test/lib/eventproducer_connector_upgrade.py 2000 --log-level=INFO
          # Check in Splunk that we have received 2000 events for both the ack and no-ack tasks
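          # The check roughly corresponds to a Splunk search like this (illustrative only):
          #   index=main source=kafka_connect sourcetype=upgraded_test | stats count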
          python test/lib/connector_upgrade.py --log-level=INFO
      - uses: actions/upload-artifact@v3
        if: failure()
        with:
          name: kafka-connect-logs-${{ matrix.kafka_version }}
          path: output.log
      - name: Install kafka connect
        run: |
          sudo kill $(sudo lsof -t -i:8083) && sleep 2
          sudo rm -f /usr/local/share/kafka/plugins/splunk-kafka-connect*.jar
          sudo cp /tmp/splunk-kafka-connect*.jar /usr/local/share/kafka/plugins/
          sudo /usr/local/kafka/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties &
      - name: Run functional tests
        run: |
          sleep 5
          pip install --upgrade pip
          pip install -r test/requirements.txt
          export PYTHONWARNINGS="ignore:Unverified HTTPS request"
          echo "Running functional tests....."
          python -m pytest --log-level=INFO
      - uses: actions/upload-artifact@v3
        if: failure()
        with:
          name: splunk-events-${{ matrix.kafka_version }}
          path: events.txt