Skip to content

Commit

Permalink
feat: 北极星旁路功能插件设计——支持 Local Plugin 以及 Remote Plugin 模式 polarismesh#757
Browse files Browse the repository at this point in the history
  • Loading branch information
edocevol committed May 27, 2023
1 parent 019e335 commit bd4dbea
Show file tree
Hide file tree
Showing 7 changed files with 526 additions and 17 deletions.
7 changes: 7 additions & 0 deletions bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/polarismesh/polaris/common/log"
"github.com/polarismesh/polaris/common/metrics"
"github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/common/pluggable"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/common/version"
config_center "github.com/polarismesh/polaris/config"
Expand Down Expand Up @@ -98,6 +99,12 @@ func Start(configFilePath string) {
metrics.InitMetrics()
eventhub.InitEventHub()

// 加载可插拔插件
if err = pluggable.Discovery(ctx); err != nil {
fmt.Printf("[ERROR] discover pluggable plugin fail: %+v", err)
return
}

// 设置插件配置
plugin.SetPluginConfig(&cfg.Plugin)

Expand Down
116 changes: 116 additions & 0 deletions common/pluggable/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package pluggable

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/polaris-contrib/polaris-server-remote-plugin-common/api"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// SetupTimeout is the timeout for setting up a connection.
const SetupTimeout = 5 * time.Second

// GRPCConnectionDialer defines the function to dial a grpc connection.
type GRPCConnectionDialer func(ctx context.Context, name string) (*grpc.ClientConn, error)

// SocketDialContext dials a gRPC connection using a socket.
func SocketDialContext(ctx context.Context, socket string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
unixSock := "unix://" + socket

// disable TLS as default when using socket
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))

dialCtx, cancel := context.WithTimeout(ctx, SetupTimeout)
defer cancel()

grpcConn, err := grpc.DialContext(dialCtx, unixSock, opts...)
if err != nil {
return nil, err
}

return grpcConn, nil
}

// GRPCPluginClient defines the interface for a gRPC plugin client,
// polaris server will call the plugin client's Ping method to check if the plugin is alive.
type GRPCPluginClient interface {
// Ping checks if the plugin is alive.
Ping(ctx context.Context, in *api.PingRequest, opts ...grpc.CallOption) (*api.PongResponse, error)
}

// GRPCConnector defines the connector for a gRPC plugin.
type GRPCConnector struct {
// pluginClient is the client that is used to communicate with the plugin, exposed for plugin logic layer.
pluginClient GRPCPluginClient
// dialer use to dial a grpc connection.
dialer GRPCConnectionDialer
// conn is the grpc client connection.
conn *grpc.ClientConn
// clientFactory is the factory to create a grpc client.
clientFactory func(grpc.ClientConnInterface) GRPCPluginClient
}

// NewGRPCConnectorWithDialer creates a new grpc connector for the given client factory and dialer.
func NewGRPCConnectorWithDialer(
dialer GRPCConnectionDialer, factory func(grpc.ClientConnInterface) GRPCPluginClient) *GRPCConnector {
return &GRPCConnector{
dialer: dialer,
clientFactory: factory,
}
}

// Dial init a grpc connection to the plugin server and create a grpc client.
func (g *GRPCConnector) Dial(ctx context.Context, name string) error {
conn, err := g.dialer(ctx, name)
if err != nil {
return errors.Wrapf(err, "unable to open GRPC connection using the dialer")
}

g.conn = conn
g.pluginClient = g.clientFactory(conn)
return nil
}

// PluginClient returns the grpc client.
func (g *GRPCConnector) PluginClient() GRPCPluginClient {
return g.pluginClient
}

// socketDialer returns a GRPCConnectionDialer that dials a grpc connection using a socket.
func socketDialer(socket string, opts ...grpc.DialOption) GRPCConnectionDialer {
return func(ctx context.Context, name string) (*grpc.ClientConn, error) {
return SocketDialContext(ctx, socket, opts...)
}
}

// Ping checks if the plugin is alive.
func (g *GRPCConnector) Ping(ctx context.Context) error {
_, err := g.pluginClient.Ping(ctx, &api.PingRequest{}, grpc.WaitForReady(true))
return err
}

// Close closes the underlying gRPC connection.
func (g *GRPCConnector) Close() error {
return g.conn.Close()
}
180 changes: 180 additions & 0 deletions common/pluggable/pluggable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package pluggable

import (
"context"
"os"
"path/filepath"
"strings"

"github.com/jhump/protoreflect/grpcreflect"
"github.com/pkg/errors"
"google.golang.org/grpc"
_ "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
reflectV1Alpha "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"

"github.com/polarismesh/polaris/common/log"
)

const (
// envVarPolarisPluggableFolder
envVarPolarisPluggableFolder string = "POLARIS_PLUGGABLE_SOCKETS_FOLDER"
// defaultPolarisPluggablePath
defaultPolarisPluggablePath = "/tmp/polaris-pluggable-sockets"
)

// onFinishedCallback is a callback to be called when a plugin is finished.
type onFinishedCallback func(name string, dialer GRPCConnectionDialer)

// onFinished
var onFinished = make(map[string]onFinishedCallback)

// AddOnFinished adds a callback to be called when a plugin is finished.
func AddOnFinished(serviceDesc string, cb onFinishedCallback) {
_, ok := onFinished[serviceDesc]
if ok {
log.Fatalf("onFinished callback for %s already exists", serviceDesc)
}
onFinished[serviceDesc] = cb
}

// Discovery discovers all the plugins.
func Discovery(ctx context.Context) error {
services, err := discovery(ctx)
if err != nil {
return err
}
finished(services)
return nil
}

// finished calls the onFinished callback for the given services.
func finished(services []*pluginService) {
for _, svc := range services {
cb, ok := onFinished[svc.protoRef]
if !ok {
continue
}

cb(svc.name, svc.dialer)
log.Infof("discovered pluggable component service: %s", svc.protoRef)
}
}

// pluginService is a plugin service.
type pluginService struct {
name string
protoRef string
dialer GRPCConnectionDialer
}

// discovery discovers all the plugins.
func discovery(ctx context.Context) ([]*pluginService, error) {
sockFolder := socketFolder()
files, err := pluginFiles(sockFolder)
if err != nil {
return nil, err
}

var services []*pluginService
for _, dirEntry := range files {
if dirEntry.IsDir() {
continue
}

var discoveredServices []*pluginService
discoveredServices, err = trySingleSocket(ctx, dirEntry, sockFolder)

// skip non-socket files.
if err == errNotSocket {
continue
}

// return error if any other error occurs.
if err != nil {
return nil, err
}

services = append(services, discoveredServices...)
}
return services, nil
}

// trySingleSocket tries to discover plugins in a single socket.
func trySingleSocket(ctx context.Context, entry os.DirEntry, socketsFolder string) ([]*pluginService, error) {
socket, err := socketName(entry)
if err != nil {
return nil, err
}

socketFullPath := filepath.Join(socketsFolder, socket)
reflectClient, cleanup, err := dialServerReflection(ctx, socketFullPath)
if err != nil {
return nil, err
}
defer cleanup()

services, err := reflectClient.ListServices()
if err != nil {
return nil, errors.Wrapf(err, "unable to list plugin: %s's services", socket)
}

socketNameWithoutExt := strings.Trim(socket, filepath.Ext(socket))
dialer := socketDialer(socketFullPath, grpc.WithBlock(), grpc.FailOnNonTempDialError(true))

var pluginServices []*pluginService
for _, svc := range services {
pluginServices = append(pluginServices, &pluginService{
protoRef: svc,
dialer: dialer,
name: socketNameWithoutExt,
})
}

return pluginServices, nil
}

// dialServerReflection dials the server reflection service, returning the client and a cleanup function.
func dialServerReflection(ctx context.Context, socket string) (*grpcreflect.Client, func(), error) {
conn, err := SocketDialContext(ctx, socket, grpc.WithBlock())
if err != nil {
return nil, nil, err
}

reflectClient := grpcreflect.NewClientV1Alpha(ctx, reflectV1Alpha.NewServerReflectionClient(conn))
return reflectClient, reflectionConnectionCleanup(conn, reflectClient), nil
}

// reflectionConnectionCleanup closes the reflection connection.
func reflectionConnectionCleanup(conn *grpc.ClientConn, client *grpcreflect.Client) func() {
return func() {
client.Reset()
if err := conn.Close(); err != nil {
log.Errorf("error closing grpc reflection connection: %v", err)
}
}
}

// socketFolder returns the socket folder path specified by the environment variable.
func socketFolder() string {
if value, ok := os.LookupEnv(envVarPolarisPluggableFolder); ok {
return value
}
return defaultPolarisPluggablePath
}
72 changes: 72 additions & 0 deletions common/pluggable/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package pluggable

import (
"os"

"github.com/pkg/errors"

"github.com/polarismesh/polaris/common/log"
)

var (
// errNotSocket is returned when the file is not a socket.
errNotSocket = errors.New("not a socket")
)

// pluginFiles returns the plugin files in the socket folder.
func pluginFiles(sockFolder string) ([]os.DirEntry, error) {
_, err := os.Stat(sockFolder)
if os.IsNotExist(err) {
log.Infof("socket folder %s does not exist, skip plugin discovery", sockFolder)
return nil, nil
}

if err != nil {
log.Errorf("failed to stat socket folder %s: %v", sockFolder, err)
return nil, err
}

var files []os.DirEntry
files, err = os.ReadDir(sockFolder)
if err != nil {
return nil, errors.Wrapf(err, "failed to read socket folder %s", sockFolder)
}

return files, nil
}

// socketName returns true if the file is a socket.
func socketName(entry os.DirEntry) (string, error) {
if entry.IsDir() {
return "", errNotSocket
}

f, err := entry.Info()
if err != nil {
return "", err
}

// skip non-socket files.
if f.Mode()&os.ModeSocket == 0 {
return "", errNotSocket
}

return entry.Name(), nil
}
Loading

0 comments on commit bd4dbea

Please sign in to comment.