diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index c4f53ad413711..ba78464024175 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -599,6 +599,11 @@ public void validateConnectorConfig(Map connectorProps, Callback }); } + @Override + public boolean validateConnectorNumberLimitNotExceeded() { + return worker.config().getMaxConnectorsCount() == 0 || connectors().size() < worker.config().getMaxConnectorsCount(); + } + /** * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request * and the current status of the connector and task instances. diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index 859e3f2728e12..063ff41c7d41e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -245,6 +245,13 @@ default void validateConnectorConfig(Map connectorConfig, Callba validateConnectorConfig(connectorConfig, callback); } + /** + * Validate that configured maximum number of connectors is not exceeded. + * See {@link WorkerConfig#MAX_CONNECTORS_COUNT} + * @return True if there is still room for instantiating a connector. False if maximum is reached. + */ + boolean validateConnectorNumberLimitNotExceeded(); + /** * Restart the task with the given id. * @param id id of the task diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index a68cdb4ea03d0..0e516b8884573 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -196,6 +196,10 @@ public class WorkerConfig extends AbstractConfig { + "to create topics automatically."; protected static final boolean TOPIC_CREATION_ENABLE_DEFAULT = true; + public static final String MAX_CONNECTORS_COUNT = "connect.max.connectors"; + protected static final String MAX_CONNECTORS_COUNT_DOC = "Maximum number of connectors allowed to be instantiated. Defaults to 0 which means unlimited."; + protected static final int MAX_CONNECTORS_COUNT_DEFAULT = 0; + /** * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to * bootstrap their own ConfigDef. @@ -264,6 +268,8 @@ protected static ConfigDef baseConfigDef() { Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC) .define(TOPIC_CREATION_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_CREATION_ENABLE_DEFAULT, Importance.LOW, TOPIC_CREATION_ENABLE_DOC) + .define(MAX_CONNECTORS_COUNT, Type.INT, MAX_CONNECTORS_COUNT_DEFAULT, Importance.LOW, + MAX_CONNECTORS_COUNT_DOC) // security support .withClientSslSupport(); addTopicTrackingConfig(result); @@ -428,6 +434,10 @@ public String kafkaClusterId() { return kafkaClusterId; } + public int getMaxConnectorsCount() { + return getInt(MAX_CONNECTORS_COUNT); + } + @Override protected Map postProcessParsedConfig(final Map parsedValues) { return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index efbf39d790bef..938f581dcf5fa 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -140,6 +140,10 @@ public Response listConnectors( public Response createConnector(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, final @Context HttpHeaders headers, final CreateConnectorRequest createRequest) throws Throwable { + if (!herder.validateConnectorNumberLimitNotExceeded()) { + throw new ConnectRestException(Response.Status.CONFLICT, "Number of connectors is at maximum."); + } + // Trim leading and trailing whitespaces from the connector name, replace null with empty string // if no name element present to keep validation within validator (NonEmptyStringWithoutControlChars // allows null values) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java index 81b18d03442ff..9fd6219d36e0a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java @@ -162,6 +162,10 @@ public void testRestForwardToLeader(boolean dualListener, boolean followerSsl, b DistributedConfig followerConfig = new DistributedConfig(baseWorkerProps(dualListener, followerSsl)); DistributedConfig leaderConfig = new DistributedConfig(baseWorkerProps(dualListener, leaderSsl)); + // Follower and leader have both room for connectors + when(followerHerder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); + when(leaderHerder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); + // Follower worker setup RestClient followerClient = new RestClient(followerConfig); followerServer = new ConnectRestServer(null, followerClient, followerConfig.originals()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 01a9bcf6373e4..b0e095262d6fe 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -774,6 +774,41 @@ public void testConfigValidationAllOverride() { verifyValidationIsolation(); } + @Test + public void testConnectorCountMaximumNotExceededValidation() { + final ClusterConfigState clusterConfigStateMock = mock(ClusterConfigState.class); + final AbstractHerder herder = testHerder(new AllConnectorClientConfigOverridePolicy()); + when(worker.config()).thenReturn(workerConfig); + when(workerConfig.getMaxConnectorsCount()).thenReturn(1); + when(configStore.snapshot()).thenReturn(clusterConfigStateMock); + when(clusterConfigStateMock.connectors()).thenReturn(Collections.emptySet()); + assertTrue(herder.validateConnectorNumberLimitNotExceeded()); + verify(configStore, times(1)).snapshot(); + verify(clusterConfigStateMock, times(1)).connectors(); + } + + @Test + public void testConnectorCountMaximumExceededValidation() { + final ClusterConfigState clusterConfigStateMock = mock(ClusterConfigState.class); + final AbstractHerder herder = testHerder(new AllConnectorClientConfigOverridePolicy()); + when(worker.config()).thenReturn(workerConfig); + when(workerConfig.getMaxConnectorsCount()).thenReturn(1); + when(configStore.snapshot()).thenReturn(clusterConfigStateMock); + when(clusterConfigStateMock.connectors()).thenReturn(Collections.singleton("one-connector-allowed")); + assertFalse(herder.validateConnectorNumberLimitNotExceeded()); + verify(configStore, times(1)).snapshot(); + verify(clusterConfigStateMock, times(1)).connectors(); + } + + @Test + public void testConnectorCountMaximumUnlimitedValidation() { + final AbstractHerder herder = testHerder(new AllConnectorClientConfigOverridePolicy()); + when(worker.config()).thenReturn(workerConfig); + when(workerConfig.getMaxConnectorsCount()).thenReturn(0); + assertTrue(herder.validateConnectorNumberLimitNotExceeded()); + verify(configStore, never()).snapshot(); + } + @Test public void testReverseTransformConfigs() { // Construct a task config with constant values for TEST_KEY and TEST_KEY2 diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index 9dfead77220f6..9dee2b5b9b360 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -78,6 +78,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; @@ -292,6 +293,7 @@ public void testExpandConnectorsWithConnectorNotFound() { public void testCreateConnector() throws Throwable { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, @@ -299,12 +301,14 @@ public void testCreateConnector() throws Throwable { ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), isNull(), eq(false), cb.capture()); connectorsResource.createConnector(FORWARD, NULL_HEADERS, body); + verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded(); } @Test public void testCreateConnectorWithPausedInitialState() throws Throwable { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), CreateConnectorRequest.InitialState.PAUSED); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, @@ -312,12 +316,14 @@ public void testCreateConnectorWithPausedInitialState() throws Throwable { ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(TargetState.PAUSED), eq(false), cb.capture()); connectorsResource.createConnector(FORWARD, NULL_HEADERS, body); + verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded(); } @Test public void testCreateConnectorWithStoppedInitialState() throws Throwable { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), CreateConnectorRequest.InitialState.STOPPED); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, @@ -325,12 +331,14 @@ public void testCreateConnectorWithStoppedInitialState() throws Throwable { ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(TargetState.STOPPED), eq(false), cb.capture()); connectorsResource.createConnector(FORWARD, NULL_HEADERS, body); + verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded(); } @Test public void testCreateConnectorWithRunningInitialState() throws Throwable { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), CreateConnectorRequest.InitialState.RUNNING); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, @@ -338,12 +346,14 @@ public void testCreateConnectorWithRunningInitialState() throws Throwable { ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(TargetState.STARTED), eq(false), cb.capture()); connectorsResource.createConnector(FORWARD, NULL_HEADERS, body); + verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded(); } @Test public void testCreateConnectorNotLeader() throws Throwable { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); expectAndCallbackNotLeaderException(cb).when(herder) @@ -352,12 +362,15 @@ public void testCreateConnectorNotLeader() throws Throwable { when(restClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any())) .thenReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))); connectorsResource.createConnector(FORWARD, NULL_HEADERS, body); + verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded(); } @Test public void testCreateConnectorWithHeaders() throws Throwable { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); + final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); HttpHeaders httpHeaders = mock(HttpHeaders.class); expectAndCallbackNotLeaderException(cb) @@ -366,17 +379,20 @@ public void testCreateConnectorWithHeaders() throws Throwable { when(restClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), eq(httpHeaders), any(), any())) .thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); connectorsResource.createConnector(FORWARD, httpHeaders, body); + verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded(); } @Test public void testCreateConnectorExists() { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); expectAndCallbackException(cb, new AlreadyExistsException("already exists")) .when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), isNull(), eq(false), cb.capture()); assertThrows(AlreadyExistsException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, body)); + verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded(); } @Test @@ -386,6 +402,7 @@ public void testCreateConnectorNameTrimWhitespaces() throws Throwable { Map inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME); final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig, null); final CreateConnectorRequest bodyOut = new CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG, null); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(), @@ -393,6 +410,7 @@ public void testCreateConnectorNameTrimWhitespaces() throws Throwable { ).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), isNull(), eq(false), cb.capture()); connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn); + verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded(); } @Test @@ -402,6 +420,7 @@ public void testCreateConnectorNameAllWhitespaces() throws Throwable { Map inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME); final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig, null); final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME, null); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(), @@ -409,6 +428,7 @@ public void testCreateConnectorNameAllWhitespaces() throws Throwable { ).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), isNull(), eq(false), cb.capture()); connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn); + verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded(); } @Test @@ -418,6 +438,7 @@ public void testCreateConnectorNoName() throws Throwable { Map inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME); final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null, inputConfig, null); final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME, null); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); final ArgumentCaptor>> cb = ArgumentCaptor.forClass(Callback.class); expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(), @@ -425,6 +446,18 @@ public void testCreateConnectorNoName() throws Throwable { ).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), isNull(), eq(false), cb.capture()); connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn); + verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded(); + } + + @Test + public void testCreateConnectorMaximumExceeded() throws Throwable { + CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, + Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(false); + ConnectRestException conflictException = assertThrows(ConnectRestException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, body)); + assertEquals(Response.Status.CONFLICT.getStatusCode(), conflictException.statusCode()); + assertEquals("Number of connectors is at maximum.", conflictException.getMessage()); + verify(herder, atMost(1)).validateConnectorNumberLimitNotExceeded(); } @Test @@ -535,6 +568,7 @@ public void testCreateConnectorWithSpecialCharsInName() throws Throwable { expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_SPECIAL_CHARS, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)) ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_SPECIAL_CHARS), eq(body.config()), isNull(), eq(false), cb.capture()); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString(); String decoded = new URI(rspLocation).getPath(); @@ -550,6 +584,7 @@ public void testCreateConnectorWithControlSequenceInName() throws Throwable { expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)) ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), eq(body.config()), isNull(), eq(false), cb.capture()); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString(); String decoded = new URI(rspLocation).getPath(); @@ -595,6 +630,7 @@ public void testCreateConnectorConfigNameMismatch() { Map connConfig = new HashMap<>(); connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name"); CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, connConfig, null); + when(herder.validateConnectorNumberLimitNotExceeded()).thenReturn(true); assertThrows(BadRequestException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, request)); }