From 80a93d45b43988eee9e8930acee050ae609decf6 Mon Sep 17 00:00:00 2001 From: yanghao Date: Wed, 14 Aug 2024 23:04:43 +0800 Subject: [PATCH 1/5] add feature: acceptorTemplate --- accepter_test.go | 85 ------- acceptor.go | 113 ++++++--- acceptor_session_provider.go | 115 +++++++++ acceptor_session_provider_test.go | 247 ++++++++++++++++++++ acceptor_test.go | 373 ++++++++++++++++++++++++++++++ config/configuration.go | 3 + 6 files changed, 823 insertions(+), 113 deletions(-) delete mode 100644 accepter_test.go create mode 100644 acceptor_session_provider.go create mode 100644 acceptor_session_provider_test.go create mode 100644 acceptor_test.go diff --git a/accepter_test.go b/accepter_test.go deleted file mode 100644 index 54bfff845..000000000 --- a/accepter_test.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright (c) quickfixengine.org All rights reserved. -// -// This file may be distributed under the terms of the quickfixengine.org -// license as defined by quickfixengine.org and appearing in the file -// LICENSE included in the packaging of this file. -// -// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING -// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A -// PARTICULAR PURPOSE. -// -// See http://www.quickfixengine.org/LICENSE for licensing information. -// -// Contact ask@quickfixengine.org if any conditions of this licensing -// are not clear to you. - -package quickfix - -import ( - "net" - "testing" - - "github.com/quickfixgo/quickfix/config" - - proxyproto "github.com/pires/go-proxyproto" - "github.com/stretchr/testify/assert" -) - -func TestAcceptor_Start(t *testing.T) { - sessionSettings := NewSessionSettings() - sessionSettings.Set(config.BeginString, BeginStringFIX42) - sessionSettings.Set(config.SenderCompID, "sender") - sessionSettings.Set(config.TargetCompID, "target") - - settingsWithTCPProxy := NewSettings() - settingsWithTCPProxy.GlobalSettings().Set("UseTCPProxy", "Y") - - settingsWithNoTCPProxy := NewSettings() - settingsWithNoTCPProxy.GlobalSettings().Set("UseTCPProxy", "N") - - genericSettings := NewSettings() - - const ( - GenericListener = iota - ProxyListener - ) - - acceptorStartTests := []struct { - name string - settings *Settings - listenerType int - }{ - {"with TCP proxy set", settingsWithTCPProxy, ProxyListener}, - {"with no TCP proxy set", settingsWithNoTCPProxy, GenericListener}, - {"no TCP proxy configuration set", genericSettings, GenericListener}, - } - - for _, tt := range acceptorStartTests { - t.Run(tt.name, func(t *testing.T) { - tt.settings.GlobalSettings().Set("SocketAcceptPort", "5001") - if _, err := tt.settings.AddSession(sessionSettings); err != nil { - assert.Nil(t, err) - } - - acceptor := &Acceptor{settings: tt.settings} - if err := acceptor.Start(); err != nil { - assert.NotNil(t, err) - } - assert.Len(t, acceptor.listeners, 1) - - for _, listener := range acceptor.listeners { - if tt.listenerType == ProxyListener { - _, ok := listener.(*proxyproto.Listener) - assert.True(t, ok) - } - - if tt.listenerType == GenericListener { - _, ok := listener.(*net.TCPListener) - assert.True(t, ok) - } - } - - acceptor.Stop() - }) - } -} diff --git a/acceptor.go b/acceptor.go index f5b9b281c..c48ff5f46 100644 --- a/acceptor.go +++ b/acceptor.go @@ -38,6 +38,7 @@ type Acceptor struct { storeFactory MessageStoreFactory globalLog Log sessions map[SessionID]*session + sessionsLock sync.RWMutex sessionGroup sync.WaitGroup listenerShutdown sync.WaitGroup dynamicSessions bool @@ -48,6 +49,7 @@ type Acceptor struct { sessionHostPort map[SessionID]int listeners map[string]net.Listener connectionValidator ConnectionValidator + sessionProvider AcceptorSessionProvider sessionFactory } @@ -104,14 +106,8 @@ func (a *Acceptor) Start() (err error) { a.listeners[address] = &proxyproto.Listener{Listener: a.listeners[address]} } } + a.startSessions() - for _, s := range a.sessions { - a.sessionGroup.Add(1) - go func(s *session) { - s.run() - a.sessionGroup.Done() - }(s) - } if a.dynamicSessions { a.dynamicSessionChan = make(chan *session) a.sessionGroup.Add(1) @@ -140,17 +136,7 @@ func (a *Acceptor) Stop() { if a.dynamicSessions { close(a.dynamicSessionChan) } - for _, session := range a.sessions { - session.stop() - } - a.sessionGroup.Wait() - - for sessionID := range a.sessions { - err := UnregisterSession(sessionID) - if err != nil { - return - } - } + a.stopSessions() } // RemoteAddr gets remote IP address for a given session. @@ -191,6 +177,15 @@ func NewAcceptor(app Application, storeFactory MessageStoreFactory, settings *Se } for sessionID, sessionSettings := range settings.SessionSettings() { + if sessionSettings.HasSetting(config.AcceptorTemplate) { + var acceptorTemplate bool + if acceptorTemplate, err = sessionSettings.BoolSetting(config.AcceptorTemplate); err != nil { + return + } + if acceptorTemplate { + continue + } + } sessID := sessionID sessID.Qualifier = "" @@ -331,18 +326,35 @@ func (a *Acceptor) handleConnection(netConn net.Conn) { } session, ok := a.sessions[sessID] if !ok { - if !a.dynamicSessions { - a.globalLog.OnEventf("Session %v not found for incoming message: %s", sessID, msgBytes) - return + var dynamicSessionCreated bool + if a.sessionProvider != nil { + session, err = a.sessionProvider.GetSession(sessID) + if err != nil { + if err == errUnknownSession && a.dynamicSessions { + goto CREATE_SHORT_LIVED_DYNAMIC_SESSION + } + a.globalLog.OnEventf("Failed to get session %v from provider: %v", sessID, err) + return + } + a.addMngdDynamicSession(sessID, session) + dynamicSessionCreated = true } - dynamicSession, err := a.sessionFactory.createSession(sessID, a.storeFactory, a.settings.globalSettings.clone(), a.logFactory, a.app) - if err != nil { - a.globalLog.OnEventf("Dynamic session %v failed to create: %v", sessID, err) - return + CREATE_SHORT_LIVED_DYNAMIC_SESSION: + if !dynamicSessionCreated { + if !a.dynamicSessions { + a.globalLog.OnEventf("Session %v not found for incoming message: %s", sessID, msgBytes) + return + } + dynamicSession, err := a.sessionFactory.createSession(sessID, a.storeFactory, a.settings.globalSettings.clone(), a.logFactory, a.app) + if err != nil { + a.globalLog.OnEventf("Dynamic session %v failed to create: %v", sessID, err) + return + } + a.dynamicSessionChan <- dynamicSession + session = dynamicSession + defer session.stop() } - a.dynamicSessionChan <- dynamicSession - session = dynamicSession - defer session.stop() + } a.sessionAddr.Store(sessID, netConn.RemoteAddr()) @@ -412,6 +424,46 @@ LOOP: } } +func (a *Acceptor) startSessions() { + a.sessionsLock.RLock() + defer a.sessionsLock.RUnlock() + for _, s := range a.sessions { + a.sessionGroup.Add(1) + go func(s *session) { + s.run() + a.sessionGroup.Done() + }(s) + } +} + +func (a *Acceptor) stopSessions() { + a.sessionsLock.RLock() + defer a.sessionsLock.RUnlock() + for _, session := range a.sessions { + session.stop() + } + a.sessionGroup.Wait() + + for sessionID := range a.sessions { + err := UnregisterSession(sessionID) + if err != nil { + return + } + } +} + +func (a *Acceptor) addMngdDynamicSession(sessID SessionID, session *session) { + a.sessionsLock.Lock() + defer a.sessionsLock.Unlock() + + a.sessions[sessID] = session + a.sessionGroup.Add(1) + go func() { + session.run() + a.sessionGroup.Done() + }() +} + // SetConnectionValidator sets an optional connection validator. // Use it when you need a custom authentication logic that includes lower level interactions, // like mTLS auth or IP whitelistening. @@ -421,3 +473,8 @@ LOOP: func (a *Acceptor) SetConnectionValidator(validator ConnectionValidator) { a.connectionValidator = validator } + +// SetSessionProvider sets an optional session provider. +func (a *Acceptor) SetSessionProvider(sessionProvider AcceptorSessionProvider) { + a.sessionProvider = sessionProvider +} diff --git a/acceptor_session_provider.go b/acceptor_session_provider.go new file mode 100644 index 000000000..2547a0535 --- /dev/null +++ b/acceptor_session_provider.go @@ -0,0 +1,115 @@ +package quickfix + +import "github.com/quickfixgo/quickfix/config" + +const ( + WildcardPattern string = "*" +) + +type AcceptorSessionProvider interface { + GetSession(SessionID) (*session, error) +} + +type StaticAcceptorSessionProvider struct { + sessions map[SessionID]*session +} + +func (p *StaticAcceptorSessionProvider) GetSession(sessionID SessionID) (*session, error) { + s, ok := p.sessions[sessionID] + if !ok { + return nil, errUnknownSession + } + return s, nil +} + +// DynamicAcceptorSessionProvider dynamically defines sessions for an acceptor. This can be useful for +// applications like simulators that want to accept any connection and +// dynamically create an associated session. +// +// For more complex situations, you can use this class as a starting +// point for implementing your own AcceptorSessionProvider. +type DynamicAcceptorSessionProvider struct { + settings *Settings + messageStoreFactory MessageStoreFactory + logFactory LogFactory + sessionFactory *sessionFactory + application Application + templateMappings []*TemplateMapping +} + +func NewDynamicAcceptorSessionProvider(settings *Settings, messageStoreFactory MessageStoreFactory, logFactory LogFactory, + application Application, templateMappings []*TemplateMapping, +) *DynamicAcceptorSessionProvider { + return &DynamicAcceptorSessionProvider{ + settings: settings, + messageStoreFactory: messageStoreFactory, + logFactory: logFactory, + sessionFactory: &sessionFactory{}, + application: application, + templateMappings: templateMappings, + } +} + +func (p *DynamicAcceptorSessionProvider) FindTemplateID(sessionID SessionID) *SessionID { + return p.lookupTemplateID(sessionID) +} + +func (p *DynamicAcceptorSessionProvider) GetSession(sessionID SessionID) (*session, error) { + s, ok := lookupSession(sessionID) + if !ok { + templateID := p.lookupTemplateID(sessionID) + if templateID == nil { + return nil, errUnknownSession + } + dynamicSessionSettings := p.settings.globalSettings.clone() + templateSettings := p.settings.sessionSettings[*templateID] + dynamicSessionSettings.overlay(templateSettings) + dynamicSessionSettings.Set(config.BeginString, sessionID.BeginString) + dynamicSessionSettings.Set(config.SenderCompID, sessionID.SenderCompID) + dynamicSessionSettings.Set(config.SenderSubID, sessionID.SenderSubID) + dynamicSessionSettings.Set(config.SenderLocationID, sessionID.SenderLocationID) + dynamicSessionSettings.Set(config.TargetCompID, sessionID.TargetCompID) + dynamicSessionSettings.Set(config.TargetSubID, sessionID.TargetSubID) + dynamicSessionSettings.Set(config.TargetLocationID, sessionID.TargetLocationID) + var err error + s, err = p.sessionFactory.createSession(sessionID, + p.messageStoreFactory, + dynamicSessionSettings, + p.logFactory, + p.application, + ) + if err != nil { + return nil, err + } + } + return s, nil +} + +func (provider *DynamicAcceptorSessionProvider) lookupTemplateID(sessionID SessionID) *SessionID { + for _, mapping := range provider.templateMappings { + if isTemplateMatching(mapping.Pattern, sessionID) { + return &mapping.TemplateID + } + } + return nil +} + +func isTemplateMatching(pattern SessionID, sessionID SessionID) bool { + return matches(pattern.BeginString, sessionID.BeginString) && + matches(pattern.SenderCompID, sessionID.SenderCompID) && + matches(pattern.SenderSubID, sessionID.SenderSubID) && + matches(pattern.SenderLocationID, sessionID.SenderLocationID) && + matches(pattern.TargetCompID, sessionID.TargetCompID) && + matches(pattern.TargetSubID, sessionID.TargetSubID) && + matches(pattern.TargetLocationID, sessionID.TargetLocationID) +} + +func matches(pattern string, value string) bool { + return WildcardPattern == pattern || pattern == value +} + +// TemplateMapping mapping from a sessionID pattern to a session template ID. +type TemplateMapping struct { + Pattern SessionID + TemplateID SessionID +} diff --git a/acceptor_session_provider_test.go b/acceptor_session_provider_test.go new file mode 100644 index 000000000..6cc221000 --- /dev/null +++ b/acceptor_session_provider_test.go @@ -0,0 +1,247 @@ +package quickfix + +import ( + "reflect" + "testing" + + "github.com/quickfixgo/quickfix/config" + "github.com/stretchr/testify/suite" +) + +type DynamicAcceptorSessionProviderTestSuite struct { + suite.Suite + + provider *DynamicAcceptorSessionProvider + + settings *Settings + messageStoreFactory MessageStoreFactory + logFactory LogFactory + app Application + sessionFactory *sessionFactory + TemplateMapping []*TemplateMapping +} + +func (suite *DynamicAcceptorSessionProviderTestSuite) SetupTest() { + suite.settings = NewSettings() + suite.messageStoreFactory = NewMemoryStoreFactory() + suite.logFactory = nullLogFactory{} + suite.app = &noopApp{} + suite.sessionFactory = &sessionFactory{} + suite.TemplateMapping = make([]*TemplateMapping, 0) + + templateId1 := SessionID{BeginString: "FIX.4.2", SenderCompID: "ANY", TargetCompID: "ANY"} + suite.TemplateMapping = append( + suite.TemplateMapping, + &TemplateMapping{Pattern: SessionID{BeginString: WildcardPattern, SenderCompID: "S1", TargetCompID: WildcardPattern}, TemplateID: templateId1}, + ) + suite.setUpSettings(templateId1, "ResetOnLogout", "Y") + + templateId2 := SessionID{BeginString: "FIX.4.4", SenderCompID: "S1", TargetCompID: "ANY"} + suite.TemplateMapping = append( + suite.TemplateMapping, + &TemplateMapping{Pattern: SessionID{BeginString: "FIX.4.4", SenderCompID: WildcardPattern, TargetCompID: WildcardPattern}, TemplateID: templateId2}, + ) + suite.setUpSettings(templateId2, "RefreshOnLogon", "Y") + + templateId3 := SessionID{BeginString: "FIX.4.4", SenderCompID: "ANY", TargetCompID: "ANY"} + suite.TemplateMapping = append( + suite.TemplateMapping, + &TemplateMapping{Pattern: SessionID{BeginString: "FIX.4.2", SenderCompID: WildcardPattern, SenderSubID: WildcardPattern, SenderLocationID: WildcardPattern, + TargetCompID: WildcardPattern, TargetSubID: WildcardPattern, TargetLocationID: WildcardPattern, Qualifier: WildcardPattern, + }, TemplateID: templateId3}, + ) + suite.setUpSettings(templateId3, "ResetOnDisconnect", "Y") + + suite.provider = NewDynamicAcceptorSessionProvider(suite.settings, suite.messageStoreFactory, + suite.logFactory, suite.app, suite.TemplateMapping) +} + +func (suite *DynamicAcceptorSessionProviderTestSuite) setUpSettings(TemplateID SessionID, key, value string) { + sessionSettings := NewSessionSettings() + sessionSettings.Set(config.BeginString, TemplateID.BeginString) + sessionSettings.Set(config.SenderCompID, TemplateID.SenderCompID) + sessionSettings.Set(config.SenderSubID, TemplateID.SenderSubID) + sessionSettings.Set(config.SenderLocationID, TemplateID.SenderLocationID) + sessionSettings.Set(config.TargetCompID, TemplateID.TargetCompID) + sessionSettings.Set(config.TargetSubID, TemplateID.TargetSubID) + sessionSettings.Set(config.TargetLocationID, TemplateID.TargetLocationID) + sessionSettings.Set(config.SessionQualifier, TemplateID.Qualifier) + + sessionSettings.Set("StartTime", "00:00:00") + sessionSettings.Set("EndTime", "00:00:00") + sessionSettings.Set(key, value) + suite.settings.AddSession(sessionSettings) +} + +func (suite *DynamicAcceptorSessionProviderTestSuite) TestSessionCreation() { + type expected struct { + sessionID SessionID + resetOnLogout bool + refreshOnLogon bool + resetOnDisconnect bool + } + var tests = []struct { + name string + input SessionID + expected expected + }{ + { + name: "session created - matched", + input: SessionID{ + BeginString: "FIX.4.2", SenderCompID: "SENDER", SenderSubID: "SENDERSUB", SenderLocationID: "SENDERLOC", + TargetCompID: "TARGET", TargetSubID: "TARGETSUB", TargetLocationID: "TARGETLOC", Qualifier: "", + }, + expected: expected{ + sessionID: SessionID{ + BeginString: "FIX.4.2", SenderCompID: "SENDER", SenderSubID: "SENDERSUB", SenderLocationID: "SENDERLOC", + TargetCompID: "TARGET", TargetSubID: "TARGETSUB", TargetLocationID: "TARGETLOC", Qualifier: "", + }, + resetOnLogout: false, + refreshOnLogon: false, + resetOnDisconnect: true, + }, + }, + { + name: "create session - matching the first", + input: SessionID{ + BeginString: "FIX.4.4", SenderCompID: "S1", TargetCompID: "T", + }, + expected: expected{ + sessionID: SessionID{ + BeginString: "FIX.4.4", SenderCompID: "S1", TargetCompID: "T", + }, + resetOnLogout: true, + refreshOnLogon: false, + resetOnDisconnect: false, + }, + }, + { + name: "create session - matching the second", + input: SessionID{ + BeginString: "FIX.4.4", SenderCompID: "X", TargetCompID: "Y", + }, + expected: expected{ + sessionID: SessionID{ + BeginString: "FIX.4.4", SenderCompID: "X", TargetCompID: "Y", + }, + resetOnLogout: false, + refreshOnLogon: true, + resetOnDisconnect: false, + }, + }, + } + + for _, test := range tests { + session, err := suite.provider.GetSession(test.input) + suite.NoError(err) + suite.NotNil(session) + sessionID := session.sessionID + suite.Equal(test.expected.sessionID, sessionID, test.name+": created sessionID not expected") + suite.Equal(test.expected.resetOnLogout, session.ResetOnLogout, test.name+":ResetOnLogout not expected") + suite.Equal(test.expected.refreshOnLogon, session.RefreshOnLogon, test.name+":RefreshOnLogon not expected") + suite.Equal(test.expected.resetOnDisconnect, session.ResetOnDisconnect, test.name+":ResetOnDisconnect not expected") + } +} + +func (suite *DynamicAcceptorSessionProviderTestSuite) TestTemplateNotFound() { + var tests = []struct { + name string + input SessionID + }{ + { + name: "template not found", + input: SessionID{ + BeginString: "FIX.4.3", SenderCompID: "S", TargetCompID: "T", + }, + }, + } + + for _, test := range tests { + _, err := suite.provider.GetSession(test.input) + suite.Error(err, test.name+": expected error for template not found") + } +} + +func TestDynamicAcceptorSessionProviderTestSuite(t *testing.T) { + suite.Run(t, new(DynamicAcceptorSessionProviderTestSuite)) +} + +func TestStaticSessionProvider_GetSession(t *testing.T) { + sessions := make(map[SessionID]*session) + sessionID1 := SessionID{BeginString: "FIX.4.2", SenderCompID: "SENDER", TargetCompID: "TARGET"} + session1 := &session{sessionID: sessionID1} + sessions[sessionID1] = session1 + + type args struct { + sessionID SessionID + } + tests := []struct { + name string + args args + want *session + wantErr bool + }{ + { + name: "session found", + args: args{ + sessionID: sessionID1, + }, + want: session1, + wantErr: false, + }, + { + name: "session not found", + args: args{ + sessionID: SessionID{ + BeginString: "FIX.4.2", SenderCompID: "X", TargetCompID: "Y", + }, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &StaticAcceptorSessionProvider{ + sessions: sessions, + } + got, err := p.GetSession(tt.args.sessionID) + if (err != nil) != tt.wantErr { + t.Errorf("StaticSessionProvider.GetSession() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("StaticSessionProvider.GetSession() = %v, want %v", got, tt.want) + } + }) + } +} + +var _ Application = &noopApp{} + +type noopApp struct { +} + +func (n *noopApp) FromAdmin(message *Message, sessionID SessionID) MessageRejectError { + return nil +} + +func (n *noopApp) FromApp(message *Message, sessionID SessionID) MessageRejectError { + return nil +} + +func (n *noopApp) OnCreate(sessionID SessionID) { +} + +func (n *noopApp) OnLogon(sessionID SessionID) { +} + +func (n *noopApp) OnLogout(sessionID SessionID) { +} + +func (n *noopApp) ToAdmin(message *Message, sessionID SessionID) { +} + +func (n *noopApp) ToApp(message *Message, sessionID SessionID) error { + return nil +} diff --git a/acceptor_test.go b/acceptor_test.go new file mode 100644 index 000000000..ffb015c8c --- /dev/null +++ b/acceptor_test.go @@ -0,0 +1,373 @@ +// Copyright (c) quickfixengine.org All rights reserved. +// +// This file may be distributed under the terms of the quickfixengine.org +// license as defined by quickfixengine.org and appearing in the file +// LICENSE included in the packaging of this file. +// +// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING +// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A +// PARTICULAR PURPOSE. +// +// See http://www.quickfixengine.org/LICENSE for licensing information. +// +// Contact ask@quickfixengine.org if any conditions of this licensing +// are not clear to you. + +package quickfix + +import ( + "bytes" + "io" + "net" + "testing" + "time" + + "github.com/quickfixgo/quickfix/config" + + proxyproto "github.com/pires/go-proxyproto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +func TestAcceptor_Start(t *testing.T) { + sessionSettings := NewSessionSettings() + sessionSettings.Set(config.BeginString, BeginStringFIX42) + sessionSettings.Set(config.SenderCompID, "sender") + sessionSettings.Set(config.TargetCompID, "target") + + settingsWithTCPProxy := NewSettings() + settingsWithTCPProxy.GlobalSettings().Set("UseTCPProxy", "Y") + + settingsWithNoTCPProxy := NewSettings() + settingsWithNoTCPProxy.GlobalSettings().Set("UseTCPProxy", "N") + + genericSettings := NewSettings() + + const ( + GenericListener = iota + ProxyListener + ) + + acceptorStartTests := []struct { + name string + settings *Settings + listenerType int + }{ + {"with TCP proxy set", settingsWithTCPProxy, ProxyListener}, + {"with no TCP proxy set", settingsWithNoTCPProxy, GenericListener}, + {"no TCP proxy configuration set", genericSettings, GenericListener}, + } + + for _, tt := range acceptorStartTests { + t.Run(tt.name, func(t *testing.T) { + tt.settings.GlobalSettings().Set("SocketAcceptPort", "5001") + if _, err := tt.settings.AddSession(sessionSettings); err != nil { + assert.Nil(t, err) + } + + acceptor := &Acceptor{settings: tt.settings} + if err := acceptor.Start(); err != nil { + assert.NotNil(t, err) + } + assert.Len(t, acceptor.listeners, 1) + + for _, listener := range acceptor.listeners { + if tt.listenerType == ProxyListener { + _, ok := listener.(*proxyproto.Listener) + assert.True(t, ok) + } + + if tt.listenerType == GenericListener { + _, ok := listener.(*net.TCPListener) + assert.True(t, ok) + } + } + + acceptor.Stop() + }) + } +} + +var _ net.Conn = &mockConn{} + +type mockConn struct { + closeChan chan struct{} + localAddr net.Addr + remoteAddr net.Addr + + onWriteback func([]byte) + inboundMessages []*Message +} + +func (c *mockConn) Read(b []byte) (n int, err error) { + if len(c.inboundMessages) > 0 { + messageBytes := c.inboundMessages[0].build() + copy(b, messageBytes) + c.inboundMessages = c.inboundMessages[1:] + return len(messageBytes), err + } + <-c.closeChan + return 0, io.EOF +} + +func (c *mockConn) LocalAddr() net.Addr { + return c.localAddr +} + +func (c *mockConn) RemoteAddr() net.Addr { + return c.remoteAddr +} + +func (c *mockConn) SetDeadline(t time.Time) error { + return nil +} + +func (c *mockConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *mockConn) SetWriteDeadline(t time.Time) error { + return nil +} + +func (c *mockConn) Write(b []byte) (n int, err error) { + if c.onWriteback != nil { + c.onWriteback(b) + } + return len(b), nil +} + +func (c *mockConn) Close() error { + return nil +} + +func mockLogonMessage(sessionID SessionID, msgSeqNum int) *Message { + msg := NewMessage() + msg.Header.SetField(tagMsgType, FIXString("A")) + msg.Header.SetInt(tagMsgSeqNum, msgSeqNum) + msg.Header.SetString(tagBeginString, sessionID.BeginString) + msg.Header.SetString(tagSenderCompID, sessionID.SenderCompID) + msg.Header.SetString(tagSenderSubID, sessionID.SenderSubID) + msg.Header.SetString(tagSenderLocationID, sessionID.SenderLocationID) + msg.Header.SetString(tagTargetCompID, sessionID.TargetCompID) + msg.Header.SetString(tagTargetSubID, sessionID.TargetSubID) + msg.Header.SetString(tagTargetLocationID, sessionID.TargetLocationID) + msg.Header.SetField(tagSendingTime, FIXUTCTimestamp{Time: time.Now()}) + msg.Body.SetInt(tagHeartBtInt, 30) + return msg +} + +type AcceptorTemplateTestSuite struct { + suite.Suite + acceptor *Acceptor + + sessionId1 SessionID + sessionId2 SessionID + sessionId3 SessionID + + testDynamicSessionID SessionID + logonSessionID SessionID + seqNum int + + dynamicSessionProvider AcceptorSessionProvider +} + +func (suite *AcceptorTemplateTestSuite) BeforeTest(suiteName, testName string) { + settings := NewSettings() + settings.globalSettings.Set(config.SocketAcceptPort, "5001") + sessionId1 := SessionID{BeginString: BeginStringFIX42, SenderCompID: "sender1", TargetCompID: "target1"} + sessionSettings1 := NewSessionSettings() + sessionSettings1.Set(config.BeginString, sessionId1.BeginString) + sessionSettings1.Set(config.SenderCompID, sessionId1.SenderCompID) + sessionSettings1.Set(config.TargetCompID, sessionId1.TargetCompID) + suite.sessionId1 = sessionId1 + settings.AddSession(sessionSettings1) + + sessionId2 := SessionID{BeginString: BeginStringFIX43, SenderCompID: "sender2", TargetCompID: "target2"} + sessionSettings2 := NewSessionSettings() + sessionSettings2.Set(config.BeginString, sessionId2.BeginString) + sessionSettings2.Set(config.SenderCompID, sessionId2.SenderCompID) + sessionSettings2.Set(config.TargetCompID, sessionId2.TargetCompID) + suite.sessionId2 = sessionId2 + settings.AddSession(sessionSettings2) + + // acceptor template + sessionId3 := SessionID{BeginString: BeginStringFIX43, SenderCompID: "*", SenderSubID: "*", SenderLocationID: "*", + TargetCompID: "target3", TargetSubID: "*", TargetLocationID: "*"} + sessionSettings3 := NewSessionSettings() + sessionSettings3.Set(config.BeginString, sessionId3.BeginString) + sessionSettings3.Set(config.SenderCompID, sessionId3.SenderCompID) + sessionSettings3.Set(config.SenderSubID, sessionId3.SenderSubID) + sessionSettings3.Set(config.SenderLocationID, sessionId3.SenderLocationID) + sessionSettings3.Set(config.TargetCompID, sessionId3.TargetCompID) + sessionSettings3.Set(config.TargetSubID, sessionId3.TargetSubID) + sessionSettings3.Set(config.TargetLocationID, sessionId3.TargetLocationID) + sessionSettings3.Set(config.ResetOnLogout, "Y") + sessionSettings3.Set(config.AcceptorTemplate, "Y") + suite.sessionId3 = sessionId3 + settings.AddSession(sessionSettings3) + + app := &noopApp{} + a, err := NewAcceptor(app, memoryStoreFactory{}, settings, NewScreenLogFactory()) + if err != nil { + suite.Fail("Failed to create acceptor: %v", err) + } + suite.acceptor = a + + templateMappings := make([]*TemplateMapping, 0) + templateMappings = append(templateMappings, &TemplateMapping{ + Pattern: suite.sessionId3, + TemplateID: suite.sessionId3, + }) + suite.dynamicSessionProvider = NewDynamicAcceptorSessionProvider(suite.acceptor.settings, suite.acceptor.storeFactory, suite.acceptor.logFactory, suite.acceptor.app, templateMappings) + suite.acceptor.SetSessionProvider(suite.dynamicSessionProvider) + + suite.testDynamicSessionID = SessionID{BeginString: BeginStringFIX43, SenderCompID: "target3", TargetCompID: "dynamicSender"} + suite.logonSessionID = SessionID{BeginString: BeginStringFIX43, SenderCompID: "dynamicSender", TargetCompID: "target3"} + if err := suite.acceptor.Start(); err != nil { + suite.FailNow("acceptor start failed: %v", err) + } + + suite.verifySessionCount(2) + suite.seqNum = 1 +} + +func (suite *AcceptorTemplateTestSuite) logonAndDisconnectAfterCheck(sessionID SessionID, + checkFuncAfterLogon func(), + mustHaveResponse bool) { + inboundMessages := []*Message{mockLogonMessage(sessionID, suite.seqNum)} + suite.seqNum++ + var respondedLogonMessageReceived bool + mockConn1 := &mockConn{ + closeChan: make(chan struct{}), + inboundMessages: inboundMessages, + localAddr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5001}, + remoteAddr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5002}, + } + mockConn1.onWriteback = func(b []byte) { + reponseMsg := NewMessage() + err := ParseMessage(reponseMsg, bytes.NewBuffer(b)) + suite.Require().NoError(err, "parse responding message failed") + msgType, err := reponseMsg.Header.GetString(tagMsgType) + suite.Require().NoError(err, "unexpected mssage") + suite.Require().Equalf("A", msgType, "expected logon message in reponse %s", reponseMsg.String()) + respondedLogonMessageReceived = true + if checkFuncAfterLogon != nil { + checkFuncAfterLogon() + } + close(mockConn1.closeChan) + } + suite.acceptor.handleConnection(mockConn1) + if mustHaveResponse { + suite.Require().Equal(true, respondedLogonMessageReceived, "expected responding logon message") + } +} + +func (suite *AcceptorTemplateTestSuite) verifySessionCount(expectedSessionCount int) { + suite.Require().Equalf(expectedSessionCount, len(suite.acceptor.sessions), "expected %v sessions but found %v", expectedSessionCount, len(suite.acceptor.sessions)) + suite.Require().Equalf(expectedSessionCount, len(sessions), "expected %v sessions but found %v in registry", expectedSessionCount, len(suite.acceptor.sessions)) +} + +func (suite *AcceptorTemplateTestSuite) TestCreateDynamicSessionBySessionProvider() { + logonSessionID := suite.logonSessionID + suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { + suite.verifySessionCount(3) + + createdSession, ok := suite.acceptor.sessions[logonSessionID] + suite.Require().Equal(true, ok, "expected dynamic session to be created") + suite.Require().Equal(logonSessionID, createdSession.sessionID, "expected session ID to match inbound session ID") + suite.Require().Equal(createdSession.ResetOnLogout, true, "expected ResetOnLogout=Y for createdSession") + + remoteAddr, ok := suite.acceptor.RemoteAddr(logonSessionID) + if !ok { + suite.Fail("Failed to get remote address for dynamic session") + } + suite.Require().Equal("127.0.0.1:5002", remoteAddr.String(), "expect remoteAddr for dynamic session to be 127.0.0.1:5002 but got %v", remoteAddr.String()) + }, true) + suite.acceptor.Stop() +} + +func (suite *AcceptorTemplateTestSuite) TestSessionCreatedBySessionProviderShouldBeKept() { + logonSessionID := suite.logonSessionID + suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { + suite.verifySessionCount(3) + }, true) + err := SendToTarget(createFIX43NewOrderSingle(), logonSessionID) + suite.NoError(err, "expected message can still be sent after session disconnected") + suite.acceptor.Stop() +} + +func (suite *AcceptorTemplateTestSuite) TestNoNewSessionCreatedWhenSameSessionIDLogons() { + suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { + suite.verifySessionCount(3) + }, true) + suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { + suite.verifySessionCount(3) + }, true) + suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { + suite.verifySessionCount(3) + }, true) + suite.acceptor.Stop() +} + +func (suite *AcceptorTemplateTestSuite) TestSessionNotFoundBySessionProvider() { + sessionID := SessionID{BeginString: BeginStringFIX43, SenderCompID: "unknownSender", TargetCompID: "unknownTarget"} + suite.logonAndDisconnectAfterCheck(sessionID, func() {}, false) + suite.verifySessionCount(2) + suite.acceptor.Stop() +} + +func TestAcceptorTemplateTestSuite(t *testing.T) { + suite.Run(t, new(AcceptorTemplateTestSuite)) +} + +type DynamicSessionTestSuite struct { + suite.Suite +} + +func (suite *DynamicSessionTestSuite) TestDynamicSession() { + settings := NewSettings() + settings.globalSettings.Set(config.SocketAcceptPort, "5001") + settings.globalSettings.Set(config.DynamicSessions, "Y") + sessionId1 := SessionID{BeginString: BeginStringFIX42, SenderCompID: "sender1", TargetCompID: "target1"} + sessionSettings1 := NewSessionSettings() + sessionSettings1.Set(config.BeginString, sessionId1.BeginString) + sessionSettings1.Set(config.SenderCompID, sessionId1.SenderCompID) + sessionSettings1.Set(config.TargetCompID, sessionId1.TargetCompID) + settings.AddSession(sessionSettings1) + + a, err := NewAcceptor(&noopApp{}, memoryStoreFactory{}, settings, NewNullLogFactory()) + suite.Require().NoError(err, "create acceptor with DynamicSession=Y failed") + + if err := a.Start(); err != nil { + suite.FailNow("acceptor start failed: %v", err) + } + + inboundSessionID := SessionID{BeginString: BeginStringFIX43, SenderCompID: "X", TargetCompID: "Y"} + inboundMessages := []*Message{mockLogonMessage(inboundSessionID, 1)} + reversedInboundSessionID := SessionID{BeginString: BeginStringFIX43, SenderCompID: "Y", TargetCompID: "X"} + + mockConn1 := &mockConn{ + closeChan: make(chan struct{}), + inboundMessages: inboundMessages, + localAddr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5001}, + remoteAddr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5002}, + } + + var respondedLogonMessageReceived bool + mockConn1.onWriteback = func(_ []byte) { + respondedLogonMessageReceived = true + // close conn + close(mockConn1.closeChan) + } + + a.handleConnection(mockConn1) + suite.Require().Equal(true, respondedLogonMessageReceived, "expected responding logon message") + err = SendToTarget(createFIX43NewOrderSingle(), reversedInboundSessionID) + suite.Error(err, "session created by DynamicSession is unregistered after session connected") + a.Stop() +} + +func TestDynamicSessionTestSuite(t *testing.T) { + suite.Run(t, new(DynamicSessionTestSuite)) +} diff --git a/config/configuration.go b/config/configuration.go index f524f5005..506641cf9 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -697,6 +697,9 @@ const ( // - Y // - N DynamicQualifier string = "DynamicQualifier" + + // AcceptorTemplate designates a template Acceptor session. + AcceptorTemplate string = "AcceptorTemplate" ) const ( From d2edc4e99bcb897432f39c657171bedabde55c53 Mon Sep 17 00:00:00 2001 From: yanghao Date: Thu, 15 Aug 2024 00:04:01 +0800 Subject: [PATCH 2/5] Fix ci failed --- acceptor_session_provider_test.go | 10 ++-- acceptor_test.go | 93 +++++++++++++++++-------------- 2 files changed, 58 insertions(+), 45 deletions(-) diff --git a/acceptor_session_provider_test.go b/acceptor_session_provider_test.go index 6cc221000..2ae081e43 100644 --- a/acceptor_session_provider_test.go +++ b/acceptor_session_provider_test.go @@ -136,10 +136,11 @@ func (suite *DynamicAcceptorSessionProviderTestSuite) TestSessionCreation() { suite.NoError(err) suite.NotNil(session) sessionID := session.sessionID - suite.Equal(test.expected.sessionID, sessionID, test.name+": created sessionID not expected") - suite.Equal(test.expected.resetOnLogout, session.ResetOnLogout, test.name+":ResetOnLogout not expected") - suite.Equal(test.expected.refreshOnLogon, session.RefreshOnLogon, test.name+":RefreshOnLogon not expected") - suite.Equal(test.expected.resetOnDisconnect, session.ResetOnDisconnect, test.name+":ResetOnDisconnect not expected") + suite.Require().Equal(test.expected.sessionID, sessionID, test.name+": created sessionID not expected") + suite.Require().Equal(test.expected.resetOnLogout, session.ResetOnLogout, test.name+":ResetOnLogout not expected") + suite.Require().Equal(test.expected.refreshOnLogon, session.RefreshOnLogon, test.name+":RefreshOnLogon not expected") + suite.Require().Equal(test.expected.resetOnDisconnect, session.ResetOnDisconnect, test.name+":ResetOnDisconnect not expected") + UnregisterSession(sessionID) } } @@ -213,6 +214,7 @@ func TestStaticSessionProvider_GetSession(t *testing.T) { if !reflect.DeepEqual(got, tt.want) { t.Errorf("StaticSessionProvider.GetSession() = %v, want %v", got, tt.want) } + UnregisterSession(tt.args.sessionID) }) } } diff --git a/acceptor_test.go b/acceptor_test.go index ffb015c8c..65c44a940 100644 --- a/acceptor_test.go +++ b/acceptor_test.go @@ -118,15 +118,15 @@ func (c *mockConn) RemoteAddr() net.Addr { return c.remoteAddr } -func (c *mockConn) SetDeadline(t time.Time) error { +func (c *mockConn) SetDeadline(_ time.Time) error { return nil } -func (c *mockConn) SetReadDeadline(t time.Time) error { +func (c *mockConn) SetReadDeadline(_ time.Time) error { return nil } -func (c *mockConn) SetWriteDeadline(t time.Time) error { +func (c *mockConn) SetWriteDeadline(_ time.Time) error { return nil } @@ -161,9 +161,9 @@ type AcceptorTemplateTestSuite struct { suite.Suite acceptor *Acceptor - sessionId1 SessionID - sessionId2 SessionID - sessionId3 SessionID + sessionID1 SessionID + sessionID2 SessionID + sessionID3 SessionID testDynamicSessionID SessionID logonSessionID SessionID @@ -172,52 +172,52 @@ type AcceptorTemplateTestSuite struct { dynamicSessionProvider AcceptorSessionProvider } -func (suite *AcceptorTemplateTestSuite) BeforeTest(suiteName, testName string) { +func (suite *AcceptorTemplateTestSuite) setupAcceptor() { settings := NewSettings() settings.globalSettings.Set(config.SocketAcceptPort, "5001") - sessionId1 := SessionID{BeginString: BeginStringFIX42, SenderCompID: "sender1", TargetCompID: "target1"} + sessionID1 := SessionID{BeginString: BeginStringFIX42, SenderCompID: "sender1", TargetCompID: "target1"} sessionSettings1 := NewSessionSettings() - sessionSettings1.Set(config.BeginString, sessionId1.BeginString) - sessionSettings1.Set(config.SenderCompID, sessionId1.SenderCompID) - sessionSettings1.Set(config.TargetCompID, sessionId1.TargetCompID) - suite.sessionId1 = sessionId1 + sessionSettings1.Set(config.BeginString, sessionID1.BeginString) + sessionSettings1.Set(config.SenderCompID, sessionID1.SenderCompID) + sessionSettings1.Set(config.TargetCompID, sessionID1.TargetCompID) + suite.sessionID1 = sessionID1 settings.AddSession(sessionSettings1) - sessionId2 := SessionID{BeginString: BeginStringFIX43, SenderCompID: "sender2", TargetCompID: "target2"} + sessionID2 := SessionID{BeginString: BeginStringFIX43, SenderCompID: "sender2", TargetCompID: "target2"} sessionSettings2 := NewSessionSettings() - sessionSettings2.Set(config.BeginString, sessionId2.BeginString) - sessionSettings2.Set(config.SenderCompID, sessionId2.SenderCompID) - sessionSettings2.Set(config.TargetCompID, sessionId2.TargetCompID) - suite.sessionId2 = sessionId2 + sessionSettings2.Set(config.BeginString, sessionID2.BeginString) + sessionSettings2.Set(config.SenderCompID, sessionID2.SenderCompID) + sessionSettings2.Set(config.TargetCompID, sessionID2.TargetCompID) + suite.sessionID2 = sessionID2 settings.AddSession(sessionSettings2) // acceptor template - sessionId3 := SessionID{BeginString: BeginStringFIX43, SenderCompID: "*", SenderSubID: "*", SenderLocationID: "*", + sessionID3 := SessionID{BeginString: BeginStringFIX43, SenderCompID: "*", SenderSubID: "*", SenderLocationID: "*", TargetCompID: "target3", TargetSubID: "*", TargetLocationID: "*"} sessionSettings3 := NewSessionSettings() - sessionSettings3.Set(config.BeginString, sessionId3.BeginString) - sessionSettings3.Set(config.SenderCompID, sessionId3.SenderCompID) - sessionSettings3.Set(config.SenderSubID, sessionId3.SenderSubID) - sessionSettings3.Set(config.SenderLocationID, sessionId3.SenderLocationID) - sessionSettings3.Set(config.TargetCompID, sessionId3.TargetCompID) - sessionSettings3.Set(config.TargetSubID, sessionId3.TargetSubID) - sessionSettings3.Set(config.TargetLocationID, sessionId3.TargetLocationID) + sessionSettings3.Set(config.BeginString, sessionID3.BeginString) + sessionSettings3.Set(config.SenderCompID, sessionID3.SenderCompID) + sessionSettings3.Set(config.SenderSubID, sessionID3.SenderSubID) + sessionSettings3.Set(config.SenderLocationID, sessionID3.SenderLocationID) + sessionSettings3.Set(config.TargetCompID, sessionID3.TargetCompID) + sessionSettings3.Set(config.TargetSubID, sessionID3.TargetSubID) + sessionSettings3.Set(config.TargetLocationID, sessionID3.TargetLocationID) sessionSettings3.Set(config.ResetOnLogout, "Y") sessionSettings3.Set(config.AcceptorTemplate, "Y") - suite.sessionId3 = sessionId3 + suite.sessionID3 = sessionID3 settings.AddSession(sessionSettings3) app := &noopApp{} a, err := NewAcceptor(app, memoryStoreFactory{}, settings, NewScreenLogFactory()) if err != nil { - suite.Fail("Failed to create acceptor: %v", err) + suite.Fail("Failed to create acceptor", err) } suite.acceptor = a templateMappings := make([]*TemplateMapping, 0) templateMappings = append(templateMappings, &TemplateMapping{ - Pattern: suite.sessionId3, - TemplateID: suite.sessionId3, + Pattern: suite.sessionID3, + TemplateID: suite.sessionID3, }) suite.dynamicSessionProvider = NewDynamicAcceptorSessionProvider(suite.acceptor.settings, suite.acceptor.storeFactory, suite.acceptor.logFactory, suite.acceptor.app, templateMappings) suite.acceptor.SetSessionProvider(suite.dynamicSessionProvider) @@ -225,7 +225,7 @@ func (suite *AcceptorTemplateTestSuite) BeforeTest(suiteName, testName string) { suite.testDynamicSessionID = SessionID{BeginString: BeginStringFIX43, SenderCompID: "target3", TargetCompID: "dynamicSender"} suite.logonSessionID = SessionID{BeginString: BeginStringFIX43, SenderCompID: "dynamicSender", TargetCompID: "target3"} if err := suite.acceptor.Start(); err != nil { - suite.FailNow("acceptor start failed: %v", err) + suite.FailNow("acceptor start failed", err) } suite.verifySessionCount(2) @@ -245,12 +245,12 @@ func (suite *AcceptorTemplateTestSuite) logonAndDisconnectAfterCheck(sessionID S remoteAddr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5002}, } mockConn1.onWriteback = func(b []byte) { - reponseMsg := NewMessage() - err := ParseMessage(reponseMsg, bytes.NewBuffer(b)) + responseMsg := NewMessage() + err := ParseMessage(responseMsg, bytes.NewBuffer(b)) suite.Require().NoError(err, "parse responding message failed") - msgType, err := reponseMsg.Header.GetString(tagMsgType) + msgType, err := responseMsg.Header.GetString(tagMsgType) suite.Require().NoError(err, "unexpected mssage") - suite.Require().Equalf("A", msgType, "expected logon message in reponse %s", reponseMsg.String()) + suite.Require().Equalf("A", msgType, "expected logon message in response %s", responseMsg.String()) respondedLogonMessageReceived = true if checkFuncAfterLogon != nil { checkFuncAfterLogon() @@ -265,10 +265,11 @@ func (suite *AcceptorTemplateTestSuite) logonAndDisconnectAfterCheck(sessionID S func (suite *AcceptorTemplateTestSuite) verifySessionCount(expectedSessionCount int) { suite.Require().Equalf(expectedSessionCount, len(suite.acceptor.sessions), "expected %v sessions but found %v", expectedSessionCount, len(suite.acceptor.sessions)) - suite.Require().Equalf(expectedSessionCount, len(sessions), "expected %v sessions but found %v in registry", expectedSessionCount, len(suite.acceptor.sessions)) + suite.Require().Equalf(expectedSessionCount, len(sessions), "expected %v sessions but found %v in registry", expectedSessionCount, sessions) } -func (suite *AcceptorTemplateTestSuite) TestCreateDynamicSessionBySessionProvider() { +func (suite *AcceptorTemplateTestSuite) testCreateDynamicSessionBySessionProvider() { + suite.setupAcceptor() logonSessionID := suite.logonSessionID suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { suite.verifySessionCount(3) @@ -287,7 +288,8 @@ func (suite *AcceptorTemplateTestSuite) TestCreateDynamicSessionBySessionProvide suite.acceptor.Stop() } -func (suite *AcceptorTemplateTestSuite) TestSessionCreatedBySessionProviderShouldBeKept() { +func (suite *AcceptorTemplateTestSuite) testSessionCreatedBySessionProviderShouldBeKept() { + suite.setupAcceptor() logonSessionID := suite.logonSessionID suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { suite.verifySessionCount(3) @@ -297,7 +299,8 @@ func (suite *AcceptorTemplateTestSuite) TestSessionCreatedBySessionProviderShoul suite.acceptor.Stop() } -func (suite *AcceptorTemplateTestSuite) TestNoNewSessionCreatedWhenSameSessionIDLogons() { +func (suite *AcceptorTemplateTestSuite) testNoNewSessionCreatedWhenSameSessionIDLogons() { + suite.setupAcceptor() suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { suite.verifySessionCount(3) }, true) @@ -310,13 +313,21 @@ func (suite *AcceptorTemplateTestSuite) TestNoNewSessionCreatedWhenSameSessionID suite.acceptor.Stop() } -func (suite *AcceptorTemplateTestSuite) TestSessionNotFoundBySessionProvider() { +func (suite *AcceptorTemplateTestSuite) testSessionNotFoundBySessionProvider() { + suite.setupAcceptor() sessionID := SessionID{BeginString: BeginStringFIX43, SenderCompID: "unknownSender", TargetCompID: "unknownTarget"} suite.logonAndDisconnectAfterCheck(sessionID, func() {}, false) suite.verifySessionCount(2) suite.acceptor.Stop() } +func (suite *AcceptorTemplateTestSuite) TestSequentially() { + suite.Run("testCreateDynamicSessionBySessionProvider", suite.testCreateDynamicSessionBySessionProvider) + suite.Run("testSessionCreatedBySessionProviderShouldBeKept", suite.testSessionCreatedBySessionProviderShouldBeKept) + suite.Run("testNoNewSessionCreatedWhenSameSessionIDLogons", suite.testNoNewSessionCreatedWhenSameSessionIDLogons) + suite.Run("testSessionNotFoundBySessionProvider", suite.testSessionNotFoundBySessionProvider) +} + func TestAcceptorTemplateTestSuite(t *testing.T) { suite.Run(t, new(AcceptorTemplateTestSuite)) } @@ -327,9 +338,9 @@ type DynamicSessionTestSuite struct { func (suite *DynamicSessionTestSuite) TestDynamicSession() { settings := NewSettings() - settings.globalSettings.Set(config.SocketAcceptPort, "5001") + settings.globalSettings.Set(config.SocketAcceptPort, "5003") settings.globalSettings.Set(config.DynamicSessions, "Y") - sessionId1 := SessionID{BeginString: BeginStringFIX42, SenderCompID: "sender1", TargetCompID: "target1"} + sessionId1 := SessionID{BeginString: BeginStringFIX42, SenderCompID: "dynamicSender1", TargetCompID: "dynamicTarget1"} sessionSettings1 := NewSessionSettings() sessionSettings1.Set(config.BeginString, sessionId1.BeginString) sessionSettings1.Set(config.SenderCompID, sessionId1.SenderCompID) From db67f32dbea86b8fc9549340b1257d695655d8c2 Mon Sep 17 00:00:00 2001 From: yanghao Date: Thu, 15 Aug 2024 16:22:03 +0800 Subject: [PATCH 3/5] change the acceptorTemplate logic --- acceptor.go | 82 ++++++---- acceptor_session_provider.go | 101 +++++++----- acceptor_session_provider_test.go | 199 ++++++++++++------------ acceptor_test.go | 250 +++++++++++++++--------------- store/file/filestore.go | 22 ++- store/mongo/mongostore.go | 22 ++- store/sql/sqlstore.go | 22 ++- 7 files changed, 399 insertions(+), 299 deletions(-) diff --git a/acceptor.go b/acceptor.go index c48ff5f46..d3ce23bc4 100644 --- a/acceptor.go +++ b/acceptor.go @@ -32,24 +32,25 @@ import ( // Acceptor accepts connections from FIX clients and manages the associated sessions. type Acceptor struct { - app Application - settings *Settings - logFactory LogFactory - storeFactory MessageStoreFactory - globalLog Log - sessions map[SessionID]*session - sessionsLock sync.RWMutex - sessionGroup sync.WaitGroup - listenerShutdown sync.WaitGroup - dynamicSessions bool - dynamicQualifier bool - dynamicQualifierCount int - dynamicSessionChan chan *session - sessionAddr sync.Map - sessionHostPort map[SessionID]int - listeners map[string]net.Listener - connectionValidator ConnectionValidator - sessionProvider AcceptorSessionProvider + app Application + settings *Settings + logFactory LogFactory + storeFactory MessageStoreFactory + globalLog Log + sessions map[SessionID]*session + sessionsLock sync.RWMutex + sessionGroup sync.WaitGroup + listenerShutdown sync.WaitGroup + dynamicSessions bool + dynamicQualifier bool + dynamicQualifierCount int + dynamicSessionChan chan *session + sessionAddr sync.Map + sessionHostPort map[SessionID]int + listeners map[string]net.Listener + connectionValidator ConnectionValidator + templateIDProvider TemplateIDProvider + dynamicAcceptorSessionProvider *dynamicAcceptorSessionProvider sessionFactory } @@ -62,6 +63,11 @@ type ConnectionValidator interface { // Start accepting connections. func (a *Acceptor) Start() (err error) { + + if err = a.configureDyanmicSessionProvider(); err != nil { + return + } + socketAcceptHost := "" if a.settings.GlobalSettings().HasSetting(config.SocketAcceptHost) { if socketAcceptHost, err = a.settings.GlobalSettings().Setting(config.SocketAcceptHost); err != nil { @@ -123,6 +129,25 @@ func (a *Acceptor) Start() (err error) { return } +func (a *Acceptor) configureDyanmicSessionProvider() error { + if a.templateIDProvider == nil { + defaultTemplateIDProvider, err := NewDefaultTemplateIDProvider(a.settings) + if err != nil { + return err + } + if len(defaultTemplateIDProvider.templateMappings) == 0 { + // no templateMappings + return nil + } + a.templateIDProvider = defaultTemplateIDProvider + } + if setter, ok := a.storeFactory.(TemplateIDProviderSetter); ok { + setter.SetTemplateIDProvider(a.templateIDProvider) + } + a.dynamicAcceptorSessionProvider = NewDynamicAcceptorSessionProvider(a.settings, a.storeFactory, a.logFactory, a.app, a.templateIDProvider) + return nil +} + // Stop logs out existing sessions, close their connections, and stop accepting new connections. func (a *Acceptor) Stop() { defer func() { @@ -326,21 +351,14 @@ func (a *Acceptor) handleConnection(netConn net.Conn) { } session, ok := a.sessions[sessID] if !ok { - var dynamicSessionCreated bool - if a.sessionProvider != nil { - session, err = a.sessionProvider.GetSession(sessID) + if a.dynamicAcceptorSessionProvider != nil { + session, err = a.dynamicAcceptorSessionProvider.GetSession(sessID) if err != nil { - if err == errUnknownSession && a.dynamicSessions { - goto CREATE_SHORT_LIVED_DYNAMIC_SESSION - } a.globalLog.OnEventf("Failed to get session %v from provider: %v", sessID, err) return } a.addMngdDynamicSession(sessID, session) - dynamicSessionCreated = true - } - CREATE_SHORT_LIVED_DYNAMIC_SESSION: - if !dynamicSessionCreated { + } else { if !a.dynamicSessions { a.globalLog.OnEventf("Session %v not found for incoming message: %s", sessID, msgBytes) return @@ -354,7 +372,6 @@ func (a *Acceptor) handleConnection(netConn net.Conn) { session = dynamicSession defer session.stop() } - } a.sessionAddr.Store(sessID, netConn.RemoteAddr()) @@ -474,7 +491,8 @@ func (a *Acceptor) SetConnectionValidator(validator ConnectionValidator) { a.connectionValidator = validator } -// SetSessionProvider sets an optional session provider. -func (a *Acceptor) SetSessionProvider(sessionProvider AcceptorSessionProvider) { - a.sessionProvider = sessionProvider +// SetTemplateIDProvider sets an optional templateID provider. +// If not set and AcceptorTemplate=Y is configured for a session, the `DefaultTemplateIDProvider` will be used. +func (a *Acceptor) SetTemplateIDProvider(templateIDProvider TemplateIDProvider) { + a.templateIDProvider = templateIDProvider } diff --git a/acceptor_session_provider.go b/acceptor_session_provider.go index 2547a0535..f9349e096 100644 --- a/acceptor_session_provider.go +++ b/acceptor_session_provider.go @@ -6,63 +6,94 @@ const ( WildcardPattern string = "*" ) -type AcceptorSessionProvider interface { - GetSession(SessionID) (*session, error) +// TemplateIDProvider is an interface for obtaining templateIDs for inbound sessions. +// +// The SessionSettings for the template SessionID must be configured in the Acceptor. +// The sessionSettings of an inbound session inherits from the template SessionID. +// If no matching template is found, return nil, and no session will be created for the inbound logon request. +type TemplateIDProvider interface { + GetTemplateID(inbound SessionID) (templateID *SessionID) } -type StaticAcceptorSessionProvider struct { - sessions map[SessionID]*session +type TemplateIDProviderSetter interface { + SetTemplateIDProvider(TemplateIDProvider) } -func (p *StaticAcceptorSessionProvider) GetSession(sessionID SessionID) (*session, error) { - s, ok := p.sessions[sessionID] - if !ok { - return nil, errUnknownSession +type DefaultTemplateIDProvider struct { + templateMappings []*TemplateMapping +} + +func NewDefaultTemplateIDProvider(settings *Settings) (*DefaultTemplateIDProvider, error) { + templateMappings := make([]*TemplateMapping, 0) + for sid, ss := range settings.SessionSettings() { + var ( + acceptorTemplate bool + err error + ) + if ss.HasSetting(config.AcceptorTemplate) { + acceptorTemplate, err = ss.BoolSetting(config.AcceptorTemplate) + if err != nil { + return nil, err + } + } + if acceptorTemplate { + templateMappings = append(templateMappings, &TemplateMapping{ + Pattern: sid, + TemplateID: sid, + }) + } } - return s, nil + return &DefaultTemplateIDProvider{templateMappings: templateMappings}, nil } -// DynamicAcceptorSessionProvider dynamically defines sessions for an acceptor. This can be useful for -// applications like simulators that want to accept any connection and -// dynamically create an associated session. -// -// For more complex situations, you can use this class as a starting -// point for implementing your own AcceptorSessionProvider. -type DynamicAcceptorSessionProvider struct { +func (p *DefaultTemplateIDProvider) GetTemplateID(inbound SessionID) (templateID *SessionID) { + return p.lookupTemplateID(inbound) +} + +func (p *DefaultTemplateIDProvider) lookupTemplateID(sessionID SessionID) *SessionID { + for _, mapping := range p.templateMappings { + if isTemplateMatching(mapping.Pattern, sessionID) { + return &mapping.TemplateID + } + } + return nil +} + +type dynamicAcceptorSessionProvider struct { settings *Settings messageStoreFactory MessageStoreFactory logFactory LogFactory sessionFactory *sessionFactory application Application - templateMappings []*TemplateMapping + + templateIDProvider TemplateIDProvider } func NewDynamicAcceptorSessionProvider(settings *Settings, messageStoreFactory MessageStoreFactory, logFactory LogFactory, - application Application, templateMappings []*TemplateMapping, -) *DynamicAcceptorSessionProvider { - return &DynamicAcceptorSessionProvider{ + application Application, templateIDProvider TemplateIDProvider, +) *dynamicAcceptorSessionProvider { + return &dynamicAcceptorSessionProvider{ settings: settings, messageStoreFactory: messageStoreFactory, logFactory: logFactory, sessionFactory: &sessionFactory{}, application: application, - templateMappings: templateMappings, + templateIDProvider: templateIDProvider, } } -func (p *DynamicAcceptorSessionProvider) FindTemplateID(sessionID SessionID) *SessionID { - return p.lookupTemplateID(sessionID) -} - -func (p *DynamicAcceptorSessionProvider) GetSession(sessionID SessionID) (*session, error) { +func (p *dynamicAcceptorSessionProvider) GetSession(sessionID SessionID) (*session, error) { s, ok := lookupSession(sessionID) - if !ok { - templateID := p.lookupTemplateID(sessionID) + if !ok && p.templateIDProvider != nil { + templateID := p.templateIDProvider.GetTemplateID(sessionID) if templateID == nil { return nil, errUnknownSession } dynamicSessionSettings := p.settings.globalSettings.clone() - templateSettings := p.settings.sessionSettings[*templateID] + templateSettings, ok := p.settings.sessionSettings[*templateID] + if !ok { + return nil, errUnknownSession + } dynamicSessionSettings.overlay(templateSettings) dynamicSessionSettings.Set(config.BeginString, sessionID.BeginString) dynamicSessionSettings.Set(config.SenderCompID, sessionID.SenderCompID) @@ -82,16 +113,10 @@ func (p *DynamicAcceptorSessionProvider) GetSession(sessionID SessionID) (*sessi return nil, err } } - return s, nil -} - -func (provider *DynamicAcceptorSessionProvider) lookupTemplateID(sessionID SessionID) *SessionID { - for _, mapping := range provider.templateMappings { - if isTemplateMatching(mapping.Pattern, sessionID) { - return &mapping.TemplateID - } + if s == nil { + return nil, errUnknownSession } - return nil + return s, nil } func isTemplateMatching(pattern SessionID, sessionID SessionID) bool { diff --git a/acceptor_session_provider_test.go b/acceptor_session_provider_test.go index 2ae081e43..6388109c5 100644 --- a/acceptor_session_provider_test.go +++ b/acceptor_session_provider_test.go @@ -1,24 +1,104 @@ package quickfix import ( - "reflect" + "strings" "testing" "github.com/quickfixgo/quickfix/config" "github.com/stretchr/testify/suite" ) +var _ Application = &noopApp{} + +type noopApp struct { +} + +func (n *noopApp) FromAdmin(_ *Message, _ SessionID) MessageRejectError { + return nil +} + +func (n *noopApp) FromApp(_ *Message, _ SessionID) MessageRejectError { + return nil +} + +func (n *noopApp) OnCreate(_ SessionID) { +} + +func (n *noopApp) OnLogon(_ SessionID) { +} + +func (n *noopApp) OnLogout(_ SessionID) { +} + +func (n *noopApp) ToAdmin(_ *Message, _ SessionID) { +} + +func (n *noopApp) ToApp(_ *Message, _ SessionID) error { + return nil +} + type DynamicAcceptorSessionProviderTestSuite struct { suite.Suite - provider *DynamicAcceptorSessionProvider + dynamicAcceptorSessionProvider *dynamicAcceptorSessionProvider settings *Settings messageStoreFactory MessageStoreFactory logFactory LogFactory app Application sessionFactory *sessionFactory - TemplateMapping []*TemplateMapping +} + +func (suite *DynamicAcceptorSessionProviderTestSuite) TestNewDefaultTemplateIDProvider() { + cfg := ` +[default] +ConnectionType=acceptor +SocketAcceptPort=9878 +BeginString=FIX.4.2 +TimeZone=America/New_York +StartTime=00:00:01 +EndTime=23:59:59 +HeartBtInt=30 + +[session] +AcceptorTemplate=Y +SenderCompID=test1 +TargetCompID=* +ResetOnLogon=Y + +[session] +AcceptorTemplate=Y +SenderCompID=test2 +TargetCompID=* +ResetOnLogon=Y + ` + stringReader := strings.NewReader(cfg) + settings, err := ParseSettings(stringReader) + if err != nil { + suite.FailNow("parse setting failed", err) + } + + provider, err := NewDefaultTemplateIDProvider(settings) + if err != nil { + suite.FailNow("create TemplateIDProvider failed", err) + } + + s1 := SessionID{BeginString: "FIX.4.2", SenderCompID: "test1", TargetCompID: "cli"} + templateID1 := provider.GetTemplateID(s1) + suite.Require().NotNil(templateID1, "expected template matched") + suite.Require().Equal( + *templateID1, + SessionID{BeginString: "FIX.4.2", SenderCompID: "test1", TargetCompID: "*"}, + "unexpected templateID", + ) + + s2 := SessionID{BeginString: "FIX.4.3", SenderCompID: "test1", TargetCompID: "cli"} + templateID2 := provider.GetTemplateID(s2) + suite.Require().Nilf(templateID2, "expected template not matched for %v", s2.String()) + + s3 := SessionID{BeginString: "FIX.4.2", SenderCompID: "X", TargetCompID: "cli"} + templateID3 := provider.GetTemplateID(s3) + suite.Require().Nilf(templateID3, "expected template not matched for %v", s3.String()) } func (suite *DynamicAcceptorSessionProviderTestSuite) SetupTest() { @@ -27,33 +107,37 @@ func (suite *DynamicAcceptorSessionProviderTestSuite) SetupTest() { suite.logFactory = nullLogFactory{} suite.app = &noopApp{} suite.sessionFactory = &sessionFactory{} - suite.TemplateMapping = make([]*TemplateMapping, 0) + templateMappings := make([]*TemplateMapping, 0) - templateId1 := SessionID{BeginString: "FIX.4.2", SenderCompID: "ANY", TargetCompID: "ANY"} - suite.TemplateMapping = append( - suite.TemplateMapping, - &TemplateMapping{Pattern: SessionID{BeginString: WildcardPattern, SenderCompID: "S1", TargetCompID: WildcardPattern}, TemplateID: templateId1}, + templateID1 := SessionID{BeginString: "FIX.4.2", SenderCompID: "ANY", TargetCompID: "ANY"} + templateMappings = append( + templateMappings, + &TemplateMapping{Pattern: SessionID{BeginString: WildcardPattern, SenderCompID: "S1", TargetCompID: WildcardPattern}, TemplateID: templateID1}, ) - suite.setUpSettings(templateId1, "ResetOnLogout", "Y") + suite.setUpSettings(templateID1, "ResetOnLogout", "Y") templateId2 := SessionID{BeginString: "FIX.4.4", SenderCompID: "S1", TargetCompID: "ANY"} - suite.TemplateMapping = append( - suite.TemplateMapping, + templateMappings = append( + templateMappings, &TemplateMapping{Pattern: SessionID{BeginString: "FIX.4.4", SenderCompID: WildcardPattern, TargetCompID: WildcardPattern}, TemplateID: templateId2}, ) suite.setUpSettings(templateId2, "RefreshOnLogon", "Y") templateId3 := SessionID{BeginString: "FIX.4.4", SenderCompID: "ANY", TargetCompID: "ANY"} - suite.TemplateMapping = append( - suite.TemplateMapping, + templateMappings = append( + templateMappings, &TemplateMapping{Pattern: SessionID{BeginString: "FIX.4.2", SenderCompID: WildcardPattern, SenderSubID: WildcardPattern, SenderLocationID: WildcardPattern, TargetCompID: WildcardPattern, TargetSubID: WildcardPattern, TargetLocationID: WildcardPattern, Qualifier: WildcardPattern, }, TemplateID: templateId3}, ) suite.setUpSettings(templateId3, "ResetOnDisconnect", "Y") - suite.provider = NewDynamicAcceptorSessionProvider(suite.settings, suite.messageStoreFactory, - suite.logFactory, suite.app, suite.TemplateMapping) + templateIDProvider := &DefaultTemplateIDProvider{ + templateMappings: templateMappings, + } + + suite.dynamicAcceptorSessionProvider = NewDynamicAcceptorSessionProvider(suite.settings, suite.messageStoreFactory, + suite.logFactory, suite.app, templateIDProvider) } func (suite *DynamicAcceptorSessionProviderTestSuite) setUpSettings(TemplateID SessionID, key, value string) { @@ -132,7 +216,7 @@ func (suite *DynamicAcceptorSessionProviderTestSuite) TestSessionCreation() { } for _, test := range tests { - session, err := suite.provider.GetSession(test.input) + session, err := suite.dynamicAcceptorSessionProvider.GetSession(test.input) suite.NoError(err) suite.NotNil(session) sessionID := session.sessionID @@ -158,7 +242,7 @@ func (suite *DynamicAcceptorSessionProviderTestSuite) TestTemplateNotFound() { } for _, test := range tests { - _, err := suite.provider.GetSession(test.input) + _, err := suite.dynamicAcceptorSessionProvider.GetSession(test.input) suite.Error(err, test.name+": expected error for template not found") } } @@ -166,84 +250,3 @@ func (suite *DynamicAcceptorSessionProviderTestSuite) TestTemplateNotFound() { func TestDynamicAcceptorSessionProviderTestSuite(t *testing.T) { suite.Run(t, new(DynamicAcceptorSessionProviderTestSuite)) } - -func TestStaticSessionProvider_GetSession(t *testing.T) { - sessions := make(map[SessionID]*session) - sessionID1 := SessionID{BeginString: "FIX.4.2", SenderCompID: "SENDER", TargetCompID: "TARGET"} - session1 := &session{sessionID: sessionID1} - sessions[sessionID1] = session1 - - type args struct { - sessionID SessionID - } - tests := []struct { - name string - args args - want *session - wantErr bool - }{ - { - name: "session found", - args: args{ - sessionID: sessionID1, - }, - want: session1, - wantErr: false, - }, - { - name: "session not found", - args: args{ - sessionID: SessionID{ - BeginString: "FIX.4.2", SenderCompID: "X", TargetCompID: "Y", - }, - }, - want: nil, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - p := &StaticAcceptorSessionProvider{ - sessions: sessions, - } - got, err := p.GetSession(tt.args.sessionID) - if (err != nil) != tt.wantErr { - t.Errorf("StaticSessionProvider.GetSession() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("StaticSessionProvider.GetSession() = %v, want %v", got, tt.want) - } - UnregisterSession(tt.args.sessionID) - }) - } -} - -var _ Application = &noopApp{} - -type noopApp struct { -} - -func (n *noopApp) FromAdmin(message *Message, sessionID SessionID) MessageRejectError { - return nil -} - -func (n *noopApp) FromApp(message *Message, sessionID SessionID) MessageRejectError { - return nil -} - -func (n *noopApp) OnCreate(sessionID SessionID) { -} - -func (n *noopApp) OnLogon(sessionID SessionID) { -} - -func (n *noopApp) OnLogout(sessionID SessionID) { -} - -func (n *noopApp) ToAdmin(message *Message, sessionID SessionID) { -} - -func (n *noopApp) ToApp(message *Message, sessionID SessionID) error { - return nil -} diff --git a/acceptor_test.go b/acceptor_test.go index 65c44a940..cec5fb389 100644 --- a/acceptor_test.go +++ b/acceptor_test.go @@ -19,12 +19,12 @@ import ( "bytes" "io" "net" + "strings" "testing" "time" - "github.com/quickfixgo/quickfix/config" - proxyproto "github.com/pires/go-proxyproto" + "github.com/quickfixgo/quickfix/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" ) @@ -165,76 +165,74 @@ type AcceptorTemplateTestSuite struct { sessionID2 SessionID sessionID3 SessionID - testDynamicSessionID SessionID - logonSessionID SessionID - seqNum int - - dynamicSessionProvider AcceptorSessionProvider + cliSessionID SessionID + logonSessionID SessionID + seqNum int } -func (suite *AcceptorTemplateTestSuite) setupAcceptor() { - settings := NewSettings() - settings.globalSettings.Set(config.SocketAcceptPort, "5001") - sessionID1 := SessionID{BeginString: BeginStringFIX42, SenderCompID: "sender1", TargetCompID: "target1"} - sessionSettings1 := NewSessionSettings() - sessionSettings1.Set(config.BeginString, sessionID1.BeginString) - sessionSettings1.Set(config.SenderCompID, sessionID1.SenderCompID) - sessionSettings1.Set(config.TargetCompID, sessionID1.TargetCompID) - suite.sessionID1 = sessionID1 - settings.AddSession(sessionSettings1) - - sessionID2 := SessionID{BeginString: BeginStringFIX43, SenderCompID: "sender2", TargetCompID: "target2"} - sessionSettings2 := NewSessionSettings() - sessionSettings2.Set(config.BeginString, sessionID2.BeginString) - sessionSettings2.Set(config.SenderCompID, sessionID2.SenderCompID) - sessionSettings2.Set(config.TargetCompID, sessionID2.TargetCompID) - suite.sessionID2 = sessionID2 - settings.AddSession(sessionSettings2) - - // acceptor template - sessionID3 := SessionID{BeginString: BeginStringFIX43, SenderCompID: "*", SenderSubID: "*", SenderLocationID: "*", +func (suite *AcceptorTemplateTestSuite) BeforeTest(_, _ string) { + cfg := ` +[default] +ConnectionType=acceptor +SocketAcceptPort=5001 +TimeZone=America/New_York +StartTime=00:00:01 +EndTime=23:59:59 +HeartBtInt=30 + +[session] +BeginString=FIX.4.2 +SenderCompID=sender1 +TargetCompID=target1 + +[session] +BeginString=FIX.4.3 +SenderCompID=sender2 +TargetCompID=target2 +ResetOnLogon=Y + +[session] +AcceptorTemplate=Y +BeginString=FIX.4.3 +SenderCompID=* +SenderSubID=* +SenderLocationID=* +TargetCompID=target3 +TargetSubID=* +TargetLocationID=* +ResetOnLogout=Y +` + stringReader := strings.NewReader(cfg) + settings, err := ParseSettings(stringReader) + if err != nil { + suite.FailNow("parse setting failed", err) + } + suite.sessionID1 = SessionID{BeginString: BeginStringFIX42, SenderCompID: "sender1", TargetCompID: "target1"} + suite.sessionID2 = SessionID{BeginString: BeginStringFIX43, SenderCompID: "sender2", TargetCompID: "target2"} + suite.sessionID3 = SessionID{BeginString: BeginStringFIX43, SenderCompID: "*", SenderSubID: "*", SenderLocationID: "*", TargetCompID: "target3", TargetSubID: "*", TargetLocationID: "*"} - sessionSettings3 := NewSessionSettings() - sessionSettings3.Set(config.BeginString, sessionID3.BeginString) - sessionSettings3.Set(config.SenderCompID, sessionID3.SenderCompID) - sessionSettings3.Set(config.SenderSubID, sessionID3.SenderSubID) - sessionSettings3.Set(config.SenderLocationID, sessionID3.SenderLocationID) - sessionSettings3.Set(config.TargetCompID, sessionID3.TargetCompID) - sessionSettings3.Set(config.TargetSubID, sessionID3.TargetSubID) - sessionSettings3.Set(config.TargetLocationID, sessionID3.TargetLocationID) - sessionSettings3.Set(config.ResetOnLogout, "Y") - sessionSettings3.Set(config.AcceptorTemplate, "Y") - suite.sessionID3 = sessionID3 - settings.AddSession(sessionSettings3) app := &noopApp{} - a, err := NewAcceptor(app, memoryStoreFactory{}, settings, NewScreenLogFactory()) + a, err := NewAcceptor(app, memoryStoreFactory{}, settings, NewNullLogFactory()) if err != nil { suite.Fail("Failed to create acceptor", err) } suite.acceptor = a - templateMappings := make([]*TemplateMapping, 0) - templateMappings = append(templateMappings, &TemplateMapping{ - Pattern: suite.sessionID3, - TemplateID: suite.sessionID3, - }) - suite.dynamicSessionProvider = NewDynamicAcceptorSessionProvider(suite.acceptor.settings, suite.acceptor.storeFactory, suite.acceptor.logFactory, suite.acceptor.app, templateMappings) - suite.acceptor.SetSessionProvider(suite.dynamicSessionProvider) - - suite.testDynamicSessionID = SessionID{BeginString: BeginStringFIX43, SenderCompID: "target3", TargetCompID: "dynamicSender"} + suite.cliSessionID = SessionID{BeginString: BeginStringFIX43, SenderCompID: "target3", TargetCompID: "dynamicSender"} suite.logonSessionID = SessionID{BeginString: BeginStringFIX43, SenderCompID: "dynamicSender", TargetCompID: "target3"} - if err := suite.acceptor.Start(); err != nil { - suite.FailNow("acceptor start failed", err) - } + suite.seqNum = 1 +} - suite.verifySessionCount(2) +func (suite *AcceptorTemplateTestSuite) TearDownTest() { + suite.acceptor.Stop() + suite.acceptor = nil suite.seqNum = 1 } func (suite *AcceptorTemplateTestSuite) logonAndDisconnectAfterCheck(sessionID SessionID, checkFuncAfterLogon func(), - mustHaveResponse bool) { + wantLogonSuccess bool) { inboundMessages := []*Message{mockLogonMessage(sessionID, suite.seqNum)} suite.seqNum++ var respondedLogonMessageReceived bool @@ -250,7 +248,9 @@ func (suite *AcceptorTemplateTestSuite) logonAndDisconnectAfterCheck(sessionID S suite.Require().NoError(err, "parse responding message failed") msgType, err := responseMsg.Header.GetString(tagMsgType) suite.Require().NoError(err, "unexpected mssage") - suite.Require().Equalf("A", msgType, "expected logon message in response %s", responseMsg.String()) + if wantLogonSuccess && msgType != "A" { + return + } respondedLogonMessageReceived = true if checkFuncAfterLogon != nil { checkFuncAfterLogon() @@ -258,7 +258,7 @@ func (suite *AcceptorTemplateTestSuite) logonAndDisconnectAfterCheck(sessionID S close(mockConn1.closeChan) } suite.acceptor.handleConnection(mockConn1) - if mustHaveResponse { + if wantLogonSuccess { suite.Require().Equal(true, respondedLogonMessageReceived, "expected responding logon message") } } @@ -268,10 +268,14 @@ func (suite *AcceptorTemplateTestSuite) verifySessionCount(expectedSessionCount suite.Require().Equalf(expectedSessionCount, len(sessions), "expected %v sessions but found %v in registry", expectedSessionCount, sessions) } -func (suite *AcceptorTemplateTestSuite) testCreateDynamicSessionBySessionProvider() { - suite.setupAcceptor() +func (suite *AcceptorTemplateTestSuite) TestCreateDynamicSessionBySessionProvider() { + if err := suite.acceptor.Start(); err != nil { + suite.FailNow("acceptor start failed", err) + } + suite.verifySessionCount(2) + logonSessionID := suite.logonSessionID - suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { + suite.logonAndDisconnectAfterCheck(suite.cliSessionID, func() { suite.verifySessionCount(3) createdSession, ok := suite.acceptor.sessions[logonSessionID] @@ -285,100 +289,102 @@ func (suite *AcceptorTemplateTestSuite) testCreateDynamicSessionBySessionProvide } suite.Require().Equal("127.0.0.1:5002", remoteAddr.String(), "expect remoteAddr for dynamic session to be 127.0.0.1:5002 but got %v", remoteAddr.String()) }, true) - suite.acceptor.Stop() } -func (suite *AcceptorTemplateTestSuite) testSessionCreatedBySessionProviderShouldBeKept() { - suite.setupAcceptor() +func (suite *AcceptorTemplateTestSuite) TestSessionCreatedBySessionProviderShouldBeKept() { + if err := suite.acceptor.Start(); err != nil { + suite.FailNow("acceptor start failed", err) + } + suite.verifySessionCount(2) + logonSessionID := suite.logonSessionID - suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { + suite.logonAndDisconnectAfterCheck(suite.cliSessionID, func() { suite.verifySessionCount(3) }, true) err := SendToTarget(createFIX43NewOrderSingle(), logonSessionID) suite.NoError(err, "expected message can still be sent after session disconnected") - suite.acceptor.Stop() } -func (suite *AcceptorTemplateTestSuite) testNoNewSessionCreatedWhenSameSessionIDLogons() { - suite.setupAcceptor() - suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { +func (suite *AcceptorTemplateTestSuite) TestNoNewSessionCreatedWhenSameSessionIDLogons() { + if err := suite.acceptor.Start(); err != nil { + suite.FailNow("acceptor start failed", err) + } + suite.verifySessionCount(2) + + suite.logonAndDisconnectAfterCheck(suite.cliSessionID, func() { suite.verifySessionCount(3) }, true) - suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { + suite.logonAndDisconnectAfterCheck(suite.cliSessionID, func() { suite.verifySessionCount(3) }, true) - suite.logonAndDisconnectAfterCheck(suite.testDynamicSessionID, func() { + suite.logonAndDisconnectAfterCheck(suite.cliSessionID, func() { suite.verifySessionCount(3) }, true) - suite.acceptor.Stop() } -func (suite *AcceptorTemplateTestSuite) testSessionNotFoundBySessionProvider() { - suite.setupAcceptor() +func (suite *AcceptorTemplateTestSuite) TestSessionNotFoundBySessionProvider() { + if err := suite.acceptor.Start(); err != nil { + suite.FailNow("acceptor start failed", err) + } + suite.verifySessionCount(2) + sessionID := SessionID{BeginString: BeginStringFIX43, SenderCompID: "unknownSender", TargetCompID: "unknownTarget"} suite.logonAndDisconnectAfterCheck(sessionID, func() {}, false) suite.verifySessionCount(2) - suite.acceptor.Stop() } -func (suite *AcceptorTemplateTestSuite) TestSequentially() { - suite.Run("testCreateDynamicSessionBySessionProvider", suite.testCreateDynamicSessionBySessionProvider) - suite.Run("testSessionCreatedBySessionProviderShouldBeKept", suite.testSessionCreatedBySessionProviderShouldBeKept) - suite.Run("testNoNewSessionCreatedWhenSameSessionIDLogons", suite.testNoNewSessionCreatedWhenSameSessionIDLogons) - suite.Run("testSessionNotFoundBySessionProvider", suite.testSessionNotFoundBySessionProvider) +type mockCustomTemplateIDProvider struct { + staticTemplateID SessionID } -func TestAcceptorTemplateTestSuite(t *testing.T) { - suite.Run(t, new(AcceptorTemplateTestSuite)) +// mockCustomTemplateIDProvider always returns the same templateID +func (p *mockCustomTemplateIDProvider) GetTemplateID(inboundSessionID SessionID) *SessionID { + return &p.staticTemplateID } -type DynamicSessionTestSuite struct { - suite.Suite -} - -func (suite *DynamicSessionTestSuite) TestDynamicSession() { - settings := NewSettings() - settings.globalSettings.Set(config.SocketAcceptPort, "5003") - settings.globalSettings.Set(config.DynamicSessions, "Y") - sessionId1 := SessionID{BeginString: BeginStringFIX42, SenderCompID: "dynamicSender1", TargetCompID: "dynamicTarget1"} - sessionSettings1 := NewSessionSettings() - sessionSettings1.Set(config.BeginString, sessionId1.BeginString) - sessionSettings1.Set(config.SenderCompID, sessionId1.SenderCompID) - sessionSettings1.Set(config.TargetCompID, sessionId1.TargetCompID) - settings.AddSession(sessionSettings1) - - a, err := NewAcceptor(&noopApp{}, memoryStoreFactory{}, settings, NewNullLogFactory()) - suite.Require().NoError(err, "create acceptor with DynamicSession=Y failed") - - if err := a.Start(); err != nil { - suite.FailNow("acceptor start failed: %v", err) +func (suite *AcceptorTemplateTestSuite) TestCustomTemplateIDProvider_NoSessionCreated() { + // this templateIDProvider selects session FIX.4.3:sender2->sender2 as the template + templateIDProvider := &mockCustomTemplateIDProvider{staticTemplateID: SessionID{ + BeginString: BeginStringFIX43, SenderCompID: "sender2", TargetCompID: "target2", + }} + suite.acceptor.SetTemplateIDProvider(templateIDProvider) + if err := suite.acceptor.Start(); err != nil { + suite.FailNow("acceptor start failed", err) } + suite.verifySessionCount(2) - inboundSessionID := SessionID{BeginString: BeginStringFIX43, SenderCompID: "X", TargetCompID: "Y"} - inboundMessages := []*Message{mockLogonMessage(inboundSessionID, 1)} - reversedInboundSessionID := SessionID{BeginString: BeginStringFIX43, SenderCompID: "Y", TargetCompID: "X"} + // no session created + logon1 := SessionID{BeginString: BeginStringFIX42, SenderCompID: "target1", TargetCompID: "sender1"} + suite.logonAndDisconnectAfterCheck(logon1, func() { + suite.verifySessionCount(2) + }, true) +} - mockConn1 := &mockConn{ - closeChan: make(chan struct{}), - inboundMessages: inboundMessages, - localAddr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5001}, - remoteAddr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5002}, +func (suite *AcceptorTemplateTestSuite) TestCustomTemplateIDProvider_SessionCreated() { + // this templateIDProvider selects session FIX.4.3:sender2->sender2 as the template + templateIDProvider := &mockCustomTemplateIDProvider{staticTemplateID: SessionID{ + BeginString: BeginStringFIX43, SenderCompID: "sender2", TargetCompID: "target2", + }} + suite.acceptor.SetTemplateIDProvider(templateIDProvider) + if err := suite.acceptor.Start(); err != nil { + suite.FailNow("acceptor start failed", err) } + suite.verifySessionCount(2) - var respondedLogonMessageReceived bool - mockConn1.onWriteback = func(_ []byte) { - respondedLogonMessageReceived = true - // close conn - close(mockConn1.closeChan) - } + // session created + logonSessionID2 := SessionID{BeginString: BeginStringFIX42, SenderCompID: "any", TargetCompID: "any"} + suite.logonAndDisconnectAfterCheck(logonSessionID2, func() { + suite.verifySessionCount(3) + }, true) + // logon again + suite.logonAndDisconnectAfterCheck(logonSessionID2, func() { + suite.verifySessionCount(3) + }, true) - a.handleConnection(mockConn1) - suite.Require().Equal(true, respondedLogonMessageReceived, "expected responding logon message") - err = SendToTarget(createFIX43NewOrderSingle(), reversedInboundSessionID) - suite.Error(err, "session created by DynamicSession is unregistered after session connected") - a.Stop() + session2 := suite.acceptor.sessions[logonSessionID2] + suite.Require().Equal(true, session2.ResetOnLogon, "expected session2 ResetOnLogon=Y") } -func TestDynamicSessionTestSuite(t *testing.T) { - suite.Run(t, new(DynamicSessionTestSuite)) +func TestAcceptorTemplateTestSuite(t *testing.T) { + suite.Run(t, new(AcceptorTemplateTestSuite)) } diff --git a/store/file/filestore.go b/store/file/filestore.go index ae44540b1..f5027e300 100644 --- a/store/file/filestore.go +++ b/store/file/filestore.go @@ -38,6 +38,12 @@ type msgDef struct { type fileStoreFactory struct { settings *quickfix.Settings + + templateIDProvider quickfix.TemplateIDProvider +} + +func (f *fileStoreFactory) SetTemplateIDProvider(templateIDProvider quickfix.TemplateIDProvider) { + f.templateIDProvider = templateIDProvider } type fileStore struct { @@ -69,10 +75,20 @@ func (f fileStoreFactory) Create(sessionID quickfix.SessionID) (msgStore quickfi sessionSettings, ok := f.settings.SessionSettings()[sessionID] if !ok { - if dynamicSessions { - sessionSettings = globalSettings + if f.templateIDProvider != nil { + templateID := f.templateIDProvider.GetTemplateID(sessionID) + if templateID != nil { + sessionSettings, ok = f.settings.SessionSettings()[*templateID] + if !ok { + return nil, fmt.Errorf("unknown session: %v", sessionID) + } + } } else { - return nil, fmt.Errorf("unknown session: %v", sessionID) + if dynamicSessions { + sessionSettings = globalSettings + } else { + return nil, fmt.Errorf("unknown session: %v", sessionID) + } } } diff --git a/store/mongo/mongostore.go b/store/mongo/mongostore.go index 42696388e..faaa19a05 100644 --- a/store/mongo/mongostore.go +++ b/store/mongo/mongostore.go @@ -33,6 +33,12 @@ type mongoStoreFactory struct { settings *quickfix.Settings messagesCollection string sessionsCollection string + + templateIDProvider quickfix.TemplateIDProvider +} + +func (f *mongoStoreFactory) SetTemplateIDProvider(templateIDProvider quickfix.TemplateIDProvider) { + f.templateIDProvider = templateIDProvider } type mongoStore struct { @@ -67,10 +73,20 @@ func (f mongoStoreFactory) Create(sessionID quickfix.SessionID) (msgStore quickf sessionSettings, ok := f.settings.SessionSettings()[sessionID] if !ok { - if dynamicSessions { - sessionSettings = globalSettings + if f.templateIDProvider != nil { + templateID := f.templateIDProvider.GetTemplateID(sessionID) + if templateID != nil { + sessionSettings, ok = f.settings.SessionSettings()[*templateID] + if !ok { + return nil, fmt.Errorf("unknown session: %v", sessionID) + } + } } else { - return nil, fmt.Errorf("unknown session: %v", sessionID) + if dynamicSessions { + sessionSettings = globalSettings + } else { + return nil, fmt.Errorf("unknown session: %v", sessionID) + } } } mongoConnectionURL, err := sessionSettings.Setting(config.MongoStoreConnection) diff --git a/store/sql/sqlstore.go b/store/sql/sqlstore.go index aeaeb6eb3..bea643d97 100644 --- a/store/sql/sqlstore.go +++ b/store/sql/sqlstore.go @@ -29,6 +29,12 @@ import ( type sqlStoreFactory struct { settings *quickfix.Settings + + templateIDProvider quickfix.TemplateIDProvider +} + +func (f *sqlStoreFactory) SetTemplateIDProvider(templateIDProvider quickfix.TemplateIDProvider) { + f.templateIDProvider = templateIDProvider } type sqlStore struct { @@ -73,10 +79,20 @@ func (f sqlStoreFactory) Create(sessionID quickfix.SessionID) (msgStore quickfix sessionSettings, ok := f.settings.SessionSettings()[sessionID] if !ok { - if dynamicSessions { - sessionSettings = globalSettings + if f.templateIDProvider != nil { + templateID := f.templateIDProvider.GetTemplateID(sessionID) + if templateID != nil { + sessionSettings, ok = f.settings.SessionSettings()[*templateID] + if !ok { + return nil, fmt.Errorf("unknown session: %v", sessionID) + } + } } else { - return nil, fmt.Errorf("unknown session: %v", sessionID) + if dynamicSessions { + sessionSettings = globalSettings + } else { + return nil, fmt.Errorf("unknown session: %v", sessionID) + } } } From 6d6b9ed645327db1b49f5d845059c4aab52594bd Mon Sep 17 00:00:00 2001 From: yanghao Date: Thu, 15 Aug 2024 16:31:34 +0800 Subject: [PATCH 4/5] fix linter error --- acceptor.go | 2 +- acceptor_session_provider.go | 2 +- acceptor_session_provider_test.go | 8 ++++---- acceptor_test.go | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/acceptor.go b/acceptor.go index d3ce23bc4..d7f5eda51 100644 --- a/acceptor.go +++ b/acceptor.go @@ -144,7 +144,7 @@ func (a *Acceptor) configureDyanmicSessionProvider() error { if setter, ok := a.storeFactory.(TemplateIDProviderSetter); ok { setter.SetTemplateIDProvider(a.templateIDProvider) } - a.dynamicAcceptorSessionProvider = NewDynamicAcceptorSessionProvider(a.settings, a.storeFactory, a.logFactory, a.app, a.templateIDProvider) + a.dynamicAcceptorSessionProvider = newDynamicAcceptorSessionProvider(a.settings, a.storeFactory, a.logFactory, a.app, a.templateIDProvider) return nil } diff --git a/acceptor_session_provider.go b/acceptor_session_provider.go index f9349e096..7945ebf2c 100644 --- a/acceptor_session_provider.go +++ b/acceptor_session_provider.go @@ -69,7 +69,7 @@ type dynamicAcceptorSessionProvider struct { templateIDProvider TemplateIDProvider } -func NewDynamicAcceptorSessionProvider(settings *Settings, messageStoreFactory MessageStoreFactory, logFactory LogFactory, +func newDynamicAcceptorSessionProvider(settings *Settings, messageStoreFactory MessageStoreFactory, logFactory LogFactory, application Application, templateIDProvider TemplateIDProvider, ) *dynamicAcceptorSessionProvider { return &dynamicAcceptorSessionProvider{ diff --git a/acceptor_session_provider_test.go b/acceptor_session_provider_test.go index 6388109c5..6f763f9b9 100644 --- a/acceptor_session_provider_test.go +++ b/acceptor_session_provider_test.go @@ -116,12 +116,12 @@ func (suite *DynamicAcceptorSessionProviderTestSuite) SetupTest() { ) suite.setUpSettings(templateID1, "ResetOnLogout", "Y") - templateId2 := SessionID{BeginString: "FIX.4.4", SenderCompID: "S1", TargetCompID: "ANY"} + templateID2 := SessionID{BeginString: "FIX.4.4", SenderCompID: "S1", TargetCompID: "ANY"} templateMappings = append( templateMappings, - &TemplateMapping{Pattern: SessionID{BeginString: "FIX.4.4", SenderCompID: WildcardPattern, TargetCompID: WildcardPattern}, TemplateID: templateId2}, + &TemplateMapping{Pattern: SessionID{BeginString: "FIX.4.4", SenderCompID: WildcardPattern, TargetCompID: WildcardPattern}, TemplateID: templateID2}, ) - suite.setUpSettings(templateId2, "RefreshOnLogon", "Y") + suite.setUpSettings(templateID2, "RefreshOnLogon", "Y") templateId3 := SessionID{BeginString: "FIX.4.4", SenderCompID: "ANY", TargetCompID: "ANY"} templateMappings = append( @@ -136,7 +136,7 @@ func (suite *DynamicAcceptorSessionProviderTestSuite) SetupTest() { templateMappings: templateMappings, } - suite.dynamicAcceptorSessionProvider = NewDynamicAcceptorSessionProvider(suite.settings, suite.messageStoreFactory, + suite.dynamicAcceptorSessionProvider = newDynamicAcceptorSessionProvider(suite.settings, suite.messageStoreFactory, suite.logFactory, suite.app, templateIDProvider) } diff --git a/acceptor_test.go b/acceptor_test.go index cec5fb389..fd862d784 100644 --- a/acceptor_test.go +++ b/acceptor_test.go @@ -337,8 +337,8 @@ type mockCustomTemplateIDProvider struct { staticTemplateID SessionID } -// mockCustomTemplateIDProvider always returns the same templateID -func (p *mockCustomTemplateIDProvider) GetTemplateID(inboundSessionID SessionID) *SessionID { +// mockCustomTemplateIDProvider always returns the same templateID. +func (p *mockCustomTemplateIDProvider) GetTemplateID(_ SessionID) *SessionID { return &p.staticTemplateID } From f49b71c444d6874aadbd0f00005ebba4ecd3fd1a Mon Sep 17 00:00:00 2001 From: yanghao Date: Thu, 15 Aug 2024 16:33:00 +0800 Subject: [PATCH 5/5] fix linter error --- acceptor_session_provider_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/acceptor_session_provider_test.go b/acceptor_session_provider_test.go index 6f763f9b9..8a84e70ff 100644 --- a/acceptor_session_provider_test.go +++ b/acceptor_session_provider_test.go @@ -123,14 +123,14 @@ func (suite *DynamicAcceptorSessionProviderTestSuite) SetupTest() { ) suite.setUpSettings(templateID2, "RefreshOnLogon", "Y") - templateId3 := SessionID{BeginString: "FIX.4.4", SenderCompID: "ANY", TargetCompID: "ANY"} + templateID3 := SessionID{BeginString: "FIX.4.4", SenderCompID: "ANY", TargetCompID: "ANY"} templateMappings = append( templateMappings, &TemplateMapping{Pattern: SessionID{BeginString: "FIX.4.2", SenderCompID: WildcardPattern, SenderSubID: WildcardPattern, SenderLocationID: WildcardPattern, TargetCompID: WildcardPattern, TargetSubID: WildcardPattern, TargetLocationID: WildcardPattern, Qualifier: WildcardPattern, - }, TemplateID: templateId3}, + }, TemplateID: templateID3}, ) - suite.setUpSettings(templateId3, "ResetOnDisconnect", "Y") + suite.setUpSettings(templateID3, "ResetOnDisconnect", "Y") templateIDProvider := &DefaultTemplateIDProvider{ templateMappings: templateMappings,