diff --git a/src/server/handler/akappend.cpp b/src/server/handler/akappend.cpp index 0cf5c408f..5d184fc99 100644 --- a/src/server/handler/akappend.cpp +++ b/src/server/handler/akappend.cpp @@ -1,444 +1,450 @@ /*************************************************************************** * Copyright (C) 2007 by Robert Zwerus * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU Library General Public License as * * published by the Free Software Foundation; either version 2 of the * * License, or (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU Library General Public * * License along with this program; if not, write to the * * Free Software Foundation, Inc., * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ #include "akappend.h" #include "fetchhelper.h" #include "connection.h" #include "preprocessormanager.h" #include "handlerhelper.h" #include "storage/datastore.h" #include "storage/transaction.h" #include "storage/parttypehelper.h" #include "storage/dbconfig.h" #include "storage/partstreamer.h" #include "storage/parthelper.h" #include "storage/selectquerybuilder.h" #include #include //std::accumulate using namespace Akonadi; using namespace Akonadi::Server; static QVector localFlagsToPreserve = QVector() << "$ATTACHMENT" << "$INVITATION" << "$ENCRYPTED" << "$SIGNED" << "$WATCHED"; bool AkAppend::buildPimItem(const Protocol::CreateItemCommand &cmd, PimItem &item, Collection &parentCol) { parentCol = HandlerHelper::collectionFromScope(cmd.collection(), connection()); if (!parentCol.isValid()) { return failureResponse(QStringLiteral("Invalid parent collection")); } if (parentCol.isVirtual()) { return failureResponse(QStringLiteral("Cannot append item into virtual collection")); } MimeType mimeType = MimeType::retrieveByNameOrCreate(cmd.mimeType()); if (!mimeType.isValid()) { return failureResponse(QStringLiteral("Unable to create mimetype '") % cmd.mimeType() % QStringLiteral("'.")); } item.setRev(0); item.setSize(cmd.itemSize()); item.setMimeTypeId(mimeType.id()); item.setCollectionId(parentCol.id()); item.setDatetime(cmd.dateTime()); if (cmd.remoteId().isEmpty()) { // from application item.setDirty(true); } else { // from resource item.setRemoteId(cmd.remoteId()); item.setDirty(false); } item.setRemoteRevision(cmd.remoteRevision()); item.setGid(cmd.gid()); item.setAtime(QDateTime::currentDateTimeUtc()); return true; } bool AkAppend::insertItem(const Protocol::CreateItemCommand &cmd, PimItem &item, const Collection &parentCol) { if (!item.datetime().isValid()) { item.setDatetime(QDateTime::currentDateTimeUtc()); } if (!item.insert()) { return failureResponse(QStringLiteral("Failed to append item")); } // set message flags const QSet flags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.flags() : cmd.addedFlags(); if (!flags.isEmpty()) { // This will hit an entry in cache inserted there in buildPimItem() const Flag::List flagList = HandlerHelper::resolveFlags(flags); bool flagsChanged = false; if (!storageBackend()->appendItemsFlags({item}, flagList, &flagsChanged, false, parentCol, true)) { return failureResponse("Unable to append item flags."); } } const Scope tags = cmd.mergeModes() == Protocol::CreateItemCommand::None ? cmd.tags() : cmd.addedTags(); if (!tags.isEmpty()) { const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()); bool tagsChanged = false; if (!storageBackend()->appendItemsTags({item}, tagList, &tagsChanged, false, parentCol, true)) { return failureResponse(QStringLiteral("Unable to append item tags.")); } } // Handle individual parts qint64 partSizes = 0; PartStreamer streamer(connection(), item); Q_FOREACH (const QByteArray &partName, cmd.parts()) { qint64 partSize = 0; - if (!streamer.stream(true, partName, partSize)) { - return failureResponse(streamer.error()); + try { + streamer.stream(true, partName, partSize); + } catch (const PartStreamerException &e) { + return failureResponse(e.what()); } partSizes += partSize; } const Protocol::Attributes attrs = cmd.attributes(); for (auto iter = attrs.cbegin(), end = attrs.cend(); iter != end; ++iter) { - if (!streamer.streamAttribute(true, iter.key(), iter.value())) { - return failureResponse(streamer.error()); + try { + streamer.streamAttribute(true, iter.key(), iter.value()); + } catch (const PartStreamerException &e) { + return failureResponse(e.what()); } } // TODO: Try to avoid this addition query if (partSizes > item.size()) { item.setSize(partSizes); item.update(); } // Preprocessing if (PreprocessorManager::instance()->isActive()) { Part hiddenAttribute; hiddenAttribute.setPimItemId(item.id()); hiddenAttribute.setPartType(PartTypeHelper::fromFqName(QStringLiteral(AKONADI_ATTRIBUTE_HIDDEN))); hiddenAttribute.setData(QByteArray()); hiddenAttribute.setDatasize(0); // TODO: Handle errors? Technically, this is not a critical issue as no data are lost PartHelper::insert(&hiddenAttribute); } const bool seen = flags.contains(AKONADI_FLAG_SEEN) || flags.contains(AKONADI_FLAG_IGNORED); notify(item, seen, item.collection()); sendResponse(item, Protocol::CreateItemCommand::None); return true; } bool AkAppend::mergeItem(const Protocol::CreateItemCommand &cmd, PimItem &newItem, PimItem ¤tItem, const Collection &parentCol) { bool needsUpdate = false; QSet changedParts; if (!newItem.remoteId().isEmpty() && currentItem.remoteId() != newItem.remoteId()) { currentItem.setRemoteId(newItem.remoteId()); changedParts.insert(AKONADI_PARAM_REMOTEID); needsUpdate = true; } if (!newItem.remoteRevision().isEmpty() && currentItem.remoteRevision() != newItem.remoteRevision()) { currentItem.setRemoteRevision(newItem.remoteRevision()); changedParts.insert(AKONADI_PARAM_REMOTEREVISION); needsUpdate = true; } if (!newItem.gid().isEmpty() && currentItem.gid() != newItem.gid()) { currentItem.setGid(newItem.gid()); changedParts.insert(AKONADI_PARAM_GID); needsUpdate = true; } if (newItem.datetime().isValid() && newItem.datetime() != currentItem.datetime()) { currentItem.setDatetime(newItem.datetime()); needsUpdate = true; } if (newItem.size() > 0 && newItem.size() != currentItem.size()) { currentItem.setSize(newItem.size()); needsUpdate = true; } const Collection col = Collection::retrieveById(parentCol.id()); if (cmd.flags().isEmpty() && !cmd.flagsOverwritten()) { bool flagsAdded = false, flagsRemoved = false; if (!cmd.addedFlags().isEmpty()) { const auto addedFlags = HandlerHelper::resolveFlags(cmd.addedFlags()); storageBackend()->appendItemsFlags({currentItem}, addedFlags, &flagsAdded, true, col, true); } if (!cmd.removedFlags().isEmpty()) { const auto removedFlags = HandlerHelper::resolveFlags(cmd.removedFlags()); storageBackend()->removeItemsFlags({currentItem}, removedFlags, &flagsRemoved, col, true); } if (flagsAdded || flagsRemoved) { changedParts.insert(AKONADI_PARAM_FLAGS); needsUpdate = true; } } else { bool flagsChanged = false; QSet flagNames = cmd.flags(); // Make sure we don't overwrite some local-only flags that can't come // through from Resource during ItemSync, like $ATTACHMENT, because the // resource is not aware of them (they are usually assigned by client // upon inspecting the payload) Q_FOREACH (const Flag ¤tFlag, currentItem.flags()) { const QByteArray currentFlagName = currentFlag.name().toLatin1(); if (localFlagsToPreserve.contains(currentFlagName)) { flagNames.insert(currentFlagName); } } const auto flags = HandlerHelper::resolveFlags(flagNames); storageBackend()->setItemsFlags({currentItem}, flags, &flagsChanged, col, true); if (flagsChanged) { changedParts.insert(AKONADI_PARAM_FLAGS); needsUpdate = true; } } if (cmd.tags().isEmpty()) { bool tagsAdded = false, tagsRemoved = false; if (!cmd.addedTags().isEmpty()) { const auto addedTags = HandlerHelper::tagsFromScope(cmd.addedTags(), connection()); storageBackend()->appendItemsTags({currentItem}, addedTags, &tagsAdded, true, col, true); } if (!cmd.removedTags().isEmpty()) { const Tag::List removedTags = HandlerHelper::tagsFromScope(cmd.removedTags(), connection()); storageBackend()->removeItemsTags({currentItem}, removedTags, &tagsRemoved, true); } if (tagsAdded || tagsRemoved) { changedParts.insert(AKONADI_PARAM_TAGS); needsUpdate = true; } } else { bool tagsChanged = false; const auto tags = HandlerHelper::tagsFromScope(cmd.tags(), connection()); storageBackend()->setItemsTags({currentItem}, tags, &tagsChanged, true); if (tagsChanged) { changedParts.insert(AKONADI_PARAM_TAGS); needsUpdate = true; } } const Part::List existingParts = Part::retrieveFiltered(Part::pimItemIdColumn(), currentItem.id()); QMap partsSizes; for (const Part &part : existingParts) { partsSizes.insert(PartTypeHelper::fullName(part.partType()).toLatin1(), part.datasize()); } PartStreamer streamer(connection(), currentItem); Q_FOREACH (const QByteArray &partName, cmd.parts()) { bool changed = false; qint64 partSize = 0; - if (!streamer.stream(true, partName, partSize, &changed)) { - return failureResponse(streamer.error()); + try { + streamer.stream(true, partName, partSize, &changed); + } catch (const PartStreamerException &e) { + return failureResponse(e.what()); } if (changed) { changedParts.insert(partName); partsSizes.insert(partName, partSize); needsUpdate = true; } } const qint64 size = std::accumulate(partsSizes.begin(), partsSizes.end(), 0); if (size > currentItem.size()) { currentItem.setSize(size); needsUpdate = true; } if (needsUpdate) { currentItem.setRev(qMax(newItem.rev(), currentItem.rev()) + 1); currentItem.setAtime(QDateTime::currentDateTimeUtc()); // Only mark dirty when merged from application currentItem.setDirty(!connection()->context()->resource().isValid()); // Store all changes if (!currentItem.update()) { return failureResponse("Failed to store merged item"); } notify(currentItem, currentItem.collection(), changedParts); } sendResponse(currentItem, cmd.mergeModes()); return true; } bool AkAppend::sendResponse(const PimItem &item, Protocol::CreateItemCommand::MergeModes mergeModes) { if (mergeModes & Protocol::CreateItemCommand::Silent || mergeModes & Protocol::CreateItemCommand::None) { Protocol::FetchItemsResponse resp; resp.setId(item.id()); resp.setMTime(item.datetime()); Handler::sendResponse(std::move(resp)); return true; } Protocol::ItemFetchScope fetchScope; fetchScope.setAncestorDepth(Protocol::ItemFetchScope::ParentAncestor); fetchScope.setFetch(Protocol::ItemFetchScope::AllAttributes | Protocol::ItemFetchScope::FullPayload | Protocol::ItemFetchScope::CacheOnly | Protocol::ItemFetchScope::Flags | Protocol::ItemFetchScope::GID | Protocol::ItemFetchScope::MTime | Protocol::ItemFetchScope::RemoteID | Protocol::ItemFetchScope::RemoteRevision | Protocol::ItemFetchScope::Size | Protocol::ItemFetchScope::Tags); ImapSet set; set.add(QVector() << item.id()); Scope scope; scope.setUidSet(set); FetchHelper fetchHelper(connection(), scope, fetchScope, Protocol::TagFetchScope{}); if (!fetchHelper.fetchItems()) { return failureResponse("Failed to retrieve item"); } return true; } bool AkAppend::notify(const PimItem &item, bool seen, const Collection &collection) { storageBackend()->notificationCollector()->itemAdded(item, seen, collection); if (PreprocessorManager::instance()->isActive()) { // enqueue the item for preprocessing PreprocessorManager::instance()->beginHandleItem(item, storageBackend()); } return true; } bool AkAppend::notify(const PimItem &item, const Collection &collection, const QSet &changedParts) { if (!changedParts.isEmpty()) { storageBackend()->notificationCollector()->itemChanged(item, changedParts, collection); } return true; } bool AkAppend::parseStream() { const auto &cmd = Protocol::cmdCast(m_command); // FIXME: The streaming/reading of all item parts can hold the transaction for // unnecessary long time -> should we wrap the PimItem into one transaction // and try to insert Parts independently? In case we fail to insert a part, // it's not a problem as it can be re-fetched at any time, except for attributes. Transaction transaction(storageBackend(), QStringLiteral("AKAPPEND")); ExternalPartStorageTransaction storageTrx; PimItem item; Collection parentCol; if (!buildPimItem(cmd, item, parentCol)) { return false; } if (cmd.mergeModes() == Protocol::CreateItemCommand::None) { if (!insertItem(cmd, item, parentCol)) { return false; } if (!transaction.commit()) { return failureResponse(QStringLiteral("Failed to commit transaction")); } storageTrx.commit(); } else { // Merging is always restricted to the same collection SelectQueryBuilder qb; qb.setForUpdate(); qb.addValueCondition(PimItem::collectionIdColumn(), Query::Equals, parentCol.id()); Query::Condition rootCondition(Query::Or); Query::Condition mergeCondition(Query::And); if (cmd.mergeModes() & Protocol::CreateItemCommand::GID) { mergeCondition.addValueCondition(PimItem::gidColumn(), Query::Equals, item.gid()); } if (cmd.mergeModes() & Protocol::CreateItemCommand::RemoteID) { mergeCondition.addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId()); } rootCondition.addCondition(mergeCondition); // If an Item with matching RID but empty GID exists during GID merge, // merge into this item instead of creating a new one if (cmd.mergeModes() & Protocol::CreateItemCommand::GID && !item.remoteId().isEmpty()) { mergeCondition = Query::Condition(Query::And); mergeCondition.addValueCondition(PimItem::remoteIdColumn(), Query::Equals, item.remoteId()); mergeCondition.addValueCondition(PimItem::gidColumn(), Query::Equals, QStringLiteral("")); rootCondition.addCondition(mergeCondition); } qb.addCondition(rootCondition); if (!qb.exec()) { return failureResponse("Failed to query database for item"); } const QVector result = qb.result(); if (result.isEmpty()) { // No item with such GID/RID exists, so call AkAppend::insert() and behave // like if this was a new item if (!insertItem(cmd, item, parentCol)) { return false; } if (!transaction.commit()) { return failureResponse("Failed to commit transaction"); } storageTrx.commit(); } else if (result.count() == 1) { // Item with matching GID/RID combination exists, so merge this item into it // and send itemChanged() PimItem existingItem = result.at(0); if (!mergeItem(cmd, item, existingItem, parentCol)) { return false; } if (!transaction.commit()) { return failureResponse("Failed to commit transaction"); } storageTrx.commit(); } else { qCDebug(AKONADISERVER_LOG) << "Multiple merge candidates:"; for (const PimItem &item : result) { qCDebug(AKONADISERVER_LOG) << "\tID:" << item.id() << ", RID:" << item.remoteId() << ", GID:" << item.gid() << ", Collection:" << item.collection().name() << "(" << item.collectionId() << ")" << ", Resource:" << item.collection().resource().name() << "(" << item.collection().resourceId() << ")"; } // Nor GID or RID are guaranteed to be unique, so make sure we don't merge // something we don't want return failureResponse(QStringLiteral("Multiple merge candidates, aborting")); } } return successResponse(); } diff --git a/src/server/handler/store.cpp b/src/server/handler/store.cpp index aa75cd20c..4e957992f 100644 --- a/src/server/handler/store.cpp +++ b/src/server/handler/store.cpp @@ -1,387 +1,391 @@ /*************************************************************************** * Copyright (C) 2006 by Tobias Koenig * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU Library General Public License as * * published by the Free Software Foundation; either version 2 of the * * License, or (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU Library General Public * * License along with this program; if not, write to the * * Free Software Foundation, Inc., * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ #include "store.h" #include "connection.h" #include "handlerhelper.h" #include "storage/datastore.h" #include "storage/transaction.h" #include "storage/itemqueryhelper.h" #include "storage/selectquerybuilder.h" #include "storage/parthelper.h" #include "storage/dbconfig.h" #include "storage/itemretriever.h" #include "storage/parttypehelper.h" #include "storage/partstreamer.h" #include #include "akonadiserver_debug.h" #include #include using namespace Akonadi; using namespace Akonadi::Server; static bool payloadChanged(const QSet &changes) { for (const QByteArray &change : changes) { if (change.startsWith(AKONADI_PARAM_PLD)) { return true; } } return false; } bool Store::replaceFlags(const PimItem::List &item, const QSet &flags, bool &flagsChanged) { Flag::List flagList = HandlerHelper::resolveFlags(flags); DataStore *store = connection()->storageBackend(); if (!store->setItemsFlags(item, flagList, &flagsChanged)) { qCDebug(AKONADISERVER_LOG) << "Store::replaceFlags: Unable to replace flags"; return false; } return true; } bool Store::addFlags(const PimItem::List &items, const QSet &flags, bool &flagsChanged) { const Flag::List flagList = HandlerHelper::resolveFlags(flags); DataStore *store = connection()->storageBackend(); if (!store->appendItemsFlags(items, flagList, &flagsChanged)) { qCDebug(AKONADISERVER_LOG) << "Store::addFlags: Unable to add new item flags"; return false; } return true; } bool Store::deleteFlags(const PimItem::List &items, const QSet &flags, bool &flagsChanged) { DataStore *store = connection()->storageBackend(); QVector flagList; flagList.reserve(flags.size()); for (auto iter = flags.cbegin(), end = flags.cend(); iter != end; ++iter) { Flag flag = Flag::retrieveByName(QString::fromUtf8(*iter)); if (!flag.isValid()) { continue; } flagList.append(flag); } if (!store->removeItemsFlags(items, flagList, &flagsChanged)) { qCDebug(AKONADISERVER_LOG) << "Store::deleteFlags: Unable to remove item flags"; return false; } return true; } bool Store::replaceTags(const PimItem::List &item, const Scope &tags, bool &tagsChanged) { const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()); if (!connection()->storageBackend()->setItemsTags(item, tagList, &tagsChanged)) { qCDebug(AKONADISERVER_LOG) << "Store::replaceTags: unable to replace tags"; return false; } return true; } bool Store::addTags(const PimItem::List &items, const Scope &tags, bool &tagsChanged) { const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()); if (!connection()->storageBackend()->appendItemsTags(items, tagList, &tagsChanged)) { qCDebug(AKONADISERVER_LOG) << "Store::addTags: Unable to add new item tags"; return false; } return true; } bool Store::deleteTags(const PimItem::List &items, const Scope &tags, bool &tagsChanged) { const Tag::List tagList = HandlerHelper::tagsFromScope(tags, connection()); if (!connection()->storageBackend()->removeItemsTags(items, tagList, &tagsChanged)) { qCDebug(AKONADISERVER_LOG) << "Store::deleteTags: Unable to remove item tags"; return false; } return true; } bool Store::parseStream() { const auto &cmd = Protocol::cmdCast(m_command); //parseCommand(); DataStore *store = connection()->storageBackend(); Transaction transaction(store, QStringLiteral("STORE")); ExternalPartStorageTransaction storageTrx; // Set the same modification time for each item. QDateTime modificationtime = QDateTime::currentDateTimeUtc(); if (DbType::type(store->database()) != DbType::Sqlite) { // Remove milliseconds from the modificationtime. PSQL and MySQL don't // support milliseconds in DATETIME column, so FETCHed Items will report // time without milliseconds, while this command would return answer // with milliseconds modificationtime = modificationtime.addMSecs(-modificationtime.time().msec()); } // retrieve selected items SelectQueryBuilder qb; qb.setForUpdate(); ItemQueryHelper::scopeToQuery(cmd.items(), connection()->context(), qb); if (!qb.exec()) { return failureResponse("Unable to retrieve items"); } PimItem::List pimItems = qb.result(); if (pimItems.isEmpty()) { return failureResponse("No items found"); } for (int i = 0; i < pimItems.size(); ++i) { if (cmd.oldRevision() > -1) { // check for conflicts if a resources tries to overwrite an item with dirty payload const PimItem &pimItem = pimItems.at(i); if (connection()->isOwnerResource(pimItem)) { if (pimItem.dirty()) { const QString error = QStringLiteral("[LRCONFLICT] Resource %1 tries to modify item %2 (%3) (in collection %4) with dirty payload, aborting STORE."); return failureResponse( error.arg(pimItem.collection().resource().name()) .arg(pimItem.id()) .arg(pimItem.remoteId()).arg(pimItem.collectionId())); } } // check and update revisions if (pimItems.at(i).rev() != (int) cmd.oldRevision()) { const QString error = QStringLiteral("[LLCONFLICT] Resource %1 tries to modify item %2 (%3) (in collection %4) with revision %5; the item was modified elsewhere and has revision %6, aborting STORE."); return failureResponse(error.arg(pimItem.collection().resource().name()) .arg(pimItem.id()) .arg(pimItem.remoteId()).arg(pimItem.collectionId()) .arg(cmd.oldRevision()).arg(pimItems.at(i).rev())); } } } PimItem &item = pimItems.first(); QSet changes; qint64 partSizes = 0; qint64 size = 0; bool flagsChanged = false; bool tagsChanged = false; if (cmd.modifiedParts() & Protocol::ModifyItemsCommand::AddedFlags) { if (!addFlags(pimItems, cmd.addedFlags(), flagsChanged)) { return failureResponse("Unable to add item flags"); } } if (cmd.modifiedParts() & Protocol::ModifyItemsCommand::RemovedFlags) { if (!deleteFlags(pimItems, cmd.removedFlags(), flagsChanged)) { return failureResponse("Unable to remove item flags"); } } if (cmd.modifiedParts() & Protocol::ModifyItemsCommand::Flags) { if (!replaceFlags(pimItems, cmd.flags(), flagsChanged)) { return failureResponse("Unable to reset flags"); } } if (flagsChanged) { changes << AKONADI_PARAM_FLAGS; } if (cmd.modifiedParts() & Protocol::ModifyItemsCommand::AddedTags) { if (!addTags(pimItems, cmd.addedTags(), tagsChanged)) { return failureResponse("Unable to add item tags"); } } if (cmd.modifiedParts() & Protocol::ModifyItemsCommand::RemovedTags) { if (!deleteTags(pimItems, cmd.removedTags(), tagsChanged)) { return failureResponse("Unable to remove item tags"); } } if (cmd.modifiedParts() & Protocol::ModifyItemsCommand::Tags) { if (!replaceTags(pimItems, cmd.tags(), tagsChanged)) { return failureResponse("Unable to reset item tags"); } } if (tagsChanged) { changes << AKONADI_PARAM_TAGS; } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::RemoteID) { if (item.remoteId() != cmd.remoteId() && !cmd.remoteId().isEmpty()) { if (!connection()->isOwnerResource(item)) { qCWarning(AKONADISERVER_LOG) << "Invalid attempt to modify the remoteID for item" << item.id() << "from" << item.remoteId() << "to" << cmd.remoteId(); return failureResponse("Only resources can modify remote identifiers"); } item.setRemoteId(cmd.remoteId()); changes << AKONADI_PARAM_REMOTEID; } } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::GID) { if (item.gid() != cmd.gid()) { item.setGid(cmd.gid()); } changes << AKONADI_PARAM_GID; } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::RemoteRevision) { if (item.remoteRevision() != cmd.remoteRevision()) { if (!connection()->isOwnerResource(item)) { return failureResponse("Only resources can modify remote revisions"); } item.setRemoteRevision(cmd.remoteRevision()); changes << AKONADI_PARAM_REMOTEREVISION; } } if (item.isValid() && !cmd.dirty()) { item.setDirty(false); } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::Size) { size = cmd.itemSize(); changes << AKONADI_PARAM_SIZE; } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::RemovedParts) { if (!cmd.removedParts().isEmpty()) { if (!store->removeItemParts(item, cmd.removedParts())) { return failureResponse("Unable to remove item parts"); } Q_FOREACH (const QByteArray &part, cmd.removedParts()) { changes.insert(part); } } } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::Parts) { PartStreamer streamer(connection(), item); Q_FOREACH (const QByteArray &partName, cmd.parts()) { qint64 partSize = 0; - if (!streamer.stream(true, partName, partSize)) { - return failureResponse(streamer.error()); + try { + streamer.stream(true, partName, partSize); + } catch (const PartStreamerException &e) { + return failureResponse(e.what()); } changes.insert(partName); partSizes += partSize; } } if (item.isValid() && cmd.modifiedParts() & Protocol::ModifyItemsCommand::Attributes) { PartStreamer streamer(connection(), item); const Protocol::Attributes attrs = cmd.attributes(); for (auto iter = attrs.cbegin(), end = attrs.cend(); iter != end; ++iter) { bool changed = false; - if (!streamer.streamAttribute(true, iter.key(), iter.value(), &changed)) { - return failureResponse(streamer.error()); + try { + streamer.streamAttribute(true, iter.key(), iter.value(), &changed); + } catch (const PartStreamerException &e) { + return failureResponse(e.what()); } if (changed) { changes.insert(iter.key()); } } } QDateTime datetime; if (!changes.isEmpty() || cmd.invalidateCache() || !cmd.dirty()) { // update item size if (pimItems.size() == 1 && (size > 0 || partSizes > 0)) { pimItems.first().setSize(qMax(size, partSizes)); } const bool onlyRemoteIdChanged = (changes.size() == 1 && changes.contains(AKONADI_PARAM_REMOTEID)); const bool onlyRemoteRevisionChanged = (changes.size() == 1 && changes.contains(AKONADI_PARAM_REMOTEREVISION)); const bool onlyRemoteIdAndRevisionChanged = (changes.size() == 2 && changes.contains(AKONADI_PARAM_REMOTEID) && changes.contains(AKONADI_PARAM_REMOTEREVISION)); const bool onlyFlagsChanged = (changes.size() == 1 && changes.contains(AKONADI_PARAM_FLAGS)); const bool onlyGIDChanged = (changes.size() == 1 && changes.contains(AKONADI_PARAM_GID)); // If only the remote id and/or the remote revision changed, we don't have to increase the REV, // because these updates do not change the payload and can only be done by the owning resource -> no conflicts possible const bool revisionNeedsUpdate = (!changes.isEmpty() && !onlyRemoteIdChanged && !onlyRemoteRevisionChanged && !onlyRemoteIdAndRevisionChanged && !onlyGIDChanged); // run update query and prepare change notifications for (int i = 0; i < pimItems.count(); ++i) { if (revisionNeedsUpdate) { pimItems[i].setRev(pimItems[i].rev() + 1); } PimItem &item = pimItems[i]; item.setDatetime(modificationtime); item.setAtime(modificationtime); if (!connection()->isOwnerResource(item) && payloadChanged(changes)) { item.setDirty(true); } if (!item.update()) { return failureResponse("Unable to write item changes into the database"); } if (cmd.invalidateCache()) { if (!store->invalidateItemCache(item)) { return failureResponse("Unable to invalidate item cache in the database"); } } // flags change notification went separately during command parsing // GID-only changes are ignored to prevent resources from updating their storage when no actual change happened if (cmd.notify() && !changes.isEmpty() && !onlyFlagsChanged && !onlyGIDChanged) { // Don't send FLAGS notification in itemChanged changes.remove(AKONADI_PARAM_FLAGS); store->notificationCollector()->itemChanged(item, changes); } if (!cmd.noResponse()) { Protocol::ModifyItemsResponse resp; resp.setId(item.id()); resp.setNewRevision(item.rev()); sendResponse(std::move(resp)); } } if (!transaction.commit()) { return failureResponse("Cannot commit transaction."); } // Always commit storage changes (deletion) after DB transaction storageTrx.commit(); datetime = modificationtime; } else { datetime = pimItems.first().datetime(); } Protocol::ModifyItemsResponse resp; resp.setModificationDateTime(datetime); return successResponse(std::move(resp)); } diff --git a/src/server/storage/partstreamer.cpp b/src/server/storage/partstreamer.cpp index 8ed0e7670..c23893481 100644 --- a/src/server/storage/partstreamer.cpp +++ b/src/server/storage/partstreamer.cpp @@ -1,423 +1,386 @@ /* * Copyright (C) 2014 Daniel Vrátil * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * */ #include "partstreamer.h" #include "parthelper.h" #include "parttypehelper.h" #include "selectquerybuilder.h" #include "dbconfig.h" #include "connection.h" #include "capabilities_p.h" #include "akonadiserver_debug.h" #include #include #include #include #ifdef HAVE_UNISTD_H # include #endif #include #include using namespace Akonadi; using namespace Akonadi::Server; PartStreamer::PartStreamer(Connection *connection, const PimItem &pimItem) : mConnection(connection) , mItem(pimItem) { // Make sure the file_db_data path exists StandardDirs::saveDir("data", QStringLiteral("file_db_data")); } PartStreamer::~PartStreamer() { } -QString PartStreamer::error() const -{ - return mError; -} Protocol::PartMetaData PartStreamer::requestPartMetaData(const QByteArray &partName) { { Protocol::StreamPayloadCommand resp; resp.setPayloadName(partName); resp.setRequest(Protocol::StreamPayloadCommand::MetaData); mConnection->sendResponse(std::move(resp)); } const auto cmd = mConnection->readCommand(); if (!cmd->isValid() || Protocol::cmdCast(cmd).isError()) { - mError = QStringLiteral("Client failed to provide part metadata"); - return Protocol::PartMetaData(); + throw PartStreamerException("Client failed to provide part metadata."); } return Protocol::cmdCast(cmd).metaData(); } -bool PartStreamer::streamPayload(Part &part, const QByteArray &partName) +void PartStreamer::streamPayload(Part &part, const QByteArray &partName) { Protocol::PartMetaData metaPart = requestPartMetaData(partName); if (metaPart.name().isEmpty()) { - mError = QStringLiteral("Part name is empty"); - return false; + throw PartStreamerException(QStringLiteral("Client sent empty metadata for part '%1'.") + .arg(QString::fromUtf8(partName))); } part.setVersion(metaPart.version()); if (part.datasize() != metaPart.size()) { part.setDatasize(metaPart.size()); // Shortcut: if sizes differ, we don't need to compare data later no in order // to detect whether the part has changed mDataChanged = mDataChanged || (metaPart.size() != part.datasize()); } if (metaPart.storageType() == Protocol::PartMetaData::Foreign) { - return streamForeignPayload(part, metaPart); + streamForeignPayload(part, metaPart); } else if (part.datasize() > DbConfig::configuredDatabase()->sizeThreshold()) { //actual case when streaming storage is used: external payload is enabled, // data is big enough in a literal - return streamPayloadToFile(part, metaPart); + streamPayloadToFile(part, metaPart); } else { - return streamPayloadData(part, metaPart); + streamPayloadData(part, metaPart); } } -bool PartStreamer::streamPayloadData(Part &part, const Protocol::PartMetaData &metaPart) +void PartStreamer::streamPayloadData(Part &part, const Protocol::PartMetaData &metaPart) { // If the part WAS external previously, remove data file if (part.storage() == Part::External) { ExternalPartStorage::self()->removePartFile( ExternalPartStorage::resolveAbsolutePath(part.data())); } // Request the actual data { Protocol::StreamPayloadCommand resp; resp.setPayloadName(metaPart.name()); resp.setRequest(Protocol::StreamPayloadCommand::Data); mConnection->sendResponse(std::move(resp)); } const auto cmd = mConnection->readCommand(); const auto &response = Protocol::cmdCast(cmd); if (!response.isValid() || response.isError()) { - mError = QStringLiteral("Client failed to provide payload data"); - qCCritical(AKONADISERVER_LOG) << mError; - return false; + throw PartStreamerException(QStringLiteral("Client failed to provide payload data for part ID %1 (%2).") + .arg(part.id()).arg(part.partType().name())); } const QByteArray newData = response.data(); // only use the data size with internal payload parts, for foreign parts // we use the size reported by client const auto newSize = (metaPart.storageType() == Protocol::PartMetaData::Internal) ? newData.size() : metaPart.size(); if (newSize != metaPart.size()) { - mError = QStringLiteral("Payload size mismatch"); - return false; + throw PartStreamerException(QStringLiteral("Payload size mismatch: client advertised %1 bytes but sent %2 bytes.") + .arg(metaPart.size()).arg(newSize)); } if (part.isValid()) { if (!mDataChanged) { mDataChanged = mDataChanged || (newData != part.data()); } PartHelper::update(&part, newData, newSize); } else { part.setData(newData); part.setDatasize(newSize); if (!part.insert()) { - mError = QStringLiteral("Failed to insert part to database"); - return false; + throw PartStreamerException("Failed to insert new part into database."); } } - - return true; } -bool PartStreamer::streamPayloadToFile(Part &part, const Protocol::PartMetaData &metaPart) +void PartStreamer::streamPayloadToFile(Part &part, const Protocol::PartMetaData &metaPart) { QByteArray origData; if (!mDataChanged && mCheckChanged) { origData = PartHelper::translateData(part); } QByteArray filename; if (part.isValid()) { if (part.storage() == Part::External) { // Part was external and is still external filename = part.data(); if (!filename.isEmpty()) { ExternalPartStorage::self()->removePartFile( ExternalPartStorage::resolveAbsolutePath(filename)); filename = ExternalPartStorage::updateFileNameRevision(filename); } else { // recover from data corruption filename = ExternalPartStorage::nameForPartId(part.id()); } } else { // Part wasn't external, but is now filename = ExternalPartStorage::nameForPartId(part.id()); } QFileInfo finfo(QString::fromUtf8(filename)); if (finfo.isAbsolute()) { filename = finfo.fileName().toUtf8(); } } part.setStorage(Part::External); part.setDatasize(metaPart.size()); part.setData(filename); if (part.isValid()) { if (!part.update()) { - mError = QStringLiteral("Failed to update part in database"); - return false; + throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id())); } } else { if (!part.insert()) { - mError = QStringLiteral("Failed to insert part into database"); - return false; + throw PartStreamerException(QStringLiteral("Failed to insert new part fo PimItem %1 into database.") + .arg(part.pimItemId())); } filename = ExternalPartStorage::nameForPartId(part.id()); part.setData(filename); if (!part.update()) { - mError = QStringLiteral("Failed to update part in database"); - return false; + throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id())); } } { Protocol::StreamPayloadCommand cmd; cmd.setPayloadName(metaPart.name()); cmd.setRequest(Protocol::StreamPayloadCommand::Data); cmd.setDestination(QString::fromUtf8(filename)); mConnection->sendResponse(std::move(cmd)); } const auto cmd = mConnection->readCommand(); const auto &response = Protocol::cmdCast(cmd); if (!response.isValid() || response.isError()) { - mError = QStringLiteral("Client failed to store payload into file"); - qCCritical(AKONADISERVER_LOG) << mError; - return false; + throw PartStreamerException("Client failed to store payload into file."); } QFile file(ExternalPartStorage::resolveAbsolutePath(filename)); if (!file.exists()) { - mError = QStringLiteral("External payload file does not exist"); - qCCritical(AKONADISERVER_LOG) << mError; - return false; + throw PartStreamerException(QStringLiteral("External payload file %1 does not exist.").arg(file.fileName())); } if (file.size() != metaPart.size()) { - mError = QStringLiteral("Payload size mismatch"); - qCDebug(AKONADISERVER_LOG) << mError << ", client advertised" << metaPart.size() << "bytes, but the file is" << file.size() << "bytes!"; - return false; + throw PartStreamerException(QStringLiteral("Payload size mismatch, client advertised %1 bytes, but the file is %2 bytes.") + .arg(metaPart.size(), file.size())); } if (mCheckChanged && !mDataChanged) { // This is invoked only when part already exists, data sizes match and // caller wants to know whether parts really differ mDataChanged = (origData != PartHelper::translateData(part)); } - - return true; } -bool PartStreamer::streamForeignPayload(Part &part, const Protocol::PartMetaData &metaPart) +void PartStreamer::streamForeignPayload(Part &part, const Protocol::PartMetaData &metaPart) { QByteArray origData; if (!mDataChanged && mCheckChanged) { origData = PartHelper::translateData(part); } { Protocol::StreamPayloadCommand cmd; cmd.setPayloadName(metaPart.name()); cmd.setRequest(Protocol::StreamPayloadCommand::Data); mConnection->sendResponse(std::move(cmd)); } const auto cmd = mConnection->readCommand(); const auto response = Protocol::cmdCast(cmd); if (!response.isValid() || response.isError()) { - mError = QStringLiteral("Client failed to store payload into file"); - qCCritical(AKONADISERVER_LOG) << mError; - return false; + throw PartStreamerException("Client failed to store payload into file."); } // If the part was previously external, clean up the data if (part.storage() == Part::External) { const QString filename = QString::fromUtf8(part.data()); ExternalPartStorage::self()->removePartFile( ExternalPartStorage::resolveAbsolutePath(filename)); } part.setStorage(Part::Foreign); part.setData(response.data()); if (part.isValid()) { if (!part.update()) { - mError = QStringLiteral("Failed to update part in database"); - return false; + throw PartStreamerException(QStringLiteral("Failed to update part %1 in database.").arg(part.id())); } } else { if (!part.insert()) { - mError = QStringLiteral("Failed to insert part into database"); - return false; + throw PartStreamerException(QStringLiteral("Failed to insert part for PimItem %1 into database.") + .arg(part.pimItemId())); } } const QString filename = QString::fromUtf8(response.data()); QFile file(filename); if (!file.exists()) { - mError = QStringLiteral("Foreign payload file does not exist"); - qCCritical(AKONADISERVER_LOG) << mError; - return false; + throw PartStreamerException(QStringLiteral("Foreign payload file %1 does not exist.").arg(filename)); } if (file.size() != metaPart.size()) { - mError = QStringLiteral("Payload size mismatch"); - qCCritical(AKONADISERVER_LOG) << mError << ", client advertised" << metaPart.size() << "bytes, but the file size is" << file.size() << "bytes!"; - return false; + throw PartStreamerException(QStringLiteral("Foreign payload size mismatch, client advertised %1 bytes, but the file size is %2 bytes.") + .arg(metaPart.size(), file.size())); } if (mCheckChanged && !mDataChanged) { // This is invoked only when part already exists, data sizes match and // caller wants to know whether parts really differ mDataChanged = (origData != PartHelper::translateData(part)); } - - return true; } -bool PartStreamer::preparePart(bool checkExists, const QByteArray &partName, Part &part) +void PartStreamer::preparePart(bool checkExists, const QByteArray &partName, Part &part) { - mError.clear(); mDataChanged = false; const PartType partType = PartTypeHelper::fromFqName(partName); if (checkExists || mCheckChanged) { SelectQueryBuilder qb; qb.addValueCondition(Part::pimItemIdColumn(), Query::Equals, mItem.id()); qb.addValueCondition(Part::partTypeIdColumn(), Query::Equals, partType.id()); if (!qb.exec()) { - mError = QStringLiteral("Unable to check item part existence"); - return false; + throw PartStreamerException(QStringLiteral("Failed to check if part %1 exists in PimItem %2.") + .arg(QString::fromUtf8(partName)).arg(mItem.id())); } const Part::List result = qb.result(); if (!result.isEmpty()) { part = result.at(0); } } // Shortcut: newly created parts are always "changed" if (!part.isValid()) { mDataChanged = true; } part.setPartType(partType); part.setPimItemId(mItem.id()); - - return true; } -bool PartStreamer::stream(bool checkExists, const QByteArray &partName, qint64 &partSize, bool *changed) +void PartStreamer::stream(bool checkExists, const QByteArray &partName, qint64 &partSize, bool *changed) { mCheckChanged = (changed != nullptr); if (changed != nullptr) { *changed = false; } Part part; - if (!preparePart(checkExists, partName, part)) { - return false; - } + preparePart(checkExists, partName, part); - bool ok = streamPayload(part, partName); - if (changed && ok && mCheckChanged) { + streamPayload(part, partName); + if (changed && mCheckChanged) { *changed = mDataChanged; } partSize = part.datasize(); - - return ok; } -bool PartStreamer::streamAttribute(bool checkExists, const QByteArray &_partName, const QByteArray &value, bool *changed) +void PartStreamer::streamAttribute(bool checkExists, const QByteArray &_partName, const QByteArray &value, bool *changed) { mCheckChanged = (changed != nullptr); if (changed != nullptr) { *changed = false; } QByteArray partName; if (!_partName.startsWith("ATR:")) { partName = "ATR:" + _partName; } else { partName = _partName; } Part part; - if (!preparePart(checkExists, partName, part)) { - return false; - } + preparePart(checkExists, partName, part); if (part.isValid()) { if (mCheckChanged) { if (PartHelper::translateData(part) != value) { mDataChanged = true; } } PartHelper::update(&part, value, value.size()); } else { const bool storeInFile = value.size() > DbConfig::configuredDatabase()->sizeThreshold(); part.setDatasize(value.size()); part.setVersion(0); if (storeInFile) { if (!part.insert()) { - mError = QStringLiteral("Failed to store part in database"); - return false; + throw PartStreamerException(QStringLiteral("Failed to store attribute part for PimItem %1 in database.") + .arg(part.pimItemId())); } PartHelper::update(&part, value, value.size()); } else { part.setData(value); if (!part.insert()) { - mError = QStringLiteral("Failed to store part in database"); - return false; + throw PartStreamerException(QStringLiteral("Failed to store attribute part for PimItem %1 in database.") + .arg(part.pimItemId())); } } } if (mCheckChanged) { *changed = mDataChanged; } - - return true; } diff --git a/src/server/storage/partstreamer.h b/src/server/storage/partstreamer.h index 24b93f202..2ad552495 100644 --- a/src/server/storage/partstreamer.h +++ b/src/server/storage/partstreamer.h @@ -1,74 +1,81 @@ /* * Copyright (C) 2014 Daniel Vrátil * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * */ #ifndef AKONADI_SERVER_PARTSTREAMER_H #define AKONADI_SERVER_PARTSTREAMER_H #include #include "entities.h" +#include "exception.h" namespace Akonadi { namespace Protocol { class PartMetaData; class Command; using CommandPtr = QSharedPointer; } namespace Server { +AKONADI_EXCEPTION_MAKE_INSTANCE(PartStreamerException); + class PimItem; class Part; class Connection; class PartStreamer { public: explicit PartStreamer(Connection *connection, const PimItem &pimItem); ~PartStreamer(); - bool stream(bool checkExists, const QByteArray &partName, qint64 &partSize, bool *changed = nullptr); - bool streamAttribute(bool checkExists, const QByteArray &partName, const QByteArray &value, bool *changed = nullptr); + /** + * @throws PartStreamException + */ + void stream(bool checkExists, const QByteArray &partName, qint64 &partSize, bool *changed = nullptr); - QString error() const; + /** + * @throws PartStreamerException + */ + void streamAttribute(bool checkExists, const QByteArray &partName, const QByteArray &value, bool *changed = nullptr); private: - bool streamPayload(Part &part, const QByteArray &partName); - bool streamPayloadToFile(Part &part, const Protocol::PartMetaData &metaPart); - bool streamPayloadData(Part &part, const Protocol::PartMetaData &metaPart); - bool streamForeignPayload(Part &part, const Protocol::PartMetaData &metaPart); + void streamPayload(Part &part, const QByteArray &partName); + void streamPayloadToFile(Part &part, const Protocol::PartMetaData &metaPart); + void streamPayloadData(Part &part, const Protocol::PartMetaData &metaPart); + void streamForeignPayload(Part &part, const Protocol::PartMetaData &metaPart); Protocol::PartMetaData requestPartMetaData(const QByteArray &partName); - bool preparePart(bool checkExists, const QByteArray &partName, Part &part); + void preparePart(bool checkExists, const QByteArray &partName, Part &part); Connection *mConnection; PimItem mItem; bool mCheckChanged; bool mDataChanged; - QString mError; }; } // namespace Server } // namespace Akonadi #endif // AKONADI_SERVER_PARTSTREAMER_H