WebSocket MQTT 订阅

结合 MQTT 客户端和 WebSocket 连接。

WebSocket MQTT 订阅 展示了如何设计自定义 QIODevice,以便将 WebSocket 连接与 QMqttClient 结合。

创建自定义 QIODevice

新的自定义设备 WebSocketIODevice 必须是 QIODevice 的子类

class WebSocketIODevice : public QIODevice
{
    Q_OBJECT
public:
    WebSocketIODevice(QObject *parent = nullptr);

    bool isSequential() const override;
    qint64 bytesAvailable() const override;

    bool open(OpenMode mode) override;
    void close() override;

    qint64 readData(char *data, qint64 maxlen) override;
    qint64 writeData(const char *data, qint64 len) override;

    void setUrl(const QUrl &url);
    void setProtocol(const QByteArray &data);
Q_SIGNALS:
    void socketConnected();

public slots:
    void handleBinaryMessage(const QByteArray &msg);
    void onSocketConnected();

private:
    QByteArray m_protocol;
    QByteArray m_buffer;
    QWebSocket m_socket;
    QUrl m_url;
};

设计用于管理连接和订阅的类

WebSocketIODevice 将成为 ClientSubscription 类的私有成员,与 QMqttClientQMqttSubscription 一起

private:
    QMqttClient m_client;
    QMqttSubscription *m_subscription;
    QUrl m_url;
    QString m_topic;
    WebSocketIODevice m_device;
    int m_version;

订阅和接收消息

主要逻辑在 ClientSubscription 类的 connectAndSubscribe() 方法中实现。在通过 WebSocket 初始化 MQTT 连接之前,您需要验证 WebSocket 连接是否成功。MQTT 连接建立后,QMqttClient 可以订阅主题。如果订阅成功,可以使用 QMqttSubscription 通过 ClientSubscription 类的 handleMessage() 方法接收订阅主题的消息。

void ClientSubscription::connectAndSubscribe()
{
    qCDebug(lcWebSocketMqtt) << "Connecting to broker at " << m_url;

    m_device.setUrl(m_url);
    m_device.setProtocol(m_version == 3 ? "mqttv3.1" : "mqtt");

    connect(&m_device, &WebSocketIODevice::socketConnected, this, [this]() {
        qCDebug(lcWebSocketMqtt) << "WebSocket connected, initializing MQTT connection.";

        m_client.setProtocolVersion(m_version == 3 ? QMqttClient::MQTT_3_1 : QMqttClient::MQTT_3_1_1);
        m_client.setTransport(&m_device, QMqttClient::IODevice);

        connect(&m_client, &QMqttClient::connected, this, [this]() {
            qCDebug(lcWebSocketMqtt) << "MQTT connection established";

            m_subscription = m_client.subscribe(m_topic);
            if (!m_subscription) {
                qDebug() << "Failed to subscribe to " << m_topic;
                emit errorOccured();
            }

            connect(m_subscription, &QMqttSubscription::stateChanged,
                    [](QMqttSubscription::SubscriptionState s) {
                qCDebug(lcWebSocketMqtt) << "Subscription state changed:" << s;
            });

            connect(m_subscription, &QMqttSubscription::messageReceived,
                    [this](QMqttMessage msg) {
                handleMessage(msg.payload());
            });
        });

        m_client.connectToHost();
    });
    if (!m_device.open(QIODevice::ReadWrite))
        qDebug() << "Could not open socket device";
}

文件

© 2024 The Qt Company Ltd. 本文档中包含的贡献文档的版权属于各自的所有者。本提供的文档是根据自由软件基金会发布的 GNU 自由文档许可证版本 1.3 的条款许可的。Qt 和相应的徽标是芬兰和/或其他国家的 The Qt Company Ltd. 的商标。所有其他商标均为各自所有者的财产。