diff --git a/smb/CMakeLists.txt b/smb/CMakeLists.txt
--- a/smb/CMakeLists.txt
+++ b/smb/CMakeLists.txt
@@ -19,6 +19,8 @@ add_feature_info("SMB DNS-SD Discovery" HAVE_KDNSSD_WITH_SIGNAL_RACE_PROTECTION
                  "Discover SMB hosts via DNS-SD/Avahi/Bonjour. KF5DNSSD >= 5.54 is required to support this.")
 
+find_package(Threads REQUIRED)
+
 add_definitions(-DTRANSLATION_DOMAIN=\"kio5_smb\")
 
 include(CheckIncludeFile)
@@ -47,6 +49,7 @@
     wsdiscoverer.cpp
     dnssddiscoverer.cpp
     discovery.cpp
+    transfer.cpp
 )
 
 ecm_qt_declare_logging_category(kio_smb_PART_SRCS
@@ -69,6 +72,7 @@
     Qt5::Network
     KF5::DNSSD
     KDSoap::WSDiscoveryClient
+    Threads::Threads # std::async
 )
 
 # Final plugin target.
diff --git a/smb/autotests/CMakeLists.txt b/smb/autotests/CMakeLists.txt
--- a/smb/autotests/CMakeLists.txt
+++ b/smb/autotests/CMakeLists.txt
@@ -11,6 +11,7 @@
 
 ecm_add_tests(
     smburltest
+    transfertest
     LINK_LIBRARIES
     Qt5::Test
     kio_smb_static
diff --git a/smb/autotests/transfertest.cpp b/smb/autotests/transfertest.cpp
new file mode 100644
--- /dev/null
+++ b/smb/autotests/transfertest.cpp
@@ -0,0 +1,156 @@
+/*
+    SPDX-FileCopyrightText: 2020 Harald Sitter
+    SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
+*/
+
+#include <QTest>
+
+#include <atomic>
+#include <chrono>
+#include <cstring>
+#include <future>
+#include <thread>
+
+#include "transfer.h"
+
+class TransferTest : public QObject
+{
+    Q_OBJECT
+private Q_SLOTS:
+    void testSegmentOnSmallFile()
+    {
+        // Files smaller than our minimal segment size ought to be transferred in one go
+        // otherwise we have a chance of degrading performance.
+        QCOMPARE(TransferSegment(1).buf.size(), 1);
+    }
+
+    void testMaxSegment()
+    {
+        // Large files may only use up to a given maximum.
+        QCOMPARE(TransferSegment(512 * 1024 * 1024).buf.size(), c_maxSegmentSize);
+    }
+
+    void testIdealSegmentSize()
+    {
+        QCOMPARE(TransferSegment(64 * 1024 * 1024).buf.size(), 1342177);
+    }
+
+    void testSegment()
+    {
+        TransferSegment s(8);
+        QCOMPARE(s.buf.size(), 8);
+        memset(s.buf.data(), 1, 8);
+        QCOMPARE(s.buf.data()[0], 1);
+    }
+
+    void testRing()
+    {
+        TransferRingBuffer ring(8);
+        for (auto i = 0; i <= 32; ++i) {
+            {
+                auto s = ring.nextFree();
+                memset(s->buf.data(), i, 8);
+                ring.push();
+            }
+            {
+                auto s = ring.pop();
+                QCOMPARE(s->buf.data()[0], static_cast<char>(i));
+                ring.unpop();
+            }
+        }
+    }
+
+    void testRingThreadedSlowPush()
+    {
+        const auto runs = 127;
+        const auto fileSize = 8;
+        TransferRingBuffer ring(fileSize);
+
+        std::atomic<bool> abort(false);
+
+        auto pullFuture = std::async(std::launch::async, [&ring, &abort]() -> bool {
+            for (auto i = 0; i <= runs && !abort; ++i) {
+                auto s = ring.pop();
+                if (!QTest::qCompare(s->buf.data()[0], static_cast<char>(i),
+                                     qPrintable(QStringLiteral("On pull iteration %1").arg(i)), "",
+                                     __FILE__, __LINE__)) {
+                    abort = true;
+                    ring.unpop(); // release the slot so a pusher blocked in push() can see abort
+                    return false;
+                }
+                ring.unpop();
+            }
+            return true;
+        });
+
+        auto pushFuture = std::async(std::launch::async, [&ring, &abort]() -> bool {
+            for (auto i = 0; i <= runs && !abort; ++i) {
+                auto s = ring.nextFree();
+                memset(s->buf.data(), i, fileSize);
+                ring.push();
+                if (abort) {
+                    ring.done();
+                    return false;
+                }
+                // Slow down this thread to simulate slow network reads.
+                std::this_thread::sleep_for(std::chrono::milliseconds(5));
+            }
+            ring.done();
+            return true;
+        });
+
+        pushFuture.wait();
+        pullFuture.wait();
+
+        QVERIFY(pushFuture.get());
+        QVERIFY(pullFuture.get());
+    }
+
+    void testRingThreadedSlowPull()
+    {
+        const auto runs = 127;
+        const auto fileSize = 8;
+        TransferRingBuffer ring(fileSize);
+
+        std::atomic<bool> abort(false);
+
+        auto pullFuture = std::async(std::launch::async, [&ring, &abort]() -> bool {
+            for (auto i = 0; i <= runs && !abort; ++i) {
+                auto s = ring.pop();
+                if (!QTest::qCompare(s->buf.data()[0], static_cast<char>(i),
+                                     qPrintable(QStringLiteral("On pull iteration %1").arg(i)), "",
+                                     __FILE__, __LINE__)) {
+                    abort = true;
+                }
+                // Slow down this thread to simulate slow local writes.
+                std::this_thread::sleep_for(std::chrono::milliseconds(5));
+                ring.unpop();
+            }
+            return !abort; // a failed comparison must also fail this future
+        });
+
+        auto pushFuture = std::async(std::launch::async, [&ring, &abort]() -> bool {
+            for (auto i = 0; i <= runs && !abort; ++i) {
+                auto s = ring.nextFree();
+                memset(s->buf.data(), i, fileSize);
+                if (abort) {
+                    ring.done();
+                    return false;
+                }
+                ring.push();
+            }
+            ring.done();
+            return true;
+        });
+
+        pushFuture.wait();
+        pullFuture.wait();
+
+        QVERIFY(pushFuture.get());
+        QVERIFY(pullFuture.get());
+    }
+};
+
+QTEST_GUILESS_MAIN(TransferTest)
+
+#include "transfertest.moc"
diff --git a/smb/kio_smb.h b/smb/kio_smb.h
--- a/smb/kio_smb.h
+++ b/smb/kio_smb.h
@@ -79,7 +79,6 @@
 //---------------------------
 #include "smburl.h"
 
-#define MAX_XFER_BUF_SIZE 65534
 
 using namespace KIO;
 
diff --git a/smb/kio_smb_dir.cpp b/smb/kio_smb_dir.cpp
--- a/smb/kio_smb_dir.cpp
+++ b/smb/kio_smb_dir.cpp
@@ -39,6 +39,10 @@
 #include
 #include
 
+#include <future>
+
+#include "transfer.h"
+
 void SMBSlave::copy(const QUrl &src, const QUrl &dst, int permissions, KIO::JobFlags flags)
 {
     const bool isSourceLocal = src.isLocalFile();
@@ -55,25 +59,14 @@
 
 void SMBSlave::smbCopy(const QUrl &ksrc, const QUrl &kdst, int permissions, KIO::JobFlags flags)
 {
-    SMBUrl src;
-    SMBUrl dst;
-    mode_t initialmode;
-    ssize_t n;
-    int dstflags;
-    int srcfd = -1;
-    int dstfd = -1;
-    int errNum = 0;
-    KIO::filesize_t processed_size = 0;
-    unsigned char buf[MAX_XFER_BUF_SIZE];
-
     qCDebug(KIO_SMB_LOG) << "SMBSlave::copy with src = " << ksrc << "and dest = " << kdst;
 
     // setup urls
-    src = ksrc;
-    dst = kdst;
+    SMBUrl src = ksrc;
+    SMBUrl dst = kdst;
 
     // Obtain information about source
-    errNum = cache_stat(src, &st);
+    int errNum = cache_stat(src, &st);
     if (errNum != 0) {
         if (errNum == EACCES) {
             error(KIO::ERR_ACCESS_DENIED, src.toDisplayString());
@@ -86,7 +79,8 @@
         error(KIO::ERR_IS_DIRECTORY, src.toDisplayString());
         return;
     }
-    totalSize(st.st_size);
+    const auto srcSize = st.st_size;
+    totalSize(srcSize);
 
     // Check to se if the destination exists
     errNum = cache_stat(dst, &st);
@@ -102,7 +96,7 @@
     }
 
     // Open the source file
-    srcfd = smbc_open(src.toSmbcUrl(), O_RDONLY, 0);
+    int srcfd = smbc_open(src.toSmbcUrl(), O_RDONLY, 0);
     if (srcfd < 0) {
         errNum = errno;
     } else {
@@ -118,19 +112,21 @@
         return;
     }
+
+    mode_t initialmode = 0;
     // Determine initial creation mode
     if (permissions != -1) {
         initialmode = permissions | S_IWUSR;
     } else {
         initialmode = 0 | S_IWUSR; // 0666;
     }
 
     // Open the destination file
-    dstflags = O_CREAT | O_TRUNC | O_WRONLY;
+    int dstflags = O_CREAT | O_TRUNC | O_WRONLY;
     if (!(flags & KIO::Overwrite)) {
         dstflags |= O_EXCL;
     }
 
-    dstfd = smbc_open(dst.toSmbcUrl(), dstflags, initialmode);
+    int dstfd = smbc_open(dst.toSmbcUrl(), dstflags, initialmode);
     if (dstfd < 0) {
         errNum = errno;
     } else {
@@ -151,10 +147,15 @@
     }
 
     // Perform copy
+    // TODO: if and when smb_context becomes thread-safe, use two contexts connected with
+    // a ring buffer to optimize transfer speed (also see smbCopyGet)
+    // https://bugzilla.samba.org/show_bug.cgi?id=11413
+    KIO::filesize_t processed_size = 0;
+    TransferSegment segment(srcSize);
     while (true) {
-        n = smbc_read(srcfd, buf, MAX_XFER_BUF_SIZE);
+        ssize_t n = smbc_read(srcfd, segment.buf.data(), segment.buf.size());
         if (n > 0) {
-            n = smbc_write(dstfd, buf, n);
+            n = smbc_write(dstfd, segment.buf.data(), n);
             if (n == -1) {
                 qCDebug(KIO_SMB_LOG) << "SMBSlave::copy copy now KIO::ERR_CANNOT_WRITE";
                 error(KIO::ERR_CANNOT_WRITE, dst.toDisplayString());
@@ -316,30 +317,53 @@
         return;
     }
 
-    // Perform the copy
-    char buf[MAX_XFER_BUF_SIZE];
-    bool isErr = false;
+    std::atomic<bool> isErr(false);
+    TransferRingBuffer buffer(st.st_size);
+    auto future = std::async(std::launch::async, [&buffer, &srcfd, &isErr]() -> int {
+        while (!isErr) {
+            TransferSegment *segment = buffer.nextFree();
+            segment->size = smbc_read(srcfd, segment->buf.data(), segment->buf.capacity());
+            if (segment->size <= 0) {
+                buffer.push();
+                buffer.done();
+                if (segment->size < 0) {
+                    return KIO::ERR_CANNOT_READ;
+                }
+                break;
+            }
+            buffer.push();
+        }
+        return KJob::NoError;
+    });
 
     while (true) {
-        const ssize_t bytesRead = smbc_read(srcfd, buf, MAX_XFER_BUF_SIZE);
-        if (bytesRead <= 0) {
-            if (bytesRead < 0) {
-                error(KIO::ERR_CANNOT_READ, src.toDisplayString());
-                isErr = true;
-            }
+        TransferSegment *segment = buffer.pop();
+        if (!segment) { // done, no more segments pending
             break;
         }
+        if (segment->size <= 0) { // EOF or read failure marker, the reader's result tells which
+            buffer.unpop();
+            break;
+        }
 
-        const qint64 bytesWritten = file.write(buf, bytesRead);
+        const qint64 bytesWritten = file.write(segment->buf.data(), segment->size);
         if (bytesWritten == -1) {
             qCDebug(KIO_SMB_LOG) << "copy now KIO::ERR_CANNOT_WRITE";
             error(KIO::ERR_CANNOT_WRITE, kdst.toDisplayString());
             isErr = true;
+            buffer.unpop();
             break;
         }
 
         processed_size += bytesWritten;
         processedSize(processed_size);
+        buffer.unpop();
+    }
+    // Always join the reader; a std::future's result may only be retrieved once.
+    const int readResult = future.get();
+    if (!isErr && readResult != KJob::NoError) { // writing succeeded but reading failed
+        error(readResult, ksrc.toDisplayString());
+        isErr = true;
     }
 
     // FINISHED
@@ -499,19 +523,18 @@
 
     if (processed_size == 0 || srcFile.seek(processed_size)) {
         // Perform the copy
-        char buf[MAX_XFER_BUF_SIZE];
-
+        TransferSegment segment(srcInfo.size());
         while (true) {
-            const ssize_t bytesRead = srcFile.read(buf, MAX_XFER_BUF_SIZE);
+            const ssize_t bytesRead = srcFile.read(segment.buf.data(), segment.buf.size());
             if (bytesRead <= 0) {
                 if (bytesRead < 0) {
                     error(KIO::ERR_CANNOT_READ, ksrc.toDisplayString());
                     isErr = true;
                 }
                 break;
             }
 
-            const qint64 bytesWritten = smbc_write(dstfd, buf, bytesRead);
+            const qint64 bytesWritten = smbc_write(dstfd, segment.buf.data(), bytesRead);
             if (bytesWritten == -1) {
                 error(KIO::ERR_CANNOT_WRITE, kdst.toDisplayString());
                 isErr = true;
diff --git a/smb/kio_smb_file.cpp b/smb/kio_smb_file.cpp
--- a/smb/kio_smb_file.cpp
+++ b/smb/kio_smb_file.cpp
@@ -37,19 +37,12 @@
 #include
 #include
 
+#include <future>
+
+#include "transfer.h"
+
 void SMBSlave::get(const QUrl &kurl)
 {
-    char buf[MAX_XFER_BUF_SIZE];
-    int filefd = 0;
-    int errNum = 0;
-    ssize_t bytesread = 0;
-    // time_t curtime = 0;
-    // time_t lasttime = 0; // Disabled durint port to Qt5/KF5. Seems to be unused.
-    // time_t starttime = 0; // Disabled durint port to Qt5/KF5. Seems to be unused.
-    KIO::filesize_t totalbytesread = 0;
-    QByteArray filedata;
-    SMBUrl url;
-
     qCDebug(KIO_SMB_LOG) << kurl;
 
     // check (correct) URL
@@ -65,8 +58,8 @@
         return;
 
     // Stat
-    url = kurl;
-    errNum = cache_stat(url, &st);
+    SMBUrl url = kurl;
+    int errNum = cache_stat(url, &st);
     if (errNum != 0) {
         if (errNum == EACCES)
             error(KIO::ERR_ACCESS_DENIED, url.toDisplayString());
@@ -83,45 +76,73 @@
     totalSize(st.st_size);
 
     // Open and read the file
-    filefd = smbc_open(url.toSmbcUrl(), O_RDONLY, 0);
-    if (filefd >= 0) {
-        bool isFirstPacket = true;
-        // lasttime = starttime = time(NULL); // This seems to be unused..
+    int filefd = smbc_open(url.toSmbcUrl(), O_RDONLY, 0);
+    if (filefd < 0) {
+        error(KIO::ERR_CANNOT_OPEN_FOR_READING, url.toDisplayString());
+        return;
+    }
+
+    KIO::filesize_t totalbytesread = 0;
+    QByteArray filedata;
+    bool isFirstPacket = true;
+
+    TransferRingBuffer buffer(st.st_size);
+    auto future = std::async(std::launch::async, [&buffer, &filefd]() -> int {
         while (true) {
-            bytesread = smbc_read(filefd, buf, MAX_XFER_BUF_SIZE);
-            if (bytesread == 0) {
-                // All done reading
+            TransferSegment *s = buffer.nextFree();
+            s->size = smbc_read(filefd, s->buf.data(), s->buf.capacity());
+            if (s->size <= 0) {
+                buffer.push();
+                buffer.done();
+                if (s->size < 0) {
+                    return KIO::ERR_CANNOT_READ;
+                }
                 break;
-            } else if (bytesread < 0) {
-                error(KIO::ERR_CANNOT_READ, url.toDisplayString());
-                return;
-            }
-
-            filedata = QByteArray::fromRawData(buf, bytesread);
-            if (isFirstPacket) {
-                QMimeDatabase db;
-                QMimeType type = db.mimeTypeForFileNameAndData(url.fileName(), filedata);
-                mimeType(type.name());
-                isFirstPacket = false;
             }
-            data(filedata);
-            filedata.clear();
+            buffer.push();
+        }
+        return KJob::NoError;
+    });
 
-            // increment total bytes read
-            totalbytesread += bytesread;
+    while (true) {
+        TransferSegment *s = buffer.pop();
+        if (!s) { // done, no more segments pending
+            break;
+        }
+        if (s->size <= 0) { // EOF or read failure marker, the reader's result tells which
+            buffer.unpop();
+            break;
+        }
 
-            processedSize(totalbytesread);
+        filedata = QByteArray::fromRawData(s->buf.data(), s->size);
+        if (isFirstPacket) {
+            QMimeDatabase db;
+            QMimeType type = db.mimeTypeForFileNameAndData(url.fileName(), filedata);
+            mimeType(type.name());
+            isFirstPacket = false;
         }
+        data(filedata);
+        filedata.clear();
 
-        smbc_close(filefd);
-        data(QByteArray());
-        processedSize(static_cast<KIO::filesize_t>(st.st_size));
+        // increment total bytes read
+        totalbytesread += s->size;
 
-    } else {
-        error(KIO::ERR_CANNOT_OPEN_FOR_READING, url.toDisplayString());
-        return;
+        processedSize(totalbytesread);
+        buffer.unpop();
+    }
+    // Joins the reader thread. A std::future result may only be retrieved once,
+    // and filefd must stay open until the reader is done with it.
+    const int readResult = future.get();
+    smbc_close(filefd);
+    if (readResult != KJob::NoError) { // check if read had an error
+        error(readResult, url.toDisplayString());
+        return; // error() terminates the command, finished() must not follow it
     }
+    data(QByteArray());
+    // Report what was actually transferred, the stat'd size may be stale.
+    processedSize(totalbytesread);
+
     finished();
 }
diff --git a/smb/transfer.h b/smb/transfer.h
new file mode 100644
--- /dev/null
+++ b/smb/transfer.h
@@ -0,0 +1,78 @@
+/*
+    SPDX-FileCopyrightText: 2020 Harald Sitter
+    SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
+*/
+
+#ifndef TRANSFER_H
+#define TRANSFER_H
+
+#include <sys/types.h> // off_t, ssize_t
+
+#include <array>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+
+#include <QVarLengthArray>
+
+constexpr auto c_minSegmentSize = 64 * 1024; // segment size floor; also TransferSegment::buf's inline (preallocated) capacity
+constexpr auto c_maxSegmentSize = 4L * 1024 * 1024; // 4MiB is the largest request we make
+
+struct TransferSegment
+{
+    explicit TransferSegment(const off_t fileSize);
+
+    ssize_t size = 0; // current size (i.e. the size that was put into buf)
+    QVarLengthArray<char, c_minSegmentSize> buf; // data buffer, only filled up to size!
+
+private:
+    static off_t segmentSizeForFileSize(const off_t fileSize_);
+};
+
+// Naive ring buffer.
+// Segment instances are held in the buffer, i.e. only alloc'd once at
+// beginning of the operation. Kind of a mix between ring and pool.
+//
+// The popping thread cannot pop while the pushing thread is still on
+// an element. As such we need at least 3 elements to prevent dead locks.
+class TransferRingBuffer
+{
+public:
+    // fileSize is the stat'd file size of the source file.
+    explicit TransferRingBuffer(const off_t fileSize_);
+    ~TransferRingBuffer() = default;
+
+    // Pops an item into the pull thread. This blocks
+    // when the push thread is also currently on that index.
+    // This can return nullptr if the push thread set the done state.
+    // @note once done unpop() needs calling
+    TransferSegment *pop();
+
+    // Frees the item used by the pull thread. So it may be used by the
+    // push thread.
+    void unpop();
+
+    // Simply returns a ptr to the item the current push thread marker is
+    // at. i.e. the item "locked" for reading.
+    // @note once done push() needs calling
+    TransferSegment *nextFree();
+
+    // Pushes ahead from the item obtained by nextFree.
+    // This effectively allows the pull thread to pop() this item again.
+    void push();
+
+    // Only called by push thread to mark the buffer done and wake waiting
+    // threads.
+    void done();
+
+private:
+    bool m_done = false;
+    std::mutex m_mutex;
+    std::condition_variable m_cond;
+    static const size_t m_capacity = 4;
+    std::array<std::unique_ptr<TransferSegment>, m_capacity> m_buffer;
+    size_t head = 0; // index of push thread (prevents pop() from pull thread)
+    size_t tail = 0; // index of pull thread (prevents push() from push thread)
+};
+
+#endif // TRANSFER_H
diff --git a/smb/transfer.cpp b/smb/transfer.cpp
new file mode 100644
--- /dev/null
+++ b/smb/transfer.cpp
@@ -0,0 +1,104 @@
+/*
+    SPDX-FileCopyrightText: 2020 Harald Sitter
+    SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
+*/
+
+#include "transfer.h"
+
+#include <QtGlobal>
+
+TransferSegment::TransferSegment(const off_t fileSize)
+    : buf(segmentSizeForFileSize(fileSize))
+{
+}
+
+off_t TransferSegment::segmentSizeForFileSize(const off_t fileSize_)
+{
+    // Explicit template argument: off_t is not necessarily `long` (e.g. 32-bit builds with LFS).
+    const off_t fileSize = qMax<off_t>(0, fileSize_);
+
+    // read() internally splits our read requests into multiple server
+    // requests and then assembles the responses into our buffer.
+    // The larger the chunks we request the better the performance.
+    // At the same time we'll want a semblance of progress reporting
+    // and also not eat too much RAM. It's a balancing act :|
+    off_t segmentSize = c_minSegmentSize;
+    // The segment size is largely arbitrary and sacrifices better throughput for
+    // greater memory use.
+    // This only goes up to a maximum because bigger transfer blobs directly
+    // translate to more RAM use. Mind that the effective RAM use will
+    // be (segmentSize * (segments + 1)). The +1 is because smbc internally will also
+    // allocate up to a full segment for one read() call.
+    //
+    // Unfortunately we have no way of knowing what size smbc will use for the
+    // network requests, so we can't use a multiple of that. Which means we'll
+    // almost never reach best performance.
+    //
+    // TODO: perhaps it would actually make sense to read at a multiple of
+    // the target drive's block size?
+    const off_t idealSegmentSize = qMin<off_t>(fileSize / 50, c_maxSegmentSize);
+    segmentSize = qMax(segmentSize, idealSegmentSize);
+    // If the segment size is larger than the file size it appears we can
+    // actually degrade performance, so pick the smaller of the two.
+    if (fileSize != 0) {
+        segmentSize = qMin(segmentSize, fileSize);
+    }
+    return segmentSize;
+}
+
+TransferRingBuffer::TransferRingBuffer(const off_t fileSize)
+{
+    for (size_t i = 0; i < m_capacity; ++i) {
+        m_buffer[i] = std::unique_ptr<TransferSegment>(new TransferSegment(fileSize));
+    }
+}
+
+TransferSegment *TransferRingBuffer::pop()
+{
+    std::unique_lock<std::mutex> lock(m_mutex);
+
+    while (head == tail) {
+        if (!m_done) {
+            m_cond.wait(lock);
+        } else {
+            return nullptr;
+        }
+    }
+
+    // Popping changes no shared state (tail only moves in unpop()), so nobody needs waking.
+    return m_buffer[tail].get();
+}
+
+void TransferRingBuffer::unpop()
+{
+    std::unique_lock<std::mutex> lock(m_mutex);
+    tail = (tail + 1) % m_capacity;
+    m_cond.notify_all();
+}
+
+TransferSegment *TransferRingBuffer::nextFree()
+{
+    // This does not require synchronization: head is only written by push(), on this
+    // same thread, and push() never moves head onto the slot the pull thread holds,
+    // so the returned item is exclusively ours until the next push().
+    return m_buffer[head].get();
+}
+
+void TransferRingBuffer::push()
+{
+    const auto newHead = (head + 1) % m_capacity;
+    std::unique_lock<std::mutex> lock(m_mutex);
+    while (newHead == tail) {
+        // do not move to the item the reading thread is on
+        m_cond.wait(lock);
+    }
+    head = newHead;
+    m_cond.notify_all();
+}
+
+void TransferRingBuffer::done()
+{
+    std::unique_lock<std::mutex> lock(m_mutex);
+    m_done = true;
+    m_cond.notify_all();
+}