Skip to content

addCosmosClientCacheInKafkaConnector #45633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from

Conversation

xinlian12
Copy link
Member

@xinlian12 xinlian12 commented Jun 9, 2025

Issue:
When CosmosSourceConnector or CosmosSourceConnector failed to start, the created cosmos client resources are not being released properly, which causing OOM issues when the connector being restarted multiple times.

Fixes:
Added Cosmos client cache logic in the kafka connector - which will be responsible to cleanup the resources eventually

sequenceDiagram
    participant Source as CosmosSourceConnector
    participant Sink as CosmosSinkConnector
    participant Cache as CosmosClientCache
    participant Cleanup as Cleanup Thread
    participant Client as CosmosAsyncClient
    
    Note over Cache: Singleton Instance<br/>Maintains client cache map<br/>and cleanup map

    par Source Connector
        Source->>+Cache: getCosmosClient(config, sourceName)
        alt Client exists
            Cache->>Cache: Update last accessed time<br/>Increment ref count
            Cache-->>-Source: Return existing client
        else Create new client
            Cache->>Client: Create new CosmosAsyncClient
            Cache->>Cache: Store in cache map with metadata
            Cache-->>Source: Return new client
        end
    and Sink Connector
        Sink->>+Cache: getCosmosClient(config, sinkName)
        alt Client exists
            Cache->>Cache: Update last accessed time<br/>Increment ref count
            Cache-->>-Sink: Return existing client
        else Create new client
            Cache->>Client: Create new CosmosAsyncClient
            Cache->>Cache: Store in cache map with metadata
            Cache-->>Sink: Return new client
        end
    end

    loop Every 60 seconds
        Cleanup->>Cache: Check for unused clients
        alt Client unused for >15 mins and ref count = 0
            Cache->>Cache: Move to cleanup map
            Cache->>Client: Close client if ref count = 0
        end
    end

    Source->>Cache: releaseCosmosClient(config)
    Cache->>Cache: Decrement ref count

    Sink->>Cache: releaseCosmosClient(config)
    Cache->>Cache: Decrement ref count

    Note over Cache: Client eligible for cleanup<br/>when ref count = 0 and<br/>idle > 15 minutes
Loading

@github-actions github-actions bot added the Cosmos label Jun 9, 2025
@xinlian12 xinlian12 force-pushed the usingCosmosClientCacheInKafkaConnector branch from 918b1ec to b01a852 Compare June 9, 2025 06:41
@xinlian12 xinlian12 marked this pull request as ready for review June 9, 2025 06:47
@Copilot Copilot AI review requested due to automatic review settings June 9, 2025 06:47
@xinlian12 xinlian12 requested review from kirankumarkolli and a team as code owners June 9, 2025 06:47
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces a client cache mechanism in the Kafka connector to ensure that Cosmos client resources are properly managed and released, addressing Out-Of-Memory issues caused by multiple connector restarts.

  • Replaces direct CosmosAsyncClient creation via CosmosClientStore with a cache-based approach using CosmosClientCache and CosmosClientCacheItem.
  • Updates tests and connector classes to use the new client cache, ensuring proper resource cleanup.
  • Adds equality methods in auth config classes for better cache key consistency.

Reviewed Changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated no comments.

Show a summary per file
File Description
MetadataMonitorThreadTest.java Updated to use CosmosClientCacheItem for client management.
MetadataKafkaStorageManagerTest.java Updated client acquisition and cleanup via client cache.
CosmosSourceTask.java Replaced direct client access with the cached client item and its getClient() method.
CosmosSinkTask.java Refactored to acquire and release clients using the CosmosClientCache.
CosmosMasterKeyAuthConfig.java Added equals/hashCode implementations for cache consistency.
CosmosAadAuthConfig.java Added equals/hashCode implementations for cache correctness.
CosmosSourceConnector.java & CosmosSinkConnector.java Updated client instantiation and resource release patterns to use CosmosClientCacheItem.
CosmosClientCache.java and related cache classes Introduced new cache implementation with client reference counting and cleanup logic.
CHANGELOG.md Updated to document the fix for improper client resource cleanup.
Comments suppressed due to low confidence (2)

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java:90

  • Within the start method, invoking stop() inside the catch block may risk double-release of resources if stop() is later called externally. Consider adding a flag (e.g., 'isStopped') to ensure resources are released only once.
try { this.config = new CosmosSourceConfig(props); ... } catch (Exception e) { this.stop(); throw e; }

sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosClientCache.java:240

  • Ensure that reference count management is robust under concurrent access to avoid the possibility of negative values. Consider adding safeguards (such as assertions or lower-bound checks) when decrementing the reference count.
if (metadata.getRefCount() <= 0 && (Instant.now().toEpochMilli() - metadata.getLastAccessed().toEpochMilli() > UNUSED_CLIENT_TTL_IN_MS)) {

@xinlian12
Copy link
Member Author

/azp run java - cosmos - kafka

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@xinlian12
Copy link
Member Author

/azp run java - cosmos - kafka

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

Copy link
Member

@tvaron3 tvaron3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@FabianMeiswinkel FabianMeiswinkel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@xinlian12
Copy link
Member Author

/azp run java - cosmos - kafka

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

containersConfig.getDatabaseName());
} catch (Exception e) {
// if connector failed to start, release initialized resources here
this.stop();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should log the error message here.


this.monitorThread.start();
} catch (Exception e) {
// if the connector failed to start, release initialized resources here
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, please log the error.

&& Objects.equals(clientSecret, that.clientSecret)
&& Objects.equals(authEndpointOverride, that.authEndpointOverride)
&& Objects.equals(tenantId, that.tenantId)
&& azureEnvironment == that.azureEnvironment;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the check on CosmosAzureEnvironment based on reference and not equals by value like others? It is an object, right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants