Changeset View
Changeset View
Standalone View
Standalone View
core/backends/lan/compositeuploadjob.cpp
- This file was added.
1 | /** | ||||
---|---|---|---|---|---|
2 | * Copyright 2018 Erik Duisters | ||||
3 | * | ||||
4 | * This program is free software; you can redistribute it and/or | ||||
5 | * modify it under the terms of the GNU General Public License as | ||||
6 | * published by the Free Software Foundation; either version 2 of | ||||
7 | * the License or (at your option) version 3 or any later version | ||||
8 | * accepted by the membership of KDE e.V. (or its successor approved | ||||
9 | * by the membership of KDE e.V.), which shall act as a proxy | ||||
10 | * defined in Section 14 of version 3 of the license. | ||||
11 | * | ||||
12 | * This program is distributed in the hope that it will be useful, | ||||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
15 | * GNU General Public License for more details. | ||||
16 | * | ||||
17 | * You should have received a copy of the GNU General Public License | ||||
18 | * along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
19 | */ | ||||
20 | | ||||
21 | #include "compositeuploadjob.h" | ||||
22 | #include <core_debug.h> | ||||
23 | #include <KLocalizedString> | ||||
24 | #include <kio/global.h> | ||||
25 | #include <KJobTrackerInterface> | ||||
26 | #include "lanlinkprovider.h" | ||||
27 | #include <daemon.h> | ||||
28 | | ||||
29 | CompositeUploadJob::CompositeUploadJob(const QString& deviceId, bool displayNotification) | ||||
30 | : KCompositeJob() | ||||
31 | , m_server(new Server(this)) | ||||
32 | , m_socket(nullptr) | ||||
33 | , m_port(0) | ||||
34 | , m_deviceId(deviceId) | ||||
35 | , m_running(false) | ||||
36 | , m_currentJobNum(1) | ||||
37 | , m_totalJobs(0) | ||||
38 | , m_currentJobSendPayloadSize(0) | ||||
39 | , m_totalSendPayloadSize(0) | ||||
40 | , m_totalPayloadSize(0) | ||||
41 | , m_currentJob(nullptr) | ||||
42 | { | ||||
43 | setCapabilities(Killable); | ||||
44 | | ||||
45 | if (displayNotification) { | ||||
46 | KIO::getJobTracker()->registerJob(this); | ||||
47 | } | ||||
48 | } | ||||
49 | | ||||
50 | bool CompositeUploadJob::isRunning() | ||||
51 | { | ||||
52 | return m_running; | ||||
53 | } | ||||
54 | | ||||
55 | void CompositeUploadJob::start() { | ||||
56 | if (m_running) { | ||||
57 | qCWarning(KDECONNECT_CORE) << "CompositeUploadJob::start() - allready running"; | ||||
58 | return; | ||||
59 | } | ||||
60 | | ||||
61 | if (!hasSubjobs()) { | ||||
62 | qCWarning(KDECONNECT_CORE) << "CompositeUploadJob::start() - there are no subjobs to start"; | ||||
63 | emitResult(); | ||||
64 | return; | ||||
65 | } | ||||
66 | | ||||
67 | if (!startListening()) { | ||||
68 | return; | ||||
69 | } | ||||
70 | | ||||
71 | connect(m_server, &QTcpServer::newConnection, this, &CompositeUploadJob::newConnection); | ||||
72 | | ||||
73 | m_running = true; | ||||
74 | | ||||
75 | //Give SharePlugin some time to add subjobs | ||||
76 | QMetaObject::invokeMethod(this, "startNextSubJob", Qt::QueuedConnection); | ||||
77 | } | ||||
78 | | ||||
79 | bool CompositeUploadJob::startListening() | ||||
80 | { | ||||
81 | m_port = MIN_PORT; | ||||
82 | while (!m_server->listen(QHostAddress::Any, m_port)) { | ||||
83 | m_port++; | ||||
84 | if (m_port > MAX_PORT) { //No ports available? | ||||
85 | qCWarning(KDECONNECT_CORE) << "CompositeUploadJob::startListening() - Error opening a port in range" << MIN_PORT << "-" << MAX_PORT; | ||||
86 | m_port = 0; | ||||
87 | setError(NoPortAvailable); | ||||
88 | setErrorText(i18n("Couldn't find an available port")); | ||||
89 | emitResult(); | ||||
90 | return false; | ||||
91 | } | ||||
92 | } | ||||
93 | | ||||
94 | qCDebug(KDECONNECT_CORE) << "CompositeUploadJob::startListening() - listening on port: " << m_port; | ||||
95 | return true; | ||||
96 | } | ||||
97 | | ||||
98 | void CompositeUploadJob::startNextSubJob() | ||||
99 | { | ||||
100 | m_currentJob = qobject_cast<UploadJob*>(subjobs().at(0)); | ||||
101 | m_currentJobSendPayloadSize = 0; | ||||
102 | emitDescription(m_currentJob->getNetworkPacket().get<QString>(QStringLiteral("filename"))); | ||||
103 | | ||||
104 | connect(m_currentJob, SIGNAL(processedAmount(KJob*,KJob::Unit,qulonglong)), this, SLOT(slotProcessedAmount(KJob*,KJob::Unit,qulonglong))); | ||||
105 | //Already done by KCompositeJob | ||||
106 | //connect(m_currentJob, &KJob::result, this, &CompositeUploadJob::slotResult); | ||||
107 | | ||||
108 | //TODO: Create a copy of the networkpacket that can be re-injected if sending via lan fails? | ||||
109 | NetworkPacket np = m_currentJob->getNetworkPacket(); | ||||
110 | np.setPayload(nullptr, np.payloadSize()); | ||||
111 | np.setPayloadTransferInfo({{"port", m_port}}); | ||||
112 | np.set<int>(QStringLiteral("numberOfFiles"), m_totalJobs); | ||||
113 | np.set<quint64>(QStringLiteral("totalPayloadSize"), m_totalPayloadSize); | ||||
114 | | ||||
115 | if (Daemon::instance()->getDevice(m_deviceId)->sendPacket(np)) { | ||||
116 | m_server->resumeAccepting(); | ||||
117 | } else { | ||||
118 | setError(SendingNetworkPacketFailed); | ||||
119 | setErrorText(i18n("Failed to send packet to %1", Daemon::instance()->getDevice(m_deviceId)->name())); | ||||
120 | //TODO: cleanup/resend remaining jobs | ||||
121 | emitResult(); | ||||
122 | } | ||||
123 | } | ||||
124 | | ||||
125 | void CompositeUploadJob::newConnection() | ||||
126 | { | ||||
127 | m_server->pauseAccepting(); | ||||
128 | | ||||
129 | m_socket = m_server->nextPendingConnection(); | ||||
130 | | ||||
131 | if (!m_socket) { | ||||
132 | qCDebug(KDECONNECT_CORE) << "CompositeUploadJob::newConnection() - m_server->nextPendingConnection() returned a nullptr"; | ||||
133 | return; | ||||
134 | } | ||||
135 | | ||||
136 | m_currentJob->setSocket(m_socket); | ||||
137 | | ||||
138 | connect(m_socket, &QSslSocket::disconnected, this, &CompositeUploadJob::socketDisconnected); | ||||
139 | connect(m_socket, QOverload<QAbstractSocket::SocketError>::of(&QAbstractSocket::error), this, &CompositeUploadJob::socketError); | ||||
140 | connect(m_socket, QOverload<const QList<QSslError> &>::of(&QSslSocket::sslErrors), this, &CompositeUploadJob::sslError); | ||||
141 | connect(m_socket, &QSslSocket::encrypted, this, &CompositeUploadJob::encrypted); | ||||
142 | | ||||
143 | LanLinkProvider::configureSslSocket(m_socket, m_deviceId, true); | ||||
144 | | ||||
145 | m_socket->startServerEncryption(); | ||||
146 | } | ||||
147 | | ||||
148 | void CompositeUploadJob::socketDisconnected() | ||||
149 | { | ||||
150 | m_socket->close(); | ||||
151 | } | ||||
152 | | ||||
153 | void CompositeUploadJob::socketError(QAbstractSocket::SocketError error) | ||||
154 | { | ||||
155 | Q_UNUSED(error); | ||||
156 | | ||||
157 | m_socket->close(); | ||||
158 | setError(SocketError); | ||||
159 | emitResult(); | ||||
160 | //TODO: cleanup jobs? | ||||
161 | m_running = false; | ||||
162 | } | ||||
163 | | ||||
164 | void CompositeUploadJob::sslError(const QList<QSslError>& errors) | ||||
165 | { | ||||
166 | Q_UNUSED(errors); | ||||
167 | | ||||
168 | m_socket->close(); | ||||
169 | setError(SslError); | ||||
170 | emitResult(); | ||||
171 | //TODO: cleanup jobs? | ||||
172 | m_running = false; | ||||
173 | } | ||||
174 | | ||||
175 | void CompositeUploadJob::encrypted() | ||||
176 | { | ||||
177 | if (!m_timer.isValid()) { | ||||
178 | m_timer.start(); | ||||
179 | } | ||||
180 | | ||||
181 | m_currentJob->start(); | ||||
182 | } | ||||
183 | | ||||
184 | bool CompositeUploadJob::addSubjob(KJob* job) | ||||
185 | { | ||||
186 | if (UploadJob *uploadJob = qobject_cast<UploadJob*>(job)) { | ||||
187 | NetworkPacket np = uploadJob->getNetworkPacket(); | ||||
188 | | ||||
189 | m_totalJobs++; | ||||
190 | | ||||
191 | if (np.payloadSize() >= 0 ) { | ||||
192 | m_totalPayloadSize += np.payloadSize(); | ||||
193 | setTotalAmount(Bytes, m_totalPayloadSize); | ||||
194 | } | ||||
195 | | ||||
196 | QString filename; | ||||
197 | QString filenameArg = QStringLiteral("filename"); | ||||
198 | | ||||
199 | if (m_currentJob) { | ||||
200 | filename = m_currentJob->getNetworkPacket().get<QString>(filenameArg); | ||||
201 | } else { | ||||
202 | filename = np.get<QString>(filenameArg); | ||||
203 | } | ||||
204 | | ||||
205 | emitDescription(filename); | ||||
206 | | ||||
207 | return KCompositeJob::addSubjob(job); | ||||
208 | } else { | ||||
209 | qCDebug(KDECONNECT_CORE) << "CompositeUploadJob::addSubjob() - you can only add UploadJob's, ignoring"; | ||||
210 | return false; | ||||
211 | } | ||||
212 | } | ||||
213 | | ||||
214 | bool CompositeUploadJob::doKill() | ||||
215 | { | ||||
216 | //TODO: Remove all subjobs? | ||||
217 | //TODO: cleanup jobs? | ||||
218 | if (m_running) { | ||||
219 | m_running = false; | ||||
220 | | ||||
221 | return m_currentJob->stop(); | ||||
222 | } | ||||
223 | | ||||
224 | return true; | ||||
225 | } | ||||
226 | | ||||
227 | void CompositeUploadJob::slotProcessedAmount(KJob *job, KJob::Unit unit, qulonglong amount) { | ||||
228 | Q_UNUSED(job); | ||||
229 | | ||||
230 | m_currentJobSendPayloadSize = amount; | ||||
231 | | ||||
232 | quint64 uploaded = m_totalSendPayloadSize + m_currentJobSendPayloadSize; | ||||
233 | setProcessedAmount(unit, uploaded); | ||||
234 | | ||||
235 | const auto elapsed = m_timer.elapsed(); | ||||
236 | if (elapsed > 0) { | ||||
237 | emitSpeed((1000 * uploaded) / elapsed); | ||||
238 | } | ||||
239 | } | ||||
240 | | ||||
241 | void CompositeUploadJob::slotResult(KJob *job) { | ||||
242 | //Copies job error and errorText and emits result if job is in error otherwise removes job from subjob list | ||||
243 | KCompositeJob::slotResult(job); | ||||
244 | | ||||
245 | //TODO: cleanup jobs? | ||||
246 | | ||||
247 | if (error() || !m_running) { | ||||
248 | return; | ||||
249 | } | ||||
250 | | ||||
251 | m_totalSendPayloadSize += m_currentJobSendPayloadSize; | ||||
252 | | ||||
253 | if (hasSubjobs()) { | ||||
254 | m_currentJobNum++; | ||||
255 | startNextSubJob(); | ||||
256 | } else { | ||||
257 | Q_EMIT description(this, i18n("Finished sending to %1", Daemon::instance()->getDevice(this->m_deviceId)->name()), | ||||
258 | { QStringLiteral(""), i18np("Sent 1 file", "Sent %1 files", m_totalJobs) } | ||||
259 | ); | ||||
260 | emitResult(); | ||||
261 | } | ||||
262 | } | ||||
263 | | ||||
264 | void CompositeUploadJob::emitDescription(const QString& currentFileName) { | ||||
265 | QPair<QString, QString> field2; | ||||
266 | | ||||
267 | if (m_totalJobs > 1) { | ||||
268 | field2.first = i18n("Progress"); | ||||
269 | field2.second = i18n("Sending file %1 of %2", m_currentJobNum, m_totalJobs); | ||||
270 | } | ||||
271 | | ||||
272 | Q_EMIT description(this, i18n("Sending to %1", Daemon::instance()->getDevice(this->m_deviceId)->name()), | ||||
273 | { i18n("File"), currentFileName }, field2 | ||||
274 | ); | ||||
275 | } |