From c5f6bfc5d794d77410519ec4ffa5e5b3163a0288 Mon Sep 17 00:00:00 2001 From: kalencaya <1942460489@qq.com> Date: Sat, 14 Oct 2023 21:21:14 +0800 Subject: [PATCH] [Feature][scaleph-engine-sql-gateway] add flink sql gateway session implementions (#628) * feature: add idea formatter onoff comment * feature: add flink sql gateway session * feature: add flink sql gateway session * feature: add flink sql gateway session --- .../scaleph/api/config/WebSecurityConfig.java | 46 ++-- .../ws/WsFlinkSqlGatewayController.java | 8 +- .../master/ws/WsFlinkSqlGatewaySession.java | 49 ++++ .../ws/WsFlinkSqlGatewaySessionMapper.java | 33 +++ .../ws/WsFlinkSqlGatewaySessionMapper.xml | 43 ++++ .../environment/TableEnvironmentProvider.java | 25 ++ .../TableEnvironmentProviderImpl.java | 36 +++ .../internal/ScalephSqlGatewayService.java | 162 +++++++++++++ .../internal/ScalephSqlGatewaySession.java | 72 ------ .../ScalephSqlGatewaySessionManager.java | 132 ----------- .../sql/gateway/services/SessionService.java | 75 +++--- .../services/WsFlinkSqlGatewayService.java | 4 +- .../services/dto/FlinkSqlGatewaySession.java | 43 ++++ .../services/impl/SessionServiceImpl.java | 222 ++++++++++++++++++ .../impl/WsFlinkSqlGatewayServiceImpl.java | 4 +- .../WsFlinkSqlGatewayCreateCatalogParam.java} | 4 +- .../WsFlinkSqlGatewayQueryParam.java} | 4 +- .../docker/mysql/init.d/scaleph-ws-mysql.sql | 28 ++- 18 files changed, 716 insertions(+), 274 deletions(-) create mode 100644 scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkSqlGatewaySession.java create mode 100644 scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewaySessionMapper.java create mode 100644 scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewaySessionMapper.xml create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProvider.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProviderImpl.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewayService.java delete mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySession.java delete mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySessionManager.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/FlinkSqlGatewaySession.java create mode 100644 scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/SessionServiceImpl.java rename scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/{dto/WsFlinkSqlGatewayCreateCatalogParamsDTO.java => param/WsFlinkSqlGatewayCreateCatalogParam.java} (92%) rename scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/{dto/WsFlinkSqlGatewayQueryParamsDTO.java => param/WsFlinkSqlGatewayQueryParam.java} (92%) diff --git a/scaleph-api/src/main/java/cn/sliew/scaleph/api/config/WebSecurityConfig.java b/scaleph-api/src/main/java/cn/sliew/scaleph/api/config/WebSecurityConfig.java index 1511c458a..af4b0ecc9 100644 --- a/scaleph-api/src/main/java/cn/sliew/scaleph/api/config/WebSecurityConfig.java +++ b/scaleph-api/src/main/java/cn/sliew/scaleph/api/config/WebSecurityConfig.java @@ -84,48 +84,48 @@ public SecurityFilterChain filterChain(HttpSecurity http) throws Exception { } } + // @formatter:off http //禁用cors .csrf().disable() + //.addFilterBefore(corsFilter, UsernamePasswordAuthenticationFilter.class) .sessionManagement() - .sessionCreationPolicy(SessionCreationPolicy.STATELESS) + .sessionCreationPolicy(SessionCreationPolicy.STATELESS) + .and() //禁用iframe - .and() .headers() - .frameOptions() - .disable() + .frameOptions().disable() + .and() //请求权限配置 - .and() .authorizeRequests() - //放行endpoint - .requestMatchers(EndpointRequest.toAnyEndpoint()).permitAll() - //自定义匿名访问url - .antMatchers(anonymousUrls.toArray(new String[0])).permitAll() - //静态资源 - .antMatchers(HttpMethod.GET, "/**/*.css", "/**/*.js", "/**/*.png", + //放行endpoint + .requestMatchers(EndpointRequest.toAnyEndpoint()).permitAll() + //自定义匿名访问url + .antMatchers(anonymousUrls.toArray(new String[0])).permitAll() + //静态资源 + .antMatchers(HttpMethod.GET, "/**/*.css", "/**/*.js", "/**/*.png", "/**/*.woff", "/**/*.woff2", "/**/*.svg", "/**/*.json", "/**/*.ttf", "/**/*.ico", - "/index.html" - ).permitAll() - .antMatchers("/swagger**/**", "/doc.html", "/v3/**", "/webjars/**").permitAll() - .antMatchers("/ui/**") - .permitAll() - //放行options请求 - .antMatchers(HttpMethod.OPTIONS, "/**").permitAll() - .anyRequest().authenticated() - + "/index.html").permitAll() + .antMatchers("/swagger**/**", "/doc.html", "/v3/**", "/webjars/**").permitAll() + .antMatchers("/ui/**").permitAll() + //放行options请求 + .antMatchers(HttpMethod.OPTIONS, "/**").permitAll() + .anyRequest().authenticated() .and() + .exceptionHandling() - .authenticationEntryPoint(customAuthenticationEntryPoint) - .accessDeniedHandler(customAccessDeniedHandler) + .authenticationEntryPoint(customAuthenticationEntryPoint) + .accessDeniedHandler(customAccessDeniedHandler) + .and() //禁用session - .and() .apply(tokenConfigurer) ; + // @formatter:on return http.build(); } diff --git a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkSqlGatewayController.java b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkSqlGatewayController.java index 3d9efc230..521be0eca 100644 --- a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkSqlGatewayController.java +++ b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkSqlGatewayController.java @@ -19,8 +19,8 @@ package cn.sliew.scaleph.api.controller.ws; import cn.sliew.scaleph.engine.sql.gateway.services.WsFlinkSqlGatewayService; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.WsFlinkSqlGatewayCreateCatalogParamsDTO; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.WsFlinkSqlGatewayQueryParamsDTO; +import cn.sliew.scaleph.engine.sql.gateway.services.param.WsFlinkSqlGatewayCreateCatalogParam; +import cn.sliew.scaleph.engine.sql.gateway.services.param.WsFlinkSqlGatewayQueryParam; import cn.sliew.scaleph.engine.sql.gateway.services.dto.WsFlinkSqlGatewayQueryResultDTO; import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; import io.swagger.v3.oas.annotations.Operation; @@ -73,7 +73,7 @@ public ResponseEntity> getCatalogInfo(@PathVariable("clusterId" @Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"), }) public ResponseEntity executeSql(@PathVariable("clusterId") String clusterId, - @RequestBody WsFlinkSqlGatewayQueryParamsDTO params) { + @RequestBody WsFlinkSqlGatewayQueryParam params) { return ResponseEntity.ok(wsFlinkSqlGatewayService.executeSql(clusterId, params)); } @@ -144,7 +144,7 @@ public ResponseEntity addDependencies(@PathVariable("clusterId") String @Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"), }) public ResponseEntity addCatalog(@PathVariable("clusterId") String clusterId, - @RequestBody WsFlinkSqlGatewayCreateCatalogParamsDTO params) { + @RequestBody WsFlinkSqlGatewayCreateCatalogParam params) { return ResponseEntity.ok(wsFlinkSqlGatewayService.addCatalog(clusterId, params.getCatalogName(), params.getOptions())); } diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkSqlGatewaySession.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkSqlGatewaySession.java new file mode 100644 index 000000000..62955b92b --- /dev/null +++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkSqlGatewaySession.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.dao.entity.master.ws; + +import cn.sliew.scaleph.dao.entity.BaseDO; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +/** + *

+ * flink sql gateway session + *

+ */ +@Data +@TableName("ws_flink_sql_gateway_session") +public class WsFlinkSqlGatewaySession extends BaseDO { + + private static final long serialVersionUID = 1L; + + @TableField("session_handler") + private String sessionHandler; + + @TableField("session_name") + private String sessionName; + + @TableField("session_config") + private String sessionConfig; + + @TableField("default_catalog") + private String defaultCatalog; + +} diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewaySessionMapper.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewaySessionMapper.java new file mode 100644 index 000000000..664fe422d --- /dev/null +++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewaySessionMapper.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.dao.mapper.master.ws; + +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkSqlGatewaySession; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.springframework.stereotype.Repository; + +/** + *

+ * flink sql gateway session Mapper 接口 + *

+ */ +@Repository +public interface WsFlinkSqlGatewaySessionMapper extends BaseMapper { + +} diff --git a/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewaySessionMapper.xml b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewaySessionMapper.xml new file mode 100644 index 000000000..b31b67229 --- /dev/null +++ b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkSqlGatewaySessionMapper.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + + + + + + + + id, + creator, + create_time, + editor, + update_time, + session_handler, session_name, session_config, default_catalog + + diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProvider.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProvider.java new file mode 100644 index 000000000..100ebd31b --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProvider.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.environment; + +import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; + +public interface TableEnvironmentProvider { + + TableEnvironmentInternal getTableEnvironment(FlinkSqlGatewaySession session); +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProviderImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProviderImpl.java new file mode 100644 index 000000000..72f274ec5 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/environment/TableEnvironmentProviderImpl.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.environment; + +import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.table.gateway.service.operation.OperationExecutor; + +/** + * @see OperationExecutor#getTableEnvironment() + * @see org.apache.flink.table.api.internal.TableEnvironmentInternal#create(EnvironmentSettings) + * @see org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl#create(EnvironmentSettings) + */ +public class TableEnvironmentProviderImpl implements TableEnvironmentProvider { + + @Override + public TableEnvironmentInternal getTableEnvironment(FlinkSqlGatewaySession session) { + // todo to do + return null; + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewayService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewayService.java new file mode 100644 index 000000000..5f4aee449 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewayService.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.internal; + +import cn.sliew.scaleph.engine.sql.gateway.services.SessionService; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.*; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.gateway.api.SqlGatewayService; +import org.apache.flink.table.gateway.api.endpoint.EndpointVersion; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.*; +import org.apache.flink.table.gateway.api.session.SessionEnvironment; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +@Slf4j +public class ScalephSqlGatewayService implements SqlGatewayService { + + private SessionService sessionService; + + public ScalephSqlGatewayService(SessionService sessionService) { + this.sessionService = sessionService; + } + + @Override + public SessionHandle openSession(SessionEnvironment sessionEnvironment) throws SqlGatewayException { + return sessionService.openSession(sessionEnvironment); + } + + @Override + public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException { + sessionService.closeSession(sessionHandle); + } + + @Override + public void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException { + sessionService.configureSession(sessionHandle, statement, executionTimeoutMs); + } + + @Override + public Map getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException { + return sessionService.getSessionConfig(sessionHandle); + } + + @Override + public EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException { + return sessionService.getSessionEndpointVersion(sessionHandle); + } + + @Override + public OperationHandle submitOperation(SessionHandle sessionHandle, Callable callable) throws SqlGatewayException { + return null; + } + + @Override + public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { + + } + + @Override + public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { + + } + + @Override + public OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { + return null; + } + + @Override + public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { + return null; + } + + @Override + public OperationHandle executeStatement(SessionHandle sessionHandle, String s, long l, Configuration configuration) throws SqlGatewayException { + return null; + } + + @Override + public ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long l, int i) throws SqlGatewayException { + return null; + } + + @Override + public ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation fetchOrientation, int i) throws SqlGatewayException { + return null; + } + + @Override + public String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException { + return null; + } + + @Override + public Set listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException { + return null; + } + + @Override + public Set listDatabases(SessionHandle sessionHandle, String s) throws SqlGatewayException { + return null; + } + + @Override + public Set listTables(SessionHandle sessionHandle, String s, String s1, Set set) throws SqlGatewayException { + return null; + } + + @Override + public ResolvedCatalogBaseTable getTable(SessionHandle sessionHandle, ObjectIdentifier objectIdentifier) throws SqlGatewayException { + return null; + } + + @Override + public Set listUserDefinedFunctions(SessionHandle sessionHandle, String s, String s1) throws SqlGatewayException { + return null; + } + + @Override + public Set listSystemFunctions(SessionHandle sessionHandle) throws SqlGatewayException { + return null; + } + + @Override + public FunctionDefinition getFunctionDefinition(SessionHandle sessionHandle, UnresolvedIdentifier unresolvedIdentifier) throws SqlGatewayException { + return null; + } + + @Override + public GatewayInfo getGatewayInfo() { + return GatewayInfo.INSTANCE; + } + + @Override + public List completeStatement(SessionHandle sessionHandle, String s, int i) throws SqlGatewayException { + return null; + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySession.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySession.java deleted file mode 100644 index 28c90a0fe..000000000 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySession.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.sliew.scaleph.engine.sql.gateway.internal; - -import org.apache.flink.table.gateway.service.context.SessionContext; - -import java.time.Duration; -import java.util.UUID; - -public class ScalephSqlGatewaySession { - private long lastAccessTime; - private SessionContext sessionContext; - - public static ScalephSqlGatewaySession create(SessionContext sessionContext) { - return new ScalephSqlGatewaySession(sessionContext); - } - - ScalephSqlGatewaySession(SessionContext sessionContext) { - this.lastAccessTime = System.currentTimeMillis(); - this.sessionContext = sessionContext; - } - - public void touch() { - this.lastAccessTime = System.currentTimeMillis(); - } - - public long getLastAccessTime() { - return lastAccessTime; - } - - public ScalephSqlGatewaySession setLastAccessTime(long lastAccessTime) { - this.lastAccessTime = lastAccessTime; - return this; - } - - public SessionContext getSessionContext() { - return sessionContext; - } - - public ScalephSqlGatewaySession setSessionContext(SessionContext sessionContext) { - this.sessionContext = sessionContext; - return this; - } - - public boolean isExpired(Duration duration) { - return System.currentTimeMillis() - lastAccessTime > duration.toMillis(); - } - - public void close() { - this.sessionContext.close(); - } - - public UUID getIdentifier() { - return sessionContext.getSessionId().getIdentifier(); - } -} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySessionManager.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySessionManager.java deleted file mode 100644 index 16c2a5419..000000000 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySessionManager.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.sliew.scaleph.engine.sql.gateway.internal; - -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.gateway.api.results.GatewayInfo; -import org.apache.flink.table.gateway.api.session.SessionEnvironment; -import org.apache.flink.table.gateway.api.session.SessionHandle; -import org.apache.flink.table.gateway.api.utils.ThreadUtils; -import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; -import org.apache.flink.table.gateway.service.context.DefaultContext; -import org.apache.flink.table.gateway.service.context.SessionContext; - -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL; -import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT; -import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME; -import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX; -import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN; - -public class ScalephSqlGatewaySessionManager { - private final Map sessions = new ConcurrentHashMap<>(); - private final DefaultContext defaultContext; - private final ReadableConfig readableConfig; - private ScheduledExecutorService cleanupService; - private ExecutorService operationExecutorService; - - public ScalephSqlGatewaySessionManager(DefaultContext defaultContext) { - this.defaultContext = defaultContext; - this.readableConfig = defaultContext.getFlinkConfig(); - } - - public void start() { - this.cleanupService = Executors.newSingleThreadScheduledExecutor(); - cleanupService.scheduleAtFixedRate(() -> { - for (Map.Entry entry : sessions.entrySet()) { - SessionHandle sessionHandle = entry.getKey(); - ScalephSqlGatewaySession session = entry.getValue(); - if (session.isExpired(readableConfig.get(SQL_GATEWAY_SESSION_IDLE_TIMEOUT))) { - closeSession(sessionHandle); - } - } - }, 0, - readableConfig.get(SQL_GATEWAY_SESSION_CHECK_INTERVAL).toMillis() - , TimeUnit.MILLISECONDS); - this.operationExecutorService = - ThreadUtils.newThreadPool( - readableConfig.get(SQL_GATEWAY_WORKER_THREADS_MIN), - readableConfig.get(SQL_GATEWAY_WORKER_THREADS_MAX), - readableConfig.get(SQL_GATEWAY_WORKER_KEEPALIVE_TIME).toMillis(), - "scaleph-sql-gateway-pool"); - } - - public ScalephSqlGatewaySession openSession() { - SessionHandle sessionHandle; - do { - sessionHandle = SessionHandle.create(); - } while (sessions.containsKey(sessionHandle)); - SessionContext sessionContext = SessionContext.create(defaultContext, - sessionHandle, - SessionEnvironment.newBuilder() - .setSessionName(UUID.randomUUID().toString()) - .setSessionEndpointVersion(SqlGatewayRestAPIVersion.V2) - .build(), - operationExecutorService - ); - ScalephSqlGatewaySession session = ScalephSqlGatewaySession.create(sessionContext); - sessions.put(sessionHandle, session); - return session; - } - - public ScalephSqlGatewaySession getSession(SessionHandle sessionHandle) { - if (sessions.containsKey(sessionHandle)) { - ScalephSqlGatewaySession session = sessions.get(sessionHandle); - session.touch(); - return session; - } else { - throw new IllegalArgumentException("Session not exists!"); - } - } - - public boolean sessionExists(SessionHandle sessionHandle) { - return sessions.containsKey(sessionHandle); - } - - public void closeSession(SessionHandle sessionHandle) { - if (sessionExists(sessionHandle)) { - ScalephSqlGatewaySession session = getSession(sessionHandle); - session.close(); - sessions.remove(sessionHandle); - } - } - - public void stop() { - for (Map.Entry entry : sessions.entrySet()) { - closeSession(entry.getKey()); - } - if (this.cleanupService != null) { - this.cleanupService.shutdown(); - } - if (this.operationExecutorService != null) { - this.operationExecutorService.shutdown(); - } - } - - public GatewayInfo getGatewayInfo() { - return GatewayInfo.INSTANCE; - } -} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/SessionService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/SessionService.java index 9ffa7a75e..23d0f13dd 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/SessionService.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/SessionService.java @@ -18,43 +18,60 @@ package cn.sliew.scaleph.engine.sql.gateway.services; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; +import org.apache.flink.table.gateway.api.endpoint.EndpointVersion; +import org.apache.flink.table.gateway.api.session.SessionEnvironment; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import org.apache.flink.table.gateway.service.session.Session; import java.util.Map; -/** - * 多级的 session 管理。 - * 默认配置。scaleph 系统级别 - * 全局配置。用户全局配置级别 - * 用户级别。和用户 id 关联 || 项目级别。和项目 id 关联 - * - * SessionContext 中信息分为配置信息和状态信息。配置来自 SessoinEnvironment 和 DefaultContext 类的配置信息 - * SessionState 和 OperationManager 信息属于状态信息。SessionState 信息可以通过配置信息重建,OperationManager 信息由 - * OperationService 类管理 - */ public interface SessionService { - // ------------------------------------------------------------------------------------------- - // Global Session - // ------------------------------------------------------------------------------------------- - - Map getGlobalSessionConfig() throws Exception; - - void configureGlobalSession(Map config) throws Exception; - - void configureGlobalSession(String statement) throws Exception; - - // ------------------------------------------------------------------------------------------- - // User Session || Project Session - // ------------------------------------------------------------------------------------------- - - Session openSession() throws Exception; + /** + * Open the {@code Session}. + * + * @param environment Environment to initialize the Session. + * @return Returns a handle that used to identify the Session. + */ + SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException; - void closeSession() throws Exception; + /** + * Close the {@code Session}. + * + * @param sessionHandle handle to identify the Session needs to be closed. + */ + void closeSession(SessionHandle sessionHandle) throws SqlGatewayException; - Map getSessionConfig() throws Exception; + /** + * Using the statement to initialize the Session. It's only allowed to execute + * SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR. + * + *

It returns until the execution finishes. + * + * @param sessionHandle handle to identify the session. + * @param statement the statement used to configure the session. + * @param executionTimeoutMs the execution timeout. Please use non-positive value to forbid the + * timeout mechanism. + */ + void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) + throws SqlGatewayException; - void configureSession(Map config) throws Exception; + /** + * Get the current configuration of the {@code Session}. + * + * @param sessionHandle handle to identify the session. + * @return Returns configuration of the session. + */ + Map getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException; - void configureSession(String statement) throws Exception; + /** + * Get endpoint version that is negotiated in the openSession. + * + * @param sessionHandle handle to identify the session. + * @return Returns the version. + */ + EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) + throws SqlGatewayException; } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/WsFlinkSqlGatewayService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/WsFlinkSqlGatewayService.java index 1160f6a4a..b27ac0a55 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/WsFlinkSqlGatewayService.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/WsFlinkSqlGatewayService.java @@ -18,7 +18,7 @@ package cn.sliew.scaleph.engine.sql.gateway.services; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.WsFlinkSqlGatewayQueryParamsDTO; +import cn.sliew.scaleph.engine.sql.gateway.services.param.WsFlinkSqlGatewayQueryParam; import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; import cn.sliew.scaleph.engine.sql.gateway.internal.ScalephCatalogManager; import org.apache.flink.table.gateway.api.results.GatewayInfo; @@ -83,7 +83,7 @@ public interface WsFlinkSqlGatewayService { * @param params Sql query params * @return Operation handle id {@link org.apache.flink.table.gateway.api.operation.OperationHandle} */ - String executeSql(String clusterId, WsFlinkSqlGatewayQueryParamsDTO params); + String executeSql(String clusterId, WsFlinkSqlGatewayQueryParam params); /** * Fetch sql result diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/FlinkSqlGatewaySession.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/FlinkSqlGatewaySession.java new file mode 100644 index 000000000..846932418 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/FlinkSqlGatewaySession.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.services.dto; + +import cn.sliew.scaleph.system.model.BaseDTO; +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.Data; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.service.context.SessionContext; + +import java.util.Map; + +/** + * org.apache.flink.table.gateway.service.session.Session + */ +@Data +public class FlinkSqlGatewaySession extends BaseDTO implements AutoCloseable { + + private SessionHandle sessionHandle; + private Map sessionConfig; + + @JsonIgnore + private SessionContext sessionContext; + + @Override + public void close() { + sessionContext.close(); + } +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/SessionServiceImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/SessionServiceImpl.java new file mode 100644 index 000000000..3d3365b73 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/SessionServiceImpl.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.services.impl; + +import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkSqlGatewaySession; +import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkSqlGatewaySessionMapper; +import cn.sliew.scaleph.engine.sql.gateway.services.SessionService; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.fasterxml.jackson.core.type.TypeReference; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.RemovalCause; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.gateway.api.endpoint.EndpointVersion; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.session.SessionEnvironment; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; +import org.apache.flink.table.gateway.service.context.DefaultContext; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.operation.OperationManager; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * org.apache.flink.table.gateway.service.session.SessionManager + */ +@Slf4j +@Service +public class SessionServiceImpl implements SessionService, InitializingBean, DisposableBean { + + private ExecutorService operationExecutorService; + private DefaultContext defaultContext; + + private LoadingCache sessions = Caffeine.newBuilder() + .maximumSize(100) + .expireAfterAccess(Duration.ofDays(3L)) + .evictionListener((SessionHandle sessionHandle, FlinkSqlGatewaySession session, RemovalCause removalCause) -> doCloseSession(sessionHandle, session, removalCause)) + .build(sessionHandle -> doGetSession(sessionHandle)); + + @Autowired + private WsFlinkSqlGatewaySessionMapper wsFlinkSqlGatewaySessionMapper; + + /** + * session 的配置 2 级:sql gateway 实例级和 session 级别 + * DefaultContext 相当于实例级别,而 sessionconfig 相当于 session 级别 + */ + @Override + public void afterPropertiesSet() throws Exception { + this.operationExecutorService = Executors.newFixedThreadPool(4); + this.defaultContext = DefaultContext.load(new Configuration(), Collections.emptyList(), false, false); + // 加载所有的 session + List wsFlinkSqlGatewaySessions = wsFlinkSqlGatewaySessionMapper.selectList(Wrappers.emptyWrapper()); + wsFlinkSqlGatewaySessions.stream().map(this::convertSession).forEach(session -> sessions.put(session.getSessionHandle(), session)); + } + + @Override + public void destroy() throws Exception { + sessions.cleanUp(); + } + + public FlinkSqlGatewaySession getSession(SessionHandle sessionHandle) throws SqlGatewayException { + return sessions.get(sessionHandle); + } + + public FlinkSqlGatewaySession doGetSession(SessionHandle sessionHandle) throws SqlGatewayException { + LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(WsFlinkSqlGatewaySession.class) + .eq(WsFlinkSqlGatewaySession::getSessionHandler, sessionHandle.toString()); + WsFlinkSqlGatewaySession record = wsFlinkSqlGatewaySessionMapper.selectOne(queryWrapper); + if (record == null) { + String msg = String.format("Session '%s' does not exist.", sessionHandle); + log.warn(msg); + throw new SqlGatewayException(msg); + } + return convertSession(record); + } + + @Override + public SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException { + return doOpenSession(environment).getSessionHandle(); + } + + public FlinkSqlGatewaySession doOpenSession(SessionEnvironment environment) throws SqlGatewayException { + SessionHandle sessionId = null; + boolean exist = true; + while (exist) { + sessionId = SessionHandle.create(); + try { + sessions.get(sessionId); + } catch (SqlGatewayException ignored) { + exist = false; + } + } + + WsFlinkSqlGatewaySession record = new WsFlinkSqlGatewaySession(); + record.setSessionHandler(sessionId.toString()); + environment.getSessionName().ifPresent(sessionName -> record.setSessionName(sessionName)); + environment.getDefaultCatalog().ifPresent(defaultCatalog -> record.setDefaultCatalog(defaultCatalog)); + if (CollectionUtils.isEmpty(environment.getSessionConfig()) == false) { + record.setSessionConfig(JacksonUtil.toJsonString(environment.getSessionConfig())); + } + wsFlinkSqlGatewaySessionMapper.insert(record); + log.info("Session {} is opened.", sessionId); + + return getSession(sessionId); + } + + @Override + public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException { + sessions.invalidate(sessionHandle); + } + + public void doCloseSession(SessionHandle sessionHandle, FlinkSqlGatewaySession session, RemovalCause removalCause) throws SqlGatewayException { + switch (removalCause) { + case EXPLICIT: + case SIZE: + case REPLACED: + case COLLECTED: + return; + case EXPIRED: + session.close(); + LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(WsFlinkSqlGatewaySession.class) + .eq(WsFlinkSqlGatewaySession::getSessionHandler, sessionHandle.toString()); + wsFlinkSqlGatewaySessionMapper.delete(queryWrapper); + log.info("Session: {} is closed.", sessionHandle); + break; + default: + } + } + + /** + * forked from SqlGatewayServiceImpl + */ + @Override + public void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException { + try { + if (executionTimeoutMs > 0) { + // TODO: support the feature in FLINK-27838 + throw new UnsupportedOperationException( + "SqlGatewayService doesn't support timeout mechanism now."); + } + + OperationManager operationManager = getSession(sessionHandle).getSessionContext().getOperationManager(); + OperationHandle operationHandle = + operationManager.submitOperation( + handle -> + getSession(sessionHandle).getSessionContext().createOperationExecutor(getSession(sessionHandle).getSessionContext().getSessionConf()) + .configureSession(handle, statement)); + operationManager.awaitOperationTermination(operationHandle); + operationManager.closeOperation(operationHandle); + } catch (Throwable t) { + log.error("Failed to configure session.", t); + throw new SqlGatewayException("Failed to configure session.", t); + } + } + + @Override + public Map getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException { + return getSession(sessionHandle).getSessionContext().getSessionConf().toMap(); + } + + @Override + public EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException { + return SqlGatewayRestAPIVersion.V2; + } + + private FlinkSqlGatewaySession convertSession(WsFlinkSqlGatewaySession record) { + FlinkSqlGatewaySession session = new FlinkSqlGatewaySession(); + SessionHandle sessionId = new SessionHandle(UUID.fromString(record.getSessionHandler())); + session.setSessionHandle(sessionId); + + Map sessionConfig = Collections.emptyMap(); + if (StringUtils.hasText(record.getSessionConfig())) { + sessionConfig = JacksonUtil.parseJsonString(record.getSessionConfig(), new TypeReference>() { + }); + } + session.setSessionConfig(sessionConfig); + + SessionEnvironment environment = SessionEnvironment.newBuilder() + .setSessionName(record.getSessionName()) + .setDefaultCatalog(record.getDefaultCatalog()) + .addSessionConfig(sessionConfig) + .build(); + SessionContext sessionContext = SessionContext.create(defaultContext, sessionId, environment, operationExecutorService); + session.setSessionContext(sessionContext); + return session; + } + +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/WsFlinkSqlGatewayServiceImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/WsFlinkSqlGatewayServiceImpl.java index 99c42e293..6b8ff9c1c 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/WsFlinkSqlGatewayServiceImpl.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/WsFlinkSqlGatewayServiceImpl.java @@ -19,7 +19,7 @@ package cn.sliew.scaleph.engine.sql.gateway.services.impl; import cn.sliew.scaleph.common.util.SystemUtil; -import cn.sliew.scaleph.engine.sql.gateway.services.dto.WsFlinkSqlGatewayQueryParamsDTO; +import cn.sliew.scaleph.engine.sql.gateway.services.param.WsFlinkSqlGatewayQueryParam; import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo; import cn.sliew.scaleph.engine.sql.gateway.exception.ScalephSqlGatewayNotFoundException; import cn.sliew.scaleph.engine.sql.gateway.internal.ScalephCatalogManager; @@ -162,7 +162,7 @@ public Set getCatalogInfo(String clusterId, boolean includeSystemFu * @return */ @Override - public String executeSql(String clusterId, WsFlinkSqlGatewayQueryParamsDTO params) { + public String executeSql(String clusterId, WsFlinkSqlGatewayQueryParam params) { return getCatalogManager(clusterId) .orElseThrow(ScalephSqlGatewayNotFoundException::new) .executeStatement(params.getSql(), params.getConfiguration()); diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayCreateCatalogParamsDTO.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayCreateCatalogParam.java similarity index 92% rename from scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayCreateCatalogParamsDTO.java rename to scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayCreateCatalogParam.java index 93f98de96..a25145c10 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayCreateCatalogParamsDTO.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayCreateCatalogParam.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package cn.sliew.scaleph.engine.sql.gateway.services.dto; +package cn.sliew.scaleph.engine.sql.gateway.services.param; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; @@ -29,7 +29,7 @@ @Data @EqualsAndHashCode @Schema(name = "SqlGateway创建Catalog的参数", description = "SqlGateway创建Catalog的参数") -public class WsFlinkSqlGatewayCreateCatalogParamsDTO { +public class WsFlinkSqlGatewayCreateCatalogParam { @NotBlank @Schema(description = "catalog 名称,不可为空") diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryParamsDTO.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayQueryParam.java similarity index 92% rename from scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryParamsDTO.java rename to scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayQueryParam.java index d21d99209..7551c27d5 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryParamsDTO.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/param/WsFlinkSqlGatewayQueryParam.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package cn.sliew.scaleph.engine.sql.gateway.services.dto; +package cn.sliew.scaleph.engine.sql.gateway.services.param; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; @@ -28,7 +28,7 @@ @Data @EqualsAndHashCode @Schema(name = "SqlGateway执行Sql的参数", description = "SqlGateway执行Sql的参数") -public class WsFlinkSqlGatewayQueryParamsDTO { +public class WsFlinkSqlGatewayQueryParam { @NotNull @Schema(description = "sql") diff --git a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql index d1d8a6c0b..e8b7b472f 100644 --- a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql +++ b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql @@ -70,7 +70,7 @@ create table ws_flink_artifact_jar editor varchar(32) comment '修改人', update_time timestamp default current_timestamp on update current_timestamp comment '修改时间', primary key (id), - key idx_flink_artifact (flink_artifact_id) + key idx_flink_artifact (flink_artifact_id) ) engine = innodb comment = 'flink artifact jar'; DROP TABLE IF EXISTS ws_flink_artifact_sql; @@ -86,7 +86,7 @@ CREATE TABLE ws_flink_artifact_sql editor varchar(32), update_time datetime not null default current_timestamp on update current_timestamp, PRIMARY KEY (id), - key idx_flink_artifact (flink_artifact_id) + key idx_flink_artifact (flink_artifact_id) ) ENGINE = INNODB COMMENT = 'flink artifact sql'; INSERT INTO `ws_flink_artifact_sql` (`id`, `flink_artifact_id`, `flink_version`, `script`, `current`, `creator`, @@ -139,7 +139,7 @@ create table ws_di_job editor varchar(32) comment '修改人', update_time timestamp default current_timestamp on update current_timestamp comment '修改时间', primary key (id), - key idx_flink_artifact (flink_artifact_id) + key idx_flink_artifact (flink_artifact_id) ) engine = innodb comment '数据集成-作业信息'; INSERT INTO ws_di_job (id, flink_artifact_id, job_engine, job_id, current, creator, editor) VALUES (1, 4, 'seatunnel', 'b8e16c94-258c-4487-a88c-8aad40a38b35', 1, 'sys', 'sys'); @@ -471,7 +471,7 @@ create table ws_flink_custom_artifact editor varchar(32) comment '修改人', update_time timestamp default current_timestamp on update current_timestamp comment '修改时间', primary key (id), - key idx_custom (flink_version, type, file_name) + key idx_custom (flink_version, type, file_name) ) engine = innodb comment = 'flink custom artifact'; drop table if exists ws_flink_custom_factory; @@ -487,5 +487,21 @@ create table ws_flink_custom_factory editor varchar(32) comment '修改人', update_time timestamp default current_timestamp on update current_timestamp comment '修改时间', primary key (id), - key idx_custom (flink_custom_artifact_id, factory_class, `name`) -) engine = innodb comment = 'flink custom factory'; \ No newline at end of file + key idx_custom (flink_custom_artifact_id, factory_class, `name`) +) engine = innodb comment = 'flink custom factory'; + +drop table if exists ws_flink_sql_gateway_session; +create table ws_flink_sql_gateway_session +( + id bigint not null auto_increment comment '自增主键', + session_handler varchar(64) not null comment 'session handler', + session_name varchar(255) not null comment 'session name', + session_config text comment 'session config', + default_catalog varchar(255) comment 'default catalog', + creator varchar(32) comment '创建人', + create_time timestamp default current_timestamp comment '创建时间', + editor varchar(32) comment '修改人', + update_time timestamp default current_timestamp on update current_timestamp comment '修改时间', + primary key (id), + unique key uniq_session (session_handler) +) engine = innodb comment = 'flink sql gateway session'; \ No newline at end of file