Insert activities via Kafka Connect #2736
base: main
Conversation
Walkthrough
This pull request introduces a comprehensive update to the activity processing system by integrating a queue-based mechanism across multiple services and components. The changes span from low-level data access layers to worker services, adding queue client functionality to activity insertion, movement, and update processes. The modifications include updating Docker Compose configurations, creating Kafka Connect configurations, and modifying various TypeScript files to support queue-based activity processing.
Sequence Diagram
sequenceDiagram
participant Client
participant ActivityService
participant QueueClient
participant DatabaseRepository
participant KafkaConnect
Client->>ActivityService: Request activity insertion
ActivityService->>QueueClient: Enqueue activity
QueueClient->>KafkaConnect: Send activity message
KafkaConnect->>DatabaseRepository: Process and insert activity
DatabaseRepository-->>KafkaConnect: Confirm insertion
KafkaConnect-->>QueueClient: Acknowledge message
QueueClient-->>ActivityService: Confirm processing
ActivityService-->>Client: Return response
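To make the diagram concrete, below is a rough, illustrative sketch of the producer side only: the service hands each activity to a queue client on the activities topic, and the Kafka Connect QuestDB sink configured elsewhere in this PR performs the actual insert. The ActivityQueue and ActivityRow shapes here are stand-ins, not the real IQueue or activity types.

```typescript
// Minimal structural stand-ins; the real IQueue client and activity payload
// from @crowd/queue and the data-access layer have richer shapes.
interface ActivityQueue {
  send(topic: string, payload: unknown, key: string): Promise<void>
}

interface ActivityRow {
  id: string
  tenantId: string
  timestamp: string // ISO-8601, e.g. new Date().toISOString()
}

// Producer side of the diagram: enqueue each activity on the "activities"
// topic; the Kafka Connect QuestDB sink added in this PR consumes that topic
// and writes the rows into the activities table.
async function enqueueActivities(queue: ActivityQueue, rows: ActivityRow[]): Promise<string[]> {
  for (const row of rows) {
    await queue.send('activities', row, row.id)
  }
  return rows.map((r) => r.id)
}
```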
We'll be sending Kafka messages when inserting activities
33309b3 to 730a22c
Actionable comments posted: 10
🧹 Nitpick comments (12)
scripts/scaffold/kafka-connect/worker-local.properties (1)
11-13
: Add monitoring and performance configurations
The configuration lacks monitoring settings, and the offset flush interval might need tuning.
Add these configurations:
offset.storage.file.filename=/storage/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java,/usr/share/filestream-connectors,/usr/share/confluent-hub-components
+# Monitoring
+metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
+confluent.metrics.reporter.bootstrap.servers=kafka:9092
+confluent.metrics.reporter.topic.replicas=1
+# Performance tuning
+producer.batch.size=16384
+producer.linger.ms=100
+consumer.fetch.min.bytes=1
+consumer.fetch.max.wait.ms=500
scripts/scaffold/kafka-connect/build-docker-image.sh (2)
8-8
: Add Docker daemon availability check
The script should verify Docker daemon availability before attempting to build.
Add this check before the build:
+if ! docker info > /dev/null 2>&1; then
+  echo "Error: Docker daemon is not running"
+  exit 1
+fi
+
 docker build -t "${TAG}" .
13-21
: Consider non-interactive mode for CI/CD environments
The interactive prompt might block automated builds in CI/CD pipelines.
Consider adding a non-interactive mode:
-echo -n "Type 'y' and press Enter to push the image to the registry. Ctrl+C to cancel: "
-read -r PUSH
-if [ "${PUSH}" = "y" ]; then
+if [ "${AUTO_PUSH:-}" = "true" ] || [ "${1:-}" = "--push" ]; then
   echo "Pushing image to the registry..."
   echo "----------------------------------------"
   docker push "${TAG}"
 else
+  echo "Skipping push (use --push flag or set AUTO_PUSH=true to push automatically)"
-  echo "Skipping push"
 fi
services/libs/data-access-layer/src/activities/update.ts (2)
58-58
: Optional: Expand comments for the new queueClient parameter.
A brief comment explaining how queueClient is used here would aid maintainability and help future contributors.
export async function updateActivities(
  qdb: DbConnOrTx,
+  // queueClient enables insertion tasks to be queued for further processing
  queueClient: IQueue,
  ...
) {
67-67
: Check for unnecessary concurrency blocking.
You're calling insertActivities in a loop for each activity chunk, which might block if queue operations are slow. Confirm whether these insert calls need concurrency control or batching to improve performance.
services/libs/data-access-layer/src/activities/ilp.ts (3)
10-10
: Logger naming is appropriate.
The logger is well-defined for insert-activities; consider also tagging messages with “ILP” or “Queue” for more clarity.
43-46
: Potential for better batching of queue messages.
Sending each row individually can be slower than sending a batch if the queue supports bulk enqueues. Consider exploring that option if throughput is a concern.
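For illustration, an optional chunked-batching sketch; the sendBatch method below is hypothetical and only stands in for whatever bulk-enqueue capability the queue client might expose.

```typescript
// Illustrative batching sketch; sendBatch is a hypothetical bulk-enqueue method.
interface BatchingQueue {
  sendBatch(topic: string, payloads: unknown[]): Promise<void>
}

async function enqueueInChunks<T>(
  queue: BatchingQueue,
  topic: string,
  rows: T[],
  chunkSize = 100,
): Promise<void> {
  for (let i = 0; i < rows.length; i += chunkSize) {
    // One round-trip per chunk instead of one per row.
    await queue.sendBatch(topic, rows.slice(i, i + chunkSize))
  }
}
```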
48-48
: Telemetry usage is helpful.
Tracking the number of inserted activities is great for monitoring throughput. Optionally, you could break it down by queue usage if needed.services/libs/data-access-layer/src/old/apps/entity_merging_worker/orgs.ts (1)
37-44
: Function signature update to include queueClient
By extending the signature to accept a queueClient and passing it to updateActivities, you enable asynchronous or distributed task processing. This change is coherent with a queue-driven design and should help scale. Ensure that edge cases, such as queue unavailability or exceptions thrown by the queue, are handled gracefully upstream.
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2)
45-45
: Updated function signature for moveActivitiesToNewMember.
Adding queueClient to the parameters aligns with queue-based processing. Ensure all existing callers provide a valid queueClient instance.
134-134
: Passing queueClient to updateActivities.
Ensure errors during queue usage are caught and handled to avoid silent failures.
services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1)
21-21
: Added queueClient to runMemberAffiliationsUpdate.
Passing queueClient into this function is consistent with the new queue-based architecture. Verify downstream calls handle it properly.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (21)
backend/src/database/repositories/activityRepository.ts (2 hunks)
backend/src/services/activityService.ts (2 hunks)
scripts/scaffold.yaml (3 hunks)
scripts/scaffold/kafka-connect/Dockerfile (1 hunks)
scripts/scaffold/kafka-connect/build-docker-image.sh (1 hunks)
scripts/scaffold/kafka-connect/console-local-sink.properties (1 hunks)
scripts/scaffold/kafka-connect/questdb-local-sink.properties (1 hunks)
scripts/scaffold/kafka-connect/worker-local.properties (1 hunks)
services/apps/entity_merging_worker/src/activities/members.ts (2 hunks)
services/apps/entity_merging_worker/src/activities/organizations.ts (1 hunks)
services/apps/entity_merging_worker/src/main.ts (1 hunks)
services/apps/profiles_worker/src/activities/member/memberUpdate.ts (1 hunks)
services/apps/profiles_worker/src/main.ts (1 hunks)
services/libs/data-access-layer/package.json (1 hunks)
services/libs/data-access-layer/src/activities/ilp.ts (1 hunks)
services/libs/data-access-layer/src/activities/update.ts (3 hunks)
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (4 hunks)
services/libs/data-access-layer/src/old/apps/entity_merging_worker/orgs.ts (2 hunks)
services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (3 hunks)
services/libs/queue/src/types.ts (1 hunks)
services/libs/queue/src/vendors/kafka/config.ts (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- scripts/scaffold/kafka-connect/console-local-sink.properties
🔇 Additional comments (25)
scripts/scaffold/kafka-connect/Dockerfile (2)
1-1
: Version pinning strategy needs review
The Confluent Platform version 7.8.0-2 should be verified for compatibility with your Kafka cluster version and for any known security vulnerabilities.
Run this script to check for security advisories:
✅ Verification successful
Let me gather more information about the Kafka setup to make a more informed decision.
Let me check the docker-compose file to understand the Kafka setup better.
Version 7.8.0-2 of Confluent Platform appears compatible with the setup
The Kafka Connect image version appears suitable for the environment based on:
- No security vulnerabilities found in GitHub's database for this version
- The test environment uses Bitnami Kafka latest version, which is compatible with Confluent Platform 7.8.x
- The Dockerfile installs recent versions of connectors (Snowflake 2.5.0 and QuestDB 0.12) which are compatible with CP 7.8.x
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for CVEs related to Confluent Platform 7.8.0
gh api graphql -f query='
{
securityVulnerabilities(first: 5, ecosystem: MAVEN, package: "confluent-platform") {
nodes {
advisory {
summary
severity
publishedAt
}
vulnerableVersionRange
firstPatchedVersion {
identifier
}
}
}
}'
Length of output: 353
Script:
#!/bin/bash
# Check for Kafka version and configuration in the codebase
rg -l "kafka" --type yaml --type properties
Length of output: 78
Script:
#!/bin/bash
# Look for docker-compose or similar configuration files that might show Kafka version
fd -e yml -e yaml docker-compose
Length of output: 67
Script:
#!/bin/bash
# Check if there are any Kafka configuration files
fd -e properties kafka
Length of output: 24
Script:
#!/bin/bash
# Check the content of the Dockerfile for any additional context
cat scripts/scaffold/kafka-connect/Dockerfile
Length of output: 336
Script:
#!/bin/bash
# Check the docker-compose file for Kafka configuration
cat backend/docker-compose.test.yaml
Length of output: 1955
Script:
#!/bin/bash
# Look for any documentation about Kafka setup
fd README -e md | xargs cat
Length of output: 3307
Script:
#!/bin/bash
# Check if there are any other Kafka-related Dockerfiles
fd Dockerfile | xargs grep -l "kafka"
Length of output: 79
10-12
: Review volume permissions for non-root user
The volume /storage is created while running as root but will be accessed by appuser. This might cause permission issues.
Add appropriate permissions:
VOLUME /storage
+RUN mkdir -p /storage && chown appuser:appuser /storage
USER appuser
scripts/scaffold/kafka-connect/questdb-local-sink.properties (1)
9-11
: Verify timestamp handling configuration
The timestamp configuration needs verification:
- Ensure the format matches your activity data
- Verify timezone handling
- Consider adding error handling for invalid timestamps
Run this script to verify timestamp formats in your codebase:
✅ Verification successful
Based on the search results, I can provide a final response since I have enough information about timestamp handling in the codebase:
Timestamp configuration is correctly set for activity data
The timestamp configuration in the Kafka Connect properties matches the timestamp formats used throughout the codebase:
- The timestamp.field.name=timestamp aligns with the activity timestamp field used consistently in the codebase
- The timestamp.string.fields=createdAt,updatedAt correctly identifies date string fields
- The timestamp.string.format=yyyy-MM-ddTHH:mm:ss.SSSZ matches the ISO-8601 format used throughout the codebase, as evidenced by the widespread use of toISOString() for timestamps
The codebase consistently handles dates in UTC/ISO-8601 format with timezone information, which aligns with the configured format. Error handling for invalid timestamps is built into the platform's date handling through JavaScript's native Date object validation.
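As a small illustration of why this lines up, Date.prototype.toISOString() already emits the configured layout; the field names below mirror the sink configuration:

```typescript
// toISOString() yields e.g. "2024-05-01T12:34:56.789Z", which matches
// timestamp.string.format=yyyy-MM-ddTHH:mm:ss.SSSZ in the sink config.
const now = new Date()
const activityTimestamps = {
  timestamp: now.toISOString(),
  createdAt: now.toISOString(),
  updatedAt: now.toISOString(),
}
```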
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for timestamp formats in activity-related files
rg -A 2 'new Date|toISOString|DateTime'
Length of output: 68820
scripts/scaffold.yaml (2)
143-143
: LGTM: Good practice using specific Kafka version
Using a specific version (bitnami/kafka:3.8.0-debian-12-r6) instead of latest is a good practice for reproducible builds.
169-172
: Verify Kafka Connect configuration files
Please ensure that the mounted configuration files are properly configured:
- worker-local.properties should have correct Kafka bootstrap servers
- questdb-local-sink.properties should have proper QuestDB connection details
Let's verify the configuration files:
✅ Verification successful
Let me try a different approach to locate and verify these configuration files.
Configuration files are properly set up for Kafka Connect integration
The configuration files are correctly configured with all necessary parameters:
- worker-local.properties:
  - Has correct bootstrap servers: kafka:9092
  - Proper converter settings for keys and values
  - Configured offset storage and plugin paths
- questdb-local-sink.properties:
  - Proper QuestDB connection URL: questdb:9000
  - Correctly configured for the activities topic
  - Appropriate timestamp field configurations
  - Compatible converter settings
- console-local-sink.properties:
  - Properly configured for debugging/monitoring
  - Set up for the same activities topic
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check Kafka Connect configuration files
echo "Checking worker configuration..."
if [ -f "scaffold/kafka-connect/worker-local.properties" ]; then
grep -E "bootstrap.servers|group.id|key.converter|value.converter" scaffold/kafka-connect/worker-local.properties
fi
echo "Checking QuestDB sink configuration..."
if [ -f "scaffold/kafka-connect/questdb-local-sink.properties" ]; then
grep -E "connection.url|connection.user|topics" scaffold/kafka-connect/questdb-local-sink.properties
fi
Length of output: 287
Script:
#!/bin/bash
# Search for Kafka Connect configuration files in the repository
echo "Searching for Kafka Connect configuration files..."
fd -t f "worker.*properties|questdb.*properties|console.*properties"
# If files are found, let's check their content
echo -e "\nChecking files content if they exist:"
fd -t f "worker.*properties|questdb.*properties|console.*properties" -x cat {}
Length of output: 1740
services/libs/data-access-layer/src/activities/update.ts (2)
5-5
: Queue interface import looks good.
The introduction of the “IQueue” interface is consistent with the new queue-based design.
25-32
: Ensure error propagation and concurrency correctness in the Promise logic.
This Promise-based approach cleanly coordinates the stream completion with result resolution. However, be sure that if onActivity throws, it’s properly bubbled up and caught by the .catch(reject) block.
✅ Verification successful
Let me analyze the code further by checking the error handling implementation in this file and related stream processing logic.
Error handling is properly implemented in the Promise chain
The code correctly handles error propagation through the Promise chain:
- The main Promise chain has a .catch(reject) that will handle any errors from the stream processing
- Errors from onActivity will be automatically propagated through the async/await in the stream processing
- The tryFinish coordination logic is correctly implemented to resolve only when both conditions are met
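For readers who want to see the coordination pattern in isolation, here is a minimal, self-contained sketch of the resolve/reject flow being described; it is illustrative only and not the repository's actual updateActivities implementation.

```typescript
// Illustrative sketch of the "tryFinish" coordination: resolve only when both
// conditions hold (the stream has ended AND every onActivity handler has
// settled); any failure is funnelled into reject.
async function processStream<T>(
  stream: AsyncIterable<T>,
  onActivity: (row: T) => Promise<void>,
): Promise<number> {
  return new Promise<number>((resolve, reject) => {
    let processed = 0
    let inFlight = 0
    let streamEnded = false

    function tryFinish() {
      if (streamEnded && inFlight === 0) {
        resolve(processed)
      }
    }

    const run = async () => {
      for await (const row of stream) {
        inFlight++
        onActivity(row)
          .then(() => {
            processed++
          })
          .catch(reject) // handler errors bubble up to the caller
          .finally(() => {
            inFlight--
            tryFinish()
          })
      }
      streamEnded = true
      tryFinish()
    }

    run().catch(reject) // stream errors also reject
  })
}
```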
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
#!/bin/bash
# Let's first see the complete implementation of this file to understand the error handling
rg -A 20 "return new Promise\(\(resolve, reject\)" services/libs/data-access-layer/src/activities/update.ts
Length of output: 590
Script:
#!/bin/bash
# Let's also check if there are any catch blocks or error handling patterns in this file
rg "catch|reject|throw" services/libs/data-access-layer/src/activities/update.ts
Length of output: 147
Script:
# Let's see if there are similar stream processing patterns in the codebase for comparison
ast-grep --pattern 'return new Promise((resolve, reject) => {
$$$
function tryFinish() {
$$$
}
$$$
})'
Length of output: 2471
services/libs/data-access-layer/src/activities/ilp.ts (4)
1-5
: Imports are aligned with the new queue architecture.
The usage of moment, getServiceChildLogger, and the new queue interfaces looks consistent.
13-13
: Queue client parameter fosters modular design.
Exposing queueClient in the function signature effectively decouples the queue-based insertion from the rest of the data logic. Good move.
50-50
: Return pattern is straightforward and correct.
Returning IDs from the inserted activities helps with further chaining if needed.
17-41
: Validate partial attributes in activity objects.
You’re setting default fields for git and member-related attributes. Make sure this approach won’t override valid ‘falsey’ attributes that might be intentionally passed in as empty or null.
✅ Verification successful
Let me gather more information about how these activity attributes are handled throughout the codebase.
Let me try a different search pattern to find more specific information about activity attributes.
Let me try one more search with corrected patterns.
Based on the code analysis, I can now generate the final response:
The git-related attributes are safely handled in the code.
The code explicitly extracts git-related attributes before spreading the rest of the activity object, which prevents any unintended overrides. Additionally, the code has proper fallback handling with the || operator for insertions/additions, and direct access for other git attributes. The implementation in both ilp.ts and sql.ts shows consistent handling of these attributes, with no risk of losing falsey values.
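To make the falsey-value concern concrete, a small illustrative comparison (field names are hypothetical) of || against nullish coalescing, which only falls back on null or undefined:

```typescript
// Hypothetical git attributes where 0 and '' are intentional, falsey values.
const attributes = { insertions: 0, branch: '' }

// `||` replaces any falsey value, silently discarding the intentional 0 / ''.
const insertionsOr = attributes.insertions || 1 // -> 1 (the 0 is lost)

// `??` only falls back when the value is null or undefined.
const insertionsNullish = attributes.insertions ?? 1 // -> 0 (preserved)
const branchNullish = attributes.branch ?? 'main' // -> '' (preserved)
```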
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
#!/bin/bash
# Search for attribute handling patterns and their types
ast-grep --pattern 'activity.attributes'
# Search for activity interface or type definitions
ast-grep --pattern 'interface Activity'
# Look for tests that might show expected behavior with falsey values
rg "activity.*attributes" -A 5 -g "*.test.ts" -g "*.spec.ts"
Length of output: 65975
Script:
#!/bin/bash
# Search for activity type definitions and interfaces
rg -g "*.ts" "interface.*Activity.*{" -B 2 -A 10
# Search for tests related to activity attributes handling
rg -g "*.test.ts" -g "*.spec.ts" "attributes.*false" -A 5
Length of output: 1046
Script:
#!/bin/bash
# Search for activity interface definitions
rg -g "*.ts" "interface Activity" -B 2 -A 10
# Look for activity attributes handling in the data access layer
rg -g "*.ts" "activity\.attributes" -A 5 --glob="!*.test.ts" --glob="!*.spec.ts" -C 10
Length of output: 34288
services/apps/entity_merging_worker/src/main.ts (1)
29-31
: Queue enabled setting is consistent with the new architecture.
Enabling the queue in the entity_merging_worker ensures it aligns with the rest of the system's queue-based flow. Great addition.
services/apps/profiles_worker/src/main.ts (1)
28-30
: Queue enabled for profiles worker.
This addition matches the entity_merging_worker’s queue settings, ensuring standardized usage across different workers.
services/apps/profiles_worker/src/activities/member/memberUpdate.ts (1)
13-18
: Integration with svc.queue is consistent.
Passing svc.queue to runMemberAffiliationsUpdate ensures that any new affiliations or changes can be queued for downstream processing. Confirm that queue connectivity issues will be handled gracefully.
services/libs/data-access-layer/src/old/apps/entity_merging_worker/orgs.ts (1)
2-2
: Import of IQueue is consistent with queue-based approach
The import aligns well with the new queue usage pattern introduced in this PR, ensuring that the function can leverage the queue’s capabilities.
services/libs/queue/src/vendors/kafka/config.ts (1)
58-65
: Introduction of ACTIVITIES_QUEUE_SETTINGS
Defining a dedicated queue configuration for activities makes sense, given the need to isolate activity-related events. If there's an expected higher throughput, consider customizing partition counts. Otherwise, this is a straightforward addition that fits consistently with existing settings.
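For context, a sketch of what a dedicated settings object with an explicit partition count could look like; the field names below are hypothetical, and the real ACTIVITIES_QUEUE_SETTINGS in @crowd/queue may be shaped differently.

```typescript
// Hypothetical shape; the real settings type in @crowd/queue may differ.
interface QueueSettingsSketch {
  name: string
  partitions: number
  replicationFactor: number
}

export const ACTIVITIES_QUEUE_SETTINGS_SKETCH: QueueSettingsSketch = {
  name: 'activities',
  // More partitions allow more parallel consumers if activity volume grows.
  partitions: 6,
  replicationFactor: 1,
}
```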
services/libs/queue/src/types.ts (1)
80-80
: New ACTIVITIES queue enumerated value
Adding 'ACTIVITIES' to the CrowdQueue enum is straightforward and integral to expanding the queue-based architecture for handling activity events.
services/apps/entity_merging_worker/src/activities/organizations.ts (1)
32-32
: Passing queue parameter in moveActivitiesBetweenOrgs
Forwarding svc.queue to moveActivitiesToNewOrg ensures uniform usage of the queue for activity migrations. This keeps the workflow consistent with the broader queue integration approach. Verify the non-null status of svc.queue in upstream code to avoid potential runtime errors.
✅ Verification successful
Let me verify the non-null status of svc.queue by examining the code.
Let me check the service initialization and the actual implementation of moveActivitiesToNewOrg from the data access layer.
Let me try one final approach to check the service structure and queue usage.
Let me check the service initialization and the main file where svc is defined.
Let me check the implementation of moveActivitiesToNewOrg in the data access layer.
From the output, I can see that moveActivitiesToNewOrg is defined in the data access layer and explicitly requires a queue parameter in its signature. Let me check the service initialization to verify the queue setup.
Based on the code we've seen so far:
- The moveActivitiesToNewOrg function in the data access layer explicitly requires a queue parameter
- The service initialization in main.ts shows queue is enabled with queue: { enabled: true }
- The queue is being passed through svc.queue, which is initialized as part of the ServiceWorker
Queue parameter is correctly passed and initialized
The queue parameter is properly initialized in the service and correctly passed to moveActivitiesToNewOrg. The service configuration shows queue is enabled, and the function signature in the data access layer explicitly requires it, making this a valid and necessary parameter.
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
#!/bin/bash
# Check the service initialization and queue setup
ast-grep --pattern 'class $_ {
queue: $_
$$$
}'
# Check how svc.queue is initialized
rg "queue.*=" -A 3 services/apps/entity_merging_worker/src
Length of output: 114
Script:
#!/bin/bash
# Check the moveActivitiesToNewOrg function definition and usage
ast-grep --pattern 'function moveActivitiesToNewOrg($_) {
$$$
}'
# Look for any null checks on queue parameter
rg "moveActivitiesToNewOrg" -A 5 services/apps/entity_merging_worker/src
Length of output: 1406
Script:
#!/bin/bash
# Check the service initialization
rg "class.*Service" -A 10 services/apps/entity_merging_worker/src
# Check the moveActivitiesToNewOrg implementation in data access layer
fd moveActivitiesToNewOrg --exec cat {}
Length of output: 111
Script:
#!/bin/bash
# Look for service type definition
rg "type.*Service" -A 5 services/apps/entity_merging_worker/src
# Check imports and usage of queue in the organizations file
rg "import.*queue" -A 2 services/apps/entity_merging_worker/src/activities/organizations.ts
# Check the entire organizations.ts file for context
cat services/apps/entity_merging_worker/src/activities/organizations.ts
Length of output: 3987
Script:
#!/bin/bash
# Check the main file where svc is initialized
cat services/apps/entity_merging_worker/src/main.ts
# Check the service type definition if it exists in a types file
fd "types?.ts" --exec cat {} services/apps/entity_merging_worker/src
Length of output: 73446
Script:
#!/bin/bash
# Look for moveActivitiesToNewOrg implementation
rg "moveActivitiesToNewOrg" -A 10 -B 2
# Check if queue is used in any other similar functions
rg "queue.*moveActivities" -A 5
Length of output: 4090
Script:
#!/bin/bash
# Check the service worker options and queue configuration
rg "queue.*enabled" -A 5 services/apps/entity_merging_worker/src
# Check the ServiceWorker implementation for queue initialization
fd ServiceWorker --exec cat {}
Length of output: 101
services/apps/entity_merging_worker/src/activities/members.ts (1)
70-70
: Handle potential queue unavailability gracefully.
Similar to the previous function call, ensure that passing svc.queue to moveIdentityActivitiesToNewMember is accompanied by robust error handling or logging, in case svc.queue is not properly initialized.
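One way to fail fast, sketched under the assumption that svc.queue can be undefined when the queue feature is disabled; the helper below is hypothetical, not existing code.

```typescript
import { IQueue } from '@crowd/queue'

// Hypothetical guard: surface a clear error instead of passing an undefined
// queue client down into the data-access layer.
export function requireQueue(queue: IQueue | undefined, context: string): IQueue {
  if (!queue) {
    throw new Error(`Queue client is not initialized (${context})`)
  }
  return queue
}

// usage sketch:
// await moveIdentityActivitiesToNewMember(svc.questdbSQL, requireQueue(svc.queue, 'members.ts'), /* ...remaining args... */)
```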
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2)
2-2
: Import of IQueue looks good.
No issues here; the import is consistent with the approach of injecting a queue dependency.
125-125
: Revised function signature for moveIdentityActivitiesToNewMember.
Similar to moveActivitiesToNewMember, adding queueClient aligns with the move to queue-based patterns. Confirm that code and tests reflect these changes thoroughly.
services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1)
6-6
: Imported IQueue to support queue-based insertion.
Looks good. Make sure the type definition suits any advanced queue usage patterns you may introduce.
backend/src/database/repositories/activityRepository.ts (1)
17-18
: Queue client import introduced via QUEUE_CLIENT.
This is consistent with queue-based activity insertion. Confirm that QUEUE_CLIENT is correctly initialized in all environments.
✅ Verification successful
Let me verify the initialization of QUEUE_CLIENT by first checking its implementation and usage across the codebase.
Let me check the implementation details of the queue service to verify its initialization.
Let me check the queue configuration that's being used to initialize the queue client.
Let me check the configuration implementation and environment-specific settings.
Based on the analysis of the codebase, I can now provide a final response:
QUEUE_CLIENT initialization is properly configured across environments.
The queue client is correctly initialized through QueueFactory.createQueueService(QUEUE_CONFIG), using environment-specific configurations loaded via the config package. The configuration is properly handled across all environments (test, development, staging, production) through the NODE_ENV environment variable, and the queue service is consistently initialized across the application.
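The lazy-singleton pattern such a helper typically follows can be sketched as below; the QueueFactory.createQueueService(QUEUE_CONFIG) call mirrors the usage described above, while the module layout and import path are assumptions.

```typescript
import { IQueue, QueueFactory } from '@crowd/queue'
import { QUEUE_CONFIG } from '../../conf' // assumed path to the backend config

// Illustrative lazy singleton: build the Kafka-backed client once, on first
// use, then reuse it for every subsequent QUEUE_CLIENT() call.
let queueClient: IQueue | undefined

export const QUEUE_CLIENT = (): IQueue => {
  if (!queueClient) {
    queueClient = QueueFactory.createQueueService(QUEUE_CONFIG)
  }
  return queueClient
}
```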
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
#!/bin/bash
# Find the implementation of QUEUE_CLIENT
rg "QUEUE_CLIENT" -B 3 -A 3
# Find the queue service implementation
fd queueService
Length of output: 5823
Script:
#!/bin/bash
# Check the queue service implementation
cat backend/src/serverless/utils/queueService.ts
# Check for environment configurations
rg "IS_TEST_ENV|IS_DEV_ENV" -A 3 backend/src/serverless/utils/queueService.ts
Length of output: 3853
Script:
#!/bin/bash
# Check the queue configuration
rg "QUEUE_CONFIG" -B 3 -A 3
# Check the configuration file
fd "conf.(ts|js)" --exec cat {}
Length of output: 50757
Script:
#!/bin/bash
# Check the configuration implementation in backend
cat backend/src/conf/index.ts
# Check for environment-specific configurations
rg "IS_TEST_ENV|IS_DEV_ENV|IS_PROD_ENV" -A 3 backend/src/conf/index.ts
Length of output: 7387
backend/src/services/activityService.ts (1)
33-33
: Ensure proper queue client configuration.
Importing QUEUE_CLIENT indicates that we are now passing a queue instance to the activity insertion process. Verify that the queue service is properly configured and handles failures or reconnection scenarios gracefully in production environments.
Run the following script to locate other references to QUEUE_CLIENT and confirm that error handling is present:
✅ Verification successful
Let me try a different search pattern to find QUEUE_CLIENT usage:
Let me check the queue service implementation to verify error handling:
Let me check the queue configuration and implementation from the common services package:
Let me check the queue configuration and implementation details:
Based on the comprehensive analysis of the queue implementation, I can now provide the final response:
Queue client implementation is properly configured with error handling and reconnection
The QUEUE_CLIENT is implemented using a robust Kafka-based queue service that includes:
- Automatic retries with exponential backoff (8 retries starting at 100ms)
- Proper client initialization with logging
- Error handling through the Kafka client configuration
- Connection management through the QueueFactory
- Integration with Redis for additional reliability
- Priority-based message handling through QueuePriorityContextLoader
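For readers unfamiliar with the backoff behaviour listed above, here is a generic, illustrative retry helper; it is not the queue client's own configuration, which handles retries internally.

```typescript
// Illustrative exponential backoff: up to 8 retries starting at 100ms
// (100, 200, 400, ... ms), mirroring the figures described above.
async function withRetries<T>(fn: () => Promise<T>, retries = 8, initialDelayMs = 100): Promise<T> {
  let delay = initialDelayMs
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn()
    } catch (err) {
      if (attempt >= retries) {
        throw err
      }
      await new Promise((resolve) => setTimeout(resolve, delay))
      delay *= 2
    }
  }
}

// usage sketch: await withRetries(() => emitter.sendMessage(/* ...args... */))
```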
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for all references to QUEUE_CLIENT across the codebase
ast-grep --pattern $'QUEUE_CLIENT($_)'
Length of output: 1501
Script:
#!/bin/bash
# Search for QUEUE_CLIENT references and imports
rg "QUEUE_CLIENT" -A 3
# Also search for queue service configuration
rg "queueService" -A 3
Length of output: 6737
Script:
#!/bin/bash
# Check queue service implementation
cat backend/src/serverless/utils/queueService.ts
Length of output: 3773
Script:
#!/bin/bash
# Check queue configuration
cat backend/src/conf/index.ts | grep -A 10 "QUEUE_CONFIG"
# Check if we can see the queue implementation
rg "QueueFactory" -A 5
Length of output: 44762
Script:
#!/bin/bash
# Check the queue implementation from common packages
rg -g "*.ts" "class QueueFactory" -A 10
rg -g "*.ts" "interface IQueue" -A 10
Length of output: 4781
services/libs/data-access-layer/package.json (1)
17-17
: Pin or verify the workspace dependency.
Using "workspace:*" requires diligence in version management. Make sure that any local modifications in the "@crowd/queue" package are compatible with production builds and won’t introduce breaking changes.
✅ Verification successful
Let me gather information about the workspace dependencies and version management in the repository.
Let me check the version of @crowd/queue package and its usage pattern.
Let me try one more search to locate the queue package and its structure.
Let me check the queue package contents directly.
Based on the gathered information, I can now provide the final response:
Workspace dependency pattern is consistent across the codebase
The use of workspace:* for @crowd/queue follows the established pattern in the monorepo. The package is used by multiple services, and all of them consistently use the workspace:* dependency specification. The @crowd/queue package itself follows the same pattern for its internal dependencies (@crowd/common, @crowd/logging, @crowd/types). This consistency in version management across the workspace suggests a deliberate architectural decision for maintaining internal package dependencies.
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
#!/bin/bash
# Check package.json files for workspace dependencies and version patterns
fd package.json --exec cat {} \; | jq -r 'select(.dependencies."@crowd/queue" != null) | "\(.name): @crowd/queue version: \(.dependencies."@crowd/queue")"'
# Look for workspace configuration
fd "package.json|pnpm-workspace.yaml|lerna.json" --max-depth 1 --exec cat {} \;
Length of output: 980
Script:
#!/bin/bash
# Check the package.json of @crowd/queue package
fd queue/package.json --exec cat {} \;
# Look for any version constraints or release tags
git tag | grep "@crowd/queue"
Length of output: 73
Script:
#!/bin/bash
# Find the queue package location
fd -t d -d 3 queue
# Check if there are any changelog or version files
rg -g '!node_modules' -g '!dist' -l "queue.*version|CHANGELOG"
Length of output: 106
Script:
#!/bin/bash
# Check the package.json in the queue directory
cat services/libs/queue/package.json
# Look for any version-related changes in recent commits
git log -n 5 services/libs/queue/package.json
Length of output: 584
RUN yum install -y jq findutils unzip

RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt
RUN confluent-hub install questdb/kafka-questdb-connector:0.12 --no-prompt
🛠️ Refactor suggestion
Optimize Dockerfile layers and security
- Combine RUN commands to reduce image layers and cleanup package manager cache
- Pin specific versions of system packages for reproducibility
- Verify connector versions for compatibility
Consider this optimization:
-RUN yum install -y jq findutils unzip
-
-RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt
-RUN confluent-hub install questdb/kafka-questdb-connector:0.12 --no-prompt
+RUN yum install -y jq-1.6-* findutils-4.6.0-* unzip-6.0-* \
+ && yum clean all \
+ && rm -rf /var/cache/yum \
+ && confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt \
+ && confluent-hub install questdb/kafka-questdb-connector:0.12 --no-prompt
Committable suggestion skipped: line range outside the PR's diff.
name=questdb-sink
client.conf.string=http::addr=questdb:9000;
topics=activities
table=activities
connector.class=io.questdb.kafka.QuestDBSinkConnector
🛠️ Refactor suggestion
Add error handling and monitoring configurations
The connector configuration lacks essential error handling and monitoring settings.
Add these configurations:
name=questdb-sink
client.conf.string=http::addr=questdb:9000;
topics=activities
table=activities
connector.class=io.questdb.kafka.QuestDBSinkConnector
+errors.tolerance=all
+errors.deadletterqueue.topic.name=dlq-activities
+errors.deadletterqueue.topic.replication.factor=1
+errors.deadletterqueue.context.headers.enable=true
+errors.log.enable=true
+errors.log.include.messages=true
📝 Committable suggestion
name=questdb-sink
client.conf.string=http::addr=questdb:9000;
topics=activities
table=activities
connector.class=io.questdb.kafka.QuestDBSinkConnector
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-activities
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
errors.log.include.messages=true
bootstrap.servers=kafka:9092
group.id=kafka-connect
Add security configurations for Kafka connection
The current configuration lacks security settings for Kafka connection.
Add these security configurations:
bootstrap.servers=kafka:9092
group.id=kafka-connect
+security.protocol=SSL
+ssl.truststore.location=/etc/kafka/secrets/kafka.connect.truststore.jks
+ssl.truststore.password=${file:/etc/kafka/secrets/connect-secrets:truststore-password}
+ssl.keystore.location=/etc/kafka/secrets/kafka.connect.keystore.jks
+ssl.keystore.password=${file:/etc/kafka/secrets/connect-secrets:keystore-password}
+ssl.key.password=${file:/etc/kafka/secrets/connect-secrets:key-password}
Committable suggestion skipped: line range outside the PR's diff.
TAG="sjc.ocir.io/axbydjxa5zuh/kafka-connect:$(date +%s)" | ||
readonly TAG |
🛠️ Refactor suggestion
Parameterize the registry path for better flexibility
The registry path sjc.ocir.io/axbydjxa5zuh
is hardcoded, which could cause issues when deploying to different environments.
Consider using environment variables:
-TAG="sjc.ocir.io/axbydjxa5zuh/kafka-connect:$(date +%s)"
+REGISTRY_PATH="${REGISTRY_PATH:-sjc.ocir.io/axbydjxa5zuh}"
+TAG="${REGISTRY_PATH}/kafka-connect:$(date +%s)"
📝 Committable suggestion
TAG="sjc.ocir.io/axbydjxa5zuh/kafka-connect:$(date +%s)" | |
readonly TAG | |
REGISTRY_PATH="${REGISTRY_PATH:-sjc.ocir.io/axbydjxa5zuh}" | |
TAG="${REGISTRY_PATH}/kafka-connect:$(date +%s)" | |
readonly TAG |
kafka-connect:
  build:
    context: scaffold/kafka-connect
  restart: unless-stopped
  entrypoint:
    - connect-standalone
    - /etc/kafka-connect/worker-local.properties
    - /etc/kafka-connect/console-local-sink.properties
    - /etc/kafka-connect/questdb-local-sink.properties
  volumes:
    - kafka-connect-dev:/storage
    - ./scaffold/kafka-connect/worker-local.properties:/etc/kafka-connect/worker-local.properties
    - ./scaffold/kafka-connect/console-local-sink.properties:/etc/kafka-connect/console-local-sink.properties
    - ./scaffold/kafka-connect/questdb-local-sink.properties:/etc/kafka-connect/questdb-local-sink.properties
  networks:
    - crowd-bridge
🛠️ Refactor suggestion
Add essential service configurations for Kafka Connect
The Kafka Connect service configuration needs improvements for production readiness:
- No health check defined
- No resource limits set
- No explicit dependency on Kafka
- Using standalone mode (not recommended for production)
Consider these improvements:
kafka-connect:
build:
context: scaffold/kafka-connect
restart: unless-stopped
+ depends_on:
+ kafka:
+ condition: service_started
+ healthcheck:
+ test: ["CMD-SHELL", "curl -f http://localhost:8083/connectors || exit 1"]
+ interval: 30s
+ timeout: 10s
+ retries: 3
+ start_period: 30s
+ deploy:
+ resources:
+ limits:
+ memory: 1G
+ reservations:
+ memory: 512M
entrypoint:
- connect-standalone
Also, consider switching to distributed mode for better scalability:
entrypoint:
- - connect-standalone
+ - connect-distributed
- /etc/kafka-connect/worker-local.properties
- - /etc/kafka-connect/console-local-sink.properties
- - /etc/kafka-connect/questdb-local-sink.properties
📝 Committable suggestion
kafka-connect:
  build:
    context: scaffold/kafka-connect
  restart: unless-stopped
  depends_on:
    kafka:
      condition: service_started
  healthcheck:
    test: ["CMD-SHELL", "curl -f http://localhost:8083/connectors || exit 1"]
    interval: 30s
    timeout: 10s
    retries: 3
    start_period: 30s
  deploy:
    resources:
      limits:
        memory: 1G
      reservations:
        memory: 512M
  entrypoint:
    - connect-distributed
    - /etc/kafka-connect/worker-local.properties
  volumes:
    - kafka-connect-dev:/storage
    - ./scaffold/kafka-connect/worker-local.properties:/etc/kafka-connect/worker-local.properties
    - ./scaffold/kafka-connect/console-local-sink.properties:/etc/kafka-connect/console-local-sink.properties
    - ./scaffold/kafka-connect/questdb-local-sink.properties:/etc/kafka-connect/questdb-local-sink.properties
  networks:
    - crowd-bridge
@@ -38,7 +38,7 @@ export async function moveActivitiesBetweenMembers(
   if (!memberExists) {
     return
   }
-  await moveActivitiesToNewMember(svc.questdbSQL, primaryId, secondaryId, tenantId)
+  await moveActivitiesToNewMember(svc.questdbSQL, svc.queue, primaryId, secondaryId, tenantId)
💡 Codebase verification
Based on the code analysis, I can now provide the final response:
Queue error handling needs to be improved in activity merging flow
The implementation in insertActivities uses QueueEmitter to send messages but doesn't have proper error handling for queue failures. While the queue client is properly typed as IQueue, the actual error handling for queue unavailability is missing. Specific issues:
- No try-catch block around emitter.sendMessage() calls
- No fallback mechanism if queue operations fail
- No logging of queue-related errors
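A sketch of what wrapping the emit could look like; emitter.sendMessage and the logger are named in the findings above, but their exact signatures here are assumptions.

```typescript
// Illustrative only: log and rethrow around the queue emit so failures are
// visible instead of silently rejecting the whole insert path.
async function sendActivityMessage(
  emitter: { sendMessage: (key: string, payload: unknown) => Promise<void> },
  logger: { error: (meta: unknown, msg: string) => void },
  key: string,
  payload: unknown,
): Promise<void> {
  try {
    await emitter.sendMessage(key, payload)
  } catch (err) {
    logger.error({ err, key }, 'Failed to enqueue activity message')
    throw err // or fall back to a direct database insert, depending on requirements
  }
}
```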
🔗 Analysis chain
Ensure proper error handling if the queue is unavailable.
You are now passing svc.queue to moveActivitiesToNewMember. Confirm that any internal logic gracefully handles a null or undefined queue. Consider adding logging or a fallback if queue usage fails.
Let me gather information about the error handling in the moveActivitiesToNewMember function and its queue usage.
Let me check the implementation details of the function to understand how it handles the queue parameter.
Let me check the implementation of the updateActivities function since it's the one actually using the queue parameter.
Let me check the implementation of insertActivities to see how it handles the queue.
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
#!/bin/bash
# Find the implementation of moveActivitiesToNewMember function
ast-grep --pattern 'function moveActivitiesToNewMember($_) {
$$$
}'
# Also search for any direct references to this function
rg "moveActivitiesToNewMember" -A 5
Length of output: 1977
Script:
#!/bin/bash
# Get the implementation of moveActivitiesToNewMember from the data-access-layer
cat services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts
Length of output: 4938
Script:
#!/bin/bash
# Get the implementation of updateActivities function
cat services/libs/data-access-layer/src/activities/update.ts
Length of output: 2103
Script:
#!/bin/bash
# Get the implementation of insertActivities function
cat services/libs/data-access-layer/src/activities/ilp.ts
Length of output: 2521
  primaryId: string,
  secondaryId: string,
  tenantId: string,
) {
  await updateActivities(
    qdb,
    queueClient,
💡 Codebase verification
Based on the code analysis, I can now provide the final response:
Queue error handling needs improvement in activity updates
The current implementation of updateActivities and its dependencies shows insufficient error handling for queue operations:
- The streamActivities function properly handles stream errors through Promise rejection
- However, insertActivities uses QueueEmitter without try-catch blocks around queue operations
- While there's telemetry tracking (telemetry.increment), there's no explicit error logging for queue failures
Suggested improvements:
- Add try-catch blocks around queue operations in insertActivities
- Implement error logging using the existing logger instance
- Consider adding retry mechanisms for failed queue operations
🔗 Analysis chain
Inclusion of queueClient in updateActivities.
Check that updateActivities handles queueClient errors and logs failures, if any, to maintain a reliable queue-based system.
Let me search for the implementation of updateActivities
to verify how it handles queueClient errors.
Let me check the implementation of updateActivities
and streamActivities
to see how errors are handled.
Let me check the implementation of insertActivities
to see how it handles queue-related errors.
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
#!/bin/bash
# Search for updateActivities implementation
ast-grep --pattern 'async function updateActivities($$$) {
$$$
}'
# Also search for any function declaration containing updateActivities
rg -A 10 "updateActivities.*\("
Length of output: 6222
Script:
#!/bin/bash
# Get the content of the update.ts file
cat services/libs/data-access-layer/src/activities/update.ts
# Also search for any error handling patterns in streamActivities
rg -A 10 "streamActivities.*\("
Length of output: 4051
Script:
#!/bin/bash
# Search for insertActivities implementation
cat services/libs/data-access-layer/src/activities/ilp.ts
# Also search for any error handling patterns in insertActivities
rg -A 10 "insertActivities.*\("
Length of output: 10777
@@ -154,7 +156,7 @@ export async function runMemberAffiliationsUpdate(
     }
   }

-  await insertActivities([activity], true)
+  await insertActivities(queueClient, [activity], true)
💡 Codebase verification
Error handling is missing in the queue-based activity insertion
The implementation of insertActivities
shows that while it uses queueClient
to send messages through QueueEmitter
, it lacks proper error handling and retry mechanisms. Here are the specific concerns:
- No try-catch blocks around
emitter.sendMessage()
calls - No retry logic for failed queue operations
- Basic telemetry exists but no error logging for failed operations
🔗 Analysis chain
insertIfMatches now leverages queueClient.
This is a key integration point for activity insertion via queue. Consider adding logging or retry logic to handle insertion failures.
Let me verify the integration point for activity insertion by first checking the implementation of insertActivities
function and its usage of queueClient
.
Let me check the implementation of insertActivities
in the data-access-layer to verify the queueClient integration and error handling.
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
#!/bin/bash
# Search for insertActivities function definition
ast-grep --pattern 'function insertActivities($$$) {
$$$
}'
# Also search for any direct references to this function
rg "insertActivities" -A 3
Length of output: 5353
Script:
#!/bin/bash
# Check the implementation of insertActivities in the data-access-layer
cat services/libs/data-access-layer/src/activities/ilp.ts
Length of output: 2521
@@ -58,7 +60,7 @@ class ActivityRepository {
     data.platform = data.platform.toLowerCase()
   }

-  const ids = await insertActivities([
+  const ids = await insertActivities(QUEUE_CLIENT(), [
🛠️ Refactor suggestion
Passing QUEUE_CLIENT() to insertActivities.
Good shift toward a queue-based approach. Consider robust error-handling for partial insertions or queue outages, potentially logging failures or implementing rollback logic.
@@ -181,7 +181,7 @@ export default class ActivityService extends LoggerBase {
   )

   record = await ActivityRepository.create(data, repositoryOptions)
-  await insertActivities([{ ...data, id: record.id }], true)
+  await insertActivities(QUEUE_CLIENT(), [{ ...data, id: record.id }], true)
🛠️ Refactor suggestion
Add error handling or retry logic for queue insertion.
The call to “insertActivities(QUEUE_CLIENT(), …)” doesn’t appear to have explicit fallback logic if the queue is unreachable or insertion fails. Consider adding retry logic or error handling to ensure critical data isn’t lost when insertion fails.
730a22c to 571b106
571b106 to 6a7fcd8