Skip to content

Commit

Permalink
TRD-1708: Clean up Incoming hotpath & Prioritize Incoming
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandrosKyriakakis committed Jan 29, 2024
1 parent 3bf2b87 commit ff3ecfb
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 20 deletions.
1 change: 1 addition & 0 deletions config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,5 @@ const (
RejectInvalidMessage string = "RejectInvalidMessage"
DynamicSessions string = "DynamicSessions"
DynamicQualifier string = "DynamicQualifier"
CleanIncomingHotPath string = "CleanIncomingHotPath"
)
10 changes: 10 additions & 0 deletions in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ type inSession struct{ loggedOn }
func (state inSession) String() string { return "In Session" }

func (state inSession) FixMsgIn(session *session, msg *Message) sessionState {

if session.CleanIncomingHotPath && IsExecutionReport(msg.rawMessage.Bytes()) {
if err := session.application.FromApp(msg, session.sessionID); err != nil {
return handleStateError(session, err)
}
if err := session.store.IncrNextTargetMsgSeqNum(); err != nil {
return handleStateError(session, err)
}
return state
}
msgType, err := msg.Header.GetBytes(tagMsgType)
if err != nil {
return handleStateError(session, err)
Expand Down
2 changes: 2 additions & 0 deletions internal/session_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ type SessionSettings struct {
LogoutTimeout time.Duration
LogonTimeout time.Duration
SocketConnectAddress []string

CleanIncomingHotPath bool
}
13 changes: 10 additions & 3 deletions memorystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,22 @@ func (store *memoryStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, err
return msgs, nil
}

type memoryStoreFactory struct{}
type memoryStoreFactory struct {
store MessageStore
}

func (f memoryStoreFactory) Create(sessionID SessionID) (MessageStore, error) {
func (f *memoryStoreFactory) Create(sessionID SessionID) (MessageStore, error) {
m := new(memoryStore)
if err := m.Reset(); err != nil {
return m, errors.Wrap(err, "reset")
}
f.store = m
return m, nil
}

func (f *memoryStoreFactory) GetStore() MessageStore {
return f.store
}

// NewMemoryStoreFactory returns a MessageStoreFactory instance that created in-memory MessageStores.
func NewMemoryStoreFactory() MessageStoreFactory { return memoryStoreFactory{} }
func NewMemoryStoreFactory() MessageStoreFactory { return &memoryStoreFactory{} }
32 changes: 21 additions & 11 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,25 +799,35 @@ func (s *session) run() {

for !s.Stopped() {
select {

case msg := <-s.admin:
s.onAdmin(msg)

case <-s.messageEvent:
s.SendAppMessages(s)

// Prioritizing Incoming
case fixIn, ok := <-s.messageIn:
if !ok {
s.Disconnected(s)
} else {
s.Incoming(s, fixIn)
}

case evt := <-s.sessionEvent:
s.Timeout(s, evt)
default:
select {
case msg := <-s.admin:
s.onAdmin(msg)

case fixIn, ok := <-s.messageIn:
if !ok {
s.Disconnected(s)
} else {
s.Incoming(s, fixIn)
}

case <-s.messageEvent:
s.SendAppMessages(s)

case now := <-ticker.C:
s.CheckSessionTime(s, now)
case evt := <-s.sessionEvent:
s.Timeout(s, evt)

case now := <-ticker.C:
s.CheckSessionTime(s, now)
}
}
}
}
10 changes: 10 additions & 0 deletions session_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,16 @@ func (f sessionFactory) newSession(
s.DisableMessagePersist = !persistMessages
}

if settings.HasSetting(config.CleanIncomingHotPath) {
var cleanIncomingHotPath bool
if cleanIncomingHotPath, err = settings.BoolSetting(config.CleanIncomingHotPath); err != nil {
return
}

s.CleanIncomingHotPath = cleanIncomingHotPath

}

if f.BuildInitiators {
if err = f.buildInitiatorSettings(s, settings); err != nil {
return
Expand Down
4 changes: 2 additions & 2 deletions session_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ func (s SessionSettings) BoolSetting(setting string) (bool, error) {
}

switch stringVal {
case "Y", "y":
case "Y", "y", "true", "True":
return true, nil
case "N", "n":
case "N", "n", "false", "False":
return false, nil
}

Expand Down
28 changes: 24 additions & 4 deletions session_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package quickfix

import (
"bytes"
"fmt"
"time"

Expand Down Expand Up @@ -75,20 +76,39 @@ func (sm *stateMachine) Disconnected(session *session) {
}
}

var msgTypeExecutionReport = []byte("\x0135=8\x01")

func IsExecutionReport(m []byte) bool {
return bytes.Contains(m, msgTypeExecutionReport)
}

func (sm *stateMachine) Incoming(session *session, m fixIn) {
sm.CheckSessionTime(session, time.Now())
if !sm.IsConnected() {
return
}
var (
rawBytesBuffer = m.bytes
rawBytes []byte
)
if rawBytesBuffer != nil {
rawBytes = rawBytesBuffer.Bytes()
}

session.log.OnIncoming(m.bytes.Bytes())
session.log.OnIncoming(rawBytes)

msg := NewMessage()
if err := ParseMessageWithDataDictionary(msg, m.bytes, session.transportDataDictionary, session.appDataDictionary); err != nil {
session.log.OnEventf("Msg Parse Error: %v, %q", err.Error(), m.bytes)
} else {
if session.CleanIncomingHotPath && IsExecutionReport(rawBytes) {
msg.ReceiveTime = m.receiveTime
msg.rawMessage = rawBytesBuffer
sm.fixMsgIn(session, msg)
} else {
if err := ParseMessageWithDataDictionary(msg, rawBytesBuffer, session.transportDataDictionary, session.appDataDictionary); err != nil {
session.log.OnEventf("Msg Parse Error: %v, %q", err.Error(), m.bytes)
} else {
msg.ReceiveTime = m.receiveTime
sm.fixMsgIn(session, msg)
}
}

session.peerTimer.Reset(time.Duration(float64(1.2) * float64(session.HeartBtInt)))
Expand Down
5 changes: 5 additions & 0 deletions store/file/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ type fileStoreFactory struct {
settings *quickfix.Settings
}

func (f fileStoreFactory) GetStore() quickfix.MessageStore {
//TODO implement me
panic("implement me")
}

type fileStore struct {
sessionID quickfix.SessionID
cache quickfix.MessageStore
Expand Down

0 comments on commit ff3ecfb

Please sign in to comment.