Dans mon GHC Haskell
utilisant stm, network-conduit et conduit, j'ai un brin pour chaque socket qui est bifurqué automatiquement à l'aide de runTCPServer
. Les brins peuvent communiquer avec d'autres brins en utilisant un TChan de diffusion.
Ceci montre comment je voudrais mettre en place la "chaîne" de conduits :
Ainsi, nous avons ici deux sources (chacune liée à des conduits d'aide qui) qui produisent un Packet
objet qui encoder
acceptera et se transformera en ByteString
puis envoyez la prise. J'ai eu beaucoup de difficultés à fusionner les deux entrées de manière efficace (la performance est un souci).
J'apprécierais si quelqu'un pouvait m'indiquer la bonne direction.
Comme il serait impoli de ma part de poster cette question sans faire de tentative, je vais mettre ici ce que j'ai déjà essayé ;
J'ai écrit/pris une fonction qui (en bloquant) produit une Source à partir d'un TMChan (canal fermable) ;
-- | Takes a generic type of STM chan and, given read and close functionality,
-- returns a conduit 'Source' which consumes the elements of the channel.
chanSource
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> STM (Maybe b)) -- ^ The read function
-> (a -> STM ()) -- ^ The close/finalizer function
-> Source m b
chanSource ch readCh closeCh = ConduitM pull
where close = liftSTM $ closeCh ch
pull = PipeM $ liftSTM $ readCh ch >>= translate
translate = return . maybe (Done ()) (HaveOutput pull close)
De même, une fonction permettant de transformer un Chan en évier ;
-- | Takes a stream and, given write and close functionality, returns a sink
-- which wil consume elements and broadcast them into the channel
chanSink
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> b -> STM()) -- ^ The write function
-> (a -> STM()) -- ^ The close/finalizer function
-> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
where close = const . liftSTM $ closeCh ch
sink = NeedInput push close
write = liftSTM . writeCh ch
push x = PipeM $ write x >> return sink
Ensuite, mergeSources est simple : fork 2 threads (ce que je ne veux vraiment pas faire, mais que diable) qui peuvent mettre leurs nouveaux éléments dans la liste unique dont je produirai ensuite une source ;
-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
-- a source which consumes the elements of the channel.
mergeSources
:: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
=> [Source (ResourceT m) a] -- ^ The list of sources
-> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
where push c s = s $$ chanSink c writeTMChan closeTMChan
fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
retn c = return $ chanSource c readTMChan closeTMChan
Bien que j'aie réussi à faire en sorte que ces fonctions vérifient le type, je n'ai pas réussi à faire en sorte que l'utilisation de ces fonctions vérifie le type ;
-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
-- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
mergsrc $$ protocol $= encoder =$ appSink appdata
where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
mergsrc = mergeSources [appSource appdata $= decoder, chansrc]
-- | Structure which holds mutable information for clients
data SessionState = SessionState
{ _ssBroadcast :: TMChan Packet -- ^ Outbound packet broadcast channel
}
makeLenses ''SessionState
-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)
-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO
Je considère que cette méthode est de toute façon imparfaite - il y a beaucoup de listes et de conversions intermédiaires. Cela ne peut pas être bon pour les performances. Je cherche des conseils.
PS. D'après ce que j'ai compris, il ne s'agit pas d'un doublon ; Fusion de conduits à entrées multiples En effet, dans ma situation, les deux sources produisent le même type de produit et je ne me soucie pas de savoir de quelle source provient le produit. Packet
est produit, tant que je n'attends pas sur l'un d'eux alors qu'un autre a des objets prêts à être consommés.
PPS. Je m'excuse pour l'utilisation (et donc la nécessité de connaissances) de Lens dans le code d'exemple.