diff --git a/CMakeLists.txt b/CMakeLists.txt index 7a95f23..048d67d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,175 +1,175 @@ project(dferry) cmake_minimum_required(VERSION 3.1.0 FATAL_ERROR) list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake) if (CMAKE_COMPILER_IS_GNUCXX) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wpedantic -Wextra -Werror -Wno-error=unused-result") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fvisibility=hidden -fvisibility-inlines-hidden") endif() set(CMAKE_CXX_STANDARD 11) if (WIN32 AND CMAKE_SYSTEM_VERSION VERSION_LESS 6.0) message(FATAL_ERROR "Windows Vista or later is required.") endif() include(TestBigEndian) if (BIGENDIAN) add_definitions(-DBIGENDIAN) endif() if (UNIX) add_definitions(-D__unix__) # help for platforms that don't define this standard macro endif() option(DFERRY_BUILD_ANALYZER "Build the dfer-analyzer bus analyzer GUI" TRUE) include(GNUInstallDirs) if (WIN32) # Windows doesn't have an RPATH equivalent, so just make sure that all .dll and .exe files # are located together, so that the .exes find the .dlls at runtime set (CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) else() set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) set(CMAKE_INSTALL_RPATH ${CMAKE_INSTALL_FULL_LIBDIR}) # add libdfer install dir to rpath set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE) # add Qt (etc.) dir to rpath, if necessary endif() include_directories(${CMAKE_SOURCE_DIR}/buslogic ${CMAKE_SOURCE_DIR}/client ${CMAKE_SOURCE_DIR}/connection ${CMAKE_SOURCE_DIR}/events ${CMAKE_SOURCE_DIR}/serialization ${CMAKE_SOURCE_DIR}/util) set(DFER_SOURCES buslogic/authclient.cpp buslogic/connectaddress.cpp buslogic/imessagereceiver.cpp buslogic/pendingreply.cpp buslogic/transceiver.cpp - connection/iconnection.cpp - connection/iconnectionclient.cpp connection/ipserver.cpp connection/ipsocket.cpp connection/iserver.cpp + connection/itransport.cpp + connection/itransportlistener.cpp connection/stringtools.cpp events/event.cpp events/eventdispatcher.cpp events/foreigneventloopintegrator.cpp events/ieventpoller.cpp - events/iioeventclient.cpp + events/iioeventlistener.cpp events/platformtime.cpp events/timer.cpp serialization/arguments.cpp serialization/argumentsreader.cpp serialization/argumentswriter.cpp serialization/message.cpp util/error.cpp - util/icompletionclient.cpp + util/icompletionlistener.cpp util/types.cpp) if (UNIX) list(APPEND DFER_SOURCES connection/localserver.cpp connection/localsocket.cpp) endif() set(DFER_PUBLIC_HEADERS buslogic/connectaddress.h buslogic/imessagereceiver.h buslogic/pendingreply.h buslogic/transceiver.h client/introspection.h events/eventdispatcher.h events/foreigneventloopintegrator.h events/timer.h serialization/message.h serialization/arguments.h util/commutex.h util/error.h util/export.h - util/icompletionclient.h + util/icompletionlistener.h util/types.h util/valgrind-noop.h) set(DFER_PRIVATE_HEADERS buslogic/authclient.h - connection/iconnection.h - connection/iconnectionclient.h connection/ipserver.h connection/ipsocket.h connection/iserver.h + connection/itransport.h + connection/itransportlistener.h connection/platform.h connection/stringtools.h events/event.h events/ieventpoller.h - events/iioeventclient.h + events/iioeventlistener.h events/platformtime.h serialization/basictypeio.h) if (UNIX) list(APPEND DFER_PRIVATE_HEADERS connection/localserver.h connection/localsocket.h) endif() if (CMAKE_SYSTEM_NAME STREQUAL "Linux") list(APPEND DFER_SOURCES events/epolleventpoller.cpp) list(APPEND DFER_PRIVATE_HEADERS events/epolleventpoller.h) elseif(CMAKE_SYSTEM_NAME STREQUAL "Windows") list(APPEND DFER_PRIVATE_HEADERS events/selecteventpoller_win32.h util/winutil.h) list(APPEND DFER_SOURCES events/selecteventpoller_win32.cpp util/winutil.cpp) elseif(UNIX) list(APPEND DFER_PRIVATE_HEADERS events/selecteventpoller_unix.h) list(APPEND DFER_SOURCES events/selecteventpoller_unix.cpp) else() message(FATAL_ERROR "This operating system is not supported.") endif() set(DFER_HEADERS ${DFER_PUBLIC_HEADERS} ${DFER_PRIVATE_HEADERS}) add_library(dfer SHARED ${DFER_SOURCES} ${DFER_HEADERS}) target_include_directories(dfer INTERFACE "$") if (WIN32) target_link_libraries(dfer PRIVATE ws2_32) endif() find_package(LibTinyxml2 REQUIRED) # for the introspection parser in dferclient include_directories(${LIBTINYXML2_INCLUDE_DIRS}) find_package(Valgrind) # for checking homemade multithreading primitives if (VALGRIND_INCLUDE_DIR) add_definitions(-DHAVE_VALGRIND) include_directories(${VALGRIND_INCLUDE_DIR}) endif() # for small_vector, optional; small_vector appeared in 1.58 find_package(Boost 1.58) if (BOOST_FOUND) add_definitions(-DHAVE_BOOST) endif() set(DFERCLIENT_SOURCES client/introspection.h) set(DFERCLIENT_HEADERS client/introspection.cpp) add_library(dferclient SHARED ${DFERCLIENT_SOURCES} ${DFERCLIENT_HEADERS}) target_include_directories(dferclient INTERFACE "$") target_link_libraries(dferclient PUBLIC dfer PRIVATE ${LIBTINYXML2_LIBRARIES}) install(TARGETS dfer dferclient EXPORT dferryExports DESTINATION ${CMAKE_INSTALL_LIBDIR}) install(FILES ${DFER_PUBLIC_HEADERS} DESTINATION include/dferry) enable_testing() # need this here to get the "test" target in the toplevel build dir add_subdirectory(tests) add_subdirectory(applications) set(configModuleLocation "lib/cmake/dferry") install(EXPORT dferryExports DESTINATION "${configModuleLocation}" FILE dferryTargets.cmake) file(WRITE ${PROJECT_BINARY_DIR}/dferryConfig.cmake "include(\"\${CMAKE_CURRENT_LIST_DIR}/dferryTargets.cmake\")") install(FILES "${PROJECT_BINARY_DIR}/dferryConfig.cmake" DESTINATION "${configModuleLocation}") diff --git a/buslogic/authclient.cpp b/buslogic/authclient.cpp index 6828540..4c00fdd 100644 --- a/buslogic/authclient.cpp +++ b/buslogic/authclient.cpp @@ -1,162 +1,162 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "authclient.h" -#include "icompletionclient.h" -#include "iconnection.h" +#include "icompletionlistener.h" +#include "itransport.h" #include "stringtools.h" #include #include #include #ifdef __unix__ #include #include #endif #ifdef _WIN32 #define WIN32_LEAN_AND_MEAN #include #include #include "winutil.h" #endif using namespace std; -AuthClient::AuthClient(IConnection *connection) +AuthClient::AuthClient(ITransport *transport) : m_state(InitialState), - m_completionClient(nullptr) + m_completionListener(nullptr) { cerr << "AuthClient constructing\n"; - connection->addClient(this); + transport->addListener(this); setReadNotificationEnabled(true); byte nullBuf[1] = { 0 }; - connection->write(chunk(nullBuf, 1)); + transport->write(chunk(nullBuf, 1)); stringstream uidEncoded; #ifdef _WIN32 // Most common (or rather... actually used) authentication method on Windows: // - Server publishes address of a nonce file; the file name is in a shared memory segment // - Client reads nonce file // - Client connects and sends the nonce data, TODO: before or after the null byte / is there a null byte? // - Client uses EXTERNAL auth and says which Windows security ID (SID) it intends to have uidEncoded << fetchWindowsSid(); #else // Most common (or rather... actually used) authentication method on Unix derivatives: // - Client sends a null byte so the server has something to receive with recvmsg() // - Server checks UID using SCM_CREDENTIALS, a mechanism of Unix local sockets // - Client uses EXTERNAL auth and says which Unix user ID it intends to have // The numeric UID is first encoded to ASCII ("1000") and the ASCII to hex... because. uidEncoded << geteuid(); #endif string extLine = "AUTH EXTERNAL " + hexEncode(uidEncoded.str()) + "\r\n"; cout << extLine; - connection->write(chunk(extLine.c_str(), extLine.length())); + transport->write(chunk(extLine.c_str(), extLine.length())); m_state = ExpectOkState; } bool AuthClient::isFinished() const { return m_state >= AuthenticationFailedState; } bool AuthClient::isAuthenticated() const { return m_state == AuthenticatedState; } -void AuthClient::setCompletionClient(ICompletionClient *client) +void AuthClient::setCompletionListener(ICompletionListener *listener) { - m_completionClient = client; + m_completionListener = listener; } -void AuthClient::handleConnectionCanRead() +void AuthClient::handleTransportCanRead() { bool wasFinished = isFinished(); while (!isFinished() && readLine()) { advanceState(); } - if (isFinished() && !wasFinished && m_completionClient) { - m_completionClient->handleCompletion(this); + if (isFinished() && !wasFinished && m_completionListener) { + m_completionListener->handleCompletion(this); } } bool AuthClient::readLine() { // don't care about performance here, this doesn't run often or process much data if (isEndOfLine()) { m_line.clear(); // start a new line } - while (connection()->availableBytesForReading()) { + while (transport()->availableBytesForReading()) { byte readBuf[1]; - chunk in = connection()->read(readBuf, 1); + chunk in = transport()->read(readBuf, 1); assert(in.length == 1); m_line += char(in.ptr[0]); if (isEndOfLine()) { return true; } } return false; } bool AuthClient::isEndOfLine() const { return m_line.length() >= 2 && m_line[m_line.length() - 2] == '\r' && m_line[m_line.length() - 1] == '\n'; } void AuthClient::advanceState() { // TODO authentication ping-pong done *properly* (grammar / some simple state machine), // but hey, this works for now! // some findings: // - the string after the server OK is its UUID that also appears in the address string cout << "> " << m_line; switch (m_state) { case ExpectOkState: { // TODO check the OK #ifdef __unix__ cstring negotiateLine("NEGOTIATE_UNIX_FD\r\n"); cout << negotiateLine.ptr; - connection()->write(chunk(negotiateLine.ptr, negotiateLine.length)); + transport()->write(chunk(negotiateLine.ptr, negotiateLine.length)); m_state = ExpectUnixFdResponseState; break; } case ExpectUnixFdResponseState: { #endif // TODO check the response cstring beginLine("BEGIN\r\n"); cout << beginLine.ptr; - connection()->write(chunk(beginLine.ptr, beginLine.length)); + transport()->write(chunk(beginLine.ptr, beginLine.length)); m_state = AuthenticatedState; break; } default: m_state = AuthenticationFailedState; - connection()->close(); + transport()->close(); } } diff --git a/buslogic/authclient.h b/buslogic/authclient.h index 6bcb325..c702f09 100644 --- a/buslogic/authclient.h +++ b/buslogic/authclient.h @@ -1,68 +1,68 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef AUTHCLIENT_H #define AUTHCLIENT_H -#include "iconnectionclient.h" +#include "itransportlistener.h" #include // TODO we are currently handling all authentication from here; later this class should only // enumerate client and server auth mechanisms and then instantiate and pass control to // the right IAuthMechanism implementation (with or without staying around as an intermediate). -class ICompletionClient; +class ICompletionListener; -class AuthClient : public IConnectionClient +class AuthClient : public ITransportListener { public: - explicit AuthClient(IConnection *connection); + explicit AuthClient(ITransport *transport); - // reimplemented from IConnectionClient - virtual void handleConnectionCanRead(); + // reimplemented from ITransportClient + virtual void handleTransportCanRead(); bool isFinished() const; bool isAuthenticated() const; - void setCompletionClient(ICompletionClient *); + void setCompletionListener(ICompletionListener *); private: bool readLine(); bool isEndOfLine() const; void advanceState(); enum State { InitialState, ExpectOkState, ExpectUnixFdResponseState, AuthenticationFailedState, AuthenticatedState }; State m_state; std::string m_line; - ICompletionClient *m_completionClient; + ICompletionListener *m_completionListener; }; #endif diff --git a/buslogic/pendingreply_p.h b/buslogic/pendingreply_p.h index 299065b..8211acc 100644 --- a/buslogic/pendingreply_p.h +++ b/buslogic/pendingreply_p.h @@ -1,69 +1,69 @@ /* Copyright (C) 2014 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef PENDINGREPLY_P_H #define PENDINGREPLY_P_H #include "error.h" -#include "icompletionclient.h" +#include "icompletionlistener.h" #include "message.h" #include "timer.h" class PendingReply; class TransceiverPrivate; -class PendingReplyPrivate : public ICompletionClient +class PendingReplyPrivate : public ICompletionListener { public: PendingReplyPrivate(EventDispatcher *dispatcher, int timeout) : m_replyTimeout(dispatcher), m_isFinished(false) { if (timeout >= 0) { m_replyTimeout.setRepeating(false); - m_replyTimeout.setCompletionClient(this); + m_replyTimeout.setCompletionListener(this); m_replyTimeout.start(timeout); } } // for Transceiver void handleReceived(Message *reply); void handleError(Error error); // for m_replyTimeout void handleCompletion(void *task) override; PendingReply *m_owner; union { TransceiverPrivate *transceiver; Message *reply; } m_transceiverOrReply; void *m_cookie; Timer m_replyTimeout; IMessageReceiver *m_receiver; Error m_error; uint32 m_serial; bool m_isFinished : 1; uint32 m_reserved : 31; }; #endif // PENDINGREPLY_P_H diff --git a/buslogic/transceiver.cpp b/buslogic/transceiver.cpp index c44cc05..dbfce1c 100644 --- a/buslogic/transceiver.cpp +++ b/buslogic/transceiver.cpp @@ -1,695 +1,695 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "transceiver.h" #include "transceiver_p.h" #include "arguments.h" #include "authclient.h" #include "event.h" #include "eventdispatcher_p.h" -#include "icompletionclient.h" +#include "icompletionlistener.h" #include "imessagereceiver.h" #include "iserver.h" #include "localsocket.h" #include "message.h" #include "message_p.h" #include "pendingreply.h" #include "pendingreply_p.h" #include "stringtools.h" #include #include #include using namespace std; class HelloReceiver : public IMessageReceiver { public: void handlePendingReplyFinished(PendingReply *pr) override { assert(pr == &m_helloReply); (void) pr; m_parent->handleHelloReply(); } PendingReply m_helloReply; // keep it here so it conveniently goes away when it's done TransceiverPrivate *m_parent; }; -class ClientConnectedHandler : public ICompletionClient +class ClientConnectedHandler : public ICompletionListener { public: ~ClientConnectedHandler() { delete m_server; } void handleCompletion(void *) override { m_parent->handleClientConnected(); } IServer *m_server; TransceiverPrivate *m_parent; }; TransceiverPrivate::TransceiverPrivate(EventDispatcher *dispatcher) : m_state(Unconnected), m_client(nullptr), m_receivingMessage(nullptr), - m_connection(nullptr), + m_transport(nullptr), m_helloReceiver(nullptr), m_clientConnectedHandler(nullptr), m_eventDispatcher(dispatcher), m_authClient(nullptr), m_defaultTimeout(25000), m_sendSerial(1), m_mainThreadTransceiver(nullptr) { } Transceiver::Transceiver(EventDispatcher *dispatcher, const ConnectAddress &ca) : d(new TransceiverPrivate(dispatcher)) { d->m_connectAddress = ca; assert(d->m_eventDispatcher); EventDispatcherPrivate::get(d->m_eventDispatcher)->m_transceiverToNotify = d; if (ca.bus() == ConnectAddress::Bus::None || ca.socketType() == ConnectAddress::SocketType::None || ca.role() == ConnectAddress::Role::None) { cerr << "\nTransceiver: connection constructor Exit A\n\n"; return; } if (ca.role() == ConnectAddress::Role::Server) { if (ca.bus() == ConnectAddress::Bus::PeerToPeer) { // this sets up a server that will be destroyed after accepting exactly one connection d->m_clientConnectedHandler = new ClientConnectedHandler; d->m_clientConnectedHandler->m_server = IServer::create(ca); d->m_clientConnectedHandler->m_server->setEventDispatcher(dispatcher); - d->m_clientConnectedHandler->m_server->setNewConnectionClient(d->m_clientConnectedHandler); + d->m_clientConnectedHandler->m_server->setNewConnectionListener(d->m_clientConnectedHandler); d->m_clientConnectedHandler->m_parent = d; d->m_state = TransceiverPrivate::ServerWaitingForClient; } else { cerr << "Transceiver constructor: bus server not supported.\n"; // state stays at Unconnected } } else { - d->m_connection = IConnection::create(ca); - d->m_connection->setEventDispatcher(dispatcher); + d->m_transport = ITransport::create(ca); + d->m_transport->setEventDispatcher(dispatcher); if (ca.bus() == ConnectAddress::Bus::Session || ca.bus() == ConnectAddress::Bus::System) { d->authAndHello(this); d->m_state = TransceiverPrivate::Authenticating; } else if (ca.bus() == ConnectAddress::Bus::PeerToPeer) { d->receiveNextMessage(); d->m_state = TransceiverPrivate::Connected; } } } Transceiver::Transceiver(EventDispatcher *dispatcher, CommRef mainTransceiverRef) : d(new TransceiverPrivate(dispatcher)) { EventDispatcherPrivate::get(d->m_eventDispatcher)->m_transceiverToNotify = d; d->m_mainThreadLink = std::move(mainTransceiverRef.commutex); CommutexLocker locker(&d->m_mainThreadLink); assert(locker.hasLock()); Commutex *const id = d->m_mainThreadLink.id(); if (!id) { assert(false); cerr << "\nTransceiver: slave constructor Exit A\n\n"; return; // stay in Unconnected state } // TODO how do we handle m_state? d->m_mainThreadTransceiver = mainTransceiverRef.transceiver; TransceiverPrivate *mainD = d->m_mainThreadTransceiver; // get the current values - if we got them from e.g. the CommRef they could be outdated // and we don't want to wait for more event ping-pong SpinLocker mainLocker(&mainD->m_lock); d->m_connectAddress = mainD->m_connectAddress; // register with the main Transceiver SecondaryTransceiverConnectEvent *evt = new SecondaryTransceiverConnectEvent(); evt->transceiver = d; evt->id = id; EventDispatcherPrivate::get(mainD->m_eventDispatcher) ->queueEvent(std::unique_ptr(evt)); } Transceiver::~Transceiver() { d->close(); - delete d->m_connection; + delete d->m_transport; delete d->m_authClient; delete d->m_helloReceiver; delete d->m_receivingMessage; delete d; d = nullptr; } void TransceiverPrivate::close() { // Can't be main and secondary at the main time - it could be made to work, but what for? assert(m_secondaryThreadLinks.empty() || !m_mainThreadTransceiver); if (m_mainThreadTransceiver) { CommutexUnlinker unlinker(&m_mainThreadLink); if (unlinker.hasLock()) { SecondaryTransceiverDisconnectEvent *evt = new SecondaryTransceiverDisconnectEvent(); evt->transceiver = this; EventDispatcherPrivate::get(m_mainThreadTransceiver->m_eventDispatcher) ->queueEvent(std::unique_ptr(evt)); } } // Destroy whatever is suitable and available at a given time, in order to avoid things like // one secondary thread blocking another indefinitely and smaller dependency-related slowdowns. while (!m_secondaryThreadLinks.empty()) { for (auto it = m_secondaryThreadLinks.begin(); it != m_secondaryThreadLinks.end(); ) { CommutexUnlinker unlinker(&it->second, false); if (unlinker.willSucceed()) { if (unlinker.hasLock()) { MainTransceiverDisconnectEvent *evt = new MainTransceiverDisconnectEvent(); EventDispatcherPrivate::get(it->first->m_eventDispatcher) ->queueEvent(std::unique_ptr(evt)); } unlinker.unlinkNow(); // don't access the element after erasing it, finish it now it = m_secondaryThreadLinks.erase(it); } else { ++it; // don't block, try again next iteration } } } cancelAllPendingReplies(); EventDispatcherPrivate::get(m_eventDispatcher)->m_transceiverToNotify = nullptr; } void TransceiverPrivate::authAndHello(Transceiver *parent) { - m_authClient = new AuthClient(m_connection); - m_authClient->setCompletionClient(this); + m_authClient = new AuthClient(m_transport); + m_authClient->setCompletionListener(this); // Announce our presence to the bus and have it send some introductory information of its own Message hello; hello.setType(Message::MethodCallMessage); hello.setExpectsReply(false); hello.setDestination(std::string("org.freedesktop.DBus")); hello.setInterface(std::string("org.freedesktop.DBus")); hello.setPath(std::string("/org/freedesktop/DBus")); hello.setMethod(std::string("Hello")); m_helloReceiver = new HelloReceiver; m_helloReceiver->m_helloReply = parent->send(std::move(hello)); m_helloReceiver->m_helloReply.setReceiver(m_helloReceiver); m_helloReceiver->m_parent = this; } void TransceiverPrivate::handleHelloReply() { if (!m_helloReceiver->m_helloReply.hasNonErrorReply()) { delete m_helloReceiver; m_helloReceiver = nullptr; m_state = Unconnected; // TODO set an error, provide access to it, also set it on messages when trying to send / receive them return; } Arguments argList = m_helloReceiver->m_helloReply.reply()->arguments(); delete m_helloReceiver; m_helloReceiver = nullptr; Arguments::Reader reader(argList); assert(reader.state() == Arguments::String); cstring busName = reader.readString(); assert(reader.state() == Arguments::Finished); m_uniqueName = toStdString(busName); // tell current secondaries UniqueNameReceivedEvent evt; evt.uniqueName = m_uniqueName; for (auto &it : m_secondaryThreadLinks) { CommutexLocker otherLocker(&it.second); if (otherLocker.hasLock()) { EventDispatcherPrivate::get(it.first->m_eventDispatcher) ->queueEvent(std::unique_ptr(new UniqueNameReceivedEvent(evt))); } } m_state = Connected; } void TransceiverPrivate::handleClientConnected() { - m_connection = m_clientConnectedHandler->m_server->takeNextConnection(); + m_transport = m_clientConnectedHandler->m_server->takeNextClient(); delete m_clientConnectedHandler; m_clientConnectedHandler = nullptr; - assert(m_connection); - m_connection->setEventDispatcher(m_eventDispatcher); + assert(m_transport); + m_transport->setEventDispatcher(m_eventDispatcher); receiveNextMessage(); m_state = Connected; } void Transceiver::setDefaultReplyTimeout(int msecs) { d->m_defaultTimeout = msecs; } int Transceiver::defaultReplyTimeout() const { return d->m_defaultTimeout; } uint32 TransceiverPrivate::takeNextSerial() { uint32 ret; do { ret = m_sendSerial.fetch_add(1, std::memory_order_relaxed); } while (unlikely(ret == 0)); return ret; } Error TransceiverPrivate::prepareSend(Message *msg) { if (!m_mainThreadTransceiver) { msg->setSerial(takeNextSerial()); } else { // we take a serial from the other Transceiver and then serialize locally in order to keep the CPU // expense of serialization local, even though it's more complicated than doing everything in the // other thread / Transceiver. CommutexLocker locker(&m_mainThreadLink); if (locker.hasLock()) { msg->setSerial(m_mainThreadTransceiver->takeNextSerial()); } else { return Error::LocalDisconnect; } } MessagePrivate *const mpriv = MessagePrivate::get(msg); // this is unchanged by move()ing the owning Message. if (!mpriv->serialize()) { return mpriv->m_error; } return Error::NoError; } void TransceiverPrivate::sendPreparedMessage(Message msg) { MessagePrivate *const mpriv = MessagePrivate::get(&msg); - mpriv->setCompletionClient(this); + mpriv->setCompletionListener(this); m_sendQueue.push_back(std::move(msg)); if (m_state == TransceiverPrivate::Connected && m_sendQueue.size() == 1) { // first in queue, don't wait for some other event to trigger sending - mpriv->send(m_connection); + mpriv->send(m_transport); } } PendingReply Transceiver::send(Message m, int timeoutMsecs) { if (timeoutMsecs == DefaultTimeout) { timeoutMsecs = d->m_defaultTimeout; } Error error = d->prepareSend(&m); PendingReplyPrivate *pendingPriv = new PendingReplyPrivate(d->m_eventDispatcher, timeoutMsecs); pendingPriv->m_transceiverOrReply.transceiver = d; pendingPriv->m_receiver = nullptr; pendingPriv->m_serial = m.serial(); // even if we're handing off I/O to a main Transceiver, keep a record because that simplifies // aborting all pending replies when we disconnect from the main Transceiver, no matter which // side initiated the disconnection. d->m_pendingReplies.emplace(m.serial(), pendingPriv); if (error.isError()) { // Signal the error asynchronously, in order to get the same delayed completion callback as in // the non-error case. This should make the behavior more predictable and client code harder to // accidentally get wrong. To detect errors immediately, PendingReply::error() can be used. pendingPriv->m_error = error; pendingPriv->m_replyTimeout.start(0); } else { if (!d->m_mainThreadTransceiver) { d->sendPreparedMessage(std::move(m)); } else { CommutexLocker locker(&d->m_mainThreadLink); if (locker.hasLock()) { std::unique_ptr evt(new SendMessageWithPendingReplyEvent); evt->message = std::move(m); evt->transceiver = d; EventDispatcherPrivate::get(d->m_mainThreadTransceiver->m_eventDispatcher) ->queueEvent(std::move(evt)); } else { pendingPriv->m_error = Error::LocalDisconnect; } } } return PendingReply(pendingPriv); } Error Transceiver::sendNoReply(Message m) { // ### (when not called from send()) warn if sending a message without the noreply flag set? // doing that is wasteful, but might be common. needs investigation. Error error = d->prepareSend(&m); if (error.isError()) { return error; } // pass ownership to the send queue now because if the IO system decided to send the message without // going through an event loop iteration, handleCompletion would be called and expects the message to // be in the queue if (!d->m_mainThreadTransceiver) { d->sendPreparedMessage(std::move(m)); } else { CommutexLocker locker(&d->m_mainThreadLink); if (locker.hasLock()) { std::unique_ptr evt(new SendMessageEvent); evt->message = std::move(m); EventDispatcherPrivate::get(d->m_mainThreadTransceiver->m_eventDispatcher) ->queueEvent(std::move(evt)); } else { return Error::LocalDisconnect; } } return Error::NoError; } ConnectAddress Transceiver::connectAddress() const { return d->m_connectAddress; } std::string Transceiver::uniqueName() const { return d->m_uniqueName; } bool Transceiver::isConnected() const { - return d->m_connection && d->m_connection->isOpen(); + return d->m_transport && d->m_transport->isOpen(); } EventDispatcher *Transceiver::eventDispatcher() const { return d->m_eventDispatcher; } IMessageReceiver *Transceiver::spontaneousMessageReceiver() const { return d->m_client; } void Transceiver::setSpontaneousMessageReceiver(IMessageReceiver *receiver) { d->m_client = receiver; } void TransceiverPrivate::handleCompletion(void *task) { switch (m_state) { case Authenticating: { assert(task == m_authClient); delete m_authClient; m_authClient = nullptr; // cout << "Authenticated.\n"; assert(!m_sendQueue.empty()); // the hello message should be in the queue - MessagePrivate::get(&m_sendQueue.front())->send(m_connection); + MessagePrivate::get(&m_sendQueue.front())->send(m_transport); receiveNextMessage(); m_state = AwaitingUniqueName; break; } case AwaitingUniqueName: // the code path for this only diverges in the PendingReply callback case Connected: { assert(!m_authClient); if (!m_sendQueue.empty() && task == &m_sendQueue.front()) { //cout << "Sent message.\n"; m_sendQueue.pop_front(); if (!m_sendQueue.empty()) { - MessagePrivate::get(&m_sendQueue.front())->send(m_connection); + MessagePrivate::get(&m_sendQueue.front())->send(m_transport); } } else { assert(task == m_receivingMessage); Message *const receivedMessage = m_receivingMessage; receiveNextMessage(); if (!maybeDispatchToPendingReply(receivedMessage)) { if (m_client) { m_client->handleSpontaneousMessageReceived(Message(move(*receivedMessage))); } // dispatch to other threads listening to spontaneous messages, if any for (auto it = m_secondaryThreadLinks.begin(); it != m_secondaryThreadLinks.end(); ) { SpontaneousMessageReceivedEvent *evt = new SpontaneousMessageReceivedEvent(); evt->message = *receivedMessage; CommutexLocker otherLocker(&it->second); if (otherLocker.hasLock()) { EventDispatcherPrivate::get(it->first->m_eventDispatcher) ->queueEvent(std::unique_ptr(evt)); ++it; } else { TransceiverPrivate *transceiver = it->first; it = m_secondaryThreadLinks.erase(it); discardPendingRepliesForSecondaryThread(transceiver); delete evt; } } delete receivedMessage; } } break; } default: // ### decide what to do here break; }; } bool TransceiverPrivate::maybeDispatchToPendingReply(Message *receivedMessage) { if (receivedMessage->type() != Message::MethodReturnMessage && receivedMessage->type() != Message::ErrorMessage) { return false; } auto it = m_pendingReplies.find(receivedMessage->replySerial()); if (it == m_pendingReplies.end()) { return false; } if (PendingReplyPrivate *pr = it->second.asPendingReply()) { m_pendingReplies.erase(it); assert(!pr->m_isFinished); pr->handleReceived(receivedMessage); } else { // forward to other thread's Transceiver TransceiverPrivate *transceiver = it->second.asTransceiver(); m_pendingReplies.erase(it); assert(transceiver); PendingReplySuccessEvent *evt = new PendingReplySuccessEvent; evt->reply = std::move(*receivedMessage); delete receivedMessage; EventDispatcherPrivate::get(transceiver->m_eventDispatcher)->queueEvent(std::unique_ptr(evt)); } return true; } void TransceiverPrivate::receiveNextMessage() { m_receivingMessage = new Message; MessagePrivate *const mpriv = MessagePrivate::get(m_receivingMessage); - mpriv->setCompletionClient(this); - mpriv->receive(m_connection); + mpriv->setCompletionListener(this); + mpriv->receive(m_transport); } void TransceiverPrivate::unregisterPendingReply(PendingReplyPrivate *p) { if (m_mainThreadTransceiver) { CommutexLocker otherLocker(&m_mainThreadLink); if (otherLocker.hasLock()) { PendingReplyCancelEvent *evt = new PendingReplyCancelEvent; evt->serial = p->m_serial; EventDispatcherPrivate::get(m_mainThreadTransceiver->m_eventDispatcher) ->queueEvent(std::unique_ptr(evt)); } } #ifndef NDEBUG auto it = m_pendingReplies.find(p->m_serial); assert(it != m_pendingReplies.end()); if (!m_mainThreadTransceiver) { assert(it->second.asPendingReply()); assert(it->second.asPendingReply() == p); } #endif m_pendingReplies.erase(p->m_serial); } void TransceiverPrivate::cancelAllPendingReplies() { // No locking because we should have no connections to other threads anymore at this point. // No const iteration followed by container clear because that has different semantics - many // things can happen in a callback... // In case we have pending replies for secondary threads, and we cancel all pending replies, // that is because we're shutting down, which we told the secondary thread, and it will deal // with bulk cancellation of replies. We just throw away our records about them. for (auto it = m_pendingReplies.begin() ; it != m_pendingReplies.end(); ) { PendingReplyPrivate *pendingPriv = it->second.asPendingReply(); it = m_pendingReplies.erase(it); if (pendingPriv) { // if from this thread pendingPriv->handleError(Error::LocalDisconnect); } } } void TransceiverPrivate::discardPendingRepliesForSecondaryThread(TransceiverPrivate *transceiver) { for (auto it = m_pendingReplies.begin() ; it != m_pendingReplies.end(); ) { if (it->second.asTransceiver() == transceiver) { it = m_pendingReplies.erase(it); // notification and deletion are handled on the event's source thread } else { ++it; } } } void TransceiverPrivate::processEvent(Event *evt) { // cerr << "TransceiverPrivate::processEvent() with event type " << evt->type << std::endl; switch (evt->type) { case Event::SendMessage: sendPreparedMessage(std::move(static_cast(evt)->message)); break; case Event::SendMessageWithPendingReply: { SendMessageWithPendingReplyEvent *pre = static_cast(evt); m_pendingReplies.emplace(pre->message.serial(), pre->transceiver); sendPreparedMessage(std::move(pre->message)); break; } case Event::SpontaneousMessageReceived: if (m_client) { SpontaneousMessageReceivedEvent *smre = static_cast(evt); m_client->handleSpontaneousMessageReceived(Message(move(smre->message))); } break; case Event::PendingReplySuccess: maybeDispatchToPendingReply(&static_cast(evt)->reply); break; case Event::PendingReplyFailure: { PendingReplyFailureEvent *prfe = static_cast(evt); const auto it = m_pendingReplies.find(prfe->m_serial); if (it == m_pendingReplies.end()) { // not a disaster, but when it happens in debug mode I want to check it out assert(false); break; } PendingReplyPrivate *pendingPriv = it->second.asPendingReply(); m_pendingReplies.erase(it); pendingPriv->handleError(prfe->m_error); break; } case Event::PendingReplyCancel: // This comes from a secondary thread, which handles PendingReply notification itself. m_pendingReplies.erase(static_cast(evt)->serial); break; case Event::SecondaryTransceiverConnect: { SecondaryTransceiverConnectEvent *sce = static_cast(evt); const auto it = find_if(m_unredeemedCommRefs.begin(), m_unredeemedCommRefs.end(), [sce](const CommutexPeer &item) { return item.id() == sce->id; } ); assert(it != m_unredeemedCommRefs.end()); const auto emplaced = m_secondaryThreadLinks.emplace(sce->transceiver, std::move(*it)).first; m_unredeemedCommRefs.erase(it); // "welcome package" - it's done (only) as an event to avoid locking order issues CommutexLocker locker(&emplaced->second); if (locker.hasLock()) { UniqueNameReceivedEvent *evt = new UniqueNameReceivedEvent; evt->uniqueName = m_uniqueName; EventDispatcherPrivate::get(sce->transceiver->m_eventDispatcher) ->queueEvent(std::unique_ptr(evt)); } break; } case Event::SecondaryTransceiverDisconnect: { SecondaryTransceiverDisconnectEvent *sde = static_cast(evt); // delete our records to make sure we don't call into it in the future! const auto found = m_secondaryThreadLinks.find(sde->transceiver); if (found == m_secondaryThreadLinks.end()) { // looks like we've noticed the disappearance of the other thread earlier return; } m_secondaryThreadLinks.erase(found); discardPendingRepliesForSecondaryThread(sde->transceiver); break; } case Event::MainTransceiverDisconnect: // since the main thread *sent* us the event, it already knows to drop all our PendingReplies m_mainThreadTransceiver = nullptr; cancelAllPendingReplies(); break; case Event::UniqueNameReceived: // We get this when the unique name became available after we were linked up with the main thread m_uniqueName = static_cast(evt)->uniqueName; break; } } Transceiver::CommRef Transceiver::createCommRef() { // TODO this is a good time to clean up "dead" CommRefs, where the counterpart was destroyed. CommRef ret; ret.transceiver = d; pair link = CommutexPeer::createLink(); { SpinLocker mainLocker(&d->m_lock); d->m_unredeemedCommRefs.emplace_back(move(link.first)); } ret.commutex = move(link.second); return ret; } diff --git a/buslogic/transceiver_p.h b/buslogic/transceiver_p.h index 945fa25..77686e7 100644 --- a/buslogic/transceiver_p.h +++ b/buslogic/transceiver_p.h @@ -1,164 +1,164 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef TRANSCEIVER_P_H #define TRANSCEIVER_P_H #include "transceiver.h" #include "connectaddress.h" #include "eventdispatcher_p.h" -#include "icompletionclient.h" +#include "icompletionlistener.h" #include "spinlock.h" #include #include #include class AuthClient; class HelloReceiver; -class IConnection; class IMessageReceiver; +class ITransport; class ClientConnectedHandler; /* How to handle destruction of connected Transceivers Main thread transceiver destroyed: Need to - "cancel" registered PendingReplies from other threads (I guess also own ones, we're not doing that, I think...) - Make sure that other threads stop calling us because that's going to be a memory error when our instance has been deleted Secondary thread Transceiver destroyed: Need to - "cancel" PendingReplies registered in main thread - unregister from main thread as receiver of spontaneous messages because receiving events about it is going to be a memory error when our instance has been deleted Problem areas: - destroying a Transceiver with a locked lock (locked from another thread, obviously) - can solved by "thoroughly" disconnecting from everything before destruction - deadlocks / locking order - preliminary solution: always main Transceiver first, then secondary - what about the lock in EventDispatcher? - blocking: secondary blocking (as in waiting for an event - both Transceivers wait on *locks* of the other) on main is okay, it does that all the time anyway. main blocking on secondary is probably (not sure) not okay. Let's define some invariants: - When a Transceiver is destroyed, all its PendingReply instances must have been detached (completed with or without error) or destroyed. "Its" means sent through that Transceiver's send() method, not when a PendingReply is using the connection of the Transceiver but send() was called on the Transceiver of another thread. - When a master and a secondary transceiver try to communicate in any way, and the other party has been destroyed, communication will fail gracefully and there will be no crash or undefined behavior. Any pending replies that cannot finish successfully anymore will finish with an LocalDisconnect error. */ -class TransceiverPrivate : public ICompletionClient +class TransceiverPrivate : public ICompletionListener { public: static TransceiverPrivate *get(Transceiver *t) { return t->d; } TransceiverPrivate(EventDispatcher *dispatcher); void close(); void authAndHello(Transceiver *parent); void handleHelloReply(); void handleClientConnected(); uint32 takeNextSerial(); Error prepareSend(Message *msg); void sendPreparedMessage(Message msg); void handleCompletion(void *task) override; bool maybeDispatchToPendingReply(Message *m); void receiveNextMessage(); void unregisterPendingReply(PendingReplyPrivate *p); void cancelAllPendingReplies(); void discardPendingRepliesForSecondaryThread(TransceiverPrivate *t); // For cross-thread communication between thread Transceivers. We could have a more complete event // system, but there is currently no need, so keep it simple and limited. void processEvent(Event *evt); // called from thread-local EventDispatcher enum { Unconnected, ServerWaitingForClient, Authenticating, AwaitingUniqueName, Connected } m_state; IMessageReceiver *m_client; Message *m_receivingMessage; std::deque m_sendQueue; // waiting to be sent // only one of them can be non-null. exception: in the main thread, m_mainThreadTransceiver // equals this, so that the main thread knows it's the main thread and not just a thread-local // transceiver. - IConnection *m_connection; + ITransport *m_transport; HelloReceiver *m_helloReceiver; ClientConnectedHandler *m_clientConnectedHandler; EventDispatcher *m_eventDispatcher; ConnectAddress m_connectAddress; std::string m_uniqueName; AuthClient *m_authClient; int m_defaultTimeout; class PendingReplyRecord { public: PendingReplyRecord(PendingReplyPrivate *pr) : isForSecondaryThread(false), ptr(pr) {} PendingReplyRecord(TransceiverPrivate *tp) : isForSecondaryThread(true), ptr(tp) {} PendingReplyPrivate *asPendingReply() const { return isForSecondaryThread ? nullptr : static_cast(ptr); } TransceiverPrivate *asTransceiver() const { return isForSecondaryThread ? static_cast(ptr) : nullptr; } private: bool isForSecondaryThread; void *ptr; }; std::unordered_map m_pendingReplies; // replies we're waiting for Spinlock m_lock; // only one lock because things done with lock held are quick, and anyway you shouldn't // be using one connection from multiple threads if you need best performance std::atomic m_sendSerial; std::unordered_map m_secondaryThreadLinks; std::vector m_unredeemedCommRefs; // for createCommRef() and the constructor from CommRef TransceiverPrivate *m_mainThreadTransceiver; CommutexPeer m_mainThreadLink; }; #endif // TRANSCEIVER_P_H diff --git a/connection/ipserver.cpp b/connection/ipserver.cpp index f390161..7d6d252 100644 --- a/connection/ipserver.cpp +++ b/connection/ipserver.cpp @@ -1,128 +1,128 @@ /* Copyright (C) 2014 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "ipserver.h" #include "connectaddress.h" -#include "icompletionclient.h" +#include "icompletionlistener.h" #include "ipsocket.h" #ifdef __unix__ #include #include #include #include #endif #ifdef _WIN32 #include #endif #include #include #include IpServer::IpServer(const ConnectAddress &ca) : m_listenFd(-1) { assert(ca.socketType() == ConnectAddress::SocketType::Ip); const FileDescriptor fd = socket(AF_INET, SOCK_STREAM, 0); if (!isValidFileDescriptor(fd)) { std::cerr << "IpServer contruction failed A.\n"; return; } #ifdef __unix__ // don't let forks inherit the file descriptor - just in case fcntl(fd, F_SETFD, FD_CLOEXEC); #endif struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(ca.port()); addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); bool ok = bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == 0; ok = ok && (::listen(fd, /* max queued incoming connections */ 64) == 0); if (ok) { m_listenFd = fd; } else { std::cerr << "IpServer contruction failed B.\n"; #ifdef _WIN32 closesocket(fd); #else ::close(fd); #endif } } IpServer::~IpServer() { close(); } void IpServer::handleCanRead() { setEventDispatcher(nullptr); FileDescriptor connFd = accept(m_listenFd, nullptr, nullptr); if (!isValidFileDescriptor(connFd)) { std::cerr << "\nIpServer::notifyRead(): accept() failed.\n\n"; return; } #ifdef __unix__ fcntl(connFd, F_SETFD, FD_CLOEXEC); #endif m_incomingConnections.push_back(new IpSocket(connFd)); - if (m_newConnectionClient) { - m_newConnectionClient->handleCompletion(this); + if (m_newConnectionListener) { + m_newConnectionListener->handleCompletion(this); } } void IpServer::handleCanWrite() { // We never registered this to be called, so... assert(false); } bool IpServer::isListening() const { return isValidFileDescriptor(m_listenFd); } void IpServer::close() { if (isValidFileDescriptor(m_listenFd)) { #ifdef _WIN32 closesocket(m_listenFd); #else ::close(m_listenFd); #endif m_listenFd = InvalidFileDescriptor; } } FileDescriptor IpServer::fileDescriptor() const { return m_listenFd; } diff --git a/connection/ipsocket.cpp b/connection/ipsocket.cpp index 64c1fdc..f4a685d 100644 --- a/connection/ipsocket.cpp +++ b/connection/ipsocket.cpp @@ -1,237 +1,237 @@ /* Copyright (C) 2015 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "ipsocket.h" #include "connectaddress.h" #ifdef __unix__ #include #include #include #include #include #endif #ifdef _WIN32 #include typedef SSIZE_T ssize_t; #endif #include #include #include #include #include #include // HACK, put this somewhere else (get the value from original d-bus? or is it infinite?) static const int maxFds = 12; using namespace std; IpSocket::IpSocket(const ConnectAddress &ca) : m_fd(-1) { assert(ca.socketType() == ConnectAddress::SocketType::Ip); #ifdef _WIN32 WSAData wsadata; // IPv6 requires Winsock v2.0 or better (but we're not using IPv6 - yet!) if (WSAStartup(MAKEWORD(2, 0), &wsadata) != 0) { std::cerr << "IpSocket contruction failed A.\n"; return; } #endif const FileDescriptor fd = socket(AF_INET, SOCK_STREAM, 0); if (!isValidFileDescriptor(fd)) { std::cerr << "IpSocket contruction failed B.\n"; return; } struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(ca.port()); addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); bool ok = connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == 0; // only make it non-blocking after connect() because Winsock returns // WSAEWOULDBLOCK when connecting a non-blocking socket #ifdef _WIN32 unsigned long value = 1; // 0 blocking, != 0 non-blocking if (ioctlsocket(fd, FIONBIO, &value) != NO_ERROR) { // something along the lines of... WS_ERROR_DEBUG(WSAGetLastError()); std::cerr << "IpSocket contruction failed C.\n"; closesocket(fd); return; } #else // don't let forks inherit the file descriptor - that can cause confusion... fcntl(fd, F_SETFD, FD_CLOEXEC); // To be able to use the same send() and recv() calls as Windows, also set the non-blocking // property on the socket descriptor here instead of passing MSG_DONTWAIT to send() and recv(). const int oldFlags = fcntl(fd, F_GETFL); if (oldFlags == -1) { ::close(fd); std::cerr << "IpSocket contruction failed D.\n"; return; } fcntl(fd, F_SETFL, oldFlags & O_NONBLOCK); #endif if (ok) { m_fd = fd; } else { #ifdef _WIN32 std::cerr << "IpSocket contruction failed E. Error is " << WSAGetLastError() << ".\n"; closesocket(fd); #else std::cerr << "IpSocket contruction failed E. Error is " << errno << ".\n"; ::close(fd); #endif } } IpSocket::IpSocket(FileDescriptor fd) : m_fd(fd) { } IpSocket::~IpSocket() { close(); #ifdef _WIN32 WSACleanup(); #endif } void IpSocket::close() { setEventDispatcher(nullptr); if (isValidFileDescriptor(m_fd)) { #ifdef _WIN32 closesocket(m_fd); #else ::close(m_fd); #endif m_fd = InvalidFileDescriptor; } } uint32 IpSocket::write(chunk a) { if (!isValidFileDescriptor(m_fd)) { std::cerr << "\nIpSocket::write() failed A.\n\n"; return 0; // TODO -1 and return int32? } const uint32 initialLength = a.length; while (a.length > 0) { ssize_t nbytes = send(m_fd, reinterpret_cast(a.ptr), a.length, 0); if (nbytes < 0) { if (errno == EINTR) { continue; } // see EAGAIN comment in LocalSocket::read() if (errno == EAGAIN) { break; } close(); return false; } a.ptr += nbytes; a.length -= uint32(nbytes); } return initialLength - a.length; } uint32 IpSocket::availableBytesForReading() { #ifdef _WIN32 u_long available = 0; if (ioctlsocket(m_fd, FIONREAD, &available) != NO_ERROR) { #else uint32 available = 0; if (ioctl(m_fd, FIONREAD, &available) < 0) { #endif available = 0; } return uint32(available); } chunk IpSocket::read(byte *buffer, uint32 maxSize) { chunk ret; if (maxSize <= 0) { std::cerr << "\nIpSocket::read() failed A.\n\n"; return ret; } ret.ptr = buffer; while (maxSize > 0) { ssize_t nbytes = recv(m_fd, reinterpret_cast(buffer), maxSize, 0); if (nbytes < 0) { if (errno == EINTR) { continue; } // see comment in LocalSocket for rationale of EAGAIN behavior if (errno == EAGAIN) { break; } close(); return ret; } ret.length += uint32(nbytes); buffer += nbytes; maxSize -= uint32(nbytes); } return ret; } bool IpSocket::isOpen() { return isValidFileDescriptor(m_fd); } FileDescriptor IpSocket::fileDescriptor() const { return m_fd; } void IpSocket::handleCanRead() { if (availableBytesForReading()) { - IConnection::handleCanRead(); + ITransport::handleCanRead(); } else { // This should really only happen in error cases! ### TODO test? close(); } } diff --git a/connection/ipsocket.h b/connection/ipsocket.h index a15182d..4195b11 100644 --- a/connection/ipsocket.h +++ b/connection/ipsocket.h @@ -1,63 +1,63 @@ /* Copyright (C) 2015 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef IPSOCKET_H #define IPSOCKET_H -#include "iconnection.h" +#include "itransport.h" #include class ConnectAddress; -class IpSocket : public IConnection +class IpSocket : public ITransport { public: // Connect to local socket at socketFilePath IpSocket(const ConnectAddress &ca); // Use an already open file descriptor IpSocket(FileDescriptor fd); ~IpSocket(); - // pure virtuals from IConnection + // pure virtuals from ITransport uint32 write(chunk data) override; uint32 availableBytesForReading() override; chunk read(byte *buffer, uint32 maxSize) override; void close() override; bool isOpen() override; FileDescriptor fileDescriptor() const override; void handleCanRead() override; - // end IConnection + // end ITransport IpSocket() = delete; IpSocket(const IpSocket &) = delete; IpSocket &operator=(const IpSocket &) = delete; private: friend class IEventLoop; FileDescriptor m_fd; }; #endif // IPSOCKET_H diff --git a/connection/iserver.cpp b/connection/iserver.cpp index 49526b6..db3c093 100644 --- a/connection/iserver.cpp +++ b/connection/iserver.cpp @@ -1,105 +1,105 @@ /* Copyright (C) 2014 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "iserver.h" #include "connectaddress.h" #include "eventdispatcher_p.h" -#include "iconnection.h" +#include "itransport.h" #include "ipserver.h" #ifdef __unix__ #include "localserver.h" #endif #include IServer::IServer() - : m_newConnectionClient(nullptr), + : m_newConnectionListener(nullptr), m_eventDispatcher(nullptr) { } IServer::~IServer() { - for (IConnection *c : m_incomingConnections) { + for (ITransport *c : m_incomingConnections) { delete c; } } //static IServer *IServer::create(const ConnectAddress &ca) { if (ca.bus() != ConnectAddress::Bus::PeerToPeer) { return nullptr; } switch (ca.socketType()) { #ifdef __unix__ case ConnectAddress::SocketType::Unix: return new LocalServer(ca.path()); case ConnectAddress::SocketType::AbstractUnix: return new LocalServer(std::string(1, '\0') + ca.path()); #endif case ConnectAddress::SocketType::Ip: return new IpServer(ca); default: return nullptr; } } -IConnection *IServer::takeNextConnection() +ITransport *IServer::takeNextClient() { if (m_incomingConnections.empty()) { return nullptr; } - IConnection *ret = m_incomingConnections.front(); + ITransport *ret = m_incomingConnections.front(); m_incomingConnections.pop_front(); return ret; } -void IServer::setNewConnectionClient(ICompletionClient *client) +void IServer::setNewConnectionListener(ICompletionListener *listener) { - m_newConnectionClient = client; + m_newConnectionListener = listener; } void IServer::setEventDispatcher(EventDispatcher *ed) { if (m_eventDispatcher == ed) { return; } if (m_eventDispatcher) { EventDispatcherPrivate *const ep = EventDispatcherPrivate::get(m_eventDispatcher); - ep->removeIoEventClient(this); + ep->removeIoEventListener(this); } m_eventDispatcher = ed; if (m_eventDispatcher) { EventDispatcherPrivate *const ep = EventDispatcherPrivate::get(m_eventDispatcher); - ep->addIoEventClient(this); + ep->addIoEventListener(this); ep->setReadWriteInterest(this, true, false); } } EventDispatcher *IServer::eventDispatcher() const { return m_eventDispatcher; } diff --git a/connection/iserver.h b/connection/iserver.h index 5db4196..fc928c2 100644 --- a/connection/iserver.h +++ b/connection/iserver.h @@ -1,67 +1,67 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef ISERVER_H #define ISERVER_H -#include "iioeventclient.h" +#include "iioeventlistener.h" #include "platform.h" #include "types.h" #include class ConnectAddress; class EventDispatcher; -class IConnection; -class ICompletionClient; +class ITransport; +class ICompletionListener; -class IServer : public IioEventClient +class IServer : public IioEventListener { public: IServer(); // TODO event dispatcher as constructor argument? virtual ~IServer(); virtual bool isListening() const = 0; - void setNewConnectionClient(ICompletionClient *client); // notified once on every new connection + void setNewConnectionListener(ICompletionListener *listener); // notified once on every new connection - IConnection *takeNextConnection(); + ITransport *takeNextClient(); virtual void close() = 0; virtual void setEventDispatcher(EventDispatcher *ed) override; virtual EventDispatcher *eventDispatcher() const override; static IServer *create(const ConnectAddress &connectAddress); protected: friend class EventDispatcher; - // handleCanRead() and handleCanWrite() from IioEventClient stay pure virtual + // handleCanRead() and handleCanWrite() from IioEventListener stay pure virtual - std::deque m_incomingConnections; - ICompletionClient *m_newConnectionClient; + std::deque m_incomingConnections; + ICompletionListener *m_newConnectionListener; private: EventDispatcher *m_eventDispatcher; }; #endif // ISERVER_H diff --git a/connection/iconnection.cpp b/connection/itransport.cpp similarity index 64% rename from connection/iconnection.cpp rename to connection/itransport.cpp index 79366ea..c8758bb 100644 --- a/connection/iconnection.cpp +++ b/connection/itransport.cpp @@ -1,174 +1,174 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ -#include "iconnection.h" +#include "itransport.h" #include "eventdispatcher.h" #include "eventdispatcher_p.h" -#include "iconnectionclient.h" +#include "itransportlistener.h" #include "ipsocket.h" #include "connectaddress.h" #ifdef __unix__ #include "localsocket.h" #endif #include #include using namespace std; -IConnection::IConnection() +ITransport::ITransport() : m_supportsFileDescriptors(false), m_eventDispatcher(0), m_readNotificationEnabled(false), m_writeNotificationEnabled(false) { } -IConnection::~IConnection() +ITransport::~ITransport() { - vector clientsCopy = m_clients; - for (size_t i = clientsCopy.size() - 1; i + 1 > 0; i--) { - removeClient(clientsCopy[i]); // LIFO (stack) order seems safest... + vector listenersCopy = m_listeners; + for (size_t i = listenersCopy.size() - 1; i + 1 > 0; i--) { + removeListener(listenersCopy[i]); // LIFO (stack) order seems safest... } } -chunk IConnection::readWithFileDescriptors(byte *buffer, uint32 maxSize, vector *) +chunk ITransport::readWithFileDescriptors(byte *buffer, uint32 maxSize, vector *) { return read(buffer, maxSize); } -uint32 IConnection::writeWithFileDescriptors(chunk data, const vector &) +uint32 ITransport::writeWithFileDescriptors(chunk data, const vector &) { return write(data); } -void IConnection::addClient(IConnectionClient *client) +void ITransport::addListener(ITransportListener *listener) { - if (find(m_clients.begin(), m_clients.end(), client) != m_clients.end()) { + if (find(m_listeners.begin(), m_listeners.end(), listener) != m_listeners.end()) { return; } - m_clients.push_back(client); - client->m_connection = this; + m_listeners.push_back(listener); + listener->m_transport = this; if (m_eventDispatcher) { updateReadWriteInterest(); } } -void IConnection::removeClient(IConnectionClient *client) +void ITransport::removeListener(ITransportListener *listener) { - vector::iterator it = find(m_clients.begin(), m_clients.end(), client); - if (it == m_clients.end()) { + vector::iterator it = find(m_listeners.begin(), m_listeners.end(), listener); + if (it == m_listeners.end()) { return; } - m_clients.erase(it); - client->m_connection = 0; + m_listeners.erase(it); + listener->m_transport = nullptr; if (m_eventDispatcher) { updateReadWriteInterest(); } } -void IConnection::updateReadWriteInterest() +void ITransport::updateReadWriteInterest() { bool readInterest = false; bool writeInterest = false; - for (IConnectionClient *client : m_clients) { - if (client->readNotificationEnabled()) { + for (ITransportListener *listener : m_listeners) { + if (listener->readNotificationEnabled()) { readInterest = true; } - if (client->writeNotificationEnabled()) { + if (listener->writeNotificationEnabled()) { writeInterest = true; } } if (readInterest != m_readNotificationEnabled || writeInterest != m_writeNotificationEnabled) { m_readNotificationEnabled = readInterest; m_writeNotificationEnabled = writeInterest; if (m_eventDispatcher) { EventDispatcherPrivate *const ep = EventDispatcherPrivate::get(m_eventDispatcher); ep->setReadWriteInterest(this, m_readNotificationEnabled, m_writeNotificationEnabled); } } } -void IConnection::setEventDispatcher(EventDispatcher *ed) +void ITransport::setEventDispatcher(EventDispatcher *ed) { if (m_eventDispatcher == ed) { return; } if (m_eventDispatcher) { EventDispatcherPrivate *const ep = EventDispatcherPrivate::get(m_eventDispatcher); - ep->removeIoEventClient(this); + ep->removeIoEventListener(this); } m_eventDispatcher = ed; if (m_eventDispatcher) { EventDispatcherPrivate *const ep = EventDispatcherPrivate::get(m_eventDispatcher); - ep->addIoEventClient(this); + ep->addIoEventListener(this); m_readNotificationEnabled = false; m_writeNotificationEnabled = false; updateReadWriteInterest(); } } -EventDispatcher *IConnection::eventDispatcher() const +EventDispatcher *ITransport::eventDispatcher() const { return m_eventDispatcher; } -void IConnection::handleCanRead() +void ITransport::handleCanRead() { - for (IConnectionClient *client : m_clients) { - if (client->readNotificationEnabled()) { - client->handleConnectionCanRead(); + for (ITransportListener *listener : m_listeners) { + if (listener->readNotificationEnabled()) { + listener->handleTransportCanRead(); break; } } } -void IConnection::handleCanWrite() +void ITransport::handleCanWrite() { - for (IConnectionClient *client : m_clients) { - if (client->writeNotificationEnabled()) { - client->handleConnectionCanWrite(); + for (ITransportListener *listener : m_listeners) { + if (listener->writeNotificationEnabled()) { + listener->handleTransportCanWrite(); break; } } } //static -IConnection *IConnection::create(const ConnectAddress &ci) +ITransport *ITransport::create(const ConnectAddress &ci) { switch (ci.socketType()) { #ifdef __unix__ case ConnectAddress::SocketType::Unix: return new LocalSocket(ci.path()); case ConnectAddress::SocketType::AbstractUnix: return new LocalSocket(string(1, '\0') + ci.path()); #endif case ConnectAddress::SocketType::Ip: return new IpSocket(ci); default: assert(false); return nullptr; } } diff --git a/connection/iconnection.h b/connection/itransport.h similarity index 74% rename from connection/iconnection.h rename to connection/itransport.h index 0119bd1..8e613f2 100644 --- a/connection/iconnection.h +++ b/connection/itransport.h @@ -1,85 +1,85 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ -#ifndef ICONNECTION_H -#define ICONNECTION_H +#ifndef ITRANSPORT_H +#define ITRANSPORT_H -#include "iioeventclient.h" +#include "iioeventlistener.h" #include "platform.h" #include "types.h" #include class ConnectAddress; class EventDispatcher; -class IConnectionClient; +class ITransportListener; class SelectEventPoller; -class IConnection : public IioEventClient +class ITransport : public IioEventListener { public: - // An IConnection subclass must have a file descriptor after construction and it must not change + // An ITransport subclass must have a file descriptor after construction and it must not change // except to the invalid file descriptor when disconnected. - IConnection(); // TODO event dispatcher as constructor argument? - virtual ~IConnection(); + ITransport(); // TODO event dispatcher as constructor argument? + virtual ~ITransport(); - // usually, the maximum sensible number of clients is two: one for reading and one for writing. + // usually, the maximum sensible number of listeners is two: one for reading and one for writing. // avoiding (independent) readers and writers blocking each other is good for IO efficiency. - void addClient(IConnectionClient *client); - void removeClient(IConnectionClient *client); + void addListener(ITransportListener *listener); + void removeListener(ITransportListener *listener); virtual uint32 availableBytesForReading() = 0; virtual chunk read(byte *buffer, uint32 maxSize) = 0; virtual chunk readWithFileDescriptors(byte *buffer, uint32 maxSize, std::vector *fileDescriptors); virtual uint32 write(chunk data) = 0; virtual uint32 writeWithFileDescriptors(chunk data, const std::vector &fileDescriptors); virtual void close() = 0; virtual bool isOpen() = 0; void setEventDispatcher(EventDispatcher *ed) override; EventDispatcher *eventDispatcher() const override; // factory method - creates a suitable subclass to connect to address - static IConnection *create(const ConnectAddress &connectAddress); + static ITransport *create(const ConnectAddress &connectAddress); protected: friend class EventDispatcher; - // IioEventClient + // IioEventListener void handleCanRead() override; void handleCanWrite() override; bool m_supportsFileDescriptors; private: - friend class IConnectionClient; + friend class ITransportListener; friend class SelectEventPoller; - void updateReadWriteInterest(); // called internally and from IConnectionClient + void updateReadWriteInterest(); // called internally and from ITransportListener EventDispatcher *m_eventDispatcher; - std::vector m_clients; + std::vector m_listeners; bool m_readNotificationEnabled; bool m_writeNotificationEnabled; }; -#endif // ICONNECTION_H +#endif // ITRANSPORT_H diff --git a/connection/iconnectionclient.cpp b/connection/itransportlistener.cpp similarity index 65% rename from connection/iconnectionclient.cpp rename to connection/itransportlistener.cpp index ee99758..bede2c8 100644 --- a/connection/iconnectionclient.cpp +++ b/connection/itransportlistener.cpp @@ -1,83 +1,83 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ -#include "iconnectionclient.h" +#include "itransportlistener.h" -#include "iconnection.h" +#include "itransport.h" -IConnectionClient::IConnectionClient() +ITransportListener::ITransportListener() : m_readNotificationEnabled(false), m_writeNotificationEnabled(false), - m_connection(0) + m_transport(0) { } -IConnectionClient::~IConnectionClient() +ITransportListener::~ITransportListener() { - if (m_connection) { - m_connection->removeClient(this); + if (m_transport) { + m_transport->removeListener(this); } - m_connection = 0; + m_transport = 0; } -void IConnectionClient::setReadNotificationEnabled(bool enable) +void ITransportListener::setReadNotificationEnabled(bool enable) { if (enable == m_readNotificationEnabled) { return; } m_readNotificationEnabled = enable; - m_connection->updateReadWriteInterest(); + m_transport->updateReadWriteInterest(); } -bool IConnectionClient::readNotificationEnabled() const +bool ITransportListener::readNotificationEnabled() const { return m_readNotificationEnabled; } -void IConnectionClient::setWriteNotificationEnabled(bool enable) +void ITransportListener::setWriteNotificationEnabled(bool enable) { if (enable == m_writeNotificationEnabled) { return; } m_writeNotificationEnabled = enable; - m_connection->updateReadWriteInterest(); + m_transport->updateReadWriteInterest(); } -bool IConnectionClient::writeNotificationEnabled() const +bool ITransportListener::writeNotificationEnabled() const { return m_writeNotificationEnabled; } -void IConnectionClient::handleConnectionCanRead() +void ITransportListener::handleTransportCanRead() { } -void IConnectionClient::handleConnectionCanWrite() +void ITransportListener::handleTransportCanWrite() { } -IConnection *IConnectionClient::connection() const +ITransport *ITransportListener::transport() const { - return m_connection; + return m_transport; } diff --git a/connection/iconnectionclient.h b/connection/itransportlistener.h similarity index 76% rename from connection/iconnectionclient.h rename to connection/itransportlistener.h index 418251c..9dcbf04 100644 --- a/connection/iconnectionclient.h +++ b/connection/itransportlistener.h @@ -1,55 +1,55 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ -#ifndef ICONNECTIONCLIENT_H -#define ICONNECTIONCLIENT_H +#ifndef ITRANSPORTLISTENER_H +#define ITRANSPORTLISTENER_H -class IConnection; +class ITransport; -class IConnectionClient +class ITransportListener { public: - IConnectionClient(); - virtual ~IConnectionClient(); + ITransportListener(); + virtual ~ITransportListener(); void setReadNotificationEnabled(bool enable); bool readNotificationEnabled() const; void setWriteNotificationEnabled(bool enable); bool writeNotificationEnabled() const; // public mainly for testing purposes - only call if you know what you're doing // no-op default implementations are provided so you only need to reimplement what you need - virtual void handleConnectionCanRead(); - virtual void handleConnectionCanWrite(); + virtual void handleTransportCanRead(); + virtual void handleTransportCanWrite(); protected: - IConnection *connection() const; // returns m_connection + ITransport *transport() const; // returns m_transport bool m_readNotificationEnabled; bool m_writeNotificationEnabled; - friend class IConnection; + friend class ITransport; private: - IConnection *m_connection; // set from IConnection::addClient() / removeClient() + ITransport *m_transport; // set from ITransport::addListener() / removeListener() }; -#endif // ICONNECTIONCLIENT_H +#endif // ITRANSPORTLISTENER_H diff --git a/connection/localserver.cpp b/connection/localserver.cpp index be4ee99..7ed3486 100644 --- a/connection/localserver.cpp +++ b/connection/localserver.cpp @@ -1,110 +1,110 @@ /* Copyright (C) 2014 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "localserver.h" -#include "icompletionclient.h" +#include "icompletionlistener.h" #include "localsocket.h" #include #include #include #include #include #include LocalServer::LocalServer(const std::string &socketFilePath) : m_listenFd(-1) { const int fd = socket(PF_UNIX, SOCK_STREAM, 0); if (fd < 0) { return; } // don't let forks inherit the file descriptor - just in case fcntl(fd, F_SETFD, FD_CLOEXEC); struct sockaddr_un addr; addr.sun_family = PF_UNIX; bool ok = socketFilePath.length() < sizeof(addr.sun_path); if (ok) { memcpy(addr.sun_path, socketFilePath.c_str(), socketFilePath.length()); } if (!socketFilePath.empty() && socketFilePath[0] != '\0') { // not a so-called abstract socket (weird but useful Linux specialty) unlink(socketFilePath.c_str()); } ok = ok && (bind(fd, (struct sockaddr *)&addr, sizeof(sa_family_t) + socketFilePath.length()) == 0); ok = ok && (::listen(fd, /* max queued incoming connections */ 64) == 0); if (ok) { m_listenFd = fd; } else { ::close(fd); } } LocalServer::~LocalServer() { close(); } void LocalServer::handleCanRead() { setEventDispatcher(nullptr); int connFd = accept(m_listenFd, nullptr, nullptr); if (connFd < 0) { return; } fcntl(connFd, F_SETFD, FD_CLOEXEC); m_incomingConnections.push_back(new LocalSocket(connFd)); - if (m_newConnectionClient) { - m_newConnectionClient->handleCompletion(this); + if (m_newConnectionListener) { + m_newConnectionListener->handleCompletion(this); } } void LocalServer::handleCanWrite() { // We never registered this to be called, so... assert(false); } bool LocalServer::isListening() const { return m_listenFd >= 0; } void LocalServer::close() { if (m_listenFd >= 0) { ::close(m_listenFd); m_listenFd = -1; } } FileDescriptor LocalServer::fileDescriptor() const { return m_listenFd; } diff --git a/connection/localsocket.cpp b/connection/localsocket.cpp index 4499a07..d0a1f5e 100644 --- a/connection/localsocket.cpp +++ b/connection/localsocket.cpp @@ -1,308 +1,308 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "localsocket.h" #include #include #include #include #include "sys/uio.h" #include #include #include #include #include #include // HACK, put this somewhere else (get the value from original d-bus? or is it infinite?) static const int maxFds = 12; using namespace std; LocalSocket::LocalSocket(const string &socketFilePath) : m_fd(-1) { m_supportsFileDescriptors = true; const int fd = socket(PF_UNIX, SOCK_STREAM, 0); if (fd < 0) { return; } // don't let forks inherit the file descriptor - that can cause confusion... fcntl(fd, F_SETFD, FD_CLOEXEC); struct sockaddr_un addr; addr.sun_family = PF_UNIX; bool ok = socketFilePath.length() < sizeof(addr.sun_path); if (ok) { memcpy(addr.sun_path, socketFilePath.c_str(), socketFilePath.length()); } ok = ok && (connect(fd, (struct sockaddr *)&addr, sizeof(sa_family_t) + socketFilePath.length()) == 0); if (ok) { m_fd = fd; } else { ::close(fd); } } LocalSocket::LocalSocket(int fd) : m_fd(fd) { } LocalSocket::~LocalSocket() { close(); } void LocalSocket::close() { setEventDispatcher(nullptr); if (m_fd >= 0) { ::close(m_fd); } m_fd = -1; } uint32 LocalSocket::write(chunk data) { if (m_fd < 0) { return 0; // TODO -1? } const uint32 initialLength = data.length; while (data.length > 0) { ssize_t nbytes = send(m_fd, data.ptr, data.length, MSG_DONTWAIT); if (nbytes < 0) { if (errno == EINTR) { continue; } // see EAGAIN comment in read() if (errno == EAGAIN /* && iov.iov_len < a.length */ ) { break; } close(); return false; } data.ptr += nbytes; data.length -= size_t(nbytes); } return initialLength - data.length; } // TODO: consider using iovec to avoid "copying together" message parts before sending; iovec tricks // are probably not going to help for receiving, though. uint32 LocalSocket::writeWithFileDescriptors(chunk data, const vector &fileDescriptors) { if (m_fd < 0) { return 0; // TODO -1? } // sendmsg boilerplate struct msghdr send_msg; struct iovec iov; send_msg.msg_name = 0; send_msg.msg_namelen = 0; send_msg.msg_flags = 0; send_msg.msg_iov = &iov; send_msg.msg_iovlen = 1; iov.iov_base = data.ptr; iov.iov_len = data.length; // we can only send a fixed number of fds anyway due to the non-flexible size of the control message // receive buffer, so we set an arbitrary limit. const uint32 numFds = fileDescriptors.size(); assert(fileDescriptors.size() <= maxFds); // TODO proper error char cmsgBuf[CMSG_SPACE(sizeof(int) * maxFds)]; if (numFds) { // fill in a control message send_msg.msg_control = cmsgBuf; send_msg.msg_controllen = CMSG_SPACE(sizeof(int) * numFds); struct cmsghdr *c_msg = CMSG_FIRSTHDR(&send_msg); c_msg->cmsg_len = CMSG_LEN(sizeof(int) * numFds); c_msg->cmsg_level = SOL_SOCKET; c_msg->cmsg_type = SCM_RIGHTS; // set the control data to pass - this is why we don't use the simpler write() for (uint32 i = 0; i < numFds; i++) { reinterpret_cast(CMSG_DATA(c_msg))[i] = fileDescriptors[i]; } } else { // no file descriptor to send, no control message send_msg.msg_control = 0; send_msg.msg_controllen = 0; } while (iov.iov_len > 0) { ssize_t nbytes = sendmsg(m_fd, &send_msg, MSG_DONTWAIT); if (nbytes < 0) { if (errno == EINTR) { continue; } // see EAGAIN comment in read() if (errno == EAGAIN /* && iov.iov_len < a.length */ ) { break; } close(); return false; } iov.iov_base = static_cast(iov.iov_base) + nbytes; iov.iov_len -= size_t(nbytes); } return data.length - iov.iov_len; } uint32 LocalSocket::availableBytesForReading() { uint32 available = 0; if (ioctl(m_fd, FIONREAD, &available) < 0) { available = 0; } return available; } chunk LocalSocket::read(byte *buffer, uint32 maxSize) { chunk ret(buffer, 0); while (ret.length < maxSize) { ssize_t nbytes = recv(m_fd, ret.ptr + ret.length, maxSize - ret.length, MSG_DONTWAIT); if (nbytes < 0) { if (errno == EINTR) { continue; } // If we were notified for reading directly by the event dispatcher, we must be able to read at // least one byte before getting AGAIN aka EWOULDBLOCK - *however* the event loop might notify // something that is very eager to read everything (like Message::notifyRead()...) by reading // multiple times and in that case, we may be called in an attempt to read more when there is // currently no more data. // Just return zero bytes and no error in that case. if (errno == EAGAIN /* && iov.iov_len < maxSize */) { break; } close(); return ret; } ret.length += size_t(nbytes); } return ret; } chunk LocalSocket::readWithFileDescriptors(byte *buffer, uint32 maxSize, vector *fileDescriptors) { chunk ret; if (maxSize <= 0) { return ret; } // recvmsg-with-control-message boilerplate struct msghdr recv_msg; char cmsgBuf[CMSG_SPACE(sizeof(int) * maxFds)]; memset(cmsgBuf, 0, sizeof(cmsgBuf)); recv_msg.msg_control = cmsgBuf; recv_msg.msg_controllen = sizeof(cmsgBuf); recv_msg.msg_name = 0; recv_msg.msg_namelen = 0; recv_msg.msg_flags = 0; struct iovec iov; recv_msg.msg_iov = &iov; recv_msg.msg_iovlen = 1; // end boilerplate ret.ptr = buffer; ret.length = 0; iov.iov_base = ret.ptr; iov.iov_len = maxSize; while (iov.iov_len > 0) { ssize_t nbytes = recvmsg(m_fd, &recv_msg, MSG_DONTWAIT); if (nbytes < 0) { if (errno == EINTR) { continue; } // If we were notified for reading directly by the event dispatcher, we must be able to read at // least one byte before getting AGAIN aka EWOULDBLOCK - *however* the event loop might notify // something that is very eager to read everything (like Message::notifyRead()...) by reading // multiple times and in that case, we may be called in an attempt to read more when there is // currently no more data. // Just return zero bytes and no error in that case. if (errno == EAGAIN /* && iov.iov_len < maxSize */) { break; } close(); return ret; } ret.length += size_t(nbytes); iov.iov_base = static_cast(iov.iov_base) + nbytes; iov.iov_len -= size_t(nbytes); } // done reading "regular data", now read any file descriptors passed via control messages struct cmsghdr *c_msg = CMSG_FIRSTHDR(&recv_msg); if (c_msg && c_msg->cmsg_level == SOL_SOCKET && c_msg->cmsg_type == SCM_RIGHTS) { const int len = c_msg->cmsg_len / sizeof(int); int *cmsgData = reinterpret_cast(CMSG_DATA(c_msg)); for (int i = 0; i < len; i++) { fileDescriptors->push_back(cmsgData[i]); } } return ret; } bool LocalSocket::isOpen() { return m_fd != -1; } int LocalSocket::fileDescriptor() const { return m_fd; } void LocalSocket::handleCanRead() { if (availableBytesForReading()) { - IConnection::handleCanRead(); + ITransport::handleCanRead(); } else { // This should really only happen in error cases! ### TODO test? close(); } } diff --git a/connection/localsocket.h b/connection/localsocket.h index be4d822..d3a79db 100644 --- a/connection/localsocket.h +++ b/connection/localsocket.h @@ -1,63 +1,63 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef LOCALSOCKET_H #define LOCALSOCKET_H -#include "iconnection.h" +#include "itransport.h" #include -class LocalSocket : public IConnection +class LocalSocket : public ITransport { public: // Connect to local socket at socketFilePath LocalSocket(const std::string &socketFilePath); // Use an already open file descriptor LocalSocket(int fd); ~LocalSocket(); - // virtuals from IConnection + // virtuals from ITransport uint32 write(chunk data) override; uint32 writeWithFileDescriptors(chunk data, const std::vector &fileDescriptors) override; uint32 availableBytesForReading() override; chunk read(byte *buffer, uint32 maxSize) override; chunk readWithFileDescriptors(byte *buffer, uint32 maxSize, std::vector *fileDescriptors) override; void close() override; bool isOpen() override; FileDescriptor fileDescriptor() const override; void handleCanRead() override; - // end IConnection + // end ITransport LocalSocket() = delete; LocalSocket(const LocalSocket &) = delete; LocalSocket &operator=(const LocalSocket &) = delete; private: friend class IEventLoop; int m_fd; }; #endif // LOCALSOCKET_H diff --git a/events/epolleventpoller.cpp b/events/epolleventpoller.cpp index 576c14c..823040e 100644 --- a/events/epolleventpoller.cpp +++ b/events/epolleventpoller.cpp @@ -1,136 +1,136 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "epolleventpoller.h" #include "eventdispatcher_p.h" -#include "iconnection.h" +#include "iioeventlistener.h" #include #include #include #include #include EpollEventPoller::EpollEventPoller(EventDispatcher *dispatcher) : IEventPoller(dispatcher), m_epollFd(epoll_create(10)) { // set up a pipe that can interrupt the polling from another thread // (we could also use the Linux-only eventfd() - pipes are at least portable to epoll-like mechanisms) pipe2(m_interruptPipe, O_NONBLOCK); struct epoll_event epevt; epevt.events = EPOLLIN; epevt.data.u64 = 0; // clear high bits in the union epevt.data.fd = m_interruptPipe[0]; epoll_ctl(m_epollFd, EPOLL_CTL_ADD, m_interruptPipe[0], &epevt); } EpollEventPoller::~EpollEventPoller() { close(m_interruptPipe[0]); close(m_interruptPipe[1]); close(m_epollFd); } IEventPoller::InterruptAction EpollEventPoller::poll(int timeout) { IEventPoller::InterruptAction ret = IEventPoller::NoInterrupt; static const int maxEvPerPoll = 8; struct epoll_event results[maxEvPerPoll]; int nresults = epoll_wait(m_epollFd, results, maxEvPerPoll, timeout); if (nresults < 0) { // error? return ret; } for (int i = 0; i < nresults; i++) { struct epoll_event *evt = results + i; if (evt->events & EPOLLIN) { if (evt->data.fd != m_interruptPipe[0]) { - EventDispatcherPrivate::get(m_dispatcher)->notifyClientForReading(evt->data.fd); + EventDispatcherPrivate::get(m_dispatcher)->notifyListenerForReading(evt->data.fd); } else { // interrupt; read bytes from pipe to clear buffers and get the interrupt type ret = IEventPoller::ProcessAuxEvents; char buf; while (read(m_interruptPipe[0], &buf, 1) > 0) { if (buf == 'S') { ret = IEventPoller::Stop; } } // ### discarding the rest of the events // this works in our currently only use case, interrupting poll once to reap a thread if (ret == IEventPoller::Stop) { return ret; } } } if (evt->events & EPOLLOUT) { - EventDispatcherPrivate::get(m_dispatcher)->notifyClientForWriting(evt->data.fd); + EventDispatcherPrivate::get(m_dispatcher)->notifyListenerForWriting(evt->data.fd); } } return ret; } void EpollEventPoller::interrupt(IEventPoller::InterruptAction action) { assert(action == IEventPoller::ProcessAuxEvents || action == IEventPoller::Stop); // write a byte to the write end so the poll waiting on the read end returns char buf = (action == IEventPoller::Stop) ? 'S' : 'N'; write(m_interruptPipe[1], &buf, 1); } -void EpollEventPoller::addIoEventClient(IioEventClient *ioc) +void EpollEventPoller::addIoEventListener(IioEventListener *iol) { struct epoll_event epevt; epevt.events = 0; epevt.data.u64 = 0; // clear high bits in the union - epevt.data.fd = ioc->fileDescriptor(); - epoll_ctl(m_epollFd, EPOLL_CTL_ADD, ioc->fileDescriptor(), &epevt); + epevt.data.fd = iol->fileDescriptor(); + epoll_ctl(m_epollFd, EPOLL_CTL_ADD, iol->fileDescriptor(), &epevt); } -void EpollEventPoller::removeIoEventClient(IioEventClient *ioc) +void EpollEventPoller::removeIoEventListener(IioEventListener *iol) { - const int fd = ioc->fileDescriptor(); + const int fd = iol->fileDescriptor(); // Connection should call us *before* resetting its fd on failure assert(fd >= 0); struct epoll_event epevt; // required in Linux < 2.6.9 even though it's ignored epoll_ctl(m_epollFd, EPOLL_CTL_DEL, fd, &epevt); } -void EpollEventPoller::setReadWriteInterest(IioEventClient *ioc, bool readEnabled, bool writeEnabled) +void EpollEventPoller::setReadWriteInterest(IioEventListener *iol, bool readEnabled, bool writeEnabled) { - FileDescriptor fd = ioc->fileDescriptor(); + FileDescriptor fd = iol->fileDescriptor(); if (!fd) { return; } struct epoll_event epevt; epevt.events = (readEnabled ? uint32(EPOLLIN) : 0) | (writeEnabled ? uint32(EPOLLOUT) : 0); epevt.data.u64 = 0; // clear high bits in the union epevt.data.fd = fd; epoll_ctl(m_epollFd, EPOLL_CTL_MOD, fd, &epevt); } diff --git a/events/epolleventpoller.h b/events/epolleventpoller.h index c823ee0..d20af98 100644 --- a/events/epolleventpoller.h +++ b/events/epolleventpoller.h @@ -1,51 +1,51 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef EPOLLEVENTPOLLER_H #define EPOLLEVENTPOLLER_H #include "ieventpoller.h" #include class EpollEventPoller : public IEventPoller { public: EpollEventPoller(EventDispatcher *dispatcher); ~EpollEventPoller(); IEventPoller::InterruptAction poll(int timeout) override; void interrupt(IEventPoller::InterruptAction) override; // reimplemented from IEventPoller - void addIoEventClient(IioEventClient *ioc) override; - void removeIoEventClient(IioEventClient *ioc) override; - void setReadWriteInterest(IioEventClient *ioc, bool read, bool write) override; + void addIoEventListener(IioEventListener *iol) override; + void removeIoEventListener(IioEventListener *iol) override; + void setReadWriteInterest(IioEventListener *iol, bool read, bool write) override; private: void notifyRead(int fd); int m_interruptPipe[2]; FileDescriptor m_epollFd; }; #endif // EPOLLEVENTPOLLER_H diff --git a/events/eventdispatcher.cpp b/events/eventdispatcher.cpp index e9d629c..6e67851 100644 --- a/events/eventdispatcher.cpp +++ b/events/eventdispatcher.cpp @@ -1,371 +1,371 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "eventdispatcher.h" #include "eventdispatcher_p.h" #ifndef DFERRY_NO_NATIVE_POLL #ifdef __linux__ #include "epolleventpoller.h" #elif defined _WIN32 #include "selecteventpoller_win32.h" #else #include "selecteventpoller_unix.h" #endif #endif #include "event.h" #include "foreigneventloopintegrator.h" #include "ieventpoller.h" -#include "iioeventclient.h" +#include "iioeventlistener.h" #include "platformtime.h" #include "transceiver_p.h" #include "timer.h" #include #include #include //#define EVENTDISPATCHER_DEBUG using namespace std; #ifndef DFERRY_NO_NATIVE_POLL EventDispatcher::EventDispatcher() : d(new EventDispatcherPrivate) { #ifdef __linux__ d->m_poller = new EpollEventPoller(this); #else // TODO high performance IO multiplexers for non-Linux platforms d->m_poller = new SelectEventPoller(this); #endif } #endif EventDispatcher::EventDispatcher(ForeignEventLoopIntegrator *integrator) : d(new EventDispatcherPrivate) { d->m_integrator = integrator; d->m_poller = integrator->connectToDispatcher(this); } EventDispatcherPrivate::~EventDispatcherPrivate() { - for (const pair &fdCon : m_ioClients) { - fdCon.second->setEventDispatcher(0); + for (const pair &fdListener : m_ioListeners) { + fdListener.second->setEventDispatcher(nullptr); } for (const pair &dt : m_timers) { dt.second->m_eventDispatcher = nullptr; dt.second->m_isRunning = false; } if (m_integrator) { delete m_integrator; // owns the poller (its private class!) and deletes it } else { delete m_poller; } } EventDispatcher::~EventDispatcher() { delete d; d = nullptr; } bool EventDispatcher::poll(int timeout) { int nextDue = d->timeToFirstDueTimer(); if (timeout < 0) { timeout = nextDue; } else if (nextDue >= 0) { timeout = min(timeout, nextDue); } #ifdef EVENTDISPATCHER_DEBUG printf("EventDispatcher::poll(): timeout=%d, nextDue=%d.\n", timeout, nextDue); #endif IEventPoller::InterruptAction interrupAction = d->m_poller->poll(timeout); if (interrupAction == IEventPoller::Stop) { return false; } else if (interrupAction == IEventPoller::ProcessAuxEvents && d->m_transceiverToNotify) { d->processAuxEvents(); } d->triggerDueTimers(); return true; } void EventDispatcher::interrupt() { d->m_poller->interrupt(IEventPoller::Stop); } void EventDispatcherPrivate::wakeForEvents() { m_poller->interrupt(IEventPoller::ProcessAuxEvents); } -bool EventDispatcherPrivate::addIoEventClient(IioEventClient *ioc) +bool EventDispatcherPrivate::addIoEventListener(IioEventListener *iol) { - pair::iterator, bool> insertResult; - insertResult = m_ioClients.insert(make_pair(ioc->fileDescriptor(), ioc)); + pair::iterator, bool> insertResult; + insertResult = m_ioListeners.insert(make_pair(iol->fileDescriptor(), iol)); const bool ret = insertResult.second; if (ret) { - m_poller->addIoEventClient(ioc); + m_poller->addIoEventListener(iol); } return ret; } -bool EventDispatcherPrivate::removeIoEventClient(IioEventClient *ioc) +bool EventDispatcherPrivate::removeIoEventListener(IioEventListener *iol) { - const bool ret = m_ioClients.erase(ioc->fileDescriptor()); + const bool ret = m_ioListeners.erase(iol->fileDescriptor()); if (ret) { - m_poller->removeIoEventClient(ioc); + m_poller->removeIoEventListener(iol); } return ret; } -void EventDispatcherPrivate::setReadWriteInterest(IioEventClient *ioc, bool read, bool write) +void EventDispatcherPrivate::setReadWriteInterest(IioEventListener *iol, bool read, bool write) { - m_poller->setReadWriteInterest(ioc, read, write); + m_poller->setReadWriteInterest(iol, read, write); } -void EventDispatcherPrivate::notifyClientForReading(FileDescriptor fd) +void EventDispatcherPrivate::notifyListenerForReading(FileDescriptor fd) { - unordered_map::iterator it = m_ioClients.find(fd); - if (it != m_ioClients.end()) { + unordered_map::iterator it = m_ioListeners.find(fd); + if (it != m_ioListeners.end()) { it->second->handleCanRead(); } else { #ifdef IEVENTDISPATCHER_DEBUG // while interesting for debugging, this is not an error if a connection was in the epoll // set and disconnected in its handleCanRead() or handleCanWrite() implementation - std::cerr << "EventDispatcherPrivate::notifyClientForReading(): unhandled file descriptor " + std::cerr << "EventDispatcherPrivate::notifyListenerForReading(): unhandled file descriptor " << fd << ".\n"; #endif } } -void EventDispatcherPrivate::notifyClientForWriting(FileDescriptor fd) +void EventDispatcherPrivate::notifyListenerForWriting(FileDescriptor fd) { - unordered_map::iterator it = m_ioClients.find(fd); - if (it != m_ioClients.end()) { + unordered_map::iterator it = m_ioListeners.find(fd); + if (it != m_ioListeners.end()) { it->second->handleCanWrite(); } else { #ifdef IEVENTDISPATCHER_DEBUG // while interesting for debugging, this is not an error if a connection was in the epoll // set and disconnected in its handleCanRead() or handleCanWrite() implementation - std::cerr << "EventDispatcherPrivate::notifyClientForWriting(): unhandled file descriptor " + std::cerr << "EventDispatcherPrivate::notifyListenerForWriting(): unhandled file descriptor " << fd << ".\n"; #endif } } int EventDispatcherPrivate::timeToFirstDueTimer() const { multimap::const_iterator it = m_timers.cbegin(); if (it == m_timers.cend()) { return -1; } if (it->second == nullptr) { // this is the dead entry of the currently triggered, and meanwhile removed timer if (++it == m_timers.cend()) { return -1; } } uint64 nextTimeout = it->first >> 10; uint64 currentTime = PlatformTime::monotonicMsecs(); if (currentTime >= nextTimeout) { return 0; } return nextTimeout - currentTime; } uint EventDispatcherPrivate::nextTimerSerial() { if (++m_lastTimerSerial > s_maxTimerSerial) { m_lastTimerSerial = 0; } return m_lastTimerSerial; } void EventDispatcherPrivate::addTimer(Timer *timer) { if (timer->m_tag == 0) { timer->m_tag = nextTimerSerial(); } uint64 dueTime = PlatformTime::monotonicMsecs() + uint64(timer->m_interval); // ### When a timer is added from a timer callback, make sure it only runs in the *next* // iteration of the event loop. Otherwise, endless cascades of timers triggering, adding // more timers etc could occur without ever returning from triggerDueTimers(). // For the condition for this hazard, see "invariant:" in triggerDueTimers(): the only way // the new timer could trigger in this event loop iteration is when: // // m_triggerTime == currentTime(before call to trigger()) == timerAddedInTrigger().dueTime // // note: m_triggeredTimer.dueTime < m_triggerTime is well possible; if ==, the additional // condition applies that timerAddedInTrigger().serial >= m_triggeredTimer.serial; // we ignore this and do it conservatively and less complicated. // (the additional condition comes from serials as keys and that each "slot" in multimap with // the same keys is a list where new entries are back-inserted) // // As a countermeasure, tweak the new timer's timeout, putting it well before m_triggeredTimer's // iterator position in the multimap... because the new timer must have zero timeout in order for // its due time to occur within this triggerDueTimers() iteration, it is supposed to trigger ASAP // anyway. This disturbs the order of triggering a little compared to the usual, but all // timeouts are properly respected - the next event loop iteration is guaranteed to trigger // timers at times strictly greater-equal than this iteration (time goes only one way) ;) if (m_triggerTime && dueTime == m_triggerTime) { dueTime = m_triggerTime - 1; } timer->m_tag = (dueTime << 10) + (timer->m_tag & s_maxTimerSerial); m_timers.emplace(timer->m_tag, timer); maybeSetTimeoutForIntegrator(); } void EventDispatcherPrivate::removeTimer(Timer *timer) { assert(timer->m_tag != 0); // We cannot toggle m_isTriggeredTimerPendingRemoval back and forth, we can only set it once. // Because after the timer has been removed once, the next time we see the same pointer value, // it could be an entirely different timer. Consider this: // delete timer1; // calls removeTimer() // Timer *timer2 = new Timer(); // accidentally gets same memory address as timer1 // timer2->start(...); // timer2->stop(); // timer == m_triggeredTimer, uh oh // The last line does not necessarily cause a problem, but just don't be excessively clever. // On the other hand, not special-casing the currently triggered timer after it has been marked // for removal once is fine. In case it is re-added, it gets a new map entry in addTimer() // and from then on it can be handled like any other timer. bool removingTriggeredTimer = false; if (!m_isTriggeredTimerPendingRemoval && timer == m_triggeredTimer) { // using this variable, we can avoid dereferencing m_triggeredTimer should it have been // deleted while triggered m_isTriggeredTimerPendingRemoval = true; removingTriggeredTimer = true; } auto iterRange = m_timers.equal_range(timer->m_tag); for (; iterRange.first != iterRange.second; ++iterRange.first) { if (iterRange.first->second == timer) { if (!removingTriggeredTimer) { m_timers.erase(iterRange.first); } else { // mark it as dead for query methods such as timeToFirstDueTimer() iterRange.first->second = nullptr; } maybeSetTimeoutForIntegrator(); return; } } assert(false); // the timer should never request a remove when it has not been added } void EventDispatcherPrivate::maybeSetTimeoutForIntegrator() { if (m_integrator) { m_integrator->watchTimeout(timeToFirstDueTimer()); } } void EventDispatcherPrivate::triggerDueTimers() { m_triggerTime = PlatformTime::monotonicMsecs(); for (auto it = m_timers.begin(); it != m_timers.end();) { const uint64 timerTimeout = (it->first >> 10); if (timerTimeout > m_triggerTime) { break; } // careful here - protect against adding and removing any timer while inside its trigger()! // we do this by keeping the iterator at the current position (so changing any other timer // doesn't invalidate it) and blocking changes to the timer behind that iterator // (so we don't mess with its data should it have been deleted outright in the callback) m_triggeredTimer = it->second; Timer *const timer = m_triggeredTimer; m_isTriggeredTimerPendingRemoval = false; // invariant: // m_triggeredTimer.dueTime <= m_triggerTime <= currentTime(here) <= .dueTime timer->trigger(); m_triggeredTimer = nullptr; if (!m_isTriggeredTimerPendingRemoval && timer->m_isRunning) { // ### we are rescheduling timers based on triggerTime even though real time can be // much later - is this the desired behavior? I think so... if (timer->m_interval == 0) { // With the other branch we might iterate over this timer again in this invocation because // if there are several timers with the same tag, this entry will be back-inserted into the // list of values for the current tag / key slot. We only break out of the loop if // timerTimeout > m_triggerTime, so there would be an infinite loop. // Instead, we just leave the iterator alone, which does not put it in front of the current // iterator position. It's also good for performance. Win-win! ++it; } else { timer->m_tag = ((m_triggerTime + uint64(timer->m_interval)) << 10) + (timer->m_tag & s_maxTimerSerial); m_timers.erase(it++); m_timers.emplace(timer->m_tag, timer); } } else { m_timers.erase(it++); } } m_triggerTime = 0; maybeSetTimeoutForIntegrator(); } void EventDispatcherPrivate::queueEvent(std::unique_ptr evt) { // std::cerr << "EventDispatcherPrivate::queueEvent() " << evt->type << " " << this << std::endl; { SpinLocker locker(&m_queuedEventsLock); m_queuedEvents.emplace_back(std::move(evt)); } wakeForEvents(); } void EventDispatcherPrivate::processAuxEvents() { // std::cerr << "EventDispatcherPrivate::processAuxEvents() " << this << std::endl; // don't hog the lock while processing the events std::vector> events; { SpinLocker locker(&m_queuedEventsLock); std::swap(events, m_queuedEvents); } if (m_transceiverToNotify) { for (const std::unique_ptr &evt : events) { m_transceiverToNotify->processEvent(evt.get()); } } } diff --git a/events/eventdispatcher_p.h b/events/eventdispatcher_p.h index 98b8f7d..fc05646 100644 --- a/events/eventdispatcher_p.h +++ b/events/eventdispatcher_p.h @@ -1,105 +1,105 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef EVENTDISPATCHER_P_H #define EVENTDISPATCHER_P_H #include "eventdispatcher.h" #include "message.h" #include "platform.h" #include "spinlock.h" #include "types.h" #include #include #include #include struct Event; -class IioEventClient; +class IioEventListener; class IEventPoller; class Message; class PendingReplyPrivate; class Timer; class TransceiverPrivate; // note that the main purpose of EventDispatcher so far is dispatching I/O events; dispatching Event // instances is secondary class EventDispatcherPrivate { public: static EventDispatcherPrivate *get(EventDispatcher *ed) { return ed->d; } ~EventDispatcherPrivate(); int timeToFirstDueTimer() const; uint nextTimerSerial(); void triggerDueTimers(); - // for IioEventClient - friend class IioEventClient; - bool addIoEventClient(IioEventClient *ioc); - bool removeIoEventClient(IioEventClient *ioc); - void setReadWriteInterest(IioEventClient *ioc, bool read, bool write); + // for IioEventListener + friend class IioEventListener; + bool addIoEventListener(IioEventListener *iol); + bool removeIoEventListener(IioEventListener *iol); + void setReadWriteInterest(IioEventListener *iol, bool read, bool write); // for IEventPoller friend class IEventPoller; - void notifyClientForReading(FileDescriptor fd); - void notifyClientForWriting(FileDescriptor fd); + void notifyListenerForReading(FileDescriptor fd); + void notifyListenerForWriting(FileDescriptor fd); // for Timer friend class Timer; void addTimer(Timer *timer); void removeTimer(Timer *timer); // for ForeignEventLoopIntegrator (calls into it, not called from it) void maybeSetTimeoutForIntegrator(); // for Transceiver // this is similar to interrupt(), but doesn't make poll() return false and will call // m_transceiverToNotify() -> processQueuedEvents() void wakeForEvents(); void queueEvent(std::unique_ptr evt); // safe to call from any thread void processAuxEvents(); IEventPoller *m_poller = nullptr; ForeignEventLoopIntegrator *m_integrator = nullptr; - std::unordered_map m_ioClients; + std::unordered_map m_ioListeners; static const int s_maxTimerSerial = 0x3ff; // 10 bits set uint m_lastTimerSerial = s_maxTimerSerial; // the highest 54 bits in "due" encode due time, the lowest 10 bits act like a serial number to reduce // (not eliminate - the serial eventually wraps around) collisions of timers with the same timeout // (this is not expressed as a struct/class to avoid compiler pessimization in the multimap code) std::multimap m_timers; // for logic to prevent executing a timer in the dispatch run it was added uint64 m_triggerTime = 0; // helpers that help to avoid touching the currently triggered timer after it has been deleted in // a client called from trigger() Timer *m_triggeredTimer = nullptr; bool m_isTriggeredTimerPendingRemoval = false; // for inter thread event delivery to Transceiver TransceiverPrivate *m_transceiverToNotify = nullptr; Spinlock m_queuedEventsLock; std::vector> m_queuedEvents; }; #endif diff --git a/events/foreigneventloopintegrator.cpp b/events/foreigneventloopintegrator.cpp index 0f25ac5..d3ab67b 100644 --- a/events/foreigneventloopintegrator.cpp +++ b/events/foreigneventloopintegrator.cpp @@ -1,175 +1,175 @@ /* Copyright (C) 2017 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "foreigneventloopintegrator.h" #include "eventdispatcher_p.h" #include "ieventpoller.h" -#include "iioeventclient.h" +#include "iioeventlistener.h" #include #include class ForeignEventLoopIntegratorPrivate : public IEventPoller { public: ForeignEventLoopIntegratorPrivate(ForeignEventLoopIntegrator *integrator, EventDispatcher *dispatcher); //virtual ~ForeignEventLoopIntegratorPrivate(); IEventPoller::InterruptAction poll(int timeout = -1) override; // interrupt the waiting for events (from another thread) void interrupt(InterruptAction action) override; - void addIoEventClient(IioEventClient *ioc) override; - void removeIoEventClient(IioEventClient *ioc) override; - void setReadWriteInterest(IioEventClient *ioc, bool read, bool write) override; + void addIoEventListener(IioEventListener *iol) override; + void removeIoEventListener(IioEventListener *iol) override; + void setReadWriteInterest(IioEventListener *iol, bool read, bool write) override; // public accessor for protected member variable EventDispatcher *dispatcher() const { return m_dispatcher; } bool exiting = false; ForeignEventLoopIntegrator *m_integrator; struct RwEnabled { bool readEnabled : 1; bool writeEnabled : 1; }; std::unordered_map m_fds; }; ForeignEventLoopIntegratorPrivate::ForeignEventLoopIntegratorPrivate(ForeignEventLoopIntegrator *integrator, EventDispatcher *dispatcher) : IEventPoller(dispatcher), m_integrator(integrator) { } IEventPoller::InterruptAction ForeignEventLoopIntegratorPrivate::poll(int /* timeout */) { // do nothing, it can't possibly work (and it is *sometimes* a benign error to call this) return IEventPoller::NoInterrupt; } void ForeignEventLoopIntegratorPrivate::interrupt(InterruptAction /* action */) { // do nothing, it can't possibly work (and it is *sometimes* a benign error to call this) } -void ForeignEventLoopIntegratorPrivate::addIoEventClient(IioEventClient *ioc) +void ForeignEventLoopIntegratorPrivate::addIoEventListener(IioEventListener *iol) { if (!exiting) { RwEnabled rw = { false, false }; - m_fds.emplace(ioc->fileDescriptor(), rw); + m_fds.emplace(iol->fileDescriptor(), rw); } } -void ForeignEventLoopIntegratorPrivate::removeIoEventClient(IioEventClient *ioc) +void ForeignEventLoopIntegratorPrivate::removeIoEventListener(IioEventListener *iol) { if (!exiting) { - m_fds.erase(ioc->fileDescriptor()); + m_fds.erase(iol->fileDescriptor()); } } -void ForeignEventLoopIntegratorPrivate::setReadWriteInterest(IioEventClient *ioc, bool read, bool write) +void ForeignEventLoopIntegratorPrivate::setReadWriteInterest(IioEventListener *iol, bool read, bool write) { if (exiting) { return; } - RwEnabled &rw = m_fds.at(ioc->fileDescriptor()); + RwEnabled &rw = m_fds.at(iol->fileDescriptor()); if (rw.readEnabled != read) { rw.readEnabled = read; - m_integrator->setWatchRead(ioc->fileDescriptor(), read); + m_integrator->setWatchRead(iol->fileDescriptor(), read); } if (rw.writeEnabled != write) { rw.writeEnabled = write; - m_integrator->setWatchWrite(ioc->fileDescriptor(), write); + m_integrator->setWatchWrite(iol->fileDescriptor(), write); } } ForeignEventLoopIntegrator::ForeignEventLoopIntegrator() : d(nullptr) { } IEventPoller *ForeignEventLoopIntegrator::connectToDispatcher(EventDispatcher *dispatcher) { assert(!d); // this is a one-time operation d = new ForeignEventLoopIntegratorPrivate(this, dispatcher); return d; } ForeignEventLoopIntegrator::~ForeignEventLoopIntegrator() { d->exiting = true; // try to prevent surprising states during shutdown, including of d->m_fds // removeAllWatches() must be called from a derived class that implements the in this class pure // virtual methods setWatchRead(), setWatchWrite(), and watchTimeout(). During destruction, the // subclass data has already been destroyed and the vtable is the one of this class. //removeAllWatches(); } void ForeignEventLoopIntegrator::removeAllWatches() { for (auto it = d->m_fds.begin(); it != d->m_fds.end(); ++it) { if (it->second.readEnabled) { it->second.readEnabled = false; setWatchRead(it->first, false); } if (it->second.writeEnabled) { it->second.writeEnabled = false; setWatchWrite(it->first, false); } } watchTimeout(-1); if (d) { d->m_integrator = nullptr; delete d; d = nullptr; } } bool ForeignEventLoopIntegrator::exiting() const { return d->exiting; } void ForeignEventLoopIntegrator::handleTimeout() { if (!d->exiting) { EventDispatcherPrivate::get(d->dispatcher())->triggerDueTimers(); } } void ForeignEventLoopIntegrator::handleReadyRead(int fd) { if (!d->exiting) { - EventDispatcherPrivate::get(d->dispatcher())->notifyClientForReading(fd); + EventDispatcherPrivate::get(d->dispatcher())->notifyListenerForReading(fd); } } void ForeignEventLoopIntegrator::handleReadyWrite(int fd) { if (!d->exiting) { - EventDispatcherPrivate::get(d->dispatcher())->notifyClientForWriting(fd); + EventDispatcherPrivate::get(d->dispatcher())->notifyListenerForWriting(fd); } } diff --git a/events/ieventpoller.h b/events/ieventpoller.h index c08e447..eabd41f 100644 --- a/events/ieventpoller.h +++ b/events/ieventpoller.h @@ -1,59 +1,59 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef IEVENTPOLLER_H #define IEVENTPOLLER_H #include "eventdispatcher.h" #include "platform.h" -class IioEventClient; +class IioEventListener; class IEventPoller { public: enum InterruptAction { NoInterrupt = 0, ProcessAuxEvents, Stop }; // if you need to refer to the dispatcher, grab and save the value here - not all implementations // need it IEventPoller(EventDispatcher *dispatcher); virtual ~IEventPoller(); virtual InterruptAction poll(int timeout = -1) = 0; // interrupt the waiting for events (from another thread) virtual void interrupt(InterruptAction action) = 0; - virtual void addIoEventClient(IioEventClient *ioc) = 0; - virtual void removeIoEventClient(IioEventClient *ioc) = 0; - virtual void setReadWriteInterest(IioEventClient *ioc, bool read, bool write) = 0; + virtual void addIoEventListener(IioEventListener *iol) = 0; + virtual void removeIoEventListener(IioEventListener *iol) = 0; + virtual void setReadWriteInterest(IioEventListener *iol, bool read, bool write) = 0; protected: EventDispatcher *m_dispatcher; }; #endif // IEVENTPOLLER_H diff --git a/events/iioeventclient.cpp b/events/iioeventlistener.cpp similarity index 93% rename from events/iioeventclient.cpp rename to events/iioeventlistener.cpp index d833bd7..97d47b0 100644 --- a/events/iioeventclient.cpp +++ b/events/iioeventlistener.cpp @@ -1,28 +1,28 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ -#include "iioeventclient.h" +#include "iioeventlistener.h" -IioEventClient::~IioEventClient() +IioEventListener::~IioEventListener() { } diff --git a/events/iioeventclient.h b/events/iioeventlistener.h similarity index 90% rename from events/iioeventclient.h rename to events/iioeventlistener.h index 025f890..6def7ac 100644 --- a/events/iioeventclient.h +++ b/events/iioeventlistener.h @@ -1,48 +1,48 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ -#ifndef IIOEVENTCLIENT_H -#define IIOEVENTCLIENT_H +#ifndef IIOEVENTLISTENER_H +#define IIOEVENTLISTENER_H #include "platform.h" class EventDispatcher; class EventDispatcherPrivate; -class IioEventClient +class IioEventListener { public: - virtual ~IioEventClient(); + virtual ~IioEventListener(); virtual FileDescriptor fileDescriptor() const = 0; virtual void setEventDispatcher(EventDispatcher *ed) = 0; virtual EventDispatcher *eventDispatcher() const = 0; protected: friend class EventDispatcherPrivate; virtual void handleCanRead() = 0; virtual void handleCanWrite() = 0; }; -#endif // IIOEVENTCLIENT_H +#endif // IIOEVENTLISTENER_H diff --git a/events/selecteventpoller_unix.cpp b/events/selecteventpoller_unix.cpp index 8152593..b0af3ec 100644 --- a/events/selecteventpoller_unix.cpp +++ b/events/selecteventpoller_unix.cpp @@ -1,158 +1,158 @@ /* Copyright (C) 2015 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "selecteventpoller_unix.h" #include "eventdispatcher_p.h" -#include "iconnection.h" +#include "iioeventlistener.h" #include #include #include #include SelectEventPoller::SelectEventPoller(EventDispatcher *dispatcher) : IEventPoller(dispatcher) { pipe2(m_interruptPipe, O_NONBLOCK); resetFdSets(); } SelectEventPoller::~SelectEventPoller() { close(m_interruptPipe[0]); close(m_interruptPipe[1]); } IEventPoller::InterruptAction SelectEventPoller::poll(int timeout) { IEventPoller::InterruptAction ret = IEventPoller::NoInterrupt; resetFdSets(); int nfds = m_interruptPipe[0]; // set up the interruption listener FD_SET(m_interruptPipe[0], &m_readSet); for (const auto &fdRw : m_fds) { if (fdRw.second.readEnabled) { nfds = std::max(nfds, fdRw.first); FD_SET(fdRw.first, &m_readSet); } if (fdRw.second.writeEnabled) { nfds = std::max(nfds, fdRw.first); FD_SET(fdRw.first, &m_writeSet); } } struct timeval tv; struct timeval *tvPointer = nullptr; if (timeout >= 0) { tv.tv_sec = timeout / 1000; tv.tv_usec = (timeout % 1000) * 1000; tvPointer = &tv; } // select! int numEvents = select(nfds + 1, &m_readSet, &m_writeSet, nullptr, tvPointer); // check for interruption if (FD_ISSET(m_interruptPipe[0], &m_readSet)) { // interrupt; read bytes from pipe to clear buffers and get the interrupt type ret = IEventPoller::ProcessAuxEvents; char buf; while (read(m_interruptPipe[0], &buf, 1) > 0) { if (buf == 'S') { ret = IEventPoller::Stop; } } } if (ret == IEventPoller::Stop) { // ### discarding the rest of the events, to avoid touching "dead" data while shutting down numEvents = 0; } // dispatch reads and writes if (numEvents < 0) { // TODO error handling } else if (numEvents > 0) { - for (auto fdClient : EventDispatcherPrivate::get(m_dispatcher)->m_ioClients) { - if (FD_ISSET(fdClient.first, &m_readSet)) { - EventDispatcherPrivate::get(m_dispatcher)->notifyClientForReading(fdClient.first); + for (auto fdListener : EventDispatcherPrivate::get(m_dispatcher)->m_ioListeners) { + if (FD_ISSET(fdListener.first, &m_readSet)) { + EventDispatcherPrivate::get(m_dispatcher)->notifyListenerForReading(fdListener.first); numEvents--; } - if (FD_ISSET(fdClient.first, &m_writeSet)) { - EventDispatcherPrivate::get(m_dispatcher)->notifyClientForWriting(fdClient.first); + if (FD_ISSET(fdListener.first, &m_writeSet)) { + EventDispatcherPrivate::get(m_dispatcher)->notifyListenerForWriting(fdListener.first); numEvents--; } if (numEvents <= 0) { break; } } } return ret; } void SelectEventPoller::resetFdSets() { FD_ZERO(&m_readSet); FD_ZERO(&m_writeSet); } void SelectEventPoller::interrupt(IEventPoller::InterruptAction action) { assert(action == IEventPoller::ProcessAuxEvents || action == IEventPoller::Stop); // write a byte to the write end so the poll waiting on the read end returns char buf = (action == IEventPoller::Stop) ? 'S' : 'N'; write(m_interruptPipe[1], &buf, 1); } -void SelectEventPoller::addIoEventClient(IioEventClient *ioc) +void SelectEventPoller::addIoEventListener(IioEventListener *ioc) { // The main select specific part of registration is in setReadWriteInterest(). // Here we just check fd limits. if (ioc->fileDescriptor() >= FD_SETSIZE) { // TODO error... return; } RwEnabled rw = { false, false }; m_fds.emplace(ioc->fileDescriptor(), rw); } -void SelectEventPoller::removeIoEventClient(IioEventClient *ioc) +void SelectEventPoller::removeIoEventListener(IioEventListener *ioc) { m_fds.erase(ioc->fileDescriptor()); } -void SelectEventPoller::setReadWriteInterest(IioEventClient *ioc, bool read, bool write) +void SelectEventPoller::setReadWriteInterest(IioEventListener *ioc, bool read, bool write) { RwEnabled &rw = m_fds.at(ioc->fileDescriptor()); rw.readEnabled = read; rw.writeEnabled = write; } diff --git a/events/selecteventpoller_unix.h b/events/selecteventpoller_unix.h index 74c27ef..0fd3ba2 100644 --- a/events/selecteventpoller_unix.h +++ b/events/selecteventpoller_unix.h @@ -1,67 +1,67 @@ /* Copyright (C) 2015 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef SELECTEVENTPOLLER_H #define SELECTEVENTPOLLER_H #include "ieventpoller.h" #include #include #include class SelectEventPoller : public IEventPoller { public: SelectEventPoller(EventDispatcher *dispatcher); ~SelectEventPoller(); IEventPoller::InterruptAction poll(int timeout) override; void interrupt(IEventPoller::InterruptAction) override; // reimplemented from IEventPoller - void addIoEventClient(IioEventClient *ioc) override; - void removeIoEventClient(IioEventClient *ioc) override; - void setReadWriteInterest(IioEventClient *ioc, bool read, bool write) override; + void addIoEventListener(IioEventListener *iol) override; + void removeIoEventListener(IioEventListener *iol) override; + void setReadWriteInterest(IioEventListener *iol, bool read, bool write) override; private: void notifyRead(int fd); void resetFdSets(); struct RwEnabled { bool readEnabled : 1; bool writeEnabled : 1; }; std::unordered_map m_fds; std::vector m_readFds; std::vector m_writeFds; fd_set m_readSet; fd_set m_writeSet; int m_interruptPipe[2]; }; #endif // SELECTEVENTPOLLER_H diff --git a/events/selecteventpoller_win32.cpp b/events/selecteventpoller_win32.cpp index b5db964..5e7717a 100644 --- a/events/selecteventpoller_win32.cpp +++ b/events/selecteventpoller_win32.cpp @@ -1,249 +1,249 @@ /* Copyright (C) 2015 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "selecteventpoller_win32.h" #include "eventdispatcher_p.h" -#include "iconnection.h" +#include "iioeventlistener.h" #include #include #include #include thread_local static SelectEventPoller *tls_selectPoller = nullptr; static SOCKET createInterruptSocket() { SOCKET ret = socket(AF_INET, SOCK_DGRAM, 0); if (ret == INVALID_SOCKET) { std::cerr << "createInterruptSocket() Error A.\n"; return ret; } unsigned long value = 1; // 0 blocking, != 0 non-blocking if (ioctlsocket(ret, FIONBIO, &value) != NO_ERROR) { // something along the lines of... WS_ERROR_DEBUG(WSAGetLastError()); std::cerr << "createInterruptSocket() Error B.\n"; closesocket(ret); return INVALID_SOCKET; } return ret; } VOID CALLBACK triggerInterruptSocket(ULONG_PTR dwParam) { SelectEventPoller *const sep = tls_selectPoller; if (sep) { sep->doInterruptSocket(dwParam != 0); } else { std::cerr << "triggerInterruptSocket() ignoring (apparently) spurios APC!\n"; } } void SelectEventPoller::doInterruptSocket(bool isStop) { const IEventPoller::InterruptAction newAction = isStop ? IEventPoller::Stop : IEventPoller::ProcessAuxEvents; if (newAction > m_interruptAction) { m_interruptAction = newAction; } // This runs from the blocked select(). Signal the socket by closing it, thus properly interrupting // the select(). if (m_interruptSocket != INVALID_SOCKET) { // closesocket() may enter an alertable walt, which may run APCs, which may call us *again*. // Prevent that by clearing m_interruptSocket *before* possibly triggering the APC. SOCKET sock = m_interruptSocket; m_interruptSocket = INVALID_SOCKET; closesocket(sock); // <- recursion here } } SelectEventPoller::SelectEventPoller(EventDispatcher *dispatcher) : IEventPoller(dispatcher), m_selectThreadHandle(INVALID_HANDLE_VALUE), m_interruptSocket(INVALID_SOCKET), m_interruptAction(IEventPoller::NoInterrupt) { // IFF there is still an asynchronous procedure call queued for this thread (which usually happens // in the mean thread), we want it to trigger nothing, in case any Winsock functions (say socket())... // enters an alertable wait. tls_selectPoller = nullptr; if (!DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), GetCurrentProcess(), &m_selectThreadHandle, 0, FALSE, DUPLICATE_SAME_ACCESS)) { assert(false); // H4X, gross! } // XXX TOXIC BUG!! m_selectThreadHandle = GetCurrentThread(); WSAData wsadata; // IPv6 requires Winsock v2.0 or better (but we're not using IPv6 - yet!) if (WSAStartup(MAKEWORD(2, 0), &wsadata) != 0) { return; } m_interruptSocket = createInterruptSocket(); // stupid: flush any pending APCs from previous instances; closesocket() will do a brief alertable // wait, apparently. closesocket(m_interruptSocket); m_interruptSocket = createInterruptSocket(); tls_selectPoller = this; } SelectEventPoller::~SelectEventPoller() { tls_selectPoller = nullptr; if (m_interruptSocket != INVALID_SOCKET) { closesocket(m_interruptSocket); } CloseHandle(m_selectThreadHandle); } IEventPoller::InterruptAction SelectEventPoller::poll(int timeout) { // Check if some other code called an alertable waiting function which ran our user APC, // which closed m_interruptSocket and set m_interruptAction. Process it here if so. IEventPoller::InterruptAction ret = m_interruptAction; if (ret != IEventPoller::NoInterrupt) { assert(m_interruptSocket == INVALID_SOCKET); m_interruptAction = IEventPoller::NoInterrupt; m_interruptSocket = createInterruptSocket(); // re-arm return ret; } resetFdSets(); // ### doing FD_SET "manually", avoiding a scan of the whole list for each set action - there is // no danger of duplicates because our input is a set which already guarantees uniqueness. for (const auto &fdRw : m_fds) { if (fdRw.second.readEnabled) { // FD_SET(fdRw.first, &m_readSet); if (m_readSet.fd_count < FD_SETSIZE) { m_readSet.fd_array[m_readSet.fd_count++] = fdRw.first; } } if (fdRw.second.writeEnabled) { // FD_SET(fdRw.first, &m_writeSet); if (m_writeSet.fd_count < FD_SETSIZE) { m_writeSet.fd_array[m_writeSet.fd_count++] = fdRw.first; } } } if (m_interruptSocket == INVALID_SOCKET) { assert(m_interruptAction != IEventPoller::NoInterrupt); m_interruptSocket = createInterruptSocket(); } FD_SET(m_interruptSocket, &m_exceptSet); struct timeval tv; struct timeval *tvPointer = nullptr; if (timeout >= 0) { tv.tv_sec = timeout / 1000; tv.tv_usec = (timeout % 1000) * 1000; tvPointer = &tv; } // select! int numEvents = select(0, &m_readSet, &m_writeSet, &m_exceptSet, tvPointer); if (numEvents == -1) { std::cerr << "Error code is " << WSAGetLastError() << " and except set has " << m_exceptSet.fd_count << " elements.\n"; } // check for interruption ret = m_interruptAction; if (ret != IEventPoller::NoInterrupt) { assert(m_interruptSocket == INVALID_SOCKET); m_interruptAction = IEventPoller::NoInterrupt; m_interruptSocket = createInterruptSocket(); // re-arm //numEvents--; // 1) We got here because the interrupt socket's exceptfd became active. // 2) However, fds in the except set don't count as "sockets /ready/" for // the return value of select(). return ret; } // dispatch reads and writes if (numEvents < 0) { // TODO error handling ? } // This being Windows-specfic code, and with Windows's famous binary compatibility, we may // as well exploit that the Windows fd_set struct allows for relatively efficient querying // if you just iterate over its internal list, instead of searching the list for each file // descriptor like with FD_ISSET. // numEvents -= m_readSet.fd_count + m_writeSet.fd_count; for (uint i = 0; i < m_readSet.fd_count; i++) { - EventDispatcherPrivate::get(m_dispatcher)->notifyClientForReading(m_readSet.fd_array[i]); + EventDispatcherPrivate::get(m_dispatcher)->notifyListenerForReading(m_readSet.fd_array[i]); } for (uint i = 0; i < m_writeSet.fd_count; i++) { - EventDispatcherPrivate::get(m_dispatcher)->notifyClientForWriting(m_writeSet.fd_array[i]); + EventDispatcherPrivate::get(m_dispatcher)->notifyListenerForWriting(m_writeSet.fd_array[i]); } return ret; } void SelectEventPoller::resetFdSets() { FD_ZERO(&m_readSet); FD_ZERO(&m_writeSet); FD_ZERO(&m_exceptSet); } void SelectEventPoller::interrupt(IEventPoller::InterruptAction action) { assert(action == IEventPoller::ProcessAuxEvents || action == IEventPoller::Stop); const ULONG_PTR dwParam = action == IEventPoller::Stop ? 0x1 : 0x0; QueueUserAPC(triggerInterruptSocket, m_selectThreadHandle, dwParam); } -void SelectEventPoller::addIoEventClient(IioEventClient *ioc) +void SelectEventPoller::addIoEventListener(IioEventListener *iol) { // The main select specific part of registration is in setReadWriteInterest(). // Here we just check fd limits. if (m_fds.size() + 1 >= FD_SETSIZE) { - std::cerr << "SelectEventPoller::addIoEventClient() failed A.\n"; + std::cerr << "SelectEventPoller::addIoEventListener() failed A.\n"; // TODO error... return; } RwEnabled rw = { false, false }; - m_fds.emplace(ioc->fileDescriptor(), rw); + m_fds.emplace(iol->fileDescriptor(), rw); } -void SelectEventPoller::removeIoEventClient(IioEventClient *ioc) +void SelectEventPoller::removeIoEventListener(IioEventListener *iol) { - m_fds.erase(ioc->fileDescriptor()); + m_fds.erase(iol->fileDescriptor()); } -void SelectEventPoller::setReadWriteInterest(IioEventClient *ioc, bool read, bool write) +void SelectEventPoller::setReadWriteInterest(IioEventListener *iol, bool read, bool write) { - RwEnabled &rw = m_fds.at(ioc->fileDescriptor()); + RwEnabled &rw = m_fds.at(iol->fileDescriptor()); rw.readEnabled = read; rw.writeEnabled = write; } diff --git a/events/selecteventpoller_win32.h b/events/selecteventpoller_win32.h index b5294ea..0934a75 100644 --- a/events/selecteventpoller_win32.h +++ b/events/selecteventpoller_win32.h @@ -1,81 +1,81 @@ /* Copyright (C) 2015 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef SELECTEVENTPOLLER_H #define SELECTEVENTPOLLER_H #include "ieventpoller.h" #include #define WIN32_LEAN_AND_MEAN #include #ifdef FD_SETSIZE #error We must be able to set FD_SETSIZE - make sure that nothing else sets it! #endif #define FD_SETSIZE 1024 #include // Windows select() diverges from "proper Unix" select() just enough to seriously hurt readability // when handling the differences with ifdefs, so use a separate implementation. // Besides, the fd_set from winsock2.h is actually an array of sockets (not a bitmap like on Unix), which // can be exploited to achive poll()-like performance characteristics without dealing with the problems // that WSAPoll() has. That is currently not implemented. class SelectEventPoller : public IEventPoller { public: SelectEventPoller(EventDispatcher *dispatcher); ~SelectEventPoller(); IEventPoller::InterruptAction poll(int timeout) override; void interrupt(IEventPoller::InterruptAction) override; // reimplemented from IEventPoller - void addIoEventClient(IioEventClient *ioc) override; - void removeIoEventClient(IioEventClient *ioc) override; - void setReadWriteInterest(IioEventClient *ioc, bool read, bool write) override; + void addIoEventListener(IioEventListener *iol) override; + void removeIoEventListener(IioEventListener *iol) override; + void setReadWriteInterest(IioEventListener *iol, bool read, bool write) override; private: void notifyRead(int fd); void resetFdSets(); friend VOID CALLBACK triggerInterruptSocket(ULONG_PTR dwParam); void doInterruptSocket(bool isStop); HANDLE m_selectThreadHandle; FileDescriptor m_interruptSocket; IEventPoller::InterruptAction m_interruptAction; struct RwEnabled { bool readEnabled : 1; bool writeEnabled : 1; }; std::unordered_map m_fds; fd_set m_readSet; fd_set m_writeSet; fd_set m_exceptSet; }; #endif // SELECTEVENTPOLLER_H diff --git a/events/timer.cpp b/events/timer.cpp index eeabcd9..26acb3d 100644 --- a/events/timer.cpp +++ b/events/timer.cpp @@ -1,173 +1,173 @@ /* Copyright (C) 2014 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "timer.h" #include "eventdispatcher.h" #include "eventdispatcher_p.h" -#include "icompletionclient.h" +#include "icompletionlistener.h" #include "platformtime.h" #include #include #include Timer::Timer(EventDispatcher *dispatcher) : m_eventDispatcher(dispatcher), - m_completionClient(0), + m_completionListener(0), m_reentrancyGuard(0), m_interval(0), m_isRunning(false), m_isRepeating(true), m_tag(0) { } Timer::~Timer() { if (m_reentrancyGuard) { *m_reentrancyGuard = false; m_reentrancyGuard = nullptr; } if (m_isRunning) { EventDispatcherPrivate::get(m_eventDispatcher)->removeTimer(this); } } void Timer::start(int msec) { if (msec < 0) { std::cerr << "Timer::start(): interval cannot be negative!\n"; } // restart if already running EventDispatcherPrivate *const ep = EventDispatcherPrivate::get(m_eventDispatcher); if (m_isRunning) { ep->removeTimer(this); } m_interval = msec; m_isRunning = true; ep->addTimer(this); } void Timer::stop() { setRunning(false); } void Timer::setRunning(bool run) { if (m_isRunning == run) { return; } m_isRunning = run; EventDispatcherPrivate *const ep = EventDispatcherPrivate::get(m_eventDispatcher); if (run) { ep->addTimer(this); } else { ep->removeTimer(this); } } bool Timer::isRunning() const { return m_isRunning; } void Timer::setInterval(int msec) { if (msec < 0) { std::cerr << "Timer::setInterval(): interval cannot be negative!\n"; } if (m_interval == msec) { return; } m_interval = msec; if (m_isRunning) { EventDispatcherPrivate *const ep = EventDispatcherPrivate::get(m_eventDispatcher); ep->removeTimer(this); ep->addTimer(this); } } int Timer::interval() const { return m_interval; } void Timer::setRepeating(bool repeating) { m_isRepeating = repeating; } bool Timer::isRepeating() const { return m_isRepeating; } int Timer::remainingTime() const { if (!m_isRunning) { return -1; } uint64 currentTime = PlatformTime::monotonicMsecs(); return std::max(int((m_tag >> 10) - currentTime), 0); } void Timer::trigger() { assert(m_isRunning); if (m_reentrancyGuard) { return; } if (!m_isRepeating) { m_isRunning = false; } // ### Reentrancy is not *currently* an issue, but when we have stuff like sub event loops, // we need this. Also in similar event-driven classes - here is how I think it should be done... bool alive = true; m_reentrancyGuard = &alive; - if (m_completionClient) { - m_completionClient->handleCompletion(this); + if (m_completionListener) { + m_completionListener->handleCompletion(this); } // if we we've been destroyed, we don't touch the member variable if (alive) { assert(m_reentrancyGuard); m_reentrancyGuard = nullptr; } } -void Timer::setCompletionClient(ICompletionClient *client) +void Timer::setCompletionListener(ICompletionListener *client) { - m_completionClient = client; + m_completionListener = client; } -ICompletionClient *Timer::completionClient() const +ICompletionListener *Timer::completionClient() const { - return m_completionClient; + return m_completionListener; } EventDispatcher *Timer::eventDispatcher() const { return m_eventDispatcher; } diff --git a/events/timer.h b/events/timer.h index 0f38596..da73296 100644 --- a/events/timer.h +++ b/events/timer.h @@ -1,74 +1,74 @@ /* Copyright (C) 2014 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef TIMER_H #define TIMER_H #include "types.h" class EventDispatcher; class EventDispatcherPrivate; -class ICompletionClient; +class ICompletionListener; class DFERRY_EXPORT Timer { public: Timer(EventDispatcher *dispatcher); ~Timer(); // can't allow copying because Timers are remembered by pointer in EventDispatcher // (and since this isn't a value class, copying makes little sense) Timer(const Timer &) = delete; void operator=(const Timer &) = delete; void start(int msec); // convenience: setInterval(msec) and setRunning(true) void stop(); // convenience: setRunning(false) void setRunning(bool); bool isRunning() const; void setInterval(int msec); int interval() const; void setRepeating(bool); bool isRepeating() const; int remainingTime() const; - void setCompletionClient(ICompletionClient *client); - ICompletionClient *completionClient() const; + void setCompletionListener(ICompletionListener *client); + ICompletionListener *completionClient() const; EventDispatcher *eventDispatcher() const; private: friend class EventDispatcherPrivate; void trigger(); EventDispatcher *m_eventDispatcher; // TODO make a per-thread event dispatcher implicit? - ICompletionClient *m_completionClient; + ICompletionListener *m_completionListener; bool *m_reentrancyGuard; int m_interval; bool m_isRunning : 1; bool m_isRepeating : 1; uint32 m_reserved : sizeof(uint32) - 2; uint64 m_tag; }; #endif // TIMER_H diff --git a/serialization/message.cpp b/serialization/message.cpp index 8e2ad79..290fbcc 100644 --- a/serialization/message.cpp +++ b/serialization/message.cpp @@ -1,1258 +1,1258 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "message.h" #include "message_p.h" #include "basictypeio.h" #include "malloccache.h" #include "stringtools.h" #ifndef DFERRY_SERDES_ONLY -#include "icompletionclient.h" -#include "iconnection.h" +#include "icompletionlistener.h" +#include "itransport.h" #endif #include #include #include #include #include using namespace std; #ifdef BIGENDIAN static const byte s_thisMachineEndianness = 'b'; #else static const byte s_thisMachineEndianness = 'l'; #endif struct MsgAllocCaches { MallocCache msgPrivate; MallocCache<256, 4> msgBuffer; }; thread_local static MsgAllocCaches msgAllocCaches; static const byte s_storageForHeader[Message::UnixFdsHeader + 1] = { 0, // dummy entry: there is no enum value for 0 0xf0 | 0, // PathHeader 0xf0 | 1, // InterfaceHeader 0xf0 | 2, // MethodHeader 0xf0 | 3, // ErrorNameHeader 0 | 0, // ReplySerialHeader 0xf0 | 4, // DestinationHeader 0xf0 | 5, // SenderHeader 0xf0 | 6, // SignatureHeader 0 | 1 // UnixFdsHeader }; static bool isStringHeader(int field) { return s_storageForHeader[field] & 0xf0; } static int indexOfHeader(int field) { return s_storageForHeader[field] & ~0xf0; } static const Message::VariableHeader s_stringHeaderAtIndex[VarHeaderStorage::s_stringHeaderCount] = { Message::PathHeader, Message::InterfaceHeader, Message::MethodHeader, Message::ErrorNameHeader, Message::DestinationHeader, Message::SenderHeader, Message::SignatureHeader }; static const Message::VariableHeader s_intHeaderAtIndex[VarHeaderStorage::s_intHeaderCount] = { Message::ReplySerialHeader, Message::UnixFdsHeader }; VarHeaderStorage::VarHeaderStorage() {} // initialization values are in class declaration VarHeaderStorage::VarHeaderStorage(const VarHeaderStorage &other) { // ### very suboptimal for (int i = 0; i < Message::UnixFdsHeader + 1; i++) { Message::VariableHeader vh = static_cast(i); if (other.hasHeader(vh)) { if (isStringHeader(vh)) { setStringHeader(vh, other.stringHeader(vh)); } else { setIntHeader(vh, other.intHeader(vh)); } } } } VarHeaderStorage::~VarHeaderStorage() { for (int i = 0; i < s_stringHeaderCount; i++) { const Message::VariableHeader field = s_stringHeaderAtIndex[i]; if (hasHeader(field)) { stringHeaders()[i].~string(); } } } bool VarHeaderStorage::hasHeader(Message::VariableHeader header) const { return m_headerPresenceBitmap & (1u << header); } bool VarHeaderStorage::hasStringHeader(Message::VariableHeader header) const { return hasHeader(header) && isStringHeader(header); } bool VarHeaderStorage::hasIntHeader(Message::VariableHeader header) const { return hasHeader(header) && !isStringHeader(header); } string VarHeaderStorage::stringHeader(Message::VariableHeader header) const { return hasStringHeader(header) ? stringHeaders()[indexOfHeader(header)] : string(); } cstring VarHeaderStorage::stringHeaderRaw(Message::VariableHeader header) { // this one is supposed to be a const method in the intended use, but it is dangerous so // outwardly non-const is kind of okay as a warning cstring ret; assert(isStringHeader(header)); if (hasHeader(header)) { string &str = stringHeaders()[indexOfHeader(header)]; ret.ptr = const_cast(str.c_str()); ret.length = str.length(); } return ret; } void VarHeaderStorage::setStringHeader(Message::VariableHeader header, const string &value) { if (!isStringHeader(header)) { return; } const int idx = indexOfHeader(header); if (hasHeader(header)) { stringHeaders()[idx] = value; } else { m_headerPresenceBitmap |= 1u << header; new(stringHeaders() + idx) string(value); } } bool VarHeaderStorage::setStringHeader_deser(Message::VariableHeader header, cstring value) { assert(isStringHeader(header)); if (hasHeader(header)) { return false; } m_headerPresenceBitmap |= 1u << header; new(stringHeaders() + indexOfHeader(header)) string(value.ptr, value.length); return true; } void VarHeaderStorage::clearStringHeader(Message::VariableHeader header) { if (!isStringHeader(header)) { return; } if (hasHeader(header)) { m_headerPresenceBitmap &= ~(1u << header); stringHeaders()[indexOfHeader(header)].~string(); } } uint32 VarHeaderStorage::intHeader(Message::VariableHeader header) const { return hasIntHeader(header) ? m_intHeaders[indexOfHeader(header)] : 0; } void VarHeaderStorage::setIntHeader(Message::VariableHeader header, uint32 value) { if (isStringHeader(header)) { return; } m_headerPresenceBitmap |= 1u << header; m_intHeaders[indexOfHeader(header)] = value; } bool VarHeaderStorage::setIntHeader_deser(Message::VariableHeader header, uint32 value) { assert(!isStringHeader(header)); if (hasHeader(header)) { return false; } m_headerPresenceBitmap |= 1u << header; m_intHeaders[indexOfHeader(header)] = value; return true; } void VarHeaderStorage::clearIntHeader(Message::VariableHeader header) { if (isStringHeader(header)) { return; } m_headerPresenceBitmap &= ~(1u << header); } // TODO think of copying signature from and to output! MessagePrivate::MessagePrivate(Message *parent) : m_message(parent), m_bufferPos(0), m_isByteSwapped(false), m_state(Empty), m_messageType(Message::InvalidMessage), m_flags(0), m_protocolVersion(1), m_dirty(true), m_headerLength(0), m_headerPadding(0), m_bodyLength(0), m_serial(0) {} MessagePrivate::MessagePrivate(const MessagePrivate &other, Message *parent) : m_message(parent), m_bufferPos(other.m_bufferPos), m_isByteSwapped(other.m_isByteSwapped), m_state(other.m_state), m_messageType(other.m_messageType), m_flags(other.m_flags), m_protocolVersion(other.m_protocolVersion), m_dirty(other.m_dirty), m_headerLength(other.m_headerLength), m_headerPadding(other.m_headerPadding), m_bodyLength(other.m_bodyLength), m_serial(other.m_serial), m_error(other.m_error), m_mainArguments(other.m_mainArguments), m_varHeaders(other.m_varHeaders) { if (other.m_buffer.ptr) { // we don't keep pointers into the buffer (only indexes), right? right? m_buffer.ptr = static_cast(malloc(other.m_buffer.length)); m_buffer.length = other.m_buffer.length; // Simplification: don't try to figure out which part of other.m_buffer contains "valid" data, // just copy everything. memcpy(m_buffer.ptr, other.m_buffer.ptr, other.m_buffer.length); } else { assert(!m_buffer.length); } // ### Maybe warn when copying a Message which is currently (de)serializing. It might even be impossible // to do that from client code. If that is the case, the "warning" could even be an assertion because // we should never do such a thing. } MessagePrivate::~MessagePrivate() { clearBuffer(); } Message::Message() : d(new(msgAllocCaches.msgPrivate.allocate()) MessagePrivate(this)) { } Message::Message(Message &&other) : d(other.d) { other.d = nullptr; d->m_message = this; } Message &Message::operator=(Message &&other) { if (this != &other) { if (d) { d->~MessagePrivate(); msgAllocCaches.msgPrivate.free(d); } d = other.d; if (other.d) { other.d = nullptr; d->m_message = this; } } return *this; } Message::Message(const Message &other) : d(nullptr) { if (!other.d) { return; } d = new(msgAllocCaches.msgPrivate.allocate()) MessagePrivate(*other.d, this); } Message &Message::operator=(const Message &other) { if (this != &other) { if (d) { d->~MessagePrivate(); msgAllocCaches.msgPrivate.free(d); } if (other.d) { // ### can be optimized by implementing and using assignment of MessagePrivate d = new(msgAllocCaches.msgPrivate.allocate()) MessagePrivate(*other.d, this); } else { d = nullptr; } } return *this; } Message::~Message() { if (d) { d->~MessagePrivate(); msgAllocCaches.msgPrivate.free(d); d = nullptr; } } Error Message::error() const { return d->m_error; } void Message::setCall(const string &path, const string &interface, const string &method) { setType(MethodCallMessage); setPath(path); setInterface(interface); setMethod(method); } void Message::setCall(const string &path, const string &method) { setType(MethodCallMessage); setPath(path); setMethod(method); } void Message::setReplyTo(const Message &call) { setType(MethodReturnMessage); setDestination(call.sender()); setReplySerial(call.serial()); } void Message::setErrorReplyTo(const Message &call, const string &errorName) { setType(ErrorMessage); setErrorName(errorName); setDestination(call.sender()); setReplySerial(call.serial()); } void Message::setSignal(const string &path, const string &interface, const string &method) { setType(SignalMessage); setPath(path); setInterface(interface); setMethod(method); } Message Message::createCall(const string &path, const string &interface, const string &method) { Message ret; ret.setCall(path, interface, method); return ret; } Message Message::createCall(const string &path, const string &method) { Message ret; ret.setCall(path, method); return ret; } Message Message::createReplyTo(const Message &call) { Message ret; ret.setReplyTo(call); return ret; } Message Message::createErrorReplyTo(const Message &call, const string &errorName) { Message ret; ret.setErrorReplyTo(call, errorName); return ret; } Message Message::createSignal(const string &path, const string &interface, const string &method) { Message ret; ret.setSignal(path, interface, method); return ret; } struct VarHeaderPrinter { Message::VariableHeader field; const char *name; }; static const int stringHeadersCount = 7; static VarHeaderPrinter stringHeaderPrinters[stringHeadersCount] = { { Message::PathHeader, "path" }, { Message::InterfaceHeader, "interface" }, { Message::MethodHeader, "method" }, { Message::ErrorNameHeader, "error name" }, { Message::DestinationHeader, "destination" }, { Message::SenderHeader, "sender" }, { Message::SignatureHeader, "signature" } }; static const int intHeadersCount = 2; static VarHeaderPrinter intHeaderPrinters[intHeadersCount] = { { Message::ReplySerialHeader, "reply serial" }, { Message::UnixFdsHeader, "#unix fds" } }; static const int messageTypeCount = 5; static const char *printableMessageTypes[messageTypeCount] = { "", // handled in code "Method call", "Method return", "Method error return", "Signal" }; string Message::prettyPrint() const { string ret; if (d->m_messageType >= 1 && d->m_messageType < messageTypeCount) { ret += printableMessageTypes[d->m_messageType]; } else { return string("Invalid message.\n"); } ostringstream os; for (int i = 0; i < stringHeadersCount; i++ ) { bool isPresent = false; string str = stringHeader(stringHeaderPrinters[i].field, &isPresent); if (isPresent) { os << "; " << stringHeaderPrinters[i].name << ": \"" << str << '"'; } } for (int i = 0; i < intHeadersCount; i++ ) { bool isPresent = false; uint32 intValue = intHeader(intHeaderPrinters[i].field, &isPresent); if (isPresent) { os << "; " << intHeaderPrinters[i].name << ": " << intValue; } } ret += os.str(); ret += '\n'; ret += d->m_mainArguments.prettyPrint(); return ret; } Message::Type Message::type() const { return d->m_messageType; } void Message::setType(Type type) { if (d->m_messageType == type) { return; } d->m_dirty = true; d->m_messageType = type; setExpectsReply(d->m_messageType == MethodCallMessage); } uint32 Message::protocolVersion() const { return d->m_protocolVersion; } void Message::setSerial(uint32 serial) { d->m_serial = serial; } uint32 Message::serial() const { return d->m_serial; } std::string Message::path() const { return stringHeader(PathHeader, 0); } void Message::setPath(const std::string &path) { setStringHeader(PathHeader, path); } std::string Message::interface() const { return stringHeader(InterfaceHeader, 0); } void Message::setInterface(const std::string &interface) { setStringHeader(InterfaceHeader, interface); } std::string Message::method() const { return stringHeader(MethodHeader, 0); } void Message::setMethod(const std::string &method) { setStringHeader(MethodHeader, method); } std::string Message::errorName() const { return stringHeader(ErrorNameHeader, 0); } void Message::setErrorName(const std::string &errorName) { setStringHeader(ErrorNameHeader, errorName); } uint32 Message::replySerial() const { return intHeader(ReplySerialHeader, 0); } void Message::setReplySerial(uint32 replySerial) { setIntHeader(ReplySerialHeader, replySerial); } std::string Message::destination() const { return stringHeader(DestinationHeader, 0); } void Message::setDestination(const std::string &destination) { setStringHeader(DestinationHeader, destination); } std::string Message::sender() const { return stringHeader(SenderHeader, 0); } void Message::setSender(const std::string &sender) { setStringHeader(SenderHeader, sender); } std::string Message::signature() const { return stringHeader(SignatureHeader, 0); } uint32 Message::unixFdCount() const { return intHeader(UnixFdsHeader, 0); } void Message::setUnixFdCount(uint32 fdCount) { setIntHeader(UnixFdsHeader, fdCount); } string Message::stringHeader(VariableHeader header, bool *isPresent) const { const bool exists = d->m_varHeaders.hasStringHeader(header); if (isPresent) { *isPresent = exists; } return exists ? d->m_varHeaders.stringHeader(header) : string(); } void Message::setStringHeader(VariableHeader header, const string &value) { if (header == SignatureHeader) { // ### warning? - this is a public method, and setting the signature separately does not make sense return; } d->m_dirty = true; d->m_varHeaders.setStringHeader(header, value); } uint32 Message::intHeader(VariableHeader header, bool *isPresent) const { const bool exists = d->m_varHeaders.hasIntHeader(header); if (isPresent) { *isPresent = exists; } return d->m_varHeaders.intHeader(header); } void Message::setIntHeader(VariableHeader header, uint32 value) { d->m_dirty = true; d->m_varHeaders.setIntHeader(header, value); } bool Message::expectsReply() const { return (d->m_flags & MessagePrivate::NoReplyExpectedFlag) == 0; } void Message::setExpectsReply(bool expectsReply) { if (expectsReply) { d->m_flags &= ~MessagePrivate::NoReplyExpectedFlag; } else { d->m_flags |= MessagePrivate::NoReplyExpectedFlag; } } void Message::setArguments(Arguments arguments) { d->m_dirty = true; d->m_error = arguments.error(); d->m_mainArguments = std::move(arguments); cstring signature = d->m_mainArguments.signature(); if (signature.length) { d->m_varHeaders.setStringHeader(Message::SignatureHeader, toStdString(signature)); } else { d->m_varHeaders.clearStringHeader(Message::SignatureHeader); } } const Arguments &Message::arguments() const { return d->m_mainArguments; } static const uint32 s_properFixedHeaderLength = 12; static const uint32 s_extendedFixedHeaderLength = 16; #ifndef DFERRY_SERDES_ONLY -void MessagePrivate::receive(IConnection *conn) +void MessagePrivate::receive(ITransport *transport) { if (m_state > LastSteadyState) { std::cerr << "MessagePrivate::receive() Error A.\n"; return; } - conn->addClient(this); + transport->addListener(this); setReadNotificationEnabled(true); m_state = MessagePrivate::Deserializing; m_headerLength = 0; m_bodyLength = 0; } bool Message::isReceiving() const { return d->m_state == MessagePrivate::Deserializing; } -void MessagePrivate::send(IConnection *conn) +void MessagePrivate::send(ITransport *transport) { if (!m_buffer.length && !serialize()) { std::cerr << "MessagePrivate::send() Error A.\n"; // m_error.setCode(); - // notifyCompletionClient(); would call into Transceiver, but it's easier for Transceiver to handle - // the error from non-callback code, directly in the caller of send(). + // notifyCompletionListener(); would call into Transceiver, but it's easier for Transceiver to handle + // the error from non-callback code, directly in the caller of send(). return; } if (m_state > MessagePrivate::LastSteadyState) { std::cerr << "MessagePrivate::send() Error B.\n"; // TODO error feedback return; } - conn->addClient(this); + transport->addListener(this); setWriteNotificationEnabled(true); m_state = MessagePrivate::Serializing; } bool Message::isSending() const { return d->m_state == MessagePrivate::Serializing; } -void MessagePrivate::setCompletionClient(ICompletionClient *client) +void MessagePrivate::setCompletionListener(ICompletionListener *listener) { - m_completionClient = client; + m_completionListener = listener; } -void MessagePrivate::notifyCompletionClient() +void MessagePrivate::notifyCompletionListener() { - if (m_completionClient) { - m_completionClient->handleCompletion(m_message); + if (m_completionListener) { + m_completionListener->handleCompletion(m_message); } } -void MessagePrivate::handleConnectionCanRead() +void MessagePrivate::handleTransportCanRead() { if (m_state != Deserializing) { return; } bool isError = false; chunk in; do { uint32 readMax = 0; if (!m_headerLength) { // the message might only consist of the header, so we must be careful to avoid reading // data meant for the next message readMax = s_extendedFixedHeaderLength - m_bufferPos; } else { // reading variable headers and/or body readMax = m_headerLength + m_bodyLength - m_bufferPos; } reserveBuffer(m_bufferPos + readMax); const bool headersDone = m_headerLength > 0 && m_bufferPos >= m_headerLength; if (m_bufferPos == 0) { // File descriptors should arrive only with the first byte - in = connection()->readWithFileDescriptors(m_buffer.ptr + m_bufferPos, readMax, + in = transport()->readWithFileDescriptors(m_buffer.ptr + m_bufferPos, readMax, &m_fileDescriptors); } else { - in = connection()->read(m_buffer.ptr + m_bufferPos, readMax); + in = transport()->read(m_buffer.ptr + m_bufferPos, readMax); } m_bufferPos += in.length; assert(m_bufferPos <= m_buffer.length); if (!headersDone) { if (m_headerLength == 0 && m_bufferPos >= s_extendedFixedHeaderLength) { if (!deserializeFixedHeaders()) { isError = true; break; } } if (m_headerLength > 0 && m_bufferPos >= m_headerLength) { if (!deserializeVariableHeaders()) { isError = true; break; } } } if (m_headerLength > 0 && m_bufferPos >= m_headerLength + m_bodyLength) { // all done! assert(m_bufferPos == m_headerLength + m_bodyLength); setReadNotificationEnabled(false); m_state = Deserialized; chunk bodyData(m_buffer.ptr + m_headerLength, m_bodyLength); m_mainArguments = Arguments(nullptr, m_varHeaders.stringHeaderRaw(Message::SignatureHeader), bodyData, std::move(m_fileDescriptors), m_isByteSwapped); m_fileDescriptors.clear(); // put it into a well-defined state assert(!isError); - connection()->removeClient(this); - notifyCompletionClient(); // do not access members after this because it might delete us! + transport()->removeListener(this); + notifyCompletionListener(); // do not access members after this because it might delete us! break; } - if (!connection()->isOpen()) { + if (!transport()->isOpen()) { isError = true; break; } } while (in.length); if (isError) { setReadNotificationEnabled(false); m_state = Empty; clearBuffer(); - connection()->removeClient(this); - notifyCompletionClient(); + transport()->removeListener(this); + notifyCompletionListener(); // TODO reset other data members, generally revisit error handling to make it robust } } -void MessagePrivate::handleConnectionCanWrite() +void MessagePrivate::handleTransportCanWrite() { if (m_state != Serializing) { return; } while (true) { assert(m_buffer.length >= m_bufferPos); const uint32 toWrite = m_buffer.length - m_bufferPos; if (!toWrite) { setWriteNotificationEnabled(false); m_state = Serialized; clearBuffer(); - connection()->removeClient(this); - assert(connection() == nullptr); - notifyCompletionClient(); + transport()->removeListener(this); + assert(transport() == nullptr); + notifyCompletionListener(); break; } uint32 written = 0; if (m_bufferPos == 0) { - written = connection()->writeWithFileDescriptors(chunk(m_buffer.ptr + m_bufferPos, toWrite), + written = transport()->writeWithFileDescriptors(chunk(m_buffer.ptr + m_bufferPos, toWrite), m_mainArguments.fileDescriptors()); } else { - written = connection()->write(chunk(m_buffer.ptr + m_bufferPos, toWrite)); + written = transport()->write(chunk(m_buffer.ptr + m_bufferPos, toWrite)); } if (written <= 0) { // TODO error handling break; } m_bufferPos += written; } } #endif // !DFERRY_SERDES_ONLY chunk Message::serializeAndView() { chunk ret; // one return variable to enable return value optimization (RVO) in gcc if (d->m_state > MessagePrivate::LastSteadyState) { return ret; } if (!d->m_buffer.length && !d->serialize()) { // TODO report error? return ret; } ret = d->m_buffer; return ret; } std::vector Message::save() { vector ret; if (d->m_state > MessagePrivate::LastSteadyState) { return ret; } if (!d->m_buffer.length && !d->serialize()) { return ret; } ret.reserve(d->m_buffer.length); for (uint32 i = 0; i < d->m_buffer.length; i++) { ret.push_back(d->m_buffer.ptr[i]); } return ret; } void Message::deserializeAndTake(chunk memOwnership) { if (d->m_state > MessagePrivate::LastSteadyState) { free(memOwnership.ptr); return; } d->m_headerLength = 0; d->m_bodyLength = 0; d->clearBuffer(); d->m_buffer = memOwnership; d->m_bufferPos = d->m_buffer.length; bool ok = d->m_buffer.length >= s_extendedFixedHeaderLength; ok = ok && d->deserializeFixedHeaders(); ok = ok && d->m_buffer.length >= d->m_headerLength; ok = ok && d->deserializeVariableHeaders(); ok = ok && d->m_buffer.length == d->m_headerLength + d->m_bodyLength; if (!ok) { d->m_state = MessagePrivate::Empty; d->clearBuffer(); return; } chunk bodyData(d->m_buffer.ptr + d->m_headerLength, d->m_bodyLength); d->m_mainArguments = Arguments(nullptr, d->m_varHeaders.stringHeaderRaw(SignatureHeader), bodyData, d->m_isByteSwapped); d->m_state = MessagePrivate::Deserialized; } // This does not return bool because full validation of the main arguments would take quite // a few cycles. Validating only the header of the message doesn't seem to be worth it. void Message::load(const std::vector &data) { if (d->m_state > MessagePrivate::LastSteadyState || data.empty()) { return; } chunk buf; buf.length = data.size(); buf.ptr = reinterpret_cast(malloc(buf.length)); deserializeAndTake(buf); } bool MessagePrivate::requiredHeadersPresent() { m_error = checkRequiredHeaders(); return !m_error.isError(); } Error MessagePrivate::checkRequiredHeaders() const { if (m_serial == 0) { return Error::MessageSerial; } if (m_protocolVersion != 1) { return Error::MessageProtocolVersion; } - // might want to check for DestinationHeader if the connection is a bus (not peer-to-peer) + // might want to check for DestinationHeader if the transport is a bus (not peer-to-peer) // very strange that this isn't in the spec! switch (m_messageType) { case Message::SignalMessage: // required: PathHeader, InterfaceHeader, MethodHeader if (!m_varHeaders.hasStringHeader(Message::InterfaceHeader)) { return Error::MessageInterface; } // fall through case Message::MethodCallMessage: // required: PathHeader, MethodHeader if (!m_varHeaders.hasStringHeader(Message::PathHeader)) { return Error::MessagePath; } if (!m_varHeaders.hasStringHeader(Message::MethodHeader)) { return Error::MessageMethod; } break; case Message::ErrorMessage: // required: ErrorNameHeader, ReplySerialHeader if (!m_varHeaders.hasStringHeader(Message::ErrorNameHeader)) { return Error::MessageErrorName; } // fall through case Message::MethodReturnMessage: // required: ReplySerialHeader if (!m_varHeaders.hasIntHeader(Message::ReplySerialHeader) ) { return Error::MessageReplySerial; } break; case Message::InvalidMessage: default: return Error::MessageType; } return Error::NoError; } bool MessagePrivate::deserializeFixedHeaders() { assert(m_bufferPos >= s_extendedFixedHeaderLength); byte *p = m_buffer.ptr; byte endianness = *p++; if (endianness != 'l' && endianness != 'B') { return false; } m_isByteSwapped = endianness != s_thisMachineEndianness; // TODO validate the values read here m_messageType = static_cast(*p++); m_flags = *p++; m_protocolVersion = *p++; m_bodyLength = basic::readUint32(p, m_isByteSwapped); m_serial = basic::readUint32(p + sizeof(uint32), m_isByteSwapped); // peek into the var-length header and use knowledge about array serialization to infer the // number of bytes still required for the header uint32 varArrayLength = basic::readUint32(p + 2 * sizeof(uint32), m_isByteSwapped); uint32 unpaddedHeaderLength = s_extendedFixedHeaderLength + varArrayLength; m_headerLength = align(unpaddedHeaderLength, 8); m_headerPadding = m_headerLength - unpaddedHeaderLength; return m_headerLength + m_bodyLength <= Arguments::MaxMessageLength; } bool MessagePrivate::deserializeVariableHeaders() { // use Arguments to parse the variable header fields // HACK: the fake first int argument is there to start the Arguments's data 8 byte aligned byte *base = m_buffer.ptr + s_properFixedHeaderLength - sizeof(int32); chunk headerData(base, m_headerLength - m_headerPadding - s_properFixedHeaderLength + sizeof(int32)); cstring varHeadersSig("ia(yv)"); Arguments argList(nullptr, varHeadersSig, headerData, m_isByteSwapped); Arguments::Reader reader(argList); assert(reader.isValid()); if (reader.state() != Arguments::Int32) { return false; } reader.readInt32(); if (reader.state() != Arguments::BeginArray) { return false; } reader.beginArray(); while (reader.state() == Arguments::BeginStruct) { reader.beginStruct(); const byte headerField = reader.readByte(); if (headerField < Message::PathHeader || headerField > Message::UnixFdsHeader) { return false; } const Message::VariableHeader eHeader = static_cast(headerField); reader.beginVariant(); bool ok = true; // short-circuit evaluation ftw if (isStringHeader(headerField)) { if (headerField == Message::PathHeader) { ok = ok && reader.state() == Arguments::ObjectPath; ok = ok && m_varHeaders.setStringHeader_deser(eHeader, reader.readObjectPath()); } else if (headerField == Message::SignatureHeader) { ok = ok && reader.state() == Arguments::Signature; // The spec allows having no signature header, which means "empty signature". However... // We do not drop empty signature headers when deserializing, in order to preserve // the original message contents. This could be useful for debugging and testing. ok = ok && m_varHeaders.setStringHeader_deser(eHeader, reader.readSignature()); } else { ok = ok && reader.state() == Arguments::String; ok = ok && m_varHeaders.setStringHeader_deser(eHeader, reader.readString()); } } else { ok = ok && reader.state() == Arguments::Uint32; if (headerField == Message::UnixFdsHeader) { reader.readUint32(); // discard it, for now (TODO) } else { ok = ok && m_varHeaders.setIntHeader_deser(eHeader, reader.readUint32()); } } if (!ok) { return false; } reader.endVariant(); reader.endStruct(); } reader.endArray(); // check that header->body padding is in fact zero filled base = m_buffer.ptr; for (uint32 i = m_headerLength - m_headerPadding; i < m_headerLength; i++) { if (base[i] != '\0') { return false; } } return true; } bool MessagePrivate::serialize() { if (!m_dirty) { return true; } clearBuffer(); if (m_error.isError() || !requiredHeadersPresent()) { return false; } Arguments headerArgs = serializeVariableHeaders(); // we need to cut out alignment padding bytes 4 to 7 in the variable header data stream because // the original dbus code aligns based on address in the final data stream // (offset s_properFixedHeaderLength == 12), we align based on address in the Arguments's buffer // (offset 0) - note that our modification keeps the stream valid because length is measured from end // of padding assert(headerArgs.data().length > 0); // if this fails the headerLength hack will break down const uint32 unalignedHeaderLength = s_properFixedHeaderLength + headerArgs.data().length - sizeof(uint32); m_headerLength = align(unalignedHeaderLength, 8); m_bodyLength = m_mainArguments.data().length; const uint32 messageLength = m_headerLength + m_bodyLength; if (messageLength > Arguments::MaxMessageLength) { m_error.setCode(Error::ArgumentsTooLong); return false; } reserveBuffer(messageLength); serializeFixedHeaders(); // copy header data: uint32 length... memcpy(m_buffer.ptr + s_properFixedHeaderLength, headerArgs.data().ptr, sizeof(uint32)); // skip four bytes of padding and copy the rest memcpy(m_buffer.ptr + s_properFixedHeaderLength + sizeof(uint32), headerArgs.data().ptr + 2 * sizeof(uint32), headerArgs.data().length - 2 * sizeof(uint32)); // zero padding between variable headers and message body for (uint32 i = unalignedHeaderLength; i < m_headerLength; i++) { m_buffer.ptr[i] = '\0'; } // copy message body (if any - arguments are not mandatory) if (m_mainArguments.data().length) { memcpy(m_buffer.ptr + m_headerLength, m_mainArguments.data().ptr, m_mainArguments.data().length); } m_bufferPos = m_headerLength + m_mainArguments.data().length; assert(m_bufferPos <= m_buffer.length); // for the upcoming message sending, "reuse" m_bufferPos for read position (formerly write position), // and m_buffer.length for end of data to read (formerly buffer capacity) m_buffer.length = m_bufferPos; m_bufferPos = 0; m_dirty = false; return true; } void MessagePrivate::serializeFixedHeaders() { assert(m_buffer.length >= s_extendedFixedHeaderLength); byte *p = m_buffer.ptr; *p++ = s_thisMachineEndianness; *p++ = byte(m_messageType); *p++ = m_flags; *p++ = m_protocolVersion; basic::writeUint32(p, m_bodyLength); basic::writeUint32(p + sizeof(uint32), m_serial); } static void doVarHeaderPrologue(Arguments::Writer *writer, Message::VariableHeader field) { writer->beginStruct(); writer->writeByte(byte(field)); } Arguments MessagePrivate::serializeVariableHeaders() { Arguments::Writer writer; // note that we don't have to deal with empty arrays because all valid message types require // at least one of the variable headers writer.beginArray(); for (int i = 0; i < VarHeaderStorage::s_stringHeaderCount; i++) { const Message::VariableHeader field = s_stringHeaderAtIndex[i]; if (m_varHeaders.hasHeader(field)) { doVarHeaderPrologue(&writer, field); const string &str = m_varHeaders.stringHeaders()[i]; if (field == Message::PathHeader) { writer.writeVariantForMessageHeader('o'); writer.writeObjectPath(cstring(str.c_str(), str.length())); } else if (field == Message::SignatureHeader) { writer.writeVariantForMessageHeader('g'); writer.writeSignature(cstring(str.c_str(), str.length())); } else { writer.writeVariantForMessageHeader('s'); writer.writeString(cstring(str.c_str(), str.length())); } writer.fixupAfterWriteVariantForMessageHeader(); writer.endStruct(); if (unlikely(writer.error().isError())) { static const Error::Code stringHeaderErrors[VarHeaderStorage::s_stringHeaderCount] = { Error::MessagePath, Error::MessageInterface, Error::MessageMethod, Error::MessageErrorName, Error::MessageDestination, Error::MessageSender, Error::MessageSignature }; m_error.setCode(stringHeaderErrors[i]); return Arguments(); } } } for (int i = 0; i < VarHeaderStorage::s_intHeaderCount; i++) { const Message::VariableHeader field = s_intHeaderAtIndex[i]; if (m_varHeaders.hasHeader(field)) { doVarHeaderPrologue(&writer, field); writer.writeVariantForMessageHeader('u'); writer.writeUint32(m_varHeaders.m_intHeaders[i]); writer.fixupAfterWriteVariantForMessageHeader(); writer.endStruct(); } } writer.endArray(); return writer.finish(); } void MessagePrivate::clearBuffer() { if (m_buffer.ptr) { free(m_buffer.ptr); m_buffer = chunk(); m_bufferPos = 0; } else { assert(m_buffer.length == 0); assert(m_bufferPos == 0); } m_fileDescriptors.clear(); } static uint32 nextPowerOf2(uint32 x) { --x; x |= x >> 1; x |= x >> 2; x |= x >> 4; x |= x >> 8; x |= x >> 16; return ++x; } void MessagePrivate::reserveBuffer(uint32 newLen) { const uint32 oldLen = m_buffer.length; if (newLen <= oldLen) { return; } if (newLen <= 256) { assert(oldLen == 0); newLen = 256; m_buffer.ptr = reinterpret_cast(msgAllocCaches.msgBuffer.allocate()); } else { newLen = nextPowerOf2(newLen); if (oldLen == 256) { byte *newAlloc = reinterpret_cast(malloc(newLen)); memcpy(newAlloc, m_buffer.ptr, oldLen); msgAllocCaches.msgBuffer.free(m_buffer.ptr); m_buffer.ptr = newAlloc; } else { m_buffer.ptr = reinterpret_cast(realloc(m_buffer.ptr, newLen)); } } m_buffer.length = newLen; } diff --git a/serialization/message_p.h b/serialization/message_p.h index abe4e5b..fda66b2 100644 --- a/serialization/message_p.h +++ b/serialization/message_p.h @@ -1,147 +1,147 @@ /* Copyright (C) 2014 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #ifndef MESSAGE_P_H #define MESSAGE_P_H #include "message.h" #include "arguments.h" #include "error.h" -#include "iconnectionclient.h" +#include "itransportlistener.h" #include -class ICompletionClient; +class ICompletionListener; class VarHeaderStorage { public: VarHeaderStorage(); VarHeaderStorage(const VarHeaderStorage &other); ~VarHeaderStorage(); bool hasHeader(Message::VariableHeader header) const; bool hasStringHeader(Message::VariableHeader header) const; std::string stringHeader(Message::VariableHeader header) const; cstring stringHeaderRaw(Message::VariableHeader header); void setStringHeader(Message::VariableHeader header, const std::string &value); void clearStringHeader(Message::VariableHeader header); bool hasIntHeader(Message::VariableHeader header) const; uint32 intHeader(Message::VariableHeader header) const; void setIntHeader(Message::VariableHeader header, uint32 value); void clearIntHeader(Message::VariableHeader header); // for use during header deserialization: returns false if a header occurs twice, // but does not check if the given header is of the right type (int / string). bool setIntHeader_deser(Message::VariableHeader header, uint32 value); bool setStringHeader_deser(Message::VariableHeader header, cstring value); const std::string *stringHeaders() const { return reinterpret_cast(m_stringStorage); } std::string *stringHeaders() { return reinterpret_cast(m_stringStorage); } static const int s_stringHeaderCount = 7; static const int s_intHeaderCount = 2; // Uninitialized storage for strings, to avoid con/destructing strings we'd never touch otherwise. std::aligned_storage::type m_stringStorage[VarHeaderStorage::s_stringHeaderCount]; uint32 m_intHeaders[s_intHeaderCount]; uint32 m_headerPresenceBitmap = 0; }; -class MessagePrivate : public IConnectionClient +class MessagePrivate : public ITransportListener { public: static MessagePrivate *get(Message *m) { return m->d; } MessagePrivate(Message *parent); MessagePrivate(const MessagePrivate &other, Message *parent); ~MessagePrivate(); - void handleConnectionCanRead() override; - void handleConnectionCanWrite() override; + void handleTransportCanRead() override; + void handleTransportCanWrite() override; - // IConnection is non-public API, so these make no sense in the public interface - void receive(IConnection *connection); // fills in this message from connection - void send(IConnection *connection); // sends this message over connection + // ITransport is non-public API, so these make no sense in the public interface + void receive(ITransport *transport); // fills in this message from transport + void send(ITransport *transport); // sends this message over transport // for receive or send completion (it should be clear which because receiving and sending can't // happen simultaneously) - void setCompletionClient(ICompletionClient *client); + void setCompletionListener(ICompletionListener *listener); bool requiredHeadersPresent(); Error checkRequiredHeaders() const; bool deserializeFixedHeaders(); bool deserializeVariableHeaders(); bool serialize(); void serializeFixedHeaders(); Arguments serializeVariableHeaders(); void clearBuffer(); void reserveBuffer(uint32 newSize); - void notifyCompletionClient(); + void notifyCompletionListener(); Message *m_message; chunk m_buffer; uint32 m_bufferPos; std::vector m_fileDescriptors; bool m_isByteSwapped; enum { // ### we don't have an error state, the need hasn't arisen yet. strange! Empty = 0, Serialized, Deserialized, LastSteadyState = Deserialized, Serializing, Deserializing } m_state; Message::Type m_messageType; enum { NoReplyExpectedFlag = 1, NoAutoStartFlag = 2 }; byte m_flags; byte m_protocolVersion; bool m_dirty : 1; uint32 m_headerLength; uint32 m_headerPadding; uint32 m_bodyLength; uint32 m_serial; Error m_error; Arguments m_mainArguments; VarHeaderStorage m_varHeaders; - ICompletionClient *m_completionClient; + ICompletionListener *m_completionListener; }; #endif // MESSAGE_P_H diff --git a/tests/events/tst_timer_slow.cpp b/tests/events/tst_timer_slow.cpp index 310c390..8eeae64 100644 --- a/tests/events/tst_timer_slow.cpp +++ b/tests/events/tst_timer_slow.cpp @@ -1,450 +1,450 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ #include "eventdispatcher.h" -#include "icompletionclient.h" +#include "icompletionlistener.h" #include "platformtime.h" #include "timer.h" #include "../testutil.h" #include #include -class BamPrinter : public ICompletionClient +class BamPrinter : public ICompletionListener { public: BamPrinter(const char *customMessage, uint64 startTime) : m_customMessage(customMessage), m_startTime(startTime) {} void handleCompletion(void *task) override { uint64 timeDiff = PlatformTime::monotonicMsecs() - m_startTime; std::cout << "BAM " << task << ' ' << timeDiff << ' ' << m_customMessage << " #" << m_counter++ << '\n'; } const char *m_customMessage; uint64 m_startTime; int m_counter = 0; }; // supposed to print some output to prove timers are working, and not crash :) static void testBasic() { EventDispatcher dispatcher; uint64 baseTime = PlatformTime::monotonicMsecs(); const char *customMessage1 = "Hello, world 1!"; BamPrinter printer1(customMessage1, baseTime); Timer t(&dispatcher); - t.setCompletionClient(&printer1); + t.setCompletionListener(&printer1); t.setInterval(231); t.setRunning(true); const char *customMessage2 = "Hello, world 2!"; BamPrinter printer2(customMessage2, baseTime); Timer t2(&dispatcher); - t2.setCompletionClient(&printer2); + t2.setCompletionListener(&printer2); t2.setInterval(100); t2.setRunning(true); const char *customMessage3 = "Hello, other world!"; int booCounter = 0; CompletionFunc booPrinter([baseTime, customMessage3, &booCounter, &dispatcher, &t] (void *task) { uint64 timeDiff = PlatformTime::monotonicMsecs() - baseTime; std::cout << "boo " << task << ' ' << timeDiff << ' ' << customMessage3 << " #" << booCounter << " - Timer 1 remaining time: " << t.remainingTime() << '\n'; if (booCounter >= 4) { dispatcher.interrupt(); } booCounter++; }); Timer t3(&dispatcher); - t3.setCompletionClient(&booPrinter); + t3.setCompletionListener(&booPrinter); t3.setInterval(420); t3.setRunning(true); while (dispatcher.poll()) { } } -class AccuracyTester : public ICompletionClient +class AccuracyTester : public ICompletionListener { public: AccuracyTester() : m_lastTriggerTime(PlatformTime::monotonicMsecs()) {} void handleCompletion(void *task) override { Timer *timer = reinterpret_cast(task); uint64 currentTime = PlatformTime::monotonicMsecs(); int timeDiff = int64(currentTime) - int64(m_lastTriggerTime); m_lastTriggerTime = currentTime; std::cout << timer->interval() << ' ' << timeDiff << std::endl; TEST(std::abs(timeDiff - timer->interval()) < 5); m_count++; TEST(m_count < 26); // event loop should have stopped right at 25 if (m_count == 25) { timer->eventDispatcher()->interrupt(); } } uint64 m_lastTriggerTime; uint m_count = 0; }; static void testAccuracy() { // this test is likely to fail spuriously on a machine under load EventDispatcher dispatcher; AccuracyTester at1; Timer t1(&dispatcher); - t1.setCompletionClient(&at1); + t1.setCompletionListener(&at1); t1.setInterval(225); t1.setRunning(true); AccuracyTester at2; Timer t2(&dispatcher); - t2.setCompletionClient(&at2); + t2.setCompletionListener(&at2); t2.setInterval(42); t2.setRunning(true); while (dispatcher.poll()) { } } // this not only bounds how long the dispatcher runs, it also creates another timer to make the // situation more interesting -class EventDispatcherInterruptor : public ICompletionClient +class EventDispatcherInterruptor : public ICompletionListener { public: EventDispatcherInterruptor(EventDispatcher *ed, int timeout) : m_ttl(ed) { m_ttl.setInterval(timeout); - m_ttl.setCompletionClient(this); + m_ttl.setCompletionListener(this); m_ttl.setRunning(true); } void handleCompletion(void * /*task*/) override { m_ttl.eventDispatcher()->interrupt(); m_ttl.setRunning(false); } Timer m_ttl; }; static void testDeleteInTrigger() { EventDispatcher dispatcher; bool alreadyCalled = false; CompletionFunc deleter([&alreadyCalled] (void *task) { TEST(!alreadyCalled); alreadyCalled = true; Timer *timer = reinterpret_cast(task); delete timer; }); Timer *t1 = new Timer(&dispatcher); - t1->setCompletionClient(&deleter); + t1->setCompletionListener(&deleter); t1->setRunning(true); EventDispatcherInterruptor interruptor(&dispatcher, 50); while (dispatcher.poll()) { } } static void testAddInTrigger() { // A timer added from the callback of another timer should not trigger in the same event loop // iteration, otherwise there could be an (accidental or intended) infinite cascade of zero interval // timers adding zero interval timers // since this test has a (small) false negative (note: negative == no problem found) rate - if // the current millisecond changes at certain points, it can mask a problem - just run it a couple // of times... for (int i = 0; i < 5; i++) { EventDispatcher dispatcher; int dispatchCounter = 0; int t2Counter = 0; CompletionFunc iterChecker([&dispatchCounter, &t2Counter] (void * /*task*/) { TEST(dispatchCounter > 0); t2Counter++; }); Timer t1(&dispatcher); Timer *t2 = nullptr; CompletionFunc adder([&dispatcher, &t2, &iterChecker] (void * /*task*/) { if (!t2) { t2 = new Timer(&dispatcher); - t2->setCompletionClient(&iterChecker); + t2->setCompletionListener(&iterChecker); t2->setRunning(true); // this could go wrong because we manipulate the due time in EventDispatcher::addTimer(), // but should be caught in Timer::remainingTime() TEST(t2->remainingTime() == 0); } }); t1.setInterval(10); t1.setRunning(true); - t1.setCompletionClient(&adder); + t1.setCompletionListener(&adder); EventDispatcherInterruptor interruptor(&dispatcher, 50); while (dispatcher.poll()) { dispatchCounter++; } TEST(t2Counter > 1); delete t2; } } static void testReAddInTrigger() { // - Add a timer // - Remove it // - Remove it, then add it // - Remove, add, remove // - Remove, add, remove, add // - Check timer's isRunning() considering whether last action was add or remove // - Check if the timer triggers next time or not, consistent with previous point // Repeat the tests that include re-adding with "pointer aliased" timers, i.e. add a new timer created // at the same memory location as the old one. That tests whether a known difficulty of the chosen // implementation is handled correctly. // Use the array to ensure we have pointer aliasing or no pointer aliasing std::aligned_storage::type timerStorage[2]; memset(timerStorage, 0, sizeof(timerStorage)); Timer *const timerArray = reinterpret_cast(timerStorage); for (int i = 0; i < 2; i++) { const bool withAliasing = i == 1; for (int j = 0; j < 5; j++) { // j = number of add / remove ops EventDispatcher dispatcher; Timer *t = &timerArray[0]; bool removeTimer = false; bool checkTrigger = false; bool didTrigger = false; CompletionFunc addRemove([&] (void * /*task*/) { if (checkTrigger) { didTrigger = true; return; } for (int k = 0; k < j; k++) { removeTimer = (k & 1) == 0; if (removeTimer) { TEST(t->isRunning()); t->~Timer(); memset(t, 0, sizeof(Timer)); // ensure that it can't trigger - of course if Timer // relies on that we should find it in valgrind... } else { if (!withAliasing) { if (t == &timerArray[0]) { t = &timerArray[1]; } else { t = &timerArray[0]; } } new(t) Timer(&dispatcher); - t->setCompletionClient(&addRemove); + t->setCompletionListener(&addRemove); t->start(0); TEST(t->isRunning()); } } }); Timer dummy1(&dispatcher); dummy1.start(0); new(t) Timer(&dispatcher); t->start(0); Timer dummy2(&dispatcher); dummy2.start(0); dispatcher.poll(); // this seems like a good idea for the test... // run and test the add / remove sequence - t->setCompletionClient(&addRemove); + t->setCompletionListener(&addRemove); dispatcher.poll(); // Test that the timer triggers when it should. Triggering when it should not will likely // cause a segfault or other error because the Timer's memory has been cleared. checkTrigger = true; dispatcher.poll(); TEST(didTrigger != removeTimer); // clean up if (!removeTimer) { t->~Timer(); } memset(timerStorage, 0, sizeof(timerStorage)); } } } // Test that all 0 msec timers trigger equally often regardless how long their triggered handler takes static void testTriggerOnlyOncePerDispatch() { EventDispatcher dispatcher; int dispatchCounter = 0; int triggerCounter1 = 0; int triggerCounter2 = 0; int hardWorkCounter = 0; Timer counter1Timer(&dispatcher); counter1Timer.setRunning(true); Timer hardWorkTimer(&dispatcher); hardWorkTimer.setRunning(true); Timer counter2Timer(&dispatcher); counter2Timer.setRunning(true); CompletionFunc countTriggers([&triggerCounter1, &triggerCounter2, &dispatchCounter, &counter1Timer, &counter2Timer] (void *task) { if (task == &counter1Timer) { TEST(triggerCounter1 == dispatchCounter); triggerCounter1++; } else { TEST(task == &counter2Timer); TEST(triggerCounter2 == dispatchCounter); triggerCounter2++; } }); - counter1Timer.setCompletionClient(&countTriggers); - counter2Timer.setCompletionClient(&countTriggers); + counter1Timer.setCompletionListener(&countTriggers); + counter2Timer.setCompletionListener(&countTriggers); CompletionFunc hardWorker([&hardWorkCounter, &dispatchCounter] (void * /*task*/) { TEST(hardWorkCounter == dispatchCounter); uint64 startTime = PlatformTime::monotonicMsecs(); // waste ten milliseconds, trying not to spend all time in PlatformTime::monotonicMsecs() do { for (volatile int i = 0; i < 20000; i++) {} } while (PlatformTime::monotonicMsecs() < startTime + 10); hardWorkCounter++; }); - hardWorkTimer.setCompletionClient(&hardWorker); + hardWorkTimer.setCompletionListener(&hardWorker); EventDispatcherInterruptor interruptor(&dispatcher, 200); while (dispatcher.poll()) { dispatchCounter++; } TEST(triggerCounter1 == dispatchCounter || triggerCounter1 == dispatchCounter - 1); TEST(triggerCounter2 == dispatchCounter || triggerCounter2 == dispatchCounter - 1); TEST(hardWorkCounter == dispatchCounter || hardWorkCounter == dispatchCounter - 1); } static void testReEnableNonRepeatingInTrigger() { EventDispatcher dispatcher; int slowCounter = 0; CompletionFunc slowReEnabler([&slowCounter] (void *task) { slowCounter++; Timer *timer = reinterpret_cast(task); TEST(!timer->isRunning()); timer->setRunning(true); TEST(timer->isRunning()); TEST(timer->interval() == 5); }); Timer slow(&dispatcher); - slow.setCompletionClient(&slowReEnabler); + slow.setCompletionListener(&slowReEnabler); slow.setRepeating(false); slow.setInterval(5); slow.setRunning(true); int fastCounter = 0; CompletionFunc fastReEnabler([&fastCounter] (void *task) { fastCounter++; Timer *timer = reinterpret_cast(task); TEST(!timer->isRunning()); timer->setRunning(true); TEST(timer->isRunning()); TEST(timer->interval() == 0); }); Timer fast(&dispatcher); - fast.setCompletionClient(&fastReEnabler); + fast.setCompletionListener(&fastReEnabler); fast.setRepeating(false); fast.setInterval(0); fast.setRunning(true); // also make sure that setRepeating(false) has any effect at all... int noRepeatCounter = 0; CompletionFunc noRepeatCheck([&noRepeatCounter] (void * /*task*/) { noRepeatCounter++; }); Timer noRepeat(&dispatcher); - noRepeat.setCompletionClient(&noRepeatCheck); + noRepeat.setCompletionListener(&noRepeatCheck); noRepeat.setRepeating(false); noRepeat.setInterval(10); noRepeat.setRunning(true); EventDispatcherInterruptor interruptor(&dispatcher, 50); while (dispatcher.poll()) { } TEST(noRepeatCounter == 1); TEST(slowCounter >= 8 && slowCounter <= 12); // std::cout << '\n' << fastCounter << ' ' << slowCounter <<'\n'; TEST(fastCounter >= 200); // ### hopefully low enough even for really slow machines and / or valgrind } int main(int, char *[]) { testBasic(); testAccuracy(); testDeleteInTrigger(); testAddInTrigger(); testReAddInTrigger(); testTriggerOnlyOncePerDispatch(); testReEnableNonRepeatingInTrigger(); std::cout << "Passed!\n"; } diff --git a/util/icompletionclient.cpp b/util/icompletionlistener.cpp similarity index 92% rename from util/icompletionclient.cpp rename to util/icompletionlistener.cpp index 0ed8dbe..b4b48af 100644 --- a/util/icompletionclient.cpp +++ b/util/icompletionlistener.cpp @@ -1,28 +1,28 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ -#include "icompletionclient.h" +#include "icompletionlistener.h" -ICompletionClient::~ICompletionClient() +ICompletionListener::~ICompletionListener() { } diff --git a/util/icompletionclient.h b/util/icompletionlistener.h similarity index 85% rename from util/icompletionclient.h rename to util/icompletionlistener.h index 9a4e12d..2885cac 100644 --- a/util/icompletionclient.h +++ b/util/icompletionlistener.h @@ -1,48 +1,48 @@ /* Copyright (C) 2013 Andreas Hartmetz This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LGPL. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Alternatively, this file is available under the Mozilla Public License Version 1.1. You may obtain a copy of the License at http://www.mozilla.org/MPL/ */ -#ifndef ICOMPLETIONCLIENT_H -#define ICOMPLETIONCLIENT_H +#ifndef ICOMPLETIONLISTENER_H +#define ICOMPLETIONLISTENER_H #include "export.h" #include -class DFERRY_EXPORT ICompletionClient +class DFERRY_EXPORT ICompletionListener { public: - virtual ~ICompletionClient(); + virtual ~ICompletionListener(); virtual void handleCompletion(void *task) = 0; }; -class DFERRY_EXPORT CompletionFunc : public ICompletionClient +class DFERRY_EXPORT CompletionFunc : public ICompletionListener { public: CompletionFunc(std::function func) : m_func(func) {} ~CompletionFunc() {} void handleCompletion(void *task) override { if (m_func) { m_func(task); } } std::function m_func; }; -#endif // ICOMPLETIONCLIENT_H +#endif // ICOMPLETIONLISTENER_H