1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
// Copyright (C) 2017 The Qt Company Ltd.
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only
#ifndef QMQTTCONNECTION_P_H
#define QMQTTCONNECTION_P_H
//
// W A R N I N G
// -------------
//
// This file is not part of the Qt API. It exists purely as an
// implementation detail. This header file may change from version to
// version without notice, or even be removed.
//
// We mean it.
//
#include "qmqttclient.h"
#include "qmqttcontrolpacket_p.h"
#include "qmqttmessage.h"
#include "qmqttsubscription.h"
#include <QtCore/QBasicTimer>
#include <QtCore/QBuffer>
#include <QtCore/QHash>
#include <QtCore/QObject>
#include <QtCore/QSharedPointer>
#include <QtCore/QtEndian>
#include <memory>
QT_BEGIN_NAMESPACE
class QMqttClientPrivate;
class Q_AUTOTEST_EXPORT QMqttConnection : public QObject
{
Q_OBJECT
public:
enum InternalConnectionState {
BrokerDisconnected = 0,
BrokerConnecting,
BrokerWaitForConnectAck,
BrokerConnected,
ClientDestruction
};
explicit QMqttConnection(QObject *parent = nullptr);
~QMqttConnection() override;
void disconnectAndResetTransport();
void setTransport(QIODevice *device, QMqttClient::TransportType transport);
void connectTransport(QMqttClient::TransportType transport, const std::shared_ptr<QIODevice> &device);
QIODevice *transport() const;
bool ensureTransport(bool createSecureIfNeeded = false);
bool ensureTransportOpen(const QString &sslPeerName = QString());
bool sendControlConnect();
bool sendControlAuthenticate(const QMqttAuthenticationProperties &properties);
qint32 sendControlPublish(const QMqttTopicName &topic, const QByteArray &message, quint8 qos = 0, bool retain = false,
const QMqttPublishProperties &properties = QMqttPublishProperties());
bool sendControlPublishAcknowledge(quint16 id);
bool sendControlPublishRelease(quint16 id);
bool sendControlPublishReceive(quint16 id);
bool sendControlPublishComp(quint16 id);
QMqttSubscription *sendControlSubscribe(const QMqttTopicFilter &topic, quint8 qos, const QMqttSubscriptionProperties &properties);
bool sendControlUnsubscribe(const QMqttTopicFilter &topic, const QMqttUnsubscriptionProperties &properties);
bool sendControlPingRequest(bool isAuto = true);
bool sendControlDisconnect();
void setClientPrivate(QMqttClientPrivate *clientPrivate);
inline quint16 unusedPacketIdentifier() const;
inline InternalConnectionState internalState() const { return m_internalState; }
inline void setClientDestruction() { m_internalState = ClientDestruction; }
void cleanSubscriptions();
private:
void transportConnectionEstablished();
void transportConnectionClosed();
void transportReadyRead();
void transportError(QAbstractSocket::SocketError e);
protected:
void timerEvent(QTimerEvent *event) override;
public:
std::shared_ptr<QIODevice> m_transport;
QMqttClient::TransportType m_transportType{QMqttClient::IODevice};
bool m_transportIsSet{false}; // connectTo* does not reset m_transport;
bool m_connectedTransport{false}; // We have connected the signals for m_transport if true
QMqttClientPrivate *m_clientPrivate{nullptr};
#ifndef QT_NO_SSL
QSslConfiguration m_sslConfiguration;
#endif
private:
Q_DISABLE_COPY(QMqttConnection)
void finalize_auth();
void finalize_connack();
void finalize_suback();
void finalize_unsuback();
void finalize_publish();
void finalize_pubAckRecRelComp();
void finalize_pingresp();
void processData();
bool processDataHelper();
bool readBuffer(char *data, quint64 size);
qint32 readVariableByteInteger(qint64 *dataSize = nullptr);
void readAuthProperties(QMqttAuthenticationProperties &properties);
void readConnackProperties(QMqttServerConnectionProperties &properties);
void readMessageStatusProperties(QMqttMessageStatusProperties &properties);
void readPublishProperties(QMqttPublishProperties &properties);
void readSubscriptionProperties(QMqttSubscription *sub);
QByteArray writeConnectProperties();
QByteArray writeLastWillProperties() const;
QByteArray writePublishProperties(const QMqttPublishProperties &properties);
QByteArray writeSubscriptionProperties(const QMqttSubscriptionProperties &properties);
QByteArray writeUnsubscriptionProperties(const QMqttUnsubscriptionProperties &properties);
QByteArray writeAuthenticationProperties(const QMqttAuthenticationProperties &properties);
void closeConnection(QMqttClient::ClientError error);
QByteArray readBuffer(quint64 size);
template<typename T> T readBufferTyped(qint64 *dataSize = nullptr);
QByteArray m_readBuffer;
int m_readPosition{0};
qint64 m_missingData{0};
struct PublishData {
quint8 qos;
bool dup;
bool retain;
};
PublishData m_currentPublish{0, false, false};
QMqttControlPacket::PacketType m_currentPacket{QMqttControlPacket::UNKNOWN};
bool writePacketToTransport(const QMqttControlPacket &p);
QHash<quint16, QMqttSubscription *> m_pendingSubscriptionAck;
QHash<quint16, QMqttSubscription *> m_pendingUnsubscriptions;
QHash<QMqttTopicFilter, QMqttSubscription *> m_activeSubscriptions;
QHash<quint16, QSharedPointer<QMqttControlPacket>> m_pendingMessages;
QHash<quint16, QSharedPointer<QMqttControlPacket>> m_pendingReleaseMessages;
InternalConnectionState m_internalState{BrokerDisconnected};
QBasicTimer m_pingTimer;
int m_pingTimeout{0};
QList<QMqttTopicName> m_receiveAliases;
QList<QMqttTopicName> m_publishAliases;
};
QT_END_NAMESPACE
#endif // QMQTTCONNECTION_P_H
|