Skip to content

Add configuration for max number of retries and exit server with an error when connection dies or fails to open #65

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# CHANGELOG

## 0.9.0.0

- Add PGWS_RETRIES to limit the amount of times the server tries to open a database connection upon startup (defaults to 5). This breaks backward compatibility if you rely on the behaviour of the server to try infitite times.

## 0.8.0.1

- Fix compilation error due to missing version upper bound for protolude.
Expand Down
2 changes: 1 addition & 1 deletion postgres-websockets.cabal
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: postgres-websockets
version: 0.8.0.1
version: 0.9.0.0
synopsis: Middleware to map LISTEN/NOTIFY messages to Websockets
description: Please see README.md
homepage: https://github.com/diogob/postgres-websockets#readme
Expand Down
2 changes: 2 additions & 0 deletions src/PostgresWebsockets/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ data AppConfig = AppConfig {
, configJwtSecret :: ByteString
, configJwtSecretIsBase64 :: Bool
, configPool :: Int
, configRetries :: Int
}

-- | User friendly version number
Expand Down Expand Up @@ -70,6 +71,7 @@ readOptions =
<*> var str "PGWS_JWT_SECRET" (help "Secret used to sign JWT tokens used to open communications channels")
<*> var auto "PGWS_JWT_SECRET_BASE64" (def False <> helpDef show <> help "Indicate whether the JWT secret should be decoded from a base64 encoded string")
<*> var auto "PGWS_POOL_SIZE" (def 10 <> helpDef show <> help "How many connection to the database should be used by the connection pool")
<*> var auto "PGWS_RETRIES" (def 5 <> helpDef show <> help "How many times it should try to connect to the database on startup before exiting with an error")

loadSecretFile :: AppConfig -> IO AppConfig
loadSecretFile conf = extractAndTransform secret
Expand Down
14 changes: 7 additions & 7 deletions src/PostgresWebsockets/HasqlBroadcast.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ import Data.Aeson (decode, Value(..))
import Data.HashMap.Lazy (lookupDefault)
import Data.Either.Combinators (mapBoth)
import Data.Function (id)
import Control.Retry (RetryStatus, retrying, capDelay, exponentialBackoff)
import Control.Retry (RetryStatus(..), retrying, capDelay, exponentialBackoff)

import PostgresWebsockets.Broadcast

{- | Returns a multiplexer from a connection URI, keeps trying to connect in case there is any error.
This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners
-}
newHasqlBroadcaster :: IO () -> Text -> ByteString -> IO Multiplexer
newHasqlBroadcaster onConnectionFailure ch = newHasqlBroadcasterForConnection . tryUntilConnected
newHasqlBroadcaster :: IO () -> Text -> Int -> ByteString -> IO Multiplexer
newHasqlBroadcaster onConnectionFailure ch maxRetries = newHasqlBroadcasterForConnection . tryUntilConnected maxRetries
where
newHasqlBroadcasterForConnection = newHasqlBroadcasterForChannel onConnectionFailure ch

Expand All @@ -44,20 +44,20 @@ newHasqlBroadcasterOrError onConnectionFailure ch =
where
newHasqlBroadcasterForConnection = newHasqlBroadcasterForChannel onConnectionFailure ch

tryUntilConnected :: ByteString -> IO Connection
tryUntilConnected =
tryUntilConnected :: Int -> ByteString -> IO Connection
tryUntilConnected maxRetries =
fmap (either (panic "Failure on connection retry") id) . retryConnection
where
retryConnection conStr = retrying retryPolicy shouldRetry (const $ acquire conStr)
maxDelayInMicroseconds = 32000000
firstDelayInMicroseconds = 1000000
retryPolicy = capDelay maxDelayInMicroseconds $ exponentialBackoff firstDelayInMicroseconds
shouldRetry :: RetryStatus -> Either ConnectionError Connection -> IO Bool
shouldRetry _ con =
shouldRetry RetryStatus{..} con =
case con of
Left err -> do
putErrLn $ "Error connecting notification listener to database: " <> show err
return True
pure $ rsIterNumber < maxRetries - 1
_ -> return False

{- | Returns a multiplexer from a channel and an IO Connection, listen for different database notifications on the provided channel using the connection produced.
Expand Down
19 changes: 10 additions & 9 deletions src/PostgresWebsockets/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,24 @@ import Network.Wai.Middleware.RequestLogger (logStdout)

-- | Start a stand-alone warp server using the parameters from AppConfig and a opening a database connection pool.
serve :: AppConfig -> IO ()
serve conf = do
serve conf@AppConfig{..} = do
shutdownSignal <- newEmptyMVar
let listenChannel = toS $ configListenChannel conf
pgSettings = toS (configDatabase conf)
waitForShutdown cl = void $ forkIO (takeMVar shutdownSignal >> cl >> die "Shutting server down...")
let listenChannel = toS configListenChannel
pgSettings = toS configDatabase
waitForShutdown cl = void $ forkIO (takeMVar shutdownSignal >> cl)
appSettings = warpSettings waitForShutdown conf

putStrLn $ ("Listening on port " :: Text) <> show (configPort conf)
putStrLn $ ("Listening on port " :: Text) <> show configPort

let shutdown = putErrLn ("Broadcaster connection is dead" :: Text) >> putMVar shutdownSignal ()
pool <- P.acquire (configPool conf, 10, pgSettings)
multi <- newHasqlBroadcaster shutdown listenChannel pgSettings
pool <- P.acquire (configPool, 10, pgSettings)
multi <- newHasqlBroadcaster shutdown listenChannel configRetries pgSettings
getTime <- mkGetTime

runSettings appSettings $
postgresWsMiddleware getTime listenChannel (configJwtSecret conf) pool multi $
logStdout $ maybe dummyApp staticApp' (configPath conf)
postgresWsMiddleware getTime listenChannel configJwtSecret pool multi $
logStdout $ maybe dummyApp staticApp' configPath
die "Shutting down server..."

where
mkGetTime :: IO (IO UTCTime)
Expand Down