addCosmosClientCacheInKafkaConnector #45633
base: main
918b1ec
to
b01a852
Pull Request Overview
This PR introduces a client cache mechanism in the Kafka connector to ensure that Cosmos client resources are properly managed and released, addressing Out-Of-Memory issues caused by multiple connector restarts.
- Replaces direct CosmosAsyncClient creation via CosmosClientStore with a cache-based approach using CosmosClientCache and CosmosClientCacheItem.
- Updates tests and connector classes to use the new client cache, ensuring proper resource cleanup.
- Adds equality methods in auth config classes for better cache key consistency.
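The cache-based approach described above can be illustrated with a minimal sketch of a reference-counted client cache. This is not the PR's `CosmosClientCache`; `FakeClient` and `RefCountedClientCache` are hypothetical names, and the sketch closes a client as soon as its last reference is released, whereas the PR appears to evict unused clients after a TTL.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for an expensive client such as CosmosAsyncClient. */
final class FakeClient implements AutoCloseable {
    private boolean closed;

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Minimal reference-counted cache; the key type must implement equals/hashCode. */
final class RefCountedClientCache<K> {
    private static final class Entry {
        final FakeClient client;
        int refCount;

        Entry(FakeClient client) {
            this.client = client;
        }
    }

    private final Map<K, Entry> entries = new HashMap<>();

    /** Returns the cached client for the key, creating it on first use. */
    synchronized FakeClient acquire(K key, Supplier<FakeClient> factory) {
        Entry entry = entries.computeIfAbsent(key, k -> new Entry(factory.get()));
        entry.refCount++;
        return entry.client;
    }

    /** Drops one reference; closes and evicts the client when none remain. */
    synchronized void release(K key) {
        Entry entry = entries.get(key);
        if (entry == null) {
            return; // unknown key or already evicted: nothing to release
        }
        if (--entry.refCount <= 0) {
            entries.remove(key);
            entry.client.close();
        }
    }

    synchronized int size() {
        return entries.size();
    }
}
```

Because lookups rely on the key's `equals`/`hashCode`, two connector tasks with equal configuration would share one client here, which is presumably why the auth config classes gained equality methods.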
Reviewed Changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| MetadataMonitorThreadTest.java | Updated to use CosmosClientCacheItem for client management. |
| MetadataKafkaStorageManagerTest.java | Updated client acquisition and cleanup via client cache. |
| CosmosSourceTask.java | Replaced direct client access with the cached client item and its getClient() method. |
| CosmosSinkTask.java | Refactored to acquire and release clients using the CosmosClientCache. |
| CosmosMasterKeyAuthConfig.java | Added equals/hashCode implementations for cache consistency. |
| CosmosAadAuthConfig.java | Added equals/hashCode implementations for cache correctness. |
| CosmosSourceConnector.java & CosmosSinkConnector.java | Updated client instantiation and resource release patterns to use CosmosClientCacheItem. |
| CosmosClientCache.java and related cache classes | Introduced new cache implementation with client reference counting and cleanup logic. |
| CHANGELOG.md | Updated to document the fix for improper client resource cleanup. |
Comments suppressed due to low confidence (2)
sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java:90
- Within the start method, invoking stop() inside the catch block may risk double-release of resources if stop() is later called externally. Consider adding a flag (e.g., 'isStopped') to ensure resources are released only once.
try { this.config = new CosmosSourceConfig(props); ... } catch (Exception e) { this.stop(); throw e; }
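One way to apply the suggested flag is sketched below, assuming an `AtomicBoolean` guard so that `stop()` releases resources at most once even if it runs both from the catch block and from an external caller. `GuardedConnector` and `releaseCount` are hypothetical and only model the pattern; they are not part of the connector.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical connector showing an idempotent stop(). */
final class GuardedConnector {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private int releaseCount; // how many times resources were actually released

    void start(boolean failDuringStart) {
        try {
            if (failDuringStart) {
                throw new IllegalStateException("simulated startup failure");
            }
        } catch (RuntimeException e) {
            // release whatever was initialized before the failure, then rethrow
            stop();
            throw e;
        }
    }

    void stop() {
        // compareAndSet succeeds only for the first caller, so later calls are no-ops
        if (stopped.compareAndSet(false, true)) {
            releaseCount++;
        }
    }

    int getReleaseCount() {
        return releaseCount;
    }
}
```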
sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientCache.java:240
- Ensure that reference count management is robust under concurrent access to avoid the possibility of negative values. Consider adding safeguards (such as assertions or lower-bound checks) when decrementing the reference count.
if (metadata.getRefCount() <= 0 && (Instant.now().toEpochMilli() - metadata.getLastAccessed().toEpochMilli() > UNUSED_CLIENT_TTL_IN_MS)) {
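A lower-bound check of the kind suggested here could look like the following sketch. It assumes the count is held in an `AtomicInteger`; `RefCountMetadata`, `release`, and `isExpired` are hypothetical names and do not reflect the actual `CosmosClientCacheMetadata` API.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical metadata holder whose reference count never drops below zero. */
final class RefCountMetadata {
    private final AtomicInteger refCount = new AtomicInteger(0);
    private volatile Instant lastAccessed = Instant.now();

    int acquire() {
        lastAccessed = Instant.now();
        return refCount.incrementAndGet();
    }

    int release() {
        lastAccessed = Instant.now();
        // atomic decrement clamped at zero, so an extra release cannot go negative
        return refCount.updateAndGet(current -> Math.max(0, current - 1));
    }

    /** True when no references remain and the entry has been idle longer than the TTL. */
    boolean isExpired(Instant now, Duration ttl) {
        return refCount.get() <= 0
            && Duration.between(lastAccessed, now).compareTo(ttl) > 0;
    }
}
```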
/azp run java - cosmos - kafka
Azure Pipelines successfully started running 1 pipeline(s).
/azp run java - cosmos - kafka
Azure Pipelines successfully started running 1 pipeline(s).
LGTM
...a-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientCache.java
...t/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientCacheMetadata.java
...nnect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientCacheItem.java
LGTM
/azp run java - cosmos - kafka
Azure Pipelines successfully started running 1 pipeline(s).
        containersConfig.getDatabaseName());
} catch (Exception e) {
    // if connector failed to start, release initialized resources here
    this.stop();
We should log the error message here.
    this.monitorThread.start();
} catch (Exception e) {
    // if the connector failed to start, release initialized resources here
Same here, please log the error.
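The logging requested in these two comments might look like the sketch below: log the failure with its stack trace before releasing resources and rethrowing. The connector's actual logging framework is not shown in this thread, so the sketch uses `java.util.logging` purely to stay self-contained; `LoggingConnector` and the message text are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical connector that logs a startup failure before cleaning up. */
final class LoggingConnector {
    private static final Logger LOGGER = Logger.getLogger(LoggingConnector.class.getName());
    private boolean stopped;

    void start(Runnable initialization) {
        try {
            initialization.run();
        } catch (RuntimeException e) {
            // record the cause first so it is not lost if cleanup also fails
            LOGGER.log(Level.WARNING, "Connector failed to start; releasing resources", e);
            stop();
            throw e;
        }
    }

    void stop() {
        stopped = true;
    }

    boolean isStopped() {
        return stopped;
    }
}
```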
    && Objects.equals(clientSecret, that.clientSecret)
    && Objects.equals(authEndpointOverride, that.authEndpointOverride)
    && Objects.equals(tenantId, that.tenantId)
    && azureEnvironment == that.azureEnvironment;
Why is the check on CosmosAzureEnvironment based on reference rather than on equals by value like the others? It is an object, right?
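For context on this question: in Java, each enum constant is a singleton, so `==` on enum values gives the same result as `equals` and is also null-safe. Whether `CosmosAzureEnvironment` is an enum is not confirmed in this thread; if it were a regular class, `Objects.equals` would be the safer choice. The sketch below assumes an enum, and `HypotheticalEnvironment` and `HypotheticalAadAuthConfig` are illustrative names only.

```java
import java.util.Objects;

/** Hypothetical enum standing in for the environment type. */
enum HypotheticalEnvironment { AZURE, AZURE_CHINA }

/** Hypothetical config class usable as a cache key. */
final class HypotheticalAadAuthConfig {
    private final String clientId;
    private final String tenantId;
    private final HypotheticalEnvironment environment;

    HypotheticalAadAuthConfig(String clientId, String tenantId, HypotheticalEnvironment environment) {
        this.clientId = clientId;
        this.tenantId = tenantId;
        this.environment = environment;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        HypotheticalAadAuthConfig that = (HypotheticalAadAuthConfig) o;
        return Objects.equals(clientId, that.clientId)
            && Objects.equals(tenantId, that.tenantId)
            && environment == that.environment; // enum constants are singletons
    }

    @Override
    public int hashCode() {
        // must use the same fields as equals so equal keys land in the same bucket
        return Objects.hash(clientId, tenantId, environment);
    }
}
```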
Issue:
When CosmosSourceConnector or CosmosSinkConnector fails to start, the Cosmos client resources it created are not released properly, which causes OOM issues when the connector is restarted multiple times.
Fixes:
Added Cosmos client cache logic in the Kafka connector, which is responsible for eventually cleaning up the resources.