diff --git a/akonadi/resourcebase.cpp b/akonadi/resourcebase.cpp index 7a6ebfcc8..6a66c1094 100644 --- a/akonadi/resourcebase.cpp +++ b/akonadi/resourcebase.cpp @@ -1,506 +1,502 @@ /* Copyright (c) 2006 Till Adam Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "resourcebase.h" #include "agentbase_p.h" #include "resourceadaptor.h" #include "collectionsync.h" #include "itemsync.h" #include "resourcescheduler.h" #include "tracerinterface.h" #include "xdgbasedirs_p.h" #include "changerecorder.h" #include "collectionfetchjob.h" #include "collectionmodifyjob.h" #include "itemfetchjob.h" #include "itemfetchscope.h" #include "itemmodifyjob.h" #include "itemmodifyjob_p.h" #include "session.h" #include #include #include #include #include #include #include #include #include #include #include using namespace Akonadi; class Akonadi::ResourceBasePrivate : public AgentBasePrivate { public: ResourceBasePrivate( ResourceBase *parent ) : AgentBasePrivate( parent ), scheduler( 0 ), mItemSyncer( 0 ) { mStatusMessage = defaultReadyMessage(); } Q_DECLARE_PUBLIC( ResourceBase ) void delayedInit() { if ( !QDBusConnection::sessionBus().registerService( QLatin1String( "org.freedesktop.Akonadi.Resource." ) + mId ) ) kFatal() << "Unable to register service at D-Bus: " << QDBusConnection::sessionBus().lastError().message(); AgentBasePrivate::delayedInit(); } virtual void changeProcessed() { mMonitor->changeProcessed(); if ( !mMonitor->isEmpty() ) scheduler->scheduleChangeReplay(); scheduler->taskDone(); } void slotDeliveryDone( KJob* job ); void slotCollectionSyncDone( KJob *job ); void slotLocalListDone( KJob *job ); void slotSynchronizeCollection( const Collection &col ); void slotCollectionListDone( KJob *job ); void slotItemSyncDone( KJob *job ); void slotPercent( KJob* job, unsigned long percent ); QString mName; // synchronize states Collection currentCollection; ResourceScheduler *scheduler; ItemSync *mItemSyncer; }; ResourceBase::ResourceBase( const QString & id ) : AgentBase( new ResourceBasePrivate( this ), id ) { Q_D( ResourceBase ); new ResourceAdaptor( this ); const QString name = d->mSettings->value( QLatin1String( "Resource/Name" ) ).toString(); if ( !name.isEmpty() ) d->mName = name; d->scheduler = new ResourceScheduler( this ); d->mMonitor->setChangeRecordingEnabled( true ); connect( d->mMonitor, SIGNAL(changesAdded()), d->scheduler, SLOT(scheduleChangeReplay()) ); d->mMonitor->setResourceMonitored( d->mId.toLatin1() ); connect( d->scheduler, SIGNAL(executeFullSync()), SLOT(retrieveCollections()) ); connect( d->scheduler, SIGNAL(executeCollectionTreeSync()), SLOT(retrieveCollections()) ); connect( d->scheduler, SIGNAL(executeCollectionSync(Akonadi::Collection)), SLOT(slotSynchronizeCollection(Akonadi::Collection)) ); connect( d->scheduler, SIGNAL(executeItemFetch(Akonadi::Item,QSet)), SLOT(retrieveItem(Akonadi::Item,QSet)) ); + connect( d->scheduler, SIGNAL( status( int, QString ) ), + SIGNAL( status( int, QString ) ) ); connect( d->scheduler, SIGNAL(executeChangeReplay()), d->mMonitor, SLOT(replayNext()) ); d->scheduler->setOnline( d->mOnline ); if ( !d->mMonitor->isEmpty() ) d->scheduler->scheduleChangeReplay(); } ResourceBase::~ResourceBase() { } void ResourceBase::synchronize() { d_func()->scheduler->scheduleFullSync(); } void ResourceBase::setName( const QString &name ) { Q_D( ResourceBase ); if ( name == d->mName ) return; // TODO: rename collection d->mName = name; if ( d->mName.isEmpty() || d->mName == d->mId ) d->mSettings->remove( QLatin1String( "Resource/Name" ) ); else d->mSettings->setValue( QLatin1String( "Resource/Name" ), d->mName ); d->mSettings->sync(); emit nameChanged( d->mName ); } QString ResourceBase::name() const { Q_D( const ResourceBase ); if ( d->mName.isEmpty() ) return d->mId; else return d->mName; } static char* sAppName = 0; QString ResourceBase::parseArguments( int argc, char **argv ) { QString identifier; if ( argc < 3 ) { kDebug( 5250 ) << "Not enough arguments passed..."; exit( 1 ); } for ( int i = 1; i < argc - 1; ++i ) { if ( QLatin1String( argv[ i ] ) == QLatin1String( "--identifier" ) ) identifier = QLatin1String( argv[ i + 1 ] ); } if ( identifier.isEmpty() ) { kDebug( 5250 ) << "Identifier argument missing"; exit( 1 ); } sAppName = qstrdup( identifier.toLatin1().constData() ); KCmdLineArgs::init( argc, argv, sAppName, 0, ki18nc("@title, application name", "Akonadi Resource"), "0.1", ki18nc("@title, application description", "Akonadi Resource") ); KCmdLineOptions options; options.add("identifier ", ki18nc("@label, commandline option", "Resource identifier")); KCmdLineArgs::addCmdLineOptions( options ); return identifier; } int ResourceBase::init( ResourceBase *r ) { QApplication::setQuitOnLastWindowClosed( false ); int rv = kapp->exec(); delete r; delete[] sAppName; return rv; } void ResourceBase::itemRetrieved( const Item &item ) { Q_D( ResourceBase ); Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::FetchItem ); if ( !item.isValid() ) { QDBusMessage reply( d->scheduler->currentTask().dbusMsg ); reply << false; QDBusConnection::sessionBus().send( reply ); d->scheduler->taskDone(); return; } Item i( item ); QSet requestedParts = d->scheduler->currentTask().itemParts; foreach ( const QByteArray &part, requestedParts ) { if ( !item.loadedPayloadParts().contains( part ) ) { kWarning( 5250 ) << "Item does not provide part" << part; } } ItemModifyJob *job = new ItemModifyJob( i ); // FIXME: remove once the item with which we call retrieveItem() has a revision number job->disableRevisionCheck(); connect( job, SIGNAL(result(KJob*)), SLOT(slotDeliveryDone(KJob*)) ); } void ResourceBasePrivate::slotDeliveryDone(KJob * job) { Q_Q( ResourceBase ); Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::FetchItem ); QDBusMessage reply( scheduler->currentTask().dbusMsg ); if ( job->error() ) { emit q->error( QLatin1String( "Error while creating item: " ) + job->errorString() ); reply << false; } else { reply << true; } QDBusConnection::sessionBus().send( reply ); scheduler->taskDone(); } void ResourceBase::changeCommitted(const Item& item) { Q_D( ResourceBase ); ItemModifyJob *job = new ItemModifyJob( item ); job->d_func()->setClean(); job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error? d->changeProcessed(); } void ResourceBase::changeCommitted( const Collection &collection ) { Q_D( ResourceBase ); CollectionModifyJob *job = new CollectionModifyJob( collection ); Q_UNUSED( job ); //TODO: error checking d->changeProcessed(); } bool ResourceBase::requestItemDelivery( qint64 uid, const QString & remoteId, const QString &mimeType, const QStringList &_parts ) { Q_D( ResourceBase ); if ( !isOnline() ) { emit error( i18nc( "@info", "Cannot fetch item in offline mode." ) ); return false; } setDelayedReply( true ); // FIXME: we need at least the revision number too Item item( uid ); item.setMimeType( mimeType ); item.setRemoteId( remoteId ); QSet parts; Q_FOREACH( const QString &str, _parts ) parts.insert( str.toLatin1() ); d->scheduler->scheduleItemFetch( item, parts, message().createReply() ); return true; } void ResourceBase::collectionsRetrieved(const Collection::List & collections) { Q_D( ResourceBase ); CollectionSync *syncer = new CollectionSync( d->mId ); syncer->setRemoteCollections( collections ); connect( syncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)) ); } void ResourceBase::collectionsRetrievedIncremental(const Collection::List & changedCollections, const Collection::List & removedCollections) { Q_D( ResourceBase ); CollectionSync *syncer = new CollectionSync( d->mId ); syncer->setRemoteCollections( changedCollections, removedCollections ); connect( syncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)) ); } void ResourceBasePrivate::slotCollectionSyncDone(KJob * job) { Q_Q( ResourceBase ); if ( job->error() ) { emit q->error( job->errorString() ); } else { if ( scheduler->currentTask().type == ResourceScheduler::SyncAll ) { CollectionFetchJob *list = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive ); list->setResource( mId ); q->connect( list, SIGNAL(result(KJob*)), q, SLOT(slotLocalListDone(KJob*)) ); return; } } - if ( scheduler->isEmpty() ) - emit q->status( AgentBase::Idle ); scheduler->taskDone(); } void ResourceBasePrivate::slotLocalListDone(KJob * job) { Q_Q( ResourceBase ); if ( job->error() ) { emit q->error( job->errorString() ); } else { Collection::List cols = static_cast( job )->collections(); foreach ( const Collection &col, cols ) { scheduler->scheduleSync( col ); } } scheduler->taskDone(); } void ResourceBasePrivate::slotSynchronizeCollection( const Collection &col ) { Q_Q( ResourceBase ); currentCollection = col; // check if this collection actually can contain anything QStringList contentTypes = currentCollection.contentMimeTypes(); contentTypes.removeAll( Collection::mimeType() ); if ( !contentTypes.isEmpty() ) { emit q->status( AgentBase::Running, i18nc( "@info:status", "Syncing collection '%1'", currentCollection.name() ) ); q->retrieveItems( currentCollection ); return; } scheduler->taskDone(); } void ResourceBase::itemsRetrievalDone() { Q_D( ResourceBase ); // streaming enabled, so finalize the sync if ( d->mItemSyncer ) { d->mItemSyncer->deliveryDone(); } // user did the sync himself, we are done now else { - if ( d->scheduler->isEmpty() ) - emit status( Idle ); d->scheduler->taskDone(); } } Collection ResourceBase::currentCollection() const { Q_D( const ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection , "ResourceBase::currentCollection()", "Trying to access current collection although no item retrieval is in progress" ); return d->currentCollection; } Item ResourceBase::currentItem() const { Q_D( const ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::FetchItem , "ResourceBase::currentItem()", "Trying to access current item although no item retrieval is in progress" ); return d->scheduler->currentTask().item; } void ResourceBase::synchronizeCollectionTree() { d_func()->scheduler->scheduleCollectionTreeSync(); } void ResourceBase::cancelTask() { d_func()->changeProcessed(); } void ResourceBase::cancelTask( const QString &msg ) { cancelTask(); emit error( msg ); } void ResourceBase::doSetOnline( bool state ) { d_func()->scheduler->setOnline( state ); } void ResourceBase::synchronizeCollection(qint64 collectionId ) { CollectionFetchJob* job = new CollectionFetchJob( Collection(collectionId), CollectionFetchJob::Base ); job->setResource( identifier() ); connect( job, SIGNAL(result(KJob*)), SLOT(slotCollectionListDone(KJob*)) ); } void ResourceBasePrivate::slotCollectionListDone( KJob *job ) { if ( !job->error() ) { Collection::List list = static_cast( job )->collections(); if ( !list.isEmpty() ) { Collection col = list.first(); scheduler->scheduleSync( col ); } } // TODO: error handling } void ResourceBase::setTotalItems( int amount ) { kDebug() << amount; Q_D( ResourceBase ); setItemStreamingEnabled( true ); d->mItemSyncer->setTotalItems( amount ); } void ResourceBase::setItemStreamingEnabled( bool enable ) { Q_D( ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection, "ResourceBase::setItemStreamingEnabled()", "Calling setItemStreamingEnabled() although no item retrieval is in progress" ); if ( !d->mItemSyncer ) { d->mItemSyncer = new ItemSync( currentCollection() ); connect( d->mItemSyncer, SIGNAL(percent(KJob*,unsigned long)), SLOT(slotPercent(KJob*,unsigned long)) ); connect( d->mItemSyncer, SIGNAL(result(KJob*)), SLOT(slotItemSyncDone(KJob*)) ); } d->mItemSyncer->setStreamingEnabled( enable ); } void ResourceBase::itemsRetrieved( const Item::List &items ) { Q_D( ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection, "ResourceBase::itemsRetrieved()", "Calling itemsRetrieved() although no item retrieval is in progress" ); if ( !d->mItemSyncer ) { d->mItemSyncer = new ItemSync( currentCollection() ); connect( d->mItemSyncer, SIGNAL(percent(KJob*,unsigned long)), SLOT(slotPercent(KJob*,unsigned long)) ); connect( d->mItemSyncer, SIGNAL(result(KJob*)), SLOT(slotItemSyncDone(KJob*)) ); } d->mItemSyncer->setFullSyncItems( items ); } void ResourceBase::itemsRetrievedIncremental(const Item::List &changedItems, const Item::List &removedItems) { Q_D( ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection, "ResourceBase::itemsRetrievedIncremental()", "Calling itemsRetrievedIncremental() although no item retrieval is in progress" ); if ( !d->mItemSyncer ) { d->mItemSyncer = new ItemSync( currentCollection() ); connect( d->mItemSyncer, SIGNAL(percent(KJob*,unsigned long)), SLOT(slotPercent(KJob*,unsigned long)) ); connect( d->mItemSyncer, SIGNAL(result(KJob*)), SLOT(slotItemSyncDone(KJob*)) ); } d->mItemSyncer->setIncrementalSyncItems( changedItems, removedItems ); } void ResourceBasePrivate::slotItemSyncDone( KJob *job ) { mItemSyncer = 0; Q_Q( ResourceBase ); if ( job->error() ) { emit q->error( job->errorString() ); } - if ( scheduler->isEmpty() ) - emit q->status( AgentBase::Idle ); scheduler->taskDone(); } void ResourceBasePrivate::slotPercent( KJob *job, unsigned long percent ) { Q_Q( ResourceBase ); Q_UNUSED( job ); emit q->percent( percent ); } #include "resourcebase.moc" diff --git a/akonadi/resourcescheduler.cpp b/akonadi/resourcescheduler.cpp index 0fb5c4e74..0c9c735e7 100644 --- a/akonadi/resourcescheduler.cpp +++ b/akonadi/resourcescheduler.cpp @@ -1,148 +1,150 @@ /* Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "resourcescheduler.h" #include #include using namespace Akonadi; //@cond PRIVATE ResourceScheduler::ResourceScheduler( QObject *parent ) : QObject( parent ), mOnline( false ) { } void ResourceScheduler::scheduleFullSync() { Task t; t.type = SyncAll; mTaskList << t; scheduleNext(); } void ResourceScheduler::scheduleCollectionTreeSync() { Task t; t.type = SyncCollectionTree; mTaskList << t; scheduleNext(); } void ResourceScheduler::scheduleSync(const Collection & col) { Task t; t.type = SyncCollection; t.collection = col; mTaskList << t; scheduleNext(); } void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet &parts, const QDBusMessage & msg) { Task t; t.type = FetchItem; t.item = item; t.itemParts = parts; t.dbusMsg = msg; mTaskList << t; scheduleNext(); } void ResourceScheduler::scheduleChangeReplay() { Task t; t.type = ChangeReplay; if ( mTaskList.contains( t ) ) return; mTaskList << t; scheduleNext(); } void ResourceScheduler::taskDone() { + if ( isEmpty() ) + emit status( AgentBase::Idle ); mCurrentTask = Task(); scheduleNext(); } bool ResourceScheduler::isEmpty() { return mTaskList.isEmpty(); } void ResourceScheduler::scheduleNext() { if ( mCurrentTask.type != Invalid || mTaskList.isEmpty() || !mOnline ) return; QTimer::singleShot( 0, this, SLOT(executeNext()) ); } void ResourceScheduler::executeNext() { if( mCurrentTask.type != Invalid || mTaskList.isEmpty() ) return; mCurrentTask = mTaskList.takeFirst(); switch ( mCurrentTask.type ) { case SyncAll: emit executeFullSync(); break; case SyncCollectionTree: emit executeCollectionTreeSync(); break; case SyncCollection: emit executeCollectionSync( mCurrentTask.collection ); break; case FetchItem: emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts ); break; case ChangeReplay: emit executeChangeReplay(); break; default: Q_ASSERT( false ); } } ResourceScheduler::Task ResourceScheduler::currentTask() const { return mCurrentTask; } void ResourceScheduler::setOnline(bool state) { if ( mOnline == state ) return; mOnline = state; if ( mOnline ) { scheduleNext(); } else if ( mCurrentTask.type != Invalid ) { // abort running task mTaskList.prepend( mCurrentTask ); mCurrentTask = Task(); } } //@endcond #include "resourcescheduler.moc" diff --git a/akonadi/resourcescheduler.h b/akonadi/resourcescheduler.h index e13b0f4a6..95bbbff2e 100644 --- a/akonadi/resourcescheduler.h +++ b/akonadi/resourcescheduler.h @@ -1,146 +1,148 @@ /* Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_RESOURCESCHEDULER_H #define AKONADI_RESOURCESCHEDULER_H +#include #include #include #include #include #include namespace Akonadi { //@cond PRIVATE /** @internal Manages synchronization and fetch requests for a resource. @todo Attach to the ResourceBase Monitor, */ class ResourceScheduler : public QObject { Q_OBJECT public: enum TaskType { Invalid, SyncAll, SyncCollectionTree, SyncCollection, FetchItem, ChangeReplay }; class Task { public: Task() : type( Invalid ) {} TaskType type; Collection collection; Item item; QSet itemParts; QDBusMessage dbusMsg; bool operator==( const Task &other ) const { return type == other.type && collection == other.collection && item == other.item && itemParts == other.itemParts; } }; ResourceScheduler( QObject *parent = 0 ); /** Schedules a full synchronization. */ void scheduleFullSync(); /** Schedules a collection tree sync. */ void scheduleCollectionTreeSync(); /** Schedules the synchronization of a single collection. @param col The collection to synchronize. */ void scheduleSync( const Collection &col ); /** Schedules fetching of a single PIM item. @param item The item to fetch. @param parts List of names of the parts of the item to fetch. @param msg The associated D-Bus message. */ void scheduleItemFetch( const Item &item, const QSet &parts, const QDBusMessage &msg ); /** The current task has been finished */ void taskDone(); /** Returns true if no tasks are running or in the queue. */ bool isEmpty(); /** Returns the current task. */ Task currentTask() const; /** Sets the online state. */ void setOnline( bool state ); public Q_SLOTS: /** Schedules replaying changes. */ void scheduleChangeReplay(); Q_SIGNALS: void executeFullSync(); void executeCollectionSync( const Akonadi::Collection &col ); void executeCollectionTreeSync(); void executeItemFetch( const Akonadi::Item &item, const QSet &parts ); void executeChangeReplay(); + void status( int status, const QString &message = QString() ); private slots: void scheduleNext(); void executeNext(); private: QList mTaskList; Task mCurrentTask; bool mOnline; }; //@endcond } #endif