Skip to content

Commit

Permalink
KAFKA-18842: add configurable max number of connectors
Browse files Browse the repository at this point in the history
When Kafka Connect is offered as managed services there is need to restrict the
number of allowed connectors to be run in the Connect cluster. Creating high
number of connectors will make the Connect cluster unresponsive.
  • Loading branch information
jjaakola-aiven committed Feb 21, 2025
1 parent 337fb8c commit a138485
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,11 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback
});
}

@Override
public boolean validateConnectorNumberLimitNotExceeded() {
return worker.config().getMaxConnectorsCount() == 0 || connectors().size() < worker.config().getMaxConnectorsCount();
}

/**
* Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request
* and the current status of the connector and task instances.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ default void validateConnectorConfig(Map<String, String> connectorConfig, Callba
validateConnectorConfig(connectorConfig, callback);
}

/**
* Validate that configured maximum number of connectors is not exceeded.
* See {@link WorkerConfig#MAX_CONNECTORS_COUNT}
* @return True if there is still room for instantiating a connector. False if maximum is reached.
*/
boolean validateConnectorNumberLimitNotExceeded();

/**
* Restart the task with the given id.
* @param id id of the task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ public class WorkerConfig extends AbstractConfig {
+ "to create topics automatically.";
protected static final boolean TOPIC_CREATION_ENABLE_DEFAULT = true;

public static final String MAX_CONNECTORS_COUNT = "connect.max.connectors";
protected static final String MAX_CONNECTORS_COUNT_DOC = "Maximum number of connectors allowed to be instantiated. Defaults to 0 which means unlimited.";
protected static final int MAX_CONNECTORS_COUNT_DEFAULT = 0;

/**
* Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
* bootstrap their own ConfigDef.
Expand Down Expand Up @@ -264,6 +268,8 @@ protected static ConfigDef baseConfigDef() {
Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC)
.define(TOPIC_CREATION_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_CREATION_ENABLE_DEFAULT, Importance.LOW,
TOPIC_CREATION_ENABLE_DOC)
.define(MAX_CONNECTORS_COUNT, Type.INT, MAX_CONNECTORS_COUNT_DEFAULT, Importance.LOW,
MAX_CONNECTORS_COUNT_DOC)
// security support
.withClientSslSupport();
addTopicTrackingConfig(result);
Expand Down Expand Up @@ -428,6 +434,10 @@ public String kafkaClusterId() {
return kafkaClusterId;
}

public int getMaxConnectorsCount() {
return getInt(MAX_CONNECTORS_COUNT);
}

@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public Response listConnectors(
public Response createConnector(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
final @Context HttpHeaders headers,
final CreateConnectorRequest createRequest) throws Throwable {
if (!herder.validateConnectorNumberLimitNotExceeded()) {
return Response.status(Response.Status.CONFLICT).entity("Number of connectors is at maximum.").build();
}

// Trim leading and trailing whitespaces from the connector name, replace null with empty string
// if no name element present to keep validation within validator (NonEmptyStringWithoutControlChars
// allows null values)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ public void testRestForwardToLeader(boolean dualListener, boolean followerSsl, b
DistributedConfig followerConfig = new DistributedConfig(baseWorkerProps(dualListener, followerSsl));
DistributedConfig leaderConfig = new DistributedConfig(baseWorkerProps(dualListener, leaderSsl));

// Follower and leader have both room for connectors
when(followerHerder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);
when(leaderHerder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

// Follower worker setup
RestClient followerClient = new RestClient(followerConfig);
followerServer = new ConnectRestServer(null, followerClient, followerConfig.originals());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,41 @@ public void testConfigValidationAllOverride() {
verifyValidationIsolation();
}

@Test
public void testConnectorCountMaximumNotExceededValidation() {
final ClusterConfigState clusterConfigStateMock = mock(ClusterConfigState.class);
final AbstractHerder herder = testHerder(new AllConnectorClientConfigOverridePolicy());
when(worker.config()).thenReturn(workerConfig);
when(workerConfig.getMaxConnectorsCount()).thenReturn(1);
when(configStore.snapshot()).thenReturn(clusterConfigStateMock);
when(clusterConfigStateMock.connectors()).thenReturn(Collections.emptySet());
assertTrue(herder.validateConnectorNumberLimitNotExceeded());
verify(configStore, times(1)).snapshot();
verify(clusterConfigStateMock, times(1)).connectors();
}

@Test
public void testConnectorCountMaximumExceededValidation() {
final ClusterConfigState clusterConfigStateMock = mock(ClusterConfigState.class);
final AbstractHerder herder = testHerder(new AllConnectorClientConfigOverridePolicy());
when(worker.config()).thenReturn(workerConfig);
when(workerConfig.getMaxConnectorsCount()).thenReturn(1);
when(configStore.snapshot()).thenReturn(clusterConfigStateMock);
when(clusterConfigStateMock.connectors()).thenReturn(Collections.singleton("one-connector-allowed"));
assertFalse(herder.validateConnectorNumberLimitNotExceeded());
verify(configStore, times(1)).snapshot();
verify(clusterConfigStateMock, times(1)).connectors();
}

@Test
public void testConnectorCountMaximumUnlimitedValidation() {
final AbstractHerder herder = testHerder(new AllConnectorClientConfigOverridePolicy());
when(worker.config()).thenReturn(workerConfig);
when(workerConfig.getMaxConnectorsCount()).thenReturn(0);
assertTrue(herder.validateConnectorNumberLimitNotExceeded());
verify(configStore, never()).snapshot();
}

@Test
public void testReverseTransformConfigs() {
// Construct a task config with constant values for TEST_KEY and TEST_KEY2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
Expand Down Expand Up @@ -292,58 +293,67 @@ public void testExpandConnectorsWithConnectorNotFound() {
public void testCreateConnector() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), isNull(), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorWithPausedInitialState() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), CreateConnectorRequest.InitialState.PAUSED);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(TargetState.PAUSED), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorWithStoppedInitialState() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), CreateConnectorRequest.InitialState.STOPPED);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(TargetState.STOPPED), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorWithRunningInitialState() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), CreateConnectorRequest.InitialState.RUNNING);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(TargetState.STARTED), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorNotLeader() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackNotLeaderException(cb).when(herder)
Expand All @@ -352,12 +362,15 @@ public void testCreateConnectorNotLeader() throws Throwable {
when(restClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any()))
.thenReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorWithHeaders() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
HttpHeaders httpHeaders = mock(HttpHeaders.class);
expectAndCallbackNotLeaderException(cb)
Expand All @@ -366,17 +379,20 @@ public void testCreateConnectorWithHeaders() throws Throwable {
when(restClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), eq(httpHeaders), any(), any()))
.thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
connectorsResource.createConnector(FORWARD, httpHeaders, body);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorExists() {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackException(cb, new AlreadyExistsException("already exists"))
.when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), isNull(), eq(false), cb.capture());
assertThrows(AlreadyExistsException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, body));
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
Expand All @@ -386,13 +402,15 @@ public void testCreateConnectorNameTrimWhitespaces() throws Throwable {
Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig, null);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG, null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(),
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), isNull(), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
Expand All @@ -402,13 +420,15 @@ public void testCreateConnectorNameAllWhitespaces() throws Throwable {
Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig, null);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME, null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(),
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), isNull(), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
Expand All @@ -418,13 +438,27 @@ public void testCreateConnectorNoName() throws Throwable {
Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null, inputConfig, null);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME, null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(),
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), isNull(), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorMaximumExceeded() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(false);

Response response = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
assertEquals(Response.Status.CONFLICT.getStatusCode(), response.getStatus());
assertEquals("Number of connectors is at maximum.", response.getEntity().toString());
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
Expand Down Expand Up @@ -535,6 +569,7 @@ public void testCreateConnectorWithSpecialCharsInName() throws Throwable {
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_SPECIAL_CHARS, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_SPECIAL_CHARS), eq(body.config()), isNull(), eq(false), cb.capture());
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Expand All @@ -550,6 +585,7 @@ public void testCreateConnectorWithControlSequenceInName() throws Throwable {
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), eq(body.config()), isNull(), eq(false), cb.capture());
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Expand Down Expand Up @@ -595,6 +631,7 @@ public void testCreateConnectorConfigNameMismatch() {
Map<String, String> connConfig = new HashMap<>();
connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, connConfig, null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);
assertThrows(BadRequestException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, request));
}

Expand Down

0 comments on commit a138485

Please sign in to comment.