62 votes

Un conduit de traitement, 2 sources d'IO du même type

Dans mon GHC Haskell utilisant stm, network-conduit et conduit, j'ai un brin pour chaque socket qui est bifurqué automatiquement à l'aide de runTCPServer . Les brins peuvent communiquer avec d'autres brins en utilisant un TChan de diffusion.

Ceci montre comment je voudrais mettre en place la "chaîne" de conduits :

enter image description here

Ainsi, nous avons ici deux sources (chacune liée à des conduits d'aide qui) qui produisent un Packet objet qui encoder acceptera et se transformera en ByteString puis envoyez la prise. J'ai eu beaucoup de difficultés à fusionner les deux entrées de manière efficace (la performance est un souci).

J'apprécierais si quelqu'un pouvait m'indiquer la bonne direction.


Comme il serait impoli de ma part de poster cette question sans faire de tentative, je vais mettre ici ce que j'ai déjà essayé ;

J'ai écrit/pris une fonction qui (en bloquant) produit une Source à partir d'un TMChan (canal fermable) ;

-- | Takes a generic type of STM chan and, given read and close functionality,
--   returns a conduit 'Source' which consumes the elements of the channel.
chanSource 
    :: (MonadIO m, MonadSTM m)
    => a                    -- ^ The channel
    -> (a -> STM (Maybe b)) -- ^ The read function
    -> (a -> STM ())        -- ^ The close/finalizer function
    -> Source m b
chanSource ch readCh closeCh = ConduitM pull
    where close     = liftSTM $ closeCh ch
          pull      = PipeM $ liftSTM $ readCh ch >>= translate
          translate = return . maybe (Done ()) (HaveOutput pull close)

De même, une fonction permettant de transformer un Chan en évier ;

-- | Takes a stream and, given write and close functionality, returns a sink
--   which wil consume elements and broadcast them into the channel 
chanSink
    :: (MonadIO m, MonadSTM m)
    => a                 -- ^ The channel
    -> (a -> b -> STM()) -- ^ The write function
    -> (a -> STM())      -- ^ The close/finalizer function
    -> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
    where close  = const . liftSTM $ closeCh ch
          sink   = NeedInput push close
          write  = liftSTM . writeCh ch
          push x = PipeM $ write x >> return sink

Ensuite, mergeSources est simple : fork 2 threads (ce que je ne veux vraiment pas faire, mais que diable) qui peuvent mettre leurs nouveaux éléments dans la liste unique dont je produirai ensuite une source ;

-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
--   a source which consumes the elements of the channel.
mergeSources
    :: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
    => [Source (ResourceT m) a]             -- ^ The list of sources
    -> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    where push c s = s $$ chanSink c writeTMChan closeTMChan
          fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
          retn c   = return $ chanSource c readTMChan closeTMChan

Bien que j'aie réussi à faire en sorte que ces fonctions vérifient le type, je n'ai pas réussi à faire en sorte que l'utilisation de ces fonctions vérifie le type ;

-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
    mergsrc $$ protocol $= encoder =$ appSink appdata
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
          mergsrc = mergeSources [appSource appdata $= decoder, chansrc]

-- | Structure which holds mutable information for clients
data SessionState = SessionState
    { _ssBroadcast     :: TMChan Packet -- ^ Outbound packet broadcast channel
    }

makeLenses ''SessionState

-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)

-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO

Je considère que cette méthode est de toute façon imparfaite - il y a beaucoup de listes et de conversions intermédiaires. Cela ne peut pas être bon pour les performances. Je cherche des conseils.


PS. D'après ce que j'ai compris, il ne s'agit pas d'un doublon ; Fusion de conduits à entrées multiples En effet, dans ma situation, les deux sources produisent le même type de produit et je ne me soucie pas de savoir de quelle source provient le produit. Packet est produit, tant que je n'attends pas sur l'un d'eux alors qu'un autre a des objets prêts à être consommés.

PPS. Je m'excuse pour l'utilisation (et donc la nécessité de connaissances) de Lens dans le code d'exemple.

1voto

Petr Pudlák Points 25113

Je ne sais pas si cela peut vous aider, mais j'ai essayé d'appliquer la suggestion d'Iain et j'ai créé une variante de mergeSources' qui s'arrête dès que l'un des canaux le fait :

mergeSources' :: (MonadIO m, MonadBaseControl IO m)
              => [Source (ResourceT m) a] -- ^ The sources to merge.
              -> Int -- ^ The bound of the intermediate channel.
              -> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
    c <- liftSTM $ newTBMChan bound
    mapM_ (\s -> resourceForkIO $
                    s $$ chanSink c writeTBMChan closeTBMChan) sx
    return $ sourceTBMChan c

(Cette simple addition est disponible aquí ).

Quelques commentaires sur votre version de mergeSources (prenez-les avec un grain de sel, il se peut que je n'aie pas bien compris quelque chose) :

  • Utilisation de ...TMChan au lieu de ...TBMChan semble dangereux. Si les auteurs sont plus rapides que les lecteurs, votre tas va exploser. En regardant votre diagramme, il semble que cela peut facilement arriver, si votre pair TCP ne lit pas les données assez rapidement. Donc j'utiliserais définitivement ...TBMChan avec une limite peut-être importante mais limitée.

  • Vous n'avez pas besoin de la MonadSTM m contrainte. Tous les éléments de la STM sont intégrés dans IO avec

    liftSTM = liftIO . atomically

    Peut-être que cela vous aidera un peu lorsque vous utiliserez mergeSources' en serverApp .

  • Juste un problème cosmétique, j'ai trouvé

    liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn

    très difficile à lire en raison de l'utilisation de liftA2 sur le (->) r monade. Je dirais

    do
        c <- liftSTM newTMChan
        fsrc sx c
        retn c

    serait plus long, mais beaucoup plus facile à lire.

Pourriez-vous peut-être créer un projet autonome où il serait possible de jouer avec serverApp ?

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X