From c0b7f804c4379810d407a8904f589d700cb881dd Mon Sep 17 00:00:00 2001
From: aoiasd
Date: Sat, 8 Feb 2025 17:55:07 +0800
Subject: [PATCH] Support run analyzer

Signed-off-by: aoiasd
---
 go.mod                                |  2 +-
 go.sum                                |  2 +
 internal/distributed/proxy/service.go |  5 +++
 internal/mocks/mock_proxy.go          | 59 +++++++++++++++++++++++++++
 internal/proxy/impl.go                | 43 +++++++++++++++++++
 5 files changed, 110 insertions(+), 1 deletion(-)

diff --git a/go.mod b/go.mod
index be8747e564870..d439c52811dd7 100644
--- a/go.mod
+++ b/go.mod
@@ -23,7 +23,7 @@ require (
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
 	github.com/klauspost/compress v1.17.9
 	github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
-	github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f
+	github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250208062437-5af22aa4b559
 	github.com/minio/minio-go/v7 v7.0.73
 	github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
 	github.com/prometheus/client_golang v1.14.0
diff --git a/go.sum b/go.sum
index 4282f7fa5d329..152fd56ee024b 100644
--- a/go.sum
+++ b/go.sum
@@ -662,6 +662,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz
 github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
 github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f h1:So6RKU5wqP/8EaKogicJP8gZ2SrzzS/JprusBaE3RKc=
 github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
+github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250208062437-5af22aa4b559 h1:c8n10eBkYU/HYaDUNAaKog4aIA3ZHO+GL7bHN2Ug/MA=
+github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250208062437-5af22aa4b559/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
 github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
 github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
 github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go
index c2f2bea06c56a..b1d5c5b97b47a 100644
--- a/internal/distributed/proxy/service.go
+++ b/internal/distributed/proxy/service.go
@@ -1157,3 +1157,8 @@ func (s *Server) InvalidateShardLeaderCache(ctx context.Context, req *proxypb.In
 func (s *Server) DescribeDatabase(ctx context.Context, req *milvuspb.DescribeDatabaseRequest) (*milvuspb.DescribeDatabaseResponse, error) {
 	return s.proxy.DescribeDatabase(ctx, req)
 }
+
+// RunAnalyzer forwards the request to the wrapped proxy and returns its result unchanged.
+func (s *Server) RunAnalyzer(ctx context.Context, req *milvuspb.RunAnalyzerRequset) (*milvuspb.RunAnalyzerResponse, error) {
+	return s.proxy.RunAnalyzer(ctx, req)
+}
diff --git a/internal/mocks/mock_proxy.go b/internal/mocks/mock_proxy.go
index 6025dd3cc92bb..b58e6dcdb52e8 100644
--- a/internal/mocks/mock_proxy.go
+++ b/internal/mocks/mock_proxy.go
@@ -5772,6 +5772,65 @@ func (_c *MockProxy_RestoreRBAC_Call) RunAndReturn(run func(context.Context, *mi
 	return _c
 }
 
+// RunAnalyzer provides a mock function with given fields: _a0, _a1
+func (_m *MockProxy) RunAnalyzer(_a0 context.Context, _a1 *milvuspb.RunAnalyzerRequset) (*milvuspb.RunAnalyzerResponse, error) {
+	ret := _m.Called(_a0, _a1)
+
+	if len(ret) == 0 {
+		panic("no return value specified for RunAnalyzer")
+	}
+
+	var r0 *milvuspb.RunAnalyzerResponse
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.RunAnalyzerRequset) (*milvuspb.RunAnalyzerResponse, error)); ok {
+		return rf(_a0, _a1)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.RunAnalyzerRequset) *milvuspb.RunAnalyzerResponse); ok {
+		r0 = rf(_a0, _a1)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*milvuspb.RunAnalyzerResponse)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.RunAnalyzerRequset) error); ok {
+		r1 = rf(_a0, _a1)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockProxy_RunAnalyzer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RunAnalyzer'
+type MockProxy_RunAnalyzer_Call struct {
+	*mock.Call
+}
+
+// RunAnalyzer is a helper method to define mock.On call
+//   - _a0 context.Context
+//   - _a1 *milvuspb.RunAnalyzerRequset
+func (_e *MockProxy_Expecter) RunAnalyzer(_a0 interface{}, _a1 interface{}) *MockProxy_RunAnalyzer_Call {
+	return &MockProxy_RunAnalyzer_Call{Call: _e.mock.On("RunAnalyzer", _a0, _a1)}
+}
+
+func (_c *MockProxy_RunAnalyzer_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.RunAnalyzerRequset)) *MockProxy_RunAnalyzer_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(*milvuspb.RunAnalyzerRequset))
+	})
+	return _c
+}
+
+func (_c *MockProxy_RunAnalyzer_Call) Return(_a0 *milvuspb.RunAnalyzerResponse, _a1 error) *MockProxy_RunAnalyzer_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockProxy_RunAnalyzer_Call) RunAndReturn(run func(context.Context, *milvuspb.RunAnalyzerRequset) (*milvuspb.RunAnalyzerResponse, error)) *MockProxy_RunAnalyzer_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // Search provides a mock function with given fields: _a0, _a1
 func (_m *MockProxy) Search(_a0 context.Context, _a1 *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
 	ret := _m.Called(_a0, _a1)
diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go
index 9aec8ede3f0a9..1e877c85372a8 100644
--- a/internal/proxy/impl.go
+++ b/internal/proxy/impl.go
@@ -44,6 +44,7 @@ import (
 	"github.com/milvus-io/milvus/internal/http"
 	"github.com/milvus-io/milvus/internal/proxy/connection"
 	"github.com/milvus-io/milvus/internal/types"
+	"github.com/milvus-io/milvus/internal/util/ctokenizer"
 	"github.com/milvus-io/milvus/internal/util/hookutil"
 	"github.com/milvus-io/milvus/internal/util/importutilv2"
 	"github.com/milvus-io/milvus/internal/util/streamingutil"
@@ -6926,3 +6927,45 @@ func (node *Proxy) OperatePrivilegeGroup(ctx context.Context, req *milvuspb.Oper
 	}
 	return result, nil
 }
+
+// RunAnalyzer tokenizes every text in the request placeholder with a tokenizer
+// built from the request's analyzer params and returns one AnalyzerResult per
+// text, in request order. Tokenizer construction failures are reported through
+// the response Status rather than the returned error.
+func (node *Proxy) RunAnalyzer(ctx context.Context, req *milvuspb.RunAnalyzerRequset) (*milvuspb.RunAnalyzerResponse, error) {
+	// TODO: use collection analyzer when collection name not none
+	// if req.GetCollectionName() != "" {}
+	tokenizer, err := ctokenizer.NewTokenizer(req.GetAnalyzerParams())
+	if err != nil {
+		return &milvuspb.RunAnalyzerResponse{
+			Status: merr.Status(err),
+		}, nil
+	}
+	defer tokenizer.Destroy()
+
+	// analyze runs in its own function scope so that each token stream is
+	// destroyed as soon as its text has been consumed; a defer placed directly
+	// in the loop below would keep every stream alive until RunAnalyzer returns.
+	analyze := func(text string) []string {
+		stream := tokenizer.NewTokenStream(text)
+		defer stream.Destroy()
+
+		tokens := []string{}
+		for stream.Advance() {
+			tokens = append(tokens, stream.Token())
+		}
+		return tokens
+	}
+
+	results := make([]*milvuspb.AnalyzerResult, len(req.GetPlaceholder()))
+	for i, text := range req.GetPlaceholder() {
+		results[i] = &milvuspb.AnalyzerResult{
+			Tokens: analyze(string(text)),
+		}
+	}
+
+	return &milvuspb.RunAnalyzerResponse{
+		Status:  merr.Status(nil),
+		Results: results,
+	}, nil
+}