diff --git a/akonadi/resourcebase.h b/akonadi/resourcebase.h index e706139b2..4429d7145 100644 --- a/akonadi/resourcebase.h +++ b/akonadi/resourcebase.h @@ -1,522 +1,528 @@ /* This file is part of akonadiresources. Copyright (c) 2006 Till Adam Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_RESOURCEBASE_H #define AKONADI_RESOURCEBASE_H #include "akonadi_export.h" #include #include #include class KJob; class Akonadi__ResourceAdaptor; namespace Akonadi { class ResourceBasePrivate; /** * @short The base class for all Akonadi resources. * * This class should be used as a base class by all resource agents, * because it encapsulates large parts of the protocol between * resource agent, agent manager and the Akonadi storage. * * It provides many convenience methods to make implementing a * new Akonadi resource agent as simple as possible. * *

How to write a resource

* * The following provides an overview of what you need to do to implement * your own Akonadi resource. In the following, the term 'backend' refers * to the entity the resource connects with Akonadi, be it a single file * or a remote server. * * @todo Complete this (online/offline state management) * *
Basic %Resource Framework
* * The following is needed to create a new resource: * - A new class deriving from Akonadi::ResourceBase, implementing at least all * pure-virtual methods, see below for further details. * - call init() in your main() function. * - a .desktop file similar to the following example * \code * [Desktop Entry] * Encoding=UTF-8 * Name=My Akonadi Resource * Type=AkonadiResource * Exec=akonadi_my_resource * Icon=my-icon * * X-Akonadi-MimeTypes= * X-Akonadi-Capabilities=Resource * X-Akonadi-Identifier=akonadi_my_resource * \endcode * *
Handling PIM Items
* * To follow item changes in the backend, the following steps are necessary: * - Implement retrieveItems() to synchronize all items in the given * collection. If the backend supports incremental retrieval, * implementing support for that is recommended to improve performance. * - Convert the items provided by the backend to Akonadi items. * This typically happens either in retrieveItems() if you retrieved * the collection synchronously (not recommended for network backends) or * in the result slot of the asynchronous retrieval job. * Converting means to create Akonadi::Item objects for every retrieved * item. It's very important that every object has its remote identifier set. * - Call itemsRetrieved() or itemsRetrievedIncremental() respectively * with the item objects created above. The Akonadi storage will then be * updated automatically. Note that it is usually not necessary to manipulate * any item in the Akonadi storage manually. * * To fetch item data on demand, the method retrieveItem() needs to be * reimplemented. Fetch the requested data there and call itemRetrieved() * with the result item. * * To write local changes back to the backend, you need to re-implement * the following three methods: * - itemAdded() * - itemChanged() * - itemRemoved() * Note that these three functions don't get the full payload of the items by default, * you need to change the item fetch scope of the change recorder to fetch the full * payload. This can be expensive with big payloads, though.
* Once you have handled changes in these methods call changeCommitted(). * These methods are called whenever a local item related to this resource is * added, modified or deleted. They are only called if the resource is online, otherwise * all changes are recorded and replayed as soon the resource is online again. * *
Handling Collections
* * To follow collection changes in the backend, the following steps are necessary: * - Implement retrieveCollections() to retrieve collections from the backend. * If the backend supports incremental collections updates, implementing * support for that is recommended to improve performance. * - Convert the collections of the backend to Akonadi collections. * This typically happens either in retrieveCollections() if you retrieved * the collection synchronously (not recommended for network backends) or * in the result slot of the asynchronous retrieval job. * Converting means to create Akonadi::Collection objects for every retrieved * collection. It's very important that every object has its remote identifier * and its parent remote identifier set. * - Call collectionsRetrieved() or collectionsRetrievedIncremental() respectively * with the collection objects created above. The Akonadi storage will then be * updated automatically. Note that it is usually not necessary to manipulate * any collection in the Akonadi storage manually. * * * To write local collection changes back to the backend, you need to re-implement * the following three methods: * - collectionAdded() * - collectionChanged() * - collectionRemoved() * Once you have handled changes in these methods call changeCommitted(). * These methods are called whenever a local collection related to this resource is * added, modified or deleted. They are only called if the resource is online, otherwise * all changes are recorded and replayed as soon the resource is online again. * * @todo Convenience base class for collection-less resources */ // FIXME_API: API dox need to be updated for Observer approach (kevin) class AKONADI_EXPORT ResourceBase : public AgentBase { Q_OBJECT public: /** * Use this method in the main function of your resource * application to initialize your resource subclass. * This method also takes care of creating a KApplication * object and parsing command line arguments. * * @note In case the given class is also derived from AgentBase::Observer * it gets registered as its own observer (see AgentBase::Observer), e.g. * resourceInstance->registerObserver( resourceInstance ); * * @code * * class MyResource : public ResourceBase * { * ... * }; * * int main( int argc, char **argv ) * { * return ResourceBase::init( argc, argv ); * } * * @endcode */ template static int init( int argc, char **argv ) { const QString id = parseArguments( argc, argv ); KApplication app; T* r = new T( id ); // check if T also inherits AgentBase::Observer and // if it does, automatically register it on itself Observer *observer = dynamic_cast( r ); if ( observer != 0 ) r->registerObserver( observer ); return init( r ); } /** * This method is used to set the name of the resource. */ void setName( const QString &name ); /** * Returns the name of the resource. */ QString name() const; Q_SIGNALS: /** * This signal is emitted whenever the name of the resource has changed. * * @param name The new name of the resource. */ void nameChanged( const QString &name ); /** * Emitted when a full synchronization has been completed. */ void synchronized(); protected Q_SLOTS: /** * Retrieve the collection tree from the remote server and supply it via * collectionsRetrieved() or collectionsRetrievedIncremental(). * @see collectionsRetrieved(), collectionsRetrievedIncremental() */ virtual void retrieveCollections() = 0; /** * Retrieve all (new/changed) items in collection @p collection. * It is recommended to use incremental retrieval if the backend supports that * and provide the result by calling itemsRetrievedIncremental(). * If incremental retrieval is not possible, provide the full listing by calling * itemsRetrieved( const Item::List& ). * In any case, ensure that all items have a correctly set remote identifier * to allow synchronizing with items already existing locally. * In case you don't want to use the built-in item syncing code, store the retrieved * items manually and call itemsRetrieved() once you are done. * @param collection The collection whose items to retrieve. * @see itemsRetrieved( const Item::List& ), itemsRetrievedIncremental(), itemsRetrieved(), currentCollection() */ virtual void retrieveItems( const Akonadi::Collection &collection ) = 0; /** * Retrieve a single item from the backend. The item to retrieve is provided as @p item. * Add the requested payload parts and call itemRetrieved() when done. * @param item The empty item whose payload should be retrieved. Use this object when delivering * the result instead of creating a new item to ensure conflict detection will work. * @param parts The item parts that should be retrieved. * @return false if there is an immediate error when retrieving the item. * @see itemRetrieved() */ virtual bool retrieveItem( const Akonadi::Item &item, const QSet &parts ) = 0; protected: /** * Creates a base resource. * * @param id The instance id of the resource. */ ResourceBase( const QString & id ); /** * Destroys the base resource. */ ~ResourceBase(); /** * Call this method from retrieveItem() once the result is available. * * @param item The retrieved item. */ void itemRetrieved( const Item &item ); /** * Resets the dirty flag of the given item and updates the remote id. * * Call whenever you have successfully written changes back to the server. * This implicitly calls changeProcessed(). * @param item The changed item. */ void changeCommitted( const Item &item ); /** * Call whenever you have successfully handled or ignored a collection * change notification. * * This will update the remote identifier of @p collection if necessary, * as well as any other collection attributes. * This implicitly calls changeProcessed(). * @param collection The collection which changes have been handled. */ void changeCommitted( const Collection &collection ); /** * Call this to supply the full folder tree retrieved from the remote server. * * @param collections A list of collections. * @see collectionsRetrievedIncremental() */ void collectionsRetrieved( const Collection::List &collections ); /** * Call this to supply incrementally retrieved collections from the remote server. * * @param changedCollections Collections that have been added or changed. * @param removedCollections Collections that have been deleted. * @see collectionsRetrieved() */ void collectionsRetrievedIncremental( const Collection::List &changedCollections, const Collection::List &removedCollections ); /** * Enable collection streaming, that is collections don't have to be delivered at once * as result of a retrieveCollections() call but can be delivered by multiple calls * to collectionsRetrieved() or collectionsRetrievedIncremental(). When all collections * have been retrieved, call collectionsRetrievalDone(). * @param enable @c true if collection streaming should be enabled, @c false by default */ void setCollectionStreamingEnabled( bool enable ); /** * Call this method to indicate you finished synchronizing the collection tree. * * This is not needed if you use the built in syncing without collection streaming * and call collectionsRetrieved() or collectionRetrievedIncremental() instead. * If collection streaming is enabled, call this method once all collections have been delivered * using collectionsRetrieved() or collectionsRetrievedIncremental(). */ void collectionsRetrievalDone(); /** * Call this method to supply the full collection listing from the remote server. * * If the remote server supports incremental listing, it's strongly * recommended to use itemsRetrievedIncremental() instead. * @param items A list of items. * @see itemsRetrievedIncremental(). */ void itemsRetrieved( const Item::List &items ); /** * Call this method when you want to use the itemsRetrieved() method * in streaming mode and indicate the amount of items that will arrive * that way. * @deprecated Use setItemStreamingEnabled( true ) + itemsRetrieved[Incremental]() * + itemsRetrieved() instead. */ void setTotalItems( int amount ); /** * Enable item streaming. * Item streaming is disabled by default. * @param enable @c true if items are delivered in chunks rather in one big block. */ void setItemStreamingEnabled( bool enable ); /** * Call this method to supply incrementally retrieved items from the remote server. * * @param changedItems Items changed in the backend. * @param removedItems Items removed from the backend. */ void itemsRetrievedIncremental( const Item::List &changedItems, const Item::List &removedItems ); /** * Call this method to indicate you finished synchronizing the current collection. * * This is not needed if you use the built in syncing without item streaming * and call itemsRetrieved() or itemsRetrievedIncremental() instead. * If item streaming is enabled, call this method once all items have been delivered * using itemsRetrieved() or itemsRetrievedIncremental(). * @see retrieveItems() */ void itemsRetrievalDone(); /** * Call this method to remove all items and collections of the resource from the * server cache. * * The method should be used whenever the configuration of the resource has changed * and therefor the cached items might not be valid any longer. * * @since 4.3 */ void clearCache(); /** * Returns the collection that is currently synchronized. */ Collection currentCollection() const; /** * Returns the item that is currently retrieved. */ Item currentItem() const; /** * This method is called whenever the resource should start synchronize all data. */ void synchronize(); /** * This method is called whenever the collection with the given @p id * shall be synchronized. */ void synchronizeCollection( qint64 id ); /** * Refetches the Collections. */ void synchronizeCollectionTree(); /** * Stops the execution of the current task and continues with the next one. */ void cancelTask(); /** * Stops the execution of the current task and continues with the next one. * Additionally an error message is emitted. */ void cancelTask( const QString &error ); /** * Stops the execution of the current task and continues with the next one. * The current task will be tried again later. * + * This can be used to delay the task processing until the resource has reached a safe + * state, e.g. login to a server succeeded. + * + * @note This does not change the order of tasks so if there is no task with higher priority + * e.g. a custom task added with #Prepend the deferred task will be processed again. + * * @since 4.3 */ void deferTask(); /** * Inherited from AgentBase. */ void doSetOnline( bool online ); /** * Indicate the use of hierarchical remote identifiers. * * This means that it is possible to have two different items with the same * remoteId in different Collections. * * This should be called in the resource constructor as needed. * * @since 4.4 */ void setHierarchicalRemoteIdentifiersEnabled( bool enable ); friend class ResourceScheduler; /** * Describes the scheduling priority of a task that has been queued * for execution. * * @see scheduleCustomTask * @since 4.4 */ enum SchedulePriority { Prepend, ///> The task will be executed as soon as the current task has finished. AfterChangeReplay, ///> The task is scheduled after the last ChangeReplay task in the queue Append ///> The task will be executed after all tasks currently in the queue are finished }; /** * Schedules a custom task in the internal scheduler. It will be queued with * all other tasks such as change replays and retrieval requests and eventually * executed by calling the specified method. With the priority parameter the * time of execution of the Task can be influenced. @see SchedulePriority * @param receiver The object the slot should be called on. * @param method The name of the method (and only the name, not signature, not SLOT(...) macro), * that should be called to execute this task. The method has to be a slot and take a QVariant as * argument. * @param argument A QVariant argument passed to the method specified above. Use this to pass task * parameters. * @param priority Priority of the task. Use this to influence the position in * the execution queue. * @since 4.4 */ void scheduleCustomTask( QObject* receiver, const char* method, const QVariant &argument, SchedulePriority priority = Append ); /** * Indicate that the current task is finished. Use this method from the slot called via scheduleCustomTaks(). * As with all the other callbacks, make sure to either call taskDone() or cancelTask()/deferTask() on all * exit paths, otherwise the resource will hang. * @since 4.4 */ void taskDone(); private: static QString parseArguments( int, char** ); static int init( ResourceBase *r ); // dbus resource interface friend class ::Akonadi__ResourceAdaptor; bool requestItemDelivery( qint64 uid, const QString &remoteId, const QString &mimeType, const QStringList &parts ); private: Q_DECLARE_PRIVATE( ResourceBase ) Q_PRIVATE_SLOT( d_func(), void slotDeliveryDone( KJob* ) ) Q_PRIVATE_SLOT( d_func(), void slotCollectionSyncDone( KJob* ) ) Q_PRIVATE_SLOT( d_func(), void slotDeleteResourceCollection() ) Q_PRIVATE_SLOT( d_func(), void slotDeleteResourceCollectionDone( KJob* ) ) Q_PRIVATE_SLOT( d_func(), void slotCollectionDeletionDone( KJob* ) ) Q_PRIVATE_SLOT( d_func(), void slotLocalListDone( KJob* ) ) Q_PRIVATE_SLOT( d_func(), void slotSynchronizeCollection( const Akonadi::Collection& ) ) Q_PRIVATE_SLOT( d_func(), void slotCollectionListDone( KJob* ) ) Q_PRIVATE_SLOT( d_func(), void slotItemSyncDone( KJob* ) ) Q_PRIVATE_SLOT( d_func(), void slotPercent( KJob*, unsigned long ) ) Q_PRIVATE_SLOT( d_func(), void slotPrepareItemRetrieval( const Akonadi::Item& item ) ) Q_PRIVATE_SLOT( d_func(), void slotPrepareItemRetrievalResult( KJob* ) ) Q_PRIVATE_SLOT( d_func(), void changeCommittedResult( KJob* ) ) }; } #ifndef AKONADI_RESOURCE_MAIN /** * Convenience Macro for the most common main() function for Akonadi resources. */ #define AKONADI_RESOURCE_MAIN( resourceClass ) \ int main( int argc, char **argv ) \ { \ return Akonadi::ResourceBase::init( argc, argv ); \ } #endif #endif diff --git a/akonadi/resourcescheduler.cpp b/akonadi/resourcescheduler.cpp index ef6152e06..df1628277 100644 --- a/akonadi/resourcescheduler.cpp +++ b/akonadi/resourcescheduler.cpp @@ -1,428 +1,442 @@ /* Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "resourcescheduler_p.h" #include #include #include #include #include #include using namespace Akonadi; qint64 ResourceScheduler::Task::latestSerial = 0; static QDBusAbstractInterface *s_resourcetracker = 0; //@cond PRIVATE ResourceScheduler::ResourceScheduler( QObject *parent ) : QObject( parent ), + mCurrentTasksQueue( -1 ), mOnline( false ) { } void ResourceScheduler::scheduleFullSync() { Task t; t.type = SyncAll; TaskList& queue = queueForTaskType( t.type ); if ( queue.contains( t ) || mCurrentTask == t ) return; queue << t; signalTaskToTracker( t, "SyncAll" ); scheduleNext(); } void ResourceScheduler::scheduleCollectionTreeSync() { Task t; t.type = SyncCollectionTree; TaskList& queue = queueForTaskType( t.type ); if ( queue.contains( t ) || mCurrentTask == t ) return; queue << t; signalTaskToTracker( t, "SyncCollectionTree" ); scheduleNext(); } void ResourceScheduler::scheduleSync(const Collection & col) { Task t; t.type = SyncCollection; t.collection = col; TaskList& queue = queueForTaskType( t.type ); if ( queue.contains( t ) || mCurrentTask == t ) return; queue << t; signalTaskToTracker( t, "SyncCollection" ); scheduleNext(); } void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet &parts, const QDBusMessage & msg) { Task t; t.type = FetchItem; t.item = item; t.itemParts = parts; // if the current task does already fetch the requested item, break here but // keep the dbus message, so we can send the reply later on if ( mCurrentTask == t ) { mCurrentTask.dbusMsgs << msg; return; } // If this task is already in the queue, merge with it. TaskList& queue = queueForTaskType( t.type ); const int idx = queue.indexOf( t ); if ( idx != -1 ) { queue[ idx ].dbusMsgs << msg; return; } t.dbusMsgs << msg; queue << t; signalTaskToTracker( t, "FetchItem" ); scheduleNext(); } void ResourceScheduler::scheduleResourceCollectionDeletion() { Task t; t.type = DeleteResourceCollection; TaskList& queue = queueForTaskType( t.type ); if ( queue.contains( t ) || mCurrentTask == t ) return; queue << t; signalTaskToTracker( t, "DeleteResourceCollection" ); scheduleNext(); } void ResourceScheduler::scheduleChangeReplay() { Task t; t.type = ChangeReplay; TaskList& queue = queueForTaskType( t.type ); // see ResourceBase::changeProcessed() for why we do not check for mCurrentTask == t here like in the other tasks if ( queue.contains( t ) ) return; queue << t; signalTaskToTracker( t, "ChangeReplay" ); scheduleNext(); } void Akonadi::ResourceScheduler::scheduleFullSyncCompletion() { Task t; t.type = SyncAllDone; TaskList& queue = queueForTaskType( t.type ); // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost queue << t; signalTaskToTracker( t, "SyncAllDone" ); scheduleNext(); } void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority ) { Task t; t.type = Custom; t.receiver = receiver; t.methodName = methodName; t.argument = argument; QueueType queueType = GenericTaskQueue; if ( priority == ResourceBase::AfterChangeReplay ) queueType = AfterChangeReplayQueue; + else if ( priority == ResourceBase::Prepend ) + queueType = PrependTaskQueue; TaskList& queue = mTaskList[ queueType ]; if ( queue.contains( t ) ) return; switch (priority) { case ResourceBase::Prepend: queue.prepend( t ); break; default: queue.append(t); break; } signalTaskToTracker( t, "Custom-" + t.methodName ); scheduleNext(); } void ResourceScheduler::taskDone() { if ( isEmpty() ) emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) ); if ( s_resourcetracker ) { QList argumentList; argumentList << QString::number( mCurrentTask.serial ) << QString(); s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); } mCurrentTask = Task(); + mCurrentTasksQueue = -1; scheduleNext(); } void ResourceScheduler::deferTask() { + if ( mCurrentTask.type == Invalid ) + return; + if ( s_resourcetracker ) { QList argumentList; argumentList << QString::number( mCurrentTask.serial ) << QString(); s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); } Task t = mCurrentTask; mCurrentTask = Task(); - mTaskList[GenericTaskQueue] << t; + + Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount ); + mTaskList[mCurrentTasksQueue].prepend( t ); + mCurrentTasksQueue = -1; + signalTaskToTracker( t, "DeferedTask" ); scheduleNext(); } bool ResourceScheduler::isEmpty() { for ( int i = 0; i < NQueueCount; ++i ) { if ( !mTaskList[i].isEmpty() ) return false; } return true; } void ResourceScheduler::scheduleNext() { if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline ) return; QTimer::singleShot( 0, this, SLOT( executeNext() ) ); } void ResourceScheduler::executeNext() { if ( mCurrentTask.type != Invalid || isEmpty() ) return; for ( int i = 0; i < NQueueCount; ++i ) { if ( !mTaskList[ i ].isEmpty() ) { mCurrentTask = mTaskList[ i ].takeFirst(); + mCurrentTasksQueue = i; break; } } if ( s_resourcetracker ) { QList argumentList; argumentList << QString::number( mCurrentTask.serial ); s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList); } switch ( mCurrentTask.type ) { case SyncAll: emit executeFullSync(); break; case SyncCollectionTree: emit executeCollectionTreeSync(); break; case SyncCollection: emit executeCollectionSync( mCurrentTask.collection ); break; case FetchItem: emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts ); break; case DeleteResourceCollection: emit executeResourceCollectionDeletion(); break; case ChangeReplay: emit executeChangeReplay(); break; case SyncAllDone: emit fullSyncComplete(); break; case Custom: { bool success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) ); if ( !success ) success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName ); if ( !success ) kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument; break; } default: { kError() << "Unhandled task type" << mCurrentTask.type; dump(); Q_ASSERT( false ); } } } ResourceScheduler::Task ResourceScheduler::currentTask() const { return mCurrentTask; } void ResourceScheduler::setOnline(bool state) { if ( mOnline == state ) return; mOnline = state; if ( mOnline ) { scheduleNext(); } else { if ( mCurrentTask.type != Invalid ) { // abort running task queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask ); mCurrentTask = Task(); + mCurrentTasksQueue = -1; } // abort pending synchronous tasks, might take longer until the resource goes online again TaskList& itemFetchQueue = queueForTaskType( FetchItem ); for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) { if ( (*it).type == FetchItem ) { (*it).sendDBusReplies( false ); it = itemFetchQueue.erase( it ); if ( s_resourcetracker ) { QList argumentList; argumentList << QString::number( mCurrentTask.serial ) << QLatin1String( "Job canceled." ); s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList ); } } else { ++it; } } } } void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType ) { // if there's a job tracer running, tell it about the new job if ( !s_resourcetracker && QDBusConnection::sessionBus().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) { s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ), QLatin1String( "/resourcesJobtracker" ), QLatin1String( "org.freedesktop.Akonadi.JobTracker" ), QDBusConnection::sessionBus(), 0 ); } if ( s_resourcetracker ) { QList argumentList; argumentList << static_cast( parent() )->identifier() << QString::number( task.serial ) << QString() << QString::fromLatin1( taskType ); s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList); } } void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection ) { if ( !collection.isValid() ) // should not happen, but you never know... return; TaskList& queue = queueForTaskType( SyncCollection ); for ( QList::iterator it = queue.begin(); it != queue.end(); ) { if ( (*it).type == SyncCollection && (*it).collection == collection ) { it = queue.erase( it ); kDebug() << " erasing"; } else ++it; } } void ResourceScheduler::Task::sendDBusReplies( bool success ) { Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) { QDBusMessage reply( msg ); reply << success; QDBusConnection::sessionBus().send( reply ); } } ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type ) { switch( type ) { case ChangeReplay: return ChangeReplayQueue; case FetchItem: return ItemFetchQueue; default: return GenericTaskQueue; } } ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type ) { const QueueType qt = queueTypeForTaskType( type ); return mTaskList[ qt ]; } void ResourceScheduler::dump() { kDebug() << "ResourceScheduler: Online:" << mOnline; kDebug() << " current task:" << mCurrentTask; for ( int i = 0; i < NQueueCount; ++i ) { const TaskList& queue = mTaskList[i]; kDebug() << " queue" << i << queue.size() << "tasks:"; for ( QList::const_iterator it = queue.begin(); it != queue.end(); ++it ) { kDebug() << " " << (*it); } } } void ResourceScheduler::clear() { kDebug() << "Clearing ResourceScheduler queues:"; for ( int i = 0; i < NQueueCount; ++i ) { TaskList& queue = mTaskList[i]; queue.clear(); } mCurrentTask = Task(); + mCurrentTasksQueue = -1; } static const char s_taskTypes[][25] = { "Invalid", "SyncAll", "SyncCollectionTree", "SyncCollection", "FetchItem", "ChangeReplay", "DeleteResourceCollection", "SyncAllDone", "Custom" }; QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task ) { d << task.serial << s_taskTypes[task.type]; if ( task.type != ResourceScheduler::Invalid ) { if ( task.collection.id() != -1 ) d << "collection" << task.collection.id(); if ( task.item.id() != -1 ) d << "item" << task.item.id(); if ( !task.methodName.isEmpty() ) d << task.methodName << task.argument; } return d; } //@endcond #include "resourcescheduler_p.moc" diff --git a/akonadi/resourcescheduler_p.h b/akonadi/resourcescheduler_p.h index 87eac0cc9..5bc75a64e 100644 --- a/akonadi/resourcescheduler_p.h +++ b/akonadi/resourcescheduler_p.h @@ -1,230 +1,232 @@ /* Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_RESOURCESCHEDULER_P_H #define AKONADI_RESOURCESCHEDULER_P_H #include #include #include #include #include #include #include namespace Akonadi { //@cond PRIVATE /** @internal Manages synchronization and fetch requests for a resource. @todo Attach to the ResourceBase Monitor, */ class ResourceScheduler : public QObject { Q_OBJECT public: enum TaskType { Invalid, SyncAll, SyncCollectionTree, SyncCollection, FetchItem, ChangeReplay, DeleteResourceCollection, SyncAllDone, Custom }; class Task { static qint64 latestSerial; public: Task() : serial( ++latestSerial ), type( Invalid ), receiver( 0 ) {} qint64 serial; TaskType type; Collection collection; Item item; QSet itemParts; QList dbusMsgs; QObject *receiver; QByteArray methodName; QVariant argument; void sendDBusReplies( bool success ); bool operator==( const Task &other ) const { return type == other.type && (collection == other.collection || (!collection.isValid() && !other.collection.isValid())) && (item == other.item || (!item.isValid() && !other.item.isValid())) && itemParts == other.itemParts && receiver == other.receiver && methodName == other.methodName && argument == other.argument; } }; ResourceScheduler( QObject *parent = 0 ); /** Schedules a full synchronization. */ void scheduleFullSync(); /** Schedules a collection tree sync. */ void scheduleCollectionTreeSync(); /** Schedules the synchronization of a single collection. @param col The collection to synchronize. */ void scheduleSync( const Collection &col ); /** Schedules fetching of a single PIM item. @param item The item to fetch. @param parts List of names of the parts of the item to fetch. @param msg The associated D-Bus message. */ void scheduleItemFetch( const Item &item, const QSet &parts, const QDBusMessage &msg ); /** Schedules deletion of the resource collection. This method is used to implement the ResourceBase::clearCache() functionality. */ void scheduleResourceCollectionDeletion(); /** Insert synchronization completetion marker into the task queue. */ void scheduleFullSyncCompletion(); /** Insert a custom taks. @param methodName The method name, without signature, do not use the SLOT() macro */ void scheduleCustomTask( QObject *receiver, const char *methodName, const QVariant &argument, ResourceBase::SchedulePriority priority = ResourceBase::Append ); /** Returns true if no tasks are running or in the queue. */ bool isEmpty(); /** Returns the current task. */ Task currentTask() const; /** Sets the online state. */ void setOnline( bool state ); /** Print debug output showing the state of the scheduler. */ void dump(); /** Clear the state of the scheduler. Warning: this is intended to be used purely in debugging scenarios, as it might cause loss of uncommitted local changes. */ void clear(); public Q_SLOTS: /** Schedules replaying changes. */ void scheduleChangeReplay(); /** The current task has been finished */ void taskDone(); /** The current task can't be finished now and will be rescheduled later */ void deferTask(); /** Remove tasks that affect @p collection. */ void collectionRemoved( const Akonadi::Collection &collection ); Q_SIGNALS: void executeFullSync(); void executeCollectionSync( const Akonadi::Collection &col ); void executeCollectionTreeSync(); void executeItemFetch( const Akonadi::Item &item, const QSet &parts ); void executeResourceCollectionDeletion(); void executeChangeReplay(); void fullSyncComplete(); void status( int status, const QString &message = QString() ); private slots: void scheduleNext(); void executeNext(); private: void signalTaskToTracker( const Task &task, const QByteArray &taskType ); // We have a number of task queues, by order of priority. // * ChangeReplay must be first: // change replays have to happen before we pull changes from the backend, otherwise // we will overwrite our still unsaved local changes if the backend can't do // incremental retrieval // // * then the stuff that is "immediately after change replay", like writeFile calls. // * then ItemFetch tasks, because they are made by blocking DBus calls // * then everything else. enum QueueType { + PrependTaskQueue, ChangeReplayQueue, // one task at most AfterChangeReplayQueue, // also one task at most, currently ItemFetchQueue, GenericTaskQueue, NQueueCount }; typedef QList TaskList; static QueueType queueTypeForTaskType( TaskType type ); TaskList& queueForTaskType( TaskType type ); TaskList mTaskList[ NQueueCount ]; Task mCurrentTask; + int mCurrentTasksQueue; // queue mCurrentTask came from bool mOnline; }; QDebug operator<<( QDebug, const ResourceScheduler::Task& task ); //@endcond } #endif