{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.MQTT.Client (
MQTTConfig(..), MQTTClient, QoS(..), Topic, mqttConfig, mkLWT, LastWill(..),
ProtocolLevel(..), Property(..), SubOptions(..), subOptions, MessageCallback(..),
waitForClient,
connectURI, isConnected,
disconnect, normalDisconnect, stopClient,
subscribe, unsubscribe, publish, publishq, pubAliased,
svrProps, connACK, MQTTException(..),
runMQTTConduit, MQTTConduit, isConnectedSTM, connACKSTM,
registerCorrelated, unregisterCorrelated
) where
import Control.Concurrent (myThreadId, threadDelay)
import Control.Concurrent.Async (Async, async, asyncThreadId, cancel, cancelWith, link, race_, wait,
waitAnyCancel)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (STM, TChan, TVar, atomically, check, modifyTVar', newTChan, newTChanIO,
newTVarIO, orElse, readTChan, readTVar, readTVarIO, registerDelay, retry,
writeTChan, writeTVar)
import Control.DeepSeq (force)
import qualified Control.Exception as E
import Control.Monad (forever, guard, join, unless, void, when)
import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor (first)
import qualified Data.ByteString.Char8 as BCS
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BC
import Data.Conduit (ConduitT, Void, await, runConduit, yield, (.|))
import Data.Conduit.Attoparsec (conduitParser)
import qualified Data.Conduit.Combinators as C
import Data.Conduit.Network (AppData, appSink, appSource, clientSettings, runTCPClient)
import Data.Conduit.Network.TLS (runTLSClient, tlsClientConfig, tlsClientTLSSettings)
import Data.Default.Class (def)
import Data.Foldable (traverse_)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Map.Strict.Decaying as Decaying
import Data.Maybe (fromJust, fromMaybe)
import Data.Text (Text)
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Lazy.Encoding as TEL
import Data.Word (Word16)
import GHC.Conc (labelThread)
import Network.Connection (ConnectionParams (..), TLSSettings (..), connectTo, connectionClose,
connectionGetChunk, connectionPut, initConnectionContext)
import Network.URI (URI (..), nullURIAuth, unEscapeString, uriPort, uriRegName, uriUserInfo)
import qualified Network.WebSockets as WS
import Network.WebSockets.Stream (makeStream)
import System.IO.Error (catchIOError, isEOFError)
import System.Timeout (timeout)
import qualified Data.Set as Set
import Network.MQTT.Topic (Filter, Topic, mkTopic, unFilter, unTopic)
import Network.MQTT.Types as T
-- | Connection lifecycle state tracked for a client.
--
-- A newly created client starts out as 'Starting' (that is the initial value
-- stored in the client's @_st@ variable).  The transitions to the other
-- states are performed by code outside this excerpt, so the notes on the
-- payload-carrying constructors below are hedged.
data ConnState = Starting                    -- ^ Initial state of a newly created client.
               | Connected
               | Stopped
               | Disconnected
               | DiscoErr DisconnectRequest  -- ^ Carries a 'DisconnectRequest' (presumably the disconnect that ended the session; TODO confirm).
               | ConnErr ConnACKFlags        -- ^ Carries 'ConnACKFlags' (presumably from a refused connection attempt; TODO confirm).
               deriving (Eq, Show)
-- | Tag identifying a kind of acknowledgement packet.
--
-- It is paired with a 'Word16' to form the key of the client's @_acks@ map.
-- The constructor order is significant: it determines the derived 'Ord',
-- 'Enum' and 'Bounded' instances.
data DispatchType = DSubACK
                  | DUnsubACK
                  | DPubACK
                  | DPubREC
                  | DPubREL
                  | DPubCOMP
  deriving (Eq, Show, Ord, Enum, Bounded)
-- | Callback a client uses for incoming published messages.
--
-- The \"simple\" forms receive the client, the topic, a lazy 'BL.ByteString'
-- (presumably the message body) and the message's property list; the
-- \"low level\" forms receive the client and the whole 'PublishRequest'.
--
-- NOTE(review): the difference between the ordered and unordered variants is
-- implemented by dispatch code outside this excerpt; presumably the ordered
-- variants are invoked sequentially in arrival order.  Confirm there.
data MessageCallback = NoCallback
  | SimpleCallback (MQTTClient -> Topic -> BL.ByteString -> [Property] -> IO ())
  | OrderedCallback (MQTTClient -> Topic -> BL.ByteString -> [Property] -> IO ())
  | LowLevelCallback (MQTTClient -> PublishRequest -> IO ())
  | OrderedLowLevelCallback (MQTTClient -> PublishRequest -> IO ())
-- | Handle to an MQTT client.
--
-- The field notes state only what the types and the initialisation in
-- 'runMQTTConduit' show.  Anything marked \"presumably\" is inferred from a
-- name and should be confirmed against the code that uses the field.
data MQTTClient = MQTTClient {
  _ch             :: TChan MQTTPkt                                     -- ^ Channel of 'MQTTPkt' values (presumably packets queued for sending; TODO confirm).
  , _pktID        :: TVar Word16                                       -- ^ 16-bit counter, initialised to 1 (presumably the next packet identifier).
  , _cb           :: MessageCallback                                   -- ^ Message callback, copied from the config's @_msgCB@.
  , _acks         :: TVar (Map (DispatchType,Word16) (TChan MQTTPkt))  -- ^ Channels keyed by acknowledgement kind and a 'Word16'; starts empty.
  , _inflight     :: Decaying.Map Word16 PublishRequest                -- ^ Decaying map of 'PublishRequest's keyed by 'Word16', created with @Decaying.new 60@.
  , _st           :: TVar ConnState                                    -- ^ Current connection state; starts as 'Starting'.
  , _ct           :: TVar (Maybe (Async ()))                           -- ^ Optional 'Async' handle, initially 'Nothing' (presumably the client thread).
  , _outA         :: TVar (Map Topic Word16)                           -- ^ Topic-to-number map, initially empty (presumably outbound topic aliases).
  , _inA          :: TVar (Map Word16 Topic)                           -- ^ Number-to-topic map, initially empty (presumably inbound topic aliases).
  , _connACKFlags :: TVar ConnACKFlags                                 -- ^ Connection acknowledgement flags; seeded with a placeholder value.
  , _corr         :: Decaying.Map BL.ByteString MessageCallback        -- ^ Decaying map from a byte string to a callback, created with @Decaying.new 600@ (presumably keyed by correlation data).
  , _cbM          :: MVar (IO ())                                      -- ^ 'MVar' holding an 'IO' action, created empty (presumably the hand-off for ordered callbacks).
  , _cbHandle     :: TVar (Maybe (Async ()))                           -- ^ Optional 'Async' handle, initially 'Nothing' (presumably the callback runner thread).
  }
-- | Configuration for establishing an MQTT connection.
--
-- Start from 'mqttConfig' and override fields as needed.  Note that
-- 'connectURI' replaces '_hostname', '_port' and '_connID' with values taken
-- from the URI, and replaces '_username' / '_password' whenever the URI
-- carries user information.
data MQTTConfig = MQTTConfig{
  _cleanSession     :: Bool             -- ^ Clean-session flag (presumably sent in the CONNECT packet; TODO confirm).
  , _lwt            :: Maybe LastWill   -- ^ Optional last-will message.
  , _msgCB          :: MessageCallback  -- ^ Callback for incoming messages; becomes the client's callback.
  , _protocol       :: ProtocolLevel    -- ^ Protocol level to use.
  , _connProps      :: [Property]       -- ^ Properties (presumably those sent with the CONNECT packet; TODO confirm).
  , _hostname       :: String           -- ^ Host to connect to; set from the URI by 'connectURI'.
  , _port           :: Int              -- ^ Port to connect to; set from the URI by 'connectURI'.
  , _connID         :: String           -- ^ Connection (client) ID; set from the URI fragment by 'connectURI'.
  , _username       :: Maybe String     -- ^ Optional username; overridden by URI user information when present.
  , _password       :: Maybe String     -- ^ Optional password; overridden by URI user information when present.
  , _connectTimeout :: Int              -- ^ Connection timeout in microseconds (it is handed to 'timeout').
  , _tlsSettings    :: TLSSettings      -- ^ TLS settings used for the @mqtts:@ and @wss:@ transports.
  , _pingPeriod     :: Int              -- ^ Ping period (presumably microseconds, judging by the default of 30000000; TODO confirm).
  , _pingPatience   :: Int              -- ^ Ping patience (presumably microseconds, judging by the default of 90000000; TODO confirm).
  }
-- | A default 'MQTTConfig'.
--
-- Defaults: empty host name and connection ID, port 1883, no credentials,
-- clean session enabled, no last will, 'NoCallback', 'Protocol311', no
-- connect properties, default TLS settings, and a connect timeout of
-- 180000000 microseconds (three minutes).
mqttConfig :: MQTTConfig
mqttConfig = MQTTConfig{_hostname="", _port=1883, _connID="",
                        _username=Nothing, _password=Nothing,
                        _cleanSession=True, _lwt=Nothing,
                        _msgCB=NoCallback,
                        _protocol=Protocol311, _connProps=mempty,
                        _connectTimeout=180000000,
                        _tlsSettings=def,
                        _pingPeriod=30000000,
                        _pingPatience=90000000}
-- | Connect to an MQTT broker identified by a URI.
--
-- The URI scheme selects the transport: @mqtt:@ (plain TCP), @mqtts:@ (TLS),
-- @ws:@ (WebSockets) or @wss:@ (WebSockets over TLS).  Any other scheme
-- fails with an 'MQTTException'.
--
-- The following config fields are replaced with values from the URI:
--
-- * '_hostname' and '_port' come from the authority.  A missing or empty
--   port falls back to the scheme default (1883, 8883, 80 or 443).
-- * '_connID' is the URI fragment without its leading @#@.
-- * '_username' / '_password' come from the user information when the URI
--   has any; otherwise the values already in the config are kept.
--
-- The whole attempt is bounded by '_connectTimeout' microseconds; an
-- 'MQTTException' is thrown if it expires.
connectURI :: MQTTConfig -> URI -> IO MQTTClient
connectURI cfg@MQTTConfig{..} uri = do
  let cf = case uriScheme uri of
             "mqtt:"  -> runClient
             "mqtts:" -> runClientTLS
             "ws:"    -> runWS uri False
             "wss:"   -> runWS uri True
             us       -> mqttFail $ "invalid URI scheme: " <> us

      a = fromMaybe nullURIAuth $ uriAuthority uri
      (u,p) = up (uriUserInfo a)

  v <- namedTimeout "MQTT connect" _connectTimeout $
    cf cfg{Network.MQTT.Client._connID=cid _protocol (uriFragment uri),
           _hostname=uriRegName a, _port=port (uriPort a) (uriScheme uri),
           Network.MQTT.Client._username=u, Network.MQTT.Client._password=p}

  -- throwIO (rather than throw) so the failure is raised exactly when this
  -- action runs.
  maybe (E.throwIO . MQTTException $ "connection to " <> show uri <> " timed out") pure v

  where
    -- Resolve the port.  'uriPort' is either empty or starts with ':'.  An
    -- absent port and an empty one ("host:") both mean "use the default".
    port :: String -> String -> Int
    port "" s  = defaultPort s
    port ":" s = defaultPort s
    port x _   = case reads (drop 1 x) of
                   [(n, "")] -> n
                   -- Only reachable for a hand-built URI; report it as an
                   -- MQTT failure instead of a bare "Prelude.read: no parse".
                   _         -> mqttFail $ "invalid port in URI: " <> show x

    defaultPort :: String -> Int
    defaultPort "mqtt:"  = 1883
    defaultPort "mqtts:" = 8883
    defaultPort "ws:"    = 80
    defaultPort "wss:"   = 443
    defaultPort s        = mqttFail $ "invalid URI scheme: " <> s

    -- Client ID from the URI fragment; the protocol level is ignored.
    cid :: ProtocolLevel -> String -> String
    cid _ ('#':xs) = xs
    cid _ _        = ""

    -- Credentials from the user-info part ("user:pass@").  With no user
    -- info, keep whatever the config already had.
    up :: String -> (Maybe String, Maybe String)
    up "" = (_username, _password)
    up x = let (u,r) = break (== ':') (init x) in -- init drops the trailing '@'; x is non-empty here
             (Just (unEscapeString u), if r == "" then Nothing else Just (unEscapeString $ drop 1 r))
-- | Set up and run a client over a plain TCP connection to the config's
-- '_hostname' and '_port'.
runClient :: MQTTConfig -> IO MQTTClient
runClient cfg@MQTTConfig{..} = tcpCompat (runTCPClient (clientSettings _port (BCS.pack _hostname))) cfg
-- | Set up and run a client over a TLS connection to the config's
-- '_hostname' and '_port', using the config's '_tlsSettings'.
runClientTLS :: MQTTConfig -> IO MQTTClient
runClientTLS cfg@MQTTConfig{..} = tcpCompat (runTLSClient tlsConf) cfg
  where tlsConf = (tlsClientConfig _port (BCS.pack _hostname)) {tlsClientTLSSettings=_tlsSettings}
-- | Run a client on top of an 'AppData'-style connection runner.
--
-- The 'AppData' handed out by @mkconn@ is converted into the (source, sink)
-- conduit pair that 'runMQTTConduit' expects.
tcpCompat :: ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat mkconn = runMQTTConduit (adapt mkconn)
  where adapt mk f = mk (f . adaptor)
        adaptor ad = (appSource ad, appSink ad)
-- | Set up and run a client over WebSockets.
--
-- The endpoint is the URI's path followed by its query string, and the
-- handshake carries a @Sec-WebSocket-Protocol: mqtt@ header.  When the
-- 'Bool' is 'True' the connection is made over TLS using the config's
-- '_tlsSettings'; otherwise a plain WebSocket connection is used.
runWS :: URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI{uriPath, uriQuery} secure cfg@MQTTConfig{..} =
  runMQTTConduit (adapt $ cf secure _hostname _port endpoint WS.defaultConnectionOptions hdrs) cfg

  where
    hdrs = [("Sec-WebSocket-Protocol", "mqtt")]

    -- Turn a WebSocket connection runner into a conduit-pair runner.
    adapt mk f = mk (f . adaptor)
    adaptor s = (wsSource s, wsSink s)

    endpoint = uriPath <> uriQuery

    -- Pick the connection runner: plain or TLS.
    cf :: Bool -> String -> Int -> String -> WS.ConnectionOptions -> WS.Headers -> WS.ClientApp () -> IO ()
    cf False = WS.runClientWith
    cf True  = runWSS

    -- Yield every non-empty message received from the socket.  The loop
    -- never finishes on its own; it ends when 'WS.receiveData' throws.
    wsSource :: WS.Connection -> ConduitT () BCS.ByteString IO ()
    wsSource ws = forever $ do
      bs <- liftIO $ WS.receiveData ws
      unless (BCS.null bs) $ yield bs

    -- Send each upstream chunk as a binary message until upstream is done.
    wsSink :: WS.Connection -> ConduitT BCS.ByteString Void IO ()
    wsSink ws = maybe (pure ()) (\bs -> liftIO (WS.sendBinaryData ws bs) >> wsSink ws) =<< await

    -- Same shape as 'WS.runClientWith', but over a TLS connection that is
    -- closed when the application finishes or throws.
    runWSS :: String -> Int -> String -> WS.ConnectionOptions -> WS.Headers -> WS.ClientApp () -> IO ()
    runWSS host port path options hdrs' app = do
      let connectionParams = ConnectionParams
            { connectionHostname = host
            , connectionPort = toEnum port
            , connectionUseSecure = Just _tlsSettings
            , connectionUseSocks = Nothing
            }

      context <- initConnectionContext
      E.bracket (connectTo context connectionParams) connectionClose
        (\conn -> do
            stream <- makeStream (reader conn) (writer conn)
            WS.runClientWithStream stream host path options hdrs' app)

      where
        -- End of file is reported as Nothing; other IO errors propagate.
        reader conn =
          catchIOError (Just <$> connectionGetChunk conn)
          (\e -> if isEOFError e then pure Nothing else E.throwIO e)

        -- A Nothing write request is a no-op.
        writer conn = maybe (pure ()) (connectionPut conn . BC.toStrict)
-- | Throw an 'MQTTException' carrying the given message.
--
-- The result type is fully polymorphic, so this can stand in for a value of
-- any type; the exception is raised when that value is evaluated.
mqttFail :: String -> a
mqttFail = E.throw . MQTTException
-- | Start an action with 'async' and label the new thread with the given
-- name.  The label is applied from the calling thread, after the spawn.
namedAsync :: String -> IO a -> IO (Async a)
namedAsync s a = async a >>= \p -> labelThread (asyncThreadId p) s >> pure p
-- | Run an action under 'timeout', labelling the thread that runs it.
--
-- The first argument is the thread label and the second is the time limit
-- in microseconds.  The result is 'Nothing' if the limit expired, otherwise
-- 'Just' the action's result.
namedTimeout :: String -> Int -> IO a -> IO (Maybe a)
namedTimeout n to a = timeout to (myThreadId >>= \tid -> labelThread tid n >> a)
-- | The two halves of a byte-stream connection used by a client: a source
-- that produces incoming bytes and a sink that consumes outgoing bytes.
type MQTTConduit = (ConduitT () BCS.ByteString IO (), ConduitT BCS.ByteString Void IO ())
runMQTTConduit :: ((MQTTConduit -> IO ()) -> IO ())
-> MQTTConfig
-> IO MQTTClient
runMQTTConduit :: ((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (MQTTConduit -> IO ()) -> IO ()
mkconn MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
ProtocolLevel
TLSSettings
MessageCallback
_cleanSession :: MQTTConfig -> Bool
_lwt :: MQTTConfig -> Maybe LastWill
_msgCB :: MQTTConfig -> MessageCallback
_protocol :: MQTTConfig -> ProtocolLevel
_connProps :: MQTTConfig -> [Property]
_hostname :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_connID :: MQTTConfig -> String
_username :: MQTTConfig -> Maybe String
_password :: MQTTConfig -> Maybe String
_connectTimeout :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_pingPeriod :: MQTTConfig -> Int
_pingPatience :: MQTTConfig -> Int
_cleanSession :: Bool
_lwt :: Maybe LastWill
_msgCB :: MessageCallback
_protocol :: ProtocolLevel
_connProps :: [Property]
_hostname :: String
_port :: Int
_connID :: String
_username :: Maybe String
_password :: Maybe String
_connectTimeout :: Int
_tlsSettings :: TLSSettings
_pingPeriod :: Int
_pingPatience :: Int
..} = do
TChan MQTTPkt
_ch <- IO (TChan MQTTPkt)
forall a. IO (TChan a)
newTChanIO
TVar Word16
_pktID <- Word16 -> IO (TVar Word16)
forall a. a -> IO (TVar a)
newTVarIO Word16
1
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks <- Map (DispatchType, Word16) (TChan MQTTPkt)
-> IO (TVar (Map (DispatchType, Word16) (TChan MQTTPkt)))
forall a. a -> IO (TVar a)
newTVarIO Map (DispatchType, Word16) (TChan MQTTPkt)
forall a. Monoid a => a
mempty
Map Word16 PublishRequest
_inflight <- NominalDiffTime -> IO (Map Word16 PublishRequest)
forall k a. Ord k => NominalDiffTime -> IO (Map k a)
Decaying.new NominalDiffTime
60
TVar ConnState
_st <- ConnState -> IO (TVar ConnState)
forall a. a -> IO (TVar a)
newTVarIO ConnState
Starting
TVar (Maybe (Async ()))
_ct <- Maybe (Async ()) -> IO (TVar (Maybe (Async ())))
forall a. a -> IO (TVar a)
newTVarIO Maybe (Async ())
forall a. Maybe a
Nothing
TVar (Map Topic Word16)
_outA <- Map Topic Word16 -> IO (TVar (Map Topic Word16))
forall a. a -> IO (TVar a)
newTVarIO Map Topic Word16
forall a. Monoid a => a
mempty
TVar (Map Word16 Topic)
_inA <- Map Word16 Topic -> IO (TVar (Map Word16 Topic))
forall a. a -> IO (TVar a)
newTVarIO Map Word16 Topic
forall a. Monoid a => a
mempty
TVar ConnACKFlags
_connACKFlags <- ConnACKFlags -> IO (TVar ConnACKFlags)
forall a. a -> IO (TVar a)
newTVarIO (SessionReuse -> ConnACKRC -> [Property] -> ConnACKFlags
ConnACKFlags SessionReuse
NewSession ConnACKRC
ConnUnspecifiedError [Property]
forall a. Monoid a => a
mempty)
Map ByteString MessageCallback
_corr <- NominalDiffTime -> IO (Map ByteString MessageCallback)
forall k a. Ord k => NominalDiffTime -> IO (Map k a)
Decaying.new NominalDiffTime
600
MVar (IO ())
_cbM <- IO (MVar (IO ()))
forall a. IO (MVar a)
newEmptyMVar
TVar (Maybe (Async ()))
_cbHandle <- Maybe (Async ()) -> IO (TVar (Maybe (Async ())))
forall a. a -> IO (TVar a)
newTVarIO Maybe (Async ())
forall a. Maybe a
Nothing
let _cb :: MessageCallback
_cb = MessageCallback
_msgCB
cli :: MQTTClient
cli = MQTTClient{MVar (IO ())
TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
_cb :: MessageCallback
..}
Async ()
t <- String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT clientThread" (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ MQTTClient -> IO ()
clientThread MQTTClient
cli
ConnState
s <- STM ConnState -> IO ConnState
forall a. STM a -> IO a
atomically (MQTTClient -> Async () -> STM ConnState
waitForLaunch MQTTClient
cli Async ()
t)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnState
s ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Disconnected) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Async () -> IO ()
forall a. Async a -> IO a
wait Async ()
t
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
cli
MQTTClient -> IO MQTTClient
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure MQTTClient
cli
where
clientThread :: MQTTClient -> IO ()
clientThread MQTTClient
cli = IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
E.finally IO ()
connectAndRun IO ()
markDisco
where
connectAndRun :: IO ()
connectAndRun = (MQTTConduit -> IO ()) -> IO ()
mkconn ((MQTTConduit -> IO ()) -> IO ())
-> (MQTTConduit -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \MQTTConduit
ad -> MQTTClient -> MQTTConduit -> IO MQTTClient
forall {m :: * -> *} {b} {a} {a}.
Monad m =>
b -> (a, ConduitT ByteString Void m a) -> m b
start MQTTClient
cli MQTTConduit
ad IO MQTTClient -> (MQTTClient -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= MQTTConduit -> MQTTClient -> IO ()
run MQTTConduit
ad
markDisco :: IO ()
markDisco = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
ConnState
st <- TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar (MQTTClient -> TVar ConnState
_st MQTTClient
cli)
Bool -> STM ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard (Bool -> STM ()) -> Bool -> STM ()
forall a b. (a -> b) -> a -> b
$ ConnState
st ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Starting Bool -> Bool -> Bool
|| ConnState
st ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Connected
TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (MQTTClient -> TVar ConnState
_st MQTTClient
cli) ConnState
Disconnected
start :: b -> (a, ConduitT ByteString Void m a) -> m b
start b
c (a
_,ConduitT ByteString Void m a
sink) = do
m a -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m a -> m ())
-> (ConduitT () Void m a -> m a) -> ConduitT () Void m a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConduitT () Void m a -> m a
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void m a -> m ()) -> ConduitT () Void m a -> m ()
forall a b. (a -> b) -> a -> b
$ do
let req :: ConnectRequest
req = ConnectRequest
connectRequest{T._connID=BC.pack _connID,
T._lastWill=_lwt,
T._username=TEL.encodeUtf8 . TL.pack <$> _username,
T._password=BC.pack <$> _password,
T._cleanSession=_cleanSession,
T._connProperties=_connProps}
ByteString -> ConduitT () ByteString m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (ByteString -> ByteString
BL.toStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ ProtocolLevel -> ConnectRequest -> ByteString
forall a. ByteMe a => ProtocolLevel -> a -> ByteString
toByteString ProtocolLevel
_protocol ConnectRequest
req) ConduitT () ByteString m ()
-> ConduitT ByteString Void m a -> ConduitT () Void m a
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT ByteString Void m a
sink
b -> m b
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
c
run :: MQTTConduit -> MQTTClient -> IO ()
run (ConduitT () ByteString IO ()
src,ConduitT ByteString Void IO ()
sink) c :: MQTTClient
c@MQTTClient{MVar (IO ())
TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} = do
TChan Bool
pch <- IO (TChan Bool)
forall a. IO (TChan a)
newTChanIO
Async ()
o <- String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT out" (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ IO ()
onceConnected IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
processOut
Async ()
p <- String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT ping" (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ IO ()
onceConnected IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
forall {b}. IO b
doPing
Async ()
w <- String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT watchdog" (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ TChan Bool -> IO ()
forall {a} {b}. TChan a -> IO b
watchdog TChan Bool
pch
Async ()
s <- String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"MQTT in" (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ TChan Bool -> IO ()
doSrc TChan Bool
pch
IO (Async (), ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Async (), ()) -> IO ()) -> IO (Async (), ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ [Async ()] -> IO (Async (), ())
forall a. [Async a] -> IO (Async a, a)
waitAnyCancel [Async ()
o, Async ()
p, Async ()
w, Async ()
s]
where
doSrc :: TChan Bool -> IO ()
doSrc TChan Bool
pch = ConduitT () Void IO () -> IO ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void IO () -> IO ())
-> ConduitT () Void IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ConduitT () ByteString IO ()
src
ConduitT () ByteString IO ()
-> ConduitT ByteString Void IO () -> ConduitT () Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| Parser ByteString MQTTPkt
-> ConduitT ByteString (PositionRange, MQTTPkt) IO ()
forall a (m :: * -> *) b.
(AttoparsecInput a, MonadThrow m) =>
Parser a b -> ConduitT a (PositionRange, b) m ()
conduitParser (ProtocolLevel -> Parser ByteString MQTTPkt
parsePacket ProtocolLevel
_protocol)
ConduitT ByteString (PositionRange, MQTTPkt) IO ()
-> ConduitT (PositionRange, MQTTPkt) Void IO ()
-> ConduitT ByteString Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ((PositionRange, MQTTPkt) -> IO ())
-> ConduitT (PositionRange, MQTTPkt) Void IO ()
forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
C.mapM_ (\(PositionRange
_,MQTTPkt
x) -> IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch MQTTClient
c TChan Bool
pch (MQTTPkt -> IO ()) -> MQTTPkt -> IO ()
forall a b. (a -> b) -> a -> b
$! MQTTPkt
x))
onceConnected :: IO ()
onceConnected = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool -> STM ()
check (Bool -> STM ()) -> (ConnState -> Bool) -> ConnState -> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Connected) (ConnState -> STM ()) -> STM ConnState -> STM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar TVar ConnState
_st
processOut :: IO ()
processOut = ConduitT () Void IO () -> IO ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void IO () -> IO ())
-> ConduitT () Void IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
IO MQTTPkt -> ConduitT () MQTTPkt IO ()
forall (m :: * -> *) a i. Monad m => m a -> ConduitT i a m ()
C.repeatM (IO MQTTPkt -> IO MQTTPkt
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (STM MQTTPkt -> IO MQTTPkt
forall a. STM a -> IO a
atomically (STM MQTTPkt -> IO MQTTPkt) -> STM MQTTPkt -> IO MQTTPkt
forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM MQTTPkt -> STM MQTTPkt
forall a b. STM a -> STM b -> STM b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TChan MQTTPkt -> STM MQTTPkt
forall a. TChan a -> STM a
readTChan TChan MQTTPkt
_ch))
ConduitT () MQTTPkt IO ()
-> ConduitT MQTTPkt Void IO () -> ConduitT () Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| (MQTTPkt -> ByteString) -> ConduitT MQTTPkt ByteString IO ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map (ByteString -> ByteString
BL.toStrict (ByteString -> ByteString)
-> (MQTTPkt -> ByteString) -> MQTTPkt -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProtocolLevel -> MQTTPkt -> ByteString
forall a. ByteMe a => ProtocolLevel -> a -> ByteString
toByteString ProtocolLevel
_protocol)
ConduitT MQTTPkt ByteString IO ()
-> ConduitT ByteString Void IO () -> ConduitT MQTTPkt Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT ByteString Void IO ()
sink
doPing :: IO b
doPing = IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
_pingPeriod IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c MQTTPkt
PingPkt
watchdog :: TChan a -> IO b
watchdog TChan a
ch = IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ do
TVar Bool
toch <- Int -> IO (TVar Bool)
registerDelay Int
_pingPatience
Bool
timedOut <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ((Bool -> STM ()
check (Bool -> STM ()) -> STM Bool -> STM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
toch) STM () -> STM Bool -> STM Bool
forall a b. STM a -> STM b -> STM b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> STM Bool
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True) STM Bool -> STM Bool -> STM Bool
forall a. STM a -> STM a -> STM a
`orElse` (TChan a -> STM a
forall a. TChan a -> STM a
readTChan TChan a
ch STM a -> STM Bool -> STM Bool
forall a b. STM a -> STM b -> STM b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> STM Bool
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
timedOut (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MQTTClient -> MQTTException -> IO ()
forall e. Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient
c MQTTException
Timeout
waitForLaunch :: MQTTClient -> Async () -> STM ConnState
waitForLaunch MQTTClient{MVar (IO ())
TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} Async ()
t = do
TVar (Maybe (Async ())) -> Maybe (Async ()) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (Async ()))
_ct (Async () -> Maybe (Async ())
forall a. a -> Maybe a
Just Async ()
t)
ConnState
c <- TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar TVar ConnState
_st
if ConnState
c ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Starting then STM ConnState
forall a. STM a
retry else ConnState -> STM ConnState
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ConnState
c
-- | Block until the client's connection thread has finished.
--
-- Waits on the 'Async' stored in @_ct@ (when one has been recorded), then
-- asks 'stateX' whether the client ended up in the 'Stopped' state and
-- throws whatever exception it reports.  'stopCallbackThread' is run on the
-- way out regardless of how the wait ends.
waitForClient :: MQTTClient -> IO ()
waitForClient client@MQTTClient{_ct} = awaitExit `E.finally` stopCallbackThread client
  where
    awaitExit :: IO ()
    awaitExit = do
      -- An empty slot means there is nothing to wait for.
      readTVarIO _ct >>= traverse_ wait
      -- Anything other than a clean stop is surfaced as an exception.
      outcome <- atomically (stateX client Stopped)
      case outcome of
        Nothing  -> pure ()
        Just err -> E.throwIO err
-- | Tear the client down locally.
--
-- Cancels the client thread recorded in @_ct@ and then the callback thread
-- recorded in @_cbHandle@ (each only if present), and finally marks the
-- connection state as 'Stopped'.
stopClient :: MQTTClient -> IO ()
stopClient MQTTClient{_ct, _cbHandle, _st} = do
  cancelIfPresent _ct
  cancelIfPresent _cbHandle
  atomically $ writeTVar _st Stopped
  where
    -- Cancel the thread held in the given slot; an empty slot is a no-op.
    cancelIfPresent :: TVar (Maybe (Async ())) -> IO ()
    cancelIfPresent slot = readTVarIO slot >>= traverse_ cancel
-- | Compare the client's current connection state with the state the caller
-- expects.
--
-- The result is 'Nothing' only when the client is 'Connected' or 'Stopped'
-- and that is exactly the state asked for.  Every other situation yields an
-- exception (wrapped as 'E.SomeException') describing the state that was
-- found.
stateX :: MQTTClient -> ConnState -> STM (Maybe E.SomeException)
stateX MQTTClient{_st} want = verdict <$> readTVar _st
  where
    -- Wrap an 'MQTTException' for the caller to throw.
    failWith :: MQTTException -> Maybe E.SomeException
    failWith = Just . E.toException

    -- Same, for failures described only by a message.
    complain :: String -> Maybe E.SomeException
    complain = failWith . MQTTException

    verdict :: ConnState -> Maybe E.SomeException
    verdict = \case
      Connected
        | want == Connected -> Nothing
        | otherwise         -> complain "unexpectedly connected"
      Stopped
        | want == Stopped -> Nothing
        | otherwise       -> complain "unexpectedly stopped"
      Disconnected -> complain "disconnected"
      Starting     -> complain "died while starting"
      DiscoErr req -> failWith (Discod req)
      ConnErr acks -> complain (show acks)
-- | Exceptions raised by the MQTT client.
data MQTTException
  = Timeout
  -- ^ Passed to @killConn@ by the ping watchdog when its patience timer
  -- fires before anything arrives on the channel it is watching.
  | BadData
  -- ^ NOTE(review): no use of this constructor is visible in this part of
  -- the file; presumably signals undecodable input -- confirm.
  | Discod DisconnectRequest
  -- ^ Carries the 'DisconnectRequest' taken from a @DiscoErr@ connection
  -- state.
  | MQTTException String
  -- ^ A failure described only by a message.
  deriving (Eq, Show)

instance E.Exception MQTTException
dispatch :: MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch :: MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch c :: MQTTClient
c@MQTTClient{MVar (IO ())
TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} TChan Bool
pch MQTTPkt
pkt =
case MQTTPkt
pkt of
(ConnACKPkt ConnACKFlags
p) -> ConnACKFlags -> IO ()
connACKd ConnACKFlags
p
(PublishPkt PublishRequest
p) -> PublishRequest -> IO ()
pub PublishRequest
p
(SubACKPkt (SubscribeResponse Word16
i [Either SubErr QoS]
_ [Property]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DSubACK Word16
i
(UnsubACKPkt (UnsubscribeResponse Word16
i [Property]
_ [UnsubStatus]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DUnsubACK Word16
i
(PubACKPkt (PubACK Word16
i Word8
_ [Property]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubACK Word16
i
(PubRELPkt (PubREL Word16
i Word8
_ [Property]
_)) -> Word16 -> IO ()
pubd Word16
i
(PubRECPkt (PubREC Word16
i Word8
_ [Property]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubREC Word16
i
(PubCOMPPkt (PubCOMP Word16
i Word8
_ [Property]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubCOMP Word16
i
(DisconnectPkt DisconnectRequest
req) -> DisconnectRequest -> IO ()
disco DisconnectRequest
req
MQTTPkt
PongPkt -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (Bool -> STM ()) -> Bool -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TChan Bool -> Bool -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan Bool
pch (Bool -> IO ()) -> Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool
True
(AuthPkt AuthRequest
p) -> String -> IO ()
forall a. String -> a
mqttFail (String
"unexpected incoming auth: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> AuthRequest -> String
forall a. Show a => a -> String
show AuthRequest
p)
MQTTPkt
PingPkt -> String -> IO ()
forall a. String -> a
mqttFail String
"unexpected incoming ping packet"
(ConnPkt ConnectRequest
_ ProtocolLevel
_) -> String -> IO ()
forall a. String -> a
mqttFail String
"unexpected incoming connect"
(SubscribePkt SubscribeRequest
_) -> String -> IO ()
forall a. String -> a
mqttFail String
"unexpected incoming subscribe"
(UnsubscribePkt UnsubscribeRequest
_) -> String -> IO ()
forall a. String -> a
mqttFail String
"unexpected incoming unsubscribe"
where connACKd :: ConnACKFlags -> IO ()
connACKd connr :: ConnACKFlags
connr@(ConnACKFlags SessionReuse
_ ConnACKRC
val [Property]
_) = case ConnACKRC
val of
ConnACKRC
ConnAccepted -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TVar ConnACKFlags -> ConnACKFlags -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnACKFlags
_connACKFlags ConnACKFlags
connr
TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st ConnState
Connected
ConnACKRC
_ -> do
Maybe (Async ())
t <- TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st (ConnACKFlags -> ConnState
ConnErr ConnACKFlags
connr)
MQTTException -> Maybe (Async ()) -> IO ()
forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith (String -> MQTTException
MQTTException (String -> MQTTException) -> String -> MQTTException
forall a b. (a -> b) -> a -> b
$ ConnACKFlags -> String
forall a. Show a => a -> String
show ConnACKFlags
connr) Maybe (Async ())
t
pub :: PublishRequest -> IO ()
pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS0} = STM PublishRequest -> IO PublishRequest
forall a. STM a -> IO a
atomically (PublishRequest -> STM PublishRequest
resolve PublishRequest
p) IO PublishRequest -> (PublishRequest -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe MQTTPkt -> PublishRequest -> IO ()
forall {t :: * -> *}.
Foldable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify Maybe MQTTPkt
forall a. Maybe a
Nothing
pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS1, Word16
_pubPktID :: Word16
_pubPktID :: PublishRequest -> Word16
_pubPktID} =
Maybe MQTTPkt -> PublishRequest -> IO ()
forall {t :: * -> *}.
Foldable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify (MQTTPkt -> Maybe MQTTPkt
forall a. a -> Maybe a
Just (PubACK -> MQTTPkt
PubACKPkt (Word16 -> Word8 -> [Property] -> PubACK
PubACK Word16
_pubPktID Word8
0 [Property]
forall a. Monoid a => a
mempty))) (PublishRequest -> IO ()) -> IO PublishRequest -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< STM PublishRequest -> IO PublishRequest
forall a. STM a -> IO a
atomically (PublishRequest -> STM PublishRequest
resolve PublishRequest
p)
pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS2} = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
p' :: PublishRequest
p'@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubQoS :: PublishRequest -> QoS
_pubPktID :: PublishRequest -> Word16
_pubDup :: Bool
_pubQoS :: QoS
_pubRetain :: Bool
_pubTopic :: ByteString
_pubPktID :: Word16
_pubBody :: ByteString
_pubProps :: [Property]
_pubDup :: PublishRequest -> Bool
_pubRetain :: PublishRequest -> Bool
_pubTopic :: PublishRequest -> ByteString
_pubBody :: PublishRequest -> ByteString
_pubProps :: PublishRequest -> [Property]
..} <- PublishRequest -> STM PublishRequest
resolve PublishRequest
p
Word16 -> PublishRequest -> Map Word16 PublishRequest -> STM ()
forall k v. Ord k => k -> v -> Map k v -> STM ()
Decaying.insert Word16
_pubPktID PublishRequest
p' Map Word16 PublishRequest
_inflight
MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c (PubREC -> MQTTPkt
PubRECPkt (Word16 -> Word8 -> [Property] -> PubREC
PubREC Word16
_pubPktID Word8
0 [Property]
forall a. Monoid a => a
mempty))
pubd :: Word16 -> IO ()
pubd Word16
i = STM (Maybe PublishRequest) -> IO (Maybe PublishRequest)
forall a. STM a -> IO a
atomically ((Word16 -> PublishRequest -> Maybe PublishRequest)
-> Word16
-> Map Word16 PublishRequest
-> STM (Maybe PublishRequest)
forall k a.
Ord k =>
(k -> a -> Maybe a) -> k -> Map k a -> STM (Maybe a)
Decaying.updateLookupWithKey (\Word16
_ PublishRequest
_ -> Maybe PublishRequest
forall a. Maybe a
Nothing) Word16
i Map Word16 PublishRequest
_inflight) IO (Maybe PublishRequest)
-> (Maybe PublishRequest -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe PublishRequest
Nothing -> MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (PubCOMP -> MQTTPkt
PubCOMPPkt (Word16 -> Word8 -> [Property] -> PubCOMP
PubCOMP Word16
i Word8
0x92 [Property]
forall a. Monoid a => a
mempty))
Just PublishRequest
p -> Maybe MQTTPkt -> PublishRequest -> IO ()
forall {t :: * -> *}.
Foldable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify (MQTTPkt -> Maybe MQTTPkt
forall a. a -> Maybe a
Just (PubCOMP -> MQTTPkt
PubCOMPPkt (Word16 -> Word8 -> [Property] -> PubCOMP
PubCOMP Word16
i Word8
0 [Property]
forall a. Monoid a => a
mempty))) PublishRequest
p
notify :: t MQTTPkt -> PublishRequest -> IO ()
notify t MQTTPkt
rpkt p :: PublishRequest
p@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubQoS :: PublishRequest -> QoS
_pubPktID :: PublishRequest -> Word16
_pubDup :: PublishRequest -> Bool
_pubRetain :: PublishRequest -> Bool
_pubTopic :: PublishRequest -> ByteString
_pubBody :: PublishRequest -> ByteString
_pubProps :: PublishRequest -> [Property]
_pubDup :: Bool
_pubQoS :: QoS
_pubRetain :: Bool
_pubTopic :: ByteString
_pubPktID :: Word16
_pubBody :: ByteString
_pubProps :: [Property]
..} = do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Word16 -> Map Word16 PublishRequest -> STM ()
forall k v. Ord k => k -> Map k v -> STM ()
Decaying.delete Word16
_pubPktID Map Word16 PublishRequest
_inflight
MessageCallback
cb <- IO MessageCallback
-> (ByteString -> IO MessageCallback)
-> Maybe ByteString
-> IO MessageCallback
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (MessageCallback -> IO MessageCallback
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure MessageCallback
_cb) (\ByteString
cd -> STM MessageCallback -> IO MessageCallback
forall a. STM a -> IO a
atomically (MessageCallback
-> ByteString
-> Map ByteString MessageCallback
-> STM MessageCallback
forall k v. Ord k => v -> k -> Map k v -> STM v
Decaying.findWithDefault MessageCallback
_cb ByteString
cd Map ByteString MessageCallback
_corr)) Maybe ByteString
cdata
() -> IO ()
forall a. a -> IO a
E.evaluate (() -> IO ()) -> (() -> ()) -> () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. () -> ()
forall a. NFData a => a -> a
force (() -> IO ()) -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< case MessageCallback
cb of
MessageCallback
NoCallback -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
SimpleCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f -> IO () -> IO ()
forall {a}. IO a -> IO ()
call (MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f MQTTClient
c (ByteString -> Topic
blToTopic ByteString
_pubTopic) ByteString
_pubBody [Property]
_pubProps)
OrderedCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f -> IO () -> IO ()
forall {a}. IO a -> IO ()
callOrd (MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f MQTTClient
c (ByteString -> Topic
blToTopic ByteString
_pubTopic) ByteString
_pubBody [Property]
_pubProps)
LowLevelCallback MQTTClient -> PublishRequest -> IO ()
f -> IO () -> IO ()
forall {a}. IO a -> IO ()
call (MQTTClient -> PublishRequest -> IO ()
f MQTTClient
c PublishRequest
p)
OrderedLowLevelCallback MQTTClient -> PublishRequest -> IO ()
f -> IO () -> IO ()
forall {a}. IO a -> IO ()
callOrd (MQTTClient -> PublishRequest -> IO ()
f MQTTClient
c PublishRequest
p)
where
call :: IO a -> IO ()
call IO a
a = Async () -> IO ()
forall a. Async a -> IO ()
link (Async () -> IO ()) -> IO (Async ()) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"notifier" (IO a
a IO a -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
respond)
callOrd :: IO a -> IO ()
callOrd IO a
a = MVar (IO ()) -> IO () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (IO ())
_cbM (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO a
a IO a -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
respond
respond :: IO ()
respond = (MQTTPkt -> IO ()) -> t MQTTPkt -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c) t MQTTPkt
rpkt
cdata :: Maybe ByteString
cdata = (Property -> Maybe ByteString -> Maybe ByteString)
-> Maybe ByteString -> [Property] -> Maybe ByteString
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Maybe ByteString -> Maybe ByteString
f Maybe ByteString
forall a. Maybe a
Nothing [Property]
_pubProps
where f :: Property -> Maybe ByteString -> Maybe ByteString
f (PropCorrelationData ByteString
x) Maybe ByteString
_ = ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
x
f Property
_ Maybe ByteString
o = Maybe ByteString
o
resolve :: PublishRequest -> STM PublishRequest
resolve p :: PublishRequest
p@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubQoS :: PublishRequest -> QoS
_pubPktID :: PublishRequest -> Word16
_pubDup :: PublishRequest -> Bool
_pubRetain :: PublishRequest -> Bool
_pubTopic :: PublishRequest -> ByteString
_pubBody :: PublishRequest -> ByteString
_pubProps :: PublishRequest -> [Property]
_pubDup :: Bool
_pubQoS :: QoS
_pubRetain :: Bool
_pubTopic :: ByteString
_pubPktID :: Word16
_pubBody :: ByteString
_pubProps :: [Property]
..} = do
Topic
topic <- Maybe Word16 -> STM Topic
resolveTopic ((Property -> Maybe Word16 -> Maybe Word16)
-> Maybe Word16 -> [Property] -> Maybe Word16
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Maybe Word16 -> Maybe Word16
aliasID Maybe Word16
forall a. Maybe a
Nothing [Property]
_pubProps)
PublishRequest -> STM PublishRequest
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure PublishRequest
p{_pubTopic=topicToBL topic}
where
aliasID :: Property -> Maybe Word16 -> Maybe Word16
aliasID (PropTopicAlias Word16
x) Maybe Word16
_ = Word16 -> Maybe Word16
forall a. a -> Maybe a
Just Word16
x
aliasID Property
_ Maybe Word16
o = Maybe Word16
o
resolveTopic :: Maybe Word16 -> STM Topic
resolveTopic Maybe Word16
Nothing = Topic -> STM Topic
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Topic
blToTopic ByteString
_pubTopic)
resolveTopic (Just Word16
x) = do
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString
_pubTopic ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
/= ByteString
"") (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ TVar (Map Word16 Topic)
-> (Map Word16 Topic -> Map Word16 Topic) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map Word16 Topic)
_inA (Word16 -> Topic -> Map Word16 Topic -> Map Word16 Topic
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Word16
x (ByteString -> Topic
blToTopic ByteString
_pubTopic))
STM Topic -> (Topic -> STM Topic) -> Maybe Topic -> STM Topic
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (String -> STM Topic
forall a. String -> a
mqttFail (String
"failed to lookup topic alias " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Word16 -> String
forall a. Show a => a -> String
show Word16
x)) Topic -> STM Topic
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe Topic -> STM Topic)
-> (Map Word16 Topic -> Maybe Topic)
-> Map Word16 Topic
-> STM Topic
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word16 -> Map Word16 Topic -> Maybe Topic
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Word16
x (Map Word16 Topic -> STM Topic)
-> STM (Map Word16 Topic) -> STM Topic
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Map Word16 Topic) -> STM (Map Word16 Topic)
forall a. TVar a -> STM a
readTVar TVar (Map Word16 Topic)
_inA
delegate :: DispatchType -> Word16 -> IO ()
delegate DispatchType
dt Word16
pid = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
STM ()
-> (TChan MQTTPkt -> STM ()) -> Maybe (TChan MQTTPkt) -> STM ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (DispatchType -> STM ()
nak DispatchType
dt) (TChan MQTTPkt -> MQTTPkt -> STM ()
forall a. TChan a -> a -> STM ()
`writeTChan` MQTTPkt
pkt) (Maybe (TChan MQTTPkt) -> STM ())
-> (Map (DispatchType, Word16) (TChan MQTTPkt)
-> Maybe (TChan MQTTPkt))
-> Map (DispatchType, Word16) (TChan MQTTPkt)
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (DispatchType, Word16)
-> Map (DispatchType, Word16) (TChan MQTTPkt)
-> Maybe (TChan MQTTPkt)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (DispatchType
dt, Word16
pid) (Map (DispatchType, Word16) (TChan MQTTPkt) -> STM ())
-> STM (Map (DispatchType, Word16) (TChan MQTTPkt)) -> STM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
-> STM (Map (DispatchType, Word16) (TChan MQTTPkt))
forall a. TVar a -> STM a
readTVar TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks
where
nak :: DispatchType -> STM ()
nak DispatchType
DPubREC = MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c (PubREL -> MQTTPkt
PubRELPkt (Word16 -> Word8 -> [Property] -> PubREL
PubREL Word16
pid Word8
0x92 [Property]
forall a. Monoid a => a
mempty))
nak DispatchType
_ = () -> STM ()
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
disco :: DisconnectRequest -> IO ()
disco DisconnectRequest
req = do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st (DisconnectRequest -> ConnState
DiscoErr DisconnectRequest
req)
MQTTException -> Maybe (Async ()) -> IO ()
forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith (DisconnectRequest -> MQTTException
Discod DisconnectRequest
req) (Maybe (Async ()) -> IO ()) -> IO (Maybe (Async ())) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
-- | Start the worker that runs ordered callbacks when the client's callback
-- is one of the ordered variants; for any other callback this is a no-op.
--
-- A candidate worker is always spawned first, parked behind a latch.  One STM
-- transaction then decides its fate: if '_cbHandle' is still empty the
-- candidate is recorded there and the latch is opened; otherwise the existing
-- handle is kept and the candidate is cancelled while still waiting on the
-- latch.
runCallbackThread :: MQTTClient -> IO ()
runCallbackThread MQTTClient{_cb, _cbM, _cbHandle} =
  when (ordered _cb) $ do
    gate <- newTVarIO False
    worker <- namedAsync "ordered callbacks" $ do
      -- Block until this worker has been chosen as the registered one.
      atomically (readTVar gate >>= check)
      -- Then run queued callback actions one at a time, forever.
      forever (join (takeMVar _cbM))
    -- The transaction returns the IO action to perform once it has committed.
    followUp <- atomically $
      readTVar _cbHandle >>= \case
        Nothing -> do
          writeTVar gate True
          writeTVar _cbHandle (Just worker)
          pure (pure ())
        Just _ -> pure (cancel worker)
    followUp
  where
    -- Only the "ordered" callback flavours need the dedicated worker.
    ordered = \case
      OrderedCallback _         -> True
      OrderedLowLevelCallback _ -> True
      _                         -> False
-- | Cancel the worker recorded in '_cbHandle', if one is present.  The
-- contents of '_cbHandle' are only read here, never modified.
stopCallbackThread :: MQTTClient -> IO ()
stopCallbackThread MQTTClient{_cbHandle} =
  readTVarIO _cbHandle >>= \case
    Nothing     -> pure ()
    Just worker -> cancel worker
-- | Cancel the given task with the supplied exception; do nothing when there
-- is no task.
maybeCancelWith :: E.Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith _ Nothing     = pure ()
maybeCancelWith e (Just task) = cancelWith task e
-- | Cancel, with the given exception, whatever task is currently stored in
-- the client's '_ct' slot (presumably the connection thread -- confirm
-- against where '_ct' is written).  Does nothing when the slot is empty.
killConn :: E.Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient{_ct} e = do
  task <- readTVarIO _ct
  maybeCancelWith e task
-- | Ask 'stateX' about the 'Connected' state of this client and throw the
-- exception it reports, if any; otherwise succeed with no effect.
checkConnected :: MQTTClient -> STM ()
checkConnected mc =
  stateX mc Connected >>= \case
    Nothing -> pure ()
    Just ex -> E.throw ex
-- | 'IO' wrapper around 'isConnectedSTM': run the check in its own
-- transaction.
isConnected :: MQTTClient -> IO Bool
isConnected client = atomically (isConnectedSTM client)
-- | True exactly when the client's state variable currently holds
-- 'Connected'.
isConnectedSTM :: MQTTClient -> STM Bool
isConnectedSTM MQTTClient{_st} = do
  current <- readTVar _st
  pure (Connected == current)
-- | Queue a packet on the client's '_ch' channel.  'checkConnected' runs
-- first, so the transaction throws instead of queueing when that check
-- fails.
sendPacket :: MQTTClient -> MQTTPkt -> STM ()
sendPacket c@MQTTClient{_ch} p = do
  checkConnected c
  writeTChan _ch p
-- | 'IO' wrapper around 'sendPacket': queue the packet in its own
-- transaction.
sendPacketIO :: MQTTClient -> MQTTPkt -> IO ()
sendPacketIO c pkt = atomically (sendPacket c pkt)
-- | Encode text as UTF-8 and return it as a lazy 'BL.ByteString'.
textToBL :: Text -> BL.ByteString
textToBL txt = BL.fromStrict (TE.encodeUtf8 txt)
-- | Decode a lazy 'BL.ByteString' as UTF-8 text, via 'TE.decodeUtf8' on its
-- strict form.
blToText :: BL.ByteString -> Text
blToText bytes = TE.decodeUtf8 (BL.toStrict bytes)
-- | Unwrap a 'Topic' with 'unTopic' and encode it with 'textToBL'.
topicToBL :: Topic -> BL.ByteString
topicToBL topic = textToBL (unTopic topic)
-- | Unwrap a 'Filter' with 'unFilter' and encode it with 'textToBL'.
filterToBL :: Filter -> BL.ByteString
filterToBL flt = textToBL (unFilter flt)
-- | Decode a lazy 'BL.ByteString' with 'blToText' and turn it into a 'Topic'
-- with 'mkTopic'.
--
-- When 'mkTopic' rejects the decoded text this fails through 'mqttFail' with
-- a message that names the offending topic, instead of the uninformative
-- error a bare 'fromJust' would raise.
blToTopic :: BL.ByteString -> Topic
blToTopic bytes = fromMaybe invalid (mkTopic txt)
  where
    txt     = blToText bytes
    invalid = mqttFail ("invalid topic: " <> show txt)
-- | Reserve a packet ID and a fresh response channel.
--
-- After 'checkConnected' passes, the current value of '_pktID' becomes the
-- reserved ID and the counter is advanced, going from 'maxBound' back to 1
-- rather than 0.  The new channel is registered in '_acks' under
-- @(dispatchType, pid)@ for every requested dispatch type, replacing any
-- entry already stored under the same key.
--
-- Returns the channel together with the reserved packet ID.
reservePktID :: MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID c@MQTTClient{_pktID, _acks} dts = do
  checkConnected c
  ch <- newTChan
  pid <- readTVar _pktID
  let nextPid
        | pid == maxBound = 1
        | otherwise       = succ pid
  -- Force the new counter value so no thunk is left in the TVar.
  writeTVar _pktID $! nextPid
  modifyTVar' _acks $ \registered ->
    foldr (\t -> Map.insert (t, pid) ch) registered dts
  pure (ch, pid)
-- | Remove the response channel registered in '_acks' under a single
-- (dispatch type, packet ID) key.
--
-- 'checkConnected' runs first, so the map is only touched when that
-- check lets the transaction continue.
releasePktID :: MQTTClient -> (DispatchType,Word16) -> STM ()
releasePktID c@MQTTClient{_acks} key = do
  checkConnected c
  modifyTVar' _acks (Map.delete key)
-- | Remove every listed (dispatch type, packet ID) key from '_acks' in
-- one map update.
--
-- 'checkConnected' runs first, exactly as in the single-key variant.
releasePktIDs :: MQTTClient -> [(DispatchType,Word16)] -> STM ()
releasePktIDs c@MQTTClient{_acks} ks = do
  checkConnected c
  -- Drop all requested keys at once rather than deleting one by one.
  modifyTVar' _acks (`Map.withoutKeys` Set.fromList ks)
-- | Send a packet that needs a packet ID and block until the matching
-- response arrives.
--
-- The third argument builds the outgoing packet from the packet ID that
-- was reserved for it.  The reply is whatever packet gets written to
-- the channel registered under @(dt, pid)@.
sendAndWait :: MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait c@MQTTClient{_st} dt f = do
  -- Reserving the ID and queueing the packet happen in one transaction
  -- which commits before we start waiting.
  (ch, pid) <- atomically reserveAndSend
  -- Waiting is a second, separate transaction.
  atomically (awaitReply ch pid)

  where
    reserveAndSend = do
      reserved@(_, pid) <- reservePktID c [dt]
      sendPacket c (f pid)
      pure reserved

    -- 'readTChan' retries while the channel is empty, which also rolls
    -- back the release; the ID is therefore only released in the same
    -- transaction that actually consumes the reply.
    awaitReply ch pid = do
      st <- readTVar _st
      when (st /= Connected) $ mqttFail "disconnected waiting for response"
      releasePktID c (dt, pid)
      readTChan ch
-- | Send a subscribe request for the given filters (each with its
-- 'SubOptions') and wait for the acknowledgement.
--
-- Calls 'runCallbackThread' before sending.  On a 'SubACKPkt' the
-- per-filter results and the properties carried by the response are
-- returned; any other packet is reported through 'mqttFail'.
subscribe :: MQTTClient -> [(Filter, SubOptions)] -> [Property] -> IO ([Either SubErr QoS], [Property])
subscribe c ls props = do
  runCallbackThread c
  resp <- sendAndWait c DSubACK request
  case resp of
    SubACKPkt (SubscribeResponse _ rs aprops) -> pure (rs, aprops)
    pkt -> mqttFail $ "unexpected response to subscribe: " <> show pkt

  where
    -- Build the request for the packet ID handed to us by 'sendAndWait',
    -- converting each filter to its wire form.
    request pid = SubscribePkt (SubscribeRequest pid encoded props)
    encoded = [(filterToBL flt, opts) | (flt, opts) <- ls]
-- | Send an unsubscribe request for the given filters and wait for the
-- acknowledgement.
--
-- On an 'UnsubACKPkt' the per-filter status values and the properties
-- carried by the response are returned.  Any other packet is reported
-- through 'mqttFail', matching what 'subscribe' does, instead of
-- surfacing as an opaque pattern-match failure.
unsubscribe :: MQTTClient -> [Filter] -> [Property] -> IO ([UnsubStatus], [Property])
unsubscribe c ls props =
  sendAndWait c DUnsubACK request >>= \case
    -- The response carries its properties before the status list; the
    -- result tuple is the other way round.
    UnsubACKPkt (UnsubscribeResponse _ rprops statuses) -> pure (statuses, rprops)
    pkt -> mqttFail $ "unexpected response to unsubscribe: " <> show pkt

  where
    request pid = UnsubscribePkt $ UnsubscribeRequest pid (map filterToBL ls) props
-- | Publish a message at 'QoS0' with no properties.
--
-- This is 'publishq' with the QoS and property list fixed.
publish :: MQTTClient
        -> Topic         -- ^ Topic to publish to
        -> BL.ByteString -- ^ Message body
        -> Bool          -- ^ Retain flag
        -> IO ()
publish c topic body retain = publishq c topic body retain QoS0 mempty
-- | Publish a message with the given 'QoS' and 'Property' list.
--
-- A packet ID is reserved together with one response channel shared by
-- the three acknowledgement kinds a publish can receive; the
-- registrations are released again when this function leaves, whether
-- normally or by exception.
--
-- * 'QoS0': returns once the packet has been handed to 'sendPacketIO'.
-- * 'QoS1': waits for the PUBACK and fails (via 'mqttFail') on a
--   non-OK status.
-- * 'QoS2': waits for the PUBREC, answers with a PUBREL and then keeps
--   waiting for the PUBCOMP, so a failing PUBCOMP status is reported
--   and the packet ID is not released in the middle of the handshake.
--
-- Any unexpected packet on the response channel is reported through
-- 'mqttFail'.
publishq :: MQTTClient
         -> Topic         -- ^ Topic to publish to
         -> BL.ByteString -- ^ Message body
         -> Bool          -- ^ Retain flag
         -> QoS           -- ^ Delivery QoS
         -> [Property]    -- ^ Properties sent with the publish
         -> IO ()
publishq c t m r q props = do
  (ch,pid) <- atomically $ reservePktID c types
  E.finally (publishAndWait ch pid) (atomically $ releasePktIDs c [(t',pid) | t' <- types])

  where
    -- Every acknowledgement type that may come back for a publish.
    types = [DPubACK, DPubREC, DPubCOMP]

    publishAndWait ch pid = do
      sendPacketIO c (pkt pid)
      when (q > QoS0) $ satisfyQoS ch pid

    pkt pid = PublishPkt $ PublishRequest
      { _pubDup    = False
      , _pubQoS    = q
      , _pubPktID  = pid
      , _pubRetain = r
      , _pubTopic  = topicToBL t
      , _pubBody   = m
      , _pubProps  = props
      }

    -- Next packet delivered for this publish; 'checkConnected' runs in
    -- the same transaction as the read.
    nextResponse ch = atomically (checkConnected c >> readTChan ch)

    -- MQTT 5 reason codes: 0 is success, 16 is "no matching
    -- subscribers" (delivered, but nobody was listening).
    isOK 0  = True
    isOK 16 = True
    isOK _  = False

    satisfyQoS ch pid
      | q == QoS0 = pure ()
      | q == QoS1 = nextResponse ch >>= \case
          PubACKPkt (PubACK _ st pprops) ->
            unless (isOK st) $ mqttFail ("qos 1 publish error: " <> show st <> " " <> show pprops)
          wtf -> mqttFail ("unexpected packet received in QoS1 publish: " <> show wtf)
      | q == QoS2 = waitRec
      | otherwise = error "invalid QoS"

      where
        waitRec = nextResponse ch >>= \case
          PubRECPkt (PubREC _ st recprops) -> do
            unless (isOK st) $ mqttFail ("qos 2 REC publish error: " <> show st <> " " <> show recprops)
            sendPacketIO c (PubRELPkt $ PubREL pid 0 mempty)
            -- The exchange is only finished once the PUBCOMP arrives.
            waitRec
          PubCOMPPkt (PubCOMP _ st' compprops) ->
            when (st' /= 0) $ mqttFail ("qos 2 COMP publish error: " <> show st' <> " " <> show compprops)
          wtf -> mqttFail ("unexpected packet received in QoS2 publish: " <> show wtf)
-- | Send a disconnect packet with the given reason and properties and
-- wait for the connection to finish.
--
-- The orderly path races against a timer; whichever finishes first
-- wins and the other is cancelled by 'race_'.
disconnect :: MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect c@MQTTClient{_ct, _st} reason props = race_ getDisconnected orDieTrying
  where
    -- Orderly path: send the packet, wait for the async stored in '_ct'
    -- (if there is one), then record the 'Stopped' state.
    getDisconnected = do
      sendPacketIO c (DisconnectPkt (DisconnectRequest reason props))
      readTVarIO _ct >>= traverse_ wait
      atomically (writeTVar _st Stopped)

    -- Fallback: after 10,000,000 microseconds, kill the connection with
    -- a 'Timeout'.
    orDieTrying = do
      threadDelay 10000000
      killConn c Timeout
-- | 'disconnect' with the reason 'DiscoNormalDisconnection' and no
-- properties.
normalDisconnect :: MQTTClient -> IO ()
normalDisconnect client = disconnect client DiscoNormalDisconnection noProps
  where noProps = mempty
-- | Build a 'T.LastWill' from a topic, a message body and a retain
-- flag.  The will is always created with 'QoS0' and an empty property
-- list.
mkLWT :: Topic -> BL.ByteString -> Bool -> T.LastWill
mkLWT t m r = T.LastWill
  { T._willTopic  = topicToBL t
  , T._willMsg    = m
  , T._willRetain = r
  , T._willQoS    = QoS0
  , T._willProps  = mempty
  }
-- | Read the stored 'ConnACKFlags' and return just the property list
-- they carry.
svrProps :: MQTTClient -> IO [Property]
svrProps c = propsOf <$> atomically (connACKSTM c)
  where
    -- The property list is the third field of 'ConnACKFlags'.
    propsOf (ConnACKFlags _ _ ps) = ps
-- | Read the client's stored 'ConnACKFlags' inside an 'STM'
-- transaction.
connACKSTM :: MQTTClient -> STM ConnACKFlags
connACKSTM = readTVar . _connACKFlags
-- | 'IO' version of 'connACKSTM': read the client's stored
-- 'ConnACKFlags' in its own transaction.
connACK :: MQTTClient -> IO ConnACKFlags
connACK c = atomically (connACKSTM c)
-- | The value of the first 'PropTopicAliasMaximum' found in
-- 'svrProps', or 0 when the property list contains none.
maxAliases :: MQTTClient -> IO Word16
maxAliases c = do
  ps <- svrProps c
  pure $ case [n | PropTopicAliasMaximum n <- ps] of
    (n:_) -> n
    []    -> 0
-- | Publish like 'publishq', using a topic alias when one is available.
--
-- The alias limit comes from 'maxAliases'.  For a topic seen for the
-- first time, the next free alias number is recorded in '_outA' and
-- sent along with the full topic.  For a topic that already has an
-- alias, the topic is sent as the empty string together with its alias.
-- Once the limit is reached, new topics are published without any
-- alias property.
pubAliased :: MQTTClient
           -> Topic         -- ^ Topic to publish to
           -> BL.ByteString -- ^ Message body
           -> Bool          -- ^ Retain flag
           -> QoS           -- ^ Delivery QoS
           -> [Property]    -- ^ Properties; an alias property may be appended
           -> IO ()
pubAliased c@MQTTClient{_outA} t m r q props = do
  x <- maxAliases c
  (t', n) <- alias x
  let np = props <> case n of
                      0 -> mempty
                      _ -> [PropTopicAlias n]
  publishq c t' m r q np

  where
    -- Returns the topic to put on the wire and the alias to attach
    -- (0 meaning "no alias").
    alias :: Word16 -> IO (Topic, Word16)
    alias mv = atomically $ do
      as <- readTVar _outA
      let cur  = Map.lookup t as
          -- Compare in Int before narrowing: converting size + 1
          -- straight to Word16 is out of range once the map already
          -- holds 65535 aliases.
          next = Map.size as + 1
          fresh | next > fromIntegral mv = 0
                | otherwise              = fromIntegral next
          v    = fromMaybe fresh cur
      when (v > 0) $ writeTVar _outA (Map.insert t v as)
      pure (maybe t (const "") cur, v)
-- | Insert a 'MessageCallback' into the client's '_corr' map under the
-- given key.
registerCorrelated :: MQTTClient -> BL.ByteString -> MessageCallback -> STM ()
registerCorrelated client key callback = Decaying.insert key callback (_corr client)
-- | Delete the entry stored under the given key from the client's
-- '_corr' map.
unregisterCorrelated :: MQTTClient -> BL.ByteString -> STM ()
unregisterCorrelated client key = Decaying.delete key (_corr client)