diff --git a/src/backend/datasources/MQTTClient.cpp b/src/backend/datasources/MQTTClient.cpp index 8ac5de150..ce91497ee 100644 --- a/src/backend/datasources/MQTTClient.cpp +++ b/src/backend/datasources/MQTTClient.cpp @@ -1,1138 +1,1139 @@ #ifdef HAVE_MQTT #include "backend/datasources/MQTTClient.h" #include "backend/datasources/MQTTSubscriptions.h" #include "backend/datasources/MQTTTopic.h" #include "backend/datasources/filters/AsciiFilter.h" #include "backend/datasources/filters/FITSFilter.h" #include "backend/datasources/filters/BinaryFilter.h" #include "backend/core/Project.h" #include "kdefrontend/spreadsheet/PlotDataDialog.h" #include "commonfrontend/spreadsheet/SpreadsheetView.h" #include "kdefrontend/datasources/MQTTErrorWidget.h" #include #include #include #include #include #include #include #include #include #include #include /*! \class MQTTClient \brief Represents data stored in a file. Reading and writing is done with the help of appropriate I/O-filters. \ingroup datasources */ MQTTClient::MQTTClient(const QString& name) : Folder(name), m_paused(false), m_prepared(false), m_keepLastValues(false), m_filter(nullptr), m_updateTimer(new QTimer(this)), m_willTimer(new QTimer(this)), m_client(new QMqttClient(this)), m_mqttTest(false), m_mqttRetain(false), m_mqttUseWill(false), m_mqttUseID(false), m_loaded(false), m_sampleRate(1), m_keepNvalues(0), m_updateInterval(1000), m_disconnectForWill(false), m_mqttUseAuthentication(false), m_subscriptionsLoaded(0), m_subscriptionCount(0), m_mqttFirstConnectEstablished(false) { //initActions(); qDebug()<<"MQTTClient constructor"; connect(m_updateTimer, &QTimer::timeout, this, &MQTTClient::read); m_willStatistics.fill(false, 15); connect(m_client, &QMqttClient::connected, this, &MQTTClient::onMqttConnect); connect(m_willTimer, &QTimer::timeout, this, &MQTTClient::setWillForMqtt); connect(m_client, &QMqttClient::errorChanged, this, &MQTTClient::mqttErrorChanged); //connect(this, &MQTTClient::mqttAllArrived, this, &MQTTClient::onAllArrived ); } MQTTClient::~MQTTClient() { + emit clientAboutToBeDeleted(m_client->hostname()); //stop reading before deleting the objects pauseReading(); qDebug()<<"destructor"; if (m_filter) delete m_filter; qDebug()<<"delete timers"; delete m_updateTimer; delete m_willTimer; qDebug()<<"disocnnect"; m_client->disconnectFromHost(); qDebug()<<"delete client"; delete m_client; } /*! * depending on the update type, periodically or on data changes, starts the timer or activates the file watchers, respectively. */ void MQTTClient::ready() { if (m_updateType == TimeInterval) m_updateTimer->start(m_updateInterval); } /*void MQTTClient::initActions() { m_reloadAction = new QAction(QIcon::fromTheme("view-refresh"), i18n("Reload"), this); connect(m_reloadAction, &QAction::triggered, this, &MQTTClient::read); m_toggleLinkAction = new QAction(i18n("Link the file"), this); m_toggleLinkAction->setCheckable(true); connect(m_toggleLinkAction, &QAction::triggered, this, &MQTTClient::linkToggled); m_plotDataAction = new QAction(QIcon::fromTheme("office-chart-line"), i18n("Plot data"), this); connect(m_plotDataAction, &QAction::triggered, this, &MQTTClient::plotData); }*/ /*! * \brief Updates this data source at this moment */ void MQTTClient::updateNow() { qDebug()<<"Update now"; m_updateTimer->stop(); read(); if (m_updateType == TimeInterval && !m_paused) m_updateTimer->start(m_updateInterval); } /*! * \brief Continue reading from the live data source after it was paused. */ void MQTTClient::continueReading() { qDebug()<<"continue reading"; m_paused = false; if (m_updateType == TimeInterval) m_updateTimer->start(m_updateInterval); } /*! * \brief Pause the reading of the live data source. */ void MQTTClient::pauseReading() { qDebug()<<"pause reading"; m_paused = true; if (m_updateType == TimeInterval) m_updateTimer->stop(); } void MQTTClient::setFilter(AbstractFileFilter* f) { m_filter = f; } AbstractFileFilter* MQTTClient::filter() const { return m_filter; } /*! * \brief Sets the source's update interval to \c interval * \param interval */ void MQTTClient::setUpdateInterval(int interval) { qDebug()<<"Update interval " << interval; m_updateInterval = interval; if(!m_paused) m_updateTimer->start(m_updateInterval); } int MQTTClient::updateInterval() const { return m_updateInterval; } /*! * \brief Sets how many values we should store * \param keepnvalues */ void MQTTClient::setKeepNvalues(int keepnvalues) { qDebug()<<"Keep N values" << keepnvalues; m_keepNvalues = keepnvalues; } int MQTTClient::keepNvalues() const { return m_keepNvalues; } bool MQTTClient::isPaused() const { return m_paused; } /*! * \brief Sets the sample rate to samplerate * \param samplerate */ void MQTTClient::setSampleRate(int samplerate) { qDebug()<<"Sample rate: " << samplerate; m_sampleRate = samplerate; } int MQTTClient::sampleRate() const { return m_sampleRate; } /*! * \brief Sets the source's reading type to readingType * \param readingType */ void MQTTClient::setReadingType(ReadingType readingType) { qDebug()<<"Read Type : " << static_cast(readingType); m_readingType = readingType; } MQTTClient::ReadingType MQTTClient::readingType() const { return m_readingType; } /*! * \brief Sets the source's update type to updatetype and handles this change * \param updatetype */ void MQTTClient::setUpdateType(UpdateType updatetype) { qDebug()<<"Update Type : " << static_cast(updatetype); if (updatetype == NewData) { m_updateTimer->stop(); } m_updateType = updatetype; } MQTTClient::UpdateType MQTTClient::updateType() const { return m_updateType; } /*QIcon MQTTClient::icon() const { QIcon icon; if (m_fileType == MQTTClient::Ascii) icon = QIcon::fromTheme("text-plain"); else if (m_fileType == MQTTClient::Binary) icon = QIcon::fromTheme("application-octet-stream"); else if (m_fileType == MQTTClient::Image) icon = QIcon::fromTheme("image-x-generic"); // TODO: HDF5, NetCDF, FITS, etc. return icon; }*/ /*QMenu* MQTTClient::createContextMenu() { QMenu* menu = AbstractPart::createContextMenu(); QAction* firstAction = 0; // if we're populating the context menu for the project explorer, then //there're already actions available there. Skip the first title-action //and insert the action at the beginning of the menu. if (menu->actions().size()>1) firstAction = menu->actions().at(1); menu->insertAction(firstAction, m_plotDataAction); menu->insertSeparator(firstAction); //TODO: doesnt' always make sense... // if (!m_fileWatched) // menu->insertAction(firstAction, m_reloadAction); // // m_toggleWatchAction->setChecked(m_fileWatched); // menu->insertAction(firstAction, m_toggleWatchAction); // // m_toggleLinkAction->setChecked(m_fileLinked); // menu->insertAction(firstAction, m_toggleLinkAction); return menu; }*/ //############################################################################## //################################# SLOTS #################################### //############################################################################## /* * called periodically or on new data changes (file changed, new data in the socket, etc.) */ void MQTTClient::read() { if (m_filter == nullptr) return; //initialize the device (file, socket, serial port), when calling this function for the first time if (!m_prepared) { qDebug()<<"Read & Connect"; m_client->connectToHost(); qDebug()<<"connectTOHost called"; m_prepared = true; } if((m_client->state() == QMqttClient::ClientState::Connected) && m_mqttFirstConnectEstablished) { qDebug()<<"Read"; emit readFromTopics(); } } /*! Saves as XML. */ void MQTTClient::save(QXmlStreamWriter* writer) const { writer->writeStartElement("MQTTClient"); writeBasicAttributes(writer); writeCommentElement(writer); //general writer->writeStartElement("general"); writer->writeAttribute("subscriptionCount", QString::number(m_mqttSubscriptions.size())); writer->writeAttribute("updateType", QString::number(m_updateType)); writer->writeAttribute("readingType", QString::number(m_readingType)); writer->writeAttribute("keepValues", QString::number(m_keepNvalues)); if (m_updateType == TimeInterval) writer->writeAttribute("updateInterval", QString::number(m_updateInterval)); if (m_readingType != TillEnd) writer->writeAttribute("sampleRate", QString::number(m_sampleRate)); writer->writeAttribute("host", m_client->hostname()); writer->writeAttribute("port", QString::number(m_client->port())); writer->writeAttribute("username", m_client->username()); writer->writeAttribute("pasword", m_client->password()); writer->writeAttribute("clientId", m_client->clientId()); writer->writeAttribute("useRetain", QString::number(m_mqttRetain)); writer->writeAttribute("useWill", QString::number(m_mqttUseWill)); writer->writeAttribute("willTopic", m_willTopic); writer->writeAttribute("willOwnMessage", m_willOwnMessage); writer->writeAttribute("willQoS", QString::number(m_willQoS)); writer->writeAttribute("willRetain", QString::number(m_willRetain)); writer->writeAttribute("willMessageType", QString::number(static_cast(m_willMessageType))); writer->writeAttribute("willUpdateType", QString::number(static_cast(m_willUpdateType))); writer->writeAttribute("willTimeInterval", QString::number(m_willTimeInterval)); for( int i = 0; i < m_willStatistics.count(); ++i){ writer->writeAttribute("willStatistics"+QString::number(i), QString::number(m_willStatistics[i])); } writer->writeAttribute("useID", QString::number(m_mqttUseID)); writer->writeAttribute("useAuthentication", QString::number(m_mqttUseAuthentication)); writer->writeEndElement(); //filter m_filter->save(writer); //MQTTSubscriptions for(auto* sub : children(IncludeHidden)) sub->save(writer); writer->writeEndElement(); // "MQTTClient" } /*! Loads from XML. */ bool MQTTClient::load(XmlStreamReader* reader, bool preview) { qDebug()<<"Start loading MQTTClient"; if (!readBasicAttributes(reader)) return false; QString attributeWarning = i18n("Attribute '%1' missing or empty, default value is used"); QXmlStreamAttributes attribs; QString str; while (!reader->atEnd()) { reader->readNext(); if (reader->isEndElement() && reader->name() == "MQTTClient") break; if (!reader->isStartElement()) continue; if (reader->name() == "comment") { if (!readCommentElement(reader)) return false; } else if (reader->name() == "general") { qDebug()<<"MQTTClient general"; attribs = reader->attributes(); str = attribs.value("subscriptionCount").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'subscriptionCount'")); else m_subscriptionCount = str.toInt(); str = attribs.value("keepValues").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'keepValues'")); else m_keepNvalues = str.toInt(); str = attribs.value("updateType").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'updateType'")); else m_updateType = static_cast(str.toInt()); str = attribs.value("readingType").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'readingType'")); else m_readingType = static_cast(str.toInt()); if (m_updateType == TimeInterval) { str = attribs.value("updateInterval").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'updateInterval'")); else m_updateInterval = str.toInt(); } if (m_readingType != TillEnd) { str = attribs.value("sampleRate").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'sampleRate'")); else m_sampleRate = str.toInt(); } str = attribs.value("host").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'host'")); else m_client->setHostname(str); str =attribs.value("port").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'port'")); else m_client->setPort(str.toUInt()); str = attribs.value("useAuthentication").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'useAuthentication'")); else m_mqttUseAuthentication = str.toInt(); if(m_mqttUseAuthentication) { str =attribs.value("username").toString(); if(!str.isEmpty()) m_client->setUsername(str); str =attribs.value("password").toString(); if(!str.isEmpty()) m_client->setPassword(str); } str = attribs.value("useID").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'useID'")); else m_mqttUseID = str.toInt(); if(m_mqttUseID) { str =attribs.value("clientId").toString(); if(!str.isEmpty()) m_client->setClientId(str); } str =attribs.value("useRetain").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'useRetain'")); else m_mqttRetain = str.toInt(); str =attribs.value("useWill").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'useWill'")); else m_mqttUseWill = str.toInt(); if(m_mqttUseWill) { str =attribs.value("willTopic").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willTopic'")); else m_willTopic = str; str =attribs.value("willOwnMessage").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willOwnMessage'")); else m_willOwnMessage = str; str =attribs.value("willQoS").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willQoS'")); else m_willQoS = str.toUInt(); str =attribs.value("willRetain").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willRetain'")); else m_willRetain = str.toInt(); str =attribs.value("willMessageType").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willMessageType'")); else m_willMessageType = static_cast(str.toInt()); str =attribs.value("willUpdateType").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willUpdateType'")); else m_willUpdateType = static_cast(str.toInt()); str =attribs.value("willTimeInterval").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willTimeInterval'")); else m_willTimeInterval = str.toInt(); for( int i = 0; i < m_willStatistics.count(); ++i){ str =attribs.value("willStatistics"+QString::number(i)).toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willTimeInterval'")); else m_willStatistics[i] = str.toInt(); } } } else if (reader->name() == "asciiFilter") { qDebug()<<"load filter"; m_filter = new AsciiFilter(); if (!m_filter->load(reader)) return false; } else if(reader->name() == "MQTTSubscriptions") { qDebug()<<"Load MQTTSubscription"; MQTTSubscriptions* subscription = new MQTTSubscriptions(""); subscription->setMQTTClient(this); connect(subscription, &MQTTSubscriptions::loaded, this, &MQTTClient::subscriptionLoaded); if (!subscription->load(reader, preview)) { delete subscription; return false; } m_mqttSubscriptions.push_back(subscription); addChildFast(subscription); } else {// unknown element reader->raiseWarning(i18n("unknown element '%1'", reader->name().toString())); if (!reader->skipToEndElement()) return false; } } return !reader->hasError(); } void MQTTClient::setMqttClientHostPort(const QString& host, const quint16& port) { m_client->setHostname(host); m_client->setPort(port); } void MQTTClient::setMqttClientAuthentication(const QString& username, const QString& password) { m_client->setUsername(username); m_client->setPassword(password); } void MQTTClient::setMqttClientId(const QString &id){ m_client->setClientId(id); } void MQTTClient::addMqttSubscriptions(const QMqttTopicFilter& filter, const quint8& qos) { m_subscribedTopicNameQoS[filter] = qos; } void MQTTClient::onMqttConnect() { qDebug() << "on mqtt connect"; if(m_client->error() == QMqttClient::NoError) { if(!m_mqttFirstConnectEstablished) { qDebug()<<"connection made in MQTTClient"; QMapIterator i(m_subscribedTopicNameQoS); while(i.hasNext()) { i.next(); qDebug()<subscribe(i.key(), i.value()); if(temp) { qDebug()<topic()<<" "<qos(); if(!m_loaded) { m_subscriptions.push_back(temp->topic().filter()); qDebug()<<"New MQTTSubscription"; MQTTSubscriptions* newSubscription = new MQTTSubscriptions(temp->topic().filter()); newSubscription->setMQTTClient(this); qDebug()<<"Add child"; addChild(newSubscription); qDebug()<<"Add to vector"; m_mqttSubscriptions.push_back(newSubscription); } connect(temp, &QMqttSubscription::messageReceived, this, &MQTTClient::mqttSubscribtionMessageReceived); qDebug()<<"Added topic"; } } m_mqttFirstConnectEstablished = true; emit mqttSubscribed(); } else { qDebug() << "Resubscribing after will set"; QMapIterator i(m_subscribedTopicNameQoS); while(i.hasNext()) { i.next(); QMqttSubscription *temp = m_client->subscribe(i.key(), i.value()); if(temp) { qDebug()<topic()<<" "<qos(); connect(temp, &QMqttSubscription::messageReceived, this, &MQTTClient::mqttSubscribtionMessageReceived); } else qDebug()<<"Couldn't subscribe after will change"; } } } } void MQTTClient::mqttSubscribtionMessageReceived(const QMqttMessage& msg) { if(!msg.retain() || (msg.retain() && m_mqttRetain) ) { qDebug()<<"message received from "<subscriptionName(), msg.topic().name())) { m_mqttSubscriptions[i]->messageArrived(QString(msg.payload()), msg.topic().name()); break; } /*QString subscriptionName = m_mqttSubscriptions[i]->subscriptionName(); if(subscriptionName.contains('#') || subscriptionName.contains('+')) { if(subscriptionName.contains('#')) { if(msg.topic().name().startsWith(subscriptionName.left(subscriptionName.count() - 2)) ){ m_mqttSubscriptions[i]->messageArrived(QString(msg.payload()), msg.topic().name()); break; } } else if (subscriptionName.contains('+')) { int pos = subscriptionName.indexOf('+'); QString start = subscriptionName.left(pos); QString end = subscriptionName.right(subscriptionName.count() - pos); if(msg.topic().name().startsWith(start) && msg.topic().name().endsWith(end)) { m_mqttSubscriptions[i]->messageArrived(QString(msg.payload()), msg.topic().name()); break; } } } else if(subscriptionName == msg.topic().name()) { m_mqttSubscriptions[i]->messageArrived(QString(msg.payload()), msg.topic().name()); break; }*/ } if(msg.topic().name() == m_willTopic) m_willLastMessage = QString(msg.payload()); } } int MQTTClient::topicNumber() { return m_subscriptions.count(); } int MQTTClient::topicIndex(const QString& topic) { return m_subscriptions.indexOf(topic, 0); } void MQTTClient::setMqttWillUse(bool use) { m_mqttUseWill = use; if(use == false) m_willTimer->stop(); } bool MQTTClient::mqttWillUse() const{ return m_mqttUseWill; } void MQTTClient::setWillTopic(const QString& topic) { m_willTopic = topic; } QString MQTTClient::willTopic() const{ return m_willTopic; } void MQTTClient::setWillRetain(bool retain) { m_willRetain = retain; } bool MQTTClient::willRetain() const { return m_willRetain; } void MQTTClient::setWillQoS(quint8 QoS) { m_willQoS = QoS; } quint8 MQTTClient::willQoS() const { return m_willQoS; } void MQTTClient::setWillMessageType(WillMessageType messageType) { m_willMessageType = messageType; } MQTTClient::WillMessageType MQTTClient::willMessageType() const { return m_willMessageType; } void MQTTClient::setWillOwnMessage(const QString& ownMessage) { m_willOwnMessage = ownMessage; } QString MQTTClient::willOwnMessage() const { return m_willOwnMessage; } QVector MQTTClient::topicNames() const { return m_topicNames; } void MQTTClient::setWillForMqtt() { if(m_mqttUseWill && (m_client->state() == QMqttClient::ClientState::Connected) ) { if(!m_disconnectForWill) { qDebug() << "Disconnecting from host"; m_client->disconnectFromHost(); m_disconnectForWill = true; } setWillForMqtt(); } else if(m_mqttUseWill && (m_client->state() == QMqttClient::ClientState::Disconnected) && m_disconnectForWill) { m_client->setWillQoS(m_willQoS); qDebug()<<"Will QoS" << m_willQoS; m_client->setWillRetain(m_willRetain); qDebug()<<"Will retain" << m_willRetain; m_client->setWillTopic(m_willTopic); qDebug()<<"Will Topic" << m_willTopic; switch (m_willMessageType) { case WillMessageType::OwnMessage: m_client->setWillMessage(m_willOwnMessage.toUtf8()); qDebug()<<"Will own message" << m_willOwnMessage; break; case WillMessageType::Statistics: { qDebug()<<"Start will statistics"; QVector topics = children(AbstractAspect::Recursive); const AsciiFilter* asciiFilter = nullptr; const MQTTTopic* tempTopic = nullptr; qDebug()<<"Searching for topic"; for (int i = 0; i < topics.count(); ++i) { if(topics[i]->topicName() == m_willTopic) { asciiFilter = dynamic_cast(topics[i]->filter()); tempTopic = topics[i]; break; } } qDebug()<<"Check if topic found"; if(asciiFilter != nullptr && tempTopic != nullptr) { qDebug()<<"Checking column mode"; if((asciiFilter->mqttColumnMode() == AbstractColumn::ColumnMode::Integer) || (asciiFilter->mqttColumnMode() == AbstractColumn::ColumnMode::Numeric)) { m_client->setWillMessage(asciiFilter->mqttColumnStatistics(tempTopic).toUtf8()); qDebug() << "Will statistics message: "<< QString(m_client->willMessage()); } else { m_client->setWillMessage(QString("").toUtf8()); qDebug() << "Will statistics message: "<< QString(m_client->willMessage()); } } break; } case WillMessageType::LastMessage: m_client->setWillMessage(m_willLastMessage.toUtf8()); qDebug()<<"Will last message:\n" << m_willLastMessage; break; default: break; } m_disconnectForWill = false; m_client->connectToHost(); qDebug()<< "Reconnect to host"; } } MQTTClient::WillUpdateType MQTTClient::willUpdateType() const{ return m_willUpdateType; } void MQTTClient::setWillUpdateType(WillUpdateType updateType) { m_willUpdateType = updateType; } int MQTTClient::willTimeInterval() const{ return m_willTimeInterval; } void MQTTClient::setWillTimeInterval(int interval) { m_willTimeInterval = interval; } void MQTTClient::clearLastMessage() { m_willLastMessage.clear(); } void MQTTClient::addWillStatistics(WillStatistics statistic){ m_willStatistics[static_cast(statistic)] = true; } void MQTTClient::removeWillStatistics(WillStatistics statistic) { m_willStatistics[static_cast(statistic)] = false; } QVector MQTTClient::willStatistics() const{ return m_willStatistics; } void MQTTClient::startWillTimer() const{ if(m_willUpdateType == WillUpdateType::TimePeriod) m_willTimer->start(m_willTimeInterval); } void MQTTClient::stopWillTimer() const{ m_willTimer->stop(); } void MQTTClient::setMqttRetain(bool retain) { m_mqttRetain = retain; } bool MQTTClient::mqttRetain() const { return m_mqttRetain; } void MQTTClient::mqttErrorChanged(QMqttClient::ClientError clientError) { if(clientError != QMqttClient::ClientError::NoError) { MQTTErrorWidget* errorWidget = new MQTTErrorWidget(clientError, this); errorWidget->show(); } } QString MQTTClient::clientHostName() const{ return m_client->hostname(); } quint16 MQTTClient::clientPort() const { return m_client->port(); } QString MQTTClient::clientPassword() const{ return m_client->password(); } QString MQTTClient::clientUserName() const{ return m_client->username(); } QString MQTTClient::clientID () const{ return m_client->clientId(); } void MQTTClient::setMQTTUseID(bool use) { m_mqttUseID = use; } bool MQTTClient::mqttUseID() const { return m_mqttUseID; } void MQTTClient::setMQTTUseAuthentication(bool use) { m_mqttUseAuthentication = use; } bool MQTTClient::mqttUseAuthentication() const { return m_mqttUseAuthentication; } QVector MQTTClient::mqttSubscribtions() const { return m_subscriptions; } void MQTTClient::newMQTTSubscription(const QString& topic, quint8 QoS) { QMqttTopicFilter filter {topic}; QMqttSubscription* temp = m_client->subscribe(filter, QoS); if (temp) { qDebug()<topic()<<" "<qos(); m_subscriptions.push_back(temp->topic().filter()); m_subscribedTopicNameQoS[temp->topic().filter()] = temp->qos(); qDebug()<<"New MQTTSubscription"; MQTTSubscriptions* newSubscription = new MQTTSubscriptions(temp->topic().filter()); newSubscription->setMQTTClient(this); qDebug()<<"Add child"; addChild(newSubscription); qDebug()<<"Add to vector"; m_mqttSubscriptions.push_back(newSubscription); qDebug()<<"Added topic"; qDebug()<<"Check for inferior subscriptions"; bool found = false; QVector inferiorSubscriptions; for(int i = 0; i < m_mqttSubscriptions.size(); ++i) { if(checkTopicContains(topic, m_mqttSubscriptions[i]->subscriptionName()) && topic != m_mqttSubscriptions[i]->subscriptionName()) { found = true; inferiorSubscriptions.push_back(m_mqttSubscriptions[i]); } } if(found) { for(int sub = 0; sub < inferiorSubscriptions.size(); ++sub) { qDebug()<<"Inferior subscription: "<subscriptionName(); QVector topics = inferiorSubscriptions[sub]->topics(); qDebug()<< topics.size(); for(int i = 0; i < topics.size() ; ++i) { qDebug()<topicName(); topics[i]->reparent(newSubscription); } QMqttTopicFilter unsubscribeFilter {inferiorSubscriptions[sub]->subscriptionName()}; m_client->unsubscribe(unsubscribeFilter); for (int j = 0; j < m_mqttSubscriptions.size(); ++j) { if(m_mqttSubscriptions[j]->subscriptionName() == inferiorSubscriptions[sub]->subscriptionName()) { m_mqttSubscriptions.remove(j); } } m_subscriptions.removeAll(inferiorSubscriptions[sub]->subscriptionName()); removeChild(inferiorSubscriptions[sub]); } } connect(temp, &QMqttSubscription::messageReceived, this, &MQTTClient::mqttSubscribtionMessageReceived); } } void MQTTClient::removeMQTTSubscription(const QString &name) { qDebug()<<"Start to remove subscription in MQTTClient: "<unsubscribe(filter); qDebug()<<"QMqttClient's unsubscribe occured"; m_subscriptions.removeAll(name); for (int i = 0; i < m_mqttSubscriptions.size(); ++i) { if(m_mqttSubscriptions[i]->subscriptionName() == name) { qDebug()<<"Subscription name"<subscriptionName() << " "<name(); MQTTSubscriptions* removeSubscription = m_mqttSubscriptions[i]; m_mqttSubscriptions.remove(i); QVector topics = removeSubscription->topics(); for (int j = 0; j < topics.size(); ++j) { qDebug()<<"Removing topic name: "<topicName(); m_topicNames.removeAll(topics[j]->topicName()); } qDebug()<<"Removed from topic names and subscription names"; removeChild(removeSubscription); qDebug()<<"removed child"; break; } } QMapIterator j(m_subscribedTopicNameQoS); while(j.hasNext()) { j.next(); if(j.key().filter() == name) { m_subscribedTopicNameQoS.remove(j.key()); qDebug()<<"Removed from TopicNameQoS map "<subscribe(filter, QoS); if (temp) { qDebug()<topic()<<" "<qos(); m_subscriptions.push_back(temp->topic().filter()); m_subscribedTopicNameQoS[temp->topic().filter()] = temp->qos(); qDebug()<<"New MQTTSubscription"; MQTTSubscriptions* newSubscription = new MQTTSubscriptions(temp->topic().filter()); newSubscription->setMQTTClient(this); qDebug()<<"Add child"; addChild(newSubscription); qDebug()<<"Add to vector"; m_mqttSubscriptions.push_back(newSubscription); qDebug()<<"Added topic"; qDebug()<<"Check for superior subscription"; bool found = false; MQTTSubscriptions* superiorSubscription; for(int i = 0; i < m_mqttSubscriptions.size(); ++i) { if(checkTopicContains(m_mqttSubscriptions[i]->subscriptionName(), topic) && topic != m_mqttSubscriptions[i]->subscriptionName()) { found = true; superiorSubscription = m_mqttSubscriptions[i]; break; } } if(found) { qDebug()<<"Superior subscription: "<subscriptionName(); QVector topics = superiorSubscription->topics(); qDebug()<< topics.size(); QVector inferiorTopics; for(int i = 0; i < topics.size(); ++i) { if(checkTopicContains(topic, topics[i]->topicName())) { inferiorTopics.push_back(topics[i]); } } for(int i = 0; i < inferiorTopics.size() ; ++i) { qDebug()<topicName(); inferiorTopics[i]->reparent(newSubscription); } } connect(temp, &QMqttSubscription::messageReceived, this, &MQTTClient::mqttSubscribtionMessageReceived); } } void MQTTClient::reparentTopic(const QString& topic, const QString& parent) { bool found = false; MQTTSubscriptions* superiorSubscription; for(int i = 0; i < m_mqttSubscriptions.size(); ++i) { if(m_mqttSubscriptions[i]->subscriptionName() == parent) { found = true; superiorSubscription = m_mqttSubscriptions[i]; break; } } if(found) { qDebug()<<"Superior subscription: "<subscriptionName(); QVector topics = children(AbstractAspect::Recursive); qDebug()<< topics.size(); for(int i = 0; i < topics.size(); ++i) { qDebug()<topicName()<<" "<parentAspect()->name(); if(topic == topics[i]->topicName()) { qDebug()<topicName()<<" "<subscriptionName(); topics[i]->reparent(superiorSubscription); break; } } } qDebug()<<"reparent done"; } void MQTTClient::subscriptionLoaded(const QString &name) { qDebug()< 0 && matchIndex < firstList.size() -1) { for(int j = matchIndex +1; j < firstList.size(); ++j) { if(firstList.at(j) != secondtList.at(j)) { differ = true; break; } } } else differ = true; if(!differ) { for(int i = 0; i < firstList.size(); ++i) { if(i != matchIndex) commonTopic.append(firstList.at(i)); else commonTopic.append("+"); if(i != firstList.size() - 1) commonTopic.append("/"); } } } } qDebug() << "Common topic: "< #include #include #include #include #include #include #include class QString; class AbstractFileFilter; class MQTTSubscriptions; class QAction; class MQTTClient : public Folder{ Q_OBJECT public: enum UpdateType { TimeInterval = 0, NewData }; enum ReadingType { ContinuousFixed = 0, FromEnd, TillEnd }; enum WillMessageType { OwnMessage = 0, Statistics, LastMessage }; enum WillUpdateType { TimePeriod = 0, OnClick }; enum WillStatistics { Minimum = 0, Maximum, ArithmeticMean, GeometricMean, HarmonicMean, ContraharmonicMean, Median, Variance, StandardDeviation, MeanDeviation, MeanDeviationAroundMedian, MedianDeviation, Skewness, Kurtosis, Entropy }; explicit MQTTClient(const QString& name); virtual ~MQTTClient() override; void ready(); UpdateType updateType() const; void setUpdateType(UpdateType); ReadingType readingType() const; void setReadingType(ReadingType); int sampleRate() const; void setSampleRate(int); bool isPaused() const; void setUpdateInterval(int); int updateInterval() const; void setKeepNvalues(int); int keepNvalues() const; void setKeepLastValues(bool); bool keepLastValues() const; void setMqttClientHostPort(const QString&, const quint16&); void setMqttClientAuthentication(const QString&, const QString&); void setMqttClientId(const QString&); QMqttClient mqttClient() const; void addMqttSubscriptions(const QMqttTopicFilter&, const quint8&); QVector mqttSubscribtions() const; bool checkTopicContains(const QString& superior, const QString& inferior); QString checkCommonLevel(const QString& first, const QString& second); QString clientHostName() const; quint16 clientPort() const; QString clientPassword() const; QString clientUserName() const; QString clientID () const; void updateNow(); void pauseReading(); void continueReading(); void setFilter(AbstractFileFilter*); AbstractFileFilter* filter() const; /*QIcon icon() const override; QMenu* createContextMenu() override; QWidget* view() const override;*/ void save(QXmlStreamWriter*) const override; bool load(XmlStreamReader*, bool preview) override; int topicNumber(); int topicIndex(const QString&); QVector topicNames() const; bool checkAllArrived(); void setMqttWillUse(bool); bool mqttWillUse() const; void setWillTopic(const QString&); QString willTopic() const; void setWillRetain(bool); bool willRetain() const; void setWillQoS(quint8); quint8 willQoS() const; void setWillMessageType(WillMessageType); WillMessageType willMessageType() const; void setWillOwnMessage(const QString&); QString willOwnMessage() const; WillUpdateType willUpdateType() const; void setWillUpdateType(WillUpdateType); int willTimeInterval() const; void setWillTimeInterval(int); void startWillTimer() const; void stopWillTimer() const; void setWillForMqtt() ; void setMqttRetain(bool); bool mqttRetain() const; void setMQTTUseID(bool); bool mqttUseID() const; void setMQTTUseAuthentication(bool); bool mqttUseAuthentication() const; void clearLastMessage(); void addWillStatistics(WillStatistics); void removeWillStatistics(WillStatistics); QVector willStatistics() const; void newMQTTSubscription(const QString&, quint8); void removeMQTTSubscription(const QString&); void addBeforeRemoveSubscription(const QString&, quint8); void reparentTopic(const QString& topic, const QString& parent); private: //void initActions(); UpdateType m_updateType; ReadingType m_readingType; bool m_paused; bool m_prepared; bool m_keepLastValues; int m_sampleRate; int m_keepNvalues; int m_updateInterval; AbstractFileFilter* m_filter; QTimer* m_updateTimer; /* QAction* m_reloadAction; QAction* m_toggleLinkAction; QAction* m_showEditorAction; QAction* m_showSpreadsheetAction; QAction* m_plotDataAction;*/ QMqttClient* m_client; QMap m_subscribedTopicNameQoS; QVector m_subscriptions; QVector m_topicNames; bool m_mqttTest; bool m_mqttUseWill; QString m_willMessage; QString m_willTopic; bool m_willRetain; quint8 m_willQoS; WillMessageType m_willMessageType; QString m_willOwnMessage; QString m_willLastMessage; QTimer* m_willTimer; int m_willTimeInterval; WillUpdateType m_willUpdateType; QVector m_willStatistics; bool m_mqttFirstConnectEstablished; bool m_mqttRetain; bool m_mqttUseID; bool m_mqttUseAuthentication; QVector m_mqttSubscriptions; bool m_disconnectForWill; bool m_loaded; int m_subscriptionsLoaded; int m_subscriptionCount; public slots: void read(); private slots: void onMqttConnect(); void mqttSubscribtionMessageReceived(const QMqttMessage&); void mqttErrorChanged(QMqttClient::ClientError); void subscriptionLoaded(const QString&); signals: void mqttSubscribed(); void mqttNewTopicArrived(); void readFromTopics(); + void clientAboutToBeDeleted(const QString&); }; #endif #endif // MQTTCLIENT_H diff --git a/src/kdefrontend/dockwidgets/LiveDataDock.cpp b/src/kdefrontend/dockwidgets/LiveDataDock.cpp index fb9df7d83..74e47397a 100644 --- a/src/kdefrontend/dockwidgets/LiveDataDock.cpp +++ b/src/kdefrontend/dockwidgets/LiveDataDock.cpp @@ -1,1406 +1,1434 @@ /*************************************************************************** File : LiveDataDock.cpp Project : LabPlot Description : Dock widget for live data properties -------------------------------------------------------------------- Copyright : (C) 2017 by Fabian Kristof (fkristofszabolcs@gmail.com) ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the Free Software * * Foundation, Inc., 51 Franklin Street, Fifth Floor, * * Boston, MA 02110-1301 USA * * * ***************************************************************************/ #include "LiveDataDock.h" #include #include #include #include #include #include #ifdef HAVE_MQTT #include #endif LiveDataDock::LiveDataDock(QWidget* parent) : QWidget(parent), #ifdef HAVE_MQTT //m_client(new QMqttClient()), m_searching(true), m_previousMQTTClient(nullptr), m_searchTimer(new QTimer()), //m_messageTimer(new QTimer()), m_interpretMessage(true), #endif m_paused(false) { ui.setupUi(this); ui.bUpdateNow->setIcon(QIcon::fromTheme(QLatin1String("view-refresh"))); connect(ui.bPausePlayReading, &QPushButton::clicked, this, &LiveDataDock::pauseContinueReading); connect(ui.bUpdateNow, &QPushButton::clicked, this, &LiveDataDock::updateNow); connect(ui.sbUpdateInterval, static_cast(&QSpinBox::valueChanged), this, &LiveDataDock::updateIntervalChanged); connect(ui.sbKeepNValues, static_cast(&QSpinBox::valueChanged), this, &LiveDataDock::keepNValuesChanged); connect(ui.sbSampleSize, static_cast(&QSpinBox::valueChanged), this, &LiveDataDock::sampleSizeChanged); connect(ui.cbUpdateType, static_cast(&QComboBox::currentIndexChanged), this, &LiveDataDock::updateTypeChanged); connect(ui.cbReadingType, static_cast(&QComboBox::currentIndexChanged), this, &LiveDataDock::readingTypeChanged); #ifdef HAVE_MQTT m_searchTimer->setInterval(10000); //connect(m_client, &QMqttClient::connected, this, &LiveDataDock::onMQTTConnect); //connect(m_client, &QMqttClient::messageReceived, this, &LiveDataDock::mqttMessageReceived); connect(this, &LiveDataDock::newTopic, this, &LiveDataDock::setCompleter); connect(m_searchTimer, &QTimer::timeout, this, &LiveDataDock::topicTimeout); connect(ui.bSubscribe, &QPushButton::clicked, this, &LiveDataDock::addSubscription); connect(ui.bUnsubscribe, &QPushButton::clicked, this, &LiveDataDock::removeSubscription); //connect(m_messageTimer, &QTimer::timeout, this, &LiveDataDock::stopStartReceive); connect(ui.chbWill, &QCheckBox::stateChanged, this, &LiveDataDock::useWillMessage); connect(ui.cbWillQoS, static_cast(&QComboBox::currentIndexChanged), this, &LiveDataDock::willQoSChanged); connect(ui.chbWillRetain, &QCheckBox::stateChanged, this, &LiveDataDock::willRetainChanged); connect(ui.cbWillTopic, &QComboBox::currentTextChanged, this, &LiveDataDock::willTopicChanged); connect(ui.cbWillMessageType, static_cast(&QComboBox::currentIndexChanged), this, &LiveDataDock::willMessageTypeChanged); connect(ui.leWillOwnMessage, &QLineEdit::textChanged, this, &LiveDataDock::willOwnMessageChanged); connect(ui.cbWillUpdate, static_cast(&QComboBox::currentIndexChanged), this, &LiveDataDock::willUpdateChanged); connect(ui.bWillUpdateNow, &QPushButton::clicked, this, &LiveDataDock::willUpdateNow); connect(ui.leWillUpdateInterval, &QLineEdit::textChanged, this, &LiveDataDock::willUpdateIntervalChanged); connect(ui.lwWillStatistics, &QListWidget::itemChanged, this, &LiveDataDock::statisticsChanged); connect(ui.leTopics, &QLineEdit::textChanged, this, &LiveDataDock::searchTreeItem); ui.bSubscribe->setIcon(ui.bSubscribe->style()->standardIcon(QStyle::SP_ArrowRight)); ui.bUnsubscribe->setIcon(ui.bUnsubscribe->style()->standardIcon(QStyle::SP_BrowserStop)); #endif } LiveDataDock::~LiveDataDock() { + delete m_searchTimer; + QMapIterator clients(m_clients); + while(clients.hasNext()) { + clients.next(); + delete clients.value(); + } } #ifdef HAVE_MQTT /*! * \brief Sets the MQTTClients of this dock widget * \param clients */ void LiveDataDock::setMQTTClients(const QList &clients) { m_liveDataSources.clear(); m_mqttClients.clear(); m_mqttClients = clients; const MQTTClient* const fds = clients.at(0); ui.sbUpdateInterval->setValue(fds->updateInterval()); ui.cbUpdateType->setCurrentIndex(static_cast(fds->updateType())); ui.cbReadingType->setCurrentIndex(static_cast(fds->readingType())); if (fds->updateType() == MQTTClient::UpdateType::NewData) { ui.lUpdateInterval->hide(); ui.sbUpdateInterval->hide(); } if (fds->isPaused()) { ui.bPausePlayReading->setText(i18n("Continue reading")); ui.bPausePlayReading->setIcon(QIcon::fromTheme(QLatin1String("media-record"))); } else { ui.bPausePlayReading->setText(i18n("Pause reading")); ui.bPausePlayReading->setIcon(QIcon::fromTheme(QLatin1String("media-playback-pause"))); } ui.sbKeepNValues->setValue(fds->keepNvalues()); if (fds->readingType() == MQTTClient::ReadingType::TillEnd) { ui.lSampleSize->hide(); ui.sbSampleSize->hide(); } else ui.sbSampleSize->setValue(fds->sampleRate()); int itemIdx = -1; for (int i = 0; i < ui.cbReadingType->count(); ++i) { if (ui.cbReadingType->itemText(i) == QLatin1String("Read whole file")) { itemIdx = i; break; } } if (itemIdx != -1) { ui.cbReadingType->removeItem(itemIdx); } ui.chbWill->hide(); ui.chbWillRetain->hide(); ui.cbWillQoS->hide(); ui.cbWillMessageType->hide(); ui.cbWillTopic->hide(); ui.cbWillUpdate->hide(); ui.leWillOwnMessage->hide(); ui.leWillUpdateInterval->setValidator(new QIntValidator(2, 1000000) ); ui.leWillUpdateInterval->hide(); ui.lWillMessageType->hide(); ui.lWillOwnMessage->hide(); ui.lWillQos->hide(); ui.lWillTopic->hide(); ui.lWillUpdate->hide(); ui.lWillUpdateInterval->hide(); ui.bWillUpdateNow->hide(); ui.lwWillStatistics->hide(); ui.lWillStatistics->hide(); ui.gbManageSubscriptions->show(); ui.bSubscribe->show(); ui.bUnsubscribe->show(); ui.twTopics->show(); ui.leTopics->show(); ui.lTopicSearch->show(); ui.twSubscriptions->show(); ui.lQoS->show(); ui.cbQoS->show(); ui.chbWill->show(); if(m_clients[fds->clientHostName()] == nullptr) { m_clients[fds->clientHostName()] = new QMqttClient(); + connect(fds, &MQTTClient::clientAboutToBeDeleted, this, &LiveDataDock::removeClient); + connect(m_clients[fds->clientHostName()], &QMqttClient::connected, this, &LiveDataDock::onMQTTConnect); connect(m_clients[fds->clientHostName()], &QMqttClient::messageReceived, this, &LiveDataDock::mqttMessageReceived); m_clients[fds->clientHostName()]->setHostname(fds->clientHostName()); m_clients[fds->clientHostName()]->setPort(fds->clientPort()); if(fds->mqttUseAuthentication()) { m_clients[fds->clientHostName()]->setUsername(fds->clientUserName()); m_clients[fds->clientHostName()]->setPassword(fds->clientPassword()); } if(fds->mqttUseID()) { m_clients[fds->clientHostName()]->setClientId(fds->clientID()); } m_clients[fds->clientHostName()]->connectToHost(); } if(m_previousMQTTClient == nullptr) { connect(fds, &MQTTClient::mqttSubscribed, this, &LiveDataDock::fillSubscriptions); connect(fds, &MQTTClient::mqttNewTopicArrived, this, &LiveDataDock::updateTopics); } else if(m_previousMQTTClient->clientHostName() != fds->clientHostName()) { + qDebug()<<"At load host name not equal: "<clientHostName()<<" "<clientHostName(); disconnect(m_previousMQTTClient, &MQTTClient::mqttSubscribed, this, &LiveDataDock::fillSubscriptions); disconnect(m_previousMQTTClient, &MQTTClient::mqttNewTopicArrived, this, &LiveDataDock::updateTopics); disconnect(m_clients[m_previousMQTTClient->clientHostName()], &QMqttClient::messageReceived, this, &LiveDataDock::mqttMessageReceived); + connect(m_clients[m_previousMQTTClient->clientHostName()], &QMqttClient::messageReceived, this, &LiveDataDock::mqttMessageReceivedInBackground); + + disconnect(m_clients[fds->clientHostName()], &QMqttClient::messageReceived, this, &LiveDataDock::mqttMessageReceivedInBackground); ui.twTopics->clear(); for(int i = 0; i < m_addedTopics[fds->clientHostName()].size(); ++i) { addTopicToTree(m_addedTopics[fds->clientHostName()].at(i)); } ui.twSubscriptions->clear(); fillSubscriptions(); connect(fds, &MQTTClient::mqttSubscribed, this, &LiveDataDock::fillSubscriptions); connect(fds, &MQTTClient::mqttNewTopicArrived, this, &LiveDataDock::updateTopics); connect(m_clients[fds->clientHostName()], &QMqttClient::messageReceived, this, &LiveDataDock::mqttMessageReceived); - } ui.leWillOwnMessage->setText(fds->willOwnMessage()); ui.leWillUpdateInterval->setText(QString::number(fds->willTimeInterval())); qDebug()<<"update type at setup "<(fds->willUpdateType()) <<" start index "<currentIndex(); ui.cbWillUpdate->setCurrentIndex(static_cast(fds->willUpdateType()) ); fds->startWillTimer(); ui.cbWillMessageType->setCurrentIndex(static_cast(fds->willMessageType()) ); ui.cbWillQoS->setCurrentIndex(fds->willQoS()); ui.cbWillTopic->setCurrentText(fds->willTopic()); ui.chbWillRetain->setChecked(fds->willRetain()); QVector statitics = fds->willStatistics(); for(int i = 0; i < statitics.count(); ++i) { QListWidgetItem* item = ui.lwWillStatistics->item(static_cast(i)); if(statitics[i]) { item->setCheckState(Qt::Checked); } else { item->setCheckState(Qt::Unchecked); } } qDebug()<<"chbWill is set to: "<mqttWillUse(); //when chbWill's isChecked corresponds with source's m_mqttWillUse it doesn't emit state changed signal, we have to force it bool checked = fds->mqttWillUse(); ui.chbWill->setChecked(!checked); ui.chbWill->setChecked(checked); m_previousMQTTClient = fds; } #endif /*! * \brief Sets the live data sources of this dock widget * \param sources */ void LiveDataDock::setLiveDataSources(const QList& sources) { #ifdef HAVE_MQTT m_mqttClients.clear(); #endif m_liveDataSources = sources; const LiveDataSource* const fds = sources.at(0); ui.sbUpdateInterval->setValue(fds->updateInterval()); ui.cbUpdateType->setCurrentIndex(static_cast(fds->updateType())); ui.cbReadingType->setCurrentIndex(static_cast(fds->readingType())); if (fds->updateType() == LiveDataSource::UpdateType::NewData) { ui.lUpdateInterval->hide(); ui.sbUpdateInterval->hide(); } if (fds->isPaused()) { ui.bPausePlayReading->setText(i18n("Continue Reading")); ui.bPausePlayReading->setIcon(QIcon::fromTheme(QLatin1String("media-record"))); } else { ui.bPausePlayReading->setText(i18n("Pause Reading")); ui.bPausePlayReading->setIcon(QIcon::fromTheme(QLatin1String("media-playback-pause"))); } ui.sbKeepNValues->setValue(fds->keepNValues()); // disable "whole file" when having no file (i.e. socket or port) const QStandardItemModel* model = qobject_cast(ui.cbReadingType->model()); QStandardItem* item = model->item(LiveDataSource::ReadingType::WholeFile); if (fds->sourceType() == LiveDataSource::SourceType::FileOrPipe) item->setFlags(Qt::ItemIsSelectable | Qt::ItemIsEnabled); else item->setFlags(item->flags() & ~(Qt::ItemIsSelectable | Qt::ItemIsEnabled)); if (fds->readingType() == LiveDataSource::ReadingType::TillEnd || fds->readingType() == LiveDataSource::ReadingType::WholeFile) { ui.lSampleSize->hide(); ui.sbSampleSize->hide(); } else ui.sbSampleSize->setValue(fds->sampleSize()); ui.chbWill->hide(); ui.chbWillRetain->hide(); ui.cbWillQoS->hide(); ui.cbWillMessageType->hide(); ui.cbWillTopic->hide(); ui.cbWillUpdate->hide(); ui.leWillOwnMessage->hide(); ui.leWillUpdateInterval->setValidator(new QIntValidator(2, 1000000) ); ui.leWillUpdateInterval->hide(); ui.lWillMessageType->hide(); ui.lWillOwnMessage->hide(); ui.lWillQos->hide(); ui.lWillTopic->hide(); ui.lWillUpdate->hide(); ui.lWillUpdateInterval->hide(); ui.bWillUpdateNow->hide(); ui.lwWillStatistics->hide(); ui.lWillStatistics->hide(); ui.bSubscribe->hide(); ui.bUnsubscribe->hide(); ui.twTopics->hide(); ui.leTopics->hide(); ui.lTopicSearch->hide(); ui.twSubscriptions->hide(); ui.gbManageSubscriptions->hide(); ui.lQoS->hide(); ui.cbQoS->hide(); } /*! * \brief Modifies the sample rate of the live data sources or MQTTClient objects * \param sampleRate */ void LiveDataDock::sampleSizeChanged(int sampleSize) { if(!m_liveDataSources.isEmpty()) { for (auto* source : m_liveDataSources) source->setSampleSize(sampleSize); } #ifdef HAVE_MQTT else if (!m_mqttClients.isEmpty()) { for (auto* client : m_mqttClients) client->setSampleRate(sampleSize); } #endif } /*! * \brief Updates the live data sources now */ void LiveDataDock::updateNow() { if(!m_liveDataSources.isEmpty()) { for (auto* source : m_liveDataSources) source->updateNow(); } #ifdef HAVE_MQTT else if (!m_mqttClients.isEmpty()) { for (auto* client : m_mqttClients) client->updateNow(); } #endif } /*! * \brief LiveDataDock::updateTypeChanged * \param idx */ void LiveDataDock::updateTypeChanged(int idx) { if(!m_liveDataSources.isEmpty()) { DEBUG("LiveDataDock::updateTypeChanged()"); LiveDataSource::UpdateType type = static_cast(idx); switch (type) { case LiveDataSource::UpdateType::TimeInterval: ui.lUpdateInterval->show(); ui.sbUpdateInterval->show(); for (auto* source: m_liveDataSources) { source->setUpdateType(type); source->setUpdateInterval(ui.sbUpdateInterval->value()); source->setFileWatched(false); } break; case LiveDataSource::UpdateType::NewData: ui.lUpdateInterval->hide(); ui.sbUpdateInterval->hide(); for (auto* source: m_liveDataSources) { source->setFileWatched(true); source->setUpdateType(type); } } } #ifdef HAVE_MQTT else if (!m_mqttClients.isEmpty()) { MQTTClient::UpdateType type = static_cast(idx); if (type == MQTTClient::UpdateType::TimeInterval) { ui.lUpdateInterval->show(); ui.sbUpdateInterval->show(); for (auto* client : m_mqttClients) { client->setUpdateType(type); client->setUpdateInterval(ui.sbUpdateInterval->value()); } } else if (type == MQTTClient::UpdateType::NewData) { ui.lUpdateInterval->hide(); ui.sbUpdateInterval->hide(); for (auto* client : m_mqttClients) { client->setUpdateType(type); } } } #endif } /*! * \brief Handles the change of the reading type in the dock widget * \param idx */ void LiveDataDock::readingTypeChanged(int idx) { if(!m_liveDataSources.isEmpty()) { LiveDataSource::ReadingType type = static_cast(idx); if (type == LiveDataSource::ReadingType::TillEnd) { ui.lSampleSize->hide(); ui.sbSampleSize->hide(); } else { ui.lSampleSize->show(); ui.sbSampleSize->show(); } for (auto* source : m_liveDataSources) source->setReadingType(type); } #ifdef HAVE_MQTT else if (!m_mqttClients.isEmpty()) { MQTTClient::ReadingType type = static_cast(idx); if (type == MQTTClient::ReadingType::TillEnd) { ui.lSampleSize->hide(); ui.sbSampleSize->hide(); } else { ui.lSampleSize->show(); ui.sbSampleSize->show(); } for (auto* client : m_mqttClients) client->setReadingType(type); } #endif } /*! * \brief Modifies the update interval of the live data sources * \param updateInterval */ void LiveDataDock::updateIntervalChanged(int updateInterval) { if(!m_liveDataSources.isEmpty()) { for (auto* source : m_liveDataSources) source->setUpdateInterval(updateInterval); } #ifdef HAVE_MQTT else if (!m_mqttClients.isEmpty()) { for (auto* client : m_mqttClients) client->setUpdateInterval(updateInterval); } #endif } /*! * \brief Modifies the number of samples to keep in each of the live data sources * \param keepNvalues */ void LiveDataDock::keepNValuesChanged(const int keepNValues) { if(!m_liveDataSources.isEmpty()) { for (auto* source : m_liveDataSources) source->setKeepNValues(keepNValues); } #ifdef HAVE_MQTT else if (!m_mqttClients.isEmpty()) { for (auto* client : m_mqttClients) client->setKeepNvalues(keepNValues); } #endif } /*! * \brief Pauses the reading of the live data source */ void LiveDataDock::pauseReading() { if(!m_liveDataSources.isEmpty()) { for (auto* source: m_liveDataSources) source->pauseReading(); } #ifdef HAVE_MQTT else if (!m_mqttClients.isEmpty()) { for (auto* client : m_mqttClients) client->pauseReading(); } #endif } /*! * \brief Continues the reading of the live data source */ void LiveDataDock::continueReading() { if(!m_liveDataSources.isEmpty()) { for (auto* source: m_liveDataSources) source->continueReading(); } #ifdef HAVE_MQTT else if (!m_mqttClients.isEmpty()) { for (auto* client : m_mqttClients) client->continueReading(); } #endif } /*! * \brief Handles the pausing/continuing of reading of the live data source */ void LiveDataDock::pauseContinueReading() { m_paused = !m_paused; if (m_paused) { pauseReading(); ui.bPausePlayReading->setText(i18n("Continue Reading")); ui.bPausePlayReading->setIcon(QIcon::fromTheme(QLatin1String("media-record"))); } else { continueReading(); ui.bPausePlayReading->setText(i18n("Pause Reading")); ui.bPausePlayReading->setIcon(QIcon::fromTheme(QLatin1String("media-playback-pause"))); } } #ifdef HAVE_MQTT void LiveDataDock::useWillMessage(int state) { qDebug()<<"will checkstate changed" <setMqttWillUse(true); ui.chbWillRetain->show(); ui.cbWillQoS->show(); ui.cbWillMessageType->show(); ui.cbWillTopic->show(); ui.cbWillUpdate->show(); ui.lWillMessageType->show(); ui.lWillQos->hide(); ui.lWillTopic->show(); ui.lWillUpdate->show(); if (ui.cbWillMessageType->currentIndex() == (int)MQTTClient::WillMessageType::OwnMessage) { ui.leWillOwnMessage->show(); ui.lWillOwnMessage->show(); } else if(ui.cbWillMessageType->currentIndex() == (int)MQTTClient::WillMessageType::Statistics){ ui.lWillStatistics->show(); ui.lwWillStatistics->show(); } if(ui.cbWillUpdate->currentIndex() == static_cast(MQTTClient::WillUpdateType::TimePeriod)) { ui.leWillUpdateInterval->show(); ui.lWillUpdateInterval->show(); } else if (ui.cbWillUpdate->currentIndex() == static_cast(MQTTClient::WillUpdateType::OnClick)) ui.bWillUpdateNow->show(); } else if (state == Qt::Unchecked) { for (auto* source: m_mqttClients) source->setMqttWillUse(false); ui.chbWillRetain->hide(); ui.cbWillQoS->hide(); ui.cbWillMessageType->hide(); ui.cbWillTopic->hide(); ui.cbWillUpdate->hide(); ui.leWillOwnMessage->hide(); ui.leWillUpdateInterval->hide(); ui.lWillMessageType->hide(); ui.lWillOwnMessage->hide(); ui.lWillQos->hide(); ui.lWillTopic->hide(); ui.lWillUpdate->hide(); ui.lWillUpdateInterval->hide(); ui.bWillUpdateNow->hide(); ui.lWillStatistics->hide(); ui.lwWillStatistics->hide(); } } void LiveDataDock::willQoSChanged(int QoS) { for (auto* source: m_mqttClients) source->setWillQoS(QoS); } void LiveDataDock::willRetainChanged(int state) { if(state == Qt::Checked) { for (auto* source: m_mqttClients) source->setWillRetain(true); } else if (state == Qt::Unchecked) { for (auto* source: m_mqttClients) source->setWillRetain(false); } } void LiveDataDock::willTopicChanged(const QString& topic) { qDebug()<<"topic changed" << topic; for (auto* source: m_mqttClients) { if(source->willTopic() != topic) source->clearLastMessage(); source->setWillTopic(topic); } } void LiveDataDock::willMessageTypeChanged(int type) { qDebug()<<"message type changed" << type; for (auto* source: m_mqttClients) source->setWillMessageType(static_cast (type)); if(static_cast (type) == MQTTClient::WillMessageType::OwnMessage) { ui.leWillOwnMessage->show(); ui.lWillOwnMessage->show(); ui.lWillStatistics->hide(); ui.lwWillStatistics->hide(); } else if(static_cast (type) == MQTTClient::WillMessageType::LastMessage) { ui.leWillOwnMessage->hide(); ui.lWillOwnMessage->hide(); ui.lWillStatistics->hide(); ui.lwWillStatistics->hide(); } else if(static_cast (type) == MQTTClient::WillMessageType::Statistics) { ui.lWillStatistics->show(); ui.lwWillStatistics->show(); ui.leWillOwnMessage->hide(); ui.lWillOwnMessage->hide(); } } void LiveDataDock::willOwnMessageChanged(const QString& message) { qDebug()<<"own message changed" << message; for (auto* source: m_mqttClients) source->setWillOwnMessage(message); } void LiveDataDock::updateTopics() { ui.cbWillTopic->clear(); const MQTTClient* const fds = m_mqttClients.at(0); QVector topics = fds->topicNames(); if(!topics.isEmpty()) { for(int i = 0; i < topics.count(); i++) { qDebug()<<"Live Data Dock: updating will topics: "<findText(topics[i]) < 0) ui.cbWillTopic->addItem(topics[i]); } if(!fds->willTopic().isEmpty()) ui.cbWillTopic->setCurrentText(fds->willTopic()); } else qDebug()<<"Topic Vector Empty"; } void LiveDataDock::willUpdateChanged(int updateType) { qDebug()<<"Update type changed" << updateType; for (auto* source: m_mqttClients) source->setWillUpdateType(static_cast(updateType)); if(static_cast(updateType) == MQTTClient::WillUpdateType::TimePeriod) { ui.bWillUpdateNow->hide(); ui.leWillUpdateInterval->show(); ui.lWillUpdateInterval->show(); for (auto* source: m_mqttClients) { source->setWillTimeInterval(ui.leWillUpdateInterval->text().toInt()); source->startWillTimer(); } } else if (static_cast(updateType) == MQTTClient::WillUpdateType::OnClick) { ui.bWillUpdateNow->show(); ui.leWillUpdateInterval->hide(); ui.lWillUpdateInterval->hide(); for (auto* source: m_mqttClients) source->stopWillTimer(); } } void LiveDataDock::willUpdateNow() { for (auto* source: m_mqttClients) source->setWillForMqtt(); } void LiveDataDock::willUpdateIntervalChanged(const QString& interval) { qDebug()<<"Update interval changed " <setWillTimeInterval(interval.toInt()); source->startWillTimer(); } } void LiveDataDock::statisticsChanged(QListWidgetItem *item) { int idx = -1; for(int i = 0; i < ui.lwWillStatistics->count(); i++) if(item->text() == ui.lwWillStatistics->item(i)->text()) { idx = i; break; } if(item->checkState() == Qt::Checked) { if(idx >= 0) { for (auto* source: m_mqttClients) source->addWillStatistics(static_cast(idx) ); } } else { if(idx >= 0){ for (auto* source: m_mqttClients) source->removeWillStatistics(static_cast(idx) ); } } } void LiveDataDock::onMQTTConnect() { QMqttTopicFilter globalFilter{"#"}; QMqttSubscription * subscription = m_clients[m_mqttClients.first()->clientHostName()]->subscribe(globalFilter, 1); if(!subscription) qDebug()<<"Couldn't make global subscription in LiveDataDock"; - //m_messageTimer->start(3000); - QTimer::singleShot(3000, this, &LiveDataDock::fillSubscriptions); + //m_messageTimer->start(3000); } void LiveDataDock::mqttMessageReceived(const QByteArray& message, const QMqttTopicName& topic) { if(!m_addedTopics[m_mqttClients.first()->clientHostName()].contains(topic.name())) { m_addedTopics[m_mqttClients.first()->clientHostName()].push_back(topic.name()); addTopicToTree(topic.name()); } } void LiveDataDock::removeSubscription() { QTreeWidgetItem* unsubscribeItem = ui.twSubscriptions->currentItem(); if(unsubscribeItem != nullptr) { if(unsubscribeItem->parent() == nullptr) { for (auto* source: m_mqttClients) { source->removeMQTTSubscription(unsubscribeItem->text(0)); } ui.twSubscriptions->takeTopLevelItem(ui.twSubscriptions->indexOfTopLevelItem(unsubscribeItem)); } else{ while(unsubscribeItem->parent() != nullptr) { for(int i = 0; i < unsubscribeItem->parent()->childCount(); ++i) { if(unsubscribeItem->text(0) != unsubscribeItem->parent()->child(i)->text(0)) { for (auto* source: m_mqttClients) { source->addBeforeRemoveSubscription(unsubscribeItem->parent()->child(i)->text(0), ui.cbQoS->currentIndex()); } ui.twSubscriptions->addTopLevelItem(unsubscribeItem->parent()->takeChild(i)); i--; } } unsubscribeItem = unsubscribeItem->parent(); } for (auto* source: m_mqttClients) { source->removeMQTTSubscription(unsubscribeItem->text(0)); } ui.twSubscriptions->takeTopLevelItem(ui.twSubscriptions->indexOfTopLevelItem(unsubscribeItem)); manageCommonLevelSubscriptions(); } } else QMessageBox::warning(this, "Warning", "You didn't select any item from the Tree Widget"); } void LiveDataDock::setCompleter(const QString& topic) { if(!m_searching) { if(!m_topicList[m_mqttClients.first()->clientHostName()].contains(topic)) { m_topicList[m_mqttClients.first()->clientHostName()].append(topic); m_completer = new QCompleter(m_topicList[m_mqttClients.first()->clientHostName()], this); m_completer->setCompletionMode(QCompleter::PopupCompletion); m_completer->setCaseSensitivity(Qt::CaseSensitive); ui.leTopics->setCompleter(m_completer); } } } void LiveDataDock::topicTimeout() { qDebug()<<"lejart ido"; m_searching = false; m_searchTimer->stop(); } void LiveDataDock::addSubscription() { QString name; QTreeWidgetItem *item = ui.twTopics->currentItem(); if(item != nullptr) { QTreeWidgetItem *tempItem = item; name.prepend(item->text(0)); if(item->childCount() != 0) name.append("/#"); while(tempItem->parent() != nullptr) { tempItem = tempItem->parent(); name.prepend(tempItem->text(0) + "/"); } QList topLevelList = ui.twSubscriptions->findItems(name, Qt::MatchExactly); if(topLevelList.isEmpty() || topLevelList.first()->parent() != nullptr) { qDebug() << name; bool foundSuperior = false; for(int i = 0; i < ui.twSubscriptions->topLevelItemCount(); ++i) { qDebug()<topLevelItemCount(); if(checkTopicContains(name, ui.twSubscriptions->topLevelItem(i)->text(0)) && name != ui.twSubscriptions->topLevelItem(i)->text(0)) { qDebug()<<"1"<topLevelItem(i)->text(0); ui.twSubscriptions->topLevelItem(i)->takeChildren(); ui.twSubscriptions->takeTopLevelItem(i); qDebug()<<"After Delete"; i--; continue; } qDebug()<<"checked inferior"; if(checkTopicContains(ui.twSubscriptions->topLevelItem(i)->text(0), name) && name != ui.twSubscriptions->topLevelItem(i)->text(0)) { foundSuperior = true; qDebug()<<"2"<topLevelItem(i)->text(0); break; } qDebug()<<"checked superior"; } if(!foundSuperior) { qDebug()<<"Adding new topic"; QStringList toplevelName; toplevelName.push_back(name); QTreeWidgetItem* newTopLevelItem = new QTreeWidgetItem(toplevelName); ui.twSubscriptions->addTopLevelItem(newTopLevelItem); if(name.endsWith("#")) { addSubscriptionChildren(item, newTopLevelItem); } for (auto* source: m_mqttClients) { source->newMQTTSubscription(name, ui.cbQoS->currentIndex()); } } if(name.endsWith("#") && !foundSuperior) { QStringList nameList = name.split('/', QString::SkipEmptyParts); QString root = nameList.first(); QVector children; for(int i = 0; i < ui.twSubscriptions->topLevelItemCount(); ++i) { if(ui.twSubscriptions->topLevelItem(i)->text(0).startsWith(root) && name != ui.twSubscriptions->topLevelItem(i)->text(0)) { children.clear(); findSubscriptionLeafChildren(children, ui.twSubscriptions->topLevelItem(i)); for(int j = 0; j < children.size(); ++j) { if(checkTopicContains(name, children[j]->text(0))) { qDebug()<text(0); QTreeWidgetItem* unsubscribeItem = children[j]; while(unsubscribeItem->parent() != nullptr) { for(int i = 0; i < unsubscribeItem->parent()->childCount(); ++i) { qDebug()<parent()->childCount(); if(unsubscribeItem->text(0) != unsubscribeItem->parent()->child(i)->text(0)) { for (auto* source: m_mqttClients) { qDebug()<parent()->child(i)->text(0)<<"Add "; source->addBeforeRemoveSubscription(unsubscribeItem->parent()->child(i)->text(0), ui.cbQoS->currentIndex()); } ui.twSubscriptions->addTopLevelItem(unsubscribeItem->parent()->takeChild(i)); i--; } else { for (auto* source: m_mqttClients) { source->reparentTopic(unsubscribeItem->text(0), name); } } } unsubscribeItem = unsubscribeItem->parent(); } qDebug()<<"Remove: "<text(0); for (auto* source: m_mqttClients) { source->removeMQTTSubscription(unsubscribeItem->text(0)); } ui.twSubscriptions->takeTopLevelItem(ui.twSubscriptions->indexOfTopLevelItem(unsubscribeItem)); } } } } } if(!foundSuperior) manageCommonLevelSubscriptions(); if(foundSuperior) { QMessageBox::warning(this, "Warning", "You already subscribed to a topic containing this one"); } } else QMessageBox::warning(this, "Warning", "You already subscribed to this topic"); } else QMessageBox::warning(this, "Warning", "You didn't select any item from the Tree Widget"); } void LiveDataDock::fillSubscriptions() { const MQTTClient* const fds = m_mqttClients.at(0); ui.twSubscriptions->clear(); QVector subscriptions = fds->mqttSubscribtions(); for (int i = 0; i < subscriptions.count(); ++i) { QStringList name; name.append(subscriptions[i]); bool found = false; for(int j = 0; j < ui.twSubscriptions->topLevelItemCount(); ++j) { if(ui.twSubscriptions->topLevelItem(j)->text(0) == subscriptions[i]) { found = true; break; } } if(!found) { qDebug()<<"add:" << subscriptions[i]; QTreeWidgetItem* newItem = new QTreeWidgetItem(name); ui.twSubscriptions->addTopLevelItem(newItem); name.clear(); name = subscriptions[i].split('/', QString::SkipEmptyParts); QTreeWidgetItem* topic = nullptr; for(int j = 0; j < ui.twTopics->topLevelItemCount(); ++j) { if(ui.twTopics->topLevelItem(j)->text(0) == name[0]) { qDebug()<<"found top level topic: "<topLevelItem(j); break; } } if(topic != nullptr && topic->childCount() > 0) { qDebug()<<"restoring Children"; restoreSubscriptionChildren(topic, newItem, name, 1); } } } m_searching = false; } /*void LiveDataDock::stopStartReceive() { if (m_interpretMessage) { m_messageTimer->stop(); disconnect(m_client, &QMqttClient::messageReceived, this, &LiveDataDock::mqttMessageReceived); m_interpretMessage = false; m_messageTimer->start(10000); } else { m_messageTimer->stop(); if(!m_mqttClients.isEmpty()) { connect(m_client, &QMqttClient::messageReceived, this, &LiveDataDock::mqttMessageReceived); m_interpretMessage = true; m_messageTimer->start(3000); } } }*/ bool LiveDataDock::checkTopicContains(const QString &superior, const QString &inferior) { if (superior == inferior) return true; else { if(superior.contains("/")) { QStringList superiorList = superior.split('/', QString::SkipEmptyParts); QStringList inferiorList = inferior.split('/', QString::SkipEmptyParts); if(superiorList.size() > inferiorList.size()) return false; bool ok = true; for(int i = 0; i < superiorList.size(); ++i) { if(superiorList.at(i) != inferiorList.at(i)) { if((superiorList.at(i) != "+") && !(superiorList.at(i) == "#" && i == superiorList.size() - 1)) { qDebug() <start(); qDebug()<topLevelItemCount(); ++i) if(ui.twTopics->topLevelItem(i)->text(0) == rootName) { topItemIdx = i; break; } if(topItemIdx >= 0) { qDebug() << "Scroll"; ui.twTopics->scrollToItem(ui.twTopics->topLevelItem(topItemIdx), QAbstractItemView::ScrollHint::PositionAtTop); } } QString LiveDataDock::checkCommonLevel(const QString& first, const QString& second) { qDebug()< 0 && matchIndex < firstList.size() -1) { for(int j = matchIndex +1; j < firstList.size(); ++j) { if(firstList.at(j) != secondtList.at(j)) { differ = true; break; } } } else differ = true; if(!differ) { for(int i = 0; i < firstList.size(); ++i) { if(i != matchIndex) commonTopic.append(firstList.at(i)); else commonTopic.append("+"); if(i != firstList.size() - 1) commonTopic.append("/"); } } } } qDebug() << "Common topic: "<childCount() > 0) { for(int i = 0; i < topic->childCount(); ++i) { QTreeWidgetItem* temp = topic->child(i); QString name; if(topic->child(i)->childCount() > 0) { name.append(temp->text(0) + "/#"); while(temp->parent() != nullptr) { temp = temp->parent(); name.prepend(temp->text(0) + "/"); } } else { name.append(temp->text(0)); while(temp->parent() != nullptr) { temp = temp->parent(); name.prepend(temp->text(0) + "/"); } } QStringList nameList; nameList.append(name); QTreeWidgetItem* childItem = new QTreeWidgetItem(nameList); subscription->addChild(childItem); addSubscriptionChildren(topic->child(i), childItem); } } } void LiveDataDock::restoreSubscriptionChildren(QTreeWidgetItem * topic, QTreeWidgetItem * subscription, const QStringList& list, int level) { if(list[level] != "+" && list[level] != "#" && level < list.size() - 1) { for(int i = 0; i < topic->childCount(); ++i) { if(topic->child(i)->text(0) == list[level]) { restoreSubscriptionChildren(topic->child(i), subscription, list, level + 1); break; } } } else if (list[level] == "+") { for(int i = 0; i < topic->childCount(); ++i) { QString name; name.append(topic->child(i)->text(0)); for(int j = level + 1; j < list.size(); ++j) { name.append("/" + list[j]); } QTreeWidgetItem* temp = topic->child(i); while(temp->parent() != nullptr) { temp = temp->parent(); name.prepend(temp->text(0) + "/"); } QStringList nameList; nameList.append(name); QTreeWidgetItem* newItem = new QTreeWidgetItem(nameList); subscription->addChild(newItem); restoreSubscriptionChildren(topic->child(i), newItem, list, level + 1); } } else if (list[level] == "#") { addSubscriptionChildren(topic, subscription); } } int LiveDataDock::commonLevelIndex(const QString& first, const QString& second) { qDebug()< 0) { for(int j = matchIndex +1; j < firstList.size(); ++j) { if(firstList.at(j) != secondtList.at(j)) { differ = true; break; } } } else differ = true; if(!differ) { for(int i = 0; i < firstList.size(); ++i) { if(i != matchIndex) commonTopic.append(firstList.at(i)); else commonTopic.append("+"); if(i != firstList.size() - 1) commonTopic.append("/"); } } } } qDebug() << "Common topic: "<& children, QTreeWidgetItem* root) { if(root->childCount() == 0) { children.push_back(root); } else { for(int i = 0; i < root->childCount(); ++i) { findSubscriptionLeafChildren(children, root->child(i)); } } } int LiveDataDock::checkCommonChildCount(int levelIdx, int level, QStringList& commonList, QTreeWidgetItem* currentItem) { qDebug()<<"LevelIdx: "<text(0); if(levelIdx < level - 1) { if(commonList[levelIdx] != "+") { for(int j = 0; j < currentItem->childCount(); ++j) { if(currentItem->child(j)->text(0) == commonList[levelIdx]) { qDebug()<<"level index: "<child(j)->text(0)<<" "<child(j)); } } } else { int childCount = -1; bool ok = true; for(int j = 0; j < currentItem->childCount(); ++j) { int temp = checkCommonChildCount(levelIdx + 1, level, commonList, currentItem->child(j)); if((j > 0) && (temp != childCount)) { ok = false; break; } childCount = temp; } if(ok) return childCount; else return -1; } } else if (levelIdx == level - 1) { if(commonList[levelIdx] != "+") { for(int j = 0; j < currentItem->childCount(); ++j) { if(currentItem->child(j)->text(0) == commonList[levelIdx]) { qDebug()<<"level index: "<child(j)->text(0)<<" "<child(j)->childCount(); } } } else { int childCount = -1; bool ok = true; for(int j = 0; j < currentItem->childCount(); ++j) { if((j > 0) && (currentItem->child(j)->childCount() != childCount)) { ok = false; break; } childCount = currentItem->child(j)->childCount(); } if(ok) return childCount; else return -1; } } else if (level == 1 && levelIdx == 1) return currentItem->childCount(); return -1; } void LiveDataDock::manageCommonLevelSubscriptions() { bool foundEqual = false; do{ foundEqual = false; QMap> equalTopicsMap; QVector equalTopics; qDebug()<<"Search for common topic after unsubscribe"; for(int i = 0; i < ui.twSubscriptions->topLevelItemCount() - 1; ++i) { for(int j = i + 1; j < ui.twSubscriptions->topLevelItemCount(); ++j) { qDebug()<topLevelItem(i)->text(0)<<" "<topLevelItem(j)->text(0); QString commonTopic = checkCommonLevel(ui.twSubscriptions->topLevelItem(i)->text(0), ui.twSubscriptions->topLevelItem(j)->text(0)); if(!commonTopic.isEmpty()) { if(!equalTopicsMap[commonTopic].contains(ui.twSubscriptions->topLevelItem(i)->text(0))) { qDebug()<topLevelItem(i)->text(0); equalTopicsMap[commonTopic].push_back(ui.twSubscriptions->topLevelItem(i)->text(0)); } if(!equalTopicsMap[commonTopic].contains(ui.twSubscriptions->topLevelItem(j)->text(0))) { qDebug()<topLevelItem(i)->text(0); equalTopicsMap[commonTopic].push_back(ui.twSubscriptions->topLevelItem(j)->text(0)); } } } } if(!equalTopicsMap.isEmpty()) { qDebug()<<"Equal topics not empty"; QVector commonTopics; QMapIterator> topics(equalTopicsMap); while(topics.hasNext()) { topics.next(); qDebug()<<"Checking: " << topics.key(); int level = commonLevelIndex(topics.value().last(), topics.value().first()); QStringList commonList = topics.value().first().split('/', QString::SkipEmptyParts); QTreeWidgetItem* currentItem; for(int i = 0; i < ui.twTopics->topLevelItemCount(); ++i) { if(ui.twTopics->topLevelItem(i)->text(0) == commonList.first()) { currentItem = ui.twTopics->topLevelItem(i); break; } } qDebug()<<"level "< 0) { if(topics.value().size() == childCount) { foundEqual = true; commonTopics.push_back(topics.key()); qDebug()<addTopLevelItem(newTopic); for(int i = 0; i < equalTopics.size(); ++i) { for(int j = 0; j < ui.twSubscriptions->topLevelItemCount(); ++j){ if(ui.twSubscriptions->topLevelItem(j)->text(0) == equalTopics[i]) { newTopic->addChild(ui.twSubscriptions->takeTopLevelItem(j)); break; } } } for(int i = 0; i < ui.twSubscriptions->topLevelItemCount(); ++i) { if(checkTopicContains(commonTopic, ui.twSubscriptions->topLevelItem(i)->text(0)) && commonTopic != ui.twSubscriptions->topLevelItem(i)->text(0) ) { ui.twSubscriptions->topLevelItem(i)->takeChildren(); ui.twSubscriptions->takeTopLevelItem(i); i--; } } for (auto* source: m_mqttClients) { source->newMQTTSubscription(commonTopic, ui.cbQoS->currentIndex()); } } } } while(foundEqual); } void LiveDataDock::addTopicToTree(const QString &topicName) { QStringList name; QChar sep = '/'; QString rootName; if(topicName.contains(sep)) { QStringList list = topicName.split(sep, QString::SkipEmptyParts); rootName = list.at(0); name.append(list.at(0)); QTreeWidgetItem* currentItem; int topItemIdx = -1; for(int i = 0; i < ui.twTopics->topLevelItemCount(); ++i) { if(ui.twTopics->topLevelItem(i)->text(0) == list.at(0)) { topItemIdx = i; break; } } if( topItemIdx < 0) { currentItem = new QTreeWidgetItem(name); ui.twTopics->addTopLevelItem(currentItem); for(int i = 1; i < list.size(); ++i) { name.clear(); name.append(list.at(i)); currentItem->addChild(new QTreeWidgetItem(name)); currentItem = currentItem->child(0); } } else { currentItem = ui.twTopics->topLevelItem(topItemIdx); int listIdx = 1; for(; listIdx < list.size(); ++listIdx) { QTreeWidgetItem* childItem = nullptr; bool found = false; for(int j = 0; j < currentItem->childCount(); ++j) { childItem = currentItem->child(j); if(childItem->text(0) == list.at(listIdx)) { found = true; currentItem = childItem; break; } } if(!found) break; } for(; listIdx < list.size(); ++listIdx) { name.clear(); name.append(list.at(listIdx)); currentItem->addChild(new QTreeWidgetItem(name)); currentItem = currentItem->child(currentItem->childCount() - 1); } } } else { rootName = topicName; name.append(topicName); ui.twTopics->addTopLevelItem(new QTreeWidgetItem(name)); } for(int i = 0; i < ui.twSubscriptions->topLevelItemCount(); ++i) { QStringList subscriptionName = ui.twSubscriptions->topLevelItem(i)->text(0).split('/', QString::SkipEmptyParts); if (rootName == subscriptionName[0]) { qDebug()<clientHostName()].contains(topic.name())) { + //qDebug()<<"Background message: "<clientHostName()].push_back(topic.name()); + } +} + +void LiveDataDock::removeClient(const QString& name) { + + m_clients[name]->disconnectFromHost(); + + m_addedTopics.remove(name); + m_topicList.remove(name); + + delete m_clients[name]; + m_clients.remove(name); +} #endif diff --git a/src/kdefrontend/dockwidgets/LiveDataDock.h b/src/kdefrontend/dockwidgets/LiveDataDock.h index 8c2b975d4..53d3e5ae1 100644 --- a/src/kdefrontend/dockwidgets/LiveDataDock.h +++ b/src/kdefrontend/dockwidgets/LiveDataDock.h @@ -1,130 +1,133 @@ /*************************************************************************** File : LiveDataDock.h Project : LabPlot Description : Dock widget for live data properties -------------------------------------------------------------------- Copyright : (C) 2017 by Fabian Kristof (fkristofszabolcs@gmail.com) ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the Free Software * * Foundation, Inc., 51 Franklin Street, Fifth Floor, * * Boston, MA 02110-1301 USA * * * ***************************************************************************/ #ifndef LIVEDATADOCK_H #define LIVEDATADOCK_H #ifdef HAVE_MQTT #include #include #include #include "backend/datasources/filters/AsciiFilter.h" #include "backend/datasources/MQTTClient.h" #endif #include #include #include "ui_livedatadock.h" #include "backend/datasources/LiveDataSource.h" class QTimer; class QTreeWidgetItem; class QString; class QCompleter; class LiveDataDock : public QWidget { Q_OBJECT public: explicit LiveDataDock(QWidget *parent = 0); void setLiveDataSources(const QList& sources); ~LiveDataDock(); private: Ui::LiveDataDock ui; QList m_liveDataSources; bool m_paused; void pauseReading(); void continueReading(); private slots: void updateTypeChanged(int); void readingTypeChanged(int); void sampleSizeChanged(int); void updateIntervalChanged(int); void keepNValuesChanged(int); void updateNow(); void pauseContinueReading(); #ifdef HAVE_MQTT public: void setMQTTClients(const QList& clients); private slots: void useWillMessage(int); void willQoSChanged(int); void willRetainChanged(int); void willTopicChanged(const QString &); void willMessageTypeChanged(int); void willOwnMessageChanged(const QString&); void updateTopics(); void willUpdateChanged(int); void willUpdateNow(); void willUpdateIntervalChanged(const QString&); void statisticsChanged(QListWidgetItem *); void addSubscription(); void removeSubscription(); void onMQTTConnect(); void mqttMessageReceived(const QByteArray&, const QMqttTopicName&); + void mqttMessageReceivedInBackground(const QByteArray&, const QMqttTopicName&); void setCompleter(const QString&); void topicTimeout(); void fillSubscriptions(); //void stopStartReceive(); void searchTreeItem(const QString& rootName); + void removeClient(const QString&); + signals: void newTopic(const QString&); private: void addTopicToTree(const QString&); bool checkTopicContains(const QString& superior, const QString& inferior); QString checkCommonLevel(const QString& first, const QString& second); void findSubscriptionLeafChildren(QVector&, QTreeWidgetItem*); int checkCommonChildCount(int levelIdx, int level, QStringList& namelist, QTreeWidgetItem* currentItem); void manageCommonLevelSubscriptions(); int commonLevelIndex(const QString& first, const QString& second); void addSubscriptionChildren(QTreeWidgetItem * topic, QTreeWidgetItem * subscription); void restoreSubscriptionChildren(QTreeWidgetItem * topic, QTreeWidgetItem * subscription, const QStringList&, int level); QList m_mqttClients; //QMqttClient* m_client; QMap m_clients; QCompleter* m_completer; QMap m_topicList; bool m_searching; QTimer* m_searchTimer; QTimer* m_messageTimer; bool m_interpretMessage; const MQTTClient* m_previousMQTTClient; QString m_mqttUnsubscribeName; QMap> m_addedTopics; #endif }; #endif // LIVEDATADOCK_H