diff --git a/src/server/handler/akappend.cpp b/src/server/handler/akappend.cpp index b5bec3b10..bef824ba9 100644 --- a/src/server/handler/akappend.cpp +++ b/src/server/handler/akappend.cpp @@ -1,456 +1,452 @@ /*************************************************************************** * Copyright (C) 2007 by Robert Zwerus * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU Library General Public License as * * published by the Free Software Foundation; either version 2 of the * * License, or (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU Library General Public * * License along with this program; if not, write to the * * Free Software Foundation, Inc., * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ #include "akappend.h" #include "fetchhelper.h" #include "connection.h" #include "preprocessormanager.h" #include "handlerhelper.h" #include "storage/datastore.h" #include "storage/transaction.h" #include "storage/parttypehelper.h" #include "storage/dbconfig.h" #include "storage/partstreamer.h" #include "storage/parthelper.h" #include "storage/selectquerybuilder.h" #include #include //std::accumulate using namespace Akonadi; using namespace Akonadi::Server; static QVector localFlagsToPreserve = QVector() << "$ATTACHMENT" << "$INVITATION" << "$ENCRYPTED" << "$SIGNED" << "$WATCHED"; bool AkAppend::buildPimItem(const Protocol::CreateItemCommand &cmd, PimItem &item, Collection &parentCol) { parentCol = HandlerHelper::collectionFromScope(cmd.collection(), connection()); if (!parentCol.isValid()) { return failureResponse(QStringLiteral("Invalid parent collection")); } if (parentCol.isVirtual()) { return failureResponse(QStringLiteral("Cannot append item into virtual collection")); } MimeType mimeType = MimeType::retrieveByNameOrCreate(cmd.mimeType()); if (!mimeType.isValid()) { return failureResponse(QStringLiteral("Unable to create mimetype '") % cmd.mimeType() % QStringLiteral("'.")); } item.setRev(0); item.setSize(cmd.itemSize()); item.setMimeTypeId(mimeType.id()); item.setCollectionId(parentCol.id()); item.setDatetime(cmd.dateTime()); if (cmd.remoteId().isEmpty()) { // from application item.setDirty(true); } else { // from resource item.setRemoteId(cmd.remoteId()); item.setDirty(false); } item.setRemoteRevision(cmd.remoteRevision()); item.setGid(cmd.gid()); item.setAtime(QDateTime::currentDateTimeUtc()); return true; } bool AkAppend::insertItem(const Protocol::CreateItemCommand &cmd, PimItem &item, const Collection &parentCol) { if (!item.datetime().isValid()) { item.setDatetime(QDateTime::currentDateTimeUtc()); } if (!item.insert()) { return failureResponse(QStringLiteral("Failed to append item")); } // set message flags const QSet flags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.flags() : cmd.addedFlags(); if (!flags.isEmpty()) { // This will hit an entry in cache inserted there in buildPimItem() const Flag::List flagList = HandlerHelper::resolveFlags(flags); bool flagsChanged = false; if (!DataStore::self()->appendItemsFlags(PimItem::List() << item, flagList, &flagsChanged, false, parentCol, true)) { return failureResponse("Unable to append item flags."); } } const Scope tags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.tags() : cmd.addedTags(); if (!tags.isEmpty()) { const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()); bool tagsChanged = false; if (!DataStore::self()->appendItemsTags(PimItem::List() << item, tagList, &tagsChanged, false, parentCol, true)) { return failureResponse(QStringLiteral("Unable to append item tags.")); } } // Handle individual parts qint64 partSizes = 0; - PartStreamer streamer(connection(), item, this); - connect(&streamer, &PartStreamer::responseAvailable, - this, static_cast(&Handler::sendResponse)); + PartStreamer streamer(connection(), item); Q_FOREACH (const QByteArray &partName, cmd.parts()) { qint64 partSize = 0; if (!streamer.stream(true, partName, partSize)) { return failureResponse(streamer.error()); } partSizes += partSize; } const Protocol::Attributes attrs = cmd.attributes(); for (auto iter = attrs.cbegin(), end = attrs.cend(); iter != end; ++iter) { if (!streamer.streamAttribute(true, iter.key(), iter.value())) { return failureResponse(streamer.error()); } } // TODO: Try to avoid this addition query if (partSizes > item.size()) { item.setSize(partSizes); item.update(); } // Preprocessing if (PreprocessorManager::instance()->isActive()) { Part hiddenAttribute; hiddenAttribute.setPimItemId(item.id()); hiddenAttribute.setPartType(PartTypeHelper::fromFqName(QStringLiteral(AKONADI_ATTRIBUTE_HIDDEN))); hiddenAttribute.setData(QByteArray()); hiddenAttribute.setDatasize(0); // TODO: Handle errors? Technically, this is not a critical issue as no data are lost PartHelper::insert(&hiddenAttribute); } const bool seen = flags.contains(AKONADI_FLAG_SEEN) || flags.contains(AKONADI_FLAG_IGNORED); notify(item, seen, item.collection()); sendResponse(item, Protocol::CreateItemCommand::None); return true; } bool AkAppend::mergeItem(const Protocol::CreateItemCommand &cmd, PimItem &newItem, PimItem ¤tItem, const Collection &parentCol) { bool needsUpdate = false; QSet changedParts; if (!newItem.remoteId().isEmpty() && currentItem.remoteId() != newItem.remoteId()) { currentItem.setRemoteId(newItem.remoteId()); changedParts.insert(AKONADI_PARAM_REMOTEID); needsUpdate = true; } if (!newItem.remoteRevision().isEmpty() && currentItem.remoteRevision() != newItem.remoteRevision()) { currentItem.setRemoteRevision(newItem.remoteRevision()); changedParts.insert(AKONADI_PARAM_REMOTEREVISION); needsUpdate = true; } if (!newItem.gid().isEmpty() && currentItem.gid() != newItem.gid()) { currentItem.setGid(newItem.gid()); changedParts.insert(AKONADI_PARAM_GID); needsUpdate = true; } if (newItem.datetime().isValid() && newItem.datetime() != currentItem.datetime()) { currentItem.setDatetime(newItem.datetime()); needsUpdate = true; } if (newItem.size() > 0 && newItem.size() != currentItem.size()) { currentItem.setSize(newItem.size()); needsUpdate = true; } const Collection col = Collection::retrieveById(parentCol.id()); if (cmd.flags().isEmpty() && !cmd.flagsOverwritten()) { bool flagsAdded = false, flagsRemoved = false; if (!cmd.addedFlags().isEmpty()) { const Flag::List addedFlags = HandlerHelper::resolveFlags(cmd.addedFlags()); DataStore::self()->appendItemsFlags(PimItem::List() << currentItem, addedFlags, &flagsAdded, true, col, true); } if (!cmd.removedFlags().isEmpty()) { const Flag::List removedFlags = HandlerHelper::resolveFlags(cmd.removedFlags()); DataStore::self()->removeItemsFlags(PimItem::List() << currentItem, removedFlags, &flagsRemoved, col, true); } if (flagsAdded || flagsRemoved) { changedParts.insert(AKONADI_PARAM_FLAGS); needsUpdate = true; } } else { bool flagsChanged = false; QSet flagNames = cmd.flags(); // Make sure we don't overwrite some local-only flags that can't come // through from Resource during ItemSync, like $ATTACHMENT, because the // resource is not aware of them (they are usually assigned by client // upon inspecting the payload) Q_FOREACH (const Flag ¤tFlag, currentItem.flags()) { const QByteArray currentFlagName = currentFlag.name().toLatin1(); if (localFlagsToPreserve.contains(currentFlagName)) { flagNames.insert(currentFlagName); } } const Flag::List flags = HandlerHelper::resolveFlags(flagNames); DataStore::self()->setItemsFlags(PimItem::List() << currentItem, flags, &flagsChanged, col, true); if (flagsChanged) { changedParts.insert(AKONADI_PARAM_FLAGS); needsUpdate = true; } } if (cmd.tags().isEmpty()) { bool tagsAdded = false, tagsRemoved = false; if (!cmd.addedTags().isEmpty()) { const Tag::List addedTags = HandlerHelper::tagsFromScope(cmd.addedTags(), connection()); DataStore::self()->appendItemsTags(PimItem::List() << currentItem, addedTags, &tagsAdded, true, col, true); } if (!cmd.removedTags().isEmpty()) { const Tag::List removedTags = HandlerHelper::tagsFromScope(cmd.removedTags(), connection()); DataStore::self()->removeItemsTags(PimItem::List() << currentItem, removedTags, &tagsRemoved, true); } if (tagsAdded || tagsRemoved) { changedParts.insert(AKONADI_PARAM_TAGS); needsUpdate = true; } } else { bool tagsChanged = false; const Tag::List tags = HandlerHelper::tagsFromScope(cmd.tags(), connection()); DataStore::self()->setItemsTags(PimItem::List() << currentItem, tags, &tagsChanged, true); if (tagsChanged) { changedParts.insert(AKONADI_PARAM_TAGS); needsUpdate = true; } } const Part::List existingParts = Part::retrieveFiltered(Part::pimItemIdColumn(), currentItem.id()); QMap partsSizes; for (const Part &part : existingParts) { partsSizes.insert(PartTypeHelper::fullName(part.partType()).toLatin1(), part.datasize()); } PartStreamer streamer(connection(), currentItem); - connect(&streamer, &PartStreamer::responseAvailable, - this, static_cast(&Handler::sendResponse)); Q_FOREACH (const QByteArray &partName, cmd.parts()) { bool changed = false; qint64 partSize = 0; if (!streamer.stream(true, partName, partSize, &changed)) { return failureResponse(streamer.error()); } if (changed) { changedParts.insert(partName); partsSizes.insert(partName, partSize); needsUpdate = true; } } const qint64 size = std::accumulate(partsSizes.begin(), partsSizes.end(), 0); if (size > currentItem.size()) { currentItem.setSize(size); needsUpdate = true; } if (needsUpdate) { currentItem.setRev(qMax(newItem.rev(), currentItem.rev()) + 1); currentItem.setAtime(QDateTime::currentDateTimeUtc()); // Only mark dirty when merged from application currentItem.setDirty(!connection()->context()->resource().isValid()); // Store all changes if (!currentItem.update()) { return failureResponse("Failed to store merged item"); } notify(currentItem, currentItem.collection(), changedParts); } sendResponse(currentItem, cmd.mergeModes()); return true; } bool AkAppend::sendResponse(const PimItem &item, Protocol::CreateItemCommand::MergeModes mergeModes) { if (mergeModes & Protocol::CreateItemCommand::Silent || mergeModes & Protocol::CreateItemCommand::None) { auto resp = Protocol::FetchItemsResponsePtr::create(); resp->setId(item.id()); resp->setMTime(item.datetime()); Handler::sendResponse(resp); return true; } Protocol::ItemFetchScope fetchScope; fetchScope.setAncestorDepth(Protocol::ItemFetchScope::ParentAncestor); fetchScope.setFetch(Protocol::ItemFetchScope::AllAttributes | Protocol::ItemFetchScope::FullPayload | Protocol::ItemFetchScope::CacheOnly | Protocol::ItemFetchScope::Flags | Protocol::ItemFetchScope::GID | Protocol::ItemFetchScope::MTime | Protocol::ItemFetchScope::RemoteID | Protocol::ItemFetchScope::RemoteRevision | Protocol::ItemFetchScope::Size | Protocol::ItemFetchScope::Tags); fetchScope.setTagFetchScope({ "GID" }); ImapSet set; set.add(QVector() << item.id()); Scope scope; scope.setUidSet(set); FetchHelper fetchHelper(connection(), scope, fetchScope); if (!fetchHelper.fetchItems()) { return failureResponse("Failed to retrieve item"); } return true; } bool AkAppend::notify(const PimItem &item, bool seen, const Collection &collection) { DataStore::self()->notificationCollector()->itemAdded(item, seen, collection); if (PreprocessorManager::instance()->isActive()) { // enqueue the item for preprocessing PreprocessorManager::instance()->beginHandleItem(item, DataStore::self()); } return true; } bool AkAppend::notify(const PimItem &item, const Collection &collection, const QSet &changedParts) { if (!changedParts.isEmpty()) { DataStore::self()->notificationCollector()->itemChanged(item, changedParts, collection); } return true; } bool AkAppend::parseStream() { const auto &cmd = Protocol::cmdCast(m_command); // FIXME: The streaming/reading of all item parts can hold the transaction for // unnecessary long time -> should we wrap the PimItem into one transaction // and try to insert Parts independently? In case we fail to insert a part, // it's not a problem as it can be re-fetched at any time, except for attributes. DataStore *db = DataStore::self(); Transaction transaction(db, QStringLiteral("AKAPPEND")); ExternalPartStorageTransaction storageTrx; PimItem item; Collection parentCol; if (!buildPimItem(cmd, item, parentCol)) { return false; } if (cmd.mergeModes() == Protocol::CreateItemCommand::None) { if (!insertItem(cmd, item, parentCol)) { return false; } if (!transaction.commit()) { return failureResponse(QStringLiteral("Failed to commit transaction")); } storageTrx.commit(); } else { // Merging is always restricted to the same collection SelectQueryBuilder qb; qb.addValueCondition(PimItem::collectionIdColumn(), Query::Equals, parentCol.id()); Query::Condition rootCondition(Query::Or); Query::Condition mergeCondition(Query::And); if (cmd.mergeModes() & Protocol::CreateItemCommand::GID) { mergeCondition.addValueCondition(PimItem::gidColumn(), Query::Equals, item.gid()); } if (cmd.mergeModes() & Protocol::CreateItemCommand::RemoteID) { mergeCondition.addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId()); } rootCondition.addCondition(mergeCondition); // If an Item with matching RID but empty GID exists during GID merge, // merge into this item instead of creating a new one if (cmd.mergeModes() & Protocol::CreateItemCommand::GID && !item.remoteId().isEmpty()) { mergeCondition = Query::Condition(Query::And); mergeCondition.addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId()); mergeCondition.addValueCondition(PimItem::gidColumn(), Query::Equals, QStringLiteral("")); rootCondition.addCondition(mergeCondition); } qb.addCondition(rootCondition); if (!qb.exec()) { return failureResponse("Failed to query database for item"); } const QVector result = qb.result(); if (result.isEmpty()) { // No item with such GID/RID exists, so call AkAppend::insert() and behave // like if this was a new item if (!insertItem(cmd, item, parentCol)) { return false; } if (!transaction.commit()) { return failureResponse("Failed to commit transaction"); } storageTrx.commit(); } else if (result.count() == 1) { // Item with matching GID/RID combination exists, so merge this item into it // and send itemChanged() PimItem existingItem = result.at(0); if (!mergeItem(cmd, item, existingItem, parentCol)) { return false; } if (!transaction.commit()) { return failureResponse("Failed to commit transaction"); } storageTrx.commit(); } else { qCDebug(AKONADISERVER_LOG) << "Multiple merge candidates:"; for (const PimItem &item : result) { qCDebug(AKONADISERVER_LOG) << "\tID:" << item.id() << ", RID:" << item.remoteId() << ", GID:" << item.gid() << ", Collection:" << item.collection().name() << "(" << item.collectionId() << ")" << ", Resource:" << item.collection().resource().name() << "(" << item.collection().resourceId() << ")"; } // Nor GID or RID are guaranteed to be unique, so make sure we don't merge // something we don't want return failureResponse(QStringLiteral("Multiple merge candidates, aborting")); } } return successResponse(); } diff --git a/src/server/handler/store.cpp b/src/server/handler/store.cpp index c58bc9e8d..b407f61cd 100644 --- a/src/server/handler/store.cpp +++ b/src/server/handler/store.cpp @@ -1,389 +1,385 @@ /*************************************************************************** * Copyright (C) 2006 by Tobias Koenig * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU Library General Public License as * * published by the Free Software Foundation; either version 2 of the * * License, or (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU Library General Public * * License along with this program; if not, write to the * * Free Software Foundation, Inc., * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ #include "store.h" #include "connection.h" #include "handlerhelper.h" #include "storage/datastore.h" #include "storage/transaction.h" #include "storage/itemqueryhelper.h" #include "storage/selectquerybuilder.h" #include "storage/parthelper.h" #include "storage/dbconfig.h" #include "storage/itemretriever.h" #include "storage/parttypehelper.h" #include "storage/partstreamer.h" #include #include "akonadiserver_debug.h" #include #include using namespace Akonadi; using namespace Akonadi::Server; static bool payloadChanged(const QSet &changes) { for (const QByteArray &change : changes) { if (change.startsWith(AKONADI_PARAM_PLD)) { return true; } } return false; } bool Store::replaceFlags(const PimItem::List &item, const QSet &flags, bool &flagsChanged) { Flag::List flagList = HandlerHelper::resolveFlags(flags); DataStore *store = connection()->storageBackend(); if (!store->setItemsFlags(item, flagList, &flagsChanged)) { qCDebug(AKONADISERVER_LOG) << "Store::replaceFlags: Unable to replace flags"; return false; } return true; } bool Store::addFlags(const PimItem::List &items, const QSet &flags, bool &flagsChanged) { const Flag::List flagList = HandlerHelper::resolveFlags(flags); DataStore *store = connection()->storageBackend(); if (!store->appendItemsFlags(items, flagList, &flagsChanged)) { qCDebug(AKONADISERVER_LOG) << "Store::addFlags: Unable to add new item flags"; return false; } return true; } bool Store::deleteFlags(const PimItem::List &items, const QSet &flags, bool &flagsChanged) { DataStore *store = connection()->storageBackend(); QVector flagList; flagList.reserve(flags.size()); for (auto iter = flags.cbegin(), end = flags.cend(); iter != end; ++iter) { Flag flag = Flag::retrieveByName(QString::fromUtf8(*iter)); if (!flag.isValid()) { continue; } flagList.append(flag); } if (!store->removeItemsFlags(items, flagList, &flagsChanged)) { qCDebug(AKONADISERVER_LOG) << "Store::deleteFlags: Unable to remove item flags"; return false; } return true; } bool Store::replaceTags(const PimItem::List &item, const Scope &tags, bool &tagsChanged) { const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()); if (!connection()->storageBackend()->setItemsTags(item, tagList, &tagsChanged)) { qCDebug(AKONADISERVER_LOG) << "Store::replaceTags: unable to replace tags"; return false; } return true; } bool Store::addTags(const PimItem::List &items, const Scope &tags, bool &tagsChanged) { const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()); if (!connection()->storageBackend()->appendItemsTags(items, tagList, &tagsChanged)) { qCDebug(AKONADISERVER_LOG) << "Store::addTags: Unable to add new item tags"; return false; } return true; } bool Store::deleteTags(const PimItem::List &items, const Scope &tags, bool &tagsChanged) { const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()); if (!connection()->storageBackend()->removeItemsTags(items, tagList, &tagsChanged)) { qCDebug(AKONADISERVER_LOG) << "Store::deleteTags: Unable to remove item tags"; return false; } return true; } bool Store::parseStream() { const auto &cmd = Protocol::cmdCast(m_command); //parseCommand(); DataStore *store = connection()->storageBackend(); Transaction transaction(store, QStringLiteral("STORE")); ExternalPartStorageTransaction storageTrx; // Set the same modification time for each item. QDateTime modificationtime = QDateTime::currentDateTimeUtc(); if (DbType::type(store->database()) != DbType::Sqlite) { // Remove milliseconds from the modificationtime. PSQL and MySQL don't // support milliseconds in DATETIME column, so FETCHed Items will report // time without milliseconds, while this command would return answer // with milliseconds modificationtime = modificationtime.addMSecs(-modificationtime.time().msec()); } // retrieve selected items SelectQueryBuilder qb; ItemQueryHelper::scopeToQuery(cmd.items(), connection()->context(), qb); if (!qb.exec()) { return failureResponse("Unable to retrieve items"); } PimItem::List pimItems = qb.result(); if (pimItems.isEmpty()) { return failureResponse("No items found"); } for (int i = 0; i < pimItems.size(); ++i) { if (cmd.oldRevision() > -1) { // check for conflicts if a resources tries to overwrite an item with dirty payload const PimItem &pimItem = pimItems.at(i); if (connection()->isOwnerResource(pimItem)) { if (pimItem.dirty()) { const QString error = QStringLiteral("[LRCONFLICT] Resource %1 tries to modify item %2 (%3) (in collection %4) with dirty payload, aborting STORE."); return failureResponse( error.arg(pimItem.collection().resource().name()) .arg(pimItem.id()) .arg(pimItem.remoteId()).arg(pimItem.collectionId())); } } // check and update revisions if (pimItems.at(i).rev() != (int) cmd.oldRevision()) { const QString error = QStringLiteral("[LLCONFLICT] Resource %1 tries to modify item %2 (%3) (in collection %4) with revision %5; the item was modified elsewhere and has revision %6, aborting STORE."); return failureResponse(error.arg(pimItem.collection().resource().name()) .arg(pimItem.id()) .arg(pimItem.remoteId()).arg(pimItem.collectionId()) .arg(cmd.oldRevision()).arg(pimItems.at(i).rev())); } } } PimItem &item = pimItems.first(); QSet changes; qint64 partSizes = 0; qint64 size = 0; bool flagsChanged = false; bool tagsChanged = false; if (cmd.modifiedParts() & Protocol::ModifyItemsCommand::AddedFlags) { if (!addFlags(pimItems, cmd.addedFlags(), flagsChanged)) { return failureResponse("Unable to add item flags"); } } if (cmd.modifiedParts() & Protocol::ModifyItemsCommand::RemovedFlags) { if (!deleteFlags(pimItems, cmd.removedFlags(), flagsChanged)) { return failureResponse("Unable to remove item flags"); } } if (cmd.modifiedParts() & Protocol::ModifyItemsCommand::Flags) { if (!replaceFlags(pimItems, cmd.flags(), flagsChanged)) { return failureResponse("Unable to reset flags"); } } if (flagsChanged) { changes << AKONADI_PARAM_FLAGS; } if (cmd.modifiedParts() & Protocol::ModifyItemsCommand::AddedTags) { if (!addTags(pimItems, cmd.addedTags(), tagsChanged)) { return failureResponse("Unable to add item tags"); } } if (cmd.modifiedParts() & Protocol::ModifyItemsCommand::RemovedTags) { if (!deleteTags(pimItems, cmd.removedTags(), tagsChanged)) { return failureResponse("Unabel to remove item tags"); } } if (cmd.modifiedParts() & Protocol::ModifyItemsCommand::Tags) { if (!replaceTags(pimItems, cmd.tags(), tagsChanged)) { return failureResponse("Unable to reset item tags"); } } if (tagsChanged) { changes << AKONADI_PARAM_TAGS; } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::RemoteID) { if (item.remoteId() != cmd.remoteId()) { if (!connection()->isOwnerResource(item)) { return failureResponse("Only resources can modify remote identifiers"); } item.setRemoteId(cmd.remoteId()); changes << AKONADI_PARAM_REMOTEID; } } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::GID) { if (item.gid() != cmd.gid()) { item.setGid(cmd.gid()); } changes << AKONADI_PARAM_GID; } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::RemoteRevision) { if (item.remoteRevision() != cmd.remoteRevision()) { if (!connection()->isOwnerResource(item)) { return failureResponse("Only resources can modify remote revisions"); } item.setRemoteRevision(cmd.remoteRevision()); changes << AKONADI_PARAM_REMOTEREVISION; } } if (item.isValid() && !cmd.dirty()) { item.setDirty(false); } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::Size) { size = cmd.itemSize(); changes << AKONADI_PARAM_SIZE; } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::RemovedParts) { if (!cmd.removedParts().isEmpty()) { if (!store->removeItemParts(item, cmd.removedParts())) { return failureResponse("Unable to remove item parts"); } Q_FOREACH (const QByteArray &part, cmd.removedParts()) { changes.insert(part); } } } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::Parts) { - PartStreamer streamer(connection(), item, this); - connect(&streamer, &PartStreamer::responseAvailable, - this, static_cast(&Handler::sendResponse)); + PartStreamer streamer(connection(), item); Q_FOREACH (const QByteArray &partName, cmd.parts()) { qint64 partSize = 0; if (!streamer.stream(true, partName, partSize)) { return failureResponse(streamer.error()); } changes.insert(partName); partSizes += partSize; } } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::Attributes) { - PartStreamer streamer(connection(), item, this); - connect(&streamer, &PartStreamer::responseAvailable, - this, static_cast(&Handler::sendResponse)); + PartStreamer streamer(connection(), item); const Protocol::Attributes attrs = cmd.attributes(); for (auto iter = attrs.cbegin(), end = attrs.cend(); iter != end; ++iter) { bool changed = false; if (!streamer.streamAttribute(true, iter.key(), iter.value(), &changed)) { return failureResponse(streamer.error()); } if (changed) { changes.insert(iter.key()); } } } QDateTime datetime; if (!changes.isEmpty() || cmd.invalidateCache() || !cmd.dirty()) { // update item size if (pimItems.size() == 1 && (size > 0 || partSizes > 0)) { pimItems.first().setSize(qMax(size, partSizes)); } const bool onlyRemoteIdChanged = (changes.size() == 1 && changes.contains(AKONADI_PARAM_REMOTEID)); const bool onlyRemoteRevisionChanged = (changes.size() == 1 && changes.contains(AKONADI_PARAM_REMOTEREVISION)); const bool onlyRemoteIdAndRevisionChanged = (changes.size() == 2 && changes.contains(AKONADI_PARAM_REMOTEID) && changes.contains(AKONADI_PARAM_REMOTEREVISION)); const bool onlyFlagsChanged = (changes.size() == 1 && changes.contains(AKONADI_PARAM_FLAGS)); const bool onlyGIDChanged = (changes.size() == 1 && changes.contains(AKONADI_PARAM_GID)); // If only the remote id and/or the remote revision changed, we don't have to increase the REV, // because these updates do not change the payload and can only be done by the owning resource -> no conflicts possible const bool revisionNeedsUpdate = (!changes.isEmpty() && !onlyRemoteIdChanged && !onlyRemoteRevisionChanged && !onlyRemoteIdAndRevisionChanged && !onlyGIDChanged); // run update query and prepare change notifications for (int i = 0; i < pimItems.count(); ++i) { if (revisionNeedsUpdate) { pimItems[i].setRev(pimItems[i].rev() + 1); } PimItem &item = pimItems[i]; item.setDatetime(modificationtime); item.setAtime(modificationtime); if (!connection()->isOwnerResource(item) && payloadChanged(changes)) { item.setDirty(true); } if (!item.update()) { return failureResponse("Unable to write item changes into the database"); } if (cmd.invalidateCache()) { if (!store->invalidateItemCache(item)) { return failureResponse("Unable to invalidate item cache in the database"); } } // flags change notification went separatly during command parsing // GID-only changes are ignored to prevent resources from updating their storage when no actual change happened if (cmd.notify() && !changes.isEmpty() && !onlyFlagsChanged && !onlyGIDChanged) { // Don't send FLAGS notification in itemChanged changes.remove(AKONADI_PARAM_FLAGS); store->notificationCollector()->itemChanged(item, changes); } if (!cmd.noResponse()) { auto resp = Protocol::ModifyItemsResponsePtr::create(); resp->setId(item.id()); resp->setNewRevision(item.rev()); sendResponse(resp); } } if (!transaction.commit()) { return failureResponse("Cannot commit transaction."); } // Always commit storage changes (deletion) after DB transaction storageTrx.commit(); datetime = modificationtime; } else { datetime = pimItems.first().datetime(); } auto resp = Protocol::ModifyItemsResponsePtr::create(); resp->setModificationDateTime(datetime); return successResponse(resp); } diff --git a/src/server/storage/partstreamer.cpp b/src/server/storage/partstreamer.cpp index 724c18728..1b4f60326 100644 --- a/src/server/storage/partstreamer.cpp +++ b/src/server/storage/partstreamer.cpp @@ -1,424 +1,423 @@ /* * Copyright (C) 2014 Daniel Vrátil * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * */ #include "partstreamer.h" #include "parthelper.h" #include "parttypehelper.h" #include "selectquerybuilder.h" #include "dbconfig.h" #include "connection.h" #include "capabilities_p.h" #include "akonadiserver_debug.h" #include #include #include #include #ifdef HAVE_UNISTD_H # include #endif #include #include using namespace Akonadi; using namespace Akonadi::Server; PartStreamer::PartStreamer(Connection *connection, - const PimItem &pimItem, QObject *parent) - : QObject(parent) - , mConnection(connection) + const PimItem &pimItem) + : mConnection(connection) , mItem(pimItem) { // Make sure the file_db_data path exists StandardDirs::saveDir("data", QStringLiteral("file_db_data")); } PartStreamer::~PartStreamer() { } QString PartStreamer::error() const { return mError; } Protocol::PartMetaData PartStreamer::requestPartMetaData(const QByteArray &partName) { { auto resp = Protocol::StreamPayloadCommandPtr::create(); resp->setPayloadName(partName); resp->setRequest(Protocol::StreamPayloadCommand::MetaData); - Q_EMIT responseAvailable(resp); + mConnection->sendResponse(resp); } const auto cmd = mConnection->readCommand(); if (!cmd->isValid() || Protocol::cmdCast(cmd).isError()) { mError = QStringLiteral("Client failed to provide part metadata"); return Protocol::PartMetaData(); } return Protocol::cmdCast(cmd).metaData(); } bool PartStreamer::streamPayload(Part &part, const QByteArray &partName) { Protocol::PartMetaData metaPart = requestPartMetaData(partName); if (metaPart.name().isEmpty()) { mError = QStringLiteral("Part name is empty"); return false; } part.setVersion(metaPart.version()); if (part.datasize() != metaPart.size()) { part.setDatasize(metaPart.size()); // Shortcut: if sizes differ, we don't need to compare data later no in order // to detect whether the part has changed mDataChanged = mDataChanged || (metaPart.size() != part.datasize()); } if (metaPart.storageType() == Protocol::PartMetaData::Foreign) { return streamForeignPayload(part, metaPart); } else if (part.datasize() > DbConfig::configuredDatabase()->sizeThreshold()) { //actual case when streaming storage is used: external payload is enabled, // data is big enough in a literal return streamPayloadToFile(part, metaPart); } else { return streamPayloadData(part, metaPart); } } bool PartStreamer::streamPayloadData(Part &part, const Protocol::PartMetaData &metaPart) { // If the part WAS external previously, remove data file if (part.storage() == Part::External) { ExternalPartStorage::self()->removePartFile( ExternalPartStorage::resolveAbsolutePath(part.data())); } // Request the actual data { auto resp = Protocol::StreamPayloadCommandPtr::create(); resp->setPayloadName(metaPart.name()); resp->setRequest(Protocol::StreamPayloadCommand::Data); - Q_EMIT responseAvailable(resp); + mConnection->sendResponse(resp); } const auto cmd = mConnection->readCommand(); const auto &response = Protocol::cmdCast(cmd); if (!response.isValid() || response.isError()) { mError = QStringLiteral("Client failed to provide payload data"); qCCritical(AKONADISERVER_LOG) << mError; return false; } const QByteArray newData = response.data(); // only use the data size with intenral payload parts, for foreign parts // we use the size reported by client const auto newSize = (metaPart.storageType() == Protocol::PartMetaData::Internal) ? newData.size() : metaPart.size(); if (newSize != metaPart.size()) { mError = QStringLiteral("Payload size mismatch"); return false; } if (part.isValid()) { if (!mDataChanged) { mDataChanged = mDataChanged || (newData != part.data()); } PartHelper::update(&part, newData, newSize); } else { part.setData(newData); part.setDatasize(newSize); if (!part.insert()) { mError = QStringLiteral("Failed to insert part to database"); return false; } } return true; } bool PartStreamer::streamPayloadToFile(Part &part, const Protocol::PartMetaData &metaPart) { QByteArray origData; if (!mDataChanged && mCheckChanged) { origData = PartHelper::translateData(part); } QByteArray filename; if (part.isValid()) { if (part.storage() == Part::External) { // Part was external and is still external filename = part.data(); if (!filename.isEmpty()) { ExternalPartStorage::self()->removePartFile( ExternalPartStorage::resolveAbsolutePath(filename)); filename = ExternalPartStorage::updateFileNameRevision(filename); } else { // recover from data corruption filename = ExternalPartStorage::nameForPartId(part.id()); } } else { // Part wasn't external, but is now filename = ExternalPartStorage::nameForPartId(part.id()); } QFileInfo finfo(QString::fromUtf8(filename)); if (finfo.isAbsolute()) { filename = finfo.fileName().toUtf8(); } } part.setStorage(Part::External); part.setDatasize(metaPart.size()); part.setData(filename); if (part.isValid()) { if (!part.update()) { mError = QStringLiteral("Failed to update part in database"); return false; } } else { if (!part.insert()) { mError = QStringLiteral("Failed to insert part into database"); return false; } filename = ExternalPartStorage::nameForPartId(part.id()); part.setData(filename); if (!part.update()) { mError = QStringLiteral("Failed to update part in database"); return false; } } { auto cmd = Protocol::StreamPayloadCommandPtr::create(); cmd->setPayloadName(metaPart.name()); cmd->setRequest(Protocol::StreamPayloadCommand::Data); cmd->setDestination(QString::fromUtf8(filename)); - Q_EMIT responseAvailable(cmd); + mConnection->sendResponse(cmd); } const auto cmd = mConnection->readCommand(); const auto &response = Protocol::cmdCast(cmd); if (!response.isValid() || response.isError()) { mError = QStringLiteral("Client failed to store payload into file"); qCCritical(AKONADISERVER_LOG) << mError; return false; } - QFile file(ExternalPartStorage::resolveAbsolutePath(filename), this); + QFile file(ExternalPartStorage::resolveAbsolutePath(filename)); if (!file.exists()) { mError = QStringLiteral("External payload file does not exist"); qCCritical(AKONADISERVER_LOG) << mError; return false; } if (file.size() != metaPart.size()) { mError = QStringLiteral("Payload size mismatch"); qCDebug(AKONADISERVER_LOG) << mError << ", client advertised" << metaPart.size() << "bytes, but the file is" << file.size() << "bytes!"; return false; } if (mCheckChanged && !mDataChanged) { // This is invoked only when part already exists, data sizes match and // caller wants to know whether parts really differ mDataChanged = (origData != PartHelper::translateData(part)); } return true; } bool PartStreamer::streamForeignPayload(Part &part, const Protocol::PartMetaData &metaPart) { QByteArray origData; if (!mDataChanged && mCheckChanged) { origData = PartHelper::translateData(part); } { auto cmd = Protocol::StreamPayloadCommandPtr::create(); cmd->setPayloadName(metaPart.name()); cmd->setRequest(Protocol::StreamPayloadCommand::Data); - Q_EMIT responseAvailable(cmd); + mConnection->sendResponse(cmd); } const auto cmd = mConnection->readCommand(); const auto response = Protocol::cmdCast(cmd); if (!response.isValid() || response.isError()) { mError = QStringLiteral("Client failed to store payload into file"); qCCritical(AKONADISERVER_LOG) << mError; return false; } // If the part was previously external, clean up the data if (part.storage() == Part::External) { const QString filename = QString::fromUtf8(part.data()); ExternalPartStorage::self()->removePartFile( ExternalPartStorage::resolveAbsolutePath(filename)); } part.setStorage(Part::Foreign); part.setData(response.data()); if (part.isValid()) { if (!part.update()) { mError = QStringLiteral("Failed to update part in database"); return false; } } else { if (!part.insert()) { mError = QStringLiteral("Failed to insert part into database"); return false; } } const QString filename = QString::fromUtf8(response.data()); QFile file(filename); if (!file.exists()) { mError = QStringLiteral("Foreign payload file does not exist"); qCCritical(AKONADISERVER_LOG) << mError; return false; } if (file.size() != metaPart.size()) { mError = QStringLiteral("Payload size mismatch"); qCCritical(AKONADISERVER_LOG) << mError << ", client advertised" << metaPart.size() << "bytes, but the file size is" << file.size() << "bytes!"; return false; } if (mCheckChanged && !mDataChanged) { // This is invoked only when part already exists, data sizes match and // caller wants to know whether parts really differ mDataChanged = (origData != PartHelper::translateData(part)); } return true; } bool PartStreamer::preparePart(bool checkExists, const QByteArray &partName, Part &part) { mError.clear(); mDataChanged = false; const PartType partType = PartTypeHelper::fromFqName(partName); if (checkExists || mCheckChanged) { SelectQueryBuilder qb; qb.addValueCondition(Part::pimItemIdColumn(), Query::Equals, mItem.id()); qb.addValueCondition(Part::partTypeIdColumn(), Query::Equals, partType.id()); if (!qb.exec()) { mError = QStringLiteral("Unable to check item part existence"); return false; } const Part::List result = qb.result(); if (!result.isEmpty()) { part = result.at(0); } } // Shortcut: newly created parts are always "changed" if (!part.isValid()) { mDataChanged = true; } part.setPartType(partType); part.setPimItemId(mItem.id()); return true; } bool PartStreamer::stream(bool checkExists, const QByteArray &partName, qint64 &partSize, bool *changed) { mCheckChanged = (changed != nullptr); if (changed != nullptr) { *changed = false; } Part part; if (!preparePart(checkExists, partName, part)) { return false; } bool ok = streamPayload(part, partName); if (changed && ok && mCheckChanged) { *changed = mDataChanged; } partSize = part.datasize(); return ok; } bool PartStreamer::streamAttribute(bool checkExists, const QByteArray &_partName, const QByteArray &value, bool *changed) { mCheckChanged = (changed != nullptr); if (changed != nullptr) { *changed = false; } QByteArray partName; if (!_partName.startsWith("ATR:")) { partName = "ATR:" + _partName; } else { partName = _partName; } Part part; if (!preparePart(checkExists, partName, part)) { return false; } if (part.isValid()) { if (mCheckChanged) { if (PartHelper::translateData(part) != value) { mDataChanged = true; } } PartHelper::update(&part, value, value.size()); } else { const bool storeInFile = value.size() > DbConfig::configuredDatabase()->sizeThreshold(); part.setDatasize(value.size()); part.setVersion(0); if (storeInFile) { if (!part.insert()) { mError = QStringLiteral("Failed to store part in database"); return false; } PartHelper::update(&part, value, value.size()); } else { part.setData(value); if (!part.insert()) { mError = QStringLiteral("Failed to store part in database"); return false; } } } if (mCheckChanged) { *changed = mDataChanged; } return true; } diff --git a/src/server/storage/partstreamer.h b/src/server/storage/partstreamer.h index fc33bf423..24b93f202 100644 --- a/src/server/storage/partstreamer.h +++ b/src/server/storage/partstreamer.h @@ -1,79 +1,74 @@ /* * Copyright (C) 2014 Daniel Vrátil * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * */ #ifndef AKONADI_SERVER_PARTSTREAMER_H #define AKONADI_SERVER_PARTSTREAMER_H -#include +#include #include "entities.h" namespace Akonadi { namespace Protocol { class PartMetaData; class Command; using CommandPtr = QSharedPointer; } namespace Server { class PimItem; class Part; class Connection; -class PartStreamer : public QObject +class PartStreamer { - Q_OBJECT - public: - explicit PartStreamer(Connection *connection, const PimItem &pimItem, QObject *parent = nullptr); + explicit PartStreamer(Connection *connection, const PimItem &pimItem); ~PartStreamer(); bool stream(bool checkExists, const QByteArray &partName, qint64 &partSize, bool *changed = nullptr); bool streamAttribute(bool checkExists, const QByteArray &partName, const QByteArray &value, bool *changed = nullptr); QString error() const; -Q_SIGNALS: - void responseAvailable(const Protocol::CommandPtr &response); - private: bool streamPayload(Part &part, const QByteArray &partName); bool streamPayloadToFile(Part &part, const Protocol::PartMetaData &metaPart); bool streamPayloadData(Part &part, const Protocol::PartMetaData &metaPart); bool streamForeignPayload(Part &part, const Protocol::PartMetaData &metaPart); Protocol::PartMetaData requestPartMetaData(const QByteArray &partName); bool preparePart(bool checkExists, const QByteArray &partName, Part &part); Connection *mConnection; PimItem mItem; bool mCheckChanged; bool mDataChanged; QString mError; }; } // namespace Server } // namespace Akonadi #endif // AKONADI_SERVER_PARTSTREAMER_H