Skip to content

Commit dbc6ae2

Browse files
committed
WIP: command rate monitoring
1 parent 4455b8b commit dbc6ae2

File tree

10 files changed

+207
-21
lines changed

10 files changed

+207
-21
lines changed

package.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ dependencies:
4545
- direct-sqlcipher == 2.3.*
4646
- directory == 1.3.*
4747
- filepath == 1.4.*
48+
- hashable == 1.4.*
4849
- hourglass == 0.2.*
4950
- http-types == 0.12.*
5051
- http2 >= 4.2.2 && < 4.3
@@ -59,6 +60,7 @@ dependencies:
5960
- network-udp >= 0.0 && < 0.1
6061
- optparse-applicative >= 0.15 && < 0.17
6162
- process == 1.6.*
63+
- psqueues == 0.2.8.*
6264
- random >= 1.1 && < 1.3
6365
- simple-logger == 0.1.*
6466
- socks == 0.6.*

simplexmq.cabal

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ library
204204
, direct-sqlcipher ==2.3.*
205205
, directory ==1.3.*
206206
, filepath ==1.4.*
207+
, hashable ==1.4.*
207208
, hourglass ==0.2.*
208209
, http-types ==0.12.*
209210
, http2 >=4.2.2 && <4.3
@@ -218,6 +219,7 @@ library
218219
, network-udp ==0.0.*
219220
, optparse-applicative >=0.15 && <0.17
220221
, process ==1.6.*
222+
, psqueues ==0.2.8.*
221223
, random >=1.1 && <1.3
222224
, simple-logger ==0.1.*
223225
, socks ==0.6.*
@@ -278,6 +280,7 @@ executable ntf-server
278280
, direct-sqlcipher ==2.3.*
279281
, directory ==1.3.*
280282
, filepath ==1.4.*
283+
, hashable ==1.4.*
281284
, hourglass ==0.2.*
282285
, http-types ==0.12.*
283286
, http2 >=4.2.2 && <4.3
@@ -292,6 +295,7 @@ executable ntf-server
292295
, network-udp ==0.0.*
293296
, optparse-applicative >=0.15 && <0.17
294297
, process ==1.6.*
298+
, psqueues ==0.2.8.*
295299
, random >=1.1 && <1.3
296300
, simple-logger ==0.1.*
297301
, simplexmq
@@ -353,6 +357,7 @@ executable smp-agent
353357
, direct-sqlcipher ==2.3.*
354358
, directory ==1.3.*
355359
, filepath ==1.4.*
360+
, hashable ==1.4.*
356361
, hourglass ==0.2.*
357362
, http-types ==0.12.*
358363
, http2 >=4.2.2 && <4.3
@@ -367,6 +372,7 @@ executable smp-agent
367372
, network-udp ==0.0.*
368373
, optparse-applicative >=0.15 && <0.17
369374
, process ==1.6.*
375+
, psqueues ==0.2.8.*
370376
, random >=1.1 && <1.3
371377
, simple-logger ==0.1.*
372378
, simplexmq
@@ -428,6 +434,7 @@ executable smp-server
428434
, direct-sqlcipher ==2.3.*
429435
, directory ==1.3.*
430436
, filepath ==1.4.*
437+
, hashable ==1.4.*
431438
, hourglass ==0.2.*
432439
, http-types ==0.12.*
433440
, http2 >=4.2.2 && <4.3
@@ -442,6 +449,7 @@ executable smp-server
442449
, network-udp ==0.0.*
443450
, optparse-applicative >=0.15 && <0.17
444451
, process ==1.6.*
452+
, psqueues ==0.2.8.*
445453
, random >=1.1 && <1.3
446454
, simple-logger ==0.1.*
447455
, simplexmq
@@ -503,6 +511,7 @@ executable xftp
503511
, direct-sqlcipher ==2.3.*
504512
, directory ==1.3.*
505513
, filepath ==1.4.*
514+
, hashable ==1.4.*
506515
, hourglass ==0.2.*
507516
, http-types ==0.12.*
508517
, http2 >=4.2.2 && <4.3
@@ -517,6 +526,7 @@ executable xftp
517526
, network-udp ==0.0.*
518527
, optparse-applicative >=0.15 && <0.17
519528
, process ==1.6.*
529+
, psqueues ==0.2.8.*
520530
, random >=1.1 && <1.3
521531
, simple-logger ==0.1.*
522532
, simplexmq
@@ -578,6 +588,7 @@ executable xftp-server
578588
, direct-sqlcipher ==2.3.*
579589
, directory ==1.3.*
580590
, filepath ==1.4.*
591+
, hashable ==1.4.*
581592
, hourglass ==0.2.*
582593
, http-types ==0.12.*
583594
, http2 >=4.2.2 && <4.3
@@ -592,6 +603,7 @@ executable xftp-server
592603
, network-udp ==0.0.*
593604
, optparse-applicative >=0.15 && <0.17
594605
, process ==1.6.*
606+
, psqueues ==0.2.8.*
595607
, random >=1.1 && <1.3
596608
, simple-logger ==0.1.*
597609
, simplexmq
@@ -689,6 +701,7 @@ test-suite simplexmq-test
689701
, directory ==1.3.*
690702
, filepath ==1.4.*
691703
, generic-random ==1.5.*
704+
, hashable ==1.4.*
692705
, hourglass ==0.2.*
693706
, hspec ==2.11.*
694707
, hspec-core ==2.11.*
@@ -706,6 +719,7 @@ test-suite simplexmq-test
706719
, network-udp ==0.0.*
707720
, optparse-applicative >=0.15 && <0.17
708721
, process ==1.6.*
722+
, psqueues ==0.2.8.*
709723
, random >=1.1 && <1.3
710724
, silently ==1.2.*
711725
, simple-logger ==0.1.*

src/Simplex/Messaging/Server.hs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
128128
raceAny_
129129
( serverThread s "server subscribedQ" subscribedQ subscribers subscriptions cancelSub
130130
: serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubscriptions (\_ -> pure ())
131-
: map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg
131+
: map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> rateStatsThread_ cfg <> controlPortThread_ cfg
132132
)
133133
`finally` withLock' (savingLock s) "final" (saveServer False)
134134
where
@@ -205,6 +205,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
205205
[logServerStats logStatsStartTime interval serverStatsLogFile]
206206
serverStatsThread_ _ = []
207207

208+
rateStatsThread_ :: ServerConfig -> [M ()]
209+
rateStatsThread_ ServerConfig {rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} =
210+
[ monitorServerRates bucketWidth, -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection
211+
logServerRates logStatsStartTime logInterval rateStatsLogFile -- log distributions once in a while
212+
]
213+
rateStatsThread_ _ = []
214+
208215
logServerStats :: Int64 -> Int64 -> FilePath -> M ()
209216
logServerStats startAt logInterval statsFilePath = do
210217
labelMyThread "logServerStats"
@@ -257,6 +264,25 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
257264
]
258265
liftIO $ threadDelay' interval
259266

267+
monitorServerRates :: Int64 -> M ()
268+
monitorServerRates bucketWidth = do
269+
labelMyThread "monitorServerRates"
270+
forever $ do
271+
-- TODO: calculate delay for the next bucket closing time
272+
liftIO $ threadDelay' bucketWidth
273+
-- TODO: collect and reset buckets
274+
275+
logServerRates :: Int64 -> Int64 -> FilePath -> M ()
276+
logServerRates startAt logInterval statsFilePath = do
277+
labelMyThread "logServerStats"
278+
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
279+
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
280+
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
281+
let interval = 1000000 * logInterval
282+
forever $ do
283+
-- write the thing
284+
liftIO $ threadDelay' interval
285+
260286
runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M ()
261287
runClient signKey tp h = do
262288
kh <- asks serverIdentity
@@ -411,13 +437,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
411437
hPutStrLn h "AUTH"
412438

413439
runClientTransport :: Transport c => THandleSMP c 'TServer -> M ()
414-
runClientTransport h@THandle {params = THandleParams {thVersion, sessionId}} = do
440+
runClientTransport h@THandle {connection, params = THandleParams {thVersion, sessionId}} = do
415441
q <- asks $ tbqSize . config
416442
ts <- liftIO getSystemTime
417443
active <- asks clients
418444
nextClientId <- asks clientSeq
419445
c <- atomically $ do
420-
new@Client {clientId} <- newClient nextClientId q thVersion sessionId ts
446+
new@Client {clientId} <- newClient (getPeerId connection) nextClientId q thVersion sessionId ts
421447
modifyTVar' active $ IM.insert clientId new
422448
pure new
423449
s <- asks server
@@ -643,6 +669,10 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
643669
where
644670
createQueue :: QueueStore -> RcvPublicAuthKey -> RcvPublicDhKey -> SubscriptionMode -> M (Transmission BrokerMsg)
645671
createQueue st recipientKey dhKey subMode = time "NEW" $ do
672+
-- TODO: read client Q rate
673+
-- TODO: read server Q rate for peerId
674+
-- TODO: read global server Q rate
675+
-- TODO: add throttling delay/blackhole request if needed
646676
(rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random
647677
let rcvDhSecret = C.dh' dhKey privDhKey
648678
qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey}
@@ -673,6 +703,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
673703
stats <- asks serverStats
674704
atomically $ modifyTVar' (qCreated stats) (+ 1)
675705
atomically $ modifyTVar' (qCount stats) (+ 1)
706+
-- TODO: increment client Q counter
707+
-- TODO: increment current Q counter in IP timeline
708+
-- TODO: increment current Q counter in server timeline
676709
case subMode of
677710
SMOnlyCreate -> pure ()
678711
SMSubscribe -> void $ subscribeQueue qr rId
@@ -835,6 +868,10 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
835868
case C.maxLenBS msgBody of
836869
Left _ -> pure $ err LARGE_MSG
837870
Right body -> do
871+
-- TODO: read client S rate
872+
-- TODO: read server S rate for peerId
873+
-- TODO: read global server S rate
874+
-- TODO: add throttling delay/blackhole request if needed
838875
msg_ <- time "SEND" $ do
839876
q <- getStoreMsgQueue "SEND" $ recipientId qr
840877
expireMessages q
@@ -850,6 +887,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
850887
atomically $ modifyTVar' (msgSent stats) (+ 1)
851888
atomically $ modifyTVar' (msgCount stats) (+ 1)
852889
atomically $ updatePeriodStats (activeQueues stats) (recipientId qr)
890+
-- TODO: increment client S counter
891+
-- TODO: increment current S counter in IP timeline
892+
-- TODO: increment current S counter in server timeline
853893
pure ok
854894
where
855895
mkMessage :: C.MaxLenBS MaxMessageLen -> M Message

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import Data.List.NonEmpty (NonEmpty)
1616
import Data.Map.Strict (Map)
1717
import qualified Data.Map.Strict as M
1818
import Data.Time.Clock (getCurrentTime)
19+
import Data.Time.Clock.POSIX (getPOSIXTime)
1920
import Data.Time.Clock.System (SystemTime)
2021
import Data.X509.Validation (Fingerprint (..))
2122
import Network.Socket (ServiceName)
@@ -33,7 +34,7 @@ import Simplex.Messaging.Server.Stats
3334
import Simplex.Messaging.Server.StoreLog
3435
import Simplex.Messaging.TMap (TMap)
3536
import qualified Simplex.Messaging.TMap as TM
36-
import Simplex.Messaging.Transport (ATransport, VersionRangeSMP, VersionSMP)
37+
import Simplex.Messaging.Transport (ATransport, PeerId, VersionRangeSMP, VersionSMP)
3738
import Simplex.Messaging.Transport.Server (SocketState, TransportServerConfig, alpn, loadFingerprint, loadTLSServerParams, newSocketState)
3839
import System.IO (IOMode (..))
3940
import System.Mem.Weak (Weak)
@@ -70,6 +71,10 @@ data ServerConfig = ServerConfig
7071
serverStatsLogFile :: FilePath,
7172
-- | file to save and restore stats
7273
serverStatsBackupFile :: Maybe FilePath,
74+
-- | rate limit monitoring interval / bucket width, seconds
75+
rateStatsInterval :: Maybe Int64,
76+
rateStatsLogFile :: FilePath,
77+
rateStatsBackupFile :: Maybe FilePath,
7378
-- | CA certificate private key is not needed for initialization
7479
caCertificateFile :: FilePath,
7580
privateKeyFile :: FilePath,
@@ -109,6 +114,8 @@ data Env = Env
109114
storeLog :: Maybe (StoreLog 'WriteMode),
110115
tlsServerParams :: T.ServerParams,
111116
serverStats :: ServerStats,
117+
qCreatedByIp :: Timeline,
118+
msgSentByIp :: Timeline,
112119
sockets :: SocketState,
113120
clientSeq :: TVar Int,
114121
clients :: TVar (IntMap Client)
@@ -124,6 +131,8 @@ data Server = Server
124131

125132
data Client = Client
126133
{ clientId :: Int,
134+
peerId :: PeerId, -- send updates for this Id to time series
135+
clientStats :: ClientStats, -- capture final values on disconnect
127136
subscriptions :: TMap RecipientId (TVar Sub),
128137
ntfSubscriptions :: TMap NotifierId (),
129138
rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)),
@@ -155,8 +164,8 @@ newServer = do
155164
savingLock <- createLock
156165
return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, savingLock}
157166

158-
newClient :: TVar Int -> Natural -> VersionSMP -> ByteString -> SystemTime -> STM Client
159-
newClient nextClientId qSize thVersion sessionId createdAt = do
167+
newClient :: PeerId -> TVar Int -> Natural -> VersionSMP -> ByteString -> SystemTime -> STM Client
168+
newClient peerId nextClientId qSize thVersion sessionId createdAt = do
160169
clientId <- stateTVar nextClientId $ \next -> (next, next + 1)
161170
subscriptions <- TM.empty
162171
ntfSubscriptions <- TM.empty
@@ -168,7 +177,8 @@ newClient nextClientId qSize thVersion sessionId createdAt = do
168177
connected <- newTVar True
169178
rcvActiveAt <- newTVar createdAt
170179
sndActiveAt <- newTVar createdAt
171-
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt}
180+
clientStats <- ClientStats <$> newTVar 0 <*> newTVar 0
181+
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, peerId, clientStats}
172182

173183
newSubscription :: SubscriptionThread -> STM Sub
174184
newSubscription subThread = do
@@ -189,7 +199,10 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile,
189199
sockets <- atomically newSocketState
190200
clientSeq <- newTVarIO 0
191201
clients <- newTVarIO mempty
192-
return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients}
202+
now <- getPOSIXTime
203+
qCreatedByIp <- atomically $ newTimeline perMinute now
204+
msgSentByIp <- atomically $ newTimeline perMinute now
205+
return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, qCreatedByIp, msgSentByIp, sockets, clientSeq, clients}
193206
where
194207
restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode)
195208
restoreQueues QueueStore {queues, senders, notifiers} f = do

src/Simplex/Messaging/Server/Main.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ smpServerCLI cfgPath logPath =
208208
logStatsStartTime = 0, -- seconds from 00:00 UTC
209209
serverStatsLogFile = combine logPath "smp-server-stats.daily.log",
210210
serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log",
211+
rateStatsInterval = Just 60, -- TODO: add to options
212+
rateStatsLogFile = combine logPath "smp-server-rates.daily.log",
213+
rateStatsBackupFile = Just $ combine logPath "smp-server-rates.log",
211214
smpServerVRange = supportedServerSMPRelayVRange,
212215
transportConfig =
213216
defaultTransportServerConfig

0 commit comments

Comments
 (0)