Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18842: add configurable max number of connectors #63

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,11 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback
});
}

@Override
public boolean validateConnectorNumberLimitNotExceeded() {
return worker.config().getMaxConnectorsCount() == 0 || connectors().size() < worker.config().getMaxConnectorsCount();
}

/**
* Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request
* and the current status of the connector and task instances.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ default void validateConnectorConfig(Map<String, String> connectorConfig, Callba
validateConnectorConfig(connectorConfig, callback);
}

/**
* Validate that configured maximum number of connectors is not exceeded.
* See {@link WorkerConfig#MAX_CONNECTORS_COUNT}
* @return True if there is still room for instantiating a connector. False if maximum is reached.
*/
boolean validateConnectorNumberLimitNotExceeded();

/**
* Restart the task with the given id.
* @param id id of the task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ public class WorkerConfig extends AbstractConfig {
+ "to create topics automatically.";
protected static final boolean TOPIC_CREATION_ENABLE_DEFAULT = true;

public static final String MAX_CONNECTORS_COUNT = "connect.max.connectors";
protected static final String MAX_CONNECTORS_COUNT_DOC = "Maximum number of connectors allowed to be instantiated. Defaults to 0 which means unlimited.";
protected static final int MAX_CONNECTORS_COUNT_DEFAULT = 0;

/**
* Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
* bootstrap their own ConfigDef.
Expand Down Expand Up @@ -264,6 +268,8 @@ protected static ConfigDef baseConfigDef() {
Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC)
.define(TOPIC_CREATION_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_CREATION_ENABLE_DEFAULT, Importance.LOW,
TOPIC_CREATION_ENABLE_DOC)
.define(MAX_CONNECTORS_COUNT, Type.INT, MAX_CONNECTORS_COUNT_DEFAULT, Importance.LOW,
MAX_CONNECTORS_COUNT_DOC)
// security support
.withClientSslSupport();
addTopicTrackingConfig(result);
Expand Down Expand Up @@ -428,6 +434,10 @@ public String kafkaClusterId() {
return kafkaClusterId;
}

public int getMaxConnectorsCount() {
return getInt(MAX_CONNECTORS_COUNT);
}

@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public Response listConnectors(
public Response createConnector(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
final @Context HttpHeaders headers,
final CreateConnectorRequest createRequest) throws Throwable {
if (!herder.validateConnectorNumberLimitNotExceeded()) {
throw new ConnectRestException(Response.Status.CONFLICT, "Number of connectors is at maximum.");
}

// Trim leading and trailing whitespaces from the connector name, replace null with empty string
// if no name element present to keep validation within validator (NonEmptyStringWithoutControlChars
// allows null values)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ public void testRestForwardToLeader(boolean dualListener, boolean followerSsl, b
DistributedConfig followerConfig = new DistributedConfig(baseWorkerProps(dualListener, followerSsl));
DistributedConfig leaderConfig = new DistributedConfig(baseWorkerProps(dualListener, leaderSsl));

// Follower and leader have both room for connectors
when(followerHerder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);
when(leaderHerder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

// Follower worker setup
RestClient followerClient = new RestClient(followerConfig);
followerServer = new ConnectRestServer(null, followerClient, followerConfig.originals());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,41 @@ public void testConfigValidationAllOverride() {
verifyValidationIsolation();
}

@Test
public void testConnectorCountMaximumNotExceededValidation() {
final ClusterConfigState clusterConfigStateMock = mock(ClusterConfigState.class);
final AbstractHerder herder = testHerder(new AllConnectorClientConfigOverridePolicy());
when(worker.config()).thenReturn(workerConfig);
when(workerConfig.getMaxConnectorsCount()).thenReturn(1);
when(configStore.snapshot()).thenReturn(clusterConfigStateMock);
when(clusterConfigStateMock.connectors()).thenReturn(Collections.emptySet());
assertTrue(herder.validateConnectorNumberLimitNotExceeded());
verify(configStore, times(1)).snapshot();
verify(clusterConfigStateMock, times(1)).connectors();
}

@Test
public void testConnectorCountMaximumExceededValidation() {
final ClusterConfigState clusterConfigStateMock = mock(ClusterConfigState.class);
final AbstractHerder herder = testHerder(new AllConnectorClientConfigOverridePolicy());
when(worker.config()).thenReturn(workerConfig);
when(workerConfig.getMaxConnectorsCount()).thenReturn(1);
when(configStore.snapshot()).thenReturn(clusterConfigStateMock);
when(clusterConfigStateMock.connectors()).thenReturn(Collections.singleton("one-connector-allowed"));
assertFalse(herder.validateConnectorNumberLimitNotExceeded());
verify(configStore, times(1)).snapshot();
verify(clusterConfigStateMock, times(1)).connectors();
}

@Test
public void testConnectorCountMaximumUnlimitedValidation() {
final AbstractHerder herder = testHerder(new AllConnectorClientConfigOverridePolicy());
when(worker.config()).thenReturn(workerConfig);
when(workerConfig.getMaxConnectorsCount()).thenReturn(0);
assertTrue(herder.validateConnectorNumberLimitNotExceeded());
verify(configStore, never()).snapshot();
}

@Test
public void testReverseTransformConfigs() {
// Construct a task config with constant values for TEST_KEY and TEST_KEY2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
Expand Down Expand Up @@ -292,58 +293,67 @@ public void testExpandConnectorsWithConnectorNotFound() {
public void testCreateConnector() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), isNull(), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorWithPausedInitialState() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), CreateConnectorRequest.InitialState.PAUSED);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(TargetState.PAUSED), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorWithStoppedInitialState() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), CreateConnectorRequest.InitialState.STOPPED);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(TargetState.STOPPED), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorWithRunningInitialState() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), CreateConnectorRequest.InitialState.RUNNING);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(TargetState.STARTED), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorNotLeader() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackNotLeaderException(cb).when(herder)
Expand All @@ -352,12 +362,15 @@ public void testCreateConnectorNotLeader() throws Throwable {
when(restClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any()))
.thenReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorWithHeaders() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
HttpHeaders httpHeaders = mock(HttpHeaders.class);
expectAndCallbackNotLeaderException(cb)
Expand All @@ -366,17 +379,20 @@ public void testCreateConnectorWithHeaders() throws Throwable {
when(restClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), eq(httpHeaders), any(), any()))
.thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
connectorsResource.createConnector(FORWARD, httpHeaders, body);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorExists() {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackException(cb, new AlreadyExistsException("already exists"))
.when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), isNull(), eq(false), cb.capture());
assertThrows(AlreadyExistsException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, body));
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
Expand All @@ -386,13 +402,15 @@ public void testCreateConnectorNameTrimWhitespaces() throws Throwable {
Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig, null);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG, null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(),
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), isNull(), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
Expand All @@ -402,13 +420,15 @@ public void testCreateConnectorNameAllWhitespaces() throws Throwable {
Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig, null);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME, null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(),
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), isNull(), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
Expand All @@ -418,13 +438,26 @@ public void testCreateConnectorNoName() throws Throwable {
Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null, inputConfig, null);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME, null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(),
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), isNull(), eq(false), cb.capture());

connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
public void testCreateConnectorMaximumExceeded() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(false);
ConnectRestException conflictException = assertThrows(ConnectRestException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, body));
assertEquals(Response.Status.CONFLICT.getStatusCode(), conflictException.statusCode());
assertEquals("Number of connectors is at maximum.", conflictException.getMessage());
verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded();
}

@Test
Expand Down Expand Up @@ -535,6 +568,7 @@ public void testCreateConnectorWithSpecialCharsInName() throws Throwable {
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_SPECIAL_CHARS, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_SPECIAL_CHARS), eq(body.config()), isNull(), eq(false), cb.capture());
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Expand All @@ -550,6 +584,7 @@ public void testCreateConnectorWithControlSequenceInName() throws Throwable {
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), eq(body.config()), isNull(), eq(false), cb.capture());
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);

String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Expand Down Expand Up @@ -595,6 +630,7 @@ public void testCreateConnectorConfigNameMismatch() {
Map<String, String> connConfig = new HashMap<>();
connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, connConfig, null);
when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true);
assertThrows(BadRequestException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, request));
}

Expand Down
Loading