WebSocket MQTT 订阅
结合 MQTT 客户端和 WebSocket 连接。
WebSocket MQTT 订阅 展示了如何设计自定义 QIODevice,以便将 WebSocket 连接与 QMqttClient 结合。
创建自定义 QIODevice
新的自定义设备 WebSocketIODevice
必须是 QIODevice 的子类
class WebSocketIODevice : public QIODevice { Q_OBJECT public: WebSocketIODevice(QObject *parent = nullptr); bool isSequential() const override; qint64 bytesAvailable() const override; bool open(OpenMode mode) override; void close() override; qint64 readData(char *data, qint64 maxlen) override; qint64 writeData(const char *data, qint64 len) override; void setUrl(const QUrl &url); void setProtocol(const QByteArray &data); Q_SIGNALS: void socketConnected(); public slots: void handleBinaryMessage(const QByteArray &msg); void onSocketConnected(); private: QByteArray m_protocol; QByteArray m_buffer; QWebSocket m_socket; QUrl m_url; };
设计用于管理连接和订阅的类
WebSocketIODevice
将成为 ClientSubscription
类的私有成员,与 QMqttClient 和 QMqttSubscription 一起
private: QMqttClient m_client; QMqttSubscription *m_subscription; QUrl m_url; QString m_topic; WebSocketIODevice m_device; int m_version;
订阅和接收消息
主要逻辑在 ClientSubscription
类的 connectAndSubscribe()
方法中实现。在通过 WebSocket 初始化 MQTT 连接之前,您需要验证 WebSocket 连接是否成功。MQTT 连接建立后,QMqttClient 可以订阅主题。如果订阅成功,可以使用 QMqttSubscription 通过 ClientSubscription
类的 handleMessage()
方法接收订阅主题的消息。
void ClientSubscription::connectAndSubscribe() { qCDebug(lcWebSocketMqtt) << "Connecting to broker at " << m_url; m_device.setUrl(m_url); m_device.setProtocol(m_version == 3 ? "mqttv3.1" : "mqtt"); connect(&m_device, &WebSocketIODevice::socketConnected, this, [this]() { qCDebug(lcWebSocketMqtt) << "WebSocket connected, initializing MQTT connection."; m_client.setProtocolVersion(m_version == 3 ? QMqttClient::MQTT_3_1 : QMqttClient::MQTT_3_1_1); m_client.setTransport(&m_device, QMqttClient::IODevice); connect(&m_client, &QMqttClient::connected, this, [this]() { qCDebug(lcWebSocketMqtt) << "MQTT connection established"; m_subscription = m_client.subscribe(m_topic); if (!m_subscription) { qDebug() << "Failed to subscribe to " << m_topic; emit errorOccured(); } connect(m_subscription, &QMqttSubscription::stateChanged, [](QMqttSubscription::SubscriptionState s) { qCDebug(lcWebSocketMqtt) << "Subscription state changed:" << s; }); connect(m_subscription, &QMqttSubscription::messageReceived, [this](QMqttMessage msg) { handleMessage(msg.payload()); }); }); m_client.connectToHost(); }); if (!m_device.open(QIODevice::ReadWrite)) qDebug() << "Could not open socket device"; }
文件
© 2024 The Qt Company Ltd. 本文档中包含的贡献文档的版权属于各自的所有者。本提供的文档是根据自由软件基金会发布的 GNU 自由文档许可证版本 1.3 的条款许可的。Qt 和相应的徽标是芬兰和/或其他国家的 The Qt Company Ltd. 的商标。所有其他商标均为各自所有者的财产。