diff --git a/src/dht/dht.cpp b/src/dht/dht.cpp index d89d005..0405a4e 100644 --- a/src/dht/dht.cpp +++ b/src/dht/dht.cpp @@ -1,410 +1,430 @@ /*************************************************************************** * Copyright (C) 2005 by Joris Guisson * * joris.guisson@gmail.com * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the * * Free Software Foundation, Inc., * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ #include "dht.h" #include #include #include #include #include #include #include #include "announcetask.h" #include "node.h" #include "rpcserver.h" #include "rpcmsg.h" #include "kclosestnodessearch.h" #include "database.h" #include "taskmanager.h" #include "nodelookup.h" #include "pingreq.h" #include "findnodereq.h" #include "getpeersreq.h" #include "announcereq.h" #include "pingrsp.h" #include "findnodersp.h" #include "getpeersrsp.h" #include "announcersp.h" using namespace bt; namespace dht { DHT::DHT() : node(0), srv(0), db(0), tman(0), our_node_lookup(0) { connect(&update_timer, &QTimer::timeout, this, &DHT::update); connect(&expire_timer, &QTimer::timeout, this, &DHT::expireDatabaseItems); } DHT::~DHT() { if (running) stop(); } void DHT::start(const QString & table, const QString & key_file, bt::Uint16 port) { if (running) return; if (port == 0) port = 6881; table_file = table; this->port = port; Out(SYS_DHT | LOG_NOTICE) << "DHT: Starting on port " << port << endl; srv = new RPCServer(this, port); node = new Node(srv, key_file); db = new Database(); tman = new TaskManager(this); running = true; srv->start(); node->loadTable(table); update_timer.start(1000); expire_timer.start(5*60*1000); started(); if (node->getNumEntriesInRoutingTable() > 0) { // refresh the DHT table by looking for our own ID findOwnNode(); } + else + { + // routing table is empty, bootstrap from well-known nodes + Out(SYS_DHT | LOG_NOTICE) << "DHT: Routing table empty, bootstrapping from well-known nodes" << endl; + bootstrap(); + } } void DHT::stop() { if (!running) return; update_timer.stop(); expire_timer.stop(); Out(SYS_DHT | LOG_NOTICE) << "DHT: Stopping " << endl; srv->stop(); node->saveTable(table_file); running = false; stopped(); delete tman; tman = 0; delete db; db = 0; delete node; node = 0; delete srv; srv = 0; } void DHT::ping(const PingReq & r) { if (!running) return; // ignore requests we get from ourself if (r.getID() == node->getOurID()) return; PingRsp rsp(r.getMTID(), node->getOurID()); rsp.setOrigin(r.getOrigin()); srv->sendMsg(rsp); node->received(this, r); } void DHT::findNode(const dht::FindNodeReq& r) { if (!running) return; // ignore requests we get from ourself if (r.getID() == node->getOurID()) return; node->received(this, r); // find the K closest nodes and pack them KClosestNodesSearch kns(r.getTarget(), K); bt::Uint32 wants = 0; if (r.wants(4) || r.getOrigin().ipVersion() == 4) wants |= WANT_IPV4; if (r.wants(6) || r.getOrigin().ipVersion() == 6) wants |= WANT_IPV6; node->findKClosestNodes(kns, wants); FindNodeRsp fnr(r.getMTID(), node->getOurID()); // pack the found nodes in a byte array kns.pack(&fnr); fnr.setOrigin(r.getOrigin()); srv->sendMsg(fnr); } NodeLookup* DHT::findOwnNode() { if (our_node_lookup) return our_node_lookup; our_node_lookup = findNode(node->getOurID()); if (our_node_lookup) connect(our_node_lookup, &NodeLookup::finished, this, &DHT::ownNodeLookupFinished); return our_node_lookup; } void DHT::ownNodeLookupFinished(Task* t) { if (our_node_lookup == t) our_node_lookup = 0; } void DHT::announce(const AnnounceReq & r) { if (!running) return; // ignore requests we get from ourself if (r.getID() == node->getOurID()) return; node->received(this, r); // first check if the token is OK dht::Key token = r.getToken(); if (!db->checkToken(token, r.getOrigin())) return; // everything OK, so store the value db->store(r.getInfoHash(), DBItem(r.getOrigin())); // send a proper response to indicate everything is OK AnnounceRsp rsp(r.getMTID(), node->getOurID()); rsp.setOrigin(r.getOrigin()); srv->sendMsg(rsp); } void DHT::getPeers(const GetPeersReq & r) { if (!running) return; // ignore requests we get from ourself if (r.getID() == node->getOurID()) return; node->received(this, r); DBItemList dbl; db->sample(r.getInfoHash(), dbl, 50, r.getOrigin().ipVersion()); // generate a token dht::Key token = db->genToken(r.getOrigin()); bt::Uint32 wants = 0; if (r.wants(4) || r.getOrigin().ipVersion() == 4) wants |= WANT_IPV4; if (r.wants(6) || r.getOrigin().ipVersion() == 6) wants |= WANT_IPV6; // if data is null do the same as when we have a findNode request // find the K closest nodes and pack them KClosestNodesSearch kns(r.getInfoHash(), K); node->findKClosestNodes(kns, wants); GetPeersRsp fnr(r.getMTID(), node->getOurID(), dbl, token); kns.pack(&fnr); fnr.setOrigin(r.getOrigin()); srv->sendMsg(fnr); } void DHT::response(const RPCMsg & r) { if (!running) return; node->received(this, r); } void DHT::error(const ErrMsg & msg) { Q_UNUSED(msg); } void DHT::portReceived(const QString & ip, bt::Uint16 port) { if (!running) return; RPCMsg::Ptr r(new PingReq(node->getOurID())); r->setOrigin(net::Address(ip, port)); srv->doCall(r); } bool DHT::canStartTask() const { // we can start a task if we have less then 7 runnning and // there are at least 16 RPC slots available if (tman->getNumTasks() >= 7) return false; else if (256 - srv->getNumActiveRPCCalls() <= 16) return false; return true; } AnnounceTask* DHT::announce(const bt::SHA1Hash & info_hash, bt::Uint16 port) { if (!running) return 0; KClosestNodesSearch kns(info_hash, K); node->findKClosestNodes(kns, WANT_BOTH); if (kns.getNumEntries() > 0) { Out(SYS_DHT | LOG_NOTICE) << "DHT: Doing announce " << endl; AnnounceTask* at = new AnnounceTask(db, srv, node, info_hash, port, tman); at->start(kns, !canStartTask()); tman->addTask(at); if (!db->contains(info_hash)) db->insert(info_hash); return at; } return 0; } NodeLookup* DHT::refreshBucket(const dht::Key & id, KBucket & bucket) { if (!running) return 0; KClosestNodesSearch kns(id, K); bucket.findKClosestNodes(kns); bucket.updateRefreshTimer(); if (kns.getNumEntries() > 0) { Out(SYS_DHT | LOG_DEBUG) << "DHT: refreshing bucket " << endl; NodeLookup* nl = new NodeLookup(id, srv, node, tman); nl->start(kns, !canStartTask()); tman->addTask(nl); return nl; } return 0; } NodeLookup* DHT::findNode(const dht::Key & id) { if (!running) return 0; KClosestNodesSearch kns(id, K); node->findKClosestNodes(kns, WANT_BOTH); if (kns.getNumEntries() > 0) { Out(SYS_DHT | LOG_DEBUG) << "DHT: finding node " << endl; NodeLookup* at = new NodeLookup(id, srv, node, tman); at->start(kns, !canStartTask()); tman->addTask(at); return at; } return 0; } void DHT::expireDatabaseItems() { db->expire(bt::CurrentTime()); } void DHT::update() { if (!running) return; try { node->refreshBuckets(this); stats.num_tasks = tman->getNumTasks() + tman->getNumQueuedTasks(); stats.num_peers = node->getNumEntriesInRoutingTable(); } catch (bt::Error & e) { Out(SYS_DHT | LOG_IMPORTANT) << "DHT: Error: " << e.toString() << endl; } } void DHT::timeout(RPCMsg::Ptr r) { node->onTimeout(r); } void DHT::addDHTNode(const QString & host, Uint16 hport) { if (!running) return; net::Address addr; if (addr.setAddress(host)) { + Out(SYS_DHT | LOG_DEBUG) << "DHT: Adding node '" << host << ":" << hport << "'" << endl; addr.setPort(hport); srv->ping(node->getOurID(), addr); } else + { + Out(SYS_DHT | LOG_DEBUG) << "DHT: Resolving node '" << host << "'" << endl; net::AddressResolver::resolve(host, hport, this, SLOT(onResolverResults(net::AddressResolver*))); + } } void DHT::onResolverResults(net::AddressResolver* res) { if (!running) return; if (res->succeeded()) { + Out(SYS_DHT | LOG_DEBUG) << "DHT: Adding node '" << res->address().toString() << ":" << res->address().port() << "'" << endl; srv->ping(node->getOurID(), res->address()); } } QMap DHT::getClosestGoodNodes(int maxNodes) { QMap map; if (!node) return map; int max = 0; KClosestNodesSearch kns(node->getOurID(), maxNodes*2); node->findKClosestNodes(kns, WANT_BOTH); KClosestNodesSearch::Itr it; for (it = kns.begin(); it != kns.end(); ++it) { KBucketEntry e = it->second; if (!e.isGood()) continue; const net::Address & a = e.getAddress(); map.insert(a.toString(), a.port()); if (++max >= maxNodes) break; } return map; } + void DHT::bootstrap() + { + Out(SYS_DHT | LOG_DEBUG) << "DHT: Adding well-known bootstrap nodes" << endl; + addDHTNode(QString("router.bittorrent.com"), 6881); + addDHTNode(QString("router.utorrent.com"), 6881); + addDHTNode(QString("dht.libtorrent.org"), 25401); + addDHTNode(QString("dht.transmissionbt.com"), 6881); + } + } diff --git a/src/dht/dht.h b/src/dht/dht.h index f45ad06..40dfee4 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -1,136 +1,139 @@ /*************************************************************************** * Copyright (C) 2005 by Joris Guisson * * joris.guisson@gmail.com * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the * * Free Software Foundation, Inc., * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ #ifndef DHTDHT_H #define DHTDHT_H #include #include #include #include #include #include "key.h" #include "dhtbase.h" #include "rpcmsg.h" namespace net { class AddressResolver; } namespace bt { class SHA1Hash; } namespace dht { class Node; class RPCServer; class Database; class TaskManager; class Task; class AnnounceTask; class NodeLookup; class KBucket; class ErrMsg; class PingReq; class FindNodeReq; class GetPeersReq; class AnnounceReq; /** @author Joris Guisson */ class DHT : public DHTBase { Q_OBJECT public: DHT(); virtual ~DHT(); void ping(const PingReq & r); void findNode(const FindNodeReq & r); void response(const RPCMsg & r); void getPeers(const GetPeersReq & r); void announce(const AnnounceReq & r); void error(const ErrMsg & r); void timeout(RPCMsg::Ptr r); /** * A Peer has received a PORT message, and uses this function to alert the DHT of it. * @param ip The IP of the peer * @param port The port in the PORT message */ void portReceived(const QString & ip,bt::Uint16 port); /** * Do an announce on the DHT network * @param info_hash The info_hash * @param port The port * @return The task which handles this */ AnnounceTask* announce(const bt::SHA1Hash & info_hash,bt::Uint16 port); /** * Refresh a bucket using a find node task. * @param id The id * @param bucket The bucket to refresh */ NodeLookup* refreshBucket(const dht::Key & id,KBucket & bucket); /** * Do a NodeLookup. * @param id The id of the key to search */ NodeLookup* findNode(const dht::Key & id); /// Do a findNode for our node id NodeLookup* findOwnNode(); /// See if it is possible to start a task bool canStartTask() const; void start(const QString & table,const QString & key_file,bt::Uint16 port); void stop(); void addDHTNode(const QString & host,bt::Uint16 hport); virtual QMap getClosestGoodNodes(int maxNodes); + + /// Bootstrap from well-known nodes + void bootstrap(); private Q_SLOTS: void update(); void onResolverResults(net::AddressResolver* ar); void ownNodeLookupFinished(Task* t); void expireDatabaseItems(); private: Node* node; RPCServer* srv; Database* db; TaskManager* tman; QTimer expire_timer; QString table_file; QTimer update_timer; NodeLookup* our_node_lookup; }; } #endif diff --git a/src/dht/node.cpp b/src/dht/node.cpp index 84baba4..0110143 100644 --- a/src/dht/node.cpp +++ b/src/dht/node.cpp @@ -1,178 +1,179 @@ /*************************************************************************** * Copyright (C) 2005 by Joris Guisson * * joris.guisson@gmail.com * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the * * Free Software Foundation, Inc., * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ #include "node.h" #include #include #include #include #include #include #include "rpcmsg.h" #include "key.h" #include "rpccall.h" #include "rpcserver.h" #include "kclosestnodessearch.h" #include "dht.h" #include "nodelookup.h" #include "kbuckettable.h" using namespace bt; namespace dht { class Node::Private { public: Private(RPCServer* srv) : srv(srv) { num_receives = 0; new_key = false; } ~Private() { } void saveKey(const dht::Key & key, const QString & key_file) { bt::File fptr; if (!fptr.open(key_file, "wb")) { Out(SYS_DHT | LOG_IMPORTANT) << "DHT: Cannot open file " << key_file << " : " << fptr.errorString() << endl; return; } fptr.write(key.getData(), 20); fptr.close(); } dht::Key loadKey(const QString & key_file) { bt::File fptr; if (!fptr.open(key_file, "rb")) { Out(SYS_DHT | LOG_IMPORTANT) << "DHT: Cannot open file " << key_file << " : " << fptr.errorString() << endl; dht::Key r = dht::Key::random(); saveKey(r, key_file); new_key = true; return r; } Uint8 data[20]; if (fptr.read(data, 20) != 20) { dht::Key r = dht::Key::random(); saveKey(r, key_file); new_key = true; return r; } new_key = false; return dht::Key(data); } QScopedPointer ipv4_table; QScopedPointer ipv6_table; RPCServer* srv; Uint32 num_receives; bool new_key; }; Node::Node(RPCServer* srv, const QString & key_file) : d(new Private(srv)) { num_entries = 0; our_id = d->loadKey(key_file); d->ipv4_table.reset(new KBucketTable(our_id)); d->ipv6_table.reset(new KBucketTable(our_id)); } Node::~Node() { delete d; } void Node::received(dht::DHT* dh_table, const dht::RPCMsg & msg) { if (msg.getOrigin().ipVersion() == 4) d->ipv4_table->insert(KBucketEntry(msg.getOrigin(), msg.getID()), d->srv); else d->ipv6_table->insert(KBucketEntry(msg.getOrigin(), msg.getID()), d->srv); d->num_receives++; if (d->num_receives == 3) { // do a node lookup upon our own id // when we insert the first entry in the table dh_table->findOwnNode(); } num_entries = d->ipv4_table->numEntries() + d->ipv6_table->numEntries(); } void Node::findKClosestNodes(KClosestNodesSearch & kns, bt::Uint32 want) { if (want & WANT_IPV4) d->ipv4_table->findKClosestNodes(kns); if (want & WANT_IPV6) d->ipv6_table->findKClosestNodes(kns); } void Node::onTimeout(RPCMsg::Ptr msg) { if (msg->getOrigin().ipVersion() == 4) d->ipv4_table->onTimeout(msg->getOrigin()); else d->ipv6_table->onTimeout(msg->getOrigin()); } void Node::refreshBuckets(DHT* dh_table) { d->ipv4_table->refreshBuckets(dh_table); d->ipv6_table->refreshBuckets(dh_table); } void Node::saveTable(const QString & file) { d->ipv4_table->saveTable(file + ".ipv4"); d->ipv6_table->saveTable(file + ".ipv6"); } void Node::loadTable(const QString & file) { if (d->new_key) { d->new_key = false; bt::Delete(file + ".ipv4", true); bt::Delete(file + ".ipv6", true); Out(SYS_DHT | LOG_IMPORTANT) << "DHT: new key, so removing tables" << endl; } else { d->ipv4_table->loadTable(file + ".ipv4", d->srv); d->ipv6_table->loadTable(file + ".ipv6", d->srv); + num_entries = d->ipv4_table->numEntries() + d->ipv6_table->numEntries(); } } }