diff --git a/plugins/process/network/helper/Accumulator.h b/plugins/process/network/helper/Accumulator.h --- a/plugins/process/network/helper/Accumulator.h +++ b/plugins/process/network/helper/Accumulator.h @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -56,6 +57,7 @@ std::thread m_thread; std::atomic_bool m_running; + std::mutex m_mutex; PidDataCounterHash m_data; }; diff --git a/plugins/process/network/helper/Accumulator.cpp b/plugins/process/network/helper/Accumulator.cpp --- a/plugins/process/network/helper/Accumulator.cpp +++ b/plugins/process/network/helper/Accumulator.cpp @@ -38,6 +38,8 @@ Accumulator::PidDataCounterHash Accumulator::data() { + std::lock_guard lock{m_mutex}; + auto tmp = m_data; auto toErase = std::vector{}; @@ -78,6 +80,8 @@ void Accumulator::addData(Packet::Direction direction, const Packet &packet, int pid) { + std::lock_guard lock{m_mutex}; + auto itr = m_data.find(pid); if (itr == m_data.end()) { m_data.emplace(pid, InboundOutboundData{0, 0}); diff --git a/plugins/process/network/helper/Capture.h b/plugins/process/network/helper/Capture.h --- a/plugins/process/network/helper/Capture.h +++ b/plugins/process/network/helper/Capture.h @@ -52,15 +52,15 @@ std::string m_interface; std::string m_error; - std::atomic_bool m_active; std::thread m_thread; std::mutex m_mutex; std::condition_variable m_condition; std::deque m_queue; int m_packetCount = 0; + int m_droppedPackets = 0; - pcap *m_pcap; + pcap *m_pcap = nullptr; }; #endif // CAPTURE_H diff --git a/plugins/process/network/helper/Capture.cpp b/plugins/process/network/helper/Capture.cpp --- a/plugins/process/network/helper/Capture.cpp +++ b/plugins/process/network/helper/Capture.cpp @@ -31,6 +31,10 @@ using namespace std::string_literals; +// Limit the amount of entries waiting in the queue to this size, to prevent +// the queue from getting too full. +static const int MaximumQueueSize = 1000; + void pcapDispatchCallback(uint8_t *user, const struct pcap_pkthdr *h, const uint8_t *bytes) { reinterpret_cast(user)->handlePacket(h, bytes); @@ -44,11 +48,7 @@ Capture::~Capture() { if (m_pcap) { - if (m_active) { - stop(); - } - - pcap_close(m_pcap); + stop(); } } @@ -68,12 +68,12 @@ pcap_set_promisc(m_pcap, 0); pcap_set_datalink(m_pcap, DLT_LINUX_SLL); - if (checkError(pcap_activate(m_pcap))) + if (checkError(pcap_activate(m_pcap))) { return false; + } struct bpf_program filter; if (checkError(pcap_compile(m_pcap, &filter, "tcp or udp", 1, PCAP_NETMASK_UNKNOWN))) { - pcap_freecode(&filter); return false; } @@ -95,6 +95,8 @@ if (m_thread.joinable()) { m_thread.join(); } + pcap_close(m_pcap); + m_pcap = nullptr; } std::string Capture::lastError() const @@ -112,6 +114,7 @@ std::cout << " " << stats.ps_drop << " dropped (full)" << std::endl; std::cout << " " << stats.ps_ifdrop << " dropped (iface)" << std::endl; std::cout << " " << m_packetCount << " processed" << std::endl; + std::cout << " " << m_droppedPackets << " dropped (capture)" << std::endl; } Packet Capture::nextPacket() @@ -162,10 +165,15 @@ { auto timeStamp = std::chrono::time_point_cast(std::chrono::system_clock::from_time_t(header->ts.tv_sec) + std::chrono::microseconds { header->ts.tv_usec }); - m_packetCount++; { std::lock_guard lock { m_mutex }; - m_queue.emplace_back(timeStamp, data, header->caplen, header->len); + + m_packetCount++; + if (m_queue.size() < MaximumQueueSize) { + m_queue.emplace_back(timeStamp, data, header->caplen, header->len); + } else { + m_droppedPackets++; + } } m_condition.notify_all(); diff --git a/plugins/process/network/helper/ConnectionMapping.cpp b/plugins/process/network/helper/ConnectionMapping.cpp --- a/plugins/process/network/helper/ConnectionMapping.cpp +++ b/plugins/process/network/helper/ConnectionMapping.cpp @@ -138,7 +138,7 @@ dirent *fd = nullptr; while ((fd = readdir(dir))) { memset(buffer, 0, 100); - readlinkat(dirfd(dir), fd->d_name, buffer, 100); + readlinkat(dirfd(dir), fd->d_name, buffer, 99); auto target = std::string(buffer); if (target.find("socket:") == std::string::npos) continue; diff --git a/plugins/process/network/helper/Packet.h b/plugins/process/network/helper/Packet.h --- a/plugins/process/network/helper/Packet.h +++ b/plugins/process/network/helper/Packet.h @@ -78,9 +78,9 @@ Address destinationAddress() const; private: - void parseIPv4(const uint8_t *data); - void parseIPv6(const uint8_t *data); - void parseTransport(uint8_t type, const uint8_t *data); + void parseIPv4(const uint8_t* data, int32_t dataLength); + void parseIPv6(const uint8_t* data, int32_t dataLength); + void parseTransport(uint8_t type, const uint8_t *data, int32_t dataLength); TimeStamp::MicroSeconds m_timeStamp; unsigned int m_size = 0; diff --git a/plugins/process/network/helper/Packet.cpp b/plugins/process/network/helper/Packet.cpp --- a/plugins/process/network/helper/Packet.cpp +++ b/plugins/process/network/helper/Packet.cpp @@ -33,6 +33,11 @@ #include #include +uint32_t u8Tou32(uint8_t first, uint8_t second, uint8_t third, uint8_t fourth) +{ + return uint32_t(first) << 24 | uint32_t(second) << 16 | uint32_t(third) << 8 | uint32_t(fourth); +} + Packet::Packet() { } @@ -47,13 +52,13 @@ case ETHERTYPE_IP: m_networkProtocol = NetworkProtocolType::IPv4; if (sizeof(sll_header) <= dataLength) { - parseIPv4(data + sizeof(sll_header)); + parseIPv4(data + sizeof(sll_header), dataLength - sizeof(sll_header)); } break; case ETHERTYPE_IPV6: m_networkProtocol = NetworkProtocolType::IPv6; if (sizeof(sll_header) <= dataLength) { - parseIPv6(data + sizeof(sll_header)); + parseIPv6(data + sizeof(sll_header), dataLength - sizeof(sll_header)); } break; default: @@ -96,51 +101,63 @@ return m_destinationAddress; } -void Packet::parseIPv4(const uint8_t *data) +void Packet::parseIPv4(const uint8_t *data, int32_t dataLength) { + if (dataLength < int32_t(sizeof(ip))) { + return; + } + const ip *header = reinterpret_cast(data); m_sourceAddress.address[3] = header->ip_src.s_addr; m_destinationAddress.address[3] = header->ip_dst.s_addr; - parseTransport(header->ip_p, data + sizeof(ip)); + parseTransport(header->ip_p, data + sizeof(ip), dataLength - sizeof(ip)); } -void Packet::parseIPv6(const uint8_t *data) +void Packet::parseIPv6(const uint8_t *data, int32_t dataLength) { + if (dataLength < int32_t(sizeof(ip6_hdr))) { + return; + } + const ip6_hdr *header = reinterpret_cast(data); m_sourceAddress.address = { - uint32_t(header->ip6_src.s6_addr[0] << 24 & header->ip6_src.s6_addr[1] << 16 & header->ip6_src.s6_addr[2] << 8 & header->ip6_src.s6_addr[3]), - uint32_t(header->ip6_src.s6_addr[4] << 24 & header->ip6_src.s6_addr[5] << 16 & header->ip6_src.s6_addr[6] << 8 & header->ip6_src.s6_addr[7]), - uint32_t(header->ip6_src.s6_addr[8] << 24 & header->ip6_src.s6_addr[9] << 16 & header->ip6_src.s6_addr[10] << 8 & header->ip6_src.s6_addr[11]), - uint32_t(header->ip6_src.s6_addr[12] << 24 & header->ip6_src.s6_addr[13] << 16 & header->ip6_src.s6_addr[14] << 8 & header->ip6_src.s6_addr[15]) + u8Tou32(header->ip6_src.s6_addr[0], header->ip6_src.s6_addr[1], header->ip6_src.s6_addr[2], header->ip6_src.s6_addr[3]), + u8Tou32(header->ip6_src.s6_addr[4], header->ip6_src.s6_addr[5], header->ip6_src.s6_addr[6], header->ip6_src.s6_addr[7]), + u8Tou32(header->ip6_src.s6_addr[8], header->ip6_src.s6_addr[9], header->ip6_src.s6_addr[10], header->ip6_src.s6_addr[11]), + u8Tou32(header->ip6_src.s6_addr[12], header->ip6_src.s6_addr[13], header->ip6_src.s6_addr[14], header->ip6_src.s6_addr[15]) }; m_destinationAddress.address = { - uint32_t(header->ip6_dst.s6_addr[0] << 24 & header->ip6_dst.s6_addr[1] << 16 & header->ip6_dst.s6_addr[2] << 8 & header->ip6_dst.s6_addr[3]), - uint32_t(header->ip6_dst.s6_addr[4] << 24 & header->ip6_dst.s6_addr[5] << 16 & header->ip6_dst.s6_addr[6] << 8 & header->ip6_dst.s6_addr[7]), - uint32_t(header->ip6_dst.s6_addr[8] << 24 & header->ip6_dst.s6_addr[9] << 16 & header->ip6_dst.s6_addr[10] << 8 & header->ip6_dst.s6_addr[11]), - uint32_t(header->ip6_dst.s6_addr[12] << 24 & header->ip6_dst.s6_addr[13] << 16 & header->ip6_dst.s6_addr[14] << 8 & header->ip6_dst.s6_addr[15]) + u8Tou32(header->ip6_dst.s6_addr[0], header->ip6_dst.s6_addr[1], header->ip6_dst.s6_addr[2], header->ip6_dst.s6_addr[3]), + u8Tou32(header->ip6_dst.s6_addr[4], header->ip6_dst.s6_addr[5], header->ip6_dst.s6_addr[6], header->ip6_dst.s6_addr[7]), + u8Tou32(header->ip6_dst.s6_addr[8], header->ip6_dst.s6_addr[9], header->ip6_dst.s6_addr[10], header->ip6_dst.s6_addr[11]), + u8Tou32(header->ip6_dst.s6_addr[12], header->ip6_dst.s6_addr[13], header->ip6_dst.s6_addr[14], header->ip6_dst.s6_addr[15]) }; - parseTransport(header->ip6_nxt, data + sizeof(ip6_hdr)); + parseTransport(header->ip6_nxt, data + sizeof(ip6_hdr), dataLength - sizeof(ip6_hdr)); } -void Packet::parseTransport(uint8_t type, const uint8_t *data) +void Packet::parseTransport(uint8_t type, const uint8_t *data, int32_t dataLength) { switch (type) { case IPPROTO_TCP: { m_transportProtocol = TransportProtocolType::Tcp; - const tcphdr *tcpHeader = reinterpret_cast(data); - m_sourceAddress.port = ntohs(tcpHeader->th_sport); - m_destinationAddress.port = ntohs(tcpHeader->th_dport); + if (dataLength >= int32_t(sizeof(tcphdr))) { + const tcphdr *tcpHeader = reinterpret_cast(data); + m_sourceAddress.port = ntohs(tcpHeader->th_sport); + m_destinationAddress.port = ntohs(tcpHeader->th_dport); + } break; } case IPPROTO_UDP: { m_transportProtocol = TransportProtocolType::Udp; - const udphdr *udpHeader = reinterpret_cast(data); - m_sourceAddress.port = ntohs(udpHeader->uh_sport); - m_destinationAddress.port = ntohs(udpHeader->uh_dport); + if (dataLength >= int32_t(sizeof(udphdr))) { + const udphdr *udpHeader = reinterpret_cast(data); + m_sourceAddress.port = ntohs(udpHeader->uh_sport); + m_destinationAddress.port = ntohs(udpHeader->uh_dport); + } break; } default: