Skip to content

feat: get configuration parameters from the db and allow reloading config with NOTIFY #1729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 69 additions & 67 deletions main/Main.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}

module Main (main) where

Expand All @@ -8,15 +10,11 @@ import qualified Data.ByteString.Lazy as LBS
import qualified Hasql.Connection as C
import qualified Hasql.Notifications as N
import qualified Hasql.Pool as P
import qualified Hasql.Session as S
import qualified Hasql.Transaction as HT
import qualified Hasql.Transaction.Sessions as HT

import Control.AutoUpdate (defaultUpdateSettings, mkAutoUpdate,
updateAction)
import Control.Debounce (debounceAction, debounceEdge,
debounceFreq,
defaultDebounceSettings, mkDebounce,
trailingEdge)
import Control.Retry (RetryStatus, capDelay,
exponentialBackoff, retrying,
rsPreviousDelay)
Expand All @@ -36,6 +34,7 @@ import PostgREST.Config
import PostgREST.DbStructure (getDbStructure, getPgVersion)
import PostgREST.Error (PgError (PgError), checkIsFatal,
errorPayload)
import PostgREST.Statements (dbSettingsStatement)
import PostgREST.Types (ConnectionStatus (..), DbStructure,
PgVersion (..), SCacheStatus (..),
minimumPgVersion)
Expand Down Expand Up @@ -63,11 +62,11 @@ main = do
-- read PGRST_ env variables
env <- readEnvironment

-- read path from commad line
opts <- readCLIShowHelp env
-- read command/path from commad line
CLI{cliCommand, cliPath} <- readCLIShowHelp env

-- build the 'AppConfig' from the config file path
conf <- readValidateConfig env $ cliPath opts
conf <- readValidateConfig mempty env cliPath

-- These are config values that can't be reloaded at runtime. Reloading some of them would imply restarting the web server.
let
Expand All @@ -87,19 +86,7 @@ main = do
poolSize = configDbPoolSize conf
poolTimeout = configDbPoolTimeout' conf
logLevel = configLogLevel conf

case cliCommand opts of
CmdDumpConfig ->
do
putStr $ dumpAppConfig conf
exitSuccess
CmdDumpSchema ->
do
dumpedSchema <- dumpSchema conf
putStrLn dumpedSchema
exitSuccess
CmdRun ->
pass
gucConfigEnabled = configDbLoadGucConfig conf

-- create connection pool with the provided settings, returns either a 'Connection' or a 'ConnectionError'. Does not throw.
pool <- P.acquire (poolSize, poolTimeout, dbUri)
Expand All @@ -116,6 +103,25 @@ main = do
-- Config that can change at runtime
refConf <- newIORef conf

let configRereader = reReadConfig pool gucConfigEnabled env cliPath refConf

-- re-read and override the config if db-load-guc-config is true
when gucConfigEnabled configRereader

case cliCommand of
CmdDumpConfig ->
do
dumpedConfig <- dumpAppConfig <$> readIORef refConf
putStr dumpedConfig
exitSuccess
CmdDumpSchema ->
do
dumpedSchema <- dumpSchema pool =<< readIORef refConf
putStrLn dumpedSchema
exitSuccess
CmdRun ->
pass

-- This is passed to the connectionWorker method so it can kill the main thread if the PostgreSQL's version is not supported.
mainTid <- myThreadId

Expand All @@ -140,16 +146,15 @@ main = do
Catch connWorker
) Nothing

-- Re-read the config on SIGUSR2, but only if we have a config file
when (isJust $ cliPath opts) $
void $ installHandler sigUSR2 (
Catch $ reReadConfig env (cliPath opts) refConf
) Nothing
-- Re-read the config on SIGUSR2
void $ installHandler sigUSR2 (
Catch $ configRereader >> putStrLn ("Config reloaded" :: Text)
) Nothing
#endif

-- reload schema cache on NOTIFY
-- reload schema cache + config on NOTIFY
when dbChannelEnabled $
listener dbUri dbChannel pool refConf refDbStructure mvarConnectionStatus connWorker
listener dbUri dbChannel pool refConf refDbStructure mvarConnectionStatus connWorker configRereader

-- ask for the OS time at most once per second
getTime <- mkAutoUpdate defaultUpdateSettings {updateAction = getCurrentTime}
Expand Down Expand Up @@ -266,6 +271,15 @@ connectionStatus pool =
putStrLn $ "Attempting to reconnect to the database in " <> (show delay::Text) <> " seconds..."
return itShould

loadDbSettings :: P.Pool -> IO [(Text, Text)]
loadDbSettings pool = do
result <- P.use pool $ HT.transaction HT.ReadCommitted HT.Read $ HT.statement mempty dbSettingsStatement
case result of
Left e -> do
hPutStrLn stderr ("An error ocurred when trying to query database settings for the config parameters:\n" <> show e :: Text)
pure []
Right x -> pure x

-- | Load the DbStructure by using a connection from the pool.
loadSchemaCache :: P.Pool -> PgVersion -> IORef AppConfig -> IORef (Maybe DbStructure) -> IO SCacheStatus
loadSchemaCache pool actualPgVersion refConf refDbStructure = do
Expand Down Expand Up @@ -295,32 +309,27 @@ loadSchemaCache pool actualPgVersion refConf refDbStructure = do
When a NOTIFY <db-channel> - with an empty payload - is done, it refills the schema cache.
It uses the connectionWorker in case the LISTEN connection dies.
-}
listener :: ByteString -> Text -> P.Pool -> IORef AppConfig -> IORef (Maybe DbStructure) -> MVar ConnectionStatus -> IO () -> IO ()
listener dbUri dbChannel pool refConf refDbStructure mvarConnectionStatus connWorker = start
listener :: ByteString -> Text -> P.Pool -> IORef AppConfig -> IORef (Maybe DbStructure) -> MVar ConnectionStatus -> IO () -> IO () -> IO ()
listener dbUri dbChannel pool refConf refDbStructure mvarConnectionStatus connWorker configRereader = start
where
start = do
connStatus <- takeMVar mvarConnectionStatus -- takeMVar makes the thread wait if the MVar is empty(until there's a connection).
case connStatus of
Connected actualPgVersion -> void $ forkFinally (do -- forkFinally allows to detect if the thread dies
dbOrError <- C.acquire dbUri
-- Debounce in case too many NOTIFYs arrive. Could happen on a migration(assuming a pg EVENT TRIGGER is set up).
-- This might not be needed according to pg docs https://www.postgresql.org/docs/12/sql-notify.html:
-- "If the same channel name is signaled multiple times from the same transaction with identical payload strings, the database server can decide to deliver a single notification only."
-- But we do it to be extra safe.
scFiller <- mkDebounce (defaultDebounceSettings {
-- It's not necessary to check the loadSchemaCache success here. If the connection drops, the thread will die and proceed to recover below.
debounceAction = void $ loadSchemaCache pool actualPgVersion refConf refDbStructure,
debounceEdge = trailingEdge, -- wait until the function hasn’t been called in _1s
debounceFreq = _1s })
case dbOrError of
Right db -> do
putStrLn $ "Listening for notifications on the " <> dbChannel <> " channel"
let channelToListen = N.toPgIdentifier dbChannel
cfLoader = configRereader >> putStrLn ("Config reloaded" :: Text)
scLoader = void $ loadSchemaCache pool actualPgVersion refConf refDbStructure -- It's not necessary to check the loadSchemaCache success here. If the connection drops, the thread will die and proceed to recover below.
N.listen db channelToListen
N.waitForNotifications (\_ msg ->
if BS.null msg
then scFiller -- reload the schema cache
else pure ()) db -- Do nothing if anything else than an empty message is sent
if | BS.null msg -> scLoader -- reload the schema cache
| msg == "reload schema" -> scLoader -- reload the schema cache
| msg == "reload config" -> cfLoader -- reload the config
| otherwise -> pure () -- Do nothing if anything else than an empty message is sent
) db
_ -> die errorMessage)
(\_ -> do -- if the thread dies, we try to recover
putStrLn retryMessage
Expand All @@ -331,40 +340,33 @@ listener dbUri dbChannel pool refConf refDbStructure mvarConnectionStatus connWo
errorMessage = "Could not listen for notifications on the " <> dbChannel <> " channel" :: Text
retryMessage = "Retrying listening for notifications on the " <> dbChannel <> " channel.." :: Text

#ifndef mingw32_HOST_OS
-- | Re-reads the config at runtime. Invoked on SIGUSR2.
-- | Re-reads the config at runtime.
-- | If it panics(config path was changed, invalid setting), it'll show an error but won't kill the main thread.
reReadConfig :: Environment -> Maybe FilePath -> IORef AppConfig -> IO ()
reReadConfig env path refConf = do
conf <- readValidateConfig env path
reReadConfig :: P.Pool -> Bool -> Environment -> Maybe FilePath -> IORef AppConfig -> IO ()
reReadConfig pool gucConfigEnabled env path refConf = do
dbSettings <- if gucConfigEnabled then loadDbSettings pool else pure []
conf <- readValidateConfig dbSettings env path
atomicWriteIORef refConf conf
putStrLn ("Config file reloaded" :: Text)
#endif

-- | Dump DbStructure schema to JSON
dumpSchema :: AppConfig -> IO LBS.ByteString
dumpSchema conf =
do
Right conn <- C.acquire . toS $ configDbUri conf
Right pgVersion <- S.run getPgVersion conn
let
getDbStructureTransaction =
dumpSchema :: P.Pool -> AppConfig -> IO LBS.ByteString
dumpSchema pool conf = do
result <-
timeToStderr "Loaded schema in %.3f seconds" $
P.use pool $ do
pgVersion <- getPgVersion
HT.transaction HT.ReadCommitted HT.Read $
getDbStructure
(toList $ configDbSchemas conf)
(configDbExtraSearchPath conf)
pgVersion
(configDbPreparedStatements conf)
result <-
timeToStderr "Loaded schema in %.3f seconds" $
S.run getDbStructureTransaction conn
C.release conn
case result of
Left e -> do
hPutStrLn stderr $ "An error ocurred when loading the schema cache:\n" <> show e
exitFailure
Right dbStructure -> return $ Aeson.encode dbStructure

P.release pool
case result of
Left e -> do
hPutStrLn stderr $ "An error ocurred when loading the schema cache:\n" <> show e
exitFailure
Right dbStructure -> return $ Aeson.encode dbStructure

-- | Print the time taken to run an IO action to stderr with the given printf string
timeToStderr :: [Char] -> IO (Either a b) -> IO (Either a b)
Expand Down
6 changes: 3 additions & 3 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,9 @@ app dbStructure conf apiRequest =
encodeApi ti sd procs = encodeOpenAPI (concat $ M.elems procs) (toTableInfo ti) uri' sd $ dbPrimaryKeys dbStructure

body <- encodeApi <$>
H.statement tSchema accessibleTables <*>
H.statement tSchema schemaDescription <*>
H.statement tSchema accessibleProcs
H.statement tSchema (accessibleTables prepared) <*>
H.statement tSchema (schemaDescription prepared) <*>
H.statement tSchema (accessibleProcs prepared)
return $ responseLBS status200 (catMaybes [Just $ toHeader CTOpenAPI, profileH]) (if headersOnly then mempty else toS body)

_ -> return notFound
Expand Down
Loading