Skip to content

Commit

Permalink
add constant retry when failing to connect to cardano-node (or when l…
Browse files Browse the repository at this point in the history
…osing the connection)
  • Loading branch information
KtorZ committed Feb 27, 2022
1 parent 5a4a3b6 commit 73a653d
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 52 deletions.
1 change: 1 addition & 0 deletions kupo.cabal

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ library:
- profunctors
- relude
- safe
- safe-exceptions
- strict-containers
- sqlite-simple
- text
Expand Down
10 changes: 4 additions & 6 deletions src/Kupo.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ import Kupo.Prelude
import Kupo.App
( Tracers, Tracers' (..), consumer, producer, startOrResume )
import Kupo.App.ChainSync
( IntersectionNotFoundException (..)
, TraceChainSync (..)
, mkChainSyncClient
)
( IntersectionNotFoundException (..), mkChainSyncClient )
import Kupo.App.Http
( runServer )
import Kupo.App.Mailbox
Expand All @@ -54,7 +51,7 @@ import Kupo.Control.MonadDatabase
import Kupo.Control.MonadLog
( MonadLog (..), nullTracer, withStdoutTracers )
import Kupo.Control.MonadOuroboros
( MonadOuroboros (..), NodeToClientVersion (..) )
( MonadOuroboros (..), NodeToClientVersion (..), TraceChainSync (..) )
import Kupo.Control.MonadSTM
( MonadSTM (..) )
import Kupo.Options
Expand Down Expand Up @@ -142,7 +139,8 @@ kupo tr@Tracers{tracerChainSync, tracerHttp, tracerDatabase} = hijackSigTerm *>

-- Chain-Sync client, fetching blocks from the network
( let client = mkChainSyncClient (producer tr mailbox db) checkpoints
in withChainSyncServer [ NodeToClientV_9 .. NodeToClientV_12 ]
in withChainSyncServer tracerChainSync
[ NodeToClientV_9 .. NodeToClientV_12 ]
networkMagic
slotsPerEpoch
nodeSocket
Expand Down
4 changes: 3 additions & 1 deletion src/Kupo/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module Kupo.App
import Kupo.Prelude

import Kupo.App.ChainSync
( ChainSyncHandler (..), TraceChainSync (..) )
( ChainSyncHandler (..) )
import Kupo.App.Http
( TraceHttpServer )
import Kupo.App.Mailbox
Expand All @@ -31,6 +31,8 @@ import Kupo.Control.MonadDatabase
( Database (..), MonadDatabase (..), TraceDatabase (..) )
import Kupo.Control.MonadLog
( MonadLog (..), Tracer, TracerDefinition (..), TracerHKD )
import Kupo.Control.MonadOuroboros
( TraceChainSync (..) )
import Kupo.Control.MonadSTM
( MonadSTM (..) )
import Kupo.Control.MonadThrow
Expand Down
32 changes: 0 additions & 32 deletions src/Kupo/App/ChainSync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@ module Kupo.App.ChainSync
( ChainSyncHandler (..)
, mkChainSyncClient
, IntersectionNotFoundException (..)
, TraceChainSync (..)
) where

import Kupo.Prelude

import Kupo.Control.MonadLog
( HasSeverityAnnotation (..), Severity (..) )
import Kupo.Control.MonadThrow
( MonadThrow (..) )
import Kupo.Data.ChainSync
Expand Down Expand Up @@ -96,32 +93,3 @@ mkChainSyncClient ChainSyncHandler{onRollBackward, onRollForward} pts =
-- Fixed limit of 100, defined alongside 'mkChainSyncClient'. Presumably the
-- maximum number of pipelined requests kept in flight at once -- TODO confirm
-- against the pipelining logic, which is not visible from here.
--
-- TODO: Make this configurable, as the appropriate value depends on the
-- resources available on the host machine.
maxInFlight :: Int
maxInFlight = 100

--
-- Tracer
--

-- | Events reported through the chain-sync tracer. Each constructor carries
-- the data logged with the event, as named record fields.
data TraceChainSync
    = ChainSyncRollBackward
        { point :: SlotNo
        }
    | ChainSyncRollForward
        { slotNo :: SlotNo
        , matches :: Int
        }
    | ChainSyncIntersectionNotFound
        { points :: [WithOrigin SlotNo]
        }
    deriving stock (Generic, Show)

-- | JSON encoding of trace events; delegates entirely to
-- 'defaultGenericToEncoding'.
instance ToJSON TraceChainSync where
    toEncoding event = defaultGenericToEncoding event

-- | Severity attached to each chain-sync event: roll-forwards are 'Debug',
-- roll-backs are 'Notice' and a missing intersection is an 'Error'.
instance HasSeverityAnnotation TraceChainSync where
    getSeverityAnnotation ChainSyncRollForward{} = Debug
    getSeverityAnnotation ChainSyncRollBackward{} = Notice
    getSeverityAnnotation ChainSyncIntersectionNotFound{} = Error
110 changes: 102 additions & 8 deletions src/Kupo/Control/MonadOuroboros.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@

{-# LANGUAGE TypeApplications #-}

{-# OPTIONS_GHC -fno-warn-orphans #-}

module Kupo.Control.MonadOuroboros
( MonadOuroboros (..)
, NetworkMagic (..)
, EpochSlots (..)
, NodeToClientVersion (..)
, TraceChainSync (..)
) where

import Kupo.Prelude
Expand All @@ -17,10 +20,26 @@ import Cardano.Chain.Slotting
( EpochSlots (..) )
import Cardano.Ledger.Crypto
( StandardCrypto )
import Cardano.Slotting.Slot
( SlotNo )
import Control.Exception.Safe
( IOException, isAsyncException )
import Control.Monad.Class.MonadThrow
( MonadCatch (..), MonadThrow (..) )
import Control.Monad.Class.MonadTimer
( threadDelay )
import Control.Tracer
( Tracer, traceWith )
import Control.Tracer
( nullTracer )
import Data.List
( isInfixOf )
import Data.Map.Strict
( (!) )
import Data.Severity
( HasSeverityAnnotation (..), Severity (..) )
import Data.Time.Clock
( DiffTime )
import Ouroboros.Consensus.Byron.Ledger.Config
( CodecConfig (..) )
import Ouroboros.Consensus.Cardano
Expand Down Expand Up @@ -57,16 +76,21 @@ import Ouroboros.Network.NodeToClient
, localSnocket
, withIOManager
)
import Ouroboros.Network.Point
( WithOrigin (..) )
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
( ChainSyncClientPipelined (..), chainSyncClientPeerPipelined )
import Ouroboros.Network.Protocol.Handshake.Version
( combineVersions, simpleSingletonVersions )
import System.IO.Error
( isDoesNotExistError )

class MonadOuroboros (m :: Type -> Type) where
type Block m :: Type
withChainSyncServer
:: IsBlock (Block m)
=> [NodeToClientVersion]
=> Tracer m TraceChainSync
-> [NodeToClientVersion]
-> NetworkMagic
-> EpochSlots
-> FilePath
Expand All @@ -80,9 +104,11 @@ type IsBlock block =

instance MonadOuroboros IO where
type Block IO = CardanoBlock StandardCrypto
withChainSyncServer wantedVersions networkMagic slotsPerEpoch socket client =
withChainSyncServer tr wantedVersions networkMagic slotsPerEpoch socket client =
withIOManager $ \iocp -> do
connectTo (localSnocket iocp) tracers versions socket
& onExceptions
& foreverCalmly
where
tracers = NetworkConnectTracers
{ nctMuxTracer = nullTracer
Expand All @@ -108,16 +134,40 @@ instance MonadOuroboros IO where
let
peer = chainSyncClientPeerPipelined client
codec = cChainSyncCodec (codecs slotsPerEpoch version)
tr = nullTracer
in
runPipelinedPeer tr codec channel peer
runPipelinedPeer nullTracer codec channel peer
}
]

-- | 'ReaderT' instance obtained by running the underlying 'IO' implementation
-- and lifting its result; the reader environment is never consulted.
--
-- NOTE(review): this equation takes no tracer argument. If the class method
-- expects a @Tracer m TraceChainSync@ as its first parameter, this instance
-- needs to be adapted (or dropped) accordingly -- confirm against the class
-- declaration.
instance MonadOuroboros (ReaderT r IO) where
    type Block (ReaderT r IO) = Block IO
    withChainSyncServer wantedVersions networkMagic slotsPerEpoch socket client =
        lift (withChainSyncServer wantedVersions networkMagic slotsPerEpoch socket client)
-- | Pause observed between two consecutive runs of the chain-sync client
-- (5 seconds, as a 'DiffTime' literal). Shared by 'foreverCalmly', which
-- sleeps for it, and 'onIOException', which reports it, so that the delay
-- announced in the logs is always the one actually applied.
retryDelay :: DiffTime
retryDelay = 5

-- | Run an action indefinitely, sleeping for 'retryDelay' after every run.
-- The loop never ends on its own: it is only left when an exception escapes
-- the action.
foreverCalmly :: IO a -> IO b
foreverCalmly action =
    let loop = action *> threadDelay retryDelay *> loop in loop

-- | Install the exception handlers around a connection attempt. The inner
-- handler deals with 'IOException's; the outer one receives every other
-- exception, as well as anything thrown by the inner handler itself.
onExceptions :: IO () -> IO ()
onExceptions
    = handle onUnknownException
    . handle onIOException

-- | Trace an 'IOException' and return normally, so that the surrounding
-- 'foreverCalmly' loop goes on to the next attempt. Errors recognised as
-- retryable are reported as 'ChainSyncFailedToConnect' with the socket path
-- and 'retryDelay'; any other is reported as 'ChainSyncUnknownException'.
onIOException :: IOException -> IO ()
onIOException e
    | isRetryable =
        traceWith tr $ ChainSyncFailedToConnect socket retryDelay
    | otherwise =
        traceWith tr $ ChainSyncUnknownException $ show (toException e)
  where
    isRetryable :: Bool
    isRetryable =
        isResourceVanishedError e || isDoesNotExistError e || isTryAgainError e

    -- NOTE(review): the two predicates below match on the rendered error
    -- text rather than on the error type; this depends on the wording
    -- produced by 'show'. Consider inspecting the error type instead.
    isTryAgainError :: IOException -> Bool
    isTryAgainError = isInfixOf "resource exhausted" . show

    isResourceVanishedError :: IOException -> Bool
    isResourceVanishedError = isInfixOf "resource vanished" . show

-- | Last-resort handler. Asynchronous exceptions are re-thrown untouched so
-- that the thread can still be interrupted; anything else is traced as
-- 'ChainSyncUnknownException' and otherwise ignored.
onUnknownException :: SomeException -> IO ()
onUnknownException e
    | isAsyncException e =
        throwIO e
    | otherwise =
        traceWith tr $ ChainSyncUnknownException $ show e

codecs
:: EpochSlots
Expand All @@ -136,3 +186,47 @@ codecs epochSlots nodeToClientV =
allegra = ShelleyCodecConfig
mary = ShelleyCodecConfig
alonzo = ShelleyCodecConfig

--
-- Tracer
--

-- | Events reported through the chain-sync tracer. Each constructor carries
-- the data logged with the event, as named record fields.
data TraceChainSync
    = ChainSyncRollBackward
        { point :: SlotNo
        }
    | ChainSyncRollForward
        { slotNo :: SlotNo
        , matches :: Int
        }
    | ChainSyncIntersectionNotFound
        { points :: [WithOrigin SlotNo]
        }
    | ChainSyncFailedToConnect
        -- ^ Traced by the IO exception handler for errors considered
        -- retryable, with the socket path and the announced retry delay.
        { socket :: FilePath
        , retryingIn :: DiffTime
        }
    | ChainSyncUnknownException
        -- ^ Traced for any other synchronous exception, with its rendered
        -- text.
        { exception :: Text
        }
    deriving stock (Generic, Show)

-- | JSON encoding of trace events; delegates entirely to
-- 'defaultGenericToEncoding'.
instance ToJSON TraceChainSync where
    toEncoding event = defaultGenericToEncoding event

-- | 'Origin' is encoded as the JSON string @"origin"@; any other value is
-- encoded as the slot number it wraps.
instance ToJSON (WithOrigin SlotNo) where
    toEncoding Origin = toEncoding ("origin" :: Text)
    toEncoding (At sl) = toEncoding sl

-- | Severity attached to each chain-sync event, from routine progress
-- ('Debug') up to failures ('Error').
instance HasSeverityAnnotation TraceChainSync where
    getSeverityAnnotation ChainSyncRollForward{} = Debug
    getSeverityAnnotation ChainSyncRollBackward{} = Notice
    getSeverityAnnotation ChainSyncFailedToConnect{} = Warning
    getSeverityAnnotation ChainSyncIntersectionNotFound{} = Error
    getSeverityAnnotation ChainSyncUnknownException{} = Error
5 changes: 0 additions & 5 deletions src/Kupo/Data/ChainSync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,6 @@ slotNoToJson :: SlotNo -> Json.Encoding
-- Unwrap the slot number, widen it to an 'Integer' and encode it as a JSON
-- integer.
slotNoToJson slot =
    Json.integer (toInteger (unSlotNo slot))

-- | 'Origin' is encoded as the JSON string @"origin"@; any other value is
-- encoded as the slot number it wraps.
instance ToJSON (WithOrigin SlotNo) where
    toEncoding Origin = toEncoding ("origin" :: Text)
    toEncoding (At sl) = toEncoding sl

-- Hash

hashToJson :: HashAlgorithm alg => Hash alg a -> Json.Encoding
Expand Down

0 comments on commit 73a653d

Please sign in to comment.