Skip to content

Commit

Permalink
[fix]Fix the first schema change does not take effect when the source…
Browse files Browse the repository at this point in the history
… table has no data (apache#354)
  • Loading branch information
DongLiang-0 authored Mar 28, 2024
1 parent a7ad42c commit 995da44
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.annotation.VisibleForTesting;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
Expand All @@ -30,21 +31,30 @@
import org.apache.doris.flink.exception.ConnectedFailedException;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.exception.DorisSchemaChangeException;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.exception.ShouldNeverHappenException;
import org.apache.doris.flink.rest.models.Backend;
import org.apache.doris.flink.rest.models.BackendRow;
import org.apache.doris.flink.rest.models.BackendV2;
import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
import org.apache.doris.flink.rest.models.QueryPlan;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.rest.models.Tablet;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;

import java.io.IOException;
Expand All @@ -56,12 +66,12 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -78,13 +88,13 @@ public class RestService implements Serializable {
public static final int REST_RESPONSE_STATUS_OK = 200;
public static final int REST_RESPONSE_CODE_OK = 0;
private static final String REST_RESPONSE_BE_ROWS_KEY = "rows";
private static final String API_PREFIX = "/api";
private static final String SCHEMA = "_schema";
private static final String QUERY_PLAN = "_query_plan";
private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
@Deprecated private static final String BACKENDS = "/rest/v1/system?path=//backends";
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
private static final String FE_LOGIN = "/rest/v1/login";
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";

/**
* send request to Doris FE and get response json string.
Expand Down Expand Up @@ -138,13 +148,9 @@ private static String send(
try {
String response;
if (request instanceof HttpGet) {
response =
getConnectionGet(
request, options.getUsername(), options.getPassword(), logger);
response = getConnectionGet(request, options, logger);
} else {
response =
getConnectionPost(
request, options.getUsername(), options.getPassword(), logger);
response = getConnectionPost(request, options, logger);
}
if (response == null) {
logger.warn(
Expand Down Expand Up @@ -177,17 +183,12 @@ private static String send(
}

private static String getConnectionPost(
HttpRequestBase request, String user, String passwd, Logger logger) throws IOException {
HttpRequestBase request, DorisOptions dorisOptions, Logger logger) throws IOException {
URL url = new URL(request.getURI().toString());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod(request.getMethod());
String authEncoding =
Base64.getEncoder()
.encodeToString(
String.format("%s:%s", user, passwd)
.getBytes(StandardCharsets.UTF_8));
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
conn.setRequestProperty("Authorization", authHeader(dorisOptions));
InputStream content = ((HttpPost) request).getEntity().getContent();
String res = IOUtils.toString(content);
conn.setDoOutput(true);
Expand All @@ -204,16 +205,11 @@ private static String getConnectionPost(
}

private static String getConnectionGet(
HttpRequestBase request, String user, String passwd, Logger logger) throws IOException {
HttpRequestBase request, DorisOptions dorisOptions, Logger logger) throws IOException {
URL realUrl = new URL(request.getURI().toString());
// open connection
HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection();
String authEncoding =
Base64.getEncoder()
.encodeToString(
String.format("%s:%s", user, passwd)
.getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " + authEncoding);
connection.setRequestProperty("Authorization", authHeader(dorisOptions));

connection.connect();
connection.setConnectTimeout(request.getConfig().getConnectTimeout());
Expand Down Expand Up @@ -323,14 +319,14 @@ static List<String> allEndpoints(String feNodes, Logger logger) {
public static String randomBackend(
DorisOptions options, DorisReadOptions readOptions, Logger logger)
throws DorisException, IOException {
List<BackendV2.BackendRowV2> backends = getBackendsV2(options, readOptions, logger);
List<BackendRowV2> backends = getBackendsV2(options, readOptions, logger);
logger.trace("Parse beNodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
throw new IllegalArgumentException("beNodes", String.valueOf(backends));
}
Collections.shuffle(backends);
BackendV2.BackendRowV2 backend = backends.get(0);
BackendRowV2 backend = backends.get(0);
return backend.getIp() + ":" + backend.getHttpPort();
}

Expand Down Expand Up @@ -408,7 +404,7 @@ static List<BackendRow> parseBackend(String response, Logger logger)
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
public static List<BackendV2.BackendRowV2> getBackendsV2(
public static List<BackendRowV2> getBackendsV2(
DorisOptions options, DorisReadOptions readOptions, Logger logger) {
String feNodes = options.getFenodes();
List<String> feNodeList = allEndpoints(feNodes, logger);
Expand All @@ -423,7 +419,7 @@ public static List<BackendV2.BackendRowV2> getBackendsV2(
HttpGet httpGet = new HttpGet(beUrl);
String response = send(options, readOptions, httpGet, logger);
logger.info("Backend Info:{}", response);
List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
List<BackendRowV2> backends = parseBackendV2(response, logger);
return backends;
} catch (ConnectedFailedException e) {
logger.info(
Expand All @@ -444,16 +440,16 @@ public static List<BackendV2.BackendRowV2> getBackendsV2(
* @param feNodeList
* @return
*/
private static List<BackendV2.BackendRowV2> convert(List<String> feNodeList) {
List<BackendV2.BackendRowV2> nodeList = new ArrayList<>();
private static List<BackendRowV2> convert(List<String> feNodeList) {
List<BackendRowV2> nodeList = new ArrayList<>();
for (String node : feNodeList) {
String[] split = node.split(":");
nodeList.add(BackendV2.BackendRowV2.of(split[0], Integer.valueOf(split[1]), true));
nodeList.add(BackendRowV2.of(split[0], Integer.valueOf(split[1]), true));
}
return nodeList;
}

static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) {
static List<BackendRowV2> parseBackendV2(String response, Logger logger) {
ObjectMapper mapper = new ObjectMapper();
BackendV2 backend;
try {
Expand All @@ -476,32 +472,11 @@ static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logge
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
List<BackendRowV2> backendRows = backend.getBackends();
logger.debug("Parsing schema result is '{}'.", backendRows);
return backendRows;
}

/**
* get a valid URI to connect Doris FE.
*
* @param options configuration of request
* @param logger {@link Logger}
* @return uri string
* @throws IllegalArgumentException throw when configuration is illegal
*/
@VisibleForTesting
static String getUriStr(DorisOptions options, Logger logger) throws IllegalArgumentException {
String[] identifier = parseIdentifier(options.getTableIdentifier(), logger);
return "http://"
+ randomEndpoint(options.getFenodes(), logger)
+ API_PREFIX
+ "/"
+ identifier[0]
+ "/"
+ identifier[1]
+ "/";
}

/**
* discover Doris table schema from Doris FE.
*
Expand All @@ -514,12 +489,72 @@ public static Schema getSchema(
DorisOptions options, DorisReadOptions readOptions, Logger logger)
throws DorisException {
logger.trace("Finding schema.");
HttpGet httpGet = new HttpGet(getUriStr(options, logger) + SCHEMA);
String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger);
String tableSchemaUri =
String.format(
TABLE_SCHEMA_API,
randomEndpoint(options.getFenodes(), logger),
tableIdentifier[0],
tableIdentifier[1]);
HttpGet httpGet = new HttpGet(tableSchemaUri);
String response = send(options, readOptions, httpGet, logger);
logger.debug("Find schema response is '{}'.", response);
return parseSchema(response, logger);
}

public static Schema getSchema(
DorisOptions dorisOptions, String db, String table, Logger logger) {
logger.trace("start get " + db + "." + table + " schema from doris.");
Object responseData = null;
try {
String tableSchemaUri =
String.format(
TABLE_SCHEMA_API,
randomEndpoint(dorisOptions.getFenodes(), logger),
db,
table);
HttpGetWithEntity httpGet = new HttpGetWithEntity(tableSchemaUri);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader(dorisOptions));
Map<String, Object> responseMap = handleResponse(httpGet, logger);
responseData = responseMap.get("data");
String schemaStr = objectMapper.writeValueAsString(responseData);
return objectMapper.readValue(schemaStr, Schema.class);
} catch (JsonProcessingException | IllegalArgumentException e) {
throw new DorisSchemaChangeException(
"can not parse response schema " + responseData, e);
}
}

private static Map handleResponse(HttpUriRequest request, Logger logger) {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(request);
final int statusCode = response.getStatusLine().getStatusCode();
final String reasonPhrase = response.getStatusLine().getReasonPhrase();
if (statusCode == 200 && response.getEntity() != null) {
String responseEntity = EntityUtils.toString(response.getEntity());
return objectMapper.readValue(responseEntity, Map.class);
} else {
throw new DorisSchemaChangeException(
"Failed to schemaChange, status: "
+ statusCode
+ ", reason: "
+ reasonPhrase);
}
} catch (Exception e) {
logger.trace("SchemaChange request error,", e);
throw new DorisSchemaChangeException(
"SchemaChange request error with " + e.getMessage());
}
}

private static String authHeader(DorisOptions dorisOptions) {
return "Basic "
+ new String(
org.apache.commons.codec.binary.Base64.encodeBase64(
(dorisOptions.getUsername() + ":" + dorisOptions.getPassword())
.getBytes(StandardCharsets.UTF_8)));
}

public static boolean isUniqueKeyType(
DorisOptions options, DorisReadOptions readOptions, Logger logger)
throws DorisRuntimeException {
Expand Down Expand Up @@ -606,8 +641,14 @@ public static List<PartitionDefinition> findPartitions(
sql += " where " + readOptions.getFilterQuery();
}
logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);

HttpPost httpPost = new HttpPost(getUriStr(options, logger) + QUERY_PLAN);
String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger);
String queryPlanUri =
String.format(
QUERY_PLAN_API,
options.getFenodes(),
tableIdentifier[0],
tableIdentifier[1]);
HttpPost httpPost = new HttpPost(queryPlanUri);
String entity = "{\"sql\": \"" + sql + "\"}";
logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
Expand Down Expand Up @@ -683,7 +724,7 @@ static QueryPlan getQueryPlan(String response, Logger logger) throws DorisExcept
static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger)
throws DorisException {
Map<String, List<Long>> be2Tablets = new HashMap<>();
for (Map.Entry<String, Tablet> part : queryPlan.getPartitions().entrySet()) {
for (Entry<String, Tablet> part : queryPlan.getPartitions().entrySet()) {
logger.debug("Parse tablet info: '{}'.", part);
long tabletId;
try {
Expand Down Expand Up @@ -777,7 +818,7 @@ static List<PartitionDefinition> tabletsMapToPartition(
throws IllegalArgumentException {
int tabletsSize = tabletCountLimitForOnePartition(readOptions, logger);
List<PartitionDefinition> partitions = new ArrayList<>();
for (Map.Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
for (Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
logger.debug("Generate partition with beInfo: '{}'.", beInfo);
HashSet<Long> tabletSet = new HashSet<>(beInfo.getValue());
beInfo.getValue().clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,19 @@ public boolean checkSchemaChange(String database, String table, Map<String, Obje
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params)));
return handleResponse(httpGet);
String responseEntity = "";
Map<String, Object> responseMap = handleResponse(httpGet, responseEntity);
return handleSchemaChange(responseMap, responseEntity);
}

private boolean handleSchemaChange(Map<String, Object> responseMap, String responseEntity) {
String code = responseMap.getOrDefault("code", "-1").toString();
if (code.equals("0")) {
return true;
} else {
throw new DorisSchemaChangeException(
"Failed to schemaChange, response: " + responseEntity);
}
}

/** execute sql in doris. */
Expand All @@ -145,7 +157,9 @@ public boolean execute(String ddl, String database)
}
LOG.info("Execute SQL: {}", ddl);
HttpPost httpPost = buildHttpPost(ddl, database);
return handleResponse(httpPost);
String responseEntity = "";
Map<String, Object> responseMap = handleResponse(httpPost, responseEntity);
return handleSchemaChange(responseMap, responseEntity);
}

public HttpPost buildHttpPost(String ddl, String database)
Expand All @@ -164,21 +178,15 @@ public HttpPost buildHttpPost(String ddl, String database)
return httpPost;
}

private boolean handleResponse(HttpUriRequest request) {
private Map<String, Object> handleResponse(HttpUriRequest request, String responseEntity) {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(request);
final int statusCode = response.getStatusLine().getStatusCode();
final String reasonPhrase = response.getStatusLine().getReasonPhrase();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);
String code = responseMap.getOrDefault("code", "-1").toString();
if (code.equals("0")) {
return true;
} else {
throw new DorisSchemaChangeException(
"Failed to schemaChange, response: " + loadResult);
}
responseEntity = EntityUtils.toString(response.getEntity());
Map<String, Object> responseMap = objectMapper.readValue(responseEntity, Map.class);
return responseMap;
} else {
throw new DorisSchemaChangeException(
"Failed to schemaChange, status: "
Expand Down
Loading

0 comments on commit 995da44

Please sign in to comment.