diff --git a/src/voy/CMakeLists.txt b/src/voy/CMakeLists.txt index 07e9d1f..8f76d14 100644 --- a/src/voy/CMakeLists.txt +++ b/src/voy/CMakeLists.txt @@ -1,93 +1,94 @@ include_directories ( ../../3rdparty/azmq ) foreach ( test_name IN ITEMS VALUES_TEST PROCESS_TEST DELAYED_TEST DELAYED_VALS_TEST FILTER_TEST SLICE_TEST MERGE_TEST TCP_TEST ZMQ_SUB_TEST ZMQ_PUB_TEST ZMQ_PUBSUB_TEST ASSOCIATIVITY_TEST ) message("This is a test: ${test_name}") add_executable ( "voy_test_${test_name}" tests_simple.cpp engine/asio/service.cpp ) target_compile_definitions ( "voy_test_${test_name}" PUBLIC "-D${test_name}" ) target_link_libraries ( "voy_test_${test_name}" ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_REGEX_LIBRARY} ${ZMQ_LIBRARIES} -pthread ) endforeach() foreach ( test_name IN ITEMS - TEST_BACKEND TEST_FRONTEND + TEST_BACKEND_1 + TEST_BACKEND_2 ) message("This is a multiprocess test: ${test_name}") add_executable ( "voy_test_multiprocess_${test_name}" tests_multiprocess.cpp engine/asio/service.cpp ) target_compile_definitions ( "voy_test_multiprocess_${test_name}" PUBLIC "-D${test_name}" ) target_link_libraries ( "voy_test_multiprocess_${test_name}" ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_REGEX_LIBRARY} ${ZMQ_LIBRARIES} -pthread ) endforeach() # add_executable ( # voy_test_process_1 # tests_multiprocess.cpp # # engine/asio/service.cpp # ) # # target_link_libraries ( # voy_test_process_1 # ${Boost_SYSTEM_LIBRARY} # ${Boost_FILESYSTEM_LIBRARY} # ${Boost_REGEX_LIBRARY} # ${ZMQ_LIBRARIES} # -pthread # ) diff --git a/src/voy/dsl/multiprocess.h b/src/voy/dsl/multiprocess.h index 4ff9c7a..193e5af 100644 --- a/src/voy/dsl/multiprocess.h +++ b/src/voy/dsl/multiprocess.h @@ -1,102 +1,149 @@ /* * Copyright (C) 2018 Ivan Čukić * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. * If not, see . */ #ifndef VOY_DSL_MULTIPROCESS_H #define VOY_DSL_MULTIPROCESS_H #include "../operations/identity.h" #include "dsl.h" namespace voy::dsl { -template -struct pipeline_container { - template - pipeline_container(LeftTuple&& left, RightTuple&& right) - : m_pipelines{std::tuple_cat(voy_fwd(left), voy_fwd(right))} - { - } +namespace detail { + + template + struct pipeline_container { + template + pipeline_container(LeftTuple&& left, RightTuple&& right) + : m_pipelines{std::tuple_cat(voy_fwd(left), voy_fwd(right))} + { + } - template - pipeline_container(Tuple&& tuple) - : m_pipelines{std::move(tuple)} - { - } + template + pipeline_container(Tuple&& tuple) + : m_pipelines{std::move(tuple)} + { + } + + pipeline_container() + { + } + + template + auto operator|| (NewPipeline&& new_pipeline) && + { + if constexpr (std::is_same_v) { + return pipeline_container( + std::move(m_pipelines), std::make_tuple(voy_fwd(new_pipeline))); - pipeline_container() + } else { + return pipeline_container( + std::move(m_pipelines)); + + } + } + + std::tuple m_pipelines; + }; + + inline auto multiprocess_pipeline() { + return pipeline_container<>{}; } - template - auto operator|| (NewPipeline&& new_pipeline) && + template < typename Left + > + voy_concept supports_double_pipeline(Left* = nullptr) { - if constexpr (std::is_same_v) { - return pipeline_container( - std::move(m_pipelines), std::make_tuple(voy_fwd(new_pipeline))); + if constexpr (std::is_same_v) { + return true; - } else { - return pipeline_container( - std::move(m_pipelines)); + } else if constexpr (node_traits::is_connection_expr) { + return true; + } else if constexpr (node_traits::is_node) { + return true; + + } else { + return false; } } - std::tuple m_pipelines; -}; -inline auto multiprocess_pipeline() +} // namespace detail + +template < typename Left + , typename Right + , voy_require( + detail::supports_double_pipeline() + ) + > +auto operator||(Left&& left, Right&& right) { - return pipeline_container<>{}; + return voy::dsl::detail::multiprocess_pipeline() + || voy_fwd(left) + || voy_fwd(right); } } // namespace voy::dsl #define voy_declare_bridge_out(BridgeName) \ auto BridgeName##_send() \ { \ return voy::zmq::publisher<>( \ std::string("ipc:///tmp/voy-zmq-bridge-" #BridgeName "-ivan")); \ } \ \ auto BridgeName##_receive() \ { \ return voy::identity<>(); \ } #define voy_declare_bridge_in(BridgeName) \ auto BridgeName##_send() \ { \ return voy::identity<>(); \ } \ \ auto BridgeName##_receive() \ { \ return voy::zmq::subscriber<>( \ std::string("ipc:///tmp/voy-zmq-bridge-" #BridgeName "-ivan")); \ } +#define voy_declare_bridge_ignored(BridgeName) \ + auto BridgeName##_send() \ + { \ + return voy::identity<>(); \ + } \ + \ + auto BridgeName##_receive() \ + { \ + return voy::identity<>(); \ + } + #define voy_bridge(BridgeName) BridgeName##_send() || BridgeName##_receive() #define voy_multiprocess voy::dsl::multiprocess_pipeline() || #endif // include guard diff --git a/src/voy/dsl/node_traits.h b/src/voy/dsl/node_traits.h index fdb6d81..123e634 100644 --- a/src/voy/dsl/node_traits.h +++ b/src/voy/dsl/node_traits.h @@ -1,110 +1,110 @@ /* * Copyright (C) 2018 Ivan Čukić * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. * If not, see . */ #ifndef VOY_NODE_TRAITS_H #define VOY_NODE_TRAITS_H // STL #include #include "node_tags.h" #include "../traits.h" #include "../utils.h" #include "../basic/sink.h" namespace voy::dsl::node_traits { using namespace voy::traits; // Returns the category of the node. It can also be used to detect whether // a class contains the node category specification template using node_category = typename remove_cvref_t::node_category; // Used to detect whther the class contains a with_continuation function // for rvalues template using with_continuation_memfn = decltype( std::declval().with_continuation(voy::sink{})); template voy_concept has_with_continuation = is_detected_v; // Returns whether the node has the specified tag or not template voy_concept is_tagged_with = std::is_same_v>; // The meta-function to check whether a class is a node is easier to implement // using the constexpr-if because of the branching. namespace detail { template < typename Node , typename DetectedCategory = detected_t > voy_concept is_node() { if constexpr (!is_detected_v) { - voy_fail(Node, "No category specified for the node"); + // voy_fail(Node, "No category specified for the node"); return false; } else if constexpr (std::is_same_v) { - voy_fail(Node, "Category is void, which means we already have a complete graph path"); + // voy_fail(Node, "Category is void, which means we already have a complete graph path"); return false; } else { return true; } } } // Wrapper for detail::is_node to make it look like a normal meta-function template voy_concept is_node = detail::is_node(); // Checks whether the node is a source node template voy_concept is_source = is_tagged_with; // Checks whether the node is a source node template voy_concept is_sink = is_tagged_with; // To be used with the detection idiom to check whether a type is // a connection_expr or a normal node template using connection_expr_tag_definition = typename Graph::connection_expr_tag; template voy_concept is_connection_expr = is_detected_v; } // namespace voy::dsl::node_traits #endif // include guard diff --git a/src/voy/tests_multiprocess.cpp b/src/voy/tests_multiprocess.cpp index d145532..247a258 100644 --- a/src/voy/tests_multiprocess.cpp +++ b/src/voy/tests_multiprocess.cpp @@ -1,94 +1,119 @@ /* * Copyright (C) 2018 Ivan Čukić * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. * If not, see . */ #include +#include +#include + #include "operations/merge.h" #include "operations/slice.h" #include "operations/transform.h" #include "operations/filter.h" #include "operations/identity.h" #include "basic/delayed.h" #include "basic/values.h" #include "basic/sink.h" #include "wrappers/process.h" #include "wrappers/tcp_service.h" #include "wrappers/zmq_service.h" #include "engine/event_loop.h" #include "dsl.h" #include "dsl/multiprocess.h" #include "../utils/debug.h" using namespace std::literals::string_literals; using namespace std::literals::chrono_literals; -#ifndef TEST_BACKEND -voy_declare_bridge_out(to_backend) -voy_declare_bridge_in(from_backend) -#else -voy_declare_bridge_in(to_backend) -voy_declare_bridge_out(from_backend) +#if defined TEST_FRONTEND +voy_declare_bridge_out(frontend_to_backend_1) +voy_declare_bridge_ignored(backend_1_to_backend_2) +voy_declare_bridge_in(backend_2_to_frontend) + +#elif defined TEST_BACKEND_1 +voy_declare_bridge_in(frontend_to_backend_1) +voy_declare_bridge_out(backend_1_to_backend_2) +voy_declare_bridge_ignored(backend_2_to_frontend) + +#else // TEST_BACKEND_2 +voy_declare_bridge_ignored(frontend_to_backend_1) +voy_declare_bridge_in(backend_1_to_backend_2) +voy_declare_bridge_out(backend_2_to_frontend) + #endif +inline std::string pid_s() +{ + return " \tpid:" + std::to_string(getpid()); +} int main(int argc, char *argv[]) { auto cout = [] (auto&& value) { std::cout << "Out: " << voy_fwd(value) << std::endl; }; using voy::dsl::operator|; + using voy::dsl::operator||; + using debug::color; - auto pipeline = voy_multiprocess - - voy::system_cmd("ping"s, "localhost"s) + auto pipeline = + voy::system_cmd("ping"s, "localhost"s) | voy::transform([] (std::string&& value) { std::transform(value.begin(), value.end(), value.begin(), toupper); + debug::out(color::gray) << value << pid_s(); return value; }) - | voy_bridge(to_backend) + | voy_bridge(frontend_to_backend_1) | voy::transform([] (std::string&& value) { const auto pos = value.find_last_of('='); + debug::out(color::gray) << value << " found = at " << pos << pid_s(); return std::make_pair(std::move(value), pos); }) | voy::transform([] (std::pair&& pair) { auto [ value, pos ] = pair; + debug::out(color::gray) << value << " found = at " << pos << " - extracting ms" << pid_s(); return pos == std::string::npos ? std::move(value) : std::string(value.cbegin() + pos + 1, value.cend()); }) - | voy_bridge(from_backend) + | voy_bridge(backend_1_to_backend_2) + | voy::transform([] (std::string&& value) { + return value + pid_s(); + }) | voy::filter([] (const std::string& value) { - return value < "0.045"s; + debug::out(color::gray) << value << " - filtering" << pid_s(); + return value < "0.145"s; }) + | voy_bridge(backend_2_to_frontend) | voy::sink{cout}; voy::event_loop::run(); return 0; }