Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Insert activities via Kafka Connect #2736

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open

Conversation

sausage-todd
Copy link
Contributor

@sausage-todd sausage-todd commented Dec 23, 2024

Summary by CodeRabbit

  • New Features

    • Introduced a queue-based processing mechanism for activities across various services.
    • Added new configuration files for Kafka Connect, enhancing data handling capabilities.
    • New services and volumes added to Docker Compose for improved service management.
  • Bug Fixes

    • Updated function signatures to include queue client parameters, ensuring proper activity processing.
  • Documentation

    • Added new configuration files detailing properties for Kafka Connect connectors.
  • Chores

    • Updated dependencies to include queue-related functionalities across multiple services.

Copy link

coderabbitai bot commented Dec 23, 2024

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This pull request introduces a comprehensive update to the activity processing system by integrating a queue-based mechanism across multiple services and components. The changes span from low-level data access layers to worker services, adding queue client functionality to activity insertion, movement, and update processes. The modifications include updating Docker Compose configurations, creating Kafka Connect configurations, and modifying various TypeScript files to support queue-based activity processing.

Changes

File Change Summary
backend/src/database/repositories/activityRepository.ts Updated to use QUEUE_CLIENT in activity insertion
backend/src/services/activityService.ts Modified import and method call to include queue client
scripts/scaffold.yaml Added Kafka service configuration and Kafka Connect service
services/libs/data-access-layer/src/activities/* Updated activity processing functions to support queue client
services/apps/entity_merging_worker/src/activities/* Modified activity movement functions to include queue parameter
services/apps/profiles_worker/src/activities/* Updated member update functions to incorporate queue

Sequence Diagram

sequenceDiagram
    participant Client
    participant ActivityService
    participant QueueClient
    participant DatabaseRepository
    participant KafkaConnect

    Client->>ActivityService: Request activity insertion
    ActivityService->>QueueClient: Enqueue activity
    QueueClient->>KafkaConnect: Send activity message
    KafkaConnect->>DatabaseRepository: Process and insert activity
    DatabaseRepository-->>KafkaConnect: Confirm insertion
    KafkaConnect-->>QueueClient: Acknowledge message
    QueueClient-->>ActivityService: Confirm processing
    ActivityService-->>Client: Return response
Loading

Possibly related PRs

Suggested Labels

feature, queue-processing, kafka-integration

Suggested Reviewers

  • themarolt

Poem

🐰 Queues are dancing, messages flow
Activities hop from to and fro
Kafka connects with graceful might
Processing data day and night
A rabbit's dream of smooth design! 🚀


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

🧹 Nitpick comments (12)
scripts/scaffold/kafka-connect/worker-local.properties (1)

11-13: Add monitoring and performance configurations

The configuration lacks monitoring settings and the offset flush interval might need tuning.

Add these configurations:

 offset.storage.file.filename=/storage/connect.offsets
 offset.flush.interval.ms=10000
 plugin.path=/usr/share/java,/usr/share/filestream-connectors,/usr/share/confluent-hub-components
+# Monitoring
+metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
+confluent.metrics.reporter.bootstrap.servers=kafka:9092
+confluent.metrics.reporter.topic.replicas=1
+# Performance tuning
+producer.batch.size=16384
+producer.linger.ms=100
+consumer.fetch.min.bytes=1
+consumer.fetch.max.wait.ms=500
scripts/scaffold/kafka-connect/build-docker-image.sh (2)

8-8: Add Docker daemon availability check

The script should verify Docker daemon availability before attempting to build.

Add this check before the build:

+if ! docker info > /dev/null 2>&1; then
+    echo "Error: Docker daemon is not running"
+    exit 1
+fi
+
 docker build -t "${TAG}" .

13-21: Consider non-interactive mode for CI/CD environments

The interactive prompt might block automated builds in CI/CD pipelines.

Consider adding a non-interactive mode:

-echo -n "Type 'y' and press Enter to push the image to the registry. Ctrl+C to cancel: "
-read -r PUSH
-if [ "${PUSH}" = "y" ]; then
+if [ "${AUTO_PUSH:-}" = "true" ] || [ "${1:-}" = "--push" ]; then
     echo "Pushing image to the registry..."
     echo "----------------------------------------"
     docker push "${TAG}"
 else
+    echo "Skipping push (use --push flag or set AUTO_PUSH=true to push automatically)"
-    echo "Skipping push"
 fi
services/libs/data-access-layer/src/activities/update.ts (2)

58-58: Optional: Expand comments for the new queueClient parameter.
A brief comment explaining how queueClient is used here would aid maintainability and help future contributors.

 export async function updateActivities(
   qdb: DbConnOrTx,
+  // queueClient enables insertion tasks to be queued for further processing
   queueClient: IQueue,
   ...
 ) {

67-67: Check for unnecessary concurrency blocking.
You’re calling insertActivities in a loop for each activity chunk, which might block if queue operations are slow. Confirm whether these insert calls need concurrency control or batching to improve performance.

services/libs/data-access-layer/src/activities/ilp.ts (3)

10-10: Logger naming is appropriate.
The logger is well-defined for insert-activities; consider also tagging messages with “ILP” or “Queue” for more clarity.


43-46: Potential for better batching of queue messages.
Sending each row individually can be slower than sending a batch if the queue supports bulk enqueues. Consider exploring that option if throughput is a concern.


48-48: Telemetry usage is helpful.
Tracking the number of inserted activities is great for monitoring throughput. Optionally, you could break it down by queue usage if needed.

services/libs/data-access-layer/src/old/apps/entity_merging_worker/orgs.ts (1)

37-44: Function signature update to include queueClient
By extending the signature to accept a queueClient and passing it to updateActivities, you enable asynchronous or distributed task processing. This change is coherent with a queue-driven design and should help scale. Ensure that edge cases—such as queue unavailability or exceptions thrown by the queue—are handled gracefully upstream.

services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2)

45-45: Updated function signature for moveActivitiesToNewMember.

Adding queueClient to the parameters aligns with queue-based processing. Ensure all existing callers provide a valid queueClient instance.


134-134: Passing queueClient to updateActivities.

Ensure errors during queue usage are caught and handled to avoid silent failures.

services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1)

21-21: Added queueClient to runMemberAffiliationsUpdate.

Passing queueClient into this function is consistent with the new queue-based architecture. Verify downstream calls handle it properly.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 730fcbb and 730a22c.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (21)
  • backend/src/database/repositories/activityRepository.ts (2 hunks)
  • backend/src/services/activityService.ts (2 hunks)
  • scripts/scaffold.yaml (3 hunks)
  • scripts/scaffold/kafka-connect/Dockerfile (1 hunks)
  • scripts/scaffold/kafka-connect/build-docker-image.sh (1 hunks)
  • scripts/scaffold/kafka-connect/console-local-sink.properties (1 hunks)
  • scripts/scaffold/kafka-connect/questdb-local-sink.properties (1 hunks)
  • scripts/scaffold/kafka-connect/worker-local.properties (1 hunks)
  • services/apps/entity_merging_worker/src/activities/members.ts (2 hunks)
  • services/apps/entity_merging_worker/src/activities/organizations.ts (1 hunks)
  • services/apps/entity_merging_worker/src/main.ts (1 hunks)
  • services/apps/profiles_worker/src/activities/member/memberUpdate.ts (1 hunks)
  • services/apps/profiles_worker/src/main.ts (1 hunks)
  • services/libs/data-access-layer/package.json (1 hunks)
  • services/libs/data-access-layer/src/activities/ilp.ts (1 hunks)
  • services/libs/data-access-layer/src/activities/update.ts (3 hunks)
  • services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (4 hunks)
  • services/libs/data-access-layer/src/old/apps/entity_merging_worker/orgs.ts (2 hunks)
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (3 hunks)
  • services/libs/queue/src/types.ts (1 hunks)
  • services/libs/queue/src/vendors/kafka/config.ts (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • scripts/scaffold/kafka-connect/console-local-sink.properties
🔇 Additional comments (25)
scripts/scaffold/kafka-connect/Dockerfile (2)

1-1: Version pinning strategy needs review

The Confluent Platform version 7.8.0-2 should be verified for compatibility with your Kafka cluster version and for any known security vulnerabilities.

Run this script to check for security advisories:

✅ Verification successful

Let me gather more information about the Kafka setup to make a more informed decision.


Let me check the docker-compose file to understand the Kafka setup better.


Version 7.8.0-2 of Confluent Platform appears compatible with the setup

The Kafka Connect image version appears suitable for the environment based on:

  • No security vulnerabilities found in GitHub's database for this version
  • The test environment uses Bitnami Kafka latest version, which is compatible with Confluent Platform 7.8.x
  • The Dockerfile installs recent versions of connectors (Snowflake 2.5.0 and QuestDB 0.12) which are compatible with CP 7.8.x
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for CVEs related to Confluent Platform 7.8.0
gh api graphql -f query='
{
  securityVulnerabilities(first: 5, ecosystem: MAVEN, package: "confluent-platform") {
    nodes {
      advisory {
        summary
        severity
        publishedAt
      }
      vulnerableVersionRange
      firstPatchedVersion {
        identifier
      }
    }
  }
}'

Length of output: 353


Script:

#!/bin/bash
# Check for Kafka version and configuration in the codebase
rg -l "kafka" --type yaml --type properties

Length of output: 78


Script:

#!/bin/bash
# Look for docker-compose or similar configuration files that might show Kafka version
fd -e yml -e yaml docker-compose

Length of output: 67


Script:

#!/bin/bash
# Check if there are any Kafka configuration files
fd -e properties kafka

Length of output: 24


Script:

#!/bin/bash
# Check the content of the Dockerfile for any additional context
cat scripts/scaffold/kafka-connect/Dockerfile

Length of output: 336


Script:

#!/bin/bash
# Check the docker-compose file for Kafka configuration
cat backend/docker-compose.test.yaml

Length of output: 1955


Script:

#!/bin/bash
# Look for any documentation about Kafka setup
fd README -e md | xargs cat

Length of output: 3307


Script:

#!/bin/bash
# Check if there are any other Kafka-related Dockerfiles
fd Dockerfile | xargs grep -l "kafka"

Length of output: 79


10-12: Review volume permissions for non-root user

The volume /storage is created while running as root but will be accessed by appuser. This might cause permission issues.

Add appropriate permissions:

 VOLUME /storage
+RUN mkdir -p /storage && chown appuser:appuser /storage
 
 USER appuser
scripts/scaffold/kafka-connect/questdb-local-sink.properties (1)

9-11: Verify timestamp handling configuration

The timestamp configuration needs verification:

  1. Ensure the format matches your activity data
  2. Verify timezone handling
  3. Consider adding error handling for invalid timestamps

Run this script to verify timestamp formats in your codebase:

✅ Verification successful

Based on the search results, I can provide a final response since I have enough information about timestamp handling in the codebase:

Timestamp configuration is correctly set for activity data

The timestamp configuration in the Kafka Connect properties matches the timestamp formats used throughout the codebase:

  • The timestamp.field.name=timestamp aligns with the activity timestamp field used consistently in the codebase
  • The timestamp.string.fields=createdAt,updatedAt correctly identifies date string fields
  • The timestamp.string.format=yyyy-MM-ddTHH:mm:ss.SSSZ matches the ISO-8601 format used throughout the codebase as evidenced by the widespread use of toISOString() for timestamps

The codebase consistently handles dates in UTC/ISO-8601 format with timezone information, which aligns with the configured format. Error handling for invalid timestamps is built into the platform's date handling through JavaScript's native Date object validation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for timestamp formats in activity-related files
rg -A 2 'new Date|toISOString|DateTime' 

Length of output: 68820

scripts/scaffold.yaml (2)

143-143: LGTM: Good practice using specific Kafka version

Using a specific version (bitnami/kafka:3.8.0-debian-12-r6) instead of latest is a good practice for reproducible builds.


169-172: Verify Kafka Connect configuration files

Please ensure that the mounted configuration files are properly configured:

  1. worker-local.properties should have correct Kafka bootstrap servers
  2. questdb-local-sink.properties should have proper QuestDB connection details

Let's verify the configuration files:

✅ Verification successful

Let me try a different approach to locate and verify these configuration files.


Configuration files are properly set up for Kafka Connect integration

The configuration files are correctly configured with all necessary parameters:

  • worker-local.properties:

    • Has correct bootstrap servers: kafka:9092
    • Proper converter settings for keys and values
    • Configured offset storage and plugin paths
  • questdb-local-sink.properties:

    • Proper QuestDB connection URL: questdb:9000
    • Correctly configured for the activities topic
    • Appropriate timestamp field configurations
    • Compatible converter settings
  • console-local-sink.properties:

    • Properly configured for debugging/monitoring
    • Set up for the same activities topic
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check Kafka Connect configuration files

echo "Checking worker configuration..."
if [ -f "scaffold/kafka-connect/worker-local.properties" ]; then
    grep -E "bootstrap.servers|group.id|key.converter|value.converter" scaffold/kafka-connect/worker-local.properties
fi

echo "Checking QuestDB sink configuration..."
if [ -f "scaffold/kafka-connect/questdb-local-sink.properties" ]; then
    grep -E "connection.url|connection.user|topics" scaffold/kafka-connect/questdb-local-sink.properties
fi

Length of output: 287


Script:

#!/bin/bash
# Search for Kafka Connect configuration files in the repository
echo "Searching for Kafka Connect configuration files..."
fd -t f "worker.*properties|questdb.*properties|console.*properties" 

# If files are found, let's check their content
echo -e "\nChecking files content if they exist:"
fd -t f "worker.*properties|questdb.*properties|console.*properties" -x cat {}

Length of output: 1740

services/libs/data-access-layer/src/activities/update.ts (2)

5-5: Queue interface import looks good.
The introduction of the “IQueue” interface is consistent with the new queue-based design.


25-32: Ensure error propagation and concurrency correctness in the Promise logic.
This Promise-based approach cleanly coordinates the stream completion with result resolution. However, be sure that if onActivity throws, it’s properly bubbled up and caught by the .catch(reject) block.

✅ Verification successful

Let me analyze the code further by checking the error handling implementation in this file and related stream processing logic.


Error handling is properly implemented in the Promise chain

The code correctly handles error propagation through the Promise chain:

  • The main Promise chain has a .catch(reject) that will handle any errors from the stream processing
  • Errors from onActivity will be automatically propagated through the async/await in the stream processing
  • The tryFinish coordination logic is correctly implemented to resolve only when both conditions are met
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Let's first see the complete implementation of this file to understand the error handling
rg -A 20 "return new Promise\(\(resolve, reject\)" services/libs/data-access-layer/src/activities/update.ts

Length of output: 590


Script:

#!/bin/bash
# Let's also check if there are any catch blocks or error handling patterns in this file
rg "catch|reject|throw" services/libs/data-access-layer/src/activities/update.ts

Length of output: 147


Script:

# Let's see if there are similar stream processing patterns in the codebase for comparison
ast-grep --pattern 'return new Promise((resolve, reject) => {
  $$$
  function tryFinish() {
    $$$
  }
  $$$
})'

Length of output: 2471

services/libs/data-access-layer/src/activities/ilp.ts (4)

1-5: Imports are aligned with the new queue architecture.
The usage of moment, getServiceChildLogger, and the new queue interfaces looks consistent.


13-13: Queue client parameter fosters modular design.
Exposing queueClient in the function signature effectively decouples the queue-based insertion from the rest of the data logic. Good move.


50-50: Return pattern is straightforward and correct.
Returning IDs from the inserted activities helps with further chaining if needed.


17-41: Validate partial attributes in activity objects.
You’re setting default fields for git and member-related attributes. Make sure this approach won’t override valid ‘falsey’ attributes that might be intentionally passed in as empty or null.

✅ Verification successful

Let me gather more information about how these activity attributes are handled throughout the codebase.


Let me try a different search pattern to find more specific information about activity attributes.


Let me try one more search with corrected patterns.


Based on the code analysis, I can now generate the final response:

The git-related attributes are safely handled in the code.
The code explicitly extracts git-related attributes before spreading the rest of the activity object, which prevents any unintended overrides. Additionally, the code has proper fallback handling with the || operator for insertions/additions, and direct access for other git attributes. The implementation in both ilp.ts and sql.ts shows consistent handling of these attributes, with no risk of losing falsey values.

🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Search for attribute handling patterns and their types
ast-grep --pattern 'activity.attributes'

# Search for activity interface or type definitions
ast-grep --pattern 'interface Activity'

# Look for tests that might show expected behavior with falsey values
rg "activity.*attributes" -A 5 -g "*.test.ts" -g "*.spec.ts"

Length of output: 65975


Script:

#!/bin/bash
# Search for activity type definitions and interfaces
rg -g "*.ts" "interface.*Activity.*{" -B 2 -A 10

# Search for tests related to activity attributes handling
rg -g "*.test.ts" -g "*.spec.ts" "attributes.*false" -A 5

Length of output: 1046


Script:

#!/bin/bash
# Search for activity interface definitions
rg -g "*.ts" "interface Activity" -B 2 -A 10

# Look for activity attributes handling in the data access layer
rg -g "*.ts" "activity\.attributes" -A 5 --glob="!*.test.ts" --glob="!*.spec.ts" -C 10

Length of output: 34288

services/apps/entity_merging_worker/src/main.ts (1)

29-31: Queue enabled setting is consistent with the new architecture.
Enabling the queue in the entity_merging_worker ensures it aligns with the rest of the system’s queue-based flow. Great addition.

services/apps/profiles_worker/src/main.ts (1)

28-30: Queue enabled for profiles worker.
This addition matches the entity_merging_worker’s queue settings, ensuring standardized usage across different workers.

services/apps/profiles_worker/src/activities/member/memberUpdate.ts (1)

13-18: Integration with svc.queue is consistent.
Passing svc.queue to runMemberAffiliationsUpdate ensures that any new affiliations or changes can be queued for downstream processing. Confirm that queue connectivity issues will be handled gracefully.

services/libs/data-access-layer/src/old/apps/entity_merging_worker/orgs.ts (1)

2-2: Import of IQueue is consistent with queue-based approach
The import aligns well with the new queue usage pattern introduced in this PR, ensuring that the function can leverage the queue’s capabilities.

services/libs/queue/src/vendors/kafka/config.ts (1)

58-65: Introduction of ACTIVITIES_QUEUE_SETTINGS
Defining a dedicated queue configuration for activities makes sense, given the need to isolate activity-related events. If there's an expected higher throughput, consider customizing partition counts. Otherwise, this is a straightforward addition that fits consistently with existing settings.

services/libs/queue/src/types.ts (1)

80-80: New ACTIVITIES queue enumerated value
Adding 'ACTIVITIES' to the CrowdQueue enum is straightforward and integral to expanding the queue-based architecture for handling activity events.

services/apps/entity_merging_worker/src/activities/organizations.ts (1)

32-32: Passing queue parameter in moveActivitiesBetweenOrgs
Forwarding svc.queue to moveActivitiesToNewOrg ensures uniform usage of the queue for activity migrations. This keeps the workflow consistent with the broader queue integration approach. Verify the non-null status of svc.queue in upstream code to avoid potential runtime errors.

✅ Verification successful

Let me verify the non-null status of svc.queue by examining the code.


Let me check the service initialization and the actual implementation of moveActivitiesToNewOrg from the data access layer.


Let me try one final approach to check the service structure and queue usage.


Let me check the service initialization and the main file where svc is defined.


Let me check the implementation of moveActivitiesToNewOrg in the data access layer.


From the output, I can see that moveActivitiesToNewOrg is defined in the data access layer and explicitly requires a queue parameter in its signature. Let me check the service initialization to verify the queue setup.


Based on the code we've seen so far:

  1. The moveActivitiesToNewOrg function in the data access layer explicitly requires a queue parameter
  2. The service initialization in main.ts shows queue is enabled with queue: { enabled: true }
  3. The queue is being passed through svc.queue which is initialized as part of the ServiceWorker

Queue parameter is correctly passed and initialized
The queue parameter is properly initialized in the service and correctly passed to moveActivitiesToNewOrg. The service configuration shows queue is enabled, and the function signature in the data access layer explicitly requires it, making this a valid and necessary parameter.

🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Check the service initialization and queue setup
ast-grep --pattern 'class $_ {
  queue: $_
  $$$
}'

# Check how svc.queue is initialized
rg "queue.*=" -A 3 services/apps/entity_merging_worker/src

Length of output: 114


Script:

#!/bin/bash
# Check the moveActivitiesToNewOrg function definition and usage
ast-grep --pattern 'function moveActivitiesToNewOrg($_) {
  $$$
}'

# Look for any null checks on queue parameter
rg "moveActivitiesToNewOrg" -A 5 services/apps/entity_merging_worker/src

Length of output: 1406


Script:

#!/bin/bash
# Check the service initialization
rg "class.*Service" -A 10 services/apps/entity_merging_worker/src

# Check the moveActivitiesToNewOrg implementation in data access layer
fd moveActivitiesToNewOrg --exec cat {}

Length of output: 111


Script:

#!/bin/bash
# Look for service type definition
rg "type.*Service" -A 5 services/apps/entity_merging_worker/src

# Check imports and usage of queue in the organizations file
rg "import.*queue" -A 2 services/apps/entity_merging_worker/src/activities/organizations.ts

# Check the entire organizations.ts file for context
cat services/apps/entity_merging_worker/src/activities/organizations.ts

Length of output: 3987


Script:

#!/bin/bash
# Check the main file where svc is initialized
cat services/apps/entity_merging_worker/src/main.ts

# Check the service type definition if it exists in a types file
fd "types?.ts" --exec cat {} services/apps/entity_merging_worker/src

Length of output: 73446


Script:

#!/bin/bash
# Look for moveActivitiesToNewOrg implementation
rg "moveActivitiesToNewOrg" -A 10 -B 2

# Check if queue is used in any other similar functions
rg "queue.*moveActivities" -A 5

Length of output: 4090


Script:

#!/bin/bash
# Check the service worker options and queue configuration
rg "queue.*enabled" -A 5 services/apps/entity_merging_worker/src

# Check the ServiceWorker implementation for queue initialization
fd ServiceWorker --exec cat {}

Length of output: 101

services/apps/entity_merging_worker/src/activities/members.ts (1)

70-70: Handle potential queue unavailability gracefully.

Similar to the previous function call, ensure that passing svc.queue to moveIdentityActivitiesToNewMember is accompanied by robust error handling or logging, in case svc.queue is not properly initialized.

services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2)

2-2: Import of IQueue looks good.

No issues here; the import is consistent with the approach of injecting a queue dependency.


125-125: Revised function signature for moveIdentityActivitiesToNewMember.

Similar to moveActivitiesToNewMember, adding queueClient aligns with the move to queue-based patterns. Confirm that code and tests reflect these changes thoroughly.

services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1)

6-6: Imported IQueue to support queue-based insertion.

Looks good. Make sure the type definition suits any advanced queue usage patterns you may introduce.

backend/src/database/repositories/activityRepository.ts (1)

17-18: Queue client import introduced via QUEUE_CLIENT.

This is consistent with queue-based activity insertion. Confirm that QUEUE_CLIENT is correctly initialized in all environments.

✅ Verification successful

Let me verify the initialization of QUEUE_CLIENT by first checking its implementation and usage across the codebase.


Let me check the implementation details of the queue service to verify its initialization.


Let me check the queue configuration that's being used to initialize the queue client.


Let me check the configuration implementation and environment-specific settings.


Based on the analysis of the codebase, I can now provide a final response:

QUEUE_CLIENT initialization is properly configured across environments.

The queue client is correctly initialized through the QueueFactory.createQueueService(QUEUE_CONFIG) using environment-specific configurations loaded via the config package. The configuration is properly handled across all environments (test, development, staging, production) through NODE_ENV environment variable, and the queue service is consistently initialized across the application.

🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Find the implementation of QUEUE_CLIENT
rg "QUEUE_CLIENT" -B 3 -A 3

# Find the queue service implementation
fd queueService

Length of output: 5823


Script:

#!/bin/bash
# Check the queue service implementation
cat backend/src/serverless/utils/queueService.ts

# Check for environment configurations
rg "IS_TEST_ENV|IS_DEV_ENV" -A 3 backend/src/serverless/utils/queueService.ts

Length of output: 3853


Script:

#!/bin/bash
# Check the queue configuration
rg "QUEUE_CONFIG" -B 3 -A 3

# Check the configuration file
fd "conf.(ts|js)" --exec cat {}

Length of output: 50757


Script:

#!/bin/bash
# Check the configuration implementation in backend
cat backend/src/conf/index.ts

# Check for environment-specific configurations
rg "IS_TEST_ENV|IS_DEV_ENV|IS_PROD_ENV" -A 3 backend/src/conf/index.ts

Length of output: 7387

backend/src/services/activityService.ts (1)

33-33: Ensure proper queue client configuration.
Importing QUEUE_CLIENT indicates that we are now passing a queue instance to the activity insertion process. Verify that the queue service is properly configured and handles failures or reconnection scenarios gracefully in production environments.

Run the following script to locate other references to QUEUE_CLIENT and confirm that error handling is present:

✅ Verification successful

Let me try a different search pattern to find QUEUE_CLIENT usage:


Let me check the queue service implementation to verify error handling:


Let me check the queue configuration and implementation from the common services package:


Let me check the queue configuration and implementation details:


Based on the comprehensive analysis of the queue implementation, I can now provide the final response:

Queue client implementation is properly configured with error handling and reconnection

The QUEUE_CLIENT is implemented using a robust Kafka-based queue service that includes:

  • Automatic retries with exponential backoff (8 retries starting at 100ms)
  • Proper client initialization with logging
  • Error handling through the Kafka client configuration
  • Connection management through the QueueFactory
  • Integration with Redis for additional reliability
  • Priority-based message handling through QueuePriorityContextLoader
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for all references to QUEUE_CLIENT across the codebase
ast-grep --pattern $'QUEUE_CLIENT($_)' 

Length of output: 1501


Script:

#!/bin/bash
# Search for QUEUE_CLIENT references and imports
rg "QUEUE_CLIENT" -A 3

# Also search for queue service configuration
rg "queueService" -A 3

Length of output: 6737


Script:

#!/bin/bash
# Check queue service implementation
cat backend/src/serverless/utils/queueService.ts

Length of output: 3773


Script:

#!/bin/bash
# Check queue configuration
cat backend/src/conf/index.ts | grep -A 10 "QUEUE_CONFIG"

# Check if we can see the queue implementation
rg "QueueFactory" -A 5

Length of output: 44762


Script:

#!/bin/bash
# Check the queue implementation from common packages
rg -g "*.ts" "class QueueFactory" -A 10
rg -g "*.ts" "interface IQueue" -A 10

Length of output: 4781

services/libs/data-access-layer/package.json (1)

17-17: Pin or verify the workspace dependency.
Using "workspace:*" requires diligence in version management. Make sure that any local modifications in the "@crowd/queue" package are compatible with production builds and won’t introduce breaking changes.

✅ Verification successful

Let me gather information about the workspace dependencies and version management in the repository.


Let me check the version of @crowd/queue package and its usage pattern.


Let me try one more search to locate the queue package and its structure.


Let me check the queue package contents directly.


Based on the gathered information, I can now provide the final response:

Workspace dependency pattern is consistent across the codebase
The use of workspace:* for @crowd/queue follows the established pattern in the monorepo. The package is used by multiple services, and all of them consistently use the workspace:* dependency specification. The @crowd/queue package itself follows the same pattern for its internal dependencies (@crowd/common, @crowd/logging, @crowd/types). This consistency in version management across the workspace suggests a deliberate architectural decision for maintaining internal package dependencies.

🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Check package.json files for workspace dependencies and version patterns
fd package.json --exec cat {} \; | jq -r 'select(.dependencies."@crowd/queue" != null) | "\(.name): @crowd/queue version: \(.dependencies."@crowd/queue")"'

# Look for workspace configuration
fd "package.json|pnpm-workspace.yaml|lerna.json" --max-depth 1 --exec cat {} \;

Length of output: 980


Script:

#!/bin/bash
# Check the package.json of @crowd/queue package
fd queue/package.json --exec cat {} \;

# Look for any version constraints or release tags
git tag | grep "@crowd/queue"

Length of output: 73


Script:

#!/bin/bash
# Find the queue package location
fd -t d -d 3 queue

# Check if there are any changelog or version files
rg -g '!node_modules' -g '!dist' -l "queue.*version|CHANGELOG" 

Length of output: 106


Script:

#!/bin/bash
# Check the package.json in the queue directory
cat services/libs/queue/package.json

# Look for any version-related changes in recent commits
git log -n 5 services/libs/queue/package.json

Length of output: 584

Comment on lines +5 to +8
RUN yum install -y jq findutils unzip

RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt
RUN confluent-hub install questdb/kafka-questdb-connector:0.12 --no-prompt
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Optimize Dockerfile layers and security

  1. Combine RUN commands to reduce image layers and cleanup package manager cache
  2. Pin specific versions of system packages for reproducibility
  3. Verify connector versions for compatibility

Consider this optimization:

-RUN yum install -y jq findutils unzip
-
-RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt
-RUN confluent-hub install questdb/kafka-questdb-connector:0.12 --no-prompt
+RUN yum install -y jq-1.6-* findutils-4.6.0-* unzip-6.0-* \
+    && yum clean all \
+    && rm -rf /var/cache/yum \
+    && confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt \
+    && confluent-hub install questdb/kafka-questdb-connector:0.12 --no-prompt

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +1 to +5
name=questdb-sink
client.conf.string=http::addr=questdb:9000;
topics=activities
table=activities
connector.class=io.questdb.kafka.QuestDBSinkConnector
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling and monitoring configurations

The connector configuration lacks essential error handling and monitoring settings.

Add these configurations:

 name=questdb-sink
 client.conf.string=http::addr=questdb:9000;
 topics=activities
 table=activities
 connector.class=io.questdb.kafka.QuestDBSinkConnector
+errors.tolerance=all
+errors.deadletterqueue.topic.name=dlq-activities
+errors.deadletterqueue.topic.replication.factor=1
+errors.deadletterqueue.context.headers.enable=true
+errors.log.enable=true
+errors.log.include.messages=true
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
name=questdb-sink
client.conf.string=http::addr=questdb:9000;
topics=activities
table=activities
connector.class=io.questdb.kafka.QuestDBSinkConnector
name=questdb-sink
client.conf.string=http::addr=questdb:9000;
topics=activities
table=activities
connector.class=io.questdb.kafka.QuestDBSinkConnector
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-activities
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
errors.log.include.messages=true

Comment on lines +1 to +2
bootstrap.servers=kafka:9092
group.id=kafka-connect
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add security configurations for Kafka connection

The current configuration lacks security settings for Kafka connection.

Add these security configurations:

 bootstrap.servers=kafka:9092
 group.id=kafka-connect
+security.protocol=SSL
+ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
+ssl.truststore.password=${file:/etc/kafka/secrets/connect-secrets:truststore-password}
+ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
+ssl.keystore.password=${file:/etc/kafka/secrets/connect-secrets:keystore-password}
+ssl.key.password=${file:/etc/kafka/secrets/connect-secrets:key-password}

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +5 to +6
TAG="sjc.ocir.io/axbydjxa5zuh/kafka-connect:$(date +%s)"
readonly TAG
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Parameterize the registry path for better flexibility

The registry path sjc.ocir.io/axbydjxa5zuh is hardcoded, which could cause issues when deploying to different environments.

Consider using environment variables:

-TAG="sjc.ocir.io/axbydjxa5zuh/kafka-connect:$(date +%s)"
+REGISTRY_PATH="${REGISTRY_PATH:-sjc.ocir.io/axbydjxa5zuh}"
+TAG="${REGISTRY_PATH}/kafka-connect:$(date +%s)"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
TAG="sjc.ocir.io/axbydjxa5zuh/kafka-connect:$(date +%s)"
readonly TAG
REGISTRY_PATH="${REGISTRY_PATH:-sjc.ocir.io/axbydjxa5zuh}"
TAG="${REGISTRY_PATH}/kafka-connect:$(date +%s)"
readonly TAG

Comment on lines +164 to +180
kafka-connect:
build:
context: scaffold/kafka-connect
restart: unless-stopped
entrypoint:
- connect-standalone
- /etc/kafka-connect/worker-local.properties
- /etc/kafka-connect/console-local-sink.properties
- /etc/kafka-connect/questdb-local-sink.properties
volumes:
- kafka-connect-dev:/storage
- ./scaffold/kafka-connect/worker-local.properties:/etc/kafka-connect/worker-local.properties
- ./scaffold/kafka-connect/console-local-sink.properties:/etc/kafka-connect/console-local-sink.properties
- ./scaffold/kafka-connect/questdb-local-sink.properties:/etc/kafka-connect/questdb-local-sink.properties
networks:
- crowd-bridge

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add essential service configurations for Kafka Connect

The Kafka Connect service configuration needs improvements for production readiness:

  1. No health check defined
  2. No resource limits set
  3. No explicit dependency on Kafka
  4. Using standalone mode (not recommended for production)

Consider these improvements:

   kafka-connect:
     build:
       context: scaffold/kafka-connect
     restart: unless-stopped
+    depends_on:
+      kafka:
+        condition: service_started
+    healthcheck:
+      test: ["CMD-SHELL", "curl -f http://localhost:8083/connectors || exit 1"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+      start_period: 30s
+    deploy:
+      resources:
+        limits:
+          memory: 1G
+        reservations:
+          memory: 512M
     entrypoint:
       - connect-standalone

Also, consider switching to distributed mode for better scalability:

     entrypoint:
-      - connect-standalone
+      - connect-distributed
       - /etc/kafka-connect/worker-local.properties
-      - /etc/kafka-connect/console-local-sink.properties
-      - /etc/kafka-connect/questdb-local-sink.properties
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
kafka-connect:
build:
context: scaffold/kafka-connect
restart: unless-stopped
entrypoint:
- connect-standalone
- /etc/kafka-connect/worker-local.properties
- /etc/kafka-connect/console-local-sink.properties
- /etc/kafka-connect/questdb-local-sink.properties
volumes:
- kafka-connect-dev:/storage
- ./scaffold/kafka-connect/worker-local.properties:/etc/kafka-connect/worker-local.properties
- ./scaffold/kafka-connect/console-local-sink.properties:/etc/kafka-connect/console-local-sink.properties
- ./scaffold/kafka-connect/questdb-local-sink.properties:/etc/kafka-connect/questdb-local-sink.properties
networks:
- crowd-bridge
kafka-connect:
build:
context: scaffold/kafka-connect
restart: unless-stopped
depends_on:
kafka:
condition: service_started
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:8083/connectors || exit 1"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
deploy:
resources:
limits:
memory: 1G
reservations:
memory: 512M
entrypoint:
- connect-distributed
- /etc/kafka-connect/worker-local.properties
volumes:
- kafka-connect-dev:/storage
- ./scaffold/kafka-connect/worker-local.properties:/etc/kafka-connect/worker-local.properties
- ./scaffold/kafka-connect/console-local-sink.properties:/etc/kafka-connect/console-local-sink.properties
- ./scaffold/kafka-connect/questdb-local-sink.properties:/etc/kafka-connect/questdb-local-sink.properties
networks:
- crowd-bridge

@@ -38,7 +38,7 @@ export async function moveActivitiesBetweenMembers(
if (!memberExists) {
return
}
await moveActivitiesToNewMember(svc.questdbSQL, primaryId, secondaryId, tenantId)
await moveActivitiesToNewMember(svc.questdbSQL, svc.queue, primaryId, secondaryId, tenantId)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Based on the code analysis, I can now provide the final response:

Queue error handling needs to be improved in activity merging flow

The implementation in insertActivities uses QueueEmitter to send messages but doesn't have proper error handling for queue failures. While the queue client is properly typed as IQueue, the actual error handling for queue unavailability is missing. Specific issues:

  • No try-catch block around emitter.sendMessage() calls
  • No fallback mechanism if queue operations fail
  • No logging of queue-related errors
🔗 Analysis chain

Ensure proper error handling if the queue is unavailable.

You are now passing svc.queue to moveActivitiesToNewMember. Confirm that any internal logic gracefully handles a null or undefined queue. Consider adding logging or a fallback if queue usage fails.


Let me gather information about the error handling in the moveActivitiesToNewMember function and its queue usage.


Let me check the implementation details of the function to understand how it handles the queue parameter.


Let me check the implementation of the updateActivities function since it's the one actually using the queue parameter.


Let me check the implementation of insertActivities to see how it handles the queue.

🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Find the implementation of moveActivitiesToNewMember function
ast-grep --pattern 'function moveActivitiesToNewMember($_) {
  $$$
}'

# Also search for any direct references to this function
rg "moveActivitiesToNewMember" -A 5

Length of output: 1977


Script:

#!/bin/bash
# Get the implementation of moveActivitiesToNewMember from the data-access-layer
cat services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts

Length of output: 4938


Script:

#!/bin/bash
# Get the implementation of updateActivities function
cat services/libs/data-access-layer/src/activities/update.ts

Length of output: 2103


Script:

#!/bin/bash
# Get the implementation of insertActivities function
cat services/libs/data-access-layer/src/activities/ilp.ts

Length of output: 2521

primaryId: string,
secondaryId: string,
tenantId: string,
) {
await updateActivities(
qdb,
queueClient,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Based on the code analysis, I can now provide the final response:

Queue error handling needs improvement in activity updates

The current implementation of updateActivities and its dependencies shows insufficient error handling for queue operations:

  • The streamActivities function properly handles stream errors through Promise rejection
  • However, insertActivities uses QueueEmitter without try-catch blocks around queue operations
  • While there's telemetry tracking (telemetry.increment), there's no explicit error logging for queue failures

Suggested improvements:

  • Add try-catch blocks around queue operations in insertActivities
  • Implement error logging using the existing logger instance
  • Consider adding retry mechanisms for failed queue operations
🔗 Analysis chain

Inclusion of queueClient in updateActivities.

Check that updateActivities handles queueClient errors and logs failures, if any, to maintain a reliable queue-based system.


Let me search for the implementation of updateActivities to verify how it handles queueClient errors.


Let me check the implementation of updateActivities and streamActivities to see how errors are handled.


Let me check the implementation of insertActivities to see how it handles queue-related errors.

🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Search for updateActivities implementation
ast-grep --pattern 'async function updateActivities($$$) {
  $$$
}'

# Also search for any function declaration containing updateActivities
rg -A 10 "updateActivities.*\(" 

Length of output: 6222


Script:

#!/bin/bash
# Get the content of the update.ts file
cat services/libs/data-access-layer/src/activities/update.ts

# Also search for any error handling patterns in streamActivities
rg -A 10 "streamActivities.*\(" 

Length of output: 4051


Script:

#!/bin/bash
# Search for insertActivities implementation
cat services/libs/data-access-layer/src/activities/ilp.ts

# Also search for any error handling patterns in insertActivities
rg -A 10 "insertActivities.*\(" 

Length of output: 10777

@@ -154,7 +156,7 @@ export async function runMemberAffiliationsUpdate(
}
}

await insertActivities([activity], true)
await insertActivities(queueClient, [activity], true)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Error handling is missing in the queue-based activity insertion

The implementation of insertActivities shows that while it uses queueClient to send messages through QueueEmitter, it lacks proper error handling and retry mechanisms. Here are the specific concerns:

  • No try-catch blocks around emitter.sendMessage() calls
  • No retry logic for failed queue operations
  • Basic telemetry exists but no error logging for failed operations
🔗 Analysis chain

insertIfMatches now leverages queueClient.

This is a key integration point for activity insertion via queue. Consider adding logging or retry logic to handle insertion failures.


Let me verify the integration point for activity insertion by first checking the implementation of insertActivities function and its usage of queueClient.


Let me check the implementation of insertActivities in the data-access-layer to verify the queueClient integration and error handling.

🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Search for insertActivities function definition
ast-grep --pattern 'function insertActivities($$$) {
  $$$
}'

# Also search for any direct references to this function
rg "insertActivities" -A 3

Length of output: 5353


Script:

#!/bin/bash
# Check the implementation of insertActivities in the data-access-layer
cat services/libs/data-access-layer/src/activities/ilp.ts

Length of output: 2521

@@ -58,7 +60,7 @@ class ActivityRepository {
data.platform = data.platform.toLowerCase()
}

const ids = await insertActivities([
const ids = await insertActivities(QUEUE_CLIENT(), [
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Passing QUEUE_CLIENT() to insertActivities.

Good shift toward a queue-based approach. Consider robust error-handling for partial insertions or queue outages, potentially logging failures or implementing rollback logic.

@@ -181,7 +181,7 @@ export default class ActivityService extends LoggerBase {
)

record = await ActivityRepository.create(data, repositoryOptions)
await insertActivities([{ ...data, id: record.id }], true)
await insertActivities(QUEUE_CLIENT(), [{ ...data, id: record.id }], true)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling or retry logic for queue insertion.
The call to “insertActivities(QUEUE_CLIENT(), …)” doesn’t appear to have explicit fallback logic if the queue is unreachable or insertion fails. Consider adding retry logic or error handling to ensure critical data isn’t lost when insertion fails.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant