{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.MQTT.Client (
MQTTConfig(..), MQTTClient, QoS(..), Topic, mqttConfig, mkLWT, LastWill(..),
ProtocolLevel(..), Property(..), SubOptions(..), subOptions, MessageCallback(..),
waitForClient,
connectURI, isConnected,
disconnect, normalDisconnect, stopClient,
subscribe, unsubscribe, publish, publishq, pubAliased,
svrProps, connACK, MQTTException(..),
runMQTTConduit, MQTTConduit, isConnectedSTM, connACKSTM,
registerCorrelated, unregisterCorrelated
) where
import Control.Concurrent (myThreadId, threadDelay)
import Control.Concurrent.Async (Async, async, asyncThreadId, cancel, cancelWith, link, race_, wait,
waitAnyCancel)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (STM, TChan, TVar, atomically, check, modifyTVar', newTChan, newTChanIO,
newTVarIO, orElse, readTChan, readTVar, readTVarIO, registerDelay, retry,
writeTChan, writeTVar)
import Control.DeepSeq (force)
import qualified Control.Exception as E
import Control.Monad (forever, guard, join, unless, void, when)
import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor (first)
import qualified Data.ByteString.Char8 as BCS
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BC
import Data.Conduit (ConduitT, Void, await, runConduit, yield, (.|))
import Data.Conduit.Attoparsec (conduitParser)
import qualified Data.Conduit.Combinators as C
import Data.Conduit.Network (AppData, appSink, appSource, clientSettings, runTCPClient)
import Data.Conduit.Network.TLS (runTLSClient, tlsClientConfig, tlsClientTLSSettings)
import Data.Default.Class (def)
import Data.Foldable (traverse_)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Map.Strict.Decaying as Decaying
import Data.Maybe (fromJust, fromMaybe)
import Data.Text (Text)
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Lazy.Encoding as TEL
import Data.Word (Word16)
import GHC.Conc (labelThread)
import Network.Connection (ConnectionParams (..), TLSSettings (..), connectTo, connectionClose,
connectionGetChunk, connectionPut, initConnectionContext)
import Network.URI (URI (..), nullURIAuth, unEscapeString, uriPort, uriRegName, uriUserInfo)
import qualified Network.WebSockets as WS
import Network.WebSockets.Stream (makeStream)
import System.IO.Error (catchIOError, isEOFError)
import System.Timeout (timeout)
import qualified Data.Set as Set
import Network.MQTT.Topic (Filter, Topic, mkTopic, unFilter, unTopic)
import Network.MQTT.Types as T
data ConnState = Starting
| Connected
| Stopped
| Disconnected
| DiscoErr DisconnectRequest
| ConnErr ConnACKFlags deriving (ConnState -> ConnState -> Bool
(ConnState -> ConnState -> Bool)
-> (ConnState -> ConnState -> Bool) -> Eq ConnState
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ConnState -> ConnState -> Bool
== :: ConnState -> ConnState -> Bool
$c/= :: ConnState -> ConnState -> Bool
/= :: ConnState -> ConnState -> Bool
Eq, Int -> ConnState -> ShowS
[ConnState] -> ShowS
ConnState -> String
(Int -> ConnState -> ShowS)
-> (ConnState -> String)
-> ([ConnState] -> ShowS)
-> Show ConnState
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ConnState -> ShowS
showsPrec :: Int -> ConnState -> ShowS
$cshow :: ConnState -> String
show :: ConnState -> String
$cshowList :: [ConnState] -> ShowS
showList :: [ConnState] -> ShowS
Show)
data DispatchType = DSubACK | DUnsubACK | DPubACK | DPubREC | DPubREL | DPubCOMP
deriving (DispatchType -> DispatchType -> Bool
(DispatchType -> DispatchType -> Bool)
-> (DispatchType -> DispatchType -> Bool) -> Eq DispatchType
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: DispatchType -> DispatchType -> Bool
== :: DispatchType -> DispatchType -> Bool
$c/= :: DispatchType -> DispatchType -> Bool
/= :: DispatchType -> DispatchType -> Bool
Eq, Int -> DispatchType -> ShowS
[DispatchType] -> ShowS
DispatchType -> String
(Int -> DispatchType -> ShowS)
-> (DispatchType -> String)
-> ([DispatchType] -> ShowS)
-> Show DispatchType
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> DispatchType -> ShowS
showsPrec :: Int -> DispatchType -> ShowS
$cshow :: DispatchType -> String
show :: DispatchType -> String
$cshowList :: [DispatchType] -> ShowS
showList :: [DispatchType] -> ShowS
Show, Eq DispatchType
Eq DispatchType =>
(DispatchType -> DispatchType -> Ordering)
-> (DispatchType -> DispatchType -> Bool)
-> (DispatchType -> DispatchType -> Bool)
-> (DispatchType -> DispatchType -> Bool)
-> (DispatchType -> DispatchType -> Bool)
-> (DispatchType -> DispatchType -> DispatchType)
-> (DispatchType -> DispatchType -> DispatchType)
-> Ord DispatchType
DispatchType -> DispatchType -> Bool
DispatchType -> DispatchType -> Ordering
DispatchType -> DispatchType -> DispatchType
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: DispatchType -> DispatchType -> Ordering
compare :: DispatchType -> DispatchType -> Ordering
$c< :: DispatchType -> DispatchType -> Bool
< :: DispatchType -> DispatchType -> Bool
$c<= :: DispatchType -> DispatchType -> Bool
<= :: DispatchType -> DispatchType -> Bool
$c> :: DispatchType -> DispatchType -> Bool
> :: DispatchType -> DispatchType -> Bool
$c>= :: DispatchType -> DispatchType -> Bool
>= :: DispatchType -> DispatchType -> Bool
$cmax :: DispatchType -> DispatchType -> DispatchType
max :: DispatchType -> DispatchType -> DispatchType
$cmin :: DispatchType -> DispatchType -> DispatchType
min :: DispatchType -> DispatchType -> DispatchType
Ord, Int -> DispatchType
DispatchType -> Int
DispatchType -> [DispatchType]
DispatchType -> DispatchType
DispatchType -> DispatchType -> [DispatchType]
DispatchType -> DispatchType -> DispatchType -> [DispatchType]
(DispatchType -> DispatchType)
-> (DispatchType -> DispatchType)
-> (Int -> DispatchType)
-> (DispatchType -> Int)
-> (DispatchType -> [DispatchType])
-> (DispatchType -> DispatchType -> [DispatchType])
-> (DispatchType -> DispatchType -> [DispatchType])
-> (DispatchType -> DispatchType -> DispatchType -> [DispatchType])
-> Enum DispatchType
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
$csucc :: DispatchType -> DispatchType
succ :: DispatchType -> DispatchType
$cpred :: DispatchType -> DispatchType
pred :: DispatchType -> DispatchType
$ctoEnum :: Int -> DispatchType
toEnum :: Int -> DispatchType
$cfromEnum :: DispatchType -> Int
fromEnum :: DispatchType -> Int
$cenumFrom :: DispatchType -> [DispatchType]
enumFrom :: DispatchType -> [DispatchType]
$cenumFromThen :: DispatchType -> DispatchType -> [DispatchType]
enumFromThen :: DispatchType -> DispatchType -> [DispatchType]
$cenumFromTo :: DispatchType -> DispatchType -> [DispatchType]
enumFromTo :: DispatchType -> DispatchType -> [DispatchType]
$cenumFromThenTo :: DispatchType -> DispatchType -> DispatchType -> [DispatchType]
enumFromThenTo :: DispatchType -> DispatchType -> DispatchType -> [DispatchType]
Enum, DispatchType
DispatchType -> DispatchType -> Bounded DispatchType
forall a. a -> a -> Bounded a
$cminBound :: DispatchType
minBound :: DispatchType
$cmaxBound :: DispatchType
maxBound :: DispatchType
Bounded)
data MessageCallback = NoCallback
| SimpleCallback (MQTTClient -> Topic -> BL.ByteString -> [Property] -> IO ())
| OrderedCallback (MQTTClient -> Topic -> BL.ByteString -> [Property] -> IO ())
| LowLevelCallback (MQTTClient -> PublishRequest -> IO ())
| OrderedLowLevelCallback (MQTTClient -> PublishRequest -> IO ())
data MQTTClient = MQTTClient {
MQTTClient -> TChan MQTTPkt
_ch :: TChan MQTTPkt
, MQTTClient -> TVar Word16
_pktID :: TVar Word16
, MQTTClient -> MessageCallback
_cb :: MessageCallback
, MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks :: TVar (Map (DispatchType,Word16) (TChan MQTTPkt))
, MQTTClient -> Map Word16 PublishRequest
_inflight :: Decaying.Map Word16 PublishRequest
, MQTTClient -> TVar ConnState
_st :: TVar ConnState
, MQTTClient -> TVar (Maybe (Async ()))
_ct :: TVar (Maybe (Async ()))
, MQTTClient -> TVar (Map Topic Word16)
_outA :: TVar (Map Topic Word16)
, MQTTClient -> TVar (Map Word16 Topic)
_inA :: TVar (Map Word16 Topic)
, MQTTClient -> TVar ConnACKFlags
_connACKFlags :: TVar ConnACKFlags
, MQTTClient -> Map ByteString MessageCallback
_corr :: Decaying.Map BL.ByteString MessageCallback
, MQTTClient -> MVar (IO ())
_cbM :: MVar (IO ())
, MQTTClient -> TVar (Maybe (Async ()))
_cbHandle :: TVar (Maybe (Async ()))
}
data MQTTConfig = MQTTConfig{
MQTTConfig -> Bool
_cleanSession :: Bool
, MQTTConfig -> Maybe LastWill
_lwt :: Maybe LastWill
, MQTTConfig -> MessageCallback
_msgCB :: MessageCallback
, MQTTConfig -> ProtocolLevel
_protocol :: ProtocolLevel
, MQTTConfig -> [Property]
_connProps :: [Property]
, MQTTConfig -> String
_hostname :: String
, MQTTConfig -> Int
_port :: Int
, MQTTConfig -> String
_connID :: String
, MQTTConfig -> Maybe String
_username :: Maybe String
, MQTTConfig -> Maybe String
_password :: Maybe String
, MQTTConfig -> Int
_connectTimeout :: Int
, MQTTConfig -> TLSSettings
_tlsSettings :: TLSSettings
, MQTTConfig -> Int
_pingPeriod :: Int
, MQTTConfig -> Int
_pingPatience :: Int
}
mqttConfig :: MQTTConfig
mqttConfig :: MQTTConfig
mqttConfig = MQTTConfig{_hostname :: String
_hostname=String
"", _port :: Int
_port=Int
1883, _connID :: String
_connID=String
"",
_username :: Maybe String
_username=Maybe String
forall a. Maybe a
Nothing, _password :: Maybe String
_password=Maybe String
forall a. Maybe a
Nothing,
_cleanSession :: Bool
_cleanSession=Bool
True, _lwt :: Maybe LastWill
_lwt=Maybe LastWill
forall a. Maybe a
Nothing,
_msgCB :: MessageCallback
_msgCB=MessageCallback
NoCallback,
_protocol :: ProtocolLevel
_protocol=ProtocolLevel
Protocol311, _connProps :: [Property]
_connProps=[Property]
forall a. Monoid a => a
mempty,
_connectTimeout :: Int
_connectTimeout=Int
180000000,
_tlsSettings :: TLSSettings
_tlsSettings=TLSSettings
forall a. Default a => a
def,
_pingPeriod :: Int
_pingPeriod=Int
30000000,
_pingPatience :: Int
_pingPatience=Int
90000000}
connectURI :: MQTTConfig -> URI -> IO MQTTClient
connectURI :: MQTTConfig -> URI -> IO MQTTClient
connectURI cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
ProtocolLevel
TLSSettings
MessageCallback
_connID :: MQTTConfig -> String
_cleanSession :: MQTTConfig -> Bool
_lwt :: MQTTConfig -> Maybe LastWill
_msgCB :: MQTTConfig -> MessageCallback
_protocol :: MQTTConfig -> ProtocolLevel
_connProps :: MQTTConfig -> [Property]
_hostname :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_username :: MQTTConfig -> Maybe String
_password :: MQTTConfig -> Maybe String
_connectTimeout :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_pingPeriod :: MQTTConfig -> Int
_pingPatience :: MQTTConfig -> Int
_cleanSession :: Bool
_lwt :: Maybe LastWill
_msgCB :: MessageCallback
_protocol :: ProtocolLevel
_connProps :: [Property]
_hostname :: String
_port :: Int
_connID :: String
_username :: Maybe String
_password :: Maybe String
_connectTimeout :: Int
_tlsSettings :: TLSSettings
_pingPeriod :: Int
_pingPatience :: Int
..} URI
uri = do
let cf :: MQTTConfig -> IO MQTTClient
cf = case URI -> String
uriScheme URI
uri of
String
"mqtt:" -> MQTTConfig -> IO MQTTClient
runClient
String
"mqtts:" -> MQTTConfig -> IO MQTTClient
runClientTLS
String
"ws:" -> URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI
uri Bool
False
String
"wss:" -> URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI
uri Bool
True
String
us -> String -> MQTTConfig -> IO MQTTClient
forall a. String -> a
mqttFail (String -> MQTTConfig -> IO MQTTClient)
-> String -> MQTTConfig -> IO MQTTClient
forall a b. (a -> b) -> a -> b
$ String
"invalid URI scheme: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
us
a :: URIAuth
a = URIAuth -> Maybe URIAuth -> URIAuth
forall a. a -> Maybe a -> a
fromMaybe URIAuth
nullURIAuth (Maybe URIAuth -> URIAuth) -> Maybe URIAuth -> URIAuth
forall a b. (a -> b) -> a -> b
$ URI -> Maybe URIAuth
uriAuthority URI
uri
(Maybe String
u,Maybe String
p) = String -> (Maybe String, Maybe String)
up (URIAuth -> String
uriUserInfo URIAuth
a)
v <- String -> Int -> IO MQTTClient -> IO (Maybe MQTTClient)
forall a. String -> Int -> IO a -> IO (Maybe a)
namedTimeout String
"MQTT connect" Int
_connectTimeout (IO MQTTClient -> IO (Maybe MQTTClient))
-> IO MQTTClient -> IO (Maybe MQTTClient)
forall a b. (a -> b) -> a -> b
$
MQTTConfig -> IO MQTTClient
cf MQTTConfig
cfg{Network.MQTT.Client._connID=cid _protocol (uriFragment uri),
_hostname=uriRegName a, _port=port (uriPort a) (uriScheme uri),
Network.MQTT.Client._username=u, Network.MQTT.Client._password=p}
maybe (mqttFail $ "connection to " <> show uri <> " timed out") pure v
where
port :: String -> a -> a
port String
"" a
"mqtt:" = a
1883
port String
"" a
"mqtts:" = a
8883
port String
"" a
"ws:" = a
80
port String
"" a
"wss:" = a
443
port String
x a
_ = (String -> a
forall a. Read a => String -> a
read (String -> a) -> ShowS -> String -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ShowS
forall a. HasCallStack => [a] -> [a]
tail) String
x
cid :: p -> ShowS
cid p
_ [Char
'#'] = String
""
cid p
_ (Char
'#':String
xs) = String
xs
cid p
_ String
_ = String
""
up :: String -> (Maybe String, Maybe String)
up String
"" = (Maybe String
_username, Maybe String
_password)
up String
x = let (String
u,String
r) = (Char -> Bool) -> String -> (String, String)
forall a. (a -> Bool) -> [a] -> ([a], [a])
break (Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
== Char
':') (ShowS
forall a. HasCallStack => [a] -> [a]
init String
x) in
(String -> Maybe String
forall a. a -> Maybe a
Just (ShowS
unEscapeString String
u), if String
r String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
"" then Maybe String
forall a. Maybe a
Nothing else String -> Maybe String
forall a. a -> Maybe a
Just (ShowS
unEscapeString ShowS -> ShowS
forall a b. (a -> b) -> a -> b
$ ShowS
forall a. HasCallStack => [a] -> [a]
tail String
r))
runClient :: MQTTConfig -> IO MQTTClient
runClient :: MQTTConfig -> IO MQTTClient
runClient cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
ProtocolLevel
TLSSettings
MessageCallback
_connID :: MQTTConfig -> String
_cleanSession :: MQTTConfig -> Bool
_lwt :: MQTTConfig -> Maybe LastWill
_msgCB :: MQTTConfig -> MessageCallback
_protocol :: MQTTConfig -> ProtocolLevel
_connProps :: MQTTConfig -> [Property]
_hostname :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_username :: MQTTConfig -> Maybe String
_password :: MQTTConfig -> Maybe String
_connectTimeout :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_pingPeriod :: MQTTConfig -> Int
_pingPatience :: MQTTConfig -> Int
_cleanSession :: Bool
_lwt :: Maybe LastWill
_msgCB :: MessageCallback
_protocol :: ProtocolLevel
_connProps :: [Property]
_hostname :: String
_port :: Int
_connID :: String
_username :: Maybe String
_password :: Maybe String
_connectTimeout :: Int
_tlsSettings :: TLSSettings
_pingPeriod :: Int
_pingPatience :: Int
..} = ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat (ClientSettings -> (AppData -> IO ()) -> IO ()
forall a. ClientSettings -> (AppData -> IO a) -> IO a
runTCPClient (Int -> ByteString -> ClientSettings
clientSettings Int
_port (String -> ByteString
BCS.pack String
_hostname))) MQTTConfig
cfg
runClientTLS :: MQTTConfig -> IO MQTTClient
runClientTLS :: MQTTConfig -> IO MQTTClient
runClientTLS cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
ProtocolLevel
TLSSettings
MessageCallback
_connID :: MQTTConfig -> String
_cleanSession :: MQTTConfig -> Bool
_lwt :: MQTTConfig -> Maybe LastWill
_msgCB :: MQTTConfig -> MessageCallback
_protocol :: MQTTConfig -> ProtocolLevel
_connProps :: MQTTConfig -> [Property]
_hostname :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_username :: MQTTConfig -> Maybe String
_password :: MQTTConfig -> Maybe String
_connectTimeout :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_pingPeriod :: MQTTConfig -> Int
_pingPatience :: MQTTConfig -> Int
_cleanSession :: Bool
_lwt :: Maybe LastWill
_msgCB :: MessageCallback
_protocol :: ProtocolLevel
_connProps :: [Property]
_hostname :: String
_port :: Int
_connID :: String
_username :: Maybe String
_password :: Maybe String
_connectTimeout :: Int
_tlsSettings :: TLSSettings
_pingPeriod :: Int
_pingPatience :: Int
..} = ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat (TLSClientConfig -> (AppData -> IO ()) -> IO ()
forall (m :: * -> *) a.
MonadUnliftIO m =>
TLSClientConfig -> (AppData -> m a) -> m a
runTLSClient TLSClientConfig
tlsConf) MQTTConfig
cfg
where tlsConf :: TLSClientConfig
tlsConf = (Int -> ByteString -> TLSClientConfig
tlsClientConfig Int
_port (String -> ByteString
BCS.pack String
_hostname)) {tlsClientTLSSettings=_tlsSettings}
tcpCompat :: ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat :: ((AppData -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
tcpCompat (AppData -> IO ()) -> IO ()
mkconn = ((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (((AppData -> IO ()) -> IO ()) -> (MQTTConduit -> IO ()) -> IO ()
forall {a} {m :: * -> *} {m :: * -> *} {c} {t} {i} {o}.
(HasReadWrite a, MonadIO m, MonadIO m) =>
((a -> c) -> t)
-> ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c)
-> t
adapt (AppData -> IO ()) -> IO ()
mkconn)
where adapt :: ((a -> c) -> t)
-> ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c)
-> t
adapt (a -> c) -> t
mk (ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c
f = (a -> c) -> t
mk ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c
f ((ConduitT i ByteString m (), ConduitT ByteString o m ()) -> c)
-> (a -> (ConduitT i ByteString m (), ConduitT ByteString o m ()))
-> a
-> c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> (ConduitT i ByteString m (), ConduitT ByteString o m ())
forall {ad} {m :: * -> *} {m :: * -> *} {i} {o}.
(HasReadWrite ad, MonadIO m, MonadIO m) =>
ad -> (ConduitT i ByteString m (), ConduitT ByteString o m ())
adaptor)
adaptor :: ad -> (ConduitT i ByteString m (), ConduitT ByteString o m ())
adaptor ad
ad = (ad -> ConduitT i ByteString m ()
forall ad (m :: * -> *) i.
(HasReadWrite ad, MonadIO m) =>
ad -> ConduitT i ByteString m ()
appSource ad
ad, ad -> ConduitT ByteString o m ()
forall ad (m :: * -> *) o.
(HasReadWrite ad, MonadIO m) =>
ad -> ConduitT ByteString o m ()
appSink ad
ad)
runWS :: URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS :: URI -> Bool -> MQTTConfig -> IO MQTTClient
runWS URI{String
uriPath :: String
uriPath :: URI -> String
uriPath, String
uriQuery :: String
uriQuery :: URI -> String
uriQuery} Bool
secure cfg :: MQTTConfig
cfg@MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
ProtocolLevel
TLSSettings
MessageCallback
_connID :: MQTTConfig -> String
_cleanSession :: MQTTConfig -> Bool
_lwt :: MQTTConfig -> Maybe LastWill
_msgCB :: MQTTConfig -> MessageCallback
_protocol :: MQTTConfig -> ProtocolLevel
_connProps :: MQTTConfig -> [Property]
_hostname :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_username :: MQTTConfig -> Maybe String
_password :: MQTTConfig -> Maybe String
_connectTimeout :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_pingPeriod :: MQTTConfig -> Int
_pingPatience :: MQTTConfig -> Int
_cleanSession :: Bool
_lwt :: Maybe LastWill
_msgCB :: MessageCallback
_protocol :: ProtocolLevel
_connProps :: [Property]
_hostname :: String
_port :: Int
_connID :: String
_username :: Maybe String
_password :: Maybe String
_connectTimeout :: Int
_tlsSettings :: TLSSettings
_pingPeriod :: Int
_pingPatience :: Int
..} =
((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (((Connection -> IO ()) -> IO ()) -> (MQTTConduit -> IO ()) -> IO ()
forall {c} {t}. ((Connection -> c) -> t) -> (MQTTConduit -> c) -> t
adapt (((Connection -> IO ()) -> IO ())
-> (MQTTConduit -> IO ()) -> IO ())
-> ((Connection -> IO ()) -> IO ())
-> (MQTTConduit -> IO ())
-> IO ()
forall a b. (a -> b) -> a -> b
$ Bool
-> String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> (Connection -> IO ())
-> IO ()
cf Bool
secure String
_hostname Int
_port String
endpoint ConnectionOptions
WS.defaultConnectionOptions Headers
hdrs) MQTTConfig
cfg
where
hdrs :: Headers
hdrs = [(CI ByteString
"Sec-WebSocket-Protocol", ByteString
"mqtt")]
adapt :: ((Connection -> c) -> t) -> (MQTTConduit -> c) -> t
adapt (Connection -> c) -> t
mk MQTTConduit -> c
f = (Connection -> c) -> t
mk (MQTTConduit -> c
f (MQTTConduit -> c)
-> (Connection -> MQTTConduit) -> Connection -> c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> MQTTConduit
adaptor)
adaptor :: Connection -> MQTTConduit
adaptor Connection
s = (Connection -> ConduitT () ByteString IO ()
wsSource Connection
s, Connection -> ConduitT ByteString Void IO ()
wsSink Connection
s)
endpoint :: String
endpoint = String
uriPath String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
uriQuery
cf :: Bool -> String -> Int -> String -> WS.ConnectionOptions -> WS.Headers -> WS.ClientApp () -> IO ()
cf :: Bool
-> String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> (Connection -> IO ())
-> IO ()
cf Bool
False = String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> (Connection -> IO ())
-> IO ()
forall a.
String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> ClientApp a
-> IO a
WS.runClientWith
cf Bool
True = String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> (Connection -> IO ())
-> IO ()
runWSS
wsSource :: WS.Connection -> ConduitT () BCS.ByteString IO ()
wsSource :: Connection -> ConduitT () ByteString IO ()
wsSource Connection
ws = ConduitT () ByteString IO () -> ConduitT () ByteString IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (ConduitT () ByteString IO () -> ConduitT () ByteString IO ())
-> ConduitT () ByteString IO () -> ConduitT () ByteString IO ()
forall a b. (a -> b) -> a -> b
$ do
bs <- IO ByteString -> ConduitT () ByteString IO ByteString
forall a. IO a -> ConduitT () ByteString IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ByteString -> ConduitT () ByteString IO ByteString)
-> IO ByteString -> ConduitT () ByteString IO ByteString
forall a b. (a -> b) -> a -> b
$ Connection -> IO ByteString
forall a. WebSocketsData a => Connection -> IO a
WS.receiveData Connection
ws
unless (BCS.null bs) $ yield bs
wsSink :: WS.Connection -> ConduitT BCS.ByteString Void IO ()
wsSink :: Connection -> ConduitT ByteString Void IO ()
wsSink Connection
ws = ConduitT ByteString Void IO ()
-> (ByteString -> ConduitT ByteString Void IO ())
-> Maybe ByteString
-> ConduitT ByteString Void IO ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> ConduitT ByteString Void IO ()
forall a. a -> ConduitT ByteString Void IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (\ByteString
bs -> IO () -> ConduitT ByteString Void IO ()
forall a. IO a -> ConduitT ByteString Void IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Connection -> ByteString -> IO ()
forall a. WebSocketsData a => Connection -> a -> IO ()
WS.sendBinaryData Connection
ws ByteString
bs) ConduitT ByteString Void IO ()
-> ConduitT ByteString Void IO () -> ConduitT ByteString Void IO ()
forall a b.
ConduitT ByteString Void IO a
-> ConduitT ByteString Void IO b -> ConduitT ByteString Void IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Connection -> ConduitT ByteString Void IO ()
wsSink Connection
ws) (Maybe ByteString -> ConduitT ByteString Void IO ())
-> ConduitT ByteString Void IO (Maybe ByteString)
-> ConduitT ByteString Void IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< ConduitT ByteString Void IO (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await
runWSS :: String -> Int -> String -> WS.ConnectionOptions -> WS.Headers -> WS.ClientApp () -> IO ()
runWSS :: String
-> Int
-> String
-> ConnectionOptions
-> Headers
-> (Connection -> IO ())
-> IO ()
runWSS String
host Int
port String
path ConnectionOptions
options Headers
hdrs' Connection -> IO ()
app = do
let connectionParams :: ConnectionParams
connectionParams = ConnectionParams
{ connectionHostname :: String
connectionHostname = String
host
, connectionPort :: PortNumber
connectionPort = Int -> PortNumber
forall a. Enum a => Int -> a
toEnum Int
port
, connectionUseSecure :: Maybe TLSSettings
connectionUseSecure = TLSSettings -> Maybe TLSSettings
forall a. a -> Maybe a
Just TLSSettings
_tlsSettings
, connectionUseSocks :: Maybe ProxySettings
connectionUseSocks = Maybe ProxySettings
forall a. Maybe a
Nothing
}
context <- IO ConnectionContext
initConnectionContext
E.bracket (connectTo context connectionParams) connectionClose
(\Connection
conn -> do
stream <- IO (Maybe ByteString) -> (Maybe ByteString -> IO ()) -> IO Stream
makeStream (Connection -> IO (Maybe ByteString)
reader Connection
conn) (Connection -> Maybe ByteString -> IO ()
writer Connection
conn)
WS.runClientWithStream stream host path options hdrs' app)
where
reader :: Connection -> IO (Maybe ByteString)
reader Connection
conn =
IO (Maybe ByteString)
-> (IOError -> IO (Maybe ByteString)) -> IO (Maybe ByteString)
forall a. IO a -> (IOError -> IO a) -> IO a
catchIOError (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString)
-> IO ByteString -> IO (Maybe ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO ByteString
connectionGetChunk Connection
conn)
(\IOError
e -> if IOError -> Bool
isEOFError IOError
e then Maybe ByteString -> IO (Maybe ByteString)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ByteString
forall a. Maybe a
Nothing else IOError -> IO (Maybe ByteString)
forall e a. (HasCallStack, Exception e) => e -> IO a
E.throwIO IOError
e)
writer :: Connection -> Maybe ByteString -> IO ()
writer Connection
conn = IO () -> (ByteString -> IO ()) -> Maybe ByteString -> IO ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (Connection -> ByteString -> IO ()
connectionPut Connection
conn (ByteString -> IO ())
-> (ByteString -> ByteString) -> ByteString -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BC.toStrict)
mqttFail :: String -> a
mqttFail :: forall a. String -> a
mqttFail = MQTTException -> a
forall a e. (HasCallStack, Exception e) => e -> a
E.throw (MQTTException -> a) -> (String -> MQTTException) -> String -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> MQTTException
MQTTException
namedAsync :: String -> IO a -> IO (Async a)
namedAsync :: forall a. String -> IO a -> IO (Async a)
namedAsync String
s IO a
a = IO a -> IO (Async a)
forall a. IO a -> IO (Async a)
async IO a
a IO (Async a) -> (Async a -> IO (Async a)) -> IO (Async a)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Async a
p -> ThreadId -> String -> IO ()
labelThread (Async a -> ThreadId
forall a. Async a -> ThreadId
asyncThreadId Async a
p) String
s IO () -> IO (Async a) -> IO (Async a)
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Async a -> IO (Async a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Async a
p
namedTimeout :: String -> Int -> IO a -> IO (Maybe a)
namedTimeout :: forall a. String -> Int -> IO a -> IO (Maybe a)
namedTimeout String
n Int
to IO a
a = Int -> IO a -> IO (Maybe a)
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
to (IO ThreadId
myThreadId IO ThreadId -> (ThreadId -> IO a) -> IO a
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ThreadId
tid -> ThreadId -> String -> IO ()
labelThread ThreadId
tid String
n IO () -> IO a -> IO a
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO a
a)
type MQTTConduit = (ConduitT () BCS.ByteString IO (), ConduitT BCS.ByteString Void IO ())
runMQTTConduit :: ((MQTTConduit -> IO ()) -> IO ())
-> MQTTConfig
-> IO MQTTClient
runMQTTConduit :: ((MQTTConduit -> IO ()) -> IO ()) -> MQTTConfig -> IO MQTTClient
runMQTTConduit (MQTTConduit -> IO ()) -> IO ()
mkconn MQTTConfig{Bool
Int
String
[Property]
Maybe String
Maybe LastWill
ProtocolLevel
TLSSettings
MessageCallback
_connID :: MQTTConfig -> String
_cleanSession :: MQTTConfig -> Bool
_lwt :: MQTTConfig -> Maybe LastWill
_msgCB :: MQTTConfig -> MessageCallback
_protocol :: MQTTConfig -> ProtocolLevel
_connProps :: MQTTConfig -> [Property]
_hostname :: MQTTConfig -> String
_port :: MQTTConfig -> Int
_username :: MQTTConfig -> Maybe String
_password :: MQTTConfig -> Maybe String
_connectTimeout :: MQTTConfig -> Int
_tlsSettings :: MQTTConfig -> TLSSettings
_pingPeriod :: MQTTConfig -> Int
_pingPatience :: MQTTConfig -> Int
_cleanSession :: Bool
_lwt :: Maybe LastWill
_msgCB :: MessageCallback
_protocol :: ProtocolLevel
_connProps :: [Property]
_hostname :: String
_port :: Int
_connID :: String
_username :: Maybe String
_password :: Maybe String
_connectTimeout :: Int
_tlsSettings :: TLSSettings
_pingPeriod :: Int
_pingPatience :: Int
..} = do
_ch <- IO (TChan MQTTPkt)
forall a. IO (TChan a)
newTChanIO
_pktID <- newTVarIO 1
_acks <- newTVarIO mempty
_inflight <- Decaying.new 60
_st <- newTVarIO Starting
_ct <- newTVarIO Nothing
_outA <- newTVarIO mempty
_inA <- newTVarIO mempty
_connACKFlags <- newTVarIO (ConnACKFlags NewSession ConnUnspecifiedError mempty)
_corr <- Decaying.new 600
_cbM <- newEmptyMVar
_cbHandle <- newTVarIO Nothing
let _cb = MessageCallback
_msgCB
cli = MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
_cb :: MessageCallback
..}
t <- namedAsync "MQTT clientThread" $ clientThread cli
s <- atomically (waitForLaunch cli t)
when (s == Disconnected) $ wait t
atomically $ checkConnected cli
pure cli
where
clientThread :: MQTTClient -> IO ()
clientThread MQTTClient
cli = IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
E.finally IO ()
connectAndRun IO ()
markDisco
where
connectAndRun :: IO ()
connectAndRun = (MQTTConduit -> IO ()) -> IO ()
mkconn ((MQTTConduit -> IO ()) -> IO ())
-> (MQTTConduit -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \MQTTConduit
ad -> MQTTClient -> MQTTConduit -> IO MQTTClient
forall {m :: * -> *} {b} {a} {a}.
Monad m =>
b -> (a, ConduitT ByteString Void m a) -> m b
start MQTTClient
cli MQTTConduit
ad IO MQTTClient -> (MQTTClient -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= MQTTConduit -> MQTTClient -> IO ()
run MQTTConduit
ad
markDisco :: IO ()
markDisco = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
st <- TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar (MQTTClient -> TVar ConnState
_st MQTTClient
cli)
guard $ st == Starting || st == Connected
writeTVar (_st cli) Disconnected
start :: b -> (a, ConduitT ByteString Void m a) -> m b
start b
c (a
_,ConduitT ByteString Void m a
sink) = do
m a -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m a -> m ())
-> (ConduitT () Void m a -> m a) -> ConduitT () Void m a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConduitT () Void m a -> m a
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void m a -> m ()) -> ConduitT () Void m a -> m ()
forall a b. (a -> b) -> a -> b
$ do
let req :: ConnectRequest
req = ConnectRequest
connectRequest{T._connID=BC.pack _connID,
T._lastWill=_lwt,
T._username=TEL.encodeUtf8 . TL.pack <$> _username,
T._password=BC.pack <$> _password,
T._cleanSession=_cleanSession,
T._connProperties=_connProps}
ByteString -> ConduitT () ByteString m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (ByteString -> ByteString
BL.toStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ ProtocolLevel -> ConnectRequest -> ByteString
forall a. ByteMe a => ProtocolLevel -> a -> ByteString
toByteString ProtocolLevel
_protocol ConnectRequest
req) ConduitT () ByteString m ()
-> ConduitT ByteString Void m a -> ConduitT () Void m a
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT ByteString Void m a
sink
b -> m b
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
c
run :: MQTTConduit -> MQTTClient -> IO ()
run (ConduitT () ByteString IO ()
src,ConduitT ByteString Void IO ()
sink) c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} = do
pch <- IO (TChan Bool)
forall a. IO (TChan a)
newTChanIO
o <- namedAsync "MQTT out" $ onceConnected >> processOut
p <- namedAsync "MQTT ping" $ onceConnected >> doPing
w <- namedAsync "MQTT watchdog" $ watchdog pch
s <- namedAsync "MQTT in" $ doSrc pch
void $ waitAnyCancel [o, p, w, s]
where
doSrc :: TChan Bool -> IO ()
doSrc TChan Bool
pch = ConduitT () Void IO () -> IO ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void IO () -> IO ())
-> ConduitT () Void IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ConduitT () ByteString IO ()
src
ConduitT () ByteString IO ()
-> ConduitT ByteString Void IO () -> ConduitT () Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| Parser ByteString MQTTPkt
-> ConduitT ByteString (PositionRange, MQTTPkt) IO ()
forall a (m :: * -> *) b.
(AttoparsecInput a, MonadThrow m) =>
Parser a b -> ConduitT a (PositionRange, b) m ()
conduitParser (ProtocolLevel -> Parser ByteString MQTTPkt
parsePacket ProtocolLevel
_protocol)
ConduitT ByteString (PositionRange, MQTTPkt) IO ()
-> ConduitT (PositionRange, MQTTPkt) Void IO ()
-> ConduitT ByteString Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ((PositionRange, MQTTPkt) -> IO ())
-> ConduitT (PositionRange, MQTTPkt) Void IO ()
forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
C.mapM_ (\(PositionRange
_,MQTTPkt
x) -> IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch MQTTClient
c TChan Bool
pch (MQTTPkt -> IO ()) -> MQTTPkt -> IO ()
forall a b. (a -> b) -> a -> b
$! MQTTPkt
x))
onceConnected :: IO ()
onceConnected = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool -> STM ()
check (Bool -> STM ()) -> (ConnState -> Bool) -> ConnState -> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Connected) (ConnState -> STM ()) -> STM ConnState -> STM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar TVar ConnState
_st
processOut :: IO ()
processOut = ConduitT () Void IO () -> IO ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void IO () -> IO ())
-> ConduitT () Void IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
IO MQTTPkt -> ConduitT () MQTTPkt IO ()
forall (m :: * -> *) a i. Monad m => m a -> ConduitT i a m ()
C.repeatM (IO MQTTPkt -> IO MQTTPkt
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (STM MQTTPkt -> IO MQTTPkt
forall a. STM a -> IO a
atomically (STM MQTTPkt -> IO MQTTPkt) -> STM MQTTPkt -> IO MQTTPkt
forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM MQTTPkt -> STM MQTTPkt
forall a b. STM a -> STM b -> STM b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TChan MQTTPkt -> STM MQTTPkt
forall a. TChan a -> STM a
readTChan TChan MQTTPkt
_ch))
ConduitT () MQTTPkt IO ()
-> ConduitT MQTTPkt Void IO () -> ConduitT () Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| (MQTTPkt -> ByteString) -> ConduitT MQTTPkt ByteString IO ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map (ByteString -> ByteString
BL.toStrict (ByteString -> ByteString)
-> (MQTTPkt -> ByteString) -> MQTTPkt -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProtocolLevel -> MQTTPkt -> ByteString
forall a. ByteMe a => ProtocolLevel -> a -> ByteString
toByteString ProtocolLevel
_protocol)
ConduitT MQTTPkt ByteString IO ()
-> ConduitT ByteString Void IO () -> ConduitT MQTTPkt Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT ByteString Void IO ()
sink
doPing :: IO b
doPing = IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
_pingPeriod IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c MQTTPkt
PingPkt
watchdog :: TChan a -> IO b
watchdog TChan a
ch = IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ do
toch <- Int -> IO (TVar Bool)
registerDelay Int
_pingPatience
timedOut <- atomically $ ((check =<< readTVar toch) >> pure True) `orElse` (readTChan ch >> pure False)
when timedOut $ killConn c Timeout
waitForLaunch :: MQTTClient -> Async () -> STM ConnState
waitForLaunch MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} Async ()
t = do
TVar (Maybe (Async ())) -> Maybe (Async ()) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (Async ()))
_ct (Async () -> Maybe (Async ())
forall a. a -> Maybe a
Just Async ()
t)
c <- TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar TVar ConnState
_st
if c == Starting then retry else pure c
waitForClient :: MQTTClient -> IO ()
waitForClient :: MQTTClient -> IO ()
waitForClient c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} = (IO () -> IO () -> IO ()) -> IO () -> IO () -> IO ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
E.finally (MQTTClient -> IO ()
stopCallbackThread MQTTClient
c) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
IO (Maybe ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe ()) -> IO ())
-> (Maybe (Async ()) -> IO (Maybe ())) -> Maybe (Async ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Async () -> IO ()) -> Maybe (Async ()) -> IO (Maybe ())
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse Async () -> IO ()
forall a. Async a -> IO a
wait (Maybe (Async ()) -> IO ()) -> IO (Maybe (Async ())) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
IO () -> (SomeException -> IO ()) -> Maybe SomeException -> IO ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) SomeException -> IO ()
forall e a. (HasCallStack, Exception e) => e -> IO a
E.throwIO (Maybe SomeException -> IO ()) -> IO (Maybe SomeException) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< STM (Maybe SomeException) -> IO (Maybe SomeException)
forall a. STM a -> IO a
atomically (MQTTClient -> ConnState -> STM (Maybe SomeException)
stateX MQTTClient
c ConnState
Stopped)
stopClient :: MQTTClient -> IO ()
stopClient :: MQTTClient -> IO ()
stopClient MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} = do
IO (Maybe ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe ()) -> IO ())
-> (Maybe (Async ()) -> IO (Maybe ())) -> Maybe (Async ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Async () -> IO ()) -> Maybe (Async ()) -> IO (Maybe ())
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse Async () -> IO ()
forall a. Async a -> IO ()
cancel (Maybe (Async ()) -> IO ()) -> IO (Maybe (Async ())) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
IO (Maybe ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe ()) -> IO ())
-> (Maybe (Async ()) -> IO (Maybe ())) -> Maybe (Async ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Async () -> IO ()) -> Maybe (Async ()) -> IO (Maybe ())
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse Async () -> IO ()
forall a. Async a -> IO ()
cancel (Maybe (Async ()) -> IO ()) -> IO (Maybe (Async ())) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_cbHandle
STM () -> IO ()
forall a. STM a -> IO a
atomically (TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st ConnState
Stopped)
stateX :: MQTTClient -> ConnState -> STM (Maybe E.SomeException)
stateX :: MQTTClient -> ConnState -> STM (Maybe SomeException)
stateX MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} ConnState
want = ConnState -> Maybe SomeException
f (ConnState -> Maybe SomeException)
-> STM ConnState -> STM (Maybe SomeException)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar TVar ConnState
_st
where
je :: String -> Maybe SomeException
je = SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just (SomeException -> Maybe SomeException)
-> (String -> SomeException) -> String -> Maybe SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTException -> SomeException
forall e. Exception e => e -> SomeException
E.toException (MQTTException -> SomeException)
-> (String -> MQTTException) -> String -> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> MQTTException
MQTTException
f :: ConnState -> Maybe E.SomeException
f :: ConnState -> Maybe SomeException
f ConnState
Connected = if ConnState
want ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Connected then Maybe SomeException
forall a. Maybe a
Nothing else String -> Maybe SomeException
je String
"unexpectedly connected"
f ConnState
Stopped = if ConnState
want ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
== ConnState
Stopped then Maybe SomeException
forall a. Maybe a
Nothing else String -> Maybe SomeException
je String
"unexpectedly stopped"
f ConnState
Disconnected = String -> Maybe SomeException
je String
"disconnected"
f ConnState
Starting = String -> Maybe SomeException
je String
"died while starting"
f (DiscoErr DisconnectRequest
x) = SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just (SomeException -> Maybe SomeException)
-> (DisconnectRequest -> SomeException)
-> DisconnectRequest
-> Maybe SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTException -> SomeException
forall e. Exception e => e -> SomeException
E.toException (MQTTException -> SomeException)
-> (DisconnectRequest -> MQTTException)
-> DisconnectRequest
-> SomeException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DisconnectRequest -> MQTTException
Discod (DisconnectRequest -> Maybe SomeException)
-> DisconnectRequest -> Maybe SomeException
forall a b. (a -> b) -> a -> b
$ DisconnectRequest
x
f (ConnErr ConnACKFlags
e) = String -> Maybe SomeException
je (ConnACKFlags -> String
forall a. Show a => a -> String
show ConnACKFlags
e)
data MQTTException = Timeout | BadData | Discod DisconnectRequest | MQTTException String deriving(MQTTException -> MQTTException -> Bool
(MQTTException -> MQTTException -> Bool)
-> (MQTTException -> MQTTException -> Bool) -> Eq MQTTException
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: MQTTException -> MQTTException -> Bool
== :: MQTTException -> MQTTException -> Bool
$c/= :: MQTTException -> MQTTException -> Bool
/= :: MQTTException -> MQTTException -> Bool
Eq, Int -> MQTTException -> ShowS
[MQTTException] -> ShowS
MQTTException -> String
(Int -> MQTTException -> ShowS)
-> (MQTTException -> String)
-> ([MQTTException] -> ShowS)
-> Show MQTTException
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> MQTTException -> ShowS
showsPrec :: Int -> MQTTException -> ShowS
$cshow :: MQTTException -> String
show :: MQTTException -> String
$cshowList :: [MQTTException] -> ShowS
showList :: [MQTTException] -> ShowS
Show)
instance E.Exception MQTTException
dispatch :: MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch :: MQTTClient -> TChan Bool -> MQTTPkt -> IO ()
dispatch c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} TChan Bool
pch MQTTPkt
pkt =
case MQTTPkt
pkt of
(ConnACKPkt ConnACKFlags
p) -> ConnACKFlags -> IO ()
connACKd ConnACKFlags
p
(PublishPkt PublishRequest
p) -> PublishRequest -> IO ()
pub PublishRequest
p
(SubACKPkt (SubscribeResponse Word16
i [Either SubErr QoS]
_ [Property]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DSubACK Word16
i
(UnsubACKPkt (UnsubscribeResponse Word16
i [Property]
_ [UnsubStatus]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DUnsubACK Word16
i
(PubACKPkt (PubACK Word16
i Word8
_ [Property]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubACK Word16
i
(PubRELPkt (PubREL Word16
i Word8
_ [Property]
_)) -> Word16 -> IO ()
pubd Word16
i
(PubRECPkt (PubREC Word16
i Word8
_ [Property]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubREC Word16
i
(PubCOMPPkt (PubCOMP Word16
i Word8
_ [Property]
_)) -> DispatchType -> Word16 -> IO ()
delegate DispatchType
DPubCOMP Word16
i
(DisconnectPkt DisconnectRequest
req) -> DisconnectRequest -> IO ()
disco DisconnectRequest
req
MQTTPkt
PongPkt -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (Bool -> STM ()) -> Bool -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TChan Bool -> Bool -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan Bool
pch (Bool -> IO ()) -> Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool
True
(AuthPkt AuthRequest
p) -> String -> IO ()
forall a. String -> a
mqttFail (String
"unexpected incoming auth: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> AuthRequest -> String
forall a. Show a => a -> String
show AuthRequest
p)
MQTTPkt
PingPkt -> String -> IO ()
forall a. String -> a
mqttFail String
"unexpected incoming ping packet"
(ConnPkt ConnectRequest
_ ProtocolLevel
_) -> String -> IO ()
forall a. String -> a
mqttFail String
"unexpected incoming connect"
(SubscribePkt SubscribeRequest
_) -> String -> IO ()
forall a. String -> a
mqttFail String
"unexpected incoming subscribe"
(UnsubscribePkt UnsubscribeRequest
_) -> String -> IO ()
forall a. String -> a
mqttFail String
"unexpected incoming unsubscribe"
where connACKd :: ConnACKFlags -> IO ()
connACKd connr :: ConnACKFlags
connr@(ConnACKFlags SessionReuse
_ ConnACKRC
val [Property]
_) = case ConnACKRC
val of
ConnACKRC
ConnAccepted -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TVar ConnACKFlags -> ConnACKFlags -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnACKFlags
_connACKFlags ConnACKFlags
connr
TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st ConnState
Connected
ConnACKRC
_ -> do
t <- TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
atomically $ writeTVar _st (ConnErr connr)
maybeCancelWith (MQTTException $ show connr) t
pub :: PublishRequest -> IO ()
pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS0} = STM PublishRequest -> IO PublishRequest
forall a. STM a -> IO a
atomically (PublishRequest -> STM PublishRequest
resolve PublishRequest
p) IO PublishRequest -> (PublishRequest -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe MQTTPkt -> PublishRequest -> IO ()
forall {t :: * -> *}.
Foldable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify Maybe MQTTPkt
forall a. Maybe a
Nothing
pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS1, Word16
_pubPktID :: Word16
_pubPktID :: PublishRequest -> Word16
_pubPktID} =
Maybe MQTTPkt -> PublishRequest -> IO ()
forall {t :: * -> *}.
Foldable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify (MQTTPkt -> Maybe MQTTPkt
forall a. a -> Maybe a
Just (PubACK -> MQTTPkt
PubACKPkt (Word16 -> Word8 -> [Property] -> PubACK
PubACK Word16
_pubPktID Word8
0 [Property]
forall a. Monoid a => a
mempty))) (PublishRequest -> IO ()) -> IO PublishRequest -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< STM PublishRequest -> IO PublishRequest
forall a. STM a -> IO a
atomically (PublishRequest -> STM PublishRequest
resolve PublishRequest
p)
pub p :: PublishRequest
p@PublishRequest{_pubQoS :: PublishRequest -> QoS
_pubQoS=QoS
QoS2} = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
p'@PublishRequest{..} <- PublishRequest -> STM PublishRequest
resolve PublishRequest
p
Decaying.insert _pubPktID p' _inflight
sendPacket c (PubRECPkt (PubREC _pubPktID 0 mempty))
pubd :: Word16 -> IO ()
pubd Word16
i = STM (Maybe PublishRequest) -> IO (Maybe PublishRequest)
forall a. STM a -> IO a
atomically ((Word16 -> PublishRequest -> Maybe PublishRequest)
-> Word16
-> Map Word16 PublishRequest
-> STM (Maybe PublishRequest)
forall k a.
Ord k =>
(k -> a -> Maybe a) -> k -> Map k a -> STM (Maybe a)
Decaying.updateLookupWithKey (\Word16
_ PublishRequest
_ -> Maybe PublishRequest
forall a. Maybe a
Nothing) Word16
i Map Word16 PublishRequest
_inflight) IO (Maybe PublishRequest)
-> (Maybe PublishRequest -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe PublishRequest
Nothing -> MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (PubCOMP -> MQTTPkt
PubCOMPPkt (Word16 -> Word8 -> [Property] -> PubCOMP
PubCOMP Word16
i Word8
0x92 [Property]
forall a. Monoid a => a
mempty))
Just PublishRequest
p -> Maybe MQTTPkt -> PublishRequest -> IO ()
forall {t :: * -> *}.
Foldable t =>
t MQTTPkt -> PublishRequest -> IO ()
notify (MQTTPkt -> Maybe MQTTPkt
forall a. a -> Maybe a
Just (PubCOMP -> MQTTPkt
PubCOMPPkt (Word16 -> Word8 -> [Property] -> PubCOMP
PubCOMP Word16
i Word8
0 [Property]
forall a. Monoid a => a
mempty))) PublishRequest
p
notify :: t MQTTPkt -> PublishRequest -> IO ()
notify t MQTTPkt
rpkt p :: PublishRequest
p@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubQoS :: PublishRequest -> QoS
_pubPktID :: PublishRequest -> Word16
_pubProps :: PublishRequest -> [Property]
_pubBody :: PublishRequest -> ByteString
_pubTopic :: PublishRequest -> ByteString
_pubRetain :: PublishRequest -> Bool
_pubDup :: PublishRequest -> Bool
_pubDup :: Bool
_pubQoS :: QoS
_pubRetain :: Bool
_pubTopic :: ByteString
_pubPktID :: Word16
_pubBody :: ByteString
_pubProps :: [Property]
..} = do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Word16 -> Map Word16 PublishRequest -> STM ()
forall k v. Ord k => k -> Map k v -> STM ()
Decaying.delete Word16
_pubPktID Map Word16 PublishRequest
_inflight
cb <- IO MessageCallback
-> (ByteString -> IO MessageCallback)
-> Maybe ByteString
-> IO MessageCallback
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (MessageCallback -> IO MessageCallback
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure MessageCallback
_cb) (\ByteString
cd -> STM MessageCallback -> IO MessageCallback
forall a. STM a -> IO a
atomically (MessageCallback
-> ByteString
-> Map ByteString MessageCallback
-> STM MessageCallback
forall k v. Ord k => v -> k -> Map k v -> STM v
Decaying.findWithDefault MessageCallback
_cb ByteString
cd Map ByteString MessageCallback
_corr)) Maybe ByteString
cdata
E.evaluate . force =<< case cb of
MessageCallback
NoCallback -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
SimpleCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f -> IO () -> IO ()
forall {a}. IO a -> IO ()
call (MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f MQTTClient
c (ByteString -> Topic
blToTopic ByteString
_pubTopic) ByteString
_pubBody [Property]
_pubProps)
OrderedCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f -> IO () -> IO ()
forall {a}. IO a -> IO ()
callOrd (MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
f MQTTClient
c (ByteString -> Topic
blToTopic ByteString
_pubTopic) ByteString
_pubBody [Property]
_pubProps)
LowLevelCallback MQTTClient -> PublishRequest -> IO ()
f -> IO () -> IO ()
forall {a}. IO a -> IO ()
call (MQTTClient -> PublishRequest -> IO ()
f MQTTClient
c PublishRequest
p)
OrderedLowLevelCallback MQTTClient -> PublishRequest -> IO ()
f -> IO () -> IO ()
forall {a}. IO a -> IO ()
callOrd (MQTTClient -> PublishRequest -> IO ()
f MQTTClient
c PublishRequest
p)
where
call :: IO a -> IO ()
call IO a
a = Async () -> IO ()
forall a. Async a -> IO ()
link (Async () -> IO ()) -> IO (Async ()) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< String -> IO () -> IO (Async ())
forall a. String -> IO a -> IO (Async a)
namedAsync String
"notifier" (IO a
a IO a -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
respond)
callOrd :: IO a -> IO ()
callOrd IO a
a = MVar (IO ()) -> IO () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (IO ())
_cbM (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO a
a IO a -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
respond
respond :: IO ()
respond = (MQTTPkt -> IO ()) -> t MQTTPkt -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c) t MQTTPkt
rpkt
cdata :: Maybe ByteString
cdata = (Property -> Maybe ByteString -> Maybe ByteString)
-> Maybe ByteString -> [Property] -> Maybe ByteString
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Maybe ByteString -> Maybe ByteString
f Maybe ByteString
forall a. Maybe a
Nothing [Property]
_pubProps
where f :: Property -> Maybe ByteString -> Maybe ByteString
f (PropCorrelationData ByteString
x) Maybe ByteString
_ = ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
x
f Property
_ Maybe ByteString
o = Maybe ByteString
o
resolve :: PublishRequest -> STM PublishRequest
resolve p :: PublishRequest
p@PublishRequest{Bool
[Property]
Word16
ByteString
QoS
_pubQoS :: PublishRequest -> QoS
_pubPktID :: PublishRequest -> Word16
_pubProps :: PublishRequest -> [Property]
_pubBody :: PublishRequest -> ByteString
_pubTopic :: PublishRequest -> ByteString
_pubRetain :: PublishRequest -> Bool
_pubDup :: PublishRequest -> Bool
_pubDup :: Bool
_pubQoS :: QoS
_pubRetain :: Bool
_pubTopic :: ByteString
_pubPktID :: Word16
_pubBody :: ByteString
_pubProps :: [Property]
..} = do
topic <- Maybe Word16 -> STM Topic
resolveTopic ((Property -> Maybe Word16 -> Maybe Word16)
-> Maybe Word16 -> [Property] -> Maybe Word16
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Maybe Word16 -> Maybe Word16
aliasID Maybe Word16
forall a. Maybe a
Nothing [Property]
_pubProps)
pure p{_pubTopic=topicToBL topic}
where
aliasID :: Property -> Maybe Word16 -> Maybe Word16
aliasID (PropTopicAlias Word16
x) Maybe Word16
_ = Word16 -> Maybe Word16
forall a. a -> Maybe a
Just Word16
x
aliasID Property
_ Maybe Word16
o = Maybe Word16
o
resolveTopic :: Maybe Word16 -> STM Topic
resolveTopic Maybe Word16
Nothing = Topic -> STM Topic
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Topic
blToTopic ByteString
_pubTopic)
resolveTopic (Just Word16
x) = do
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString
_pubTopic ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
/= ByteString
"") (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ TVar (Map Word16 Topic)
-> (Map Word16 Topic -> Map Word16 Topic) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map Word16 Topic)
_inA (Word16 -> Topic -> Map Word16 Topic -> Map Word16 Topic
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Word16
x (ByteString -> Topic
blToTopic ByteString
_pubTopic))
STM Topic -> (Topic -> STM Topic) -> Maybe Topic -> STM Topic
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (String -> STM Topic
forall a. String -> a
mqttFail (String
"failed to lookup topic alias " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Word16 -> String
forall a. Show a => a -> String
show Word16
x)) Topic -> STM Topic
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe Topic -> STM Topic)
-> (Map Word16 Topic -> Maybe Topic)
-> Map Word16 Topic
-> STM Topic
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word16 -> Map Word16 Topic -> Maybe Topic
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Word16
x (Map Word16 Topic -> STM Topic)
-> STM (Map Word16 Topic) -> STM Topic
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Map Word16 Topic) -> STM (Map Word16 Topic)
forall a. TVar a -> STM a
readTVar TVar (Map Word16 Topic)
_inA
delegate :: DispatchType -> Word16 -> IO ()
delegate DispatchType
dt Word16
pid = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
STM ()
-> (TChan MQTTPkt -> STM ()) -> Maybe (TChan MQTTPkt) -> STM ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (DispatchType -> STM ()
nak DispatchType
dt) (TChan MQTTPkt -> MQTTPkt -> STM ()
forall a. TChan a -> a -> STM ()
`writeTChan` MQTTPkt
pkt) (Maybe (TChan MQTTPkt) -> STM ())
-> (Map (DispatchType, Word16) (TChan MQTTPkt)
-> Maybe (TChan MQTTPkt))
-> Map (DispatchType, Word16) (TChan MQTTPkt)
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (DispatchType, Word16)
-> Map (DispatchType, Word16) (TChan MQTTPkt)
-> Maybe (TChan MQTTPkt)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (DispatchType
dt, Word16
pid) (Map (DispatchType, Word16) (TChan MQTTPkt) -> STM ())
-> STM (Map (DispatchType, Word16) (TChan MQTTPkt)) -> STM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
-> STM (Map (DispatchType, Word16) (TChan MQTTPkt))
forall a. TVar a -> STM a
readTVar TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks
where
nak :: DispatchType -> STM ()
nak DispatchType
DPubREC = MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c (PubREL -> MQTTPkt
PubRELPkt (Word16 -> Word8 -> [Property] -> PubREL
PubREL Word16
pid Word8
0x92 [Property]
forall a. Monoid a => a
mempty))
nak DispatchType
_ = () -> STM ()
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
disco :: DisconnectRequest -> IO ()
disco DisconnectRequest
req = do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st (DisconnectRequest -> ConnState
DiscoErr DisconnectRequest
req)
MQTTException -> Maybe (Async ()) -> IO ()
forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith (DisconnectRequest -> MQTTException
Discod DisconnectRequest
req) (Maybe (Async ()) -> IO ()) -> IO (Maybe (Async ())) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
runCallbackThread :: MQTTClient -> IO ()
runCallbackThread :: MQTTClient -> IO ()
runCallbackThread MQTTClient{MessageCallback
_cb :: MQTTClient -> MessageCallback
_cb :: MessageCallback
_cb, MVar (IO ())
_cbM :: MQTTClient -> MVar (IO ())
_cbM :: MVar (IO ())
_cbM, TVar (Maybe (Async ()))
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbHandle :: TVar (Maybe (Async ()))
_cbHandle}
| MessageCallback -> Bool
isOrdered MessageCallback
_cb = do
latch <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
False
handle <- namedAsync "ordered callbacks" (waitFor latch *> runOrderedCallbacks)
join . atomically $ do
cbThread <- readTVar _cbHandle
case cbThread of
Maybe (Async ())
Nothing -> do
TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
latch Bool
True
TVar (Maybe (Async ())) -> Maybe (Async ()) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (Async ()))
_cbHandle (Async () -> Maybe (Async ())
forall a. a -> Maybe a
Just Async ()
handle)
IO () -> STM (IO ())
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
Just Async ()
_ -> IO () -> STM (IO ())
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
handle)
| Bool
otherwise = () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
where isOrdered :: MessageCallback -> Bool
isOrdered (OrderedCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
_) = Bool
True
isOrdered (OrderedLowLevelCallback MQTTClient -> PublishRequest -> IO ()
_) = Bool
True
isOrdered MessageCallback
_ = Bool
False
waitFor :: TVar Bool -> IO ()
waitFor TVar Bool
latch = STM () -> IO ()
forall a. STM a -> IO a
atomically (Bool -> STM ()
check (Bool -> STM ()) -> STM Bool -> STM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
latch)
runOrderedCallbacks :: IO b
runOrderedCallbacks = IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> (MVar (IO ()) -> IO ()) -> MVar (IO ()) -> IO b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (IO ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO ()) -> IO ())
-> (MVar (IO ()) -> IO (IO ())) -> MVar (IO ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MVar (IO ()) -> IO (IO ())
forall a. MVar a -> IO a
takeMVar (MVar (IO ()) -> IO b) -> MVar (IO ()) -> IO b
forall a b. (a -> b) -> a -> b
$ MVar (IO ())
_cbM
stopCallbackThread :: MQTTClient -> IO ()
stopCallbackThread :: MQTTClient -> IO ()
stopCallbackThread MQTTClient{TVar (Maybe (Async ()))
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_cbHandle :: TVar (Maybe (Async ()))
_cbHandle} = IO () -> (Async () -> IO ()) -> Maybe (Async ()) -> IO ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) Async () -> IO ()
forall a. Async a -> IO ()
cancel (Maybe (Async ()) -> IO ()) -> IO (Maybe (Async ())) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_cbHandle
maybeCancelWith :: E.Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith :: forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith e
e = IO (Maybe ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe ()) -> IO ())
-> (Maybe (Async ()) -> IO (Maybe ())) -> Maybe (Async ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Async () -> IO ()) -> Maybe (Async ()) -> IO (Maybe ())
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse (Async () -> e -> IO ()
forall e a. Exception e => Async a -> e -> IO ()
`cancelWith` e
e)
killConn :: E.Exception e => MQTTClient -> e -> IO ()
killConn :: forall e. Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} e
e = TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct IO (Maybe (Async ())) -> (Maybe (Async ()) -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= e -> Maybe (Async ()) -> IO ()
forall e. Exception e => e -> Maybe (Async ()) -> IO ()
maybeCancelWith e
e
checkConnected :: MQTTClient -> STM ()
checkConnected :: MQTTClient -> STM ()
checkConnected MQTTClient
mc = STM ()
-> (SomeException -> STM ()) -> Maybe SomeException -> STM ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> STM ()
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) SomeException -> STM ()
forall a e. (HasCallStack, Exception e) => e -> a
E.throw (Maybe SomeException -> STM ())
-> STM (Maybe SomeException) -> STM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MQTTClient -> ConnState -> STM (Maybe SomeException)
stateX MQTTClient
mc ConnState
Connected
isConnected :: MQTTClient -> IO Bool
isConnected :: MQTTClient -> IO Bool
isConnected = STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool)
-> (MQTTClient -> STM Bool) -> MQTTClient -> IO Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> STM Bool
isConnectedSTM
isConnectedSTM :: MQTTClient -> STM Bool
isConnectedSTM :: MQTTClient -> STM Bool
isConnectedSTM MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} = (ConnState
Connected ConnState -> ConnState -> Bool
forall a. Eq a => a -> a -> Bool
==) (ConnState -> Bool) -> STM ConnState -> STM Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar ConnState -> STM ConnState
forall a. TVar a -> STM a
readTVar TVar ConnState
_st
sendPacket :: MQTTClient -> MQTTPkt -> STM ()
sendPacket :: MQTTClient -> MQTTPkt -> STM ()
sendPacket c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} MQTTPkt
p = MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM () -> STM ()
forall a b. STM a -> STM b -> STM b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TChan MQTTPkt -> MQTTPkt -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan MQTTPkt
_ch MQTTPkt
p
sendPacketIO :: MQTTClient -> MQTTPkt -> IO ()
sendPacketIO :: MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (MQTTPkt -> STM ()) -> MQTTPkt -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> MQTTPkt -> STM ()
sendPacket MQTTClient
c
textToBL :: Text -> BL.ByteString
textToBL :: Text -> ByteString
textToBL = ByteString -> ByteString
BL.fromStrict (ByteString -> ByteString)
-> (Text -> ByteString) -> Text -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
TE.encodeUtf8
blToText :: BL.ByteString -> Text
blToText :: ByteString -> Text
blToText = ByteString -> Text
TE.decodeUtf8 (ByteString -> Text)
-> (ByteString -> ByteString) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BL.toStrict
topicToBL :: Topic -> BL.ByteString
topicToBL :: Topic -> ByteString
topicToBL = Text -> ByteString
textToBL (Text -> ByteString) -> (Topic -> Text) -> Topic -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Topic -> Text
unTopic
filterToBL :: Filter -> BL.ByteString
filterToBL :: Filter -> ByteString
filterToBL = Text -> ByteString
textToBL (Text -> ByteString) -> (Filter -> Text) -> Filter -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Filter -> Text
unFilter
blToTopic :: BL.ByteString -> Topic
blToTopic :: ByteString -> Topic
blToTopic = Maybe Topic -> Topic
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe Topic -> Topic)
-> (ByteString -> Maybe Topic) -> ByteString -> Topic
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Maybe Topic
mkTopic (Text -> Maybe Topic)
-> (ByteString -> Text) -> ByteString -> Maybe Topic
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
blToText
reservePktID :: MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID :: MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} [DispatchType]
dts = do
MQTTClient -> STM ()
checkConnected MQTTClient
c
ch <- STM (TChan MQTTPkt)
forall a. STM (TChan a)
newTChan
pid <- readTVar _pktID
modifyTVar' _pktID $ if pid == maxBound then const 1 else succ
modifyTVar' _acks (Map.union (Map.fromList [((t, pid), ch) | t <- dts]))
pure (ch,pid)
releasePktID :: MQTTClient -> (DispatchType,Word16) -> STM ()
releasePktID :: MQTTClient -> (DispatchType, Word16) -> STM ()
releasePktID c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} (DispatchType, Word16)
k = MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM () -> STM ()
forall a b. STM a -> STM b -> STM b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
-> (Map (DispatchType, Word16) (TChan MQTTPkt)
-> Map (DispatchType, Word16) (TChan MQTTPkt))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks ((DispatchType, Word16)
-> Map (DispatchType, Word16) (TChan MQTTPkt)
-> Map (DispatchType, Word16) (TChan MQTTPkt)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete (DispatchType, Word16)
k)
releasePktIDs :: MQTTClient -> [(DispatchType,Word16)] -> STM ()
releasePktIDs :: MQTTClient -> [(DispatchType, Word16)] -> STM ()
releasePktIDs c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} [(DispatchType, Word16)]
ks = MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM () -> STM ()
forall a b. STM a -> STM b -> STM b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
-> (Map (DispatchType, Word16) (TChan MQTTPkt)
-> Map (DispatchType, Word16) (TChan MQTTPkt))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_acks Map (DispatchType, Word16) (TChan MQTTPkt)
-> Map (DispatchType, Word16) (TChan MQTTPkt)
forall {a}.
Map (DispatchType, Word16) a -> Map (DispatchType, Word16) a
deleteMany
where deleteMany :: Map (DispatchType, Word16) a -> Map (DispatchType, Word16) a
deleteMany Map (DispatchType, Word16) a
m = Map (DispatchType, Word16) a
-> Set (DispatchType, Word16) -> Map (DispatchType, Word16) a
forall k a. Ord k => Map k a -> Set k -> Map k a
Map.withoutKeys Map (DispatchType, Word16) a
m ([(DispatchType, Word16)] -> Set (DispatchType, Word16)
forall a. Ord a => [a] -> Set a
Set.fromList [(DispatchType, Word16)]
ks)
sendAndWait :: MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait :: MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} DispatchType
dt Word16 -> MQTTPkt
f = do
(ch,pid) <- STM (TChan MQTTPkt, Word16) -> IO (TChan MQTTPkt, Word16)
forall a. STM a -> IO a
atomically (STM (TChan MQTTPkt, Word16) -> IO (TChan MQTTPkt, Word16))
-> STM (TChan MQTTPkt, Word16) -> IO (TChan MQTTPkt, Word16)
forall a b. (a -> b) -> a -> b
$ do
(ch,pid) <- MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID MQTTClient
c [DispatchType
dt]
sendPacket c (f pid)
pure (ch,pid)
atomically $ do
st <- readTVar _st
when (st /= Connected) $ mqttFail "disconnected waiting for response"
releasePktID c (dt,pid)
readTChan ch
subscribe :: MQTTClient -> [(Filter, SubOptions)] -> [Property] -> IO ([Either SubErr QoS], [Property])
subscribe :: MQTTClient
-> [(Filter, SubOptions)]
-> [Property]
-> IO ([Either SubErr QoS], [Property])
subscribe MQTTClient
c [(Filter, SubOptions)]
ls [Property]
props = do
MQTTClient -> IO ()
runCallbackThread MQTTClient
c
MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait MQTTClient
c DispatchType
DSubACK (\Word16
pid -> SubscribeRequest -> MQTTPkt
SubscribePkt (SubscribeRequest -> MQTTPkt) -> SubscribeRequest -> MQTTPkt
forall a b. (a -> b) -> a -> b
$ Word16
-> [(ByteString, SubOptions)] -> [Property] -> SubscribeRequest
SubscribeRequest Word16
pid [(ByteString, SubOptions)]
ls' [Property]
props) IO MQTTPkt
-> (MQTTPkt -> IO ([Either SubErr QoS], [Property]))
-> IO ([Either SubErr QoS], [Property])
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
SubACKPkt (SubscribeResponse Word16
_ [Either SubErr QoS]
rs [Property]
aprops) -> ([Either SubErr QoS], [Property])
-> IO ([Either SubErr QoS], [Property])
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([Either SubErr QoS]
rs, [Property]
aprops)
MQTTPkt
pkt -> String -> IO ([Either SubErr QoS], [Property])
forall a. String -> a
mqttFail (String -> IO ([Either SubErr QoS], [Property]))
-> String -> IO ([Either SubErr QoS], [Property])
forall a b. (a -> b) -> a -> b
$ String
"unexpected response to subscribe: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> MQTTPkt -> String
forall a. Show a => a -> String
show MQTTPkt
pkt
where ls' :: [(ByteString, SubOptions)]
ls' = ((Filter, SubOptions) -> (ByteString, SubOptions))
-> [(Filter, SubOptions)] -> [(ByteString, SubOptions)]
forall a b. (a -> b) -> [a] -> [b]
map ((Filter -> ByteString)
-> (Filter, SubOptions) -> (ByteString, SubOptions)
forall a b c. (a -> b) -> (a, c) -> (b, c)
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first Filter -> ByteString
filterToBL) [(Filter, SubOptions)]
ls
unsubscribe :: MQTTClient -> [Filter] -> [Property] -> IO ([UnsubStatus], [Property])
unsubscribe :: MQTTClient
-> [Filter] -> [Property] -> IO ([UnsubStatus], [Property])
unsubscribe MQTTClient
c [Filter]
ls [Property]
props = do
(UnsubACKPkt (UnsubscribeResponse _ rsn rprop)) <- MQTTClient -> DispatchType -> (Word16 -> MQTTPkt) -> IO MQTTPkt
sendAndWait MQTTClient
c DispatchType
DUnsubACK (\Word16
pid -> UnsubscribeRequest -> MQTTPkt
UnsubscribePkt (UnsubscribeRequest -> MQTTPkt) -> UnsubscribeRequest -> MQTTPkt
forall a b. (a -> b) -> a -> b
$ Word16 -> [ByteString] -> [Property] -> UnsubscribeRequest
UnsubscribeRequest Word16
pid ((Filter -> ByteString) -> [Filter] -> [ByteString]
forall a b. (a -> b) -> [a] -> [b]
map Filter -> ByteString
filterToBL [Filter]
ls) [Property]
props)
pure (rprop, rsn)
publish :: MQTTClient
-> Topic
-> BL.ByteString
-> Bool
-> IO ()
publish :: MQTTClient -> Topic -> ByteString -> Bool -> IO ()
publish MQTTClient
c Topic
t ByteString
m Bool
r = IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
c Topic
t ByteString
m Bool
r QoS
QoS0 [Property]
forall a. Monoid a => a
mempty
publishq :: MQTTClient
-> Topic
-> BL.ByteString
-> Bool
-> QoS
-> [Property]
-> IO ()
publishq :: MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
c Topic
t ByteString
m Bool
r QoS
q [Property]
props = do
(ch,pid) <- STM (TChan MQTTPkt, Word16) -> IO (TChan MQTTPkt, Word16)
forall a. STM a -> IO a
atomically (STM (TChan MQTTPkt, Word16) -> IO (TChan MQTTPkt, Word16))
-> STM (TChan MQTTPkt, Word16) -> IO (TChan MQTTPkt, Word16)
forall a b. (a -> b) -> a -> b
$ MQTTClient -> [DispatchType] -> STM (TChan MQTTPkt, Word16)
reservePktID MQTTClient
c [DispatchType]
types
E.finally (publishAndWait ch pid) (atomically $ releasePktIDs c [(t',pid) | t' <- types])
where
types :: [DispatchType]
types = [DispatchType
DPubACK, DispatchType
DPubREC, DispatchType
DPubCOMP]
publishAndWait :: TChan MQTTPkt -> Word16 -> IO ()
publishAndWait TChan MQTTPkt
ch Word16
pid = do
MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (Word16 -> MQTTPkt
pkt Word16
pid)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (QoS
q QoS -> QoS -> Bool
forall a. Ord a => a -> a -> Bool
> QoS
QoS0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan MQTTPkt -> Word16 -> IO ()
satisfyQoS TChan MQTTPkt
ch Word16
pid
pkt :: Word16 -> MQTTPkt
pkt Word16
pid = PublishRequest -> MQTTPkt
PublishPkt (PublishRequest -> MQTTPkt) -> PublishRequest -> MQTTPkt
forall a b. (a -> b) -> a -> b
$ PublishRequest {_pubDup :: Bool
_pubDup = Bool
False,
_pubQoS :: QoS
_pubQoS = QoS
q,
_pubPktID :: Word16
_pubPktID = Word16
pid,
_pubRetain :: Bool
_pubRetain = Bool
r,
_pubTopic :: ByteString
_pubTopic = Topic -> ByteString
topicToBL Topic
t,
_pubBody :: ByteString
_pubBody = ByteString
m,
_pubProps :: [Property]
_pubProps = [Property]
props}
satisfyQoS :: TChan MQTTPkt -> Word16 -> IO ()
satisfyQoS TChan MQTTPkt
ch Word16
pid
| QoS
q QoS -> QoS -> Bool
forall a. Eq a => a -> a -> Bool
== QoS
QoS0 = () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
| QoS
q QoS -> QoS -> Bool
forall a. Eq a => a -> a -> Bool
== QoS
QoS1 = IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
(PubACKPkt (PubACK _ st pprops)) <- STM MQTTPkt -> IO MQTTPkt
forall a. STM a -> IO a
atomically (STM MQTTPkt -> IO MQTTPkt) -> STM MQTTPkt -> IO MQTTPkt
forall a b. (a -> b) -> a -> b
$ MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM MQTTPkt -> STM MQTTPkt
forall a b. STM a -> STM b -> STM b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TChan MQTTPkt -> STM MQTTPkt
forall a. TChan a -> STM a
readTChan TChan MQTTPkt
ch
unless (isOK st) $ mqttFail ("qos 1 publish error: " <> show st <> " " <> show pprops)
| QoS
q QoS -> QoS -> Bool
forall a. Eq a => a -> a -> Bool
== QoS
QoS2 = IO ()
waitRec
| Bool
otherwise = String -> IO ()
forall a. HasCallStack => String -> a
error String
"invalid QoS"
where
isOK :: a -> Bool
isOK a
0 = Bool
True
isOK a
16 = Bool
True
isOK a
_ = Bool
False
waitRec :: IO ()
waitRec = STM MQTTPkt -> IO MQTTPkt
forall a. STM a -> IO a
atomically (MQTTClient -> STM ()
checkConnected MQTTClient
c STM () -> STM MQTTPkt -> STM MQTTPkt
forall a b. STM a -> STM b -> STM b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TChan MQTTPkt -> STM MQTTPkt
forall a. TChan a -> STM a
readTChan TChan MQTTPkt
ch) IO MQTTPkt -> (MQTTPkt -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
PubRECPkt (PubREC Word16
_ Word8
st [Property]
recprops) -> do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Word8 -> Bool
forall {a}. (Eq a, Num a) => a -> Bool
isOK Word8
st) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
forall a. String -> a
mqttFail (String
"qos 2 REC publish error: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Word8 -> String
forall a. Show a => a -> String
show Word8
st String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
" " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> [Property] -> String
forall a. Show a => a -> String
show [Property]
recprops)
MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (PubREL -> MQTTPkt
PubRELPkt (PubREL -> MQTTPkt) -> PubREL -> MQTTPkt
forall a b. (a -> b) -> a -> b
$ Word16 -> Word8 -> [Property] -> PubREL
PubREL Word16
pid Word8
0 [Property]
forall a. Monoid a => a
mempty)
PubCOMPPkt (PubCOMP Word16
_ Word8
st' [Property]
compprops) ->
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Word8
st' Word8 -> Word8 -> Bool
forall a. Eq a => a -> a -> Bool
/= Word8
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
forall a. String -> a
mqttFail (String
"qos 2 COMP publish error: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Word8 -> String
forall a. Show a => a -> String
show Word8
st' String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
" " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> [Property] -> String
forall a. Show a => a -> String
show [Property]
compprops)
MQTTPkt
wtf -> String -> IO ()
forall a. String -> a
mqttFail (String
"unexpected packet received in QoS2 publish: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> MQTTPkt -> String
forall a. Show a => a -> String
show MQTTPkt
wtf)
disconnect :: MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect :: MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} DiscoReason
reason [Property]
props = IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
race_ IO ()
getDisconnected IO ()
orDieTrying
where
getDisconnected :: IO ()
getDisconnected = do
MQTTClient -> MQTTPkt -> IO ()
sendPacketIO MQTTClient
c (DisconnectRequest -> MQTTPkt
DisconnectPkt (DisconnectRequest -> MQTTPkt) -> DisconnectRequest -> MQTTPkt
forall a b. (a -> b) -> a -> b
$ DiscoReason -> [Property] -> DisconnectRequest
DisconnectRequest DiscoReason
reason [Property]
props)
IO (Maybe ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe ()) -> IO ())
-> (Maybe (Async ()) -> IO (Maybe ())) -> Maybe (Async ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Async () -> IO ()) -> Maybe (Async ()) -> IO (Maybe ())
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse Async () -> IO ()
forall a. Async a -> IO a
wait (Maybe (Async ()) -> IO ()) -> IO (Maybe (Async ())) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< TVar (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Async ()))
_ct
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ConnState -> ConnState -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ConnState
_st ConnState
Stopped
orDieTrying :: IO ()
orDieTrying = Int -> IO ()
threadDelay Int
10000000 IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MQTTClient -> MQTTException -> IO ()
forall e. Exception e => MQTTClient -> e -> IO ()
killConn MQTTClient
c MQTTException
Timeout
normalDisconnect :: MQTTClient -> IO ()
normalDisconnect :: MQTTClient -> IO ()
normalDisconnect MQTTClient
c = MQTTClient -> DiscoReason -> [Property] -> IO ()
disconnect MQTTClient
c DiscoReason
DiscoNormalDisconnection [Property]
forall a. Monoid a => a
mempty
mkLWT :: Topic -> BL.ByteString -> Bool -> T.LastWill
mkLWT :: Topic -> ByteString -> Bool -> LastWill
mkLWT Topic
t ByteString
m Bool
r = T.LastWill{
_willRetain :: Bool
T._willRetain=Bool
r,
_willQoS :: QoS
T._willQoS=QoS
QoS0,
_willTopic :: ByteString
T._willTopic = Topic -> ByteString
topicToBL Topic
t,
_willMsg :: ByteString
T._willMsg=ByteString
m,
_willProps :: [Property]
T._willProps=[Property]
forall a. Monoid a => a
mempty
}
svrProps :: MQTTClient -> IO [Property]
svrProps :: MQTTClient -> IO [Property]
svrProps = (ConnACKFlags -> [Property]) -> IO ConnACKFlags -> IO [Property]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ConnACKFlags -> [Property]
p (IO ConnACKFlags -> IO [Property])
-> (MQTTClient -> IO ConnACKFlags) -> MQTTClient -> IO [Property]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM ConnACKFlags -> IO ConnACKFlags
forall a. STM a -> IO a
atomically (STM ConnACKFlags -> IO ConnACKFlags)
-> (MQTTClient -> STM ConnACKFlags)
-> MQTTClient
-> IO ConnACKFlags
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> STM ConnACKFlags
connACKSTM
where p :: ConnACKFlags -> [Property]
p (ConnACKFlags SessionReuse
_ ConnACKRC
_ [Property]
props) = [Property]
props
connACKSTM :: MQTTClient -> STM ConnACKFlags
connACKSTM :: MQTTClient -> STM ConnACKFlags
connACKSTM MQTTClient{TVar ConnACKFlags
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_connACKFlags :: TVar ConnACKFlags
_connACKFlags} = TVar ConnACKFlags -> STM ConnACKFlags
forall a. TVar a -> STM a
readTVar TVar ConnACKFlags
_connACKFlags
connACK :: MQTTClient -> IO ConnACKFlags
connACK :: MQTTClient -> IO ConnACKFlags
connACK = STM ConnACKFlags -> IO ConnACKFlags
forall a. STM a -> IO a
atomically (STM ConnACKFlags -> IO ConnACKFlags)
-> (MQTTClient -> STM ConnACKFlags)
-> MQTTClient
-> IO ConnACKFlags
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> STM ConnACKFlags
connACKSTM
maxAliases :: MQTTClient -> IO Word16
maxAliases :: MQTTClient -> IO Word16
maxAliases = ([Property] -> Word16) -> IO [Property] -> IO Word16
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((Property -> Word16 -> Word16) -> Word16 -> [Property] -> Word16
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Property -> Word16 -> Word16
f Word16
0) (IO [Property] -> IO Word16)
-> (MQTTClient -> IO [Property]) -> MQTTClient -> IO Word16
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MQTTClient -> IO [Property]
svrProps
where
f :: Property -> Word16 -> Word16
f (PropTopicAliasMaximum Word16
n) Word16
_ = Word16
n
f Property
_ Word16
o = Word16
o
pubAliased :: MQTTClient
-> Topic
-> BL.ByteString
-> Bool
-> QoS
-> [Property]
-> IO ()
pubAliased :: MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
pubAliased c :: MQTTClient
c@MQTTClient{TVar (Maybe (Async ()))
TVar Word16
TVar (Map Word16 Topic)
TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
TVar (Map Topic Word16)
TVar ConnACKFlags
TVar ConnState
MVar (IO ())
TChan MQTTPkt
Map Word16 PublishRequest
Map ByteString MessageCallback
MessageCallback
_ch :: MQTTClient -> TChan MQTTPkt
_pktID :: MQTTClient -> TVar Word16
_cb :: MQTTClient -> MessageCallback
_acks :: MQTTClient -> TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: MQTTClient -> Map Word16 PublishRequest
_st :: MQTTClient -> TVar ConnState
_ct :: MQTTClient -> TVar (Maybe (Async ()))
_outA :: MQTTClient -> TVar (Map Topic Word16)
_inA :: MQTTClient -> TVar (Map Word16 Topic)
_connACKFlags :: MQTTClient -> TVar ConnACKFlags
_corr :: MQTTClient -> Map ByteString MessageCallback
_cbM :: MQTTClient -> MVar (IO ())
_cbHandle :: MQTTClient -> TVar (Maybe (Async ()))
_ch :: TChan MQTTPkt
_pktID :: TVar Word16
_cb :: MessageCallback
_acks :: TVar (Map (DispatchType, Word16) (TChan MQTTPkt))
_inflight :: Map Word16 PublishRequest
_st :: TVar ConnState
_ct :: TVar (Maybe (Async ()))
_outA :: TVar (Map Topic Word16)
_inA :: TVar (Map Word16 Topic)
_connACKFlags :: TVar ConnACKFlags
_corr :: Map ByteString MessageCallback
_cbM :: MVar (IO ())
_cbHandle :: TVar (Maybe (Async ()))
..} Topic
t ByteString
m Bool
r QoS
q [Property]
props = do
x <- MQTTClient -> IO Word16
maxAliases MQTTClient
c
(t', n) <- alias x
let np = [Property]
props [Property] -> [Property] -> [Property]
forall a. Semigroup a => a -> a -> a
<> case Word16
n of
Word16
0 -> [Property]
forall a. Monoid a => a
mempty
Word16
_ -> [Word16 -> Property
PropTopicAlias Word16
n]
publishq c t' m r q np
where
alias :: Word16 -> IO (Topic, Word16)
alias Word16
mv = STM (Topic, Word16) -> IO (Topic, Word16)
forall a. STM a -> IO a
atomically (STM (Topic, Word16) -> IO (Topic, Word16))
-> STM (Topic, Word16) -> IO (Topic, Word16)
forall a b. (a -> b) -> a -> b
$ do
as <- TVar (Map Topic Word16) -> STM (Map Topic Word16)
forall a. TVar a -> STM a
readTVar TVar (Map Topic Word16)
_outA
let n = Int -> Word16
forall a. Enum a => Int -> a
toEnum (Map Topic Word16 -> Int
forall a. Map Topic a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length Map Topic Word16
as Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
cur = Topic -> Map Topic Word16 -> Maybe Word16
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Topic
t Map Topic Word16
as
v = Word16 -> Maybe Word16 -> Word16
forall a. a -> Maybe a -> a
fromMaybe (if Word16
n Word16 -> Word16 -> Bool
forall a. Ord a => a -> a -> Bool
> Word16
mv then Word16
0 else Word16
n) Maybe Word16
cur
when (v > 0) $ writeTVar _outA (Map.insert t v as)
pure (maybe t (const "") cur, v)
registerCorrelated :: MQTTClient -> BL.ByteString -> MessageCallback -> STM ()
registerCorrelated :: MQTTClient -> ByteString -> MessageCallback -> STM ()
registerCorrelated MQTTClient{Map ByteString MessageCallback
_corr :: MQTTClient -> Map ByteString MessageCallback
_corr :: Map ByteString MessageCallback
_corr} ByteString
bs MessageCallback
cb = ByteString
-> MessageCallback -> Map ByteString MessageCallback -> STM ()
forall k v. Ord k => k -> v -> Map k v -> STM ()
Decaying.insert ByteString
bs MessageCallback
cb Map ByteString MessageCallback
_corr
unregisterCorrelated :: MQTTClient -> BL.ByteString -> STM ()
unregisterCorrelated :: MQTTClient -> ByteString -> STM ()
unregisterCorrelated MQTTClient{Map ByteString MessageCallback
_corr :: MQTTClient -> Map ByteString MessageCallback
_corr :: Map ByteString MessageCallback
_corr} ByteString
bs = ByteString -> Map ByteString MessageCallback -> STM ()
forall k v. Ord k => k -> Map k v -> STM ()
Decaying.delete ByteString
bs Map ByteString MessageCallback
_corr