diff --git a/src/backend/datasources/MQTTClient.cpp b/src/backend/datasources/MQTTClient.cpp index 68aeff6ef..2d7cc05a5 100644 --- a/src/backend/datasources/MQTTClient.cpp +++ b/src/backend/datasources/MQTTClient.cpp @@ -1,1358 +1,1358 @@ /*************************************************************************** File : MQTTClient.cpp Project : LabPlot Description : Represents a MQTT Client -------------------------------------------------------------------- Copyright : (C) 2018 Kovacs Ferencz (kferike98@gmail.com) ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the Free Software * * Foundation, Inc., 51 Franklin Street, Fifth Floor, * * Boston, MA 02110-1301 USA * * * ***************************************************************************/ #include "backend/datasources/MQTTClient.h" #ifdef HAVE_MQTT #include "backend/datasources/MQTTSubscription.h" #include "backend/datasources/MQTTTopic.h" #include "backend/datasources/filters/AsciiFilter.h" #include "backend/datasources/filters/FITSFilter.h" #include "backend/datasources/filters/BinaryFilter.h" #include "backend/core/Project.h" #include "kdefrontend/spreadsheet/PlotDataDialog.h" #include "commonfrontend/spreadsheet/SpreadsheetView.h" #include "kdefrontend/datasources/MQTTErrorWidget.h" #include #include #include #include #include #include #include #include #include #include #include /*! \class MQTTClient \brief The MQTT Client connects to the broker set in ImportFileWidget. It manages the MQTTSubscriptions, and the MQTTTopics. \ingroup datasources */ MQTTClient::MQTTClient(const QString& name) : Folder(name), m_updateTimer(new QTimer(this)), m_client(new QMqttClient(this)), m_willTimer(new QTimer(this)) { connect(m_updateTimer, &QTimer::timeout, this, &MQTTClient::read); connect(m_client, &QMqttClient::connected, this, &MQTTClient::onMQTTConnect); connect(m_willTimer, &QTimer::timeout, this, &MQTTClient::updateWillMessage); connect(m_client, &QMqttClient::errorChanged, this, &MQTTClient::MQTTErrorChanged); } MQTTClient::~MQTTClient() { emit clientAboutToBeDeleted(m_client->hostname(), m_client->port()); //stop reading before deleting the objects pauseReading(); qDebug() << "Delete MQTTClient: " << m_client->hostname() << m_client->port(); delete m_filter; delete m_updateTimer; delete m_willTimer; m_client->disconnectFromHost(); delete m_client; } /*! * depending on the update type, periodically or on data changes, starts the timer. */ void MQTTClient::ready() { if (m_updateType == TimeInterval) m_updateTimer->start(m_updateInterval); } /*! * \brief Updates the MQTTTopics of the client */ void MQTTClient::updateNow() { m_updateTimer->stop(); read(); if ((m_updateType == TimeInterval) && !m_paused) m_updateTimer->start(m_updateInterval); } /*! * \brief Continue reading from messages after it was paused. */ void MQTTClient::continueReading() { m_paused = false; if (m_updateType == TimeInterval) m_updateTimer->start(m_updateInterval); } /*! * \brief Pause the reading from messages. */ void MQTTClient::pauseReading() { m_paused = true; if (m_updateType == TimeInterval) m_updateTimer->stop(); } /*! * \brief Sets the filter of the MQTTClient. * The ownership of the filter is passed to MQTTClient. * * \param f a pointer to the new filter */ void MQTTClient::setFilter(AsciiFilter* f) { delete m_filter; m_filter = f; } /*! * \brief Returns the filter of the MQTTClient. */ AsciiFilter* MQTTClient::filter() const { return m_filter; } /*! * \brief Sets the MQTTclient's update interval to \c interval * \param interval */ void MQTTClient::setUpdateInterval(int interval) { m_updateInterval = interval; if (!m_paused) m_updateTimer->start(m_updateInterval); } /*! * \brief Returns the MQTTClient's update interval to \c interval * \param interval */ int MQTTClient::updateInterval() const { return m_updateInterval; } /*! * \brief Sets how many values we should store * \param keepNValues */ void MQTTClient::setKeepNValues(int keepNValues) { m_keepNValues = keepNValues; } /*! * \brief Returns how many values we should store */ int MQTTClient::keepNValues() const { return m_keepNValues; } /*! * \brief Provides information about whether the reading is paused or not * * \return true if the reading is paused * \return false otherwise */ bool MQTTClient::isPaused() const { return m_paused; } /*! * \brief Sets the size rate to sampleSize * \param sampleSize */ void MQTTClient::setSampleSize(int sampleSize) { m_sampleSize = sampleSize; } /*! * \brief Returns the size rate */ int MQTTClient::sampleSize() const { return m_sampleSize; } /*! * \brief Sets the MQTTClient's reading type to readingType * \param readingType */ void MQTTClient::setReadingType(ReadingType readingType) { m_readingType = readingType; } /*! * \brief Returns the MQTTClient's reading type */ MQTTClient::ReadingType MQTTClient::readingType() const { return m_readingType; } /*! * \brief Sets the MQTTClient's update type to updatetype and handles this change * \param updatetype */ void MQTTClient::setUpdateType(UpdateType updateType) { if (updateType == NewData) m_updateTimer->stop(); m_updateType = updateType; } /*! * \brief Returns the MQTTClient's update type */ MQTTClient::UpdateType MQTTClient::updateType() const { return m_updateType; } /*! * \brief Returns the MQTTClient's icon */ QIcon MQTTClient::icon() const { return QIcon::fromTheme("network-server-database"); } /*! * \brief Sets the host and port for the client. * * \param host the hostname of the broker we want to connect to * \param port the port used by the broker */ void MQTTClient::setMQTTClientHostPort(const QString& host, const quint16& port) { m_client->setHostname(host); m_client->setPort(port); } /*! * \brief Returns hostname of the broker the client is connected to. */ QString MQTTClient::clientHostName() const { return m_client->hostname(); } /*! * \brief Returns the port used by the broker. */ quint16 MQTTClient::clientPort() const { return m_client->port(); } /*! * \brief Sets the flag on the given value. * If set true it means that the broker requires authentication, otherwise it doesn't. * * \param use */ void MQTTClient::setMQTTUseAuthentication(bool use) { m_MQTTUseAuthentication = use; } /*! * \brief Returns whether the broker requires authentication or not. */ bool MQTTClient::MQTTUseAuthentication() const { return m_MQTTUseAuthentication; } /*! * \brief Sets the username and password for the client. * * \param username the username used for authentication * \param password the password used for authentication */ void MQTTClient::setMQTTClientAuthentication(const QString& username, const QString& password) { m_client->setUsername(username); m_client->setPassword(password); } /*! * \brief Returns the username used for authentication. */ QString MQTTClient::clientUserName() const { return m_client->username(); } /*! * \brief Returns the password used for authentication. */ QString MQTTClient::clientPassword() const { return m_client->password(); } /*! * \brief Sets the flag on the given value. * If set true it means that user wants to set the client ID, otherwise it's not the case. * * \param use */ void MQTTClient::setMQTTUseID(bool use) { m_MQTTUseID = use; } /*! * \brief Returns whether the user wants to set the client ID or not. */ bool MQTTClient::MQTTUseID() const { return m_MQTTUseID; } /*! * \brief Sets the ID of the client * * \param id */ void MQTTClient::setMQTTClientId(const QString& clientId) { m_client->setClientId(clientId); } /*! * \brief Returns the ID of the client */ QString MQTTClient::clientID () const { return m_client->clientId(); } /*! * \brief Sets the flag on the given value. * If retain is true we interpret retain messages, otherwise we do not * * \param retain */ void MQTTClient::setMQTTRetain(bool retain) { m_MQTTRetain = retain; } /*! * \brief Returns the flag, which set to true means that interpret retain messages, otherwise we do not */ bool MQTTClient::MQTTRetain() const { return m_MQTTRetain; } /*! * \brief Returns the name of every MQTTTopics which already received a message, and is child of the MQTTClient */ QVector MQTTClient::topicNames() const { return m_topicNames; } /*! * \brief Adds the initial subscriptions that were set in ImportFileWidget * * \param filter the name of the subscribed topic * \param qos the qos level of the subscription */ void MQTTClient::addInitialMQTTSubscriptions(const QMqttTopicFilter& filter, const quint8& qos) { m_subscribedTopicNameQoS[filter] = qos; } /*! * \brief Returns the name of every MQTTSubscription of the MQTTClient */ QVector MQTTClient::MQTTSubscriptions() const { return m_subscriptions; } /*! * \brief Adds a new MQTTSubscription to the MQTTClient * * \param topic, the name of the topic * \param QoS */ void MQTTClient::addMQTTSubscription(const QString& topicName, quint8 QoS) { //Check whether the subscription already exists, if it doesn't, we can add it if (!m_subscriptions.contains(topicName)) { const QMqttTopicFilter filter {topicName}; QMqttSubscription* temp = m_client->subscribe(filter, QoS); if (temp) { qDebug()<<"Subscribe to: "<< temp->topic() << " " << temp->qos(); m_subscriptions.push_back(temp->topic().filter()); m_subscribedTopicNameQoS[temp->topic().filter()] = temp->qos(); MQTTSubscription* newSubscription = new MQTTSubscription(temp->topic().filter()); newSubscription->setMQTTClient(this); addChildFast(newSubscription); m_MQTTSubscriptions.push_back(newSubscription); //Search for inferior subscriptions, that the new subscription contains bool found = false; QVector inferiorSubscriptions; for (auto* subscription : m_MQTTSubscriptions) { if (checkTopicContains(topicName, subscription->subscriptionName()) - && topicName != subscription->subscriptionName()) { + && topicName != subscription->subscriptionName()) { found = true; inferiorSubscriptions.push_back(subscription); } } //If there are some inferior subscriptions, we have to deal with them if (found) { for (auto* inferiorSubscription : inferiorSubscriptions) { qDebug()<<"Reparent topics of inferior subscription: "<< inferiorSubscription->subscriptionName(); //We have to reparent every topic of the inferior subscription, so no data is lost QVector topics = inferiorSubscription->topics(); for (auto* topic : topics) { topic->reparent(newSubscription); } //Then remove the subscription and every connected information QMqttTopicFilter unsubscribeFilter {inferiorSubscription->subscriptionName()}; m_client->unsubscribe(unsubscribeFilter); for (int j = 0; j < m_MQTTSubscriptions.size(); ++j) { if (m_MQTTSubscriptions[j]->subscriptionName() == - inferiorSubscription->subscriptionName()) { + inferiorSubscription->subscriptionName()) { m_MQTTSubscriptions.remove(j); } } m_subscriptions.removeAll(inferiorSubscription->subscriptionName()); m_subscribedTopicNameQoS.remove(inferiorSubscription->subscriptionName()); removeChild(inferiorSubscription); } } connect(temp, &QMqttSubscription::messageReceived, this, &MQTTClient::MQTTSubscriptionMessageReceived); - emit MQTTTopicsChanged(); + emit MQTTTopicsChanged(); } } } /*! * \brief Removes a MQTTSubscription from the MQTTClient * * \param name, the name of the subscription to remove */ void MQTTClient::removeMQTTSubscription(const QString& subscriptionName) { //We can only remove the subscription if it exists if (m_subscriptions.contains(subscriptionName)) { //unsubscribe from the topic const QMqttTopicFilter filter{subscriptionName}; m_client->unsubscribe(filter); qDebug()<<"Unsubscribe from: " << subscriptionName; //Remove every connected information m_subscriptions.removeAll(subscriptionName); for (int i = 0; i < m_MQTTSubscriptions.size(); ++i) { if (m_MQTTSubscriptions[i]->subscriptionName() == subscriptionName) { MQTTSubscription* removeSubscription = m_MQTTSubscriptions[i]; m_MQTTSubscriptions.remove(i); //Remove every topic of the subscription as well QVector topics = removeSubscription->topics(); for (int j = 0; j < topics.size(); ++j) { m_topicNames.removeAll(topics[j]->topicName()); } //Remove the MQTTSubscription removeChild(removeSubscription); break; } } QMapIterator j(m_subscribedTopicNameQoS); while (j.hasNext()) { j.next(); if (j.key().filter() == subscriptionName) { m_subscribedTopicNameQoS.remove(j.key()); break; } } - emit MQTTTopicsChanged(); + emit MQTTTopicsChanged(); } } /*! * \brief Adds a MQTTSubscription to the MQTTClient *Used when the user unsubscribes from a topic of a MQTTSubscription * * \param topic, the name of the topic * \param QoS */ void MQTTClient::addBeforeRemoveSubscription(const QString& topicName, quint8 QoS) { //We can't add the subscription if it already exists if (!m_subscriptions.contains(topicName)) { //Subscribe to the topic QMqttTopicFilter filter {topicName}; QMqttSubscription* temp = m_client->subscribe(filter, QoS); if (temp) { //Add the MQTTSubscription and other connected data qDebug()<<"Add subscription before remove: " << temp->topic() << " " << temp->qos(); m_subscriptions.push_back(temp->topic().filter()); m_subscribedTopicNameQoS[temp->topic().filter()] = temp->qos(); MQTTSubscription* newSubscription = new MQTTSubscription(temp->topic().filter()); newSubscription->setMQTTClient(this); addChildFast(newSubscription); m_MQTTSubscriptions.push_back(newSubscription); //Search for the subscription the topic belonged to bool found = false; MQTTSubscription* superiorSubscription = nullptr; for (auto* subscription : m_MQTTSubscriptions) { if (checkTopicContains(subscription->subscriptionName(), topicName) - && topicName != subscription->subscriptionName()) { + && topicName != subscription->subscriptionName()) { found = true; superiorSubscription = subscription; break; } } if (found) { //Search for topics belonging to the superior(old) subscription //which are also contained by the new subscription QVector topics = superiorSubscription->topics(); qDebug()<< topics.size(); QVector inferiorTopics; for (auto* topic : topics) { if (checkTopicContains(topicName, topic->topicName())) { inferiorTopics.push_back(topic); } } //Reparent these topics, in order to avoid data loss for (auto* inferiorTopic : inferiorTopics) { inferiorTopic->reparent(newSubscription); } } connect(temp, &QMqttSubscription::messageReceived, this, &MQTTClient::MQTTSubscriptionMessageReceived); } } } /*! * \brief Reparents the given MQTTTopic to the given MQTTSubscription * * \param topic, the name of the MQTTTopic * \param parent, the name of the MQTTSubscription */ void MQTTClient::reparentTopic(const QString& topicName, const QString& parentTopicName) { //We can only reparent if the parent containd the topic if (m_subscriptions.contains(parentTopicName) && m_topicNames.contains(topicName)) { qDebug() << "Reparent " << topicName << " to " << parentTopicName; //search for the parent MQTTSubscription bool found = false; MQTTSubscription* superiorSubscription = nullptr; for (auto* subscription : m_MQTTSubscriptions) { if (subscription->subscriptionName() == parentTopicName) { found = true; superiorSubscription = subscription; break; } } if (found) { //get every topic of the MQTTClient QVector topics = children(AbstractAspect::Recursive); //Search for the given topic among the MQTTTopics for (auto* topic : topics) { if (topicName == topic->topicName()) { //if found, it is reparented to the parent MQTTSubscription topic->reparent(superiorSubscription); break; } } } } } /*! *\brief Checks if a topic contains another one * * \param superior the name of a topic * \param inferior the name of a topic * \return true if superior is equal to or contains(if superior contains wildcards) inferior, * false otherwise */ bool MQTTClient::checkTopicContains(const QString& superior, const QString& inferior) { if (superior == inferior) return true; else { if (superior.contains(QLatin1String("/"))) { QStringList superiorList = superior.split('/', QString::SkipEmptyParts); QStringList inferiorList = inferior.split('/', QString::SkipEmptyParts); //a longer topic can't contain a shorter one if (superiorList.size() > inferiorList.size()) return false; bool ok = true; for (int i = 0; i < superiorList.size(); ++i) { if (superiorList.at(i) != inferiorList.at(i)) { if ((superiorList.at(i) != '+') && - !(superiorList.at(i) == '#' && i == superiorList.size() - 1)) { + !(superiorList.at(i) == '#' && i == superiorList.size() - 1)) { //if the two topics differ, and the superior's current level isn't + or #(which can be only in the last position) //then superior can't contain inferior ok = false; break; } else if (i == superiorList.size() - 1 && (superiorList.at(i) == '+' && inferiorList.at(i) == '#') ) { //if the two topics differ at the last level //and the superior's current level is + while the inferior's is #(which can be only in the last position) //then superior can't contain inferior ok = false; break; } } } return ok; } return false; } } /*! *\brief Returns the '+' wildcard containing topic name, which includes the given topic names * * \param first the name of a topic * \param second the name of a topic * \return The name of the common topic, if it exists, otherwise "" */ QString MQTTClient::checkCommonLevel(const QString& first, const QString& second) { QStringList firstList = first.split('/', QString::SkipEmptyParts); QStringList secondtList = second.split('/', QString::SkipEmptyParts); QString commonTopic = ""; if (!firstList.isEmpty()) { //the two topics have to be the same size and can't be identic if ((firstList.size() == secondtList.size()) && (first != second)) { //the index where they differ int differIndex = -1; for (int i = 0; i < firstList.size(); ++i) { if (firstList.at(i) != secondtList.at(i)) { differIndex = i; break; } } //they can differ at only one level and that can't be the first bool differ = false; if (differIndex > 0) { for (int j = differIndex +1; j < firstList.size(); ++j) { if (firstList.at(j) != secondtList.at(j)) { differ = true; break; } } } else differ = true; if (!differ) { for (int i = 0; i < firstList.size(); ++i) { if (i != differIndex) { commonTopic.append(firstList.at(i)); } else { //we put '+' wildcard at the level where they differ commonTopic.append('+'); } if (i != firstList.size() - 1) commonTopic.append("/"); } } } } qDebug() << first << " " << second << " common topic: "<stop(); } /*! * \brief Returns whether the user wants to use will message or not */ -bool MQTTClient::MQTTWillUse() const{ +bool MQTTClient::MQTTWillUse() const { return m_MQTTWill.enabled; } /*! * \brief Sets the will topic of the client * * \param topic */ void MQTTClient::setWillTopic(const QString& topic) { qDebug() << "Set will topic:" << topic; m_MQTTWill.willTopic = topic; } /*! * \brief Returns the will topic of the client */ -QString MQTTClient::willTopic() const{ +QString MQTTClient::willTopic() const { return m_MQTTWill.willTopic; } /*! * \brief Sets the retain flag of the client's will message * * \param retain */ void MQTTClient::setWillRetain(bool retain) { m_MQTTWill.willRetain = retain; } /*! * \brief Returns the retain flag of the client's will message */ bool MQTTClient::willRetain() const { return m_MQTTWill.willRetain; } /*! * \brief Sets the QoS level of the client's will message * * \param QoS */ void MQTTClient::setWillQoS(quint8 QoS) { m_MQTTWill.willQoS = QoS; } /*! * \brief Returns the QoS level of the client's will message */ quint8 MQTTClient::willQoS() const { return m_MQTTWill.willQoS; } /*! * \brief Sets the will message type of the client * * \param messageType */ void MQTTClient::setWillMessageType(WillMessageType messageType) { m_MQTTWill.willMessageType = messageType; } /*! * \brief Returns the will message type of the client */ MQTTClient::WillMessageType MQTTClient::willMessageType() const { return m_MQTTWill.willMessageType; } /*! * \brief Sets the own will message of the user * * \param ownMessage */ void MQTTClient::setWillOwnMessage(const QString& ownMessage) { m_MQTTWill.willOwnMessage = ownMessage; } /*! * \brief Returns the own will message of the user */ QString MQTTClient::willOwnMessage() const { return m_MQTTWill.willOwnMessage; } /*! * \brief Updates the will message of the client */ void MQTTClient::updateWillMessage() { QVector topics = children(AbstractAspect::Recursive); const MQTTTopic* willTopic = nullptr; //Search for the will topic for (const auto* topic : topics) { if (topic->topicName() == m_MQTTWill.willTopic) { willTopic = topic; break; } } //if the will topic is found we can update the will message if (willTopic != nullptr) { //To update the will message we have to disconnect first, then after setting everything connect again if (m_MQTTWill.enabled && (m_client->state() == QMqttClient::ClientState::Connected) ) { //Disconnect only once (disconnecting may take a while) if (!m_disconnectForWill) { qDebug() << "Disconnecting from host in order to update will message"; m_client->disconnectFromHost(); m_disconnectForWill = true; } //Try to update again updateWillMessage(); } //If client is disconnected we can update the settings else if (m_MQTTWill.enabled && (m_client->state() == QMqttClient::ClientState::Disconnected) && m_disconnectForWill) { m_client->setWillQoS(m_MQTTWill.willQoS); qDebug()<<"Will QoS" << m_MQTTWill.willQoS; m_client->setWillRetain(m_MQTTWill.willRetain); qDebug()<<"Will retain" << m_MQTTWill.willRetain; m_client->setWillTopic(m_MQTTWill.willTopic); qDebug()<<"Will Topic" << m_MQTTWill.willTopic; //Set the will message according to m_willMessageType switch (m_MQTTWill.willMessageType) { case WillMessageType::OwnMessage: m_client->setWillMessage(m_MQTTWill.willOwnMessage.toUtf8()); qDebug()<<"Will own message" << m_MQTTWill.willOwnMessage; break; case WillMessageType::Statistics: { const auto asciiFilter = willTopic->filter(); //If the topic's asciiFilter was found, get the needed statistics if (asciiFilter != nullptr) { //Statistics is only possible if the data stored in the MQTTTopic is of type integer or numeric if ((asciiFilter->MQTTColumnMode() == AbstractColumn::ColumnMode::Integer) || - (asciiFilter->MQTTColumnMode() == AbstractColumn::ColumnMode::Numeric)) { + (asciiFilter->MQTTColumnMode() == AbstractColumn::ColumnMode::Numeric)) { m_client->setWillMessage(asciiFilter->MQTTColumnStatistics(willTopic).toUtf8()); } //Otherwise set empty message else { m_client->setWillMessage(QString("").toUtf8()); } qDebug() << "Will statistics message: "<< QString(m_client->willMessage()); } break; } case WillMessageType::LastMessage: m_client->setWillMessage(m_MQTTWill.willLastMessage.toUtf8()); qDebug()<<"Will last message:\n" << m_MQTTWill.willLastMessage; break; default: break; } m_disconnectForWill = false; //Reconnect with the updated message m_client->connectToHost(); qDebug()<< "Reconnect to host after updating will message"; } } } /*! * \brief Returns the MQTTClient's will update type */ -MQTTClient::WillUpdateType MQTTClient::willUpdateType() const{ +MQTTClient::WillUpdateType MQTTClient::willUpdateType() const { return m_MQTTWill.willUpdateType; } /*! * \brief Sets the MQTTClient's will update type * * \param willUpdateType */ void MQTTClient::setWillUpdateType(WillUpdateType willUpdateType) { m_MQTTWill.willUpdateType = willUpdateType; } /*! * \brief Returns the time interval of updating the MQTTClient's will message */ -int MQTTClient::willTimeInterval() const{ +int MQTTClient::willTimeInterval() const { return m_MQTTWill.willTimeInterval; } /*! * \brief Sets the time interval of updating the MQTTClient's will message, if update type is TimePeriod * * \param interval */ void MQTTClient::setWillTimeInterval(int interval) { m_MQTTWill.willTimeInterval = interval; } /*! * \brief Clear the lastly received message by the will topic * Called when the will topic is changed */ void MQTTClient::clearLastMessage() { m_MQTTWill.willLastMessage.clear(); } /*! * \brief Sets true the corresponding flag of the statistic type, * what means that the given statistic type will be added to the will message * * \param statistics */ void MQTTClient::addWillStatistics(WillStatisticsType statistic) { m_MQTTWill.willStatistics[static_cast(statistic)] = true; } /*! * \brief Sets false the corresponding flag of the statistic type, * what means that the given statistic will no longer be added to the will message * * \param statistics */ void MQTTClient::removeWillStatistics(WillStatisticsType statistic) { m_MQTTWill.willStatistics[static_cast(statistic)] = false; } /*! * \brief Returns a bool vector, meaning which statistic types are included in the will message * If the corresponding value is true, the statistic type is included, otherwise it isn't */ -QVector MQTTClient::willStatistics() const{ +QVector MQTTClient::willStatistics() const { return m_MQTTWill.willStatistics; } /*! * \brief Starts the will timer, which will update the will message */ -void MQTTClient::startWillTimer() const{ +void MQTTClient::startWillTimer() const { if (m_MQTTWill.willUpdateType == WillUpdateType::TimePeriod) m_willTimer->start(m_MQTTWill.willTimeInterval); } /*! * \brief Stops the will timer */ -void MQTTClient::stopWillTimer() const{ +void MQTTClient::stopWillTimer() const { m_willTimer->stop(); } //############################################################################## //################################# SLOTS #################################### //############################################################################## /*! *\brief called periodically when update type is TimeInterval */ void MQTTClient::read() { if (!m_filter) return; if (!m_prepared) { qDebug()<<"Connect"; //connect to the broker m_client->connectToHost(); m_prepared = true; } if ((m_client->state() == QMqttClient::ClientState::Connected) && m_MQTTFirstConnectEstablished) { // qDebug()<<"Read"; //Signal for every MQTTTopic that they can read emit readFromTopics(); } } /*! *\brief called when the client successfully connected to the broker */ void MQTTClient::onMQTTConnect() { if (m_client->error() == QMqttClient::NoError) { //if this is the first connection (after setting the options in ImportFileWidget or loading saved project) if (!m_MQTTFirstConnectEstablished) { qDebug()<<"connection made in MQTTClient"; //Subscribe to initial or loaded topics QMapIterator i(m_subscribedTopicNameQoS); while (i.hasNext()) { i.next(); qDebug()<subscribe(i.key(), i.value()); if (temp) { //If we didn't load the MQTTClient from xml we have to add the MQTTSubscriptions if (!m_loaded) { m_subscriptions.push_back(temp->topic().filter()); MQTTSubscription* newSubscription = new MQTTSubscription(temp->topic().filter()); newSubscription->setMQTTClient(this); addChildFast(newSubscription); m_MQTTSubscriptions.push_back(newSubscription); } connect(temp, &QMqttSubscription::messageReceived, this, &MQTTClient::MQTTSubscriptionMessageReceived); } } m_MQTTFirstConnectEstablished = true; //Signal that the initial subscriptions were made emit MQTTSubscribed(); } //if there was already a connection made(happens after updating will message) else { qDebug() << "Start resubscribing after will message update"; //Only the client has to make the subscriptions again, every other connected data is still available QMapIterator i(m_subscribedTopicNameQoS); while (i.hasNext()) { i.next(); QMqttSubscription* temp = m_client->subscribe(i.key(), i.value()); if (temp) { qDebug()<topic()<<" "<qos(); connect(temp, &QMqttSubscription::messageReceived, this, &MQTTClient::MQTTSubscriptionMessageReceived); } else qDebug()<<"Couldn't subscribe after will update"; } } } } /*! *\brief called when a message is received by a topic belonging to one of subscriptions of the client. * It passes the message to the appropriate MQTTSubscription which will pass it to the appropriate MQTTTopic */ void MQTTClient::MQTTSubscriptionMessageReceived(const QMqttMessage& msg) { //Decide to interpret retain message or not if (!msg.retain() || m_MQTTRetain) { //If this is the first message from the topic, save its name if (!m_topicNames.contains(msg.topic().name())) m_topicNames.push_back(msg.topic().name()); //Pass the message and the topic name to the MQTTSubscription which contains the topic for (auto* subscription : m_MQTTSubscriptions) { if (checkTopicContains(subscription->subscriptionName(), msg.topic().name())) { subscription->messageArrived(msg.payload(), msg.topic().name()); break; } } //if the message was received by the will topic, update the last message received by it - if (msg.topic().name() == m_MQTTWill.willTopic) { + if (msg.topic().name() == m_MQTTWill.willTopic) { m_MQTTWill.willLastMessage = QString(msg.payload()); - emit MQTTTopicsChanged(); - } + emit MQTTTopicsChanged(); + } } } /*! *\brief Handles some of the possible errors of the client, using MQTTErrorWidget */ void MQTTClient::MQTTErrorChanged(QMqttClient::ClientError clientError) { if (clientError != QMqttClient::ClientError::NoError) { MQTTErrorWidget* errorWidget = new MQTTErrorWidget(clientError, this); errorWidget->show(); } } /*! *\brief Called when a subscription is loaded. * Checks whether every saved subscription was loaded or not. * If everything is loaded, it makes the connection and starts the reading * * \param name, the name of the subscription */ void MQTTClient::subscriptionLoaded(const QString &name) { if (!name.isEmpty()) { qDebug() << "Finished loading: " << name; //Save information about the subscription m_subscriptionsLoaded++; m_subscriptions.push_back(name); QMqttTopicFilter filter {name}; m_subscribedTopicNameQoS[filter] = 0; //Save the topics belonging to the subscription for (const auto* subscription : m_MQTTSubscriptions) { if (subscription->subscriptionName() == name) { const auto& topics = subscription->topics(); for (auto* topic : topics) { m_topicNames.push_back(topic->topicName()); } break; } } //Check whether every subscription was loaded or not if (m_subscriptionsLoaded == m_subscriptionCountToLoad) { //if everything was loaded we can start reading m_loaded = true; read(); } } } //############################################################################## //################## Serialization/Deserialization ########################### //############################################################################## /*! Saves as XML. */ void MQTTClient::save(QXmlStreamWriter* writer) const { writer->writeStartElement("MQTTClient"); writeBasicAttributes(writer); writeCommentElement(writer); //general writer->writeStartElement("general"); writer->writeAttribute("subscriptionCount", QString::number(m_MQTTSubscriptions.size())); writer->writeAttribute("updateType", QString::number(m_updateType)); writer->writeAttribute("readingType", QString::number(m_readingType)); writer->writeAttribute("keepValues", QString::number(m_keepNValues)); if (m_updateType == TimeInterval) writer->writeAttribute("updateInterval", QString::number(m_updateInterval)); if (m_readingType != TillEnd) writer->writeAttribute("sampleSize", QString::number(m_sampleSize)); writer->writeAttribute("host", m_client->hostname()); writer->writeAttribute("port", QString::number(m_client->port())); writer->writeAttribute("username", m_client->username()); writer->writeAttribute("password", m_client->password()); writer->writeAttribute("clientId", m_client->clientId()); writer->writeAttribute("useRetain", QString::number(m_MQTTRetain)); writer->writeAttribute("useWill", QString::number(m_MQTTWill.enabled)); writer->writeAttribute("willTopic", m_MQTTWill.willTopic); writer->writeAttribute("willOwnMessage", m_MQTTWill.willOwnMessage); writer->writeAttribute("willQoS", QString::number(m_MQTTWill.willQoS)); writer->writeAttribute("willRetain", QString::number(m_MQTTWill.willRetain)); writer->writeAttribute("willMessageType", QString::number(static_cast(m_MQTTWill.willMessageType))); writer->writeAttribute("willUpdateType", QString::number(static_cast(m_MQTTWill.willUpdateType))); writer->writeAttribute("willTimeInterval", QString::number(m_MQTTWill.willTimeInterval)); for (int i = 0; i < m_MQTTWill.willStatistics.count(); ++i) writer->writeAttribute("willStatistics"+QString::number(i), QString::number(m_MQTTWill.willStatistics[i])); writer->writeAttribute("useID", QString::number(m_MQTTUseID)); writer->writeAttribute("useAuthentication", QString::number(m_MQTTUseAuthentication)); writer->writeEndElement(); //filter m_filter->save(writer); //MQTTSubscription for (auto* sub : children(IncludeHidden)) sub->save(writer); writer->writeEndElement(); // "MQTTClient" } /*! Loads from XML. */ bool MQTTClient::load(XmlStreamReader* reader, bool preview) { if (!readBasicAttributes(reader)) return false; QString attributeWarning = i18n("Attribute '%1' missing or empty, default value is used"); QXmlStreamAttributes attribs; QString str; while (!reader->atEnd()) { reader->readNext(); if (reader->isEndElement() && reader->name() == "MQTTClient") break; if (!reader->isStartElement()) continue; if (reader->name() == "comment") { if (!readCommentElement(reader)) return false; } else if (reader->name() == "general") { attribs = reader->attributes(); str = attribs.value("subscriptionCount").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'subscriptionCount'")); else m_subscriptionCountToLoad = str.toInt(); str = attribs.value("keepValues").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'keepValues'")); else m_keepNValues = str.toInt(); str = attribs.value("updateType").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'updateType'")); else m_updateType = static_cast(str.toInt()); str = attribs.value("readingType").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'readingType'")); else m_readingType = static_cast(str.toInt()); if (m_updateType == TimeInterval) { str = attribs.value("updateInterval").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'updateInterval'")); else m_updateInterval = str.toInt(); } if (m_readingType != TillEnd) { str = attribs.value("sampleSize").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'sampleSize'")); else m_sampleSize = str.toInt(); } str = attribs.value("host").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'host'")); else m_client->setHostname(str); str = attribs.value("port").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'port'")); else m_client->setPort(str.toUInt()); str = attribs.value("useAuthentication").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'useAuthentication'")); else m_MQTTUseAuthentication = str.toInt(); if (m_MQTTUseAuthentication) { str = attribs.value("username").toString(); if (!str.isEmpty()) m_client->setUsername(str); str = attribs.value("password").toString(); if (!str.isEmpty()) m_client->setPassword(str); } str = attribs.value("useID").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'useID'")); else m_MQTTUseID = str.toInt(); if (m_MQTTUseID) { str = attribs.value("clientId").toString(); if (!str.isEmpty()) m_client->setClientId(str); } str = attribs.value("useRetain").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'useRetain'")); else m_MQTTRetain = str.toInt(); str = attribs.value("useWill").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'useWill'")); else m_MQTTWill.enabled = str.toInt(); if (m_MQTTWill.enabled) { str = attribs.value("willTopic").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willTopic'")); else m_MQTTWill.willTopic = str; str = attribs.value("willOwnMessage").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willOwnMessage'")); else m_MQTTWill.willOwnMessage = str; str = attribs.value("willQoS").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willQoS'")); else m_MQTTWill.willQoS = str.toUInt(); str = attribs.value("willRetain").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willRetain'")); else m_MQTTWill.willRetain = str.toInt(); str = attribs.value("willMessageType").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willMessageType'")); else m_MQTTWill.willMessageType = static_cast(str.toInt()); str = attribs.value("willUpdateType").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willUpdateType'")); else m_MQTTWill.willUpdateType = static_cast(str.toInt()); str = attribs.value("willTimeInterval").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willTimeInterval'")); else m_MQTTWill.willTimeInterval = str.toInt(); for (int i = 0; i < m_MQTTWill.willStatistics.count(); ++i) { str = attribs.value("willStatistics"+QString::number(i)).toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willTimeInterval'")); else m_MQTTWill.willStatistics[i] = str.toInt(); } } } else if (reader->name() == "asciiFilter") { setFilter(new AsciiFilter); if (!m_filter->load(reader)) return false; } else if (reader->name() == "MQTTSubscription") { MQTTSubscription* subscription = new MQTTSubscription(""); subscription->setMQTTClient(this); m_MQTTSubscriptions.push_back(subscription); connect(subscription, &MQTTSubscription::loaded, this, &MQTTClient::subscriptionLoaded); if (!subscription->load(reader, preview)) { delete subscription; return false; } addChildFast(subscription); } else {// unknown element reader->raiseWarning(i18n("unknown element '%1'", reader->name().toString())); if (!reader->skipToEndElement()) return false; } } return !reader->hasError(); } #endif diff --git a/src/backend/datasources/MQTTClient.h b/src/backend/datasources/MQTTClient.h index 3ba5cadd0..649064634 100644 --- a/src/backend/datasources/MQTTClient.h +++ b/src/backend/datasources/MQTTClient.h @@ -1,260 +1,260 @@ /*************************************************************************** File : MQTTClient.h Project : LabPlot Description : Represents a MQTT Client -------------------------------------------------------------------- Copyright : (C) 2018 Kovacs Ferencz (kferike98@gmail.com) ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the Free Software * * Foundation, Inc., 51 Franklin Street, Fifth Floor, * * Boston, MA 02110-1301 USA * * * ***************************************************************************/ #ifndef MQTTCLIENT_H #define MQTTCLIENT_H #include "backend/core/Folder.h" #ifdef HAVE_MQTT #include #include #include #include #include #include #include #include class QString; class AsciiFilter; class MQTTSubscription; class QAction; #endif class MQTTClient : public Folder { #ifdef HAVE_MQTT Q_OBJECT public: enum UpdateType { TimeInterval = 0, NewData }; enum ReadingType { ContinuousFixed = 0, FromEnd, TillEnd }; enum WillMessageType { OwnMessage = 0, Statistics, LastMessage }; enum WillUpdateType { TimePeriod = 0, OnClick }; enum WillStatisticsType { NoStatistics = -1, Minimum, Maximum, ArithmeticMean, GeometricMean, HarmonicMean, ContraharmonicMean, Median, Variance, StandardDeviation, MeanDeviation, MeanDeviationAroundMedian, MedianDeviation, Skewness, Kurtosis, Entropy }; struct MQTTWill { bool enabled{false}; QString willMessage; QString willTopic; bool willRetain{false}; quint8 willQoS{0}; WillMessageType willMessageType{MQTTClient::WillMessageType::OwnMessage}; QString willOwnMessage; QString willLastMessage; int willTimeInterval{1000}; WillUpdateType willUpdateType{MQTTClient::WillUpdateType::TimePeriod}; QVector willStatistics{0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}; }; explicit MQTTClient(const QString& name); virtual ~MQTTClient() override; void ready(); UpdateType updateType() const; void setUpdateType(UpdateType); ReadingType readingType() const; void setReadingType(ReadingType); int sampleSize() const; void setSampleSize(int); bool isPaused() const; void setUpdateInterval(int); int updateInterval() const; void setKeepNValues(int); int keepNValues() const; void setKeepLastValues(bool); bool keepLastValues() const; void setMQTTClientHostPort(const QString&, const quint16&); void setMQTTClientAuthentication(const QString&, const QString&); void setMQTTClientId(const QString&); void addInitialMQTTSubscriptions(const QMqttTopicFilter&, const quint8&); QVector MQTTSubscriptions() const; bool checkTopicContains(const QString& superior, const QString& inferior); QString checkCommonLevel(const QString& first, const QString& second); QString clientHostName() const; quint16 clientPort() const; QString clientPassword() const; QString clientUserName() const; QString clientID () const; void updateNow(); void pauseReading(); void continueReading(); void setFilter(AsciiFilter*); AsciiFilter* filter() const; QIcon icon() const override; void save(QXmlStreamWriter*) const override; bool load(XmlStreamReader*, bool preview) override; QVector topicNames() const; bool checkAllArrived(); void setWillSettings(MQTTWill); MQTTWill willSettings() const; void setMQTTWillUse(bool); bool MQTTWillUse() const; void setWillTopic(const QString&); QString willTopic() const; void setWillRetain(bool); bool willRetain() const; void setWillQoS(quint8); quint8 willQoS() const; void setWillMessageType(WillMessageType); WillMessageType willMessageType() const; void setWillOwnMessage(const QString&); QString willOwnMessage() const; WillUpdateType willUpdateType() const; void setWillUpdateType(WillUpdateType); int willTimeInterval() const; void setWillTimeInterval(int); void startWillTimer() const; void stopWillTimer() const; void updateWillMessage() ; void setMQTTRetain(bool); bool MQTTRetain() const; void setMQTTUseID(bool); bool MQTTUseID() const; void setMQTTUseAuthentication(bool); bool MQTTUseAuthentication() const; void clearLastMessage(); void addWillStatistics(WillStatisticsType); void removeWillStatistics(WillStatisticsType); QVector willStatistics() const; void addMQTTSubscription(const QString&, quint8); void removeMQTTSubscription(const QString&); void addBeforeRemoveSubscription(const QString&, quint8); void reparentTopic(const QString& topic, const QString& parent); private: UpdateType m_updateType{TimeInterval}; ReadingType m_readingType{ContinuousFixed}; bool m_paused{false}; bool m_prepared{false}; int m_sampleSize{1}; int m_keepNValues{0}; int m_updateInterval{1000}; AsciiFilter* m_filter{nullptr}; QTimer* m_updateTimer; QMqttClient* m_client; QMap m_subscribedTopicNameQoS; QVector m_subscriptions; QVector m_topicNames; bool m_MQTTTest{false}; QTimer* m_willTimer; bool m_MQTTFirstConnectEstablished{false}; bool m_MQTTRetain{false}; bool m_MQTTUseID{false}; bool m_MQTTUseAuthentication{false}; QVector m_MQTTSubscriptions; bool m_disconnectForWill{false}; bool m_loaded{false}; int m_subscriptionsLoaded{0}; int m_subscriptionCountToLoad{0}; MQTTWill m_MQTTWill; public slots: void read(); private slots: void onMQTTConnect(); void MQTTSubscriptionMessageReceived(const QMqttMessage&); void MQTTErrorChanged(QMqttClient::ClientError); void subscriptionLoaded(const QString&); signals: void MQTTSubscribed(); - void MQTTTopicsChanged(); + void MQTTTopicsChanged(); void readFromTopics(); void clientAboutToBeDeleted(const QString&, quint16); #endif //HAVE_MQTT }; #endif // MQTTCLIENT_H diff --git a/src/backend/datasources/MQTTSubscription.cpp b/src/backend/datasources/MQTTSubscription.cpp index 3eef0c067..39ea63ec4 100644 --- a/src/backend/datasources/MQTTSubscription.cpp +++ b/src/backend/datasources/MQTTSubscription.cpp @@ -1,197 +1,197 @@ /*************************************************************************** File : MQTTSubscription.cpp Project : LabPlot Description : Represents a subscription made in MQTTClient -------------------------------------------------------------------- Copyright : (C) 2018 Kovacs Ferencz (kferike98@gmail.com) ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the Free Software * * Foundation, Inc., 51 Franklin Street, Fifth Floor, * * Boston, MA 02110-1301 USA * * * ***************************************************************************/ #include "backend/datasources/MQTTSubscription.h" #ifdef HAVE_MQTT #include "backend/datasources/MQTTTopic.h" #include "backend/datasources/MQTTClient.h" #include #include /*! \class MQTTSubscription \brief Represents a subscription made in a MQTTClient object. It plays a role in managing MQTTTopic objects and makes possible representing the subscriptions and topics in a tree like structure \ingroup datasources */ MQTTSubscription::MQTTSubscription(const QString& name) : Folder(name), m_subscriptionName(name) { qDebug() << "New MQTTSubscription: " << name; } MQTTSubscription::~MQTTSubscription() { qDebug() << "Delete MQTTSubscription: " << m_subscriptionName; } /*! *\brief Returns the object's MQTTTopic children * * \return a vector of pointers to the children of the MQTTSubscription */ const QVector MQTTSubscription::topics() const { return children(); } /*! *\brief Returns the object's parent * * \return a pointer to the parent MQTTTopic of the object */ MQTTClient* MQTTSubscription::mqttClient() const { return m_MQTTClient; } /*! *\brief Called when a message arrived to a topic contained by the MQTTSubscription * If the topic can't be found among the children, a new MQTTTopic is instantiated * Passes the messages to the appropriate MQTTTopic * * \param message the message to pass * \param topicName the name of the topic the message was sent to */ void MQTTSubscription::messageArrived(const QString& message, const QString& topicName) { bool found = false; QVector topics = children(); //search for the topic among the MQTTTopic children for (auto* topic: topics) { if (topicName == topic->topicName()) { //pass the message to the topic topic->newMessage(message); //read the message if needed if ((m_MQTTClient->updateType() == MQTTClient::UpdateType::NewData) && - !m_MQTTClient->isPaused()) + !m_MQTTClient->isPaused()) topic->read(); found = true; break; } } //if the topic can't be found, we add it as a new MQTTTopic, and read from it if needed if (!found) { MQTTTopic* newTopic = new MQTTTopic(topicName, this, false); addChildFast(newTopic); //no need for undo/redo here newTopic->newMessage(message); if ((m_MQTTClient->updateType() == MQTTClient::UpdateType::NewData) && !m_MQTTClient->isPaused()) newTopic->read(); } } /*! *\brief Returns the subscription's name * * \return m_subscriptionName */ QString MQTTSubscription::subscriptionName() const { return m_subscriptionName; } /*! *\brief Sets the MQTTClient the subscription belongs to * * \param client */ void MQTTSubscription::setMQTTClient(MQTTClient* client) { m_MQTTClient = client; } /*! *\brief Returns the icon of MQTTSubscription */ QIcon MQTTSubscription::icon() const { return QIcon::fromTheme("mail-signed-full"); } //############################################################################## //################## Serialization/Deserialization ########################### //############################################################################## /*! Saves as XML. */ void MQTTSubscription::save(QXmlStreamWriter* writer) const { writer->writeStartElement("MQTTSubscription"); writeBasicAttributes(writer); writeCommentElement(writer); //general writer->writeStartElement("general"); writer->writeAttribute("subscriptionName", m_subscriptionName); writer->writeEndElement(); //MQTTTopics for (auto* topic : children(IncludeHidden)) topic->save(writer); writer->writeEndElement(); // "MQTTSubscription" } /*! Loads from XML. */ bool MQTTSubscription::load(XmlStreamReader* reader, bool preview) { if (!readBasicAttributes(reader)) return false; QString attributeWarning = i18n("Attribute '%1' missing or empty, default value is used"); QXmlStreamAttributes attribs; QString str; while (!reader->atEnd()) { reader->readNext(); if (reader->isEndElement() && reader->name() == "MQTTSubscription") break; if (!reader->isStartElement()) continue; if (reader->name() == "comment") { if (!readCommentElement(reader)) return false; } else if (reader->name() == "general") { attribs = reader->attributes(); m_subscriptionName = attribs.value("subscriptionName").toString(); } else if(reader->name() == QLatin1String("MQTTTopic")) { MQTTTopic* topic = new MQTTTopic("", this, false); if (!topic->load(reader, preview)) { delete topic; return false; } addChildFast(topic); } else {// unknown element reader->raiseWarning(i18n("unknown element '%1'", reader->name().toString())); if (!reader->skipToEndElement()) return false; } } emit loaded(this->subscriptionName()); return !reader->hasError(); } #endif diff --git a/src/backend/datasources/MQTTTopic.cpp b/src/backend/datasources/MQTTTopic.cpp index 4e33701a5..b0b72dd1e 100644 --- a/src/backend/datasources/MQTTTopic.cpp +++ b/src/backend/datasources/MQTTTopic.cpp @@ -1,313 +1,313 @@ /*************************************************************************** File : MQTTTopic.cpp Project : LabPlot Description : Represents a topic of a MQTTSubscription -------------------------------------------------------------------- Copyright : (C) 2018 Kovacs Ferencz (kferike98@gmail.com) ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the Free Software * * Foundation, Inc., 51 Franklin Street, Fifth Floor, * * Boston, MA 02110-1301 USA * * * ***************************************************************************/ #include "backend/datasources/MQTTTopic.h" #ifdef HAVE_MQTT #include "backend/datasources/MQTTSubscription.h" #include "backend/datasources/MQTTClient.h" #include "kdefrontend/spreadsheet/PlotDataDialog.h" #include "commonfrontend/spreadsheet/SpreadsheetView.h" #include "backend/datasources/filters/AsciiFilter.h" #include #include #include #include #include #include /*! \class MQTTTopic \brief Represents a topic of a subscription made in MQTTClient. \ingroup datasources */ MQTTTopic::MQTTTopic(const QString& name, MQTTSubscription* subscription, bool loading) : Spreadsheet(name, loading), m_topicName(name), m_MQTTClient(subscription->mqttClient()), m_filter(new AsciiFilter) { auto mainFilter = m_MQTTClient->filter(); m_filter->setAutoModeEnabled(mainFilter->isAutoModeEnabled()); if (!mainFilter->isAutoModeEnabled()) { m_filter->setCommentCharacter(mainFilter->commentCharacter()); m_filter->setSeparatingCharacter(mainFilter->separatingCharacter()); m_filter->setDateTimeFormat(mainFilter->dateTimeFormat()); m_filter->setCreateIndexEnabled(mainFilter->createIndexEnabled()); m_filter->setSimplifyWhitespacesEnabled(mainFilter->simplifyWhitespacesEnabled()); m_filter->setNaNValueToZero(mainFilter->NaNValueToZeroEnabled()); m_filter->setRemoveQuotesEnabled(mainFilter->removeQuotesEnabled()); m_filter->setSkipEmptyParts(mainFilter->skipEmptyParts()); m_filter->setHeaderEnabled(mainFilter->isHeaderEnabled()); QString vectorNames; const QStringList& filterVectorNames = mainFilter->vectorNames(); for (int i = 0; i < filterVectorNames.size(); ++i) { vectorNames.append(filterVectorNames.at(i)); if (i != vectorNames.size() - 1) vectorNames.append(QLatin1String(" ")); } m_filter->setVectorNames(vectorNames); m_filter->setStartRow(mainFilter->startRow()); m_filter->setEndRow(mainFilter->endRow()); m_filter->setStartColumn(mainFilter->startColumn()); m_filter->setEndColumn(mainFilter->endColumn()); } connect(m_MQTTClient, &MQTTClient::readFromTopics, this, &MQTTTopic::read); qDebug()<<"New MqttTopic: " << m_topicName; initActions(); } MQTTTopic::~MQTTTopic() { qDebug()<<"MqttTopic destructor:"<actions().size() > 1) firstAction = menu->actions().at(1); menu->insertAction(firstAction, m_plotDataAction); menu->insertSeparator(firstAction); return menu; } QWidget* MQTTTopic::view() const { if (!m_partView) m_partView = new SpreadsheetView(const_cast(this), true); return m_partView; } /*! *\brief Adds a message received by the topic to the message puffer */ void MQTTTopic::newMessage(const QString& message) { m_messagePuffer.push_back(message); } /*! *\brief Returns the name of the MQTTTopic */ -QString MQTTTopic::topicName() const{ +QString MQTTTopic::topicName() const { return m_topicName; } /*! *\brief Initializes the actions of MQTTTopic */ void MQTTTopic::initActions() { m_plotDataAction = new QAction(QIcon::fromTheme("office-chart-line"), i18n("Plot data"), this); connect(m_plotDataAction, &QAction::triggered, this, &MQTTTopic::plotData); } /*! *\brief Returns the MQTTClient the topic belongs to */ -MQTTClient *MQTTTopic::mqttClient() const{ +MQTTClient *MQTTTopic::mqttClient() const { return m_MQTTClient; } //############################################################################## //################################# SLOTS #################################### //############################################################################## /*! *\brief Plots the data stored in MQTTTopic */ void MQTTTopic::plotData() { PlotDataDialog* dlg = new PlotDataDialog(this); dlg->exec(); } /*! *\brief Reads every message from the message puffer */ void MQTTTopic::read() { while (!m_messagePuffer.isEmpty()) { qDebug() << "Reading from topic " << m_topicName; const QString tempMessage = m_messagePuffer.takeFirst(); m_filter->readMQTTTopic(tempMessage, m_topicName, this); } } //############################################################################## //################## Serialization/Deserialization ########################### //############################################################################## /*! Saves as XML. */ void MQTTTopic::save(QXmlStreamWriter* writer) const { writer->writeStartElement("MQTTTopic"); writeBasicAttributes(writer); writeCommentElement(writer); //general writer->writeStartElement("general"); writer->writeAttribute("topicName", m_topicName); writer->writeAttribute("filterPrepared", QString::number(m_filter->isPrepared())); writer->writeAttribute("filterSeparator", m_filter->separator()); writer->writeAttribute("messagePufferSize", QString::number(m_messagePuffer.size())); for (int i = 0; i < m_messagePuffer.count(); ++i) writer->writeAttribute("message"+QString::number(i), m_messagePuffer[i]); writer->writeEndElement(); //filter m_filter->save(writer); //Columns for (auto* col : children(IncludeHidden)) col->save(writer); writer->writeEndElement(); //MQTTTopic } /*! Loads from XML. */ bool MQTTTopic::load(XmlStreamReader* reader, bool preview) { removeColumns(0, columnCount()); if (!readBasicAttributes(reader)) return false; bool isFilterPrepared = false; QString separator = ""; QString attributeWarning = i18n("Attribute '%1' missing or empty, default value is used"); QXmlStreamAttributes attribs; QString str; while (!reader->atEnd()) { reader->readNext(); if (reader->isEndElement() && reader->name() == "MQTTTopic") break; if (!reader->isStartElement()) continue; if (reader->name() == "comment") { if (!readCommentElement(reader)) return false; } else if (reader->name() == "general") { attribs = reader->attributes(); str = attribs.value("topicName").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'topicName'")); else { m_topicName = str; setName(str); } str = attribs.value("filterPrepared").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'filterPrepared'")); else { isFilterPrepared = str.toInt(); } str = attribs.value("filterSeparator").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'filterSeparator'")); else { separator = str; } int pufferSize = 0; str = attribs.value("messagePufferSize").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'messagePufferSize'")); else pufferSize = str.toInt(); for (int i = 0; i < pufferSize; ++i) { str = attribs.value("message"+QString::number(i)).toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'message"+QString::number(i)+"'")); else m_messagePuffer.push_back(str); } } else if (reader->name() == "asciiFilter") { if (!m_filter->load(reader)) return false; } else if (reader->name() == "column") { Column* column = new Column("", AbstractColumn::Text); if (!column->load(reader, preview)) { delete column; setColumnCount(0); return false; } addChild(column); } else {// unknown element reader->raiseWarning(i18n("unknown element '%1'", reader->name().toString())); if (!reader->skipToEndElement()) return false; } } //prepare filter for reading m_filter->setPreparedForMQTT(isFilterPrepared, this, separator); return !reader->hasError(); } #endif diff --git a/src/backend/datasources/MQTTTopic.h b/src/backend/datasources/MQTTTopic.h index 9cee32dc5..da0d689b3 100644 --- a/src/backend/datasources/MQTTTopic.h +++ b/src/backend/datasources/MQTTTopic.h @@ -1,84 +1,84 @@ /*************************************************************************** File : MQTTTopic.h Project : LabPlot Description : Represents a topic of a MQTTSubscription -------------------------------------------------------------------- Copyright : (C) 2018 Kovacs Ferencz (kferike98@gmail.com) ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the Free Software * * Foundation, Inc., 51 Franklin Street, Fifth Floor, * * Boston, MA 02110-1301 USA * * * ***************************************************************************/ #ifndef MQTTTOPIC_H #define MQTTTOPIC_H #include "backend/spreadsheet/Spreadsheet.h" #ifdef HAVE_MQTT class MQTTSubscription; class MQTTClient; #endif class AsciiFilter; class MQTTTopic : public Spreadsheet { #ifdef HAVE_MQTT Q_OBJECT public: MQTTTopic(const QString& name, MQTTSubscription* subscription, bool loading = false); ~MQTTTopic() override; void setFilter(AsciiFilter*); AsciiFilter* filter() const; QIcon icon() const override; QMenu* createContextMenu() override; QWidget* view() const override; QString topicName() const; MQTTClient* mqttClient() const; - void newMessage(const QString&); + void newMessage(const QString&); void save(QXmlStreamWriter*) const override; bool load(XmlStreamReader*, bool preview) override; private: void initActions(); QString m_topicName; MQTTClient* m_MQTTClient; AsciiFilter* m_filter; QVector m_messagePuffer; QAction* m_plotDataAction; public slots: void read(); private slots: void plotData(); signals: void readOccured(); #endif // HAVE_MQTT }; #endif // MQTTTOPIC_H diff --git a/src/backend/datasources/filters/AsciiFilter.cpp b/src/backend/datasources/filters/AsciiFilter.cpp index 0305bef6a..d9e85ea9b 100644 --- a/src/backend/datasources/filters/AsciiFilter.cpp +++ b/src/backend/datasources/filters/AsciiFilter.cpp @@ -1,2911 +1,2911 @@ /*************************************************************************** File : AsciiFilter.cpp Project : LabPlot Description : ASCII I/O-filter -------------------------------------------------------------------- Copyright : (C) 2009-2018 Stefan Gerlach (stefan.gerlach@uni.kn) Copyright : (C) 2009-2019 Alexander Semke (alexander.semke@web.de) ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the Free Software * * Foundation, Inc., 51 Franklin Street, Fifth Floor, * * Boston, MA 02110-1301 USA * * * ***************************************************************************/ #include "backend/datasources/LiveDataSource.h" #include "backend/core/column/Column.h" #include "backend/core/Project.h" #include "backend/datasources/filters/AsciiFilter.h" #include "backend/datasources/filters/AsciiFilterPrivate.h" #include "backend/worksheet/plots/cartesian/CartesianPlot.h" #include "backend/worksheet/plots/cartesian/XYCurve.h" #include "backend/lib/macros.h" #include "backend/lib/trace.h" #ifdef HAVE_MQTT #include "backend/datasources/MQTTClient.h" #include "backend/datasources/MQTTTopic.h" #endif #include #include #include #ifdef Q_OS_LINUX #include #include #endif /*! \class AsciiFilter \brief Manages the import/export of data organized as columns (vectors) from/to an ASCII-file. \ingroup datasources */ AsciiFilter::AsciiFilter() : AbstractFileFilter(Ascii), d(new AsciiFilterPrivate(this)) {} AsciiFilter::~AsciiFilter() = default; /*! reads the content of the device \c device. */ void AsciiFilter::readDataFromDevice(QIODevice& device, AbstractDataSource* dataSource, AbstractFileFilter::ImportMode importMode, int lines) { d->readDataFromDevice(device, dataSource, importMode, lines); } void AsciiFilter::readFromLiveDeviceNotFile(QIODevice &device, AbstractDataSource* dataSource) { d->readFromLiveDevice(device, dataSource); } qint64 AsciiFilter::readFromLiveDevice(QIODevice& device, AbstractDataSource* dataSource, qint64 from) { return d->readFromLiveDevice(device, dataSource, from); } #ifdef HAVE_MQTT /*! reads the content of a message received by the topic. */ void AsciiFilter::readMQTTTopic(const QString& message, const QString& topic, AbstractDataSource* dataSource) { d->readMQTTTopic(message, topic, dataSource); } /*! provides a preview of the data received by the topic. */ void AsciiFilter::MQTTPreview(QVector& list, const QString& message, const QString& topic) { d->MQTTPreview(list, message, topic); } /*! Returns the statistical data, that the MQTTTopic needs for the will message. */ -QString AsciiFilter::MQTTColumnStatistics(const MQTTTopic* topic) const{ +QString AsciiFilter::MQTTColumnStatistics(const MQTTTopic* topic) const { return d->MQTTColumnStatistics(topic); } /*! Returns the column mode of the last column (the value column of the MQTTTopic). */ -AbstractColumn::ColumnMode AsciiFilter::MQTTColumnMode() const{ +AbstractColumn::ColumnMode AsciiFilter::MQTTColumnMode() const { return d->MQTTColumnMode(); } /*! After the MQTTTopic is loaded, prepares the filter for reading. */ void AsciiFilter::setPreparedForMQTT(bool prepared, MQTTTopic* topic, const QString& separator) { d->setPreparedForMQTT(prepared, topic, separator); } #endif /*! returns the separator used by the filter. */ QString AsciiFilter::separator() const { return d->separator(); } /*! returns the separator used by the filter. */ int AsciiFilter::isPrepared() { return d->isPrepared(); } /*! reads the content of the file \c fileName. */ void AsciiFilter::readDataFromFile(const QString& fileName, AbstractDataSource* dataSource, AbstractFileFilter::ImportMode importMode) { d->readDataFromFile(fileName, dataSource, importMode); } QVector AsciiFilter::preview(const QString& fileName, int lines) { return d->preview(fileName, lines); } QVector AsciiFilter::preview(QIODevice &device) { return d->preview(device); } /*! reads the content of the file \c fileName to the data source \c dataSource. */ //void AsciiFilter::read(const QString& fileName, AbstractDataSource* dataSource, AbstractFileFilter::ImportMode importMode) { // d->read(fileName, dataSource, importMode); //} /*! writes the content of the data source \c dataSource to the file \c fileName. */ void AsciiFilter::write(const QString& fileName, AbstractDataSource* dataSource) { d->write(fileName, dataSource); // emit() } /*! loads the predefined filter settings for \c filterName */ void AsciiFilter::loadFilterSettings(const QString& filterName) { Q_UNUSED(filterName); } /*! saves the current settings as a new filter with the name \c filterName */ void AsciiFilter::saveFilterSettings(const QString& filterName) const { Q_UNUSED(filterName); } /*! returns the list with the names of all saved (system wide or user defined) filter settings. */ QStringList AsciiFilter::predefinedFilters() { return QStringList(); } /*! returns the list of all predefined separator characters. */ QStringList AsciiFilter::separatorCharacters() { return (QStringList() << "auto" << "TAB" << "SPACE" << "," << ";" << ":" << ",TAB" << ";TAB" << ":TAB" << ",SPACE" << ";SPACE" << ":SPACE" << "2xSPACE" << "3xSPACE" << "4xSPACE" << "2xTAB"); } /*! returns the list of all predefined comment characters. */ QStringList AsciiFilter::commentCharacters() { return (QStringList() << "#" << "!" << "//" << "+" << "c" << ":" << ";"); } /*! returns the list of all predefined data types. */ QStringList AsciiFilter::dataTypes() { const QMetaObject& mo = AbstractColumn::staticMetaObject; const QMetaEnum& me = mo.enumerator(mo.indexOfEnumerator("ColumnMode")); QStringList list; for (int i = 0; i <= 100; ++i) // me.keyCount() does not work because we have holes in enum if (me.valueToKey(i)) list << me.valueToKey(i); return list; } QString AsciiFilter::fileInfoString(const QString& fileName) { QString info(i18n("Number of columns: %1", AsciiFilter::columnNumber(fileName))); info += QLatin1String("
"); info += i18n("Number of lines: %1", AsciiFilter::lineNumber(fileName)); return info; } /*! returns the number of columns in the file \c fileName. */ int AsciiFilter::columnNumber(const QString& fileName, const QString& separator) { - KFilterDev device(fileName); + KFilterDev device(fileName); if (!device.open(QIODevice::ReadOnly)) { DEBUG("Could not open file " << fileName.toStdString() << " for determining number of columns"); return -1; } QString line = device.readLine(); line.remove(QRegExp("[\\n\\r]")); QStringList lineStringList; if (separator.length() > 0) lineStringList = line.split(separator); else lineStringList = line.split(QRegExp("\\s+")); DEBUG("number of columns : " << lineStringList.size()); return lineStringList.size(); } size_t AsciiFilter::lineNumber(const QString& fileName) { KFilterDev device(fileName); if (!device.open(QIODevice::ReadOnly)) { DEBUG("Could not open file " << fileName.toStdString() << " to determine number of lines"); return 0; } // if (!device.canReadLine()) // return -1; size_t lineCount = 0; #ifdef Q_OS_LINUX //on linux use wc, if available, which is much faster than counting lines in the file if (device.compressionType() == KCompressionDevice::None && !QStandardPaths::findExecutable(QLatin1String("wc")).isEmpty()) { QProcess wc; wc.start(QLatin1String("wc"), QStringList() << QLatin1String("-l") << fileName); size_t lineCount = 0; while (wc.waitForReadyRead()) lineCount = wc.readLine().split(' ')[0].toInt(); lineCount++; // last line not counted return lineCount; } #endif while (!device.atEnd()) { device.readLine(); lineCount++; } return lineCount; } /*! returns the number of lines in the device \c device and 0 if sequential. resets the position to 0! */ size_t AsciiFilter::lineNumber(QIODevice& device) const { if (device.isSequential()) return 0; // if (!device.canReadLine()) // DEBUG("WARNING in AsciiFilter::lineNumber(): device cannot 'readLine()' but using it anyway."); size_t lineCount = 0; device.seek(0); if (d->readingFile) lineCount = lineNumber(d->readingFileName); else { while (!device.atEnd()) { device.readLine(); lineCount++; } } device.seek(0); return lineCount; } void AsciiFilter::setCommentCharacter(const QString& s) { d->commentCharacter = s; } QString AsciiFilter::commentCharacter() const { return d->commentCharacter; } void AsciiFilter::setSeparatingCharacter(const QString& s) { d->separatingCharacter = s; } QString AsciiFilter::separatingCharacter() const { return d->separatingCharacter; } void AsciiFilter::setDateTimeFormat(const QString &f) { d->dateTimeFormat = f; } QString AsciiFilter::dateTimeFormat() const { return d->dateTimeFormat; } void AsciiFilter::setNumberFormat(QLocale::Language lang) { d->numberFormat = lang; } QLocale::Language AsciiFilter::numberFormat() const { return d->numberFormat; } void AsciiFilter::setAutoModeEnabled(const bool b) { d->autoModeEnabled = b; } bool AsciiFilter::isAutoModeEnabled() const { return d->autoModeEnabled; } void AsciiFilter::setHeaderEnabled(const bool b) { d->headerEnabled = b; } bool AsciiFilter::isHeaderEnabled() const { return d->headerEnabled; } void AsciiFilter::setSkipEmptyParts(const bool b) { d->skipEmptyParts = b; } bool AsciiFilter::skipEmptyParts() const { return d->skipEmptyParts; } void AsciiFilter::setCreateIndexEnabled(bool b) { d->createIndexEnabled = b; } -bool AsciiFilter::createIndexEnabled() const{ +bool AsciiFilter::createIndexEnabled() const { return d->createIndexEnabled; } void AsciiFilter::setSimplifyWhitespacesEnabled(bool b) { d->simplifyWhitespacesEnabled = b; } bool AsciiFilter::simplifyWhitespacesEnabled() const { return d->simplifyWhitespacesEnabled; } void AsciiFilter::setNaNValueToZero(bool b) { if (b) d->nanValue = 0; else d->nanValue = NAN; } bool AsciiFilter::NaNValueToZeroEnabled() const { return (d->nanValue == 0); } void AsciiFilter::setRemoveQuotesEnabled(bool b) { d->removeQuotesEnabled = b; } bool AsciiFilter::removeQuotesEnabled() const { return d->removeQuotesEnabled; } void AsciiFilter::setVectorNames(const QString& s) { d->vectorNames.clear(); if (!s.simplified().isEmpty()) d->vectorNames = s.simplified().split(' '); } QStringList AsciiFilter::vectorNames() const { return d->vectorNames; } QVector AsciiFilter::columnModes() { return d->columnModes; } void AsciiFilter::setStartRow(const int r) { d->startRow = r; } int AsciiFilter::startRow() const { return d->startRow; } void AsciiFilter::setEndRow(const int r) { d->endRow = r; } int AsciiFilter::endRow() const { return d->endRow; } void AsciiFilter::setStartColumn(const int c) { d->startColumn = c; } int AsciiFilter::startColumn() const { return d->startColumn; } void AsciiFilter::setEndColumn(const int c) { d->endColumn = c; } int AsciiFilter::endColumn() const { return d->endColumn; } //##################################################################### //################### Private implementation ########################## //##################################################################### AsciiFilterPrivate::AsciiFilterPrivate(AsciiFilter* owner) : q(owner), commentCharacter("#"), separatingCharacter("auto"), numberFormat(QLocale::C), autoModeEnabled(true), headerEnabled(true), skipEmptyParts(false), simplifyWhitespacesEnabled(true), nanValue(NAN), removeQuotesEnabled(false), createIndexEnabled(false), startRow(1), endRow(-1), startColumn(1), endColumn(-1), mqttPreviewFirstEmptyColCount (0), m_actualStartRow(1), m_actualRows(0), m_actualCols(0), m_prepared(false), m_columnOffset(0) { } /*! * get a single line from device */ QStringList AsciiFilterPrivate::getLineString(QIODevice& device) { QString line; do { // skip comment lines in data lines if (!device.canReadLine()) DEBUG("WARNING in AsciiFilterPrivate::getLineString(): device cannot 'readLine()' but using it anyway."); // line = device.readAll(); line = device.readLine(); } while (line.startsWith(commentCharacter)); line.remove(QRegExp("[\\n\\r]")); // remove any newline if (simplifyWhitespacesEnabled) line = line.simplified(); DEBUG("data line : \'" << line.toStdString() << '\''); QStringList lineStringList = line.split(m_separator, (QString::SplitBehavior)skipEmptyParts); //TODO: remove quotes here? QDEBUG("data line, parsed: " << lineStringList); return lineStringList; } /*! * returns -1 if the device couldn't be opened, 1 if the current read position in the device is at the end and 0 otherwise. */ int AsciiFilterPrivate::prepareDeviceToRead(QIODevice& device) { DEBUG("AsciiFilterPrivate::prepareDeviceToRead(): is sequential = " << device.isSequential() << ", can readLine = " << device.canReadLine()); - if (!device.open(QIODevice::ReadOnly)) + if (!device.open(QIODevice::ReadOnly)) return -1; if (device.atEnd() && !device.isSequential()) // empty file return 1; ///////////////////////////////////////////////////////////////// // Find first data line (ignoring comment lines) DEBUG(" Skipping " << startRow - 1 << " lines"); for (int i = 0; i < startRow - 1; ++i) { QString line; if (!device.canReadLine()) DEBUG("WARNING in AsciiFilterPrivate::prepareDeviceToRead(): device cannot 'readLine()' but using it anyway."); line = device.readLine(); DEBUG(" line = " << line.toStdString()); if (device.atEnd()) { if (device.isSequential()) break; else return 1; } } // Parse the first line: // Determine the number of columns, create the columns and use (if selected) the first row to name them QString firstLine; do { // skip comment lines if (!device.canReadLine()) DEBUG("WARNING in AsciiFilterPrivate::prepareDeviceToRead(): device cannot 'readLine()' but using it anyway."); if (device.atEnd()) { DEBUG("device at end! Giving up."); if (device.isSequential()) break; else return 1; } firstLine = device.readLine(); } while (firstLine.startsWith(commentCharacter)); DEBUG(" device position after first line and comments = " << device.pos()); firstLine.remove(QRegExp("[\\n\\r]")); // remove any newline if (simplifyWhitespacesEnabled) firstLine = firstLine.simplified(); DEBUG("First line: \'" << firstLine.toStdString() << '\''); // determine separator and split first line QStringList firstLineStringList; if (separatingCharacter == "auto") { DEBUG("automatic separator"); QRegExp regExp("(\\s+)|(,\\s+)|(;\\s+)|(:\\s+)"); firstLineStringList = firstLine.split(regExp, (QString::SplitBehavior)skipEmptyParts); if (!firstLineStringList.isEmpty()) { int length1 = firstLineStringList.at(0).length(); if (firstLineStringList.size() > 1) { int pos2 = firstLine.indexOf(firstLineStringList.at(1), length1); m_separator = firstLine.mid(length1, pos2 - length1); } else { //old: separator = line.right(line.length() - length1); m_separator = ' '; } } } else { // use given separator // replace symbolic "TAB" with '\t' m_separator = separatingCharacter.replace(QLatin1String("2xTAB"), "\t\t", Qt::CaseInsensitive); m_separator = separatingCharacter.replace(QLatin1String("TAB"), "\t", Qt::CaseInsensitive); // replace symbolic "SPACE" with ' ' m_separator = m_separator.replace(QLatin1String("2xSPACE"), QLatin1String(" "), Qt::CaseInsensitive); m_separator = m_separator.replace(QLatin1String("3xSPACE"), QLatin1String(" "), Qt::CaseInsensitive); m_separator = m_separator.replace(QLatin1String("4xSPACE"), QLatin1String(" "), Qt::CaseInsensitive); m_separator = m_separator.replace(QLatin1String("SPACE"), QLatin1String(" "), Qt::CaseInsensitive); firstLineStringList = firstLine.split(m_separator, (QString::SplitBehavior)skipEmptyParts); } DEBUG("separator: \'" << m_separator.toStdString() << '\''); DEBUG("number of columns: " << firstLineStringList.size()); QDEBUG("first line: " << firstLineStringList); DEBUG("headerEnabled: " << headerEnabled); //optionally, remove potential spaces in the first line if (simplifyWhitespacesEnabled) { for (int i = 0; i < firstLineStringList.size(); ++i) firstLineStringList[i] = firstLineStringList[i].simplified(); } if (headerEnabled) { // use first line to name vectors vectorNames = firstLineStringList; QDEBUG("vector names =" << vectorNames); m_actualStartRow = startRow + 1; } else m_actualStartRow = startRow; // set range to read if (endColumn == -1) { if (headerEnabled || vectorNames.size() == 0) endColumn = firstLineStringList.size(); // last column else //number of vector names provided in the import dialog (not more than the maximal number of columns in the file) endColumn = qMin(vectorNames.size(), firstLineStringList.size()); } if (endColumn < startColumn) m_actualCols = 0; else m_actualCols = endColumn - startColumn + 1; if (createIndexEnabled) { vectorNames.prepend(i18n("Index")); m_actualCols++; } //TEST: readline-seek-readline fails /* qint64 testpos = device.pos(); DEBUG("read data line @ pos " << testpos << " : " << device.readLine().toStdString()); device.seek(testpos); testpos = device.pos(); DEBUG("read data line again @ pos " << testpos << " : " << device.readLine().toStdString()); */ ///////////////////////////////////////////////////////////////// // parse first data line to determine data type for each column if (!device.isSequential()) firstLineStringList = getLineString(device); columnModes.resize(m_actualCols); int col = 0; if (createIndexEnabled) { columnModes[0] = AbstractColumn::Integer; col = 1; } for (auto& valueString : firstLineStringList) { // parse columns available in first data line if (simplifyWhitespacesEnabled) valueString = valueString.simplified(); if (removeQuotesEnabled) valueString.remove(QRegExp("[\"\']")); if (col == m_actualCols) break; columnModes[col++] = AbstractFileFilter::columnMode(valueString, dateTimeFormat, numberFormat); } // parsing more lines to better determine data types for (unsigned int i = 0; i < m_dataTypeLines; ++i) { if (device.atEnd()) // EOF reached break; firstLineStringList = getLineString(device); createIndexEnabled ? col = 1 : col = 0; for (auto& valueString : firstLineStringList) { if (simplifyWhitespacesEnabled) valueString = valueString.simplified(); if (removeQuotesEnabled) valueString.remove(QRegExp("[\"\']")); if (col == m_actualCols) break; AbstractColumn::ColumnMode mode = AbstractFileFilter::columnMode(valueString, dateTimeFormat, numberFormat); // numeric: integer -> numeric if (mode == AbstractColumn::Numeric && columnModes[col] == AbstractColumn::Integer) columnModes[col] = mode; // text: non text -> text if (mode == AbstractColumn::Text && columnModes[col] != AbstractColumn::Text) columnModes[col] = mode; col++; } } QDEBUG("column modes = " << columnModes); // ATTENTION: This resets the position in the device to 0 m_actualRows = (int)q->lineNumber(device); const int actualEndRow = (endRow == -1 || endRow > m_actualRows) ? m_actualRows : endRow; if (actualEndRow > m_actualStartRow) m_actualRows = actualEndRow - m_actualStartRow + 1; else m_actualRows = 0; DEBUG("start/end column: " << startColumn << ' ' << endColumn); DEBUG("start/end row: " << m_actualStartRow << ' ' << actualEndRow); DEBUG("actual cols/rows (w/o header): " << m_actualCols << ' ' << m_actualRows); if (m_actualRows == 0 && !device.isSequential()) return 1; return 0; } /*! reads the content of the file \c fileName to the data source \c dataSource. Uses the settings defined in the data source. */ void AsciiFilterPrivate::readDataFromFile(const QString& fileName, AbstractDataSource* dataSource, AbstractFileFilter::ImportMode importMode) { DEBUG("AsciiFilterPrivate::readDataFromFile(): fileName = \'" << fileName.toStdString() << "\', dataSource = " << dataSource << ", mode = " << ENUM_TO_STRING(AbstractFileFilter, ImportMode, importMode)); //dirty hack: set readingFile and readingFileName in order to know in lineNumber(QIODevice) //that we're reading from a file and to benefit from much faster wc on linux //TODO: redesign the APIs and remove this later readingFile = true; readingFileName = fileName; KFilterDev device(fileName); readDataFromDevice(device, dataSource, importMode); readingFile = false; } qint64 AsciiFilterPrivate::readFromLiveDevice(QIODevice& device, AbstractDataSource* dataSource, qint64 from) { DEBUG("AsciiFilterPrivate::readFromLiveDevice(): bytes available = " << device.bytesAvailable() << ", from = " << from); if (device.bytesAvailable() <= 0) { DEBUG(" No new data available"); return 0; } //TODO: may be also a matrix? auto* spreadsheet = dynamic_cast(dataSource); if (spreadsheet->sourceType() != LiveDataSource::SourceType::FileOrPipe) if (device.isSequential() && device.bytesAvailable() < (int)sizeof(quint16)) return 0; if (!m_prepared) { DEBUG(" Preparing .."); switch (spreadsheet->sourceType()) { case LiveDataSource::SourceType::FileOrPipe: { const int deviceError = prepareDeviceToRead(device); if (deviceError != 0) { DEBUG(" Device error = " << deviceError); return 0; } break; } case LiveDataSource::SourceType::NetworkTcpSocket: case LiveDataSource::SourceType::NetworkUdpSocket: case LiveDataSource::SourceType::LocalSocket: case LiveDataSource::SourceType::SerialPort: m_actualRows = 1; if (createIndexEnabled) { m_actualCols = 2; columnModes << AbstractColumn::Integer << AbstractColumn::Numeric; vectorNames << i18n("Index") << i18n("Value"); } else { m_actualCols = 1; columnModes << AbstractColumn::Numeric; vectorNames << i18n("Value"); } QDEBUG(" vector names = " << vectorNames); case LiveDataSource::SourceType::MQTT: break; } // prepare import for spreadsheet spreadsheet->setUndoAware(false); spreadsheet->resize(AbstractFileFilter::Replace, vectorNames, m_actualCols); DEBUG(" data source resized to col: " << m_actualCols); DEBUG(" data source rowCount: " << spreadsheet->rowCount()); //columns in a file data source don't have any manual changes. //make the available columns undo unaware and suppress the "data changed" signal. //data changes will be propagated via an explicit Column::setChanged() call once new data was read. for (int i = 0; i < spreadsheet->childCount(); i++) { spreadsheet->child(i)->setUndoAware(false); spreadsheet->child(i)->setSuppressDataChangedSignal(true); } int keepNValues = spreadsheet->keepNValues(); if (keepNValues == 0) spreadsheet->setRowCount(m_actualRows > 1 ? m_actualRows : 1); else { spreadsheet->setRowCount(keepNValues); m_actualRows = keepNValues; } m_dataContainer.resize(m_actualCols); DEBUG(" Setting data .."); for (int n = 0; n < m_actualCols; ++n) { // data() returns a void* which is a pointer to any data type (see ColumnPrivate.cpp) spreadsheet->child(n)->setColumnMode(columnModes[n]); switch (columnModes[n]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } DEBUG(" Prepared!"); } qint64 bytesread = 0; #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportTotal: "); #endif LiveDataSource::ReadingType readingType; if (!m_prepared) { readingType = LiveDataSource::ReadingType::TillEnd; } else { //we have to read all the data when reading from end //so we set readingType to TillEnd if (spreadsheet->readingType() == LiveDataSource::ReadingType::FromEnd) readingType = LiveDataSource::ReadingType::TillEnd; //if we read the whole file we just start from the beginning of it //and read till end else if (spreadsheet->readingType() == LiveDataSource::ReadingType::WholeFile) readingType = LiveDataSource::ReadingType::TillEnd; else readingType = spreadsheet->readingType(); } DEBUG(" Reading type = " << ENUM_TO_STRING(LiveDataSource, ReadingType, readingType)); //move to the last read position, from == total bytes read //since the other source types are sequential we cannot seek on them if (spreadsheet->sourceType() == LiveDataSource::SourceType::FileOrPipe) device.seek(from); //count the new lines, increase actualrows on each //now we read all the new lines, if we want to use sample rate //then here we can do it, if we have actually sample rate number of lines :-? int newLinesForSampleSizeNotTillEnd = 0; int newLinesTillEnd = 0; QVector newData; if (readingType != LiveDataSource::ReadingType::TillEnd) newData.resize(spreadsheet->sampleSize()); int newDataIdx = 0; { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportReadingFromFile: "); #endif DEBUG(" source type = " << ENUM_TO_STRING(LiveDataSource, SourceType, spreadsheet->sourceType())); while (!device.atEnd()) { if (readingType != LiveDataSource::ReadingType::TillEnd) { switch (spreadsheet->sourceType()) { // different sources need different read methods case LiveDataSource::SourceType::LocalSocket: newData[newDataIdx++] = device.readAll(); break; case LiveDataSource::SourceType::NetworkUdpSocket: newData[newDataIdx++] = device.read(device.bytesAvailable()); break; case LiveDataSource::SourceType::FileOrPipe: newData.push_back(device.readLine()); break; case LiveDataSource::SourceType::NetworkTcpSocket: //TODO: check serial port case LiveDataSource::SourceType::SerialPort: newData[newDataIdx++] = device.read(device.bytesAvailable()); case LiveDataSource::SourceType::MQTT: break; } } else { // ReadingType::TillEnd switch (spreadsheet->sourceType()) { // different sources need different read methods case LiveDataSource::SourceType::LocalSocket: newData.push_back(device.readAll()); break; case LiveDataSource::SourceType::NetworkUdpSocket: newData.push_back(device.read(device.bytesAvailable())); break; case LiveDataSource::SourceType::FileOrPipe: newData.push_back(device.readLine()); break; case LiveDataSource::SourceType::NetworkTcpSocket: //TODO: check serial port case LiveDataSource::SourceType::SerialPort: newData.push_back(device.read(device.bytesAvailable())); case LiveDataSource::SourceType::MQTT: break; } } newLinesTillEnd++; if (readingType != LiveDataSource::ReadingType::TillEnd) { newLinesForSampleSizeNotTillEnd++; //for Continuous reading and FromEnd we read sample rate number of lines if possible //here TillEnd and Whole file behave the same if (newLinesForSampleSizeNotTillEnd == spreadsheet->sampleSize()) break; } } QDEBUG(" data read: " << newData); } //now we reset the readingType if (spreadsheet->readingType() == LiveDataSource::ReadingType::FromEnd) readingType = spreadsheet->readingType(); //we had less new lines than the sample size specified if (readingType != LiveDataSource::ReadingType::TillEnd) QDEBUG(" Removed empty lines: " << newData.removeAll("")); //back to the last read position before counting when reading from files if (spreadsheet->sourceType() == LiveDataSource::SourceType::FileOrPipe) device.seek(from); const int spreadsheetRowCountBeforeResize = spreadsheet->rowCount(); int currentRow = 0; // indexes the position in the vector(column) int linesToRead = 0; int keepNValues = spreadsheet->keepNValues(); DEBUG(" Increase row count. keepNValues = " << keepNValues); if (m_prepared) { //increase row count if we don't have a fixed size //but only after the preparation step if (keepNValues == 0) { DEBUG(" keep All values"); if (readingType != LiveDataSource::ReadingType::TillEnd) m_actualRows += qMin(newData.size(), spreadsheet->sampleSize()); else { //we don't increase it if we reread the whole file, we reset it if (!(spreadsheet->readingType() == LiveDataSource::ReadingType::WholeFile)) m_actualRows += newData.size(); else m_actualRows = newData.size(); } //appending if (spreadsheet->readingType() == LiveDataSource::ReadingType::WholeFile) linesToRead = m_actualRows; else linesToRead = m_actualRows - spreadsheetRowCountBeforeResize; } else { // fixed size DEBUG(" keep " << keepNValues << " values"); if (readingType == LiveDataSource::ReadingType::TillEnd) { //we had more lines than the fixed size, so we read m_actualRows number of lines if (newLinesTillEnd > m_actualRows) { linesToRead = m_actualRows; //TODO after reading we should skip the next data lines //because it's TillEnd actually } else linesToRead = newLinesTillEnd; } else { //we read max sample size number of lines when the reading mode //is ContinuouslyFixed or FromEnd, WholeFile is disabled linesToRead = qMin(spreadsheet->sampleSize(), newLinesTillEnd); } } DEBUG(" actual rows = " << m_actualRows); if (linesToRead == 0) return 0; } else { // not prepared linesToRead = newLinesTillEnd; if (headerEnabled) --m_actualRows; } DEBUG(" lines to read = " << linesToRead); if (spreadsheet->sourceType() == LiveDataSource::SourceType::FileOrPipe || spreadsheet->sourceType() == LiveDataSource::SourceType::NetworkUdpSocket) { if (m_actualRows < linesToRead) { DEBUG(" SET lines to read to " << m_actualRows); linesToRead = m_actualRows; } } //new rows/resize columns if we don't have a fixed size //TODO if the user changes this value..m_resizedToFixedSize..setResizedToFixedSize if (keepNValues == 0) { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportResizing: "); #endif if (spreadsheet->rowCount() < m_actualRows) spreadsheet->setRowCount(m_actualRows); if (!m_prepared) currentRow = 0; else { // indexes the position in the vector(column) if (spreadsheet->readingType() == LiveDataSource::ReadingType::WholeFile) currentRow = 0; else currentRow = spreadsheetRowCountBeforeResize; } // if we have fixed size, we do this only once in preparation, here we can use // m_prepared and we need something to decide whether it has a fixed size or increasing for (int n = 0; n < m_actualCols; ++n) { // data() returns a void* which is a pointer to any data type (see ColumnPrivate.cpp) switch (columnModes[n]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } } else { // fixed size //when we have a fixed size we have to pop sampleSize number of lines if specified //here popping, setting currentRow if (!m_prepared) { if (spreadsheet->readingType() == LiveDataSource::ReadingType::WholeFile) currentRow = 0; else currentRow = m_actualRows - qMin(newLinesTillEnd, m_actualRows); } else { if (readingType == LiveDataSource::ReadingType::TillEnd) { if (newLinesTillEnd > m_actualRows) { currentRow = 0; } else { if (spreadsheet->readingType() == LiveDataSource::ReadingType::WholeFile) currentRow = 0; else currentRow = m_actualRows - newLinesTillEnd; } } else { //we read max sample size number of lines when the reading mode //is ContinuouslyFixed or FromEnd currentRow = m_actualRows - qMin(spreadsheet->sampleSize(), newLinesTillEnd); } } if (m_prepared) { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportPopping: "); #endif // enable data change signal for (int col = 0; col < m_actualCols; ++col) spreadsheet->child(col)->setSuppressDataChangedSignal(false); for (int row = 0; row < linesToRead; ++row) { for (int col = 0; col < m_actualCols; ++col) { switch (columnModes[col]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(col)->data()); vector->pop_front(); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(col)->data()); vector->pop_front(); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(col)->data()); vector->pop_front(); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(col)->data()); vector->pop_front(); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } } } } // from the last row we read the new data in the spreadsheet DEBUG(" Reading from line " << currentRow << " till end line " << newLinesTillEnd); DEBUG(" Lines to read:" << linesToRead <<", actual rows:" << m_actualRows << ", actual cols:" << m_actualCols); newDataIdx = 0; if (readingType == LiveDataSource::ReadingType::FromEnd) { if (m_prepared) { if (newData.size() > spreadsheet->sampleSize()) newDataIdx = newData.size() - spreadsheet->sampleSize(); //since we skip a couple of lines, we need to count those bytes too for (int i = 0; i < newDataIdx; ++i) bytesread += newData.at(i).size(); } } DEBUG(" newDataIdx: " << newDataIdx); static int indexColumnIdx = 1; { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportFillingContainers: "); #endif int row = 0; if (readingType == LiveDataSource::ReadingType::TillEnd || (readingType == LiveDataSource::ReadingType::ContinuousFixed)) { if (headerEnabled) { if (!m_prepared) { row = 1; bytesread += newData.at(0).size(); } } } if (spreadsheet->sourceType() == LiveDataSource::SourceType::FileOrPipe) { if (readingType == LiveDataSource::ReadingType::WholeFile) { if (headerEnabled) { row = 1; bytesread += newData.at(0).size(); } } } for (; row < linesToRead; ++row) { DEBUG("\n Reading row " << row + 1 << " of " << linesToRead); QString line; if (readingType == LiveDataSource::ReadingType::FromEnd) line = newData.at(newDataIdx++); else line = newData.at(row); //when we read the whole file we don't care about the previous position //so we don't have to count those bytes if (readingType != LiveDataSource::ReadingType::WholeFile) { if (spreadsheet->sourceType() == LiveDataSource::SourceType::FileOrPipe) { bytesread += line.size(); } } DEBUG(" Line bytes: " << line.size() << " line: " << line.toStdString()); if (simplifyWhitespacesEnabled) line = line.simplified(); if (line.isEmpty() || line.startsWith(commentCharacter)) // skip empty or commented lines continue; QLocale locale(numberFormat); QStringList lineStringList; // only FileOrPipe support multiple columns if (spreadsheet->sourceType() == LiveDataSource::SourceType::FileOrPipe) lineStringList = line.split(m_separator, (QString::SplitBehavior)skipEmptyParts); else lineStringList << line; QDEBUG(" line = " << lineStringList << ", separator = \'" << m_separator << "\'"); if (createIndexEnabled) { if (spreadsheet->keepNValues() == 0) lineStringList.prepend(QString::number(currentRow + 1)); else lineStringList.prepend(QString::number(indexColumnIdx++)); } QDEBUG(" column modes = " << columnModes); for (int n = 0; n < m_actualCols; ++n) { DEBUG(" actual col = " << n); if (n < lineStringList.size()) { QString valueString = lineStringList.at(n); DEBUG(" value string = " << valueString.toStdString()); // set value depending on data type switch (columnModes[n]) { case AbstractColumn::Numeric: { DEBUG(" Numeric"); bool isNumber; const double value = locale.toDouble(valueString, &isNumber); static_cast*>(m_dataContainer[n])->operator[](currentRow) = (isNumber ? value : nanValue); qDebug() << "dataContainer[" << n << "] size:" << static_cast*>(m_dataContainer[n])->size(); break; } case AbstractColumn::Integer: { DEBUG(" Integer"); bool isNumber; const int value = locale.toInt(valueString, &isNumber); DEBUG(" container size = " << m_dataContainer.size() << ", current row = " << currentRow); static_cast*>(m_dataContainer[n])->operator[](currentRow) = (isNumber ? value : 0); qDebug() << "dataContainer[" << n << "] size:" << static_cast*>(m_dataContainer[n])->size(); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(valueString, dateTimeFormat); static_cast*>(m_dataContainer[n])->operator[](currentRow) = valueDateTime.isValid() ? valueDateTime : QDateTime(); break; } case AbstractColumn::Text: if (removeQuotesEnabled) valueString.remove(QRegExp("[\"\']")); static_cast*>(m_dataContainer[n])->operator[](currentRow) = valueString; break; case AbstractColumn::Month: //TODO break; case AbstractColumn::Day: //TODO break; } } else { DEBUG(" missing columns in this line"); switch (columnModes[n]) { case AbstractColumn::Numeric: static_cast*>(m_dataContainer[n])->operator[](currentRow) = nanValue; break; case AbstractColumn::Integer: static_cast*>(m_dataContainer[n])->operator[](currentRow) = 0; break; case AbstractColumn::DateTime: static_cast*>(m_dataContainer[n])->operator[](currentRow) = QDateTime(); break; case AbstractColumn::Text: static_cast*>(m_dataContainer[n])->operator[](currentRow) = ""; break; case AbstractColumn::Month: //TODO break; case AbstractColumn::Day: //TODO break; } } } currentRow++; } } if (m_prepared) { //notify all affected columns and plots about the changes PERFTRACE("AsciiLiveDataImport, notify affected columns and plots"); const Project* project = spreadsheet->project(); QVector curves = project->children(AbstractAspect::Recursive); QVector plots; for (int n = 0; n < m_actualCols; ++n) { Column* column = spreadsheet->column(n); //determine the plots where the column is consumed for (const auto* curve : curves) { if (curve->xColumn() == column || curve->yColumn() == column) { auto* plot = dynamic_cast(curve->parentAspect()); if (plots.indexOf(plot) == -1) { plots << plot; plot->setSuppressDataChangedSignal(true); } } } column->setChanged(); } //loop over all affected plots and retransform them for (auto* plot : plots) { plot->setSuppressDataChangedSignal(false); plot->dataChanged(); } } m_prepared = true; DEBUG("AsciiFilterPrivate::readFromLiveDevice() DONE"); return bytesread; } /*! reads the content of device \c device to the data source \c dataSource. Uses the settings defined in the data source. */ void AsciiFilterPrivate::readDataFromDevice(QIODevice& device, AbstractDataSource* dataSource, AbstractFileFilter::ImportMode importMode, int lines) { DEBUG("AsciiFilterPrivate::readDataFromDevice(): dataSource = " << dataSource << ", mode = " << ENUM_TO_STRING(AbstractFileFilter, ImportMode, importMode) << ", lines = " << lines); if (!m_prepared) { const int deviceError = prepareDeviceToRead(device); if (deviceError != 0) { DEBUG("Device error = " << deviceError); return; } // matrix data has only one column mode if (dynamic_cast(dataSource)) { auto mode = columnModes[0]; //TODO: remove this when Matrix supports text type if (mode == AbstractColumn::Text) mode = AbstractColumn::Numeric; for (auto& c : columnModes) if (c != mode) c = mode; } m_columnOffset = dataSource->prepareImport(m_dataContainer, importMode, m_actualRows, m_actualCols, vectorNames, columnModes); m_prepared = true; } DEBUG("locale = " << QLocale::languageToString(numberFormat).toStdString()); QLocale locale(numberFormat); // Read the data int currentRow = 0; // indexes the position in the vector(column) if (lines == -1) lines = m_actualRows; //skip data lines, if required DEBUG(" Skipping " << m_actualStartRow - 1 << " lines"); for (int i = 0; i < m_actualStartRow - 1; ++i) device.readLine(); DEBUG(" Reading " << qMin(lines, m_actualRows) << " lines, " << m_actualCols << " columns"); if (qMin(lines, m_actualRows) == 0 || m_actualCols == 0) return; for (int i = 0; i < qMin(lines, m_actualRows); ++i) { QString line = device.readLine(); line.remove(QRegExp("[\\n\\r]")); // remove any newline if (simplifyWhitespacesEnabled) line = line.simplified(); if (line.isEmpty() || line.startsWith(commentCharacter)) // skip empty or commented lines continue; QStringList lineStringList = line.split(m_separator, (QString::SplitBehavior)skipEmptyParts); // remove left white spaces if (skipEmptyParts) { for (int n = 0; n < lineStringList.size(); ++n) { QString valueString = lineStringList.at(n); if (!QString::compare(valueString, " ")) { lineStringList.removeAt(n); n--; } } } for (int n = 0; n < m_actualCols; ++n) { // index column if required if (n == 0 && createIndexEnabled) { static_cast*>(m_dataContainer[0])->operator[](currentRow) = i + 1; continue; } //column counting starts with 1, substract 1 as well as another 1 for the index column if required int col = createIndexEnabled ? n + startColumn - 2: n + startColumn - 1; if (col < lineStringList.size()) { QString valueString = lineStringList.at(col); // set value depending on data type switch (columnModes[n]) { case AbstractColumn::Numeric: { bool isNumber; const double value = locale.toDouble(valueString, &isNumber); static_cast*>(m_dataContainer[n])->operator[](currentRow) = (isNumber ? value : nanValue); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(valueString, &isNumber); static_cast*>(m_dataContainer[n])->operator[](currentRow) = (isNumber ? value : 0); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(valueString, dateTimeFormat); static_cast*>(m_dataContainer[n])->operator[](currentRow) = valueDateTime.isValid() ? valueDateTime : QDateTime(); break; } case AbstractColumn::Text: { if (removeQuotesEnabled) valueString.remove(QRegExp("[\"\']")); QVector* colData = static_cast*>(m_dataContainer[n]); colData->operator[](currentRow) = valueString; break; } case AbstractColumn::Month: // never happens case AbstractColumn::Day: break; } } else { // missing columns in this line switch (columnModes[n]) { case AbstractColumn::Numeric: static_cast*>(m_dataContainer[n])->operator[](currentRow) = nanValue; break; case AbstractColumn::Integer: static_cast*>(m_dataContainer[n])->operator[](currentRow) = 0; break; case AbstractColumn::DateTime: static_cast*>(m_dataContainer[n])->operator[](currentRow) = QDateTime(); break; case AbstractColumn::Text: static_cast*>(m_dataContainer[n])->operator[](currentRow) = ""; break; case AbstractColumn::Month: // never happens case AbstractColumn::Day: break; } } } currentRow++; emit q->completed(100 * currentRow/m_actualRows); } DEBUG(" Read " << currentRow << " lines"); dataSource->finalizeImport(m_columnOffset, startColumn, startColumn + m_actualCols - 1, currentRow, dateTimeFormat, importMode); } /*! * preview for special devices (local/UDP/TCP socket or serial port) */ QVector AsciiFilterPrivate::preview(QIODevice &device) { DEBUG("AsciiFilterPrivate::preview(): bytesAvailable = " << device.bytesAvailable() << ", isSequential = " << device.isSequential()); QVector dataStrings; if (!(device.bytesAvailable() > 0)) { DEBUG("No new data available"); return dataStrings; } if (device.isSequential() && device.bytesAvailable() < (int)sizeof(quint16)) return dataStrings; #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportTotal: "); #endif int linesToRead = 0; QVector newData; //TODO: serial port "read(nBytes)"? while (!device.atEnd()) { if (device.canReadLine()) newData.push_back(device.readLine()); else // UDP fails otherwise newData.push_back(device.readAll()); linesToRead++; } QDEBUG(" data = " << newData); if (linesToRead == 0) return dataStrings; int col = 0; int colMax = newData.at(0).size(); if (createIndexEnabled) colMax++; columnModes.resize(colMax); if (createIndexEnabled) { columnModes[0] = AbstractColumn::ColumnMode::Integer; col = 1; vectorNames.prepend(i18n("Index")); } vectorNames.append(i18n("Value")); QDEBUG(" vector names = " << vectorNames); for (const auto& valueString : newData.at(0).split(' ', QString::SkipEmptyParts)) { if (col == colMax) break; columnModes[col++] = AbstractFileFilter::columnMode(valueString, dateTimeFormat, numberFormat); } for (int i = 0; i < linesToRead; ++i) { QString line = newData.at(i); if (simplifyWhitespacesEnabled) line = line.simplified(); if (line.isEmpty() || line.startsWith(commentCharacter)) // skip empty or commented lines continue; QLocale locale(numberFormat); QStringList lineStringList = line.split(' ', QString::SkipEmptyParts); if (createIndexEnabled) lineStringList.prepend(QString::number(i + 1)); QStringList lineString; for (int n = 0; n < lineStringList.size(); ++n) { if (n < lineStringList.size()) { QString valueString = lineStringList.at(n); switch (columnModes[n]) { case AbstractColumn::Numeric: { bool isNumber; const double value = locale.toDouble(valueString, &isNumber); lineString += QString::number(isNumber ? value : nanValue, 'g', 16); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(valueString, &isNumber); lineString += QString::number(isNumber ? value : 0); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(valueString, dateTimeFormat); lineString += valueDateTime.isValid() ? valueDateTime.toString(dateTimeFormat) : QLatin1String(" "); break; } case AbstractColumn::Text: if (removeQuotesEnabled) valueString.remove(QRegExp("[\"\']")); lineString += valueString; break; case AbstractColumn::Month: // never happens case AbstractColumn::Day: break; } } else // missing columns in this line lineString += QLatin1String(""); } dataStrings << lineString; } return dataStrings; } /*! * generates the preview for the file \c fileName reading the provided number of \c lines. */ QVector AsciiFilterPrivate::preview(const QString& fileName, int lines) { QVector dataStrings; //dirty hack: set readingFile and readingFileName in order to know in lineNumber(QIODevice) //that we're reading from a file and to benefit from much faster wc on linux //TODO: redesign the APIs and remove this later readingFile = true; readingFileName = fileName; KFilterDev device(fileName); const int deviceError = prepareDeviceToRead(device); readingFile = false; if (deviceError != 0) { DEBUG("Device error = " << deviceError); return dataStrings; } //number formatting DEBUG("locale = " << QLocale::languageToString(numberFormat).toStdString()); QLocale locale(numberFormat); // Read the data if (lines == -1) lines = m_actualRows; // set column names for preview if (!headerEnabled) { int start = 0; if (createIndexEnabled) start = 1; for (int i = start; i < m_actualCols; i++) vectorNames << "Column " + QString::number(i + 1); } QDEBUG(" column names = " << vectorNames); //skip data lines, if required DEBUG(" Skipping " << m_actualStartRow - 1 << " lines"); for (int i = 0; i < m_actualStartRow - 1; ++i) device.readLine(); DEBUG(" Generating preview for " << qMin(lines, m_actualRows) << " lines"); for (int i = 0; i < qMin(lines, m_actualRows); ++i) { QString line = device.readLine(); line.remove(QRegExp("[\\n\\r]")); // remove any newline if (simplifyWhitespacesEnabled) line = line.simplified(); if (line.isEmpty() || line.startsWith(commentCharacter)) // skip empty or commented lines continue; const QStringList& lineStringList = line.split(m_separator, (QString::SplitBehavior)skipEmptyParts); QDEBUG(" line = " << lineStringList); QStringList lineString; for (int n = 0; n < m_actualCols; ++n) { // index column if required if (n == 0 && createIndexEnabled) { lineString += QString::number(i + 1); continue; } //column counting starts with 1, substract 1 as well as another 1 for the index column if required int col = createIndexEnabled ? n + startColumn - 2: n + startColumn - 1; if (col < lineStringList.size()) { QString valueString = lineStringList.at(col); //DEBUG(" valueString = " << valueString.toStdString()); if (skipEmptyParts && !QString::compare(valueString, " ")) // handle left white spaces continue; // set value depending on data type switch (columnModes[n]) { case AbstractColumn::Numeric: { bool isNumber; const double value = locale.toDouble(valueString, &isNumber); lineString += QString::number(isNumber ? value : nanValue, 'g', 15); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(valueString, &isNumber); lineString += QString::number(isNumber ? value : 0); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(valueString, dateTimeFormat); lineString += valueDateTime.isValid() ? valueDateTime.toString(dateTimeFormat) : QLatin1String(" "); break; } case AbstractColumn::Text: if (removeQuotesEnabled) valueString.remove(QRegExp("[\"\']")); lineString += valueString; break; case AbstractColumn::Month: // never happens case AbstractColumn::Day: break; } } else // missing columns in this line lineString += QLatin1String(""); } dataStrings << lineString; } return dataStrings; } /*! writes the content of \c dataSource to the file \c fileName. */ void AsciiFilterPrivate::write(const QString & fileName, AbstractDataSource* dataSource) { Q_UNUSED(fileName); Q_UNUSED(dataSource); //TODO: save data to ascii file } //############################################################################## //################## Serialization/Deserialization ########################### //############################################################################## /*! Saves as XML. */ void AsciiFilter::save(QXmlStreamWriter* writer) const { writer->writeStartElement( "asciiFilter"); writer->writeAttribute( "commentCharacter", d->commentCharacter); writer->writeAttribute( "separatingCharacter", d->separatingCharacter); writer->writeAttribute( "autoMode", QString::number(d->autoModeEnabled)); writer->writeAttribute( "createIndex", QString::number(d->createIndexEnabled)); writer->writeAttribute( "header", QString::number(d->headerEnabled)); writer->writeAttribute( "vectorNames", d->vectorNames.join(' ')); writer->writeAttribute( "skipEmptyParts", QString::number(d->skipEmptyParts)); writer->writeAttribute( "simplifyWhitespaces", QString::number(d->simplifyWhitespacesEnabled)); writer->writeAttribute( "nanValue", QString::number(d->nanValue)); writer->writeAttribute( "removeQuotes", QString::number(d->removeQuotesEnabled)); writer->writeAttribute( "startRow", QString::number(d->startRow)); writer->writeAttribute( "endRow", QString::number(d->endRow)); writer->writeAttribute( "startColumn", QString::number(d->startColumn)); writer->writeAttribute( "endColumn", QString::number(d->endColumn)); writer->writeEndElement(); } /*! Loads from XML. */ bool AsciiFilter::load(XmlStreamReader* reader) { KLocalizedString attributeWarning = ki18n("Attribute '%1' missing or empty, default value is used"); QXmlStreamAttributes attribs = reader->attributes(); QString str; READ_STRING_VALUE("commentCharacter", commentCharacter); READ_STRING_VALUE("separatingCharacter", separatingCharacter); READ_INT_VALUE("createIndex", createIndexEnabled, bool); READ_INT_VALUE("autoMode", autoModeEnabled, bool); READ_INT_VALUE("header", headerEnabled, bool); str = attribs.value("vectorNames").toString(); d->vectorNames = str.split(' '); //may be empty READ_INT_VALUE("simplifyWhitespaces", simplifyWhitespacesEnabled, bool); READ_DOUBLE_VALUE("nanValue", nanValue); READ_INT_VALUE("removeQuotes", removeQuotesEnabled, bool); READ_INT_VALUE("skipEmptyParts", skipEmptyParts, bool); READ_INT_VALUE("startRow", startRow, int); READ_INT_VALUE("endRow", endRow, int); READ_INT_VALUE("startColumn", startColumn, int); READ_INT_VALUE("endColumn", endColumn, int); return true; } int AsciiFilterPrivate::isPrepared() { - return m_prepared; + return m_prepared; } #ifdef HAVE_MQTT /*! * \brief Offers a preview about the data received by the topic * \param list * \param message * \param topic */ void AsciiFilterPrivate::MQTTPreview(QVector& list, const QString& message, const QString& topic) { QVector dataStrings; if (!message.isEmpty()) { qDebug()<<"ascii mqtt preview for: " << topic; //check how many lines can be read int linesToRead = 0; QVector newData; QStringList newDataList = message.split(QRegExp("\n|\r\n|\r"), QString::SkipEmptyParts); for (const auto& valueString : newDataList) { const QStringList splitString = valueString.split(' ', QString::SkipEmptyParts); for (const auto& valueString2 : splitString) { if (!valueString2.isEmpty() && !valueString2.startsWith(commentCharacter) ) { linesToRead++; newData.push_back(valueString2); } } } qDebug() <<" data investigated, lines to read: "< list.size()) { linesToRead = list.size(); } } //Determine the number of columns int colSize = 0; if (list.isEmpty()) { if (createIndexEnabled) colSize = 2; else colSize = 1; } else colSize = columnModes.size() + 1; columnModes.resize(colSize); //if this is the first topic, we check if index has to be added if (list.isEmpty()) if (createIndexEnabled) { columnModes[0] = AbstractColumn::ColumnMode::Integer; vectorNames.prepend("index"); } vectorNames.append( topic); //Set the column mode for the topic columnModes[colSize - 1] = AbstractFileFilter::columnMode(newData[0], dateTimeFormat, numberFormat); //improve the column mode, based on the rest of the data for (int i = 0; i < linesToRead; i++) { QString tempLine = newData[i]; if (simplifyWhitespacesEnabled) tempLine = tempLine.simplified(); AbstractColumn::ColumnMode mode = AbstractFileFilter::columnMode(tempLine, dateTimeFormat, numberFormat); // numeric: integer -> numeric if (mode == AbstractColumn::Numeric && columnModes[colSize - 1] == AbstractColumn::Integer) columnModes[colSize - 1] = mode; // text: non text -> text if ( (mode == AbstractColumn::Text) && (columnModes[colSize - 1] != AbstractColumn::Text) ) columnModes[colSize - 1] = mode; } int forStart = 0; int forEnd = 0; if (list.isEmpty()) { forStart = 0; forEnd = linesToRead; } else { if (mqttPreviewFirstEmptyColCount == 0) { dataStrings = list; forStart = 0; forEnd = dataStrings.size(); } else { forStart = 1; forEnd = linesToRead; dataStrings = list; const QLocale locale(numberFormat); //this is the first column having values, after at least 1 empty column //until now only one row was filled with NaN values, so for starters we fill this row with the new value switch (columnModes[colSize - 1]) { case AbstractColumn::Numeric: { bool isNumber; const double value = locale.toDouble(newData[0], &isNumber); dataStrings[0] += QString::number(isNumber ? value : nanValue, 'g', 16); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(newData[0], &isNumber); dataStrings[0] += QString::number(isNumber ? value : 0); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(newData[0], dateTimeFormat); dataStrings[0] += valueDateTime.isValid() ? valueDateTime.toString(dateTimeFormat) : QLatin1String(" "); break; } case AbstractColumn::Text: if (removeQuotesEnabled) newData[0].remove(QRegExp("[\"\']")); dataStrings[0] += newData[0]; break; case AbstractColumn::Month: // never happens case AbstractColumn::Day: break; } } } //We add every forEnd-forStart values to the list for (int i = forStart; i < forEnd; ++i) { QString line = newData[i]; if (simplifyWhitespacesEnabled) line = line.simplified(); //skip empty or commented lines if (line.isEmpty() || line.startsWith(commentCharacter)) continue; QLocale locale(numberFormat); QStringList lineString; //We don't have to do any preparation if there were already data containing topics if (!list.isEmpty() && mqttPreviewFirstEmptyColCount == 0) lineString = dataStrings[i]; //Add index if it is the case if (list.isEmpty() || mqttPreviewFirstEmptyColCount > 0) if (createIndexEnabled) lineString += QString::number(i); //If this is the first not empty topic, add nan value to all the empty topics before this one if (!list.isEmpty() && mqttPreviewFirstEmptyColCount > 0) { for (int j = 0; j < mqttPreviewFirstEmptyColCount; j++) lineString += QString::number(nanValue, 'g', 16); } //Add the actual value to the topic switch (columnModes[colSize - 1]) { case AbstractColumn::Numeric: { bool isNumber; const double value = locale.toDouble(line, &isNumber); lineString += QString::number(isNumber ? value : nanValue, 'g', 16); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(line, &isNumber); lineString += QString::number(isNumber ? value : 0); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(line, dateTimeFormat); lineString += valueDateTime.isValid() ? valueDateTime.toString(dateTimeFormat) : QLatin1String(" "); break; } case AbstractColumn::Text: if (removeQuotesEnabled) line.remove(QRegExp("[\"\']")); lineString += line; break; case AbstractColumn::Month: // never happens case AbstractColumn::Day: break; } qDebug()<<"column updated with value"; // if the list was empty, or this is the first not empty topic, we append the new line if (list.isEmpty() || mqttPreviewFirstEmptyColCount > 0) dataStrings << lineString; //Otherwise we update the already existing line if (!list.isEmpty() && mqttPreviewFirstEmptyColCount == 0) dataStrings[i] = lineString; } //The first empty topics were taken care of if (mqttPreviewFirstEmptyColCount > 0) { mqttPreviewFirstEmptyColCount = 0; } } //the message is empty and there were only empty messages before this else if (list.isEmpty() || mqttPreviewFirstEmptyColCount > 0) { //increment the counter mqttPreviewFirstEmptyColCount ++; //determine number of columns int colSize; if (mqttPreviewFirstEmptyColCount == 1) { if (createIndexEnabled) colSize = 2; else colSize = 1; } else colSize = columnModes.size() + 1; columnModes.resize(colSize); //add index if needed if (mqttPreviewFirstEmptyColCount == 1) if (createIndexEnabled) { columnModes[0] = AbstractColumn::ColumnMode::Integer; vectorNames.prepend("index"); } //Add new column fot the mepty topic vectorNames.append( topic); columnModes[colSize-1] = AbstractColumn::ColumnMode::Numeric; //Add a NaN value to the topic's column QStringList lineString; if (mqttPreviewFirstEmptyColCount == 1) { //Add index if needed if (createIndexEnabled) lineString += QString::number(0); lineString += QString::number(nanValue, 'g', 16); //Append since this is the first empty column dataStrings << lineString; } else { //Update the already existing line dataStrings = list; dataStrings[0] += QString::number(nanValue, 'g', 16); } } //The message is empty but there already was a non empty message else if (!list.isEmpty()) { //Append vector name vectorNames.append( topic); int colSize = columnModes.size() + 1; columnModes.resize(colSize); columnModes[colSize-1] = AbstractColumn::ColumnMode::Numeric; dataStrings = list; //Add as many NaN values as many lines the list already has for (int i = 0; i < dataStrings.size(); ++i) dataStrings[i] += QString::number(nanValue, 'g', 16); } //update the list list = dataStrings; } /*! * \brief Returns the statistical data that is needed by the topic for its MQTTClient's will message * \param topic */ -QString AsciiFilterPrivate::MQTTColumnStatistics(const MQTTTopic* topic) const{ +QString AsciiFilterPrivate::MQTTColumnStatistics(const MQTTTopic* topic) const { Column* const tempColumn = topic->child(m_actualCols - 1); QString statistics; QVector willStatistics = topic->mqttClient()->willStatistics(); //Add every statistical data to the string, the flag of which is set true for (int i = 0; i <= willStatistics.size(); i++) { if (willStatistics[i]) { switch (static_cast(i) ) { case MQTTClient::WillStatisticsType::ArithmeticMean: statistics += QLatin1String("Arithmetic mean: ") + QString::number(tempColumn->statistics().arithmeticMean) + "\n"; break; case MQTTClient::WillStatisticsType::ContraharmonicMean: statistics += QLatin1String("Contraharmonic mean: ") + QString::number(tempColumn->statistics().contraharmonicMean) + "\n"; break; case MQTTClient::WillStatisticsType::Entropy: statistics += QLatin1String("Entropy: ") + QString::number(tempColumn->statistics().entropy) + "\n"; break; case MQTTClient::WillStatisticsType::GeometricMean: statistics += QLatin1String("Geometric mean: ") + QString::number(tempColumn->statistics().geometricMean) + "\n"; break; case MQTTClient::WillStatisticsType::HarmonicMean: statistics += QLatin1String("Harmonic mean: ") + QString::number(tempColumn->statistics().harmonicMean) + "\n"; break; case MQTTClient::WillStatisticsType::Kurtosis: statistics += QLatin1String("Kurtosis: ") + QString::number(tempColumn->statistics().kurtosis) + "\n"; break; case MQTTClient::WillStatisticsType::Maximum: statistics += QLatin1String("Maximum: ") + QString::number(tempColumn->statistics().maximum) + "\n"; break; case MQTTClient::WillStatisticsType::MeanDeviation: statistics += QLatin1String("Mean deviation: ") + QString::number(tempColumn->statistics().meanDeviation) + "\n"; break; case MQTTClient::WillStatisticsType::MeanDeviationAroundMedian: statistics += QLatin1String("Mean deviation around median: ") + QString::number(tempColumn->statistics().meanDeviationAroundMedian) + "\n"; break; case MQTTClient::WillStatisticsType::Median: statistics += QLatin1String("Median: ") + QString::number(tempColumn->statistics().median) + "\n"; break; case MQTTClient::WillStatisticsType::MedianDeviation: statistics += QLatin1String("Median deviation: ") + QString::number(tempColumn->statistics().medianDeviation) + "\n"; break; case MQTTClient::WillStatisticsType::Minimum: statistics += QLatin1String("Minimum: ") + QString::number(tempColumn->statistics().minimum) + "\n"; break; case MQTTClient::WillStatisticsType::Skewness: statistics += QLatin1String("Skewness: ") + QString::number(tempColumn->statistics().skewness) + "\n"; break; case MQTTClient::WillStatisticsType::StandardDeviation: statistics += QLatin1String("Standard deviation: ") + QString::number(tempColumn->statistics().standardDeviation) + "\n"; break; case MQTTClient::WillStatisticsType::Variance: statistics += QLatin1String("Variance: ") + QString::number(tempColumn->statistics().variance) + "\n"; break; case MQTTClient::WillStatisticsType::NoStatistics: default: break; } } } return statistics; } -AbstractColumn::ColumnMode AsciiFilterPrivate::MQTTColumnMode() const{ +AbstractColumn::ColumnMode AsciiFilterPrivate::MQTTColumnMode() const { return columnModes[m_actualCols - 1]; } /*! * \brief reads the content of a message received by the topic. * Uses the settings defined in the MQTTTopic's MQTTClient * \param message * \param topic * \param dataSource */ void AsciiFilterPrivate::readMQTTTopic(const QString& message, const QString& topic, AbstractDataSource*dataSource) { //If the message is empty, there is nothing to do if (message.isEmpty()) { DEBUG("No new data available"); return; } MQTTTopic* spreadsheet = dynamic_cast(dataSource); - const int keepNValues = spreadsheet->mqttClient()->keepNValues(); + const int keepNValues = spreadsheet->mqttClient()->keepNValues(); if (!m_prepared) { qDebug()<<"Start preparing filter for: " << spreadsheet->topicName(); //Prepare the filter const int mqttPrepareError = prepareMQTTTopicToRead(message, topic); if (mqttPrepareError != 0) { DEBUG("Mqtt Prepare Error = " << mqttPrepareError); qDebug()<setUndoAware(false); spreadsheet->resize(AbstractFileFilter::Replace, vectorNames, m_actualCols); qDebug() << "fds resized to col: " << m_actualCols; qDebug() << "fds rowCount: " << spreadsheet->rowCount(); //columns in a MQTTTopic don't have any manual changes. //make the available columns undo unaware and suppress the "data changed" signal. //data changes will be propagated via an explicit Column::setChanged() call once new data was read. for (int i = 0; i < spreadsheet->childCount(); i++) { spreadsheet->child(i)->setUndoAware(false); spreadsheet->child(i)->setSuppressDataChangedSignal(true); } if (keepNValues == 0) spreadsheet->setRowCount(m_actualRows > 1 ? m_actualRows : 1); else { - spreadsheet->setRowCount(spreadsheet->mqttClient()->keepNValues()); - m_actualRows = spreadsheet->mqttClient()->keepNValues(); + spreadsheet->setRowCount(spreadsheet->mqttClient()->keepNValues()); + m_actualRows = spreadsheet->mqttClient()->keepNValues(); } m_dataContainer.resize(m_actualCols); for (int n = 0; n < m_actualCols; ++n) { // data() returns a void* which is a pointer to any data type (see ColumnPrivate.cpp) spreadsheet->child(n)->setColumnMode(columnModes[n]); switch (columnModes[n]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } - //TODO + //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } } #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportTotal: "); #endif MQTTClient::ReadingType readingType; if (!m_prepared) { //if filter is not prepared we read till the end readingType = MQTTClient::ReadingType::TillEnd; } else { //we have to read all the data when reading from end //so we set readingType to TillEnd - if (static_cast (spreadsheet->mqttClient()->readingType()) == MQTTClient::ReadingType::FromEnd) + if (static_cast (spreadsheet->mqttClient()->readingType()) == MQTTClient::ReadingType::FromEnd) readingType = MQTTClient::ReadingType::TillEnd; else - readingType = spreadsheet->mqttClient()->readingType(); + readingType = spreadsheet->mqttClient()->readingType(); } //count the new lines, increase actualrows on each //now we read all the new lines, if we want to use sample rate //then here we can do it, if we have actually sample rate number of lines :-? int newLinesForSampleSizeNotTillEnd = 0; int newLinesTillEnd = 0; QVector newData; if (readingType != MQTTClient::ReadingType::TillEnd) { - newData.reserve(spreadsheet->mqttClient()->sampleSize()); - newData.resize(spreadsheet->mqttClient()->sampleSize()); + newData.reserve(spreadsheet->mqttClient()->sampleSize()); + newData.resize(spreadsheet->mqttClient()->sampleSize()); } int newDataIdx = 0; bool sampleSizeReached = false; { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportReadingFromFile: "); #endif QStringList newDataList = message.split(QRegExp("\n|\r\n|\r"), QString::SkipEmptyParts); for (auto& valueString : newDataList) { if (!valueString.startsWith(commentCharacter)) { const QStringList splitString = valueString.split(m_separator, static_cast(skipEmptyParts)); for (const auto& valueString2 : splitString) { if (!valueString2.isEmpty() && !valueString2.startsWith(commentCharacter)) { if (readingType != MQTTClient::ReadingType::TillEnd) newData[newDataIdx++] = valueString2; else newData.push_back(valueString2); newLinesTillEnd++; if (readingType != MQTTClient::ReadingType::TillEnd) { newLinesForSampleSizeNotTillEnd++; //for Continuous reading and FromEnd we read sample rate number of lines if possible - if (newLinesForSampleSizeNotTillEnd == spreadsheet->mqttClient()->sampleSize()) { + if (newLinesForSampleSizeNotTillEnd == spreadsheet->mqttClient()->sampleSize()) { sampleSizeReached = true; break; } } } } //if the sample size limit is reached we brake from the outer loop as well if (sampleSizeReached) break; } } } qDebug()<<"Processing message done"; //now we reset the readingType - if (spreadsheet->mqttClient()->readingType() == MQTTClient::ReadingType::FromEnd) - readingType = static_cast(spreadsheet->mqttClient()->readingType()); + if (spreadsheet->mqttClient()->readingType() == MQTTClient::ReadingType::FromEnd) + readingType = static_cast(spreadsheet->mqttClient()->readingType()); //we had less new lines than the sample rate specified if (readingType != MQTTClient::ReadingType::TillEnd) qDebug() << "Removed empty lines: " << newData.removeAll(""); qDebug()<<"Create index enabled: "<rowCount(); if (m_prepared ) { if (keepNValues == 0) m_actualRows = spreadsheetRowCountBeforeResize; else { //if the keepNValues changed since the last read we have to manage the columns accordingly - if (m_actualRows != spreadsheet->mqttClient()->keepNValues()) { - if (m_actualRows < spreadsheet->mqttClient()->keepNValues()) { - spreadsheet->setRowCount(spreadsheet->mqttClient()->keepNValues()); - qDebug()<<"rowcount set to: " << spreadsheet->mqttClient()->keepNValues(); + if (m_actualRows != spreadsheet->mqttClient()->keepNValues()) { + if (m_actualRows < spreadsheet->mqttClient()->keepNValues()) { + spreadsheet->setRowCount(spreadsheet->mqttClient()->keepNValues()); + qDebug()<<"rowcount set to: " << spreadsheet->mqttClient()->keepNValues(); } //Calculate the difference between the old and new keepNValues int rowDiff = 0; - if (m_actualRows > spreadsheet->mqttClient()->keepNValues()) - rowDiff = m_actualRows - spreadsheet->mqttClient()->keepNValues(); + if (m_actualRows > spreadsheet->mqttClient()->keepNValues()) + rowDiff = m_actualRows - spreadsheet->mqttClient()->keepNValues(); - if (m_actualRows < spreadsheet->mqttClient()->keepNValues()) - rowDiff = spreadsheet->mqttClient()->keepNValues() - m_actualRows; + if (m_actualRows < spreadsheet->mqttClient()->keepNValues()) + rowDiff = spreadsheet->mqttClient()->keepNValues() - m_actualRows; for (int n = 0; n < columnModes.size(); ++n) { // data() returns a void* which is a pointer to any data type (see ColumnPrivate.cpp) switch (columnModes[n]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); m_dataContainer[n] = static_cast(vector); //if the keepNValues got smaller then we move the last keepNValues count of data //in the first keepNValues places - if (m_actualRows > spreadsheet->mqttClient()->keepNValues()) { - for (int i = 0; i < spreadsheet->mqttClient()->keepNValues(); i++) { + if (m_actualRows > spreadsheet->mqttClient()->keepNValues()) { + for (int i = 0; i < spreadsheet->mqttClient()->keepNValues(); i++) { static_cast*>(m_dataContainer[n])->operator[] (i) = - static_cast*>(m_dataContainer[n])->operator[](m_actualRows - spreadsheet->mqttClient()->keepNValues() + i); + static_cast*>(m_dataContainer[n])->operator[](m_actualRows - spreadsheet->mqttClient()->keepNValues() + i); } } //if the keepNValues got bigger we move the existing values to the last m_actualRows positions //then fill the remaining lines with NaN - if (m_actualRows < spreadsheet->mqttClient()->keepNValues()) { - vector->reserve( spreadsheet->mqttClient()->keepNValues()); - vector->resize( spreadsheet->mqttClient()->keepNValues()); + if (m_actualRows < spreadsheet->mqttClient()->keepNValues()) { + vector->reserve( spreadsheet->mqttClient()->keepNValues()); + vector->resize( spreadsheet->mqttClient()->keepNValues()); for (int i = 1; i <= m_actualRows; i++) { - static_cast*>(m_dataContainer[n])->operator[] (spreadsheet->mqttClient()->keepNValues() - i) = - static_cast*>(m_dataContainer[n])->operator[](spreadsheet->mqttClient()->keepNValues() - i - rowDiff); + static_cast*>(m_dataContainer[n])->operator[] (spreadsheet->mqttClient()->keepNValues() - i) = + static_cast*>(m_dataContainer[n])->operator[](spreadsheet->mqttClient()->keepNValues() - i - rowDiff); } for (int i = 0; i < rowDiff; i++) static_cast*>(m_dataContainer[n])->operator[](i) = nanValue; } break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); m_dataContainer[n] = static_cast(vector); //if the keepNValues got smaller then we move the last keepNValues count of data //in the first keepNValues places - if (m_actualRows > spreadsheet->mqttClient()->keepNValues()) { - for (int i = 0; i < spreadsheet->mqttClient()->keepNValues(); i++) { + if (m_actualRows > spreadsheet->mqttClient()->keepNValues()) { + for (int i = 0; i < spreadsheet->mqttClient()->keepNValues(); i++) { static_cast*>(m_dataContainer[n])->operator[] (i) = - static_cast*>(m_dataContainer[n])->operator[](m_actualRows - spreadsheet->mqttClient()->keepNValues() + i); + static_cast*>(m_dataContainer[n])->operator[](m_actualRows - spreadsheet->mqttClient()->keepNValues() + i); } } //if the keepNValues got bigger we move the existing values to the last m_actualRows positions //then fill the remaining lines with 0 - if (m_actualRows < spreadsheet->mqttClient()->keepNValues()) { - vector->reserve( spreadsheet->mqttClient()->keepNValues()); - vector->resize( spreadsheet->mqttClient()->keepNValues()); + if (m_actualRows < spreadsheet->mqttClient()->keepNValues()) { + vector->reserve( spreadsheet->mqttClient()->keepNValues()); + vector->resize( spreadsheet->mqttClient()->keepNValues()); for (int i = 1; i <= m_actualRows; i++) { - static_cast*>(m_dataContainer[n])->operator[] (spreadsheet->mqttClient()->keepNValues() - i) = - static_cast*>(m_dataContainer[n])->operator[](spreadsheet->mqttClient()->keepNValues() - i - rowDiff); + static_cast*>(m_dataContainer[n])->operator[] (spreadsheet->mqttClient()->keepNValues() - i) = + static_cast*>(m_dataContainer[n])->operator[](spreadsheet->mqttClient()->keepNValues() - i - rowDiff); } for (int i = 0; i < rowDiff; i++) static_cast*>(m_dataContainer[n])->operator[](i) = 0; } break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(n)->data()); m_dataContainer[n] = static_cast(vector); //if the keepNValues got smaller then we move the last keepNValues count of data //in the first keepNValues places - if (m_actualRows > spreadsheet->mqttClient()->keepNValues()) { - for (int i = 0; i < spreadsheet->mqttClient()->keepNValues(); i++) { + if (m_actualRows > spreadsheet->mqttClient()->keepNValues()) { + for (int i = 0; i < spreadsheet->mqttClient()->keepNValues(); i++) { static_cast*>(m_dataContainer[n])->operator[] (i) = - static_cast*>(m_dataContainer[n])->operator[](m_actualRows - spreadsheet->mqttClient()->keepNValues() + i); + static_cast*>(m_dataContainer[n])->operator[](m_actualRows - spreadsheet->mqttClient()->keepNValues() + i); } } //if the keepNValues got bigger we move the existing values to the last m_actualRows positions //then fill the remaining lines with "" - if (m_actualRows < spreadsheet->mqttClient()->keepNValues()) { - vector->reserve( spreadsheet->mqttClient()->keepNValues()); - vector->resize( spreadsheet->mqttClient()->keepNValues()); + if (m_actualRows < spreadsheet->mqttClient()->keepNValues()) { + vector->reserve( spreadsheet->mqttClient()->keepNValues()); + vector->resize( spreadsheet->mqttClient()->keepNValues()); for (int i = 1; i <= m_actualRows; i++) { - static_cast*>(m_dataContainer[n])->operator[] (spreadsheet->mqttClient()->keepNValues() - i) = - static_cast*>(m_dataContainer[n])->operator[](spreadsheet->mqttClient()->keepNValues() - i - rowDiff); + static_cast*>(m_dataContainer[n])->operator[] (spreadsheet->mqttClient()->keepNValues() - i) = + static_cast*>(m_dataContainer[n])->operator[](spreadsheet->mqttClient()->keepNValues() - i - rowDiff); } for (int i = 0; i < rowDiff; i++) static_cast*>(m_dataContainer[n])->operator[](i) = ""; } break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); m_dataContainer[n] = static_cast(vector); //if the keepNValues got smaller then we move the last keepNValues count of data //in the first keepNValues places - if (m_actualRows > spreadsheet->mqttClient()->keepNValues()) { - for (int i = 0; i < spreadsheet->mqttClient()->keepNValues(); i++) { + if (m_actualRows > spreadsheet->mqttClient()->keepNValues()) { + for (int i = 0; i < spreadsheet->mqttClient()->keepNValues(); i++) { static_cast*>(m_dataContainer[n])->operator[] (i) = - static_cast*>(m_dataContainer[n])->operator[](m_actualRows - spreadsheet->mqttClient()->keepNValues() + i); + static_cast*>(m_dataContainer[n])->operator[](m_actualRows - spreadsheet->mqttClient()->keepNValues() + i); } } //if the keepNValues got bigger we move the existing values to the last m_actualRows positions //then fill the remaining lines with null datetime - if (m_actualRows < spreadsheet->mqttClient()->keepNValues()) { - vector->reserve( spreadsheet->mqttClient()->keepNValues()); - vector->resize( spreadsheet->mqttClient()->keepNValues()); + if (m_actualRows < spreadsheet->mqttClient()->keepNValues()) { + vector->reserve( spreadsheet->mqttClient()->keepNValues()); + vector->resize( spreadsheet->mqttClient()->keepNValues()); for (int i = 1; i <= m_actualRows; i++) { - static_cast*>(m_dataContainer[n])->operator[] (spreadsheet->mqttClient()->keepNValues() - i) = - static_cast*>(m_dataContainer[n])->operator[](spreadsheet->mqttClient()->keepNValues() - i - rowDiff); + static_cast*>(m_dataContainer[n])->operator[] (spreadsheet->mqttClient()->keepNValues() - i) = + static_cast*>(m_dataContainer[n])->operator[](spreadsheet->mqttClient()->keepNValues() - i - rowDiff); } for (int i = 0; i < rowDiff; i++) static_cast*>(m_dataContainer[n])->operator[](i) = QDateTime(); } break; } - //TODO + //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } //if the keepNValues got smaller resize the spreadsheet - if (m_actualRows > spreadsheet->mqttClient()->keepNValues()) - spreadsheet->setRowCount(spreadsheet->mqttClient()->keepNValues()); + if (m_actualRows > spreadsheet->mqttClient()->keepNValues()) + spreadsheet->setRowCount(spreadsheet->mqttClient()->keepNValues()); //set the new row count - m_actualRows = spreadsheet->mqttClient()->keepNValues(); + m_actualRows = spreadsheet->mqttClient()->keepNValues(); qDebug()<<"actual rows: "<mqttClient()->sampleSize()); + m_actualRows += qMin(newData.size(), spreadsheet->mqttClient()->sampleSize()); else { m_actualRows += newData.size(); } } //fixed size if (keepNValues != 0) { if (readingType == MQTTClient::ReadingType::TillEnd) { //we had more lines than the fixed size, so we read m_actualRows number of lines if (newLinesTillEnd > m_actualRows) { linesToRead = m_actualRows; } else linesToRead = newLinesTillEnd; } else { //we read max sample size number of lines when the reading mode //is ContinuouslyFixed or FromEnd - if (spreadsheet->mqttClient()->sampleSize() <= spreadsheet->mqttClient()->keepNValues()) - linesToRead = qMin(spreadsheet->mqttClient()->sampleSize(), newLinesTillEnd); + if (spreadsheet->mqttClient()->sampleSize() <= spreadsheet->mqttClient()->keepNValues()) + linesToRead = qMin(spreadsheet->mqttClient()->sampleSize(), newLinesTillEnd); else - linesToRead = qMin(spreadsheet->mqttClient()->keepNValues(), newLinesTillEnd); + linesToRead = qMin(spreadsheet->mqttClient()->keepNValues(), newLinesTillEnd); } } else linesToRead = m_actualRows - spreadsheetRowCountBeforeResize; if (linesToRead == 0) return; } else { if (keepNValues != 0) linesToRead = newLinesTillEnd > m_actualRows ? m_actualRows : newLinesTillEnd; else linesToRead = newLinesTillEnd; } qDebug()<<"linestoread = " << linesToRead; //new rows/resize columns if we don't have a fixed size if (keepNValues == 0) { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportResizing: "); #endif if (spreadsheet->rowCount() < m_actualRows) spreadsheet->setRowCount(m_actualRows); if (!m_prepared) currentRow = 0; else { // indexes the position in the vector(column) currentRow = spreadsheetRowCountBeforeResize; } // if we have fixed size, we do this only once in preparation, here we can use // m_prepared and we need something to decide whether it has a fixed size or increasing for (int n = 0; n < m_actualCols; ++n) { // data() returns a void* which is a pointer to any data type (see ColumnPrivate.cpp) switch (columnModes[n]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } - //TODO + //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } } else { //when we have a fixed size we have to pop sampleSize number of lines if specified //here popping, setting currentRow if (!m_prepared) currentRow = m_actualRows - qMin(newLinesTillEnd, m_actualRows); else { if (readingType == MQTTClient::ReadingType::TillEnd) { if (newLinesTillEnd > m_actualRows) currentRow = 0; else currentRow = m_actualRows - newLinesTillEnd; } else { //we read max sample rate number of lines when the reading mode //is ContinuouslyFixed or FromEnd currentRow = m_actualRows - linesToRead; } } if (m_prepared) { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportPopping: "); #endif for (int row = 0; row < linesToRead; ++row) { for (int col = 0; col < m_actualCols; ++col) { switch (columnModes[col]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(col)->data()); vector->pop_front(); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(col)->data()); vector->pop_front(); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(col)->data()); vector->pop_front(); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(col)->data()); vector->pop_front(); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } - //TODO + //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } } } } // from the last row we read the new data in the spreadsheet qDebug() << "reading from line: " << currentRow << " lines till end: " << newLinesTillEnd; qDebug() << "Lines to read: " << linesToRead <<" actual rows: " << m_actualRows; newDataIdx = 0; //From end means that we read the last sample size amount of data if (readingType == MQTTClient::ReadingType::FromEnd) { if (m_prepared) { - if (newData.size() > spreadsheet->mqttClient()->sampleSize()) - newDataIdx = newData.size() - spreadsheet->mqttClient()->sampleSize(); + if (newData.size() > spreadsheet->mqttClient()->sampleSize()) + newDataIdx = newData.size() - spreadsheet->mqttClient()->sampleSize(); } } qDebug() << "newDataIdx: " << newDataIdx; static int indexColumnIdx = 0; { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportFillingContainers: "); #endif int row = 0; for (; row < linesToRead; ++row) { QString line; if (readingType == MQTTClient::ReadingType::FromEnd) line = newData.at(newDataIdx++); else line = newData.at(row); if (simplifyWhitespacesEnabled) line = line.simplified(); if (line.isEmpty() || line.startsWith(commentCharacter)) { qDebug()<<"found empty line "<*>(m_dataContainer[0])->operator[](currentRow) = (isNumber ? value : nanValue); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(tempIndex, &isNumber); static_cast*>(m_dataContainer[0])->operator[](currentRow) = (isNumber ? value : 0); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(tempIndex, dateTimeFormat); static_cast*>(m_dataContainer[0])->operator[](currentRow) = valueDateTime.isValid() ? valueDateTime : QDateTime(); break; } case AbstractColumn::Text: if (removeQuotesEnabled) tempIndex.remove(QRegExp("[\"\']")); static_cast*>(m_dataContainer[0])->operator[](currentRow) = tempIndex; break; case AbstractColumn::Month: //TODO break; case AbstractColumn::Day: //TODO break; } } //setting timestamp on current time static_cast*>(m_dataContainer[col])->operator[](currentRow) = QDateTime::currentDateTime(); QString valueString = line; qDebug() << "putting in " << valueString << " in line: " << currentRow; // set value depending on data type switch (columnModes[m_actualCols - 1]) { case AbstractColumn::Numeric: { bool isNumber; const double value = locale.toDouble(valueString, &isNumber); static_cast*>(m_dataContainer[m_actualCols - 1])->operator[](currentRow) = (isNumber ? value : nanValue); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(valueString, &isNumber); static_cast*>(m_dataContainer[m_actualCols - 1])->operator[](currentRow) = (isNumber ? value : 0); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(valueString, dateTimeFormat); static_cast*>(m_dataContainer[m_actualCols - 1])->operator[](currentRow) = valueDateTime.isValid() ? valueDateTime : QDateTime(); break; } case AbstractColumn::Text: if (removeQuotesEnabled) valueString.remove(QRegExp("[\"\']")); static_cast*>(m_dataContainer[m_actualCols - 1])->operator[](currentRow) = valueString; break; case AbstractColumn::Month: //TODO break; case AbstractColumn::Day: //TODO break; } currentRow++; } } if (m_prepared) { qDebug()<<"notifying plots"; //notify all affected columns and plots about the changes PERFTRACE("AsciiLiveDataImport, notify affected columns and plots"); const Project* project = spreadsheet->project(); QVector curves = project->children(AbstractAspect::Recursive); QVector plots; qDebug()<<"Uploading plots"; for (int n = 0; n < m_actualCols; ++n) { Column* column = spreadsheet->column(n); //determine the plots where the column is consumed for (const auto* curve : curves) { if (curve->xColumn() == column || curve->yColumn() == column) { CartesianPlot* plot = dynamic_cast(curve->parentAspect()); if (plots.indexOf(plot) == -1) { plots << plot; plot->setSuppressDataChangedSignal(true); } } } column->setChanged(); } //loop over all affected plots and retransform them for (auto* const plot : plots) { //TODO setting this back to true triggers again a lot of retransforms in the plot (one for each curve). // plot->setSuppressDataChangedSignal(false); plot->dataChanged(); } } //Set prepared true if needed if (!m_prepared) m_prepared = true; } /*! * \brief Prepares the filter to read from messages received by the MQTTTopic. * Returns whether the preparation was successful or not * \param message * \param topic */ int AsciiFilterPrivate::prepareMQTTTopicToRead(const QString& message, const QString& topic) { Q_UNUSED(topic) vectorNames.append("value"); if (endColumn == -1) endColumn = 1; else endColumn++; vectorNames.prepend("timestamp"); endColumn++; if (createIndexEnabled) { vectorNames.prepend("index"); endColumn++; } m_actualCols = endColumn - startColumn + 1; qDebug()<<"actual cols: "<(skipEmptyParts)); if (!firstLineStringList.isEmpty()) { int length1 = firstLineStringList.at(0).length(); if (firstLineStringList.size() > 1) { int pos2 = firstLine.indexOf(firstLineStringList.at(1), length1); m_separator = firstLine.mid(length1, pos2 - length1); } else { //old: separator = line.right(line.length() - length1); m_separator = ' '; } } } else { // use given separator // replace symbolic "TAB" with '\t' m_separator = separatingCharacter.replace(QLatin1String("2xTAB"), "\t\t", Qt::CaseInsensitive); m_separator = separatingCharacter.replace(QLatin1String("TAB"), "\t", Qt::CaseInsensitive); // replace symbolic "SPACE" with ' ' m_separator = m_separator.replace(QLatin1String("2xSPACE"), QLatin1String(" "), Qt::CaseInsensitive); m_separator = m_separator.replace(QLatin1String("3xSPACE"), QLatin1String(" "), Qt::CaseInsensitive); m_separator = m_separator.replace(QLatin1String("4xSPACE"), QLatin1String(" "), Qt::CaseInsensitive); m_separator = m_separator.replace(QLatin1String("SPACE"), QLatin1String(" "), Qt::CaseInsensitive); firstLineStringList = firstLine.split(m_separator, (QString::SplitBehavior)skipEmptyParts); } DEBUG("separator: \'" << m_separator.toStdString() << '\''); DEBUG("number of columns: " << firstLineStringList.size()); QDEBUG("first line: " << firstLineStringList); DEBUG("headerEnabled = " << headerEnabled); // parse first data line to determine data type for each column columnModes.resize(m_actualCols); int col = 0; if (createIndexEnabled) { columnModes[0] = AbstractColumn::Integer; col = 1; } //set column mode for timestamp columnModes[col] = AbstractColumn::DateTime; col++; auto firstValue = firstLineStringList.takeFirst();//use first value to identify column mode while (firstValue.isEmpty() || firstValue.startsWith(commentCharacter) )//get the first usable value firstValue = firstLineStringList.takeFirst(); if (simplifyWhitespacesEnabled) firstValue = firstValue.simplified(); columnModes[m_actualCols-1] = AbstractFileFilter::columnMode(firstValue, dateTimeFormat, numberFormat); //Improve the column mode based on the other values from the first line for (auto& valueString : firstLineStringList) { if (!valueString.isEmpty() && !valueString.startsWith(commentCharacter) ) { if (createIndexEnabled) col = 1; else col = 0; if (simplifyWhitespacesEnabled) valueString = valueString.simplified(); AbstractColumn::ColumnMode mode = AbstractFileFilter::columnMode(valueString, dateTimeFormat, numberFormat); // numeric: integer -> numeric if (mode == AbstractColumn::Numeric && columnModes[m_actualCols-1] == AbstractColumn::Integer) columnModes[m_actualCols-1] = mode; // text: non text -> text if (mode == AbstractColumn::Text && columnModes[m_actualCols-1] != AbstractColumn::Text) columnModes[m_actualCols-1] = mode; } } //Improve the column mode based on the remaining values for (const auto& valueString : lineList) { QStringList splitString = valueString.split(m_separator, (QString::SplitBehavior)skipEmptyParts); for (auto& valueString2 : splitString) { if (!valueString2.isEmpty() && !valueString2.startsWith(commentCharacter)) { if (simplifyWhitespacesEnabled) valueString2 = valueString2.simplified(); AbstractColumn::ColumnMode mode = AbstractFileFilter::columnMode(valueString2, dateTimeFormat, numberFormat); // numeric: integer -> numeric if (mode == AbstractColumn::Numeric && columnModes[m_actualCols-1] == AbstractColumn::Integer) columnModes[m_actualCols-1] = mode; // text: non text -> text if (mode == AbstractColumn::Text && columnModes[m_actualCols-1] != AbstractColumn::Text) columnModes[m_actualCols-1] = mode; } } } //determine rowcount int tempRowCount = 0; QStringList newDataList = message.split(QRegExp("\n|\r\n|\r"), QString::SkipEmptyParts); for (const auto& valueString : newDataList) { if (!valueString.startsWith(commentCharacter)) { QStringList splitString = valueString.split(m_separator, static_cast(skipEmptyParts)); for (const auto& valueString2 : splitString) if (!valueString2.isEmpty() && !valueString2.startsWith(commentCharacter)) tempRowCount ++; } } QDEBUG("column modes = " << columnModes); m_actualRows = tempRowCount; ///////////////////////////////////////////////////////////////// int actualEndRow = endRow; DEBUG("endRow = " << endRow); if (endRow == -1 || endRow > m_actualRows) actualEndRow = m_actualRows; if (m_actualRows > actualEndRow) m_actualRows = actualEndRow; DEBUG("start/end column: " << startColumn << ' ' << endColumn); DEBUG("start/end row: " << m_actualStartRow << ' ' << actualEndRow); DEBUG("actual cols/rows (w/o header incl. start rows): " << m_actualCols << ' ' << m_actualRows); return 0; } /*! * \brief After the MQTTTopic was loaded, the filter is prepared for reading * \param prepared * \param topic * \param separator */ void AsciiFilterPrivate::setPreparedForMQTT(bool prepared, MQTTTopic* topic, const QString& separator) { m_prepared = prepared; //If originally it was prepared we have to restore the settings if (prepared) { m_separator = separator; m_actualCols = endColumn - startColumn + 1; m_actualRows = topic->rowCount(); //set the column modes columnModes.resize(topic->columnCount()); for (int i = 0; i < topic->columnCount(); ++i) { columnModes[i] = topic->column(i)->columnMode(); } //set the data containers m_dataContainer.resize(m_actualCols); for (int n = 0; n < m_actualCols; ++n) { // data() returns a void* which is a pointer to any data type (see ColumnPrivate.cpp) topic->child(n)->setColumnMode(columnModes[n]); switch (columnModes[n]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(topic->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(topic->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(topic->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(topic->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } - //TODO + //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } } } #endif /*! * \brief Returns the separator used by the filter * \return */ QString AsciiFilterPrivate::separator() const { return m_separator; } diff --git a/src/backend/datasources/filters/AsciiFilter.h b/src/backend/datasources/filters/AsciiFilter.h index 643ae1fa3..4926e9e3c 100644 --- a/src/backend/datasources/filters/AsciiFilter.h +++ b/src/backend/datasources/filters/AsciiFilter.h @@ -1,131 +1,131 @@ /*************************************************************************** File : AsciiFilter.h Project : LabPlot Description : ASCII I/O-filter -------------------------------------------------------------------- Copyright : (C) 2009-2019 Alexander Semke (alexander.semke@web.de) Copyright : (C) 2017 Stefan Gerlach (stefan.gerlach@uni.kn) ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the Free Software * * Foundation, Inc., 51 Franklin Street, Fifth Floor, * * Boston, MA 02110-1301 USA * * * ***************************************************************************/ #ifndef ASCIIFILTER_H #define ASCIIFILTER_H #include "backend/datasources/filters/AbstractFileFilter.h" #include "backend/core/AbstractColumn.h" class Spreadsheet; class QStringList; class QIODevice; class AsciiFilterPrivate; class QAbstractSocket; class MQTTTopic; class MQTTClient; class AsciiFilter : public AbstractFileFilter { Q_OBJECT public: AsciiFilter(); ~AsciiFilter() override; static QStringList separatorCharacters(); static QStringList commentCharacters(); static QStringList dataTypes(); static QStringList predefinedFilters(); static QString fileInfoString(const QString&); static int columnNumber(const QString& fileName, const QString& separator = QString()); static size_t lineNumber(const QString& fileName); size_t lineNumber(QIODevice&) const; // calculate number of lines if device supports it // read data from any device void readDataFromDevice(QIODevice& device, AbstractDataSource*, - AbstractFileFilter::ImportMode = AbstractFileFilter::Replace, int lines = -1); + AbstractFileFilter::ImportMode = AbstractFileFilter::Replace, int lines = -1); void readFromLiveDeviceNotFile(QIODevice& device, AbstractDataSource*dataSource); qint64 readFromLiveDevice(QIODevice& device, AbstractDataSource*, qint64 from = -1); // overloaded function to read from file void readDataFromFile(const QString& fileName, AbstractDataSource* = nullptr, - AbstractFileFilter::ImportMode = AbstractFileFilter::Replace) override; + AbstractFileFilter::ImportMode = AbstractFileFilter::Replace) override; void write(const QString& fileName, AbstractDataSource*) override; QVector preview(const QString& fileName, int lines); QVector preview(QIODevice& device); void loadFilterSettings(const QString&) override; void saveFilterSettings(const QString&) const override; #ifdef HAVE_MQTT void MQTTPreview(QVector&, const QString&, const QString&); QString MQTTColumnStatistics(const MQTTTopic*) const; AbstractColumn::ColumnMode MQTTColumnMode() const; void readMQTTTopic(const QString&, const QString&, AbstractDataSource*); void setPreparedForMQTT(bool, MQTTTopic*, const QString&); #endif QString separator() const; void setCommentCharacter(const QString&); QString commentCharacter() const; void setSeparatingCharacter(const QString&); QString separatingCharacter() const; void setDateTimeFormat(const QString&); QString dateTimeFormat() const; void setNumberFormat(QLocale::Language); QLocale::Language numberFormat() const; void setAutoModeEnabled(const bool); bool isAutoModeEnabled() const; void setHeaderEnabled(const bool); bool isHeaderEnabled() const; void setSkipEmptyParts(const bool); bool skipEmptyParts() const; void setSimplifyWhitespacesEnabled(const bool); bool simplifyWhitespacesEnabled() const; void setNaNValueToZero(const bool); bool NaNValueToZeroEnabled() const; void setRemoveQuotesEnabled(const bool); bool removeQuotesEnabled() const; void setCreateIndexEnabled(const bool); bool createIndexEnabled() const; void setVectorNames(const QString&); QStringList vectorNames() const; QVector columnModes(); void setStartRow(const int); int startRow() const; void setEndRow(const int); int endRow() const; void setStartColumn(const int); int startColumn() const; void setEndColumn(const int); int endColumn() const; void save(QXmlStreamWriter*) const override; bool load(XmlStreamReader*) override; - int isPrepared(); + int isPrepared(); private: std::unique_ptr const d; friend class AsciiFilterPrivate; }; #endif