diff --git a/src/backend/datasources/MQTTClient.cpp b/src/backend/datasources/MQTTClient.cpp index 69738cda3..88c419943 100644 --- a/src/backend/datasources/MQTTClient.cpp +++ b/src/backend/datasources/MQTTClient.cpp @@ -1,1055 +1,1055 @@ #ifdef HAVE_MQTT #include "backend/datasources/MQTTClient.h" #include "backend/datasources/MQTTSubscriptions.h" #include "backend/datasources/MQTTTopic.h" #include "backend/datasources/filters/AsciiFilter.h" #include "backend/datasources/filters/FITSFilter.h" #include "backend/datasources/filters/BinaryFilter.h" #include "backend/core/Project.h" #include "kdefrontend/spreadsheet/PlotDataDialog.h" #include "commonfrontend/spreadsheet/SpreadsheetView.h" #include "kdefrontend/datasources/MQTTErrorWidget.h" #include #include #include #include #include #include #include #include #include #include #include /*! \class MQTTClient \brief Represents data stored in a file. Reading and writing is done with the help of appropriate I/O-filters. \ingroup datasources */ MQTTClient::MQTTClient(const QString& name) : Folder(name), m_paused(false), m_prepared(false), m_keepLastValues(false), m_filter(nullptr), m_updateTimer(new QTimer(this)), m_willTimer(new QTimer(this)), m_client(new QMqttClient(this)), m_mqttTest(false), m_mqttRetain(false), m_mqttUseWill(false), m_mqttUseID(false), m_loaded(false), m_sampleRate(1), m_keepNvalues(0), m_updateInterval(1000), m_disconnectForWill(false), m_mqttUseAuthentication(false), m_subscriptionsLoaded(0), m_subscriptionCount(0), m_mqttFirstConnectEstablished(false) { //initActions(); qDebug()<<"MQTTClient constructor"; connect(m_updateTimer, &QTimer::timeout, this, &MQTTClient::read); m_willStatistics.fill(false, 15); connect(m_client, &QMqttClient::connected, this, &MQTTClient::onMqttConnect); connect(m_willTimer, &QTimer::timeout, this, &MQTTClient::setWillForMqtt); connect(m_client, &QMqttClient::errorChanged, this, &MQTTClient::mqttErrorChanged); //connect(this, &MQTTClient::mqttAllArrived, this, &MQTTClient::onAllArrived ); } MQTTClient::~MQTTClient() { //stop reading before deleting the objects pauseReading(); qDebug()<<"destructor"; if (m_filter) delete m_filter; qDebug()<<"delete timers"; delete m_updateTimer; delete m_willTimer; qDebug()<<"disocnnect"; m_client->disconnectFromHost(); qDebug()<<"delete client"; delete m_client; } /*! * depending on the update type, periodically or on data changes, starts the timer or activates the file watchers, respectively. */ void MQTTClient::ready() { if (m_updateType == TimeInterval) m_updateTimer->start(m_updateInterval); } /*void MQTTClient::initActions() { m_reloadAction = new QAction(QIcon::fromTheme("view-refresh"), i18n("Reload"), this); connect(m_reloadAction, &QAction::triggered, this, &MQTTClient::read); m_toggleLinkAction = new QAction(i18n("Link the file"), this); m_toggleLinkAction->setCheckable(true); connect(m_toggleLinkAction, &QAction::triggered, this, &MQTTClient::linkToggled); m_plotDataAction = new QAction(QIcon::fromTheme("office-chart-line"), i18n("Plot data"), this); connect(m_plotDataAction, &QAction::triggered, this, &MQTTClient::plotData); }*/ /*! * \brief Updates this data source at this moment */ void MQTTClient::updateNow() { qDebug()<<"Update now"; m_updateTimer->stop(); read(); if (m_updateType == TimeInterval && !m_paused) m_updateTimer->start(m_updateInterval); } /*! * \brief Continue reading from the live data source after it was paused. */ void MQTTClient::continueReading() { qDebug()<<"continue reading"; m_paused = false; if (m_updateType == TimeInterval) m_updateTimer->start(m_updateInterval); } /*! * \brief Pause the reading of the live data source. */ void MQTTClient::pauseReading() { qDebug()<<"pause reading"; m_paused = true; if (m_updateType == TimeInterval) m_updateTimer->stop(); } void MQTTClient::setFilter(AbstractFileFilter* f) { m_filter = f; } AbstractFileFilter* MQTTClient::filter() const { return m_filter; } /*! * \brief Sets the source's update interval to \c interval * \param interval */ void MQTTClient::setUpdateInterval(int interval) { qDebug()<<"Update interval " << interval; m_updateInterval = interval; if(!m_paused) m_updateTimer->start(m_updateInterval); } int MQTTClient::updateInterval() const { return m_updateInterval; } /*! * \brief Sets how many values we should store * \param keepnvalues */ void MQTTClient::setKeepNvalues(int keepnvalues) { qDebug()<<"Keep N values" << keepnvalues; m_keepNvalues = keepnvalues; } int MQTTClient::keepNvalues() const { return m_keepNvalues; } bool MQTTClient::isPaused() const { return m_paused; } /*! * \brief Sets the sample rate to samplerate * \param samplerate */ void MQTTClient::setSampleRate(int samplerate) { qDebug()<<"Sample rate: " << samplerate; m_sampleRate = samplerate; } int MQTTClient::sampleRate() const { return m_sampleRate; } /*! * \brief Sets the source's reading type to readingType * \param readingType */ void MQTTClient::setReadingType(ReadingType readingType) { qDebug()<<"Read Type : " << static_cast(readingType); m_readingType = readingType; } MQTTClient::ReadingType MQTTClient::readingType() const { return m_readingType; } /*! * \brief Sets the source's update type to updatetype and handles this change * \param updatetype */ void MQTTClient::setUpdateType(UpdateType updatetype) { qDebug()<<"Update Type : " << static_cast(updatetype); if (updatetype == NewData) { m_updateTimer->stop(); } m_updateType = updatetype; } MQTTClient::UpdateType MQTTClient::updateType() const { return m_updateType; } /*QIcon MQTTClient::icon() const { QIcon icon; if (m_fileType == MQTTClient::Ascii) icon = QIcon::fromTheme("text-plain"); else if (m_fileType == MQTTClient::Binary) icon = QIcon::fromTheme("application-octet-stream"); else if (m_fileType == MQTTClient::Image) icon = QIcon::fromTheme("image-x-generic"); // TODO: HDF5, NetCDF, FITS, etc. return icon; }*/ /*QMenu* MQTTClient::createContextMenu() { QMenu* menu = AbstractPart::createContextMenu(); QAction* firstAction = 0; // if we're populating the context menu for the project explorer, then //there're already actions available there. Skip the first title-action //and insert the action at the beginning of the menu. if (menu->actions().size()>1) firstAction = menu->actions().at(1); menu->insertAction(firstAction, m_plotDataAction); menu->insertSeparator(firstAction); //TODO: doesnt' always make sense... // if (!m_fileWatched) // menu->insertAction(firstAction, m_reloadAction); // // m_toggleWatchAction->setChecked(m_fileWatched); // menu->insertAction(firstAction, m_toggleWatchAction); // // m_toggleLinkAction->setChecked(m_fileLinked); // menu->insertAction(firstAction, m_toggleLinkAction); return menu; }*/ //############################################################################## //################################# SLOTS #################################### //############################################################################## /* * called periodically or on new data changes (file changed, new data in the socket, etc.) */ void MQTTClient::read() { if (m_filter == nullptr) return; //initialize the device (file, socket, serial port), when calling this function for the first time if (!m_prepared) { qDebug()<<"Read & Connect"; m_client->connectToHost(); qDebug()<<"connectTOHost called"; m_prepared = true; } if((m_client->state() == QMqttClient::ClientState::Connected) && m_mqttFirstConnectEstablished) { qDebug()<<"Read"; emit readFromTopics(); } } /*! Saves as XML. */ void MQTTClient::save(QXmlStreamWriter* writer) const { writer->writeStartElement("MQTTClient"); writeBasicAttributes(writer); writeCommentElement(writer); //general writer->writeStartElement("general"); writer->writeAttribute("subscriptionCount", QString::number(m_mqttSubscriptions.size())); writer->writeAttribute("updateType", QString::number(m_updateType)); writer->writeAttribute("readingType", QString::number(m_readingType)); writer->writeAttribute("keepValues", QString::number(m_keepNvalues)); if (m_updateType == TimeInterval) writer->writeAttribute("updateInterval", QString::number(m_updateInterval)); if (m_readingType != TillEnd) writer->writeAttribute("sampleRate", QString::number(m_sampleRate)); writer->writeAttribute("host", m_client->hostname()); writer->writeAttribute("port", QString::number(m_client->port())); writer->writeAttribute("username", m_client->username()); writer->writeAttribute("pasword", m_client->password()); writer->writeAttribute("clientId", m_client->clientId()); writer->writeAttribute("useRetain", QString::number(m_mqttRetain)); writer->writeAttribute("useWill", QString::number(m_mqttUseWill)); writer->writeAttribute("willTopic", m_willTopic); writer->writeAttribute("willOwnMessage", m_willOwnMessage); writer->writeAttribute("willQoS", QString::number(m_willQoS)); writer->writeAttribute("willRetain", QString::number(m_willRetain)); writer->writeAttribute("willMessageType", QString::number(static_cast(m_willMessageType))); writer->writeAttribute("willUpdateType", QString::number(static_cast(m_willUpdateType))); writer->writeAttribute("willTimeInterval", QString::number(m_willTimeInterval)); for( int i = 0; i < m_willStatistics.count(); ++i){ writer->writeAttribute("willStatistics"+QString::number(i), QString::number(m_willStatistics[i])); } writer->writeAttribute("useID", QString::number(m_mqttUseID)); writer->writeAttribute("useAuthentication", QString::number(m_mqttUseAuthentication)); writer->writeEndElement(); //filter m_filter->save(writer); //MQTTSubscriptions for(auto* sub : children(IncludeHidden)) sub->save(writer); writer->writeEndElement(); // "MQTTClient" } /*! Loads from XML. */ bool MQTTClient::load(XmlStreamReader* reader, bool preview) { qDebug()<<"Start loading MQTTClient"; if (!readBasicAttributes(reader)) return false; QString attributeWarning = i18n("Attribute '%1' missing or empty, default value is used"); QXmlStreamAttributes attribs; QString str; while (!reader->atEnd()) { reader->readNext(); if (reader->isEndElement() && reader->name() == "MQTTClient") break; if (!reader->isStartElement()) continue; if (reader->name() == "comment") { if (!readCommentElement(reader)) return false; } else if (reader->name() == "general") { qDebug()<<"MQTTClient general"; attribs = reader->attributes(); str = attribs.value("subscriptionCount").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'subscriptionCount'")); else m_subscriptionCount = str.toInt(); str = attribs.value("keepValues").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'keepValues'")); else m_keepNvalues = str.toInt(); str = attribs.value("updateType").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'updateType'")); else m_updateType = static_cast(str.toInt()); str = attribs.value("readingType").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'readingType'")); else m_readingType = static_cast(str.toInt()); if (m_updateType == TimeInterval) { str = attribs.value("updateInterval").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'updateInterval'")); else m_updateInterval = str.toInt(); } if (m_readingType != TillEnd) { str = attribs.value("sampleRate").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'sampleRate'")); else m_sampleRate = str.toInt(); } str = attribs.value("host").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'host'")); else m_client->setHostname(str); str =attribs.value("port").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'port'")); else m_client->setPort(str.toUInt()); str = attribs.value("useAuthentication").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'useAuthentication'")); else m_mqttUseAuthentication = str.toInt(); if(m_mqttUseAuthentication) { str =attribs.value("username").toString(); if(!str.isEmpty()) m_client->setUsername(str); str =attribs.value("password").toString(); if(!str.isEmpty()) m_client->setPassword(str); } str = attribs.value("useID").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'useID'")); else m_mqttUseID = str.toInt(); if(m_mqttUseID) { str =attribs.value("clientId").toString(); if(!str.isEmpty()) m_client->setClientId(str); } str =attribs.value("useRetain").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'useRetain'")); else m_mqttRetain = str.toInt(); str =attribs.value("useWill").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'useWill'")); else m_mqttUseWill = str.toInt(); if(m_mqttUseWill) { str =attribs.value("willTopic").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willTopic'")); else m_willTopic = str; str =attribs.value("willOwnMessage").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willOwnMessage'")); else m_willOwnMessage = str; str =attribs.value("willQoS").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willQoS'")); else m_willQoS = str.toUInt(); str =attribs.value("willRetain").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willRetain'")); else m_willRetain = str.toInt(); str =attribs.value("willMessageType").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willMessageType'")); else m_willMessageType = static_cast(str.toInt()); str =attribs.value("willUpdateType").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willUpdateType'")); else m_willUpdateType = static_cast(str.toInt()); str =attribs.value("willTimeInterval").toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willTimeInterval'")); else m_willTimeInterval = str.toInt(); for( int i = 0; i < m_willStatistics.count(); ++i){ str =attribs.value("willStatistics"+QString::number(i)).toString(); if(str.isEmpty()) reader->raiseWarning(attributeWarning.arg("'willTimeInterval'")); else m_willStatistics[i] = str.toInt(); } } } else if (reader->name() == "asciiFilter") { qDebug()<<"load filter"; m_filter = new AsciiFilter(); if (!m_filter->load(reader)) return false; } else if(reader->name() == "MQTTSubscriptions") { qDebug()<<"Load MQTTSubscription"; MQTTSubscriptions* subscription = new MQTTSubscriptions(""); subscription->setMQTTClient(this); connect(subscription, &MQTTSubscriptions::loaded, this, &MQTTClient::subscriptionLoaded); if (!subscription->load(reader, preview)) { delete subscription; return false; } m_mqttSubscriptions.push_back(subscription); addChildFast(subscription); } else {// unknown element reader->raiseWarning(i18n("unknown element '%1'", reader->name().toString())); if (!reader->skipToEndElement()) return false; } } return !reader->hasError(); } void MQTTClient::setMqttClientHostPort(const QString& host, const quint16& port) { m_client->setHostname(host); m_client->setPort(port); } void MQTTClient::setMqttClientAuthentication(const QString& username, const QString& password) { m_client->setUsername(username); m_client->setPassword(password); } void MQTTClient::setMqttClientId(const QString &id){ m_client->setClientId(id); } void MQTTClient::addMqttSubscriptions(const QMqttTopicFilter& filter, const quint8& qos) { m_subscribedTopicNameQoS[filter] = qos; } void MQTTClient::onMqttConnect() { qDebug() << "on mqtt connect"; if(m_client->error() == QMqttClient::NoError) { if(!m_mqttFirstConnectEstablished) { qDebug()<<"connection made in MQTTClient"; QMapIterator i(m_subscribedTopicNameQoS); while(i.hasNext()) { i.next(); qDebug()<subscribe(i.key(), i.value()); if(temp) { qDebug()<topic()<<" "<qos(); if(!m_loaded) { m_subscriptions.push_back(temp->topic().filter()); qDebug()<<"New MQTTSubscription"; MQTTSubscriptions* newSubscription = new MQTTSubscriptions(temp->topic().filter()); newSubscription->setMQTTClient(this); qDebug()<<"Add child"; addChild(newSubscription); qDebug()<<"Add to vector"; m_mqttSubscriptions.push_back(newSubscription); } connect(temp, &QMqttSubscription::messageReceived, this, &MQTTClient::mqttSubscribtionMessageReceived); qDebug()<<"Added topic"; } } m_mqttFirstConnectEstablished = true; emit mqttSubscribed(); } else { qDebug() << "Resubscribing after will set"; QMapIterator i(m_subscribedTopicNameQoS); while(i.hasNext()) { i.next(); QMqttSubscription *temp = m_client->subscribe(i.key(), i.value()); if(temp) { qDebug()<topic()<<" "<qos(); connect(temp, &QMqttSubscription::messageReceived, this, &MQTTClient::mqttSubscribtionMessageReceived); } else qDebug()<<"Couldn't subscribe after will change"; } } } } void MQTTClient::mqttSubscribtionMessageReceived(const QMqttMessage& msg) { if(!msg.retain() || (msg.retain() && m_mqttRetain) ) { qDebug()<<"message received from "<subscriptionName(), msg.topic().name())) { m_mqttSubscriptions[i]->messageArrived(QString(msg.payload()), msg.topic().name()); break; } /*QString subscriptionName = m_mqttSubscriptions[i]->subscriptionName(); if(subscriptionName.contains('#') || subscriptionName.contains('+')) { if(subscriptionName.contains('#')) { if(msg.topic().name().startsWith(subscriptionName.left(subscriptionName.count() - 2)) ){ m_mqttSubscriptions[i]->messageArrived(QString(msg.payload()), msg.topic().name()); break; } } else if (subscriptionName.contains('+')) { int pos = subscriptionName.indexOf('+'); QString start = subscriptionName.left(pos); QString end = subscriptionName.right(subscriptionName.count() - pos); if(msg.topic().name().startsWith(start) && msg.topic().name().endsWith(end)) { m_mqttSubscriptions[i]->messageArrived(QString(msg.payload()), msg.topic().name()); break; } } } else if(subscriptionName == msg.topic().name()) { m_mqttSubscriptions[i]->messageArrived(QString(msg.payload()), msg.topic().name()); break; }*/ } if(msg.topic().name() == m_willTopic) m_willLastMessage = QString(msg.payload()); } } int MQTTClient::topicNumber() { return m_subscriptions.count(); } int MQTTClient::topicIndex(const QString& topic) { return m_subscriptions.indexOf(topic, 0); } void MQTTClient::setMqttWillUse(bool use) { m_mqttUseWill = use; if(use == false) m_willTimer->stop(); } bool MQTTClient::mqttWillUse() const{ return m_mqttUseWill; } void MQTTClient::setWillTopic(const QString& topic) { m_willTopic = topic; } QString MQTTClient::willTopic() const{ return m_willTopic; } void MQTTClient::setWillRetain(bool retain) { m_willRetain = retain; } bool MQTTClient::willRetain() const { return m_willRetain; } void MQTTClient::setWillQoS(quint8 QoS) { m_willQoS = QoS; } quint8 MQTTClient::willQoS() const { return m_willQoS; } void MQTTClient::setWillMessageType(WillMessageType messageType) { m_willMessageType = messageType; } MQTTClient::WillMessageType MQTTClient::willMessageType() const { return m_willMessageType; } void MQTTClient::setWillOwnMessage(const QString& ownMessage) { m_willOwnMessage = ownMessage; } QString MQTTClient::willOwnMessage() const { return m_willOwnMessage; } QVector MQTTClient::topicNames() const { return m_topicNames; } void MQTTClient::setWillForMqtt() { if(m_mqttUseWill && (m_client->state() == QMqttClient::ClientState::Connected) ) { if(!m_disconnectForWill) { qDebug() << "Disconnecting from host"; m_client->disconnectFromHost(); m_disconnectForWill = true; } setWillForMqtt(); } else if(m_mqttUseWill && (m_client->state() == QMqttClient::ClientState::Disconnected) && m_disconnectForWill) { m_client->setWillQoS(m_willQoS); qDebug()<<"Will QoS" << m_willQoS; m_client->setWillRetain(m_willRetain); qDebug()<<"Will retain" << m_willRetain; m_client->setWillTopic(m_willTopic); qDebug()<<"Will Topic" << m_willTopic; switch (m_willMessageType) { case WillMessageType::OwnMessage: m_client->setWillMessage(m_willOwnMessage.toUtf8()); qDebug()<<"Will own message" << m_willOwnMessage; break; case WillMessageType::Statistics: { qDebug()<<"Start will statistics"; QVector topics = children(AbstractAspect::Recursive); const AsciiFilter* asciiFilter = nullptr; const MQTTTopic* tempTopic = nullptr; qDebug()<<"Searching for topic"; for (int i = 0; i < topics.count(); ++i) { if(topics[i]->topicName() == m_willTopic) { asciiFilter = dynamic_cast(topics[i]->filter()); tempTopic = topics[i]; break; } } qDebug()<<"Check if topic found"; if(asciiFilter != nullptr && tempTopic != nullptr) { qDebug()<<"Checking column mode"; if((asciiFilter->mqttColumnMode() == AbstractColumn::ColumnMode::Integer) || (asciiFilter->mqttColumnMode() == AbstractColumn::ColumnMode::Numeric)) { - m_client->setWillMessage(asciiFilter->mqttColumnStatistics(tempTopic, this).toUtf8()); + m_client->setWillMessage(asciiFilter->mqttColumnStatistics(tempTopic).toUtf8()); qDebug() << "Will statistics message: "<< QString(m_client->willMessage()); } else { m_client->setWillMessage(QString("").toUtf8()); qDebug() << "Will statistics message: "<< QString(m_client->willMessage()); } } break; } case WillMessageType::LastMessage: m_client->setWillMessage(m_willLastMessage.toUtf8()); qDebug()<<"Will last message:\n" << m_willLastMessage; break; default: break; } m_disconnectForWill = false; m_client->connectToHost(); qDebug()<< "Reconnect to host"; } } MQTTClient::WillUpdateType MQTTClient::willUpdateType() const{ return m_willUpdateType; } void MQTTClient::setWillUpdateType(WillUpdateType updateType) { m_willUpdateType = updateType; } int MQTTClient::willTimeInterval() const{ return m_willTimeInterval; } void MQTTClient::setWillTimeInterval(int interval) { m_willTimeInterval = interval; } void MQTTClient::clearLastMessage() { m_willLastMessage.clear(); } void MQTTClient::addWillStatistics(WillStatistics statistic){ m_willStatistics[static_cast(statistic)] = true; } void MQTTClient::removeWillStatistics(WillStatistics statistic) { m_willStatistics[static_cast(statistic)] = false; } QVector MQTTClient::willStatistics() const{ return m_willStatistics; } void MQTTClient::startWillTimer() const{ if(m_willUpdateType == WillUpdateType::TimePeriod) m_willTimer->start(m_willTimeInterval); } void MQTTClient::stopWillTimer() const{ m_willTimer->stop(); } void MQTTClient::setMqttRetain(bool retain) { m_mqttRetain = retain; } bool MQTTClient::mqttRetain() const { return m_mqttRetain; } void MQTTClient::mqttErrorChanged(QMqttClient::ClientError clientError) { if(clientError != QMqttClient::ClientError::NoError) { MQTTErrorWidget* errorWidget = new MQTTErrorWidget(clientError, this); errorWidget->show(); } } QString MQTTClient::clientHostName() const{ return m_client->hostname(); } quint16 MQTTClient::clientPort() const { return m_client->port(); } QString MQTTClient::clientPassword() const{ return m_client->password(); } QString MQTTClient::clientUserName() const{ return m_client->username(); } QString MQTTClient::clientID () const{ return m_client->clientId(); } void MQTTClient::setMQTTUseID(bool use) { m_mqttUseID = use; } bool MQTTClient::mqttUseID() const { return m_mqttUseID; } void MQTTClient::setMQTTUseAuthentication(bool use) { m_mqttUseAuthentication = use; } bool MQTTClient::mqttUseAuthentication() const { return m_mqttUseAuthentication; } QVector MQTTClient::mqttSubscribtions() const { return m_subscriptions; } void MQTTClient::newMQTTSubscription(const QString& topic, quint8 QoS) { QMqttTopicFilter filter {topic}; QMqttSubscription* temp = m_client->subscribe(filter, QoS); if (temp) { qDebug()<topic()<<" "<qos(); m_subscriptions.push_back(temp->topic().filter()); m_subscribedTopicNameQoS[temp->topic().filter()] = temp->qos(); qDebug()<<"New MQTTSubscription"; MQTTSubscriptions* newSubscription = new MQTTSubscriptions(temp->topic().filter()); newSubscription->setMQTTClient(this); qDebug()<<"Add child"; addChild(newSubscription); qDebug()<<"Add to vector"; m_mqttSubscriptions.push_back(newSubscription); connect(temp, &QMqttSubscription::messageReceived, this, &MQTTClient::mqttSubscribtionMessageReceived); qDebug()<<"Added topic"; qDebug()<<"Check for inferior subscriptions"; bool found = false; QVector inferiorSubscriptions; for(int i = 0; i < m_mqttSubscriptions.size(); ++i) { if(checkTopicContains(topic, m_mqttSubscriptions[i]->subscriptionName()) && topic != m_mqttSubscriptions[i]->subscriptionName()) { found = true; inferiorSubscriptions.push_back(m_mqttSubscriptions[i]); } } if(found) { for(int sub = 0; sub < inferiorSubscriptions.size(); ++sub) { qDebug()<<"Inferior subscription: "<subscriptionName(); QVector topics = inferiorSubscriptions[sub]->topics(); qDebug()<< topics.size(); for(int i = 0; i < topics.size() ; ++i) { qDebug()<topicName(); topics[i]->reparent(newSubscription); } QMqttTopicFilter unsubscribeFilter {inferiorSubscriptions[sub]->subscriptionName()}; m_client->unsubscribe(unsubscribeFilter); for (int j = 0; j < m_mqttSubscriptions.size(); ++j) { if(m_mqttSubscriptions[j]->subscriptionName() == inferiorSubscriptions[sub]->subscriptionName()) { m_mqttSubscriptions.remove(j); } } m_subscriptions.removeAll(inferiorSubscriptions[sub]->subscriptionName()); removeChild(inferiorSubscriptions[sub]); } } emit mqttSubscribed(); } } void MQTTClient::removeMQTTSubscription(const QString &name) { qDebug()<<"Start to remove subscription in MQTTClient: "<unsubscribe(filter); qDebug()<<"QMqttClient's unsubscribe occured"; m_subscriptions.removeAll(name); for (int i = 0; i < m_mqttSubscriptions.size(); ++i) { if(m_mqttSubscriptions[i]->subscriptionName() == name) { qDebug()<<"Subscription name"<subscriptionName() << " "<name(); MQTTSubscriptions* removeSubscription = m_mqttSubscriptions[i]; m_mqttSubscriptions.remove(i); QVector topics = removeSubscription->topics(); for (int j = 0; j < topics.size(); ++j) { qDebug()<<"Removing topic name: "<topicName(); m_topicNames.removeAll(topics[j]->topicName()); } qDebug()<<"Removed from topic names and subscription names"; removeChild(removeSubscription); qDebug()<<"removed child"; break; } } QMapIterator j(m_subscribedTopicNameQoS); while(j.hasNext()) { j.next(); if(j.key().filter() == name) { m_subscribedTopicNameQoS.remove(j.key()); qDebug()<<"Removed from TopicNameQoS map "< 0 && matchIndex < firstList.size() -1) { for(int j = matchIndex +1; j < firstList.size(); ++j) { if(firstList.at(j) != secondtList.at(j)) { differ = true; break; } } } else differ = true; if(!differ) { for(int i = 0; i < firstList.size(); ++i) { if(i != matchIndex) commonTopic.append(firstList.at(i)); else commonTopic.append("+"); if(i != firstList.size() - 1) commonTopic.append("/"); } } } } qDebug() << "Common topic: "< #include #include #include #include /*! \class AsciiFilter \brief Manages the import/export of data organized as columns (vectors) from/to an ASCII-file. \ingroup datasources */ AsciiFilter::AsciiFilter() : AbstractFileFilter(), d(new AsciiFilterPrivate(this)) {} AsciiFilter::~AsciiFilter() {} /*! reads the content of the device \c device. */ void AsciiFilter::readDataFromDevice(QIODevice& device, AbstractDataSource* dataSource, AbstractFileFilter::ImportMode importMode, int lines) { d->readDataFromDevice(device, dataSource, importMode, lines); } void AsciiFilter::readFromLiveDeviceNotFile(QIODevice &device, AbstractDataSource* dataSource) { d->readFromLiveDevice(device, dataSource); } qint64 AsciiFilter::readFromLiveDevice(QIODevice& device, AbstractDataSource* dataSource, qint64 from) { return d->readFromLiveDevice(device, dataSource, from); } +void AsciiFilter::readMQTTTopic(const QString& message, const QString& topic, AbstractDataSource*dataSource) { + d->readMQTTTopic(message, topic, dataSource); +} + /*! reads the content of the file \c fileName. */ QVector AsciiFilter::readDataFromFile(const QString& fileName, AbstractDataSource* dataSource, AbstractFileFilter::ImportMode importMode, int lines) { d->readDataFromFile(fileName, dataSource, importMode, lines); return QVector(); //TODO: remove this later once all read*-functions in the filter classes don't return any preview strings anymore } QVector AsciiFilter::preview(const QString& fileName, int lines) { return d->preview(fileName, lines); } QVector AsciiFilter::preview(QIODevice &device) { return d->preview(device); } /*! reads the content of the file \c fileName to the data source \c dataSource. */ //void AsciiFilter::read(const QString& fileName, AbstractDataSource* dataSource, AbstractFileFilter::ImportMode importMode) { // d->read(fileName, dataSource, importMode); //} /*! writes the content of the data source \c dataSource to the file \c fileName. */ void AsciiFilter::write(const QString& fileName, AbstractDataSource* dataSource) { d->write(fileName, dataSource); // emit() } /*! loads the predefined filter settings for \c filterName */ void AsciiFilter::loadFilterSettings(const QString& filterName) { Q_UNUSED(filterName); } /*! saves the current settings as a new filter with the name \c filterName */ void AsciiFilter::saveFilterSettings(const QString& filterName) const { Q_UNUSED(filterName); } /*! returns the list with the names of all saved (system wide or user defined) filter settings. */ QStringList AsciiFilter::predefinedFilters() { return QStringList(); } /*! returns the list of all predefined separator characters. */ QStringList AsciiFilter::separatorCharacters() { return (QStringList() << "auto" << "TAB" << "SPACE" << "," << ";" << ":" << ",TAB" << ";TAB" << ":TAB" << ",SPACE" << ";SPACE" << ":SPACE" << "2xSPACE" << "3xSPACE" << "4xSPACE" << "2xTAB"); } /*! returns the list of all predefined comment characters. */ QStringList AsciiFilter::commentCharacters() { return (QStringList() << "#" << "!" << "//" << "+" << "c" << ":" << ";"); } /*! returns the list of all predefined data types. */ QStringList AsciiFilter::dataTypes() { const QMetaObject& mo = AbstractColumn::staticMetaObject; const QMetaEnum& me = mo.enumerator(mo.indexOfEnumerator("ColumnMode")); QStringList list; for (int i = 0; i <= 100; ++i) // me.keyCount() does not work because we have holes in enum if (me.valueToKey(i)) list << me.valueToKey(i); return list; } QString AsciiFilter::fileInfoString(const QString& fileName) { QString info(i18n("Number of columns: %1", AsciiFilter::columnNumber(fileName))); info += QLatin1String("
"); info += i18n("Number of lines: %1", AsciiFilter::lineNumber(fileName)); return info; } /*! returns the number of columns in the file \c fileName. */ int AsciiFilter::columnNumber(const QString& fileName, const QString& separator) { KFilterDev device(fileName); if (!device.open(QIODevice::ReadOnly)) { DEBUG("Could not open file " << fileName.toStdString() << " for determining number of columns"); return -1; } QString line = device.readLine(); line.remove(QRegExp("[\\n\\r]")); QStringList lineStringList; if (separator.length() > 0) lineStringList = line.split(separator); else lineStringList = line.split(QRegExp("\\s+")); DEBUG("number of columns : " << lineStringList.size()); return lineStringList.size(); } size_t AsciiFilter::lineNumber(const QString& fileName) { KFilterDev device(fileName); if (!device.open(QIODevice::ReadOnly)) { DEBUG("Could not open file " << fileName.toStdString() << " to determine number of lines"); return 0; } if (!device.canReadLine()) return -1; size_t lineCount = 0; while (!device.atEnd()) { device.readLine(); lineCount++; } //TODO: wc is much faster but not portable /* QElapsedTimer myTimer; myTimer.start(); QProcess wc; wc.start(QString("wc"), QStringList() << "-l" << fileName); size_t lineCount = 0; while (wc.waitForReadyRead()) lineCount = wc.readLine().split(' ')[0].toInt(); lineCount++; // last line not counted DEBUG(" Elapsed time counting lines : " << myTimer.elapsed() << " ms"); */ return lineCount; } /*! returns the number of lines in the device \c device and 0 if sequential. resets the position to 0! */ size_t AsciiFilter::lineNumber(QIODevice &device) { if (device.isSequential()) return 0; if (!device.canReadLine()) DEBUG("WARNING in AsciiFilter::lineNumber(): device cannot 'readLine()' but using it anyway."); size_t lineCount = 0; device.seek(0); while (!device.atEnd()) { device.readLine(); lineCount++; } device.seek(0); return lineCount; } void AsciiFilter::setCommentCharacter(const QString& s) { d->commentCharacter = s; } QString AsciiFilter::commentCharacter() const { return d->commentCharacter; } void AsciiFilter::setSeparatingCharacter(const QString& s) { d->separatingCharacter = s; } QString AsciiFilter::separatingCharacter() const { return d->separatingCharacter; } void AsciiFilter::setDateTimeFormat(const QString &f) { d->dateTimeFormat = f; } QString AsciiFilter::dateTimeFormat() const { return d->dateTimeFormat; } void AsciiFilter::setNumberFormat(QLocale::Language lang) { d->numberFormat = lang; } QLocale::Language AsciiFilter::numberFormat() const { return d->numberFormat; } void AsciiFilter::setAutoModeEnabled(const bool b) { d->autoModeEnabled = b; } bool AsciiFilter::isAutoModeEnabled() const { return d->autoModeEnabled; } void AsciiFilter::setHeaderEnabled(const bool b) { d->headerEnabled = b; } bool AsciiFilter::isHeaderEnabled() const { return d->headerEnabled; } void AsciiFilter::setSkipEmptyParts(const bool b) { d->skipEmptyParts = b; } bool AsciiFilter::skipEmptyParts() const { return d->skipEmptyParts; } void AsciiFilter::setCreateIndexEnabled(bool b) { d->createIndexEnabled = b; } bool AsciiFilter::createIndexEnabled() const{ return d->createIndexEnabled; } void AsciiFilter::setSimplifyWhitespacesEnabled(bool b) { d->simplifyWhitespacesEnabled = b; } bool AsciiFilter::simplifyWhitespacesEnabled() const { return d->simplifyWhitespacesEnabled; } void AsciiFilter::setNaNValueToZero(bool b) { if (b) d->nanValue = 0; else d->nanValue = NAN; } bool AsciiFilter::NaNValueToZeroEnabled() const { if (d->nanValue == 0) return true; return false; } void AsciiFilter::setRemoveQuotesEnabled(bool b) { d->removeQuotesEnabled = b; } bool AsciiFilter::removeQuotesEnabled() const { return d->removeQuotesEnabled; } void AsciiFilter::setVectorNames(const QString& s) { d->vectorNames.clear(); if (!s.simplified().isEmpty()) d->vectorNames = s.simplified().split(' '); } QStringList AsciiFilter::vectorNames() const { return d->vectorNames; } QVector AsciiFilter::columnModes() { return d->columnModes; } void AsciiFilter::setStartRow(const int r) { d->startRow = r; } int AsciiFilter::startRow() const { return d->startRow; } void AsciiFilter::setEndRow(const int r) { d->endRow = r; } int AsciiFilter::endRow() const { return d->endRow; } void AsciiFilter::setStartColumn(const int c) { d->startColumn = c; } int AsciiFilter::startColumn() const { return d->startColumn; } void AsciiFilter::setEndColumn(const int c) { d->endColumn = c; } int AsciiFilter::endColumn() const { return d->endColumn; } //##################################################################### //################### Private implementation ########################## //##################################################################### AsciiFilterPrivate::AsciiFilterPrivate(AsciiFilter* owner) : q(owner), commentCharacter("#"), separatingCharacter("auto"), numberFormat(QLocale::C), autoModeEnabled(true), headerEnabled(true), skipEmptyParts(false), simplifyWhitespacesEnabled(true), nanValue(NAN), removeQuotesEnabled(false), createIndexEnabled(false), startRow(1), endRow(-1), startColumn(1), endColumn(-1), m_actualStartRow(1), m_actualRows(0), m_maxActualRows (0), m_actualCols(0), m_prepared(false), m_lastRowNum (0), mqttPreviewFirstEmptyColCount (0), m_columnOffset(0) { } /*! * get a single line from device */ QStringList AsciiFilterPrivate::getLineString(QIODevice& device) { QString line; do { // skip comment lines in data lines if (!device.canReadLine()) DEBUG("WARNING in AsciiFilterPrivate::getLineString(): device cannot 'readLine()' but using it anyway."); // line = device.readAll(); line = device.readLine(); } while (line.startsWith(commentCharacter)); line.remove(QRegExp("[\\n\\r]")); // remove any newline if (simplifyWhitespacesEnabled) line = line.simplified(); DEBUG("data line : \'" << line.toStdString() << '\''); QStringList lineStringList = line.split(m_separator, (QString::SplitBehavior)skipEmptyParts); //TODO: remove quotes here? QDEBUG("data line, parsed: " << lineStringList); return lineStringList; } /*! * returns -1 if the device couldn't be opened, 1 if the current read position in the device is at the end and 0 otherwise. */ int AsciiFilterPrivate::prepareDeviceToRead(QIODevice& device) { DEBUG("AsciiFilterPrivate::prepareDeviceToRead(): is sequential = " << device.isSequential() << ", can readLine = " << device.canReadLine()); if (!device.open(QIODevice::ReadOnly)) return -1; if (device.atEnd() && !device.isSequential()) // empty file return 1; ///////////////////////////////////////////////////////////////// // Find first data line (ignoring comment lines) DEBUG(" Skipping " << startRow - 1 << " lines"); for (int i = 0; i < startRow - 1; ++i) { QString line; if (!device.canReadLine()) DEBUG("WARNING in AsciiFilterPrivate::prepareDeviceToRead(): device cannot 'readLine()' but using it anyway."); line = device.readLine(); DEBUG(" line = " << line.toStdString()); if (device.atEnd()) { if (device.isSequential()) break; else return 1; } //TOOD: this logic seems to be wrong. If the user asks to read from line startRow, we should start here independent of any comments if (line.startsWith(commentCharacter)) // ignore commented lines before startRow i--; } // Parse the first line: // Determine the number of columns, create the columns and use (if selected) the first row to name them QString firstLine; do { // skip comment lines if (!device.canReadLine()) DEBUG("WARNING in AsciiFilterPrivate::prepareDeviceToRead(): device cannot 'readLine()' but using it anyway."); firstLine = device.readLine(); if (device.atEnd()) { if (device.isSequential()) break; else return 1; } } while (firstLine.startsWith(commentCharacter)); DEBUG(" device position after first line and comments = " << device.pos()); firstLine.remove(QRegExp("[\\n\\r]")); // remove any newline if (simplifyWhitespacesEnabled) firstLine = firstLine.simplified(); DEBUG("First line: \'" << firstLine.toStdString() << '\''); // determine separator and split first line QStringList firstLineStringList; if (separatingCharacter == "auto") { DEBUG("automatic separator"); QRegExp regExp("(\\s+)|(,\\s+)|(;\\s+)|(:\\s+)"); firstLineStringList = firstLine.split(regExp, (QString::SplitBehavior)skipEmptyParts); if (!firstLineStringList.isEmpty()) { int length1 = firstLineStringList.at(0).length(); if (firstLineStringList.size() > 1) { int pos2 = firstLine.indexOf(firstLineStringList.at(1), length1); m_separator = firstLine.mid(length1, pos2 - length1); } else { //old: separator = line.right(line.length() - length1); m_separator = ' '; } } } else { // use given separator // replace symbolic "TAB" with '\t' m_separator = separatingCharacter.replace(QLatin1String("2xTAB"), "\t\t", Qt::CaseInsensitive); m_separator = separatingCharacter.replace(QLatin1String("TAB"), "\t", Qt::CaseInsensitive); // replace symbolic "SPACE" with ' ' m_separator = m_separator.replace(QLatin1String("2xSPACE"), QLatin1String(" "), Qt::CaseInsensitive); m_separator = m_separator.replace(QLatin1String("3xSPACE"), QLatin1String(" "), Qt::CaseInsensitive); m_separator = m_separator.replace(QLatin1String("4xSPACE"), QLatin1String(" "), Qt::CaseInsensitive); m_separator = m_separator.replace(QLatin1String("SPACE"), QLatin1String(" "), Qt::CaseInsensitive); firstLineStringList = firstLine.split(m_separator, (QString::SplitBehavior)skipEmptyParts); } DEBUG("separator: \'" << m_separator.toStdString() << '\''); DEBUG("number of columns: " << firstLineStringList.size()); QDEBUG("first line: " << firstLineStringList); DEBUG("headerEnabled = " << headerEnabled); //optionally, remove potential spaces in the first line if (simplifyWhitespacesEnabled) { for (int i = 0; i < firstLineStringList.size(); ++i) firstLineStringList[i] = firstLineStringList[i].simplified(); } if (headerEnabled) { // use first line to name vectors vectorNames = firstLineStringList; QDEBUG("vector names =" << vectorNames); m_actualStartRow = startRow + 1; } else m_actualStartRow = startRow; // set range to read if (endColumn == -1) { if (headerEnabled || vectorNames.size() == 0) endColumn = firstLineStringList.size(); // last column else //number of vector names provided in the import dialog (not more than the maximal number of columns in the file) endColumn = qMin(vectorNames.size(), firstLineStringList.size()); } if (createIndexEnabled) { vectorNames.prepend(i18n("Index")); endColumn++; } m_actualCols = endColumn - startColumn + 1; //TEST: readline-seek-readline fails /* qint64 testpos = device.pos(); DEBUG("read data line @ pos " << testpos << " : " << device.readLine().toStdString()); device.seek(testpos); testpos = device.pos(); DEBUG("read data line again @ pos " << testpos << " : " << device.readLine().toStdString()); */ ///////////////////////////////////////////////////////////////// // parse first data line to determine data type for each column if (!device.isSequential()) firstLineStringList = getLineString(device); columnModes.resize(m_actualCols); int col = 0; if (createIndexEnabled) { columnModes[0] = AbstractColumn::Integer; col = 1; } for (auto& valueString: firstLineStringList) { // parse columns available in first data line if (simplifyWhitespacesEnabled) valueString = valueString.simplified(); if (col == m_actualCols) break; columnModes[col++] = AbstractFileFilter::columnMode(valueString, dateTimeFormat, numberFormat); } // parsing more lines to better determine data types for (unsigned int i = 0; i < m_dataTypeLines; ++i) { firstLineStringList = getLineString(device); if (createIndexEnabled) col = 1; else col = 0; for (auto& valueString: firstLineStringList) { if (simplifyWhitespacesEnabled) valueString = valueString.simplified(); if (col == m_actualCols) break; AbstractColumn::ColumnMode mode = AbstractFileFilter::columnMode(valueString, dateTimeFormat, numberFormat); // numeric: integer -> numeric if (mode == AbstractColumn::Numeric && columnModes[col] == AbstractColumn::Integer) columnModes[col] = mode; // text: non text -> text if (mode == AbstractColumn::Text && columnModes[col] != AbstractColumn::Text) columnModes[col] = mode; col++; } } QDEBUG("column modes = " << columnModes); // ATTENTION: This resets the position in the device to 0 m_actualRows = (int)AsciiFilter::lineNumber(device); // reset to start of file //TODO: seems to be redundant since it's already done in the lineNumber() call above if (!device.isSequential()) device.seek(0); ///////////////////////////////////////////////////////////////// int actualEndRow = endRow; DEBUG("endRow(actualEndRow) = " << endRow << ", m_actualRows = " << m_actualRows); if (endRow == -1 || endRow > m_actualRows) actualEndRow = m_actualRows; if (m_actualRows > actualEndRow) m_actualRows = actualEndRow; DEBUG("start/end column: " << startColumn << ' ' << endColumn); DEBUG("start/end row: " << m_actualStartRow << ' ' << actualEndRow); DEBUG("actual cols/rows (w/o header incl. start rows): " << m_actualCols << ' ' << m_actualRows); if (m_actualRows == 0 && !device.isSequential()) return 1; return 0; } /*! reads the content of the file \c fileName to the data source \c dataSource. Uses the settings defined in the data source. */ void AsciiFilterPrivate::readDataFromFile(const QString& fileName, AbstractDataSource* dataSource, AbstractFileFilter::ImportMode importMode, int lines) { DEBUG("AsciiFilterPrivate::readDataFromFile(): fileName = \'" << fileName.toStdString() << "\', dataSource = " << dataSource << ", mode = " << ENUM_TO_STRING(AbstractFileFilter, ImportMode, importMode) << ", lines = " << lines); KFilterDev device(fileName); readDataFromDevice(device, dataSource, importMode, lines); } qint64 AsciiFilterPrivate::readFromLiveDevice(QIODevice& device, AbstractDataSource* dataSource, qint64 from) { DEBUG("AsciiFilterPrivate::readFromLiveDevice(): bytes available = " << device.bytesAvailable() << ", from = " << from); if (!(device.bytesAvailable() > 0)) { DEBUG(" No new data available"); return 0; } LiveDataSource* spreadsheet = dynamic_cast(dataSource); if (spreadsheet->sourceType() != LiveDataSource::SourceType::FileOrPipe) if (device.isSequential() && device.bytesAvailable() < (int)sizeof(quint16)) return 0; if (!m_prepared) { DEBUG("Preparing .."); switch (spreadsheet->sourceType()) { case LiveDataSource::SourceType::FileOrPipe: { const int deviceError = prepareDeviceToRead(device); if (deviceError != 0) { DEBUG("Device error = " << deviceError); return 0; } break; } case LiveDataSource::SourceType::NetworkTcpSocket: case LiveDataSource::SourceType::NetworkUdpSocket: case LiveDataSource::SourceType::LocalSocket: case LiveDataSource::SourceType::SerialPort: m_actualRows = 1; if (createIndexEnabled) { m_actualCols = 2; columnModes << AbstractColumn::Integer << AbstractColumn::Numeric; vectorNames << i18n("Index") << i18n("Value"); } else { m_actualCols = 1; columnModes << AbstractColumn::Numeric; vectorNames << i18n("Value"); } QDEBUG(" vector names = " << vectorNames); } // prepare import for spreadsheet spreadsheet->setUndoAware(false); spreadsheet->resize(AbstractFileFilter::Replace, vectorNames, m_actualCols); DEBUG(" data source resized to col: " << m_actualCols); DEBUG(" data source rowCount: " << spreadsheet->rowCount()); //columns in a file data source don't have any manual changes. //make the available columns undo unaware and suppress the "data changed" signal. //data changes will be propagated via an explicit Column::setChanged() call once new data was read. for (int i = 0; i < spreadsheet->childCount(); i++) { spreadsheet->child(i)->setUndoAware(false); spreadsheet->child(i)->setSuppressDataChangedSignal(true); } int keepNValues = spreadsheet->keepNValues(); if (keepNValues == 0) spreadsheet->setRowCount(m_actualRows > 1 ? m_actualRows : 1); else { spreadsheet->setRowCount(keepNValues); m_actualRows = keepNValues; } m_dataContainer.resize(m_actualCols); DEBUG(" Setting data .."); for (int n = 0; n < m_actualCols; ++n) { // data() returns a void* which is a pointer to any data type (see ColumnPrivate.cpp) spreadsheet->child(n)->setColumnMode(columnModes[n]); switch (columnModes[n]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } DEBUG("Prepared!"); } qint64 bytesread = 0; #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportTotal: "); #endif LiveDataSource::ReadingType readingType; if (!m_prepared) { readingType = LiveDataSource::ReadingType::TillEnd; } else { //we have to read all the data when reading from end //so we set readingType to TillEnd if (spreadsheet->readingType() == LiveDataSource::ReadingType::FromEnd) readingType = LiveDataSource::ReadingType::TillEnd; //if we read the whole file we just start from the beginning of it //and read till end else if (spreadsheet->readingType() == LiveDataSource::ReadingType::WholeFile) readingType = LiveDataSource::ReadingType::TillEnd; else readingType = spreadsheet->readingType(); } DEBUG(" reading type = " << ENUM_TO_STRING(LiveDataSource, ReadingType, readingType)); //move to the last read position, from == total bytes read //since the other source types are sequencial we cannot seek on them if (spreadsheet->sourceType() == LiveDataSource::SourceType::FileOrPipe) device.seek(from); DEBUG(" bytes available = " << device.bytesAvailable()); //count the new lines, increase actualrows on each //now we read all the new lines, if we want to use sample rate //then here we can do it, if we have actually sample rate number of lines :-? int newLinesForSampleSizeNotTillEnd = 0; int newLinesTillEnd = 0; QVector newData; if (readingType != LiveDataSource::ReadingType::TillEnd) newData.resize(spreadsheet->sampleSize()); int newDataIdx = 0; { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportReadingFromFile: "); #endif while (!device.atEnd()) { DEBUG(" reading type = " << ENUM_TO_STRING(LiveDataSource, ReadingType, readingType)); DEBUG(" source type = " << ENUM_TO_STRING(LiveDataSource, SourceType, spreadsheet->sourceType())); if (readingType != LiveDataSource::ReadingType::TillEnd) { switch (spreadsheet->sourceType()) { // different sources need different read methods case LiveDataSource::SourceType::LocalSocket: newData[newDataIdx++] = device.readAll(); break; case LiveDataSource::SourceType::NetworkUdpSocket: newData[newDataIdx++] = device.read(device.bytesAvailable()); break; case LiveDataSource::SourceType::FileOrPipe: case LiveDataSource::SourceType::NetworkTcpSocket: //TODO: check serial port case LiveDataSource::SourceType::SerialPort: if (!device.canReadLine()) DEBUG("WARNING in AsciiFilterPrivate::readFromLiveDevice(): device cannot 'readLine()' but using it anyway."); newData[newDataIdx++] = device.readLine(); } } else { // ReadingType::TillEnd switch (spreadsheet->sourceType()) { // different sources need different read methods case LiveDataSource::SourceType::LocalSocket: newData.push_back(device.readAll()); break; case LiveDataSource::SourceType::NetworkUdpSocket: newData.push_back(device.read(device.bytesAvailable())); break; case LiveDataSource::SourceType::FileOrPipe: case LiveDataSource::SourceType::NetworkTcpSocket: //TODO: check serial port case LiveDataSource::SourceType::SerialPort: if (!device.canReadLine()) DEBUG("WARNING in AsciiFilterPrivate::readFromLiveDevice(): device cannot 'readLine()' but using it anyway."); newData.push_back(device.readLine()); } } newLinesTillEnd++; if (readingType != LiveDataSource::ReadingType::TillEnd) { newLinesForSampleSizeNotTillEnd++; //for Continuous reading and FromEnd we read sample rate number of lines if possible //here TillEnd and Whole file behave the same if (newLinesForSampleSizeNotTillEnd == spreadsheet->sampleSize()) break; } } QDEBUG(" data read: " << newData); } //now we reset the readingType if (spreadsheet->readingType() == LiveDataSource::ReadingType::FromEnd) readingType = spreadsheet->readingType(); //we had less new lines than the sample size specified if (readingType != LiveDataSource::ReadingType::TillEnd) QDEBUG("Removed empty lines: " << newData.removeAll("")); //back to the last read position before counting when reading from files if (spreadsheet->sourceType() == LiveDataSource::SourceType::FileOrPipe) device.seek(from); const int spreadsheetRowCountBeforeResize = spreadsheet->rowCount(); int currentRow = 0; // indexes the position in the vector(column) int linesToRead = 0; int keepNValues = spreadsheet->keepNValues(); DEBUG("Increase row count"); if (m_prepared) { //increase row count if we don't have a fixed size //but only after the preparation step if (keepNValues == 0) { if (readingType != LiveDataSource::ReadingType::TillEnd) m_actualRows += qMin(newData.size(), spreadsheet->sampleSize()); else { //we don't increase it if we reread the whole file, we reset it if (!(spreadsheet->readingType() == LiveDataSource::ReadingType::WholeFile)) m_actualRows += newData.size(); else m_actualRows = newData.size(); } //appending if (spreadsheet->readingType() == LiveDataSource::ReadingType::WholeFile) linesToRead = m_actualRows; else linesToRead = m_actualRows - spreadsheetRowCountBeforeResize; } else { // fixed size if (readingType == LiveDataSource::ReadingType::TillEnd) { //we had more lines than the fixed size, so we read m_actualRows number of lines if (newLinesTillEnd > m_actualRows) { linesToRead = m_actualRows; //TODO after reading we should skip the next data lines //because it's TillEnd actually } else linesToRead = newLinesTillEnd; } else { //we read max sample rate number of lines when the reading mode //is ContinuouslyFixed or FromEnd, WholeFile is disabled linesToRead = qMin(spreadsheet->sampleSize(), newLinesTillEnd); } } DEBUG(" actual row = " << m_actualRows); if (linesToRead == 0) return 0; } else { linesToRead = newLinesTillEnd; if (headerEnabled) --m_actualRows; } DEBUG(" lines to read = " << linesToRead); //TODO: check other source types if (spreadsheet->sourceType() == LiveDataSource::SourceType::NetworkUdpSocket) { if (m_actualRows < linesToRead) { DEBUG(" SET actual rows to " << linesToRead); m_actualRows = linesToRead; } } //new rows/resize columns if we don't have a fixed size //TODO if the user changes this value..m_resizedToFixedSize..setResizedToFixedSize if (keepNValues == 0) { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportResizing: "); #endif if (spreadsheet->rowCount() < m_actualRows) spreadsheet->setRowCount(m_actualRows); if (!m_prepared) currentRow = 0; else { // indexes the position in the vector(column) if (spreadsheet->readingType() == LiveDataSource::ReadingType::WholeFile) currentRow = 0; else currentRow = spreadsheetRowCountBeforeResize; } // if we have fixed size, we do this only once in preparation, here we can use // m_prepared and we need something to decide whether it has a fixed size or increasing for (int n = 0; n < m_actualCols; ++n) { // data() returns a void* which is a pointer to any data type (see ColumnPrivate.cpp) switch (columnModes[n]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } } else { //when we have a fixed size we have to pop sampleSize number of lines if specified //here popping, setting currentRow if (!m_prepared) { if (spreadsheet->readingType() == LiveDataSource::ReadingType::WholeFile) currentRow = 0; else currentRow = m_actualRows - qMin(newLinesTillEnd, m_actualRows); } else { if (readingType == LiveDataSource::ReadingType::TillEnd) { if (newLinesTillEnd > m_actualRows) { currentRow = 0; } else { if (spreadsheet->readingType() == LiveDataSource::ReadingType::WholeFile) currentRow = 0; else currentRow = m_actualRows - newLinesTillEnd; } } else { //we read max sample rate number of lines when the reading mode //is ContinuouslyFixed or FromEnd currentRow = m_actualRows - qMin(spreadsheet->sampleSize(), newLinesTillEnd); } } if (m_prepared) { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportPopping: "); #endif for (int row = 0; row < linesToRead; ++row) { for (int col = 0; col < m_actualCols; ++col) { switch (columnModes[col]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(col)->data()); vector->pop_front(); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(col)->data()); vector->pop_front(); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(col)->data()); vector->pop_front(); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(col)->data()); vector->pop_front(); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } } } } // from the last row we read the new data in the spreadsheet qDebug() << "reading from line" << currentRow << " till end" << newLinesTillEnd; qDebug() << "Lines to read:" << linesToRead <<", actual rows:" << m_actualRows << ", actual cols:" << m_actualCols; newDataIdx = 0; if (readingType == LiveDataSource::ReadingType::FromEnd) { if (m_prepared) { if (newData.size() > spreadsheet->sampleSize()) newDataIdx = newData.size() - spreadsheet->sampleSize(); //since we skip a couple of lines, we need to count those bytes too for (int i = 0; i < newDataIdx; ++i) bytesread += newData.at(i).size(); } } qDebug() << "newDataIdx: " << newDataIdx; //TODO static int indexColumnIdx = 0; { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportFillingContainers: "); #endif int row = 0; if (readingType == LiveDataSource::ReadingType::TillEnd || (readingType == LiveDataSource::ReadingType::ContinuousFixed)) { if (headerEnabled) { if (!m_prepared) { row = 1; bytesread += newData.at(0).size(); } } } if (spreadsheet->sourceType() == LiveDataSource::SourceType::FileOrPipe) { if (readingType == LiveDataSource::ReadingType::WholeFile) { if (headerEnabled) { row = 1; bytesread += newData.at(0).size(); } } } for (; row < linesToRead; ++row) { DEBUG(" row = " << row); QString line; if (readingType == LiveDataSource::ReadingType::FromEnd) line = newData.at(newDataIdx++); else line = newData.at(row); //when we read the whole file we don't care about the previous position //so we don't have to count those bytes if (readingType != LiveDataSource::ReadingType::WholeFile) { if (spreadsheet->sourceType() == LiveDataSource::SourceType::FileOrPipe) { bytesread += line.size(); } } //qDebug() << "line bytes: " << line.size() << " line: " << line; if (simplifyWhitespacesEnabled) line = line.simplified(); if (line.isEmpty() || line.startsWith(commentCharacter)) // skip empty or commented lines continue; QLocale locale(numberFormat); QStringList lineStringList; // only FileOrPipe support multiple columns if (spreadsheet->sourceType() == LiveDataSource::SourceType::FileOrPipe) lineStringList = line.split(m_separator, (QString::SplitBehavior)skipEmptyParts); else lineStringList << line; QDEBUG(" line = " << lineStringList << ", separator = \'" << m_separator << "\'"); if (createIndexEnabled) { if (spreadsheet->keepNValues() == 0) lineStringList.prepend(QString::number(currentRow)); else lineStringList.prepend(QString::number(indexColumnIdx++)); } QDEBUG(" column modes = " << columnModes); for (int n = 0; n < m_actualCols; ++n) { DEBUG(" actual col = " << n); if (n < lineStringList.size()) { QString valueString = lineStringList.at(n); DEBUG(" value string = " << valueString.toStdString()); // set value depending on data type switch (columnModes[n]) { case AbstractColumn::Numeric: { DEBUG(" Numeric"); bool isNumber; const double value = locale.toDouble(valueString, &isNumber); static_cast*>(m_dataContainer[n])->operator[](currentRow) = (isNumber ? value : nanValue); qDebug() << "dataContainer[" << n << "] size:" << static_cast*>(m_dataContainer[n])->size(); break; } case AbstractColumn::Integer: { DEBUG(" Integer"); bool isNumber; const int value = locale.toInt(valueString, &isNumber); DEBUG(" container size = " << m_dataContainer.size() << ", current row = " << currentRow); static_cast*>(m_dataContainer[n])->operator[](currentRow) = (isNumber ? value : 0); qDebug() << "dataContainer[" << n << "] size:" << static_cast*>(m_dataContainer[n])->size(); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(valueString, dateTimeFormat); static_cast*>(m_dataContainer[n])->operator[](currentRow) = valueDateTime.isValid() ? valueDateTime : QDateTime(); break; } case AbstractColumn::Text: if (removeQuotesEnabled) valueString.remove(QRegExp("[\"\']")); static_cast*>(m_dataContainer[n])->operator[](currentRow) = valueString; break; case AbstractColumn::Month: //TODO break; case AbstractColumn::Day: //TODO break; } } else { DEBUG(" missing columns in this line"); switch (columnModes[n]) { case AbstractColumn::Numeric: static_cast*>(m_dataContainer[n])->operator[](currentRow) = nanValue; break; case AbstractColumn::Integer: static_cast*>(m_dataContainer[n])->operator[](currentRow) = 0; break; case AbstractColumn::DateTime: static_cast*>(m_dataContainer[n])->operator[](currentRow) = QDateTime(); break; case AbstractColumn::Text: static_cast*>(m_dataContainer[n])->operator[](currentRow) = ""; break; case AbstractColumn::Month: //TODO break; case AbstractColumn::Day: //TODO break; } } } currentRow++; } } if (m_prepared) { //notify all affected columns and plots about the changes PERFTRACE("AsciiLiveDataImport, notify affected columns and plots"); const Project* project = spreadsheet->project(); QVector curves = project->children(AbstractAspect::Recursive); QVector plots; for (int n = 0; n < m_actualCols; ++n) { Column* column = spreadsheet->column(n); //determine the plots where the column is consumed for (const auto* curve: curves) { if (curve->xColumn() == column || curve->yColumn() == column) { CartesianPlot* plot = dynamic_cast(curve->parentAspect()); if (plots.indexOf(plot) == -1) { plots << plot; plot->setSuppressDataChangedSignal(true); } } } column->setChanged(); } //loop over all affected plots and retransform them for (auto* plot: plots) { plot->setSuppressDataChangedSignal(false); plot->dataChanged(); } } m_prepared = true; return bytesread; } /*! reads the content of device \c device to the data source \c dataSource. Uses the settings defined in the data source. */ void AsciiFilterPrivate::readDataFromDevice(QIODevice& device, AbstractDataSource* dataSource, AbstractFileFilter::ImportMode importMode, int lines) { DEBUG("AsciiFilterPrivate::readDataFromDevice(): dataSource = " << dataSource << ", mode = " << ENUM_TO_STRING(AbstractFileFilter, ImportMode, importMode) << ", lines = " << lines); if (!m_prepared) { const int deviceError = prepareDeviceToRead(device); if (deviceError != 0) { DEBUG("Device error = " << deviceError); return; } // matrix data has only one column mode (which is not text) if (dynamic_cast(dataSource)) { auto mode = columnModes[0]; if (mode == AbstractColumn::Text) mode = AbstractColumn::Numeric; for (auto& c: columnModes) if (c != mode) c = mode; } m_columnOffset = dataSource->prepareImport(m_dataContainer, importMode, m_actualRows - m_actualStartRow + 1, m_actualCols, vectorNames, columnModes); m_prepared = true; } DEBUG("locale = " << QLocale::languageToString(numberFormat).toStdString()); QLocale locale(numberFormat); // Read the data int currentRow = 0; // indexes the position in the vector(column) if (lines == -1) lines = m_actualRows; DEBUG("reading " << qMin(lines, m_actualRows) << " lines"); for (int i = 0; i < qMin(lines, m_actualRows); ++i) { QString line = device.readLine(); // skip start lines if (m_actualStartRow > 1) { m_actualStartRow--; continue; } line.remove(QRegExp("[\\n\\r]")); // remove any newline if (simplifyWhitespacesEnabled) line = line.simplified(); if (line.isEmpty() || line.startsWith(commentCharacter)) // skip empty or commented lines continue; QStringList lineStringList = line.split(m_separator, (QString::SplitBehavior)skipEmptyParts); //prepend the index if required //TODO: come up maybe with a solution with adding the index inside of the loop below, //without conversion to string, prepending to the list and then conversion back to integer. if (createIndexEnabled) lineStringList.prepend(QString::number(i+1)); // remove left white spaces if (skipEmptyParts) { for (int n = 0; n < lineStringList.size(); ++n) { QString valueString = lineStringList.at(n); if (!QString::compare(valueString, " ")) { lineStringList.removeAt(n); n--; } } } for (int n = 0; n < m_actualCols; ++n) { if (n < lineStringList.size()) { QString valueString = lineStringList.at(n); // set value depending on data type switch (columnModes[n]) { case AbstractColumn::Numeric: { bool isNumber; const double value = locale.toDouble(valueString, &isNumber); static_cast*>(m_dataContainer[n])->operator[](currentRow) = (isNumber ? value : nanValue); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(valueString, &isNumber); static_cast*>(m_dataContainer[n])->operator[](currentRow) = (isNumber ? value : 0); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(valueString, dateTimeFormat); static_cast*>(m_dataContainer[n])->operator[](currentRow) = valueDateTime.isValid() ? valueDateTime : QDateTime(); break; } case AbstractColumn::Text: if (removeQuotesEnabled) valueString.remove(QRegExp("[\"\']")); static_cast*>(m_dataContainer[n])->operator[](currentRow) = valueString; break; case AbstractColumn::Month: // never happens case AbstractColumn::Day: break; } } else { // missing columns in this line switch (columnModes[n]) { case AbstractColumn::Numeric: static_cast*>(m_dataContainer[n])->operator[](currentRow) = nanValue; break; case AbstractColumn::Integer: static_cast*>(m_dataContainer[n])->operator[](currentRow) = 0; break; case AbstractColumn::DateTime: static_cast*>(m_dataContainer[n])->operator[](currentRow) = QDateTime(); break; case AbstractColumn::Text: static_cast*>(m_dataContainer[n])->operator[](currentRow) = ""; break; case AbstractColumn::Month: // never happens case AbstractColumn::Day: break; } } } currentRow++; emit q->completed(100 * currentRow/m_actualRows); } dataSource->finalizeImport(m_columnOffset, startColumn, endColumn, dateTimeFormat, importMode); } /*! * preview for special devices (local/UDP/TCP socket or serial port) */ QVector AsciiFilterPrivate::preview(QIODevice &device) { DEBUG("AsciiFilterPrivate::preview(): bytesAvailable = " << device.bytesAvailable() << ", isSequential = " << device.isSequential()); QVector dataStrings; if (!(device.bytesAvailable() > 0)) { DEBUG("No new data available"); return dataStrings; } if (device.isSequential() && device.bytesAvailable() < (int)sizeof(quint16)) return dataStrings; #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportTotal: "); #endif int linesToRead = 0; QVector newData; while (!device.atEnd()) { if (device.canReadLine()) newData.push_back(device.readLine()); else // UDP fails otherwise newData.push_back(device.readAll()); linesToRead++; } QDEBUG(" data = " << newData); if (linesToRead == 0) return dataStrings; int col = 0; int colMax = newData.at(0).size(); if (createIndexEnabled) colMax++; columnModes.resize(colMax); if (createIndexEnabled) { columnModes[0] = AbstractColumn::ColumnMode::Integer; col = 1; vectorNames.prepend(i18n("Index")); } vectorNames.append(i18n("Value")); QDEBUG(" vector names = " << vectorNames); for (const auto& valueString: newData.at(0).split(' ', QString::SkipEmptyParts)) { if (col == colMax) break; columnModes[col++] = AbstractFileFilter::columnMode(valueString, dateTimeFormat, numberFormat); } for (int i = 0; i < linesToRead; ++i) { QString line = newData.at(i); if (simplifyWhitespacesEnabled) line = line.simplified(); if (line.isEmpty() || line.startsWith(commentCharacter)) // skip empty or commented lines continue; QLocale locale(numberFormat); QStringList lineStringList = line.split(' ', QString::SkipEmptyParts); if (createIndexEnabled) lineStringList.prepend(QString::number(i)); QStringList lineString; for (int n = 0; n < lineStringList.size(); ++n) { if (n < lineStringList.size()) { QString valueString = lineStringList.at(n); switch (columnModes[n]) { case AbstractColumn::Numeric: { bool isNumber; const double value = locale.toDouble(valueString, &isNumber); lineString += QString::number(isNumber ? value : nanValue, 'g', 16); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(valueString, &isNumber); lineString += QString::number(isNumber ? value : 0); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(valueString, dateTimeFormat); lineString += valueDateTime.isValid() ? valueDateTime.toString(dateTimeFormat) : QLatin1String(" "); break; } case AbstractColumn::Text: if (removeQuotesEnabled) valueString.remove(QRegExp("[\"\']")); lineString += valueString; break; case AbstractColumn::Month: // never happens case AbstractColumn::Day: break; } } else // missing columns in this line lineString += QLatin1String(""); } dataStrings << lineString; } return dataStrings; } /*! * generates the preview for the file \c fileName reading the provided number of \c lines. */ QVector AsciiFilterPrivate::preview(const QString& fileName, int lines) { QVector dataStrings; KFilterDev device(fileName); const int deviceError = prepareDeviceToRead(device); if (deviceError != 0) { DEBUG("Device error = " << deviceError); return dataStrings; } //number formatting DEBUG("locale = " << QLocale::languageToString(numberFormat).toStdString()); QLocale locale(numberFormat); // Read the data if (lines == -1) lines = m_actualRows; // set column names for preview if (!headerEnabled) { int start = 0; if (createIndexEnabled) start = 1; for (int i=start;i 1) { m_actualStartRow--; continue; } line.remove(QRegExp("[\\n\\r]")); // remove any newline if (simplifyWhitespacesEnabled) line = line.simplified(); if (line.isEmpty() || line.startsWith(commentCharacter)) // skip empty or commented lines continue; QStringList lineStringList = line.split(m_separator, (QString::SplitBehavior)skipEmptyParts); QDEBUG(" line = " << lineStringList); //prepend index if required if (createIndexEnabled) lineStringList.prepend(QString::number(i+1)); QStringList lineString; for (int n = 0; n < m_actualCols; ++n) { if (n < lineStringList.size()) { QString valueString = lineStringList.at(n); //DEBUG(" valueString = " << valueString.toStdString()); if (skipEmptyParts && !QString::compare(valueString, " ")) // handle left white spaces continue; // set value depending on data type switch (columnModes[n]) { case AbstractColumn::Numeric: { bool isNumber; const double value = locale.toDouble(valueString, &isNumber); lineString += QString::number(isNumber ? value : nanValue, 'g', 15); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(valueString, &isNumber); lineString += QString::number(isNumber ? value : 0); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(valueString, dateTimeFormat); lineString += valueDateTime.isValid() ? valueDateTime.toString(dateTimeFormat) : QLatin1String(" "); break; } case AbstractColumn::Text: if (removeQuotesEnabled) valueString.remove(QRegExp("[\"\']")); lineString += valueString; break; case AbstractColumn::Month: // never happens case AbstractColumn::Day: break; } } else // missing columns in this line lineString += QLatin1String(""); } dataStrings << lineString; } return dataStrings; } /*! writes the content of \c dataSource to the file \c fileName. */ void AsciiFilterPrivate::write(const QString & fileName, AbstractDataSource* dataSource) { Q_UNUSED(fileName); Q_UNUSED(dataSource); //TODO: save data to ascii file } //############################################################################## //################## Serialization/Deserialization ########################### //############################################################################## /*! Saves as XML. */ void AsciiFilter::save(QXmlStreamWriter* writer) const { writer->writeStartElement( "asciiFilter"); writer->writeAttribute( "commentCharacter", d->commentCharacter); writer->writeAttribute( "separatingCharacter", d->separatingCharacter); writer->writeAttribute( "autoMode", QString::number(d->autoModeEnabled)); writer->writeAttribute( "createIndex", QString::number(d->createIndexEnabled)); writer->writeAttribute( "header", QString::number(d->headerEnabled)); writer->writeAttribute( "vectorNames", d->vectorNames.join(' ')); writer->writeAttribute( "skipEmptyParts", QString::number(d->skipEmptyParts)); writer->writeAttribute( "simplifyWhitespaces", QString::number(d->simplifyWhitespacesEnabled)); writer->writeAttribute( "nanValue", QString::number(d->nanValue)); writer->writeAttribute( "removeQuotes", QString::number(d->removeQuotesEnabled)); writer->writeAttribute( "startRow", QString::number(d->startRow)); writer->writeAttribute( "endRow", QString::number(d->endRow)); writer->writeAttribute( "startColumn", QString::number(d->startColumn)); writer->writeAttribute( "endColumn", QString::number(d->endColumn)); writer->writeEndElement(); } /*! Loads from XML. */ bool AsciiFilter::load(XmlStreamReader* reader) { if (!reader->isStartElement() || reader->name() != "asciiFilter") { reader->raiseError(i18n("no ascii filter element found")); return false; } KLocalizedString attributeWarning = ki18n("Attribute '%1' missing or empty, default value is used"); QXmlStreamAttributes attribs = reader->attributes(); QString str = attribs.value("commentCharacter").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("commentCharacter").toString()); else d->commentCharacter = str; str = attribs.value("separatingCharacter").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("separatingCharacter").toString()); else d->separatingCharacter = str; str = attribs.value("createIndex").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("createIndex").toString()); else d->createIndexEnabled = str.toInt(); str = attribs.value("autoMode").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("autoMode").toString()); else d->autoModeEnabled = str.toInt(); str = attribs.value("header").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("header").toString()); else d->headerEnabled = str.toInt(); str = attribs.value("vectorNames").toString(); d->vectorNames = str.split(' '); //may be empty str = attribs.value("simplifyWhitespaces").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("simplifyWhitespaces").toString()); else d->simplifyWhitespacesEnabled = str.toInt(); str = attribs.value("nanValue").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("nanValue").toString()); else d->nanValue = str.toDouble(); str = attribs.value("removeQuotes").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("removeQuotes").toString()); else d->removeQuotesEnabled = str.toInt(); str = attribs.value("skipEmptyParts").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("skipEmptyParts").toString()); else d->skipEmptyParts = str.toInt(); str = attribs.value("startRow").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("startRow").toString()); else d->startRow = str.toInt(); str = attribs.value("endRow").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("endRow").toString()); else d->endRow = str.toInt(); str = attribs.value("startColumn").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("startColumn").toString()); else d->startColumn = str.toInt(); str = attribs.value("endColumn").toString(); if (str.isEmpty()) reader->raiseWarning(attributeWarning.subs("endColumn").toString()); else d->endColumn = str.toInt(); return true; } int AsciiFilterPrivate::isPrepared() { return m_prepared; } int AsciiFilter::isPrepared() { return d->isPrepared(); } #ifdef HAVE_MQTT void AsciiFilter::mqttPreview(QVector& list, const QString& message, const QString& topic) { d->mqttPreview(list, message, topic); } void AsciiFilterPrivate::mqttPreview(QVector& list, const QString& message, const QString& topic) { QVector dataStrings; if (!message.isEmpty()) { qDebug()<<"ascii mqtt preview" < newData; QStringList newDataList = message.split(QRegExp("\n|\r\n|\r"), QString::SkipEmptyParts); for (auto& valueString: newDataList) { QStringList splitString = valueString.split(' ', QString::SkipEmptyParts); for ( const auto& valueString2: splitString) { if (!valueString2.isEmpty() && !valueString2.startsWith(commentCharacter) ) { linesToRead++; newData.push_back(valueString2); } } } qDebug() <<" data investigated, lines to read: "< list.size()) { linesToRead = list.size(); qDebug()<<"Lines to bigger: " < numeric if (mode == AbstractColumn::Numeric && columnModes[colSize - 1] == AbstractColumn::Integer) { columnModes[colSize - 1] = mode; qDebug()<<"-----setting column mode" << " "< text if ( (mode == AbstractColumn::Text) && (columnModes[colSize - 1] != AbstractColumn::Text) ) { columnModes[colSize - 1] = mode; qDebug()<<"-----setting column mode" << " "< 0) ) if (createIndexEnabled) lineString += QString::number(i); if(!list.isEmpty() && mqttPreviewFirstEmptyColCount > 0){ for(int j = 0; j < mqttPreviewFirstEmptyColCount; j++) { lineString += QString::number(nanValue, 'g', 16); qDebug()<<"first column updated with nan"; } } switch (columnModes[colSize - 1]) { case AbstractColumn::Numeric: { bool isNumber; const double value = locale.toDouble(line, &isNumber); lineString += QString::number(isNumber ? value : nanValue, 'g', 16); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(line, &isNumber); lineString += QString::number(isNumber ? value : 0); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(line, dateTimeFormat); lineString += valueDateTime.isValid() ? valueDateTime.toString(dateTimeFormat) : QLatin1String(" "); break; } case AbstractColumn::Text: if (removeQuotesEnabled) line.remove(QRegExp("[\"\']")); lineString += line; break; case AbstractColumn::Month: // never happens case AbstractColumn::Day: break; } qDebug()<<"column updated with value"; if(list.isEmpty() || (!list.isEmpty() && mqttPreviewFirstEmptyColCount > 0)) dataStrings << lineString; if (!list.isEmpty() && mqttPreviewFirstEmptyColCount == 0) dataStrings[i] = lineString; } if(mqttPreviewFirstEmptyColCount > 0) { mqttPreviewFirstEmptyColCount = 0; } } else if (list.isEmpty() || (!list.isEmpty() && mqttPreviewFirstEmptyColCount > 0) ) { mqttPreviewFirstEmptyColCount ++; qDebug()<<"first column empty: "<< mqttPreviewFirstEmptyColCount; int colSize; if (mqttPreviewFirstEmptyColCount == 1){ if (createIndexEnabled) colSize = 2; else colSize = 1; } else colSize = columnModes.size() + 1; columnModes.resize(colSize); if (mqttPreviewFirstEmptyColCount == 1) if (createIndexEnabled) { columnModes[0] = AbstractColumn::ColumnMode::Integer; vectorNames.prepend("index"); } vectorNames.append( topic); columnModes[colSize-1] = AbstractColumn::ColumnMode::Numeric; qDebug()<<"Column mode set for empty column"; QStringList lineString; if (mqttPreviewFirstEmptyColCount == 1) { if(createIndexEnabled) lineString += QString::number(0); lineString += QString::number(nanValue, 'g', 16); dataStrings << lineString; qDebug()<<"new line added with nan"; } else { dataStrings = list; dataStrings[0] += QString::number(nanValue, 'g', 16); qDebug()<<"line expanded with nan"; } } else if(!list.isEmpty()) { vectorNames.append( topic); qDebug()<<"vector name set on : " << topic; int colSize = columnModes.size() + 1; columnModes.resize(colSize); columnModes[colSize-1] = AbstractColumn::ColumnMode::Numeric; dataStrings = list; for (int i = 0; i < dataStrings.size(); ++i) { dataStrings[i] += QString::number(nanValue, 'g', 16); } } list = dataStrings; } -QString AsciiFilter::mqttColumnStatistics(const MQTTTopic* topic, MQTTClient* client) const{ - return d->mqttColumnStatistics(topic, client); +QString AsciiFilter::mqttColumnStatistics(const MQTTTopic* topic) const{ + return d->mqttColumnStatistics(topic); } -QString AsciiFilterPrivate::mqttColumnStatistics(const MQTTTopic* topic, MQTTClient* client) const{ +QString AsciiFilterPrivate::mqttColumnStatistics(const MQTTTopic* topic) const{ qDebug()<<"MQTT Column Statistics"; Column* tempColumn = topic->child(m_actualCols - 1); QString statistics; - QVector willStatistics = client->willStatistics(); + QVector willStatistics = topic->mqttClient()->willStatistics(); for(int i = 0; i <= willStatistics.size(); i++) { if(willStatistics[i]) { switch (static_cast(i) ) { case MQTTClient::WillStatistics::ArithmeticMean: statistics += QLatin1String("Arithmetic mean: ") + QString::number(tempColumn->statistics().arithmeticMean) + "\n"; break; case MQTTClient::WillStatistics::ContraharmonicMean: statistics += QLatin1String("Contraharmonic mean: ") + QString::number(tempColumn->statistics().contraharmonicMean) + "\n"; break; case MQTTClient::WillStatistics::Entropy: statistics += QLatin1String("Entropy: ") + QString::number(tempColumn->statistics().entropy) + "\n"; break; case MQTTClient::WillStatistics::GeometricMean: statistics += QLatin1String("Geometric mean: ") + QString::number(tempColumn->statistics().geometricMean) + "\n"; break; case MQTTClient::WillStatistics::HarmonicMean: statistics += QLatin1String("Harmonic mean: ") + QString::number(tempColumn->statistics().harmonicMean) + "\n"; break; case MQTTClient::WillStatistics::Kurtosis: statistics += QLatin1String("Kurtosis: ") + QString::number(tempColumn->statistics().kurtosis) + "\n"; break; case MQTTClient::WillStatistics::Maximum: statistics += QLatin1String("Maximum: ") + QString::number(tempColumn->statistics().maximum) + "\n"; break; case MQTTClient::WillStatistics::MeanDeviation: statistics += QLatin1String("Mean deviation: ") + QString::number(tempColumn->statistics().meanDeviation) + "\n"; break; case MQTTClient::WillStatistics::MeanDeviationAroundMedian: statistics += QLatin1String("Mean deviation around median: ") + QString::number(tempColumn->statistics().meanDeviationAroundMedian) + "\n"; break; case MQTTClient::WillStatistics::Median: statistics += QLatin1String("Median: ") + QString::number(tempColumn->statistics().median) + "\n"; break; case MQTTClient::WillStatistics::MedianDeviation: statistics += QLatin1String("Median deviation: ") + QString::number(tempColumn->statistics().medianDeviation) + "\n"; break; case MQTTClient::WillStatistics::Minimum: statistics += QLatin1String("Minimum: ") + QString::number(tempColumn->statistics().minimum) + "\n"; break; case MQTTClient::WillStatistics::Skewness: statistics += QLatin1String("Skewness: ") + QString::number(tempColumn->statistics().skewness) + "\n"; break; case MQTTClient::WillStatistics::StandardDeviation: statistics += QLatin1String("Standard deviation: ") + QString::number(tempColumn->statistics().standardDeviation) + "\n"; break; case MQTTClient::WillStatistics::Variance: statistics += QLatin1String("Variance: ") + QString::number(tempColumn->statistics().variance) + "\n"; break; default: break; } } } return statistics; } AbstractColumn::ColumnMode AsciiFilter::mqttColumnMode() const{ return d->mqttColumnMode(); } AbstractColumn::ColumnMode AsciiFilterPrivate::mqttColumnMode() const{ return columnModes[m_actualCols - 1]; } -void AsciiFilter::readMQTTTopic(const QString& message, const QString& topic, AbstractDataSource*dataSource) { - d->readMQTTTopic(message, topic, dataSource); -} - void AsciiFilterPrivate::readMQTTTopic(const QString& message, const QString& topic, AbstractDataSource*dataSource) { if (message.isEmpty()) { DEBUG("No new data available"); return; } MQTTTopic* spreadsheet = dynamic_cast(dataSource); int keepNValues = spreadsheet->keepNvalues(); if (!m_prepared) { qDebug()<<"Start prepare mqtt"; const int mqttPrepareError = prepareMQTTTopicToRead(message, topic); if (mqttPrepareError != 0) { DEBUG("Mqtt Prepare Error = " << mqttPrepareError); qDebug()<setUndoAware(false); spreadsheet->resize(AbstractFileFilter::Replace, vectorNames, m_actualCols); qDebug() << "fds resized to col: " << m_actualCols; qDebug() << "fds rowCount: " << spreadsheet->rowCount(); //columns in a file data source don't have any manual changes. //make the available columns undo unaware and suppress the "data changed" signal. //data changes will be propagated via an explicit Column::setChanged() call once new data was read. for (int i = 0; i < spreadsheet->childCount(); i++) { spreadsheet->child(i)->setUndoAware(false); spreadsheet->child(i)->setSuppressDataChangedSignal(true); } if (keepNValues == 0) spreadsheet->setRowCount(m_actualRows > 1 ? m_actualRows : 1); else { spreadsheet->setRowCount(spreadsheet->keepNvalues()); m_actualRows = spreadsheet->keepNvalues(); } m_dataContainer.resize(m_actualCols); for (int n = 0; n < m_actualCols; ++n) { // data() returns a void* which is a pointer to any data type (see ColumnPrivate.cpp) spreadsheet->child(n)->setColumnMode(columnModes[n]); switch (columnModes[n]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } qDebug() << "prepared!"; } #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportTotal: "); #endif MQTTClient::ReadingType readingType; if (!m_prepared) { readingType = MQTTClient::ReadingType::TillEnd; } else { //we have to read all the data when reading from end //so we set readingType to TillEnd if (spreadsheet->readingType() == MQTTClient::ReadingType::FromEnd) { readingType = MQTTClient::ReadingType::TillEnd; } else { readingType = static_cast(spreadsheet->readingType()); } } //count the new lines, increase actualrows on each //now we read all the new lines, if we want to use sample rate //then here we can do it, if we have actually sample rate number of lines :-? int newLinesForSampleRateNotTillEnd = 0; int newLinesTillEnd = 0; QVector newData; if (readingType != MQTTClient::ReadingType::TillEnd) { newData.reserve(spreadsheet->sampleRate()); newData.resize(spreadsheet->sampleRate()); } int newDataIdx = 0; bool sampleRateReached = false; { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportReadingFromFile: "); #endif QStringList newDataList = message.split(QRegExp("\n|\r\n|\r"), QString::SkipEmptyParts); for (auto& valueString: newDataList) { QStringList splitString = valueString.split(m_separator, static_cast(skipEmptyParts)); for (const auto& valueString2: splitString) { if (!valueString2.isEmpty() && !valueString2.startsWith(commentCharacter)) { if (readingType != MQTTClient::ReadingType::TillEnd) newData[newDataIdx++] = valueString2; else newData.push_back(valueString2); newLinesTillEnd++; if (readingType != MQTTClient::ReadingType::TillEnd) { newLinesForSampleRateNotTillEnd++; //for Continous reading and FromEnd we read sample rate number of lines if possible qDebug()<<"new lines for sample rate: "<sampleRate()) { sampleRateReached = true; break; } } } } if(sampleRateReached) break; } } qDebug()<<"Processing message done"; //now we reset the readingType if (static_cast(spreadsheet->readingType()) == MQTTClient::ReadingType::FromEnd) readingType = static_cast(spreadsheet->readingType()); //we had less new lines than the sample rate specified if (readingType != MQTTClient::ReadingType::TillEnd) qDebug() << "Removed empty lines: " << newData.removeAll(""); qDebug()<<"Create index enabled: "<rowCount();; if(m_prepared ) { if (keepNValues == 0) m_actualRows = spreadsheetRowCountBeforeResize; else { if(m_actualRows != spreadsheet->keepNvalues()) { if(m_actualRows < spreadsheet->keepNvalues()) { qDebug()<setRowCount(spreadsheet->keepNvalues()); qDebug()<<"rowcount set to" << spreadsheet->keepNvalues(); } int rowDiff = 0; if(m_actualRows > spreadsheet->keepNvalues()) { rowDiff = m_actualRows - spreadsheet->keepNvalues(); } if(m_actualRows < spreadsheet->keepNvalues()) { rowDiff =spreadsheet->keepNvalues() - m_actualRows; } qDebug()<<"last value changed: "<keepNvalues()<<" "<keepNvalues(); switch (columnModes[n]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); m_dataContainer[n] = static_cast(vector); if(m_actualRows > spreadsheet->keepNvalues()) { for(int i = 0; i < spreadsheet->keepNvalues(); i++) { static_cast*>(m_dataContainer[n])->operator[] (i) = static_cast*>(m_dataContainer[n])->operator[](m_actualRows - spreadsheet->keepNvalues() + i); qDebug()<< "overwrite row"<keepNvalues() + i; } } if(m_actualRows < spreadsheet->keepNvalues()) { vector->reserve( spreadsheet->keepNvalues()); vector->resize( spreadsheet->keepNvalues()); qDebug()<<"actual rows < keepn"; for(int i = 1; i <= m_actualRows; i++) { static_cast*>(m_dataContainer[n])->operator[] (spreadsheet->keepNvalues() - i) = static_cast*>(m_dataContainer[n])->operator[](spreadsheet->keepNvalues() - i - rowDiff); qDebug()<< "overwrite row "<keepNvalues() - i<<" "<keepNvalues() - i - rowDiff; qDebug()<*>(m_dataContainer[n])->operator[](spreadsheet->keepNvalues() - i - rowDiff); } for(int i = 0; i < rowDiff; i++) { static_cast*>(m_dataContainer[n])->operator[](i) = nanValue; qDebug()<* vector = static_cast* >(spreadsheet->child(n)->data()); m_dataContainer[n] = static_cast(vector); if(m_actualRows > spreadsheet->keepNvalues()) { for(int i = 0; i < spreadsheet->keepNvalues(); i++) { static_cast*>(m_dataContainer[n])->operator[] (i) = static_cast*>(m_dataContainer[n])->operator[](m_actualRows - spreadsheet->keepNvalues() + i); qDebug()<< "overwrite row "<keepNvalues() + i; } } if(m_actualRows < spreadsheet->keepNvalues()) { vector->reserve( spreadsheet->keepNvalues()); vector->resize( spreadsheet->keepNvalues()); for(int i = 1; i <= m_actualRows; i++) { static_cast*>(m_dataContainer[n])->operator[] (spreadsheet->keepNvalues() - i) = static_cast*>(m_dataContainer[n])->operator[](spreadsheet->keepNvalues() - i - rowDiff); qDebug()<< "overwrite row"<keepNvalues() - i<<" "<keepNvalues() - i - rowDiff; qDebug()<*>(m_dataContainer[n])->operator[](spreadsheet->keepNvalues() - i - rowDiff); } for(int i = 0; i < rowDiff; i++){ static_cast*>(m_dataContainer[n])->operator[](i) = 0; qDebug()<* vector = static_cast*>(spreadsheet->child(n)->data()); m_dataContainer[n] = static_cast(vector); if(m_actualRows > spreadsheet->keepNvalues()) { for(int i = 0; i < spreadsheet->keepNvalues(); i++) { static_cast*>(m_dataContainer[n])->operator[] (i) = static_cast*>(m_dataContainer[n])->operator[](m_actualRows - spreadsheet->keepNvalues() + i); qDebug()<< "overwrite row"<keepNvalues() + i; } } if(m_actualRows < spreadsheet->keepNvalues()) { vector->reserve( spreadsheet->keepNvalues()); vector->resize( spreadsheet->keepNvalues()); for(int i = 1; i <= m_actualRows; i++) { static_cast*>(m_dataContainer[n])->operator[] (spreadsheet->keepNvalues() - i) = static_cast*>(m_dataContainer[n])->operator[](spreadsheet->keepNvalues() - i - rowDiff); } for(int i = 0; i < rowDiff; i++) static_cast*>(m_dataContainer[n])->operator[](i) = ""; } break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); m_dataContainer[n] = static_cast(vector); if(m_actualRows > spreadsheet->keepNvalues()) { for(int i = 0; i < spreadsheet->keepNvalues(); i++) { static_cast*>(m_dataContainer[n])->operator[] (i) = static_cast*>(m_dataContainer[n])->operator[](m_actualRows - spreadsheet->keepNvalues() + i); qDebug()<< "overwrite row"<keepNvalues() + i; } } if(m_actualRows < spreadsheet->keepNvalues()) { vector->reserve( spreadsheet->keepNvalues()); vector->resize( spreadsheet->keepNvalues()); for(int i = 1; i <= m_actualRows; i++) { static_cast*>(m_dataContainer[n])->operator[] (spreadsheet->keepNvalues() - i) = static_cast*>(m_dataContainer[n])->operator[](spreadsheet->keepNvalues() - i - rowDiff); } for(int i = 0; i < rowDiff; i++) static_cast*>(m_dataContainer[n])->operator[](i) = QDateTime(); } break; } //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } if(m_actualRows > spreadsheet->keepNvalues()) spreadsheet->setRowCount(spreadsheet->keepNvalues()); m_actualRows = spreadsheet->keepNvalues(); qDebug()<<"actual rows: "<sampleRate()); else { m_actualRows += newData.size(); } } //fixed size if (keepNValues != 0) { if (readingType == MQTTClient::ReadingType::TillEnd) { //we had more lines than the fixed size, so we read m_actualRows number of lines if (newLinesTillEnd > m_actualRows) { linesToRead = m_actualRows; //TODO after reading we should skip the next data lines //because it's TillEnd actually } else linesToRead = newLinesTillEnd; } else { //we read max sample rate number of lines when the reading mode //is ContinouslyFixed or FromEnd if(spreadsheet->sampleRate() <= spreadsheet->keepNvalues()) linesToRead = qMin(spreadsheet->sampleRate(), newLinesTillEnd); else linesToRead = qMin(spreadsheet->keepNvalues(), newLinesTillEnd); } } else { linesToRead = m_actualRows - spreadsheetRowCountBeforeResize; } if (linesToRead == 0) return; } else { if(keepNValues != 0) { linesToRead = newLinesTillEnd > m_actualRows ? m_actualRows : newLinesTillEnd; } else { linesToRead = newLinesTillEnd; } } qDebug()<<"linestoread = "<rowCount() < m_actualRows) spreadsheet->setRowCount(m_actualRows); if (!m_prepared) currentRow = 0; else { // indexes the position in the vector(column) currentRow = spreadsheetRowCountBeforeResize; } // if we have fixed size, we do this only once in preparation, here we can use // m_prepared and we need something to decide whether it has a fixed size or increasing for (int n = 0; n < m_actualCols; ++n) { // data() returns a void* which is a pointer to any data type (see ColumnPrivate.cpp) switch (columnModes[n]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } } else { //when we have a fixed size we have to pop sampleRate number of lines if specified //here popping, setting currentRow if (!m_prepared) currentRow = m_actualRows - qMin(newLinesTillEnd, m_actualRows); else { if (readingType == MQTTClient::ReadingType::TillEnd) { if (newLinesTillEnd > m_actualRows) currentRow = 0; else { currentRow = m_actualRows - newLinesTillEnd; } } else { //we read max sample rate number of lines when the reading mode //is ContinouslyFixed or FromEnd currentRow = m_actualRows - linesToRead; } } if (m_prepared) { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportPopping: "); #endif for (int row = 0; row < linesToRead; ++row) { for (int col = 0; col < m_actualCols; ++col) { switch (columnModes[col]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(spreadsheet->child(col)->data()); vector->pop_front(); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(spreadsheet->child(col)->data()); vector->pop_front(); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(spreadsheet->child(col)->data()); vector->pop_front(); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(spreadsheet->child(col)->data()); vector->pop_front(); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[col] = static_cast(vector); break; } //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } } } } // from the last row we read the new data in the spreadsheet qDebug() << "reading from line: " << currentRow << " lines till end: " << newLinesTillEnd; qDebug() << "Lines to read: " << linesToRead <<" actual rows: " << m_actualRows; newDataIdx = 0; if (readingType == MQTTClient::ReadingType::FromEnd) { if (m_prepared) { if (newData.size() > spreadsheet->sampleRate()) newDataIdx = newData.size() - spreadsheet->sampleRate(); } } qDebug() << "newDataIdx: " << newDataIdx; //TODO static int indexColumnIdx = 0; { #ifdef PERFTRACE_LIVE_IMPORT PERFTRACE("AsciiLiveDataImportFillingContainers: "); #endif int row = 0; for (; row < linesToRead; ++row) { QString line; if (readingType == MQTTClient::ReadingType::FromEnd) line = newData.at(newDataIdx++); else line = newData.at(row); qDebug ()<<"line "<*>(m_dataContainer[0])->operator[](currentRow) = (isNumber ? value : nanValue); //qDebug() << "dataContainer[" << n << "] size:" << static_cast*>(m_dataContainer[n])->size(); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(tempIndex, &isNumber); static_cast*>(m_dataContainer[0])->operator[](currentRow) = (isNumber ? value : 0); //qDebug() << "dataContainer[" << n << "] size:" << static_cast*>(m_dataContainer[n])->size(); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(tempIndex, dateTimeFormat); static_cast*>(m_dataContainer[0])->operator[](currentRow) = valueDateTime.isValid() ? valueDateTime : QDateTime(); break; } case AbstractColumn::Text: if (removeQuotesEnabled) tempIndex.remove(QRegExp("[\"\']")); static_cast*>(m_dataContainer[0])->operator[](currentRow) = tempIndex; break; case AbstractColumn::Month: //TODO break; case AbstractColumn::Day: //TODO break; } } //setting timestamp on current time static_cast*>(m_dataContainer[col])->operator[](currentRow) = QDateTime::currentDateTime(); //static_cast*>(m_dataContainer[col+1])->operator[](currentRow) = QDateTime::currentDateTime().time().msecsSinceStartOfDay(); QString valueString = line; qDebug()<<"putting in this line" << valueString<< " "<*>(m_dataContainer[m_actualCols - 1])->operator[](currentRow) = (isNumber ? value : nanValue); //qDebug() << "dataContainer[" << n << "] size:" << static_cast*>(m_dataContainer[n])->size(); break; } case AbstractColumn::Integer: { bool isNumber; const int value = locale.toInt(valueString, &isNumber); static_cast*>(m_dataContainer[m_actualCols - 1])->operator[](currentRow) = (isNumber ? value : 0); //qDebug() << "dataContainer[" << n << "] size:" << static_cast*>(m_dataContainer[n])->size(); break; } case AbstractColumn::DateTime: { const QDateTime valueDateTime = QDateTime::fromString(valueString, dateTimeFormat); static_cast*>(m_dataContainer[m_actualCols - 1])->operator[](currentRow) = valueDateTime.isValid() ? valueDateTime : QDateTime(); break; } case AbstractColumn::Text: if (removeQuotesEnabled) valueString.remove(QRegExp("[\"\']")); static_cast*>(m_dataContainer[m_actualCols - 1])->operator[](currentRow) = valueString; break; case AbstractColumn::Month: //TODO break; case AbstractColumn::Day: //TODO break; } currentRow++; qDebug()<<"adding data is ok, current row increment"; } } if (m_prepared) { qDebug()<<"notifying plots"; //notify all affected columns and plots about the changes PERFTRACE("AsciiLiveDataImport, notify affected columns and plots"); qDebug()<<"project "; const Project* project = spreadsheet->project(); qDebug()<<"Curves"; QVector curves = project->children(AbstractAspect::Recursive); QVector plots; qDebug()<<"Uploading plots"; for (int n = 0; n < m_actualCols; ++n) { Column* column = spreadsheet->column(n); //determine the plots where the column is consumed for (const auto* curve: curves) { if (curve->xColumn() == column || curve->yColumn() == column) { CartesianPlot* plot = dynamic_cast(curve->parentAspect()); if (plots.indexOf(plot) == -1) { plots << plot; plot->setSuppressDataChangedSignal(true); } } } column->setChanged(); } //loop over all affected plots and retransform them for (auto* const plot: plots) { //TODO setting this back to true triggers again a lot of retransforms in the plot (one for each curve). // plot->setSuppressDataChangedSignal(false); plot->dataChanged(); } } qDebug()<<"check m_prepared"; if(!m_prepared) m_prepared = true; qDebug()<<"return , comes end"; } int AsciiFilterPrivate::prepareMQTTTopicToRead(const QString& message, const QString& topic) { vectorNames.append("value"); if (endColumn == -1) endColumn = 1; else endColumn++; //vectorNames.prepend("timestamp for plot"); //endColumn++; vectorNames.prepend("timestamp"); endColumn++; if (createIndexEnabled) { vectorNames.prepend("index"); endColumn++; } m_actualCols = endColumn - startColumn + 1; qDebug()<<"actual cols"< 1) { int pos2 = firstLine.indexOf(firstLineStringList.at(1), length1); m_separator = firstLine.mid(length1, pos2 - length1); } else { //old: separator = line.right(line.length() - length1); m_separator = ' '; } } } else { // use given separator // replace symbolic "TAB" with '\t' m_separator = separatingCharacter.replace(QLatin1String("2xTAB"), "\t\t", Qt::CaseInsensitive); m_separator = separatingCharacter.replace(QLatin1String("TAB"), "\t", Qt::CaseInsensitive); // replace symbolic "SPACE" with ' ' m_separator = m_separator.replace(QLatin1String("2xSPACE"), QLatin1String(" "), Qt::CaseInsensitive); m_separator = m_separator.replace(QLatin1String("3xSPACE"), QLatin1String(" "), Qt::CaseInsensitive); m_separator = m_separator.replace(QLatin1String("4xSPACE"), QLatin1String(" "), Qt::CaseInsensitive); m_separator = m_separator.replace(QLatin1String("SPACE"), QLatin1String(" "), Qt::CaseInsensitive); firstLineStringList = firstLine.split(m_separator, (QString::SplitBehavior)skipEmptyParts); } DEBUG("separator: \'" << m_separator.toStdString() << '\''); DEBUG("number of columns: " << firstLineStringList.size()); QDEBUG("first line: " << firstLineStringList); DEBUG("headerEnabled = " << headerEnabled); // parse first data line to determine data type for each column columnModes.resize(m_actualCols); int col = 0; if (createIndexEnabled) { columnModes[0] = AbstractColumn::Integer; col = 1; } //set column mode for timestamp columnModes[col] = AbstractColumn::DateTime; col++; //columnModes[col] = AbstractColumn::Integer; //col++; auto firstValue = firstLineStringList.takeFirst();//use first value to identify column mode while(firstValue.isEmpty() || firstValue.startsWith(commentCharacter) ) firstValue = firstLineStringList.takeFirst(); if (simplifyWhitespacesEnabled) firstValue = firstValue.simplified(); columnModes[m_actualCols-1] = AbstractFileFilter::columnMode(firstValue, dateTimeFormat, numberFormat); qDebug()<<"-----setting column mode" << " "< numeric if (mode == AbstractColumn::Numeric && columnModes[m_actualCols-1] == AbstractColumn::Integer) { columnModes[m_actualCols-1] = mode; qDebug()<<"-----setting column mode" << " "< text if (mode == AbstractColumn::Text && columnModes[m_actualCols-1] != AbstractColumn::Text) { columnModes[m_actualCols-1] = mode; qDebug()<<"-----setting column mode" << " "< numeric if (mode == AbstractColumn::Numeric && columnModes[m_actualCols-1] == AbstractColumn::Integer) { columnModes[m_actualCols-1] = mode; qDebug()<<"-----setting column mode" << " "< text if (mode == AbstractColumn::Text && columnModes[m_actualCols-1] != AbstractColumn::Text) { columnModes[m_actualCols-1] = mode; qDebug()<<"-----setting column mode" << " "<setPreparedForMQTT(prepared, topic, separator); } void AsciiFilterPrivate::setPreparedForMQTT(bool prepared, MQTTTopic *topic, const QString& separator) { m_prepared = prepared; if(prepared == true) { m_separator = separator; //m_actualRows = endRow - startRow + 1; m_actualCols = endColumn - startColumn + 1; m_actualRows = topic->rowCount(); columnModes.resize(topic->columnCount()); for(int i = 0; i < topic->columnCount(); ++i) { columnModes[i] = topic->column(i)->columnMode(); } m_dataContainer.resize(m_actualCols); for (int n = 0; n < m_actualCols; ++n) { // data() returns a void* which is a pointer to any data type (see ColumnPrivate.cpp) topic->child(n)->setColumnMode(columnModes[n]); switch (columnModes[n]) { case AbstractColumn::Numeric: { QVector* vector = static_cast* >(topic->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Integer: { QVector* vector = static_cast* >(topic->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::Text: { QVector* vector = static_cast*>(topic->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } case AbstractColumn::DateTime: { QVector* vector = static_cast* >(topic->child(n)->data()); vector->reserve(m_actualRows); vector->resize(m_actualRows); m_dataContainer[n] = static_cast(vector); break; } //TODO case AbstractColumn::Month: case AbstractColumn::Day: break; } } } } QString AsciiFilter::separator() const { return d->separator(); } QString AsciiFilterPrivate::separator() const { return m_separator; } #endif diff --git a/src/backend/datasources/filters/AsciiFilter.h b/src/backend/datasources/filters/AsciiFilter.h index da1c04128..cf01dfb5c 100644 --- a/src/backend/datasources/filters/AsciiFilter.h +++ b/src/backend/datasources/filters/AsciiFilter.h @@ -1,132 +1,132 @@ /*************************************************************************** File : AsciiFilter.h Project : LabPlot Description : ASCII I/O-filter -------------------------------------------------------------------- Copyright : (C) 2009-2013 Alexander Semke (alexander.semke@web.de) Copyright : (C) 2017 Stefan Gerlach (stefan.gerlach@uni.kn) ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the Free Software * * Foundation, Inc., 51 Franklin Street, Fifth Floor, * * Boston, MA 02110-1301 USA * * * ***************************************************************************/ #ifndef ASCIIFILTER_H #define ASCIIFILTER_H #include "backend/datasources/filters/AbstractFileFilter.h" #include "backend/core/AbstractColumn.h" class Spreadsheet; class QStringList; class QIODevice; class AsciiFilterPrivate; class QAbstractSocket; class MQTTTopic; class MQTTClient; class AsciiFilter : public AbstractFileFilter { Q_OBJECT public: AsciiFilter(); ~AsciiFilter() override; static QStringList separatorCharacters(); static QStringList commentCharacters(); static QStringList dataTypes(); static QStringList predefinedFilters(); static QString fileInfoString(const QString&); static int columnNumber(const QString& fileName, const QString& separator = QString()); static size_t lineNumber(const QString& fileName); static size_t lineNumber(QIODevice&); // calculate number of lines if device supports it // read data from any device void readDataFromDevice(QIODevice& device, AbstractDataSource*, AbstractFileFilter::ImportMode = AbstractFileFilter::Replace, int lines = -1); void readFromLiveDeviceNotFile(QIODevice& device, AbstractDataSource*dataSource); qint64 readFromLiveDevice(QIODevice& device, AbstractDataSource*, qint64 from = -1); // overloaded function to read from file QVector readDataFromFile(const QString& fileName, AbstractDataSource* = nullptr, AbstractFileFilter::ImportMode = AbstractFileFilter::Replace, int lines = -1) override; void write(const QString& fileName, AbstractDataSource*) override; QVector preview(const QString& fileName, int lines); QVector preview(QIODevice& device); void loadFilterSettings(const QString&) override; void saveFilterSettings(const QString&) const override; #ifdef HAVE_MQTT void mqttPreview(QVector&, const QString&, const QString&); - QString mqttColumnStatistics(const MQTTTopic * topic, MQTTClient *client) const; + QString mqttColumnStatistics(const MQTTTopic * topic) const; AbstractColumn::ColumnMode mqttColumnMode() const; void readMQTTTopic(const QString&, const QString&, AbstractDataSource*dataSource); void setPreparedForMQTT(bool, MQTTTopic *topic, const QString&); QString separator() const; #endif void setCommentCharacter(const QString&); QString commentCharacter() const; void setSeparatingCharacter(const QString&); QString separatingCharacter() const; void setDateTimeFormat(const QString&); QString dateTimeFormat() const; void setNumberFormat(QLocale::Language); QLocale::Language numberFormat() const; void setAutoModeEnabled(const bool); bool isAutoModeEnabled() const; void setHeaderEnabled(const bool); bool isHeaderEnabled() const; void setSkipEmptyParts(const bool); bool skipEmptyParts() const; void setSimplifyWhitespacesEnabled(const bool); bool simplifyWhitespacesEnabled() const; void setNaNValueToZero(const bool); bool NaNValueToZeroEnabled() const; void setRemoveQuotesEnabled(const bool); bool removeQuotesEnabled() const; void setCreateIndexEnabled(const bool); bool createIndexEnabled() const; void setVectorNames(const QString&); QStringList vectorNames() const; QVector columnModes(); void setStartRow(const int); int startRow() const; void setEndRow(const int); int endRow() const; void setStartColumn(const int); int startColumn() const; void setEndColumn(const int); int endColumn() const; void save(QXmlStreamWriter*) const override; bool load(XmlStreamReader*) override; int isPrepared(); private: std::unique_ptr const d; friend class AsciiFilterPrivate; }; #endif diff --git a/src/backend/datasources/filters/AsciiFilterPrivate.h b/src/backend/datasources/filters/AsciiFilterPrivate.h index 4f9fa73db..7907bf9c7 100644 --- a/src/backend/datasources/filters/AsciiFilterPrivate.h +++ b/src/backend/datasources/filters/AsciiFilterPrivate.h @@ -1,105 +1,104 @@ /*************************************************************************** File : AsciiFilterPrivate.h Project : LabPlot Description : Private implementation class for AsciiFilter. -------------------------------------------------------------------- Copyright : (C) 2009-2013 Alexander Semke (alexander.semke@web.de) Copyright : (C) 2017 Stefan Gerlach (stefan.gerlach@uni.kn) ***************************************************************************/ /*************************************************************************** * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the Free Software * * Foundation, Inc., 51 Franklin Street, Fifth Floor, * * Boston, MA 02110-1301 USA * * * ***************************************************************************/ #ifndef ASCIIFILTERPRIVATE_H #define ASCIIFILTERPRIVATE_H class KFilterDev; class AbstractDataSource; class AbstractColumn; class AbstractAspect; class Spreadsheet; class MQTTTopic; -class MQTTClient; class AsciiFilterPrivate { public: explicit AsciiFilterPrivate(AsciiFilter*); QStringList getLineString(QIODevice&); int prepareDeviceToRead(QIODevice&); void readDataFromDevice(QIODevice&, AbstractDataSource* = nullptr, AbstractFileFilter::ImportMode = AbstractFileFilter::Replace, int lines = -1); void readFromLiveDeviceNotFile(QIODevice& device, AbstractDataSource*, AbstractFileFilter::ImportMode = AbstractFileFilter::Replace); qint64 readFromLiveDevice(QIODevice&, AbstractDataSource*, qint64 from = -1); void readDataFromFile(const QString& fileName, AbstractDataSource* = nullptr, AbstractFileFilter::ImportMode = AbstractFileFilter::Replace, int lines = -1); void write(const QString& fileName, AbstractDataSource*); QVector preview(const QString& fileName, int lines); QVector preview(QIODevice& device); #ifdef HAVE_MQTT void mqttPreview(QVector&, const QString&, const QString&); AbstractColumn::ColumnMode mqttColumnMode() const; - QString mqttColumnStatistics(const MQTTTopic* , MQTTClient*) const; + QString mqttColumnStatistics(const MQTTTopic* ) const; void readMQTTTopic(const QString&, const QString&, AbstractDataSource*dataSource); int prepareMQTTTopicToRead(const QString& message, const QString& topic); void setPreparedForMQTT(bool, MQTTTopic*topic, const QString&); QString separator() const; #endif const AsciiFilter* q; QString commentCharacter; QString separatingCharacter; QString dateTimeFormat; QLocale::Language numberFormat; bool autoModeEnabled; bool headerEnabled; bool skipEmptyParts; bool simplifyWhitespacesEnabled; double nanValue; bool removeQuotesEnabled; bool createIndexEnabled; QStringList vectorNames; QVector columnModes; int startRow; int endRow; int startColumn; int endColumn; int mqttPreviewFirstEmptyColCount; int isPrepared(); private: static const unsigned int m_dataTypeLines = 10; // lines to read for determining data types QString m_separator; int m_actualStartRow; int m_actualRows; int m_actualCols; int m_maxActualRows; int m_lastRowNum; int m_prepared; int m_columnOffset; // indexes the "start column" in the datasource. Data will be imported starting from this column. QVector m_dataContainer; // pointers to the actual data containers }; #endif