diff --git a/src/ddpclient.cpp b/src/ddpclient.cpp index 7c26d461..24cf762c 100644 --- a/src/ddpclient.cpp +++ b/src/ddpclient.cpp @@ -1,368 +1,414 @@ /* * * Copyright 2016 Riccardo Iaconelli * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License as * published by the Free Software Foundation; either version 2 of * the License or (at your option) version 3 or any later version * accepted by the membership of KDE e.V. (or its successor approved * by the membership of KDE e.V.), which shall act as a proxy * defined in Section 14 of version 3 of the license. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . * */ #include "ddpclient.h" #include #include #include #include #include #include "ruqola.h" void process_test(QJsonDocument doc) { qDebug() << "Callback test:" << doc; qDebug() << "End callback"; } void login_callback(QJsonDocument doc) { qDebug() << "LOGIN:" << doc; Ruqola::self()->setAuthToken(doc.object().value("token").toString()); qDebug() << "End callback"; } void DDPClient::resume_login_callback(QJsonDocument doc) { qDebug() << "LOGIN:" << doc; Ruqola::self()->setAuthToken(doc.object().value("token").toString()); qDebug() << "End callback"; } +unsigned DDPClient::messageID() +{ + return m_uid; +} + +void DDPClient::setMessageID(unsigned id) +{ + if ( m_uid != id ){ + m_uid = id; + } +} void empty_callback(QJsonDocument doc) { Q_UNUSED(doc); } DDPClient::DDPClient(const QString& url, QObject* parent) : QObject(parent), m_url(url), m_uid(1), m_loginJob(0), m_loginStatus(NotConnected), m_connected(false), m_attemptedPasswordLogin(false), m_attemptedTokenLogin(false) { m_webSocket.ignoreSslErrors(); connect(&m_webSocket, &QWebSocket::connected, this, &DDPClient::onWSConnected); connect(&m_webSocket, &QWebSocket::textMessageReceived, this, &DDPClient::onTextMessageReceived); connect(&m_webSocket, &QWebSocket::disconnected, this, &DDPClient::WSclosed); connect(Ruqola::self(), &Ruqola::serverURLChanged, this, &DDPClient::onServerURLChange); if (!url.isEmpty()) { m_webSocket.open(QUrl("wss://"+url+"/websocket")); } qDebug() << "Trying to connect to URL" << url; } DDPClient::~DDPClient() { m_webSocket.close(); } void DDPClient::onServerURLChange() { if (Ruqola::self()->serverURL() != m_url || !m_webSocket.isValid()) { if (m_webSocket.isValid()) { m_webSocket.flush(); m_webSocket.close(); } m_url = Ruqola::self()->serverURL(); m_webSocket.open(QUrl("wss://"+m_url+"/websocket")); connect(&m_webSocket, &QWebSocket::connected, this, &DDPClient::onWSConnected); qDebug() << "Reconnecting" << m_url; //<< m_webSocket.st; } } DDPClient::LoginStatus DDPClient::loginStatus() const { return m_loginStatus; } bool DDPClient::isConnected() const { return m_connected; } bool DDPClient::isLoggedIn() const { return m_loginStatus == LoggedIn; } -bool unsentMessages(){ - if ( !DDPClient::m_messageQueue.empty() ){ - QPair pair = DDPClient::m_messageQueue.head(); +//bool unsentMessages(){ +// if ( !DDPClient::m_messageQueue.empty() ){ +// QPair pair = DDPClient::m_messageQueue.head(); +// int id = pair.first; +// QJsonDocument params = pair.second; +// if (DDPClient::loginStatus() == DDPClient::LoggedIn){ +// MessageQueue::method("sendMessage", params); +// } +// //if it is sent successfully, dequeue it +// //else it'll stay at head in queue for sending again +// QHash::iterator it = DDPClient::m_messageStatus.find(id); +// if ( it!= DDPClient::m_messageStatus.end() ){ +// if ( it.value() == true ) +// DDPClient::m_messageQueue.dequeue(); +// } +// } +//} + +void MessageQueue::retry() +{ + if ( Ruqola::self()->loginStatus() == DDPClient::LoggedIn && !m_messageQueue.empty() ){ + QPair pair = MessageQueue::m_messageQueue.head(); int id = pair.first; QJsonDocument params = pair.second; - if (DDPClient::loginStatus() == DDPClient::LoggedIn){ - DDPClient::method("sendMessage", params); - } + MessageQueue::method("sendMessage", params); //if it is sent successfully, dequeue it //else it'll stay at head in queue for sending again - QHash::iterator it = DDPClient::m_messageStatus.find(id); - if ( it!= DDPClient::m_messageStatus.end() ){ - if ( it.value() == true ) - DDPClient::m_messageQueue.dequeue(); - } + QHash::iterator it = MessageQueue::m_messageStatus.find(id); + if ( it!= MessageQueue::m_messageStatus.end() && it.value() == true ){ + MessageQueue::m_messageQueue.dequeue(); + } } } -unsigned int DDPClient::method(const QString& m, const QJsonDocument& params) +unsigned int MessageQueue::method(const QString& m, const QJsonDocument& params) { return method(m, params, empty_callback); } -unsigned int DDPClient::method(const QString& method, const QJsonDocument& params, std::function callback) +unsigned int MessageQueue::method(const QString& method, const QJsonDocument& params, std::function callback) { QJsonObject json; json["msg"] = "method"; json["method"] = method; - json["id"] = QString::number(m_uid); + json["id"] = QString::number(Ruqola::self()->ddp()->messageID()); if (params.isArray()){ json["params"] = params.array(); } else if (params.isObject()) { QJsonArray arr; arr.append(params.object()); json["params"] = arr; } - qint64 bytes = m_webSocket.sendTextMessage(QJsonDocument(json).toJson(QJsonDocument::Compact)); + qint64 bytes = Ruqola::self()->ddp()->m_webSocket.sendTextMessage(QJsonDocument(json).toJson(QJsonDocument::Compact)); if (bytes < json.length()) { qDebug() << "ERROR! I couldn't send all of my message. This is a bug! (try again)"; - qDebug() << m_webSocket.isValid() << m_webSocket.error() << m_webSocket.requestUrl(); - - //try sending the message again - DDPClient::m_messageQueue.enqueue(qMakePair(m_uid-1, params)); - DDPClient::m_messageStatus.insert(m_uid-1,false); + qDebug() << Ruqola::self()->ddp()->m_webSocket.isValid() << Ruqola::self()->ddp()->m_webSocket.error() << Ruqola::self()->ddp()->m_webSocket.requestUrl(); + //enqueue unsent messages + MessageQueue::m_messageQueue.enqueue(qMakePair((Ruqola::self()->ddp()->messageID())-1, params)); + MessageQueue::m_messageStatus.insert((Ruqola::self()->ddp()->messageID())-1,false); + //and retry + MessageQueue::retry(); } else { qDebug() << "Successfully sent " << json; - DDPClient::m_messageStatus.insert(m_uid-1,true); + QHash::iterator it = MessageQueue::m_messageStatus.find((Ruqola::self()->ddp()->messageID())-1); + if (it.value() == false){ + it.value() = true; + } } //callback(QJsonDocument::fromJson(json.toUtf8())); - m_callbackHash[m_uid] = callback; + Ruqola::self()->ddp()->m_callbackHash[Ruqola::self()->ddp()->messageID()] = callback; - m_uid++; - return m_uid - 1 ; + Ruqola::self()->ddp()->setMessageID((Ruqola::self()->ddp()->messageID())+1); + return (Ruqola::self()->ddp()->messageID()) - 1 ; } -void DDPClient::subscribe(const QString& collection, const QJsonArray& params) +void MessageQueue::loginStatusChanged() +{ + if (Ruqola::self()->ddp()->loginStatus() == DDPClient::LoggedIn && !m_messageQueue.empty()){ + //retry sending messages + MessageQueue::retry(); + } else if (Ruqola::self()->ddp()->loginStatus() != DDPClient::LoggedIn && !m_messageQueue.empty()){ + //save messages in messageQueue in local cache and retry after client is loggedIn + } +} + +void MessageQueue::subscribe(const QString& collection, const QJsonArray& params) { QJsonObject json; json["msg"] = "sub"; - json["id"] = QString::number(m_uid); + json["id"] = QString::number(Ruqola::self()->ddp()->messageID()); json["name"] = collection; json["params"] = params; - qint64 bytes = m_webSocket.sendTextMessage(QJsonDocument(json).toJson(QJsonDocument::Compact)); + qint64 bytes = Ruqola::self()->ddp()->m_webSocket.sendTextMessage(QJsonDocument(json).toJson(QJsonDocument::Compact)); if (bytes < json.length()) { qDebug() << "ERROR! I couldn't send all of my message. This is a bug! (try again)"; + + //enqueue unsent messages + MessageQueue::m_messageQueue.enqueue(qMakePair((Ruqola::self()->ddp()->messageID())-1, params)); + MessageQueue::m_messageStatus.insert((Ruqola::self()->ddp()->messageID())-1,false); + //and retry + MessageQueue::retry(); } - - m_uid++; + Ruqola::self()->ddp()->setMessageID((Ruqola::self()->ddp()->messageID())+1); } void DDPClient::onTextMessageReceived(QString message) { QJsonDocument response = QJsonDocument::fromJson(message.toUtf8()); if (!response.isNull() && response.isObject()) { QJsonObject root = response.object(); QString messageType = root.value("msg").toString(); qDebug() << "--------------------"; qDebug() << "--------------------"; // qDebug() << "Root is- " << root; if (messageType == "updated") { } else if (messageType == "result") { unsigned id = root.value("id").toString().toInt(); if (m_callbackHash.contains(id)) { std::function callback = m_callbackHash.take(id); QJsonDocument res = QJsonDocument(root.value("result").toObject()); QJsonObject result = res.object(); QString type = result.value("type").toString(); QString msg = result.value("msg").toString(); QByteArray base64Image; QImage image; QString path = QStandardPaths::writableLocation(QStandardPaths::CacheLocation); QDir dir(path); if (!dir.exists()){ dir.mkdir(path); qDebug() << "Directory created at " << path; } QDir::setCurrent(path); const QDateTime currentTime = QDateTime::currentDateTime(); const QString timestamp = currentTime.toString(QLatin1String("yyyyMMdd-hhmmsszzz")); const QString filename = QString::fromLatin1("%1.jpg").arg(timestamp); if (type == "image"){ qDebug() << "I am here yay"; base64Image.append(msg); image.loadFromData(QByteArray::fromBase64(base64Image), "JPG"); if ( !image.isNull() ){ qDebug() << "Saving Image to " << path; if (image.save(filename, "JPEG") ){ qDebug() << "Image saved successfully"; } else { qDebug() << "Image NOT saved"; } } else{ qDebug() << "Image is NULL"; } } else if (type == "text"){ } callback( QJsonDocument(root.value("result").toObject()) ); } emit result(id, QJsonDocument(root.value("result").toObject())); if (id == m_loginJob) { if (root.value("error").toObject().value("error").toInt() == 403) { qDebug() << "Wrong password or token expired"; login(); // Let's keep trying to log in } else { Ruqola::self()->setAuthToken(root.value("result").toObject().value("token").toString()); setLoginStatus(DDPClient::LoggedIn); } // emit loggedInChanged(); } } else if (messageType == "connected") { qDebug() << "Connected"; m_connected = true; emit connectedChanged(); setLoginStatus(DDPClient::LoggingIn); login(); // Try to resume auth token login } else if (messageType == "error") { qDebug() << "ERROR!!" << message; } else if (messageType == "ping") { qDebug() << "Ping - Pong"; QJsonObject pong; pong["msg"] = "pong"; m_webSocket.sendBinaryMessage(QJsonDocument(pong).toJson(QJsonDocument::Compact)); } else if (messageType == "added"){ qDebug() << "ADDING" <password().isEmpty()) { // If we have a password and we couldn't log in, let's stop here if (m_attemptedPasswordLogin) { setLoginStatus(LoginFailed); return; } m_attemptedPasswordLogin = true; QJsonObject user; user["username"] = Ruqola::self()->userName(); QJsonObject json; json["password"] = Ruqola::self()->password(); json["user"] = user; - m_loginJob = method("login", QJsonDocument(json)); + m_loginJob = Ruqola::self()->messageQueue()->method("login", QJsonDocument(json)); } else if (!Ruqola::self()->authToken().isEmpty() && !m_attemptedTokenLogin) { m_attemptedPasswordLogin = true; QJsonObject json; json["resume"] = Ruqola::self()->authToken(); - m_loginJob = method("login", QJsonDocument(json)); + m_loginJob = Ruqola::self()->messageQueue()->method("login", QJsonDocument(json)); } else { setLoginStatus(LoginFailed); } } void DDPClient::logOut() { // setLoginStatus(NotConnected); m_webSocket.close(); } void DDPClient::onWSConnected() { qDebug() << "Websocket connected at URL" << m_url; QJsonArray supportedVersions; supportedVersions.append("1"); QJsonObject protocol; protocol["msg"] = "connect"; protocol["version"] = "1"; protocol["support"] = supportedVersions; // QString json("{\"msg\":\"connect\", \"version\": \"1\", \"support\": [\"1\"]}"); QByteArray serialize = QJsonDocument(protocol).toJson(QJsonDocument::Compact); qint64 bytes = m_webSocket.sendTextMessage(serialize); if (bytes < serialize.length()) { qDebug() << "ERROR! I couldn't send all of my message. This is a bug! (try again)"; } else { qDebug() << "Successfully sent " << serialize; } } void DDPClient::WSclosed() { qDebug() << "WebSocket CLOSED" << m_webSocket.closeReason() << m_webSocket.error() << m_webSocket.closeCode(); setLoginStatus(NotConnected); // m_connected = false; } diff --git a/src/ddpclient.h b/src/ddpclient.h index 63c4140d..ed0639e4 100644 --- a/src/ddpclient.h +++ b/src/ddpclient.h @@ -1,149 +1,154 @@ /* * * Copyright 2016 Riccardo Iaconelli * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License as * published by the Free Software Foundation; either version 2 of * the License or (at your option) version 3 or any later version * accepted by the membership of KDE e.V. (or its successor approved * by the membership of KDE e.V.), which shall act as a proxy * defined in Section 14 of version 3 of the license. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . * */ #ifndef DDPCLIENT_H #define DDPCLIENT_H // #include // #include // #include #include #include #include class QJsonObject; class QJsonDocument; class QUrl; class QWebSocket; class DDPClient : public QObject { Q_OBJECT public: enum LoginStatus { NotConnected, LoggingIn, LoggedIn, LoginFailed, LoggedOut }; Q_ENUM(LoginStatus) DDPClient(const QString &url = QString(), QObject *parent = 0); ~DDPClient(); - // unsigned method(const QString &method, const QJsonObject ¶ms); - - void subscribe(const QString &collection, const QJsonArray ¶ms); - Q_INVOKABLE void login(); void logOut(); + unsigned messageID(); + void setMessageID(unsigned id); + // Q_INVOKABLE void loginWithPassword(); bool isConnected() const; bool isLoggedIn() const; void onServerURLChange(); //Again try to send unsent message; returns true if message was sent successfully bool unsentMessages(); signals: // void connected(); void connectedChanged(); void loginStatusChanged(); // void loggedInChanged(); void disconnected(); /** * @brief Emitted whenever a result is received. The parameter is the expected ID. * * @param id the ID received in the method() call */ void result(unsigned id, QJsonDocument result); void added(QJsonObject item); void changed(QJsonObject item); private slots: void onWSConnected(); void onTextMessageReceived(QString message); void WSclosed(); private: LoginStatus loginStatus() const; void setLoginStatus(LoginStatus l); void resume_login_callback(QJsonDocument doc); QString m_url; QWebSocket m_webSocket; unsigned m_uid; QHash > m_callbackHash; unsigned m_loginJob; LoginStatus m_loginStatus; bool m_connected; bool m_attemptedPasswordLogin; bool m_attemptedTokenLogin; friend class Ruqola; + friend class MessageQueue; }; -Class MessageQueue() : public QObject +class MessageQueue : public QObject { Q_OBJECT - public: +public: /** * @brief Call a method with name @param method and parameters @param params * * @param method The name of the method * @param params The parameters * @return unsigned int, the ID of the called method. Watch for it */ unsigned method(const QString &method, const QJsonDocument ¶ms); unsigned method(const QString &method, const QJsonDocument ¶ms, std::function callback); + void subscribe(const QString &collection, const QJsonArray ¶ms); + + // method which tries to resend unsuccessful messages again + void retry(); +public slots: + void loginStatusChanged(); + +private: //pair- int (m_uid), QJsonDocument (params) QQueue> m_messageQueue; //message with m_uid sent succussfully or not QHash m_messageStatus; - - - }; // #include "ddpclient.moc" #endif // DDPCLIENT_H