diff --git a/src/voy/CMakeLists.txt b/src/voy/CMakeLists.txt index 113e75c..07e9d1f 100644 --- a/src/voy/CMakeLists.txt +++ b/src/voy/CMakeLists.txt @@ -1,51 +1,93 @@ include_directories ( ../../3rdparty/azmq ) foreach ( test_name IN ITEMS VALUES_TEST PROCESS_TEST DELAYED_TEST DELAYED_VALS_TEST FILTER_TEST SLICE_TEST MERGE_TEST TCP_TEST ZMQ_SUB_TEST ZMQ_PUB_TEST ZMQ_PUBSUB_TEST + ASSOCIATIVITY_TEST ) message("This is a test: ${test_name}") add_executable ( "voy_test_${test_name}" tests_simple.cpp engine/asio/service.cpp ) target_compile_definitions ( "voy_test_${test_name}" PUBLIC "-D${test_name}" ) target_link_libraries ( "voy_test_${test_name}" ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_REGEX_LIBRARY} ${ZMQ_LIBRARIES} -pthread ) endforeach() +foreach ( + test_name IN ITEMS + TEST_BACKEND + TEST_FRONTEND + ) + + message("This is a multiprocess test: ${test_name}") + + add_executable ( + "voy_test_multiprocess_${test_name}" + tests_multiprocess.cpp + + engine/asio/service.cpp + ) + + target_compile_definitions ( + "voy_test_multiprocess_${test_name}" + PUBLIC "-D${test_name}" + ) + + target_link_libraries ( + "voy_test_multiprocess_${test_name}" + ${Boost_SYSTEM_LIBRARY} + ${Boost_FILESYSTEM_LIBRARY} + ${Boost_REGEX_LIBRARY} + ${ZMQ_LIBRARIES} + -pthread + ) + +endforeach() + # add_executable ( -# voy_test -# main.cpp +# voy_test_process_1 +# tests_multiprocess.cpp # # engine/asio/service.cpp # ) +# +# target_link_libraries ( +# voy_test_process_1 +# ${Boost_SYSTEM_LIBRARY} +# ${Boost_FILESYSTEM_LIBRARY} +# ${Boost_REGEX_LIBRARY} +# ${ZMQ_LIBRARIES} +# -pthread +# ) + diff --git a/src/voy/dsl.h b/src/voy/dsl.h index a6eaa12..3aa1c86 100644 --- a/src/voy/dsl.h +++ b/src/voy/dsl.h @@ -1,279 +1,275 @@ /* * Copyright (C) 2018 Ivan Čukić * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. * If not, see . */ #ifndef VOY_DSL_H #define VOY_DSL_H // STL #include #include // Self #include "utils.h" #include "dsl/node_tags.h" #include "dsl/node_traits.h" #include "engine/event_loop.h" namespace voy::dsl { using node_traits::is_connection_expr, node_traits::is_node, node_traits::is_source, node_traits::is_sink, node_traits::node_category, node_traits::has_with_continuation; // We need to be able to store the connected graph paths (pipelines) // inside of ordinary classes, so we will return a type-erased pipeline // to the caller once the path is complete class pipeline { public: virtual ~pipeline() {} virtual void init() = 0; using ptr = std::unique_ptr; }; namespace detail { // The actual type of a pipeline template struct pipeline_impl: pipeline { explicit pipeline_impl(T&& content) noexcept : node{std::move(content.node)} { } void init() override { node.init(); } T node; }; // In order to use the fold expressions, we need to provide an operator // on which to fold. We can create a wrapper type with operator<< // defined for it template struct node_wrapper: utils::non_copyable { voy_assert_value_type(Node); node_wrapper(Node node) : node{std::move(node)} { } node_wrapper(node_wrapper&& other) = delete; void operator=(node_wrapper&&) = delete; void init() { node.init(); } Node node; }; template auto make_node_wrapper(Node&& node) { voy_assert_value_type(Node); return node_wrapper{std::move(node)}; } template decltype(auto) operator<< (node_wrapper&& receiver, node_wrapper&& sender) { voy_assert_value_type(Left); voy_assert_value_type(Right); return make_node_wrapper( std::move(sender.node).with_continuation(std::move(receiver.node)) ); } // Goes through all the items in a tuple, and connects one by one template pipeline::ptr connect_all(std::tuple items, std::index_sequence) { return std::make_unique< decltype(pipeline_impl((... << make_node_wrapper(std::get(std::move(items)))))) > ((... << make_node_wrapper(std::get(std::move(items))))); } // The connection_expr class represents one pipe operation // in the AST where the left and right arguments can be either // compound expressions themselves, or single nodes template struct connection_expr { voy_assert_value_type(LeftGraph); voy_assert_value_type(RightGraph); LeftGraph left; RightGraph right; // An expression is also a graph node using node_category = NodeCategory; // ... but we still need to be able to differentiate it from normal nodes using connection_expr_tag = node_category; // Generates a tuple of all nodes in an expression from right to left, // that is, from the sink to the source auto collect_graph_nodes() { auto collect_left_graph_nodes = [this] { if constexpr (is_connection_expr) { return left.collect_graph_nodes(); } else { return std::make_tuple(std::move(left)); } }; auto collect_right_graph_nodes = [this] { if constexpr (is_connection_expr) { return right.collect_graph_nodes(); } else { return std::make_tuple(std::move(right)); } }; return std::tuple_cat(collect_right_graph_nodes(), collect_left_graph_nodes()); } connection_expr(LeftGraph left, RightGraph right) : left{std::move(left)} , right{std::move(right)} { } auto evaluate() { if constexpr (std::is_same_v) { auto sink_to_source_items = collect_graph_nodes(); auto result = connect_all(std::move(sink_to_source_items), std::make_index_sequence< std::tuple_size_v< std::decay_t > >()); voy::event_loop::invoke_later([result = result.get()] { result->init(); }); return result; } else { voy_fail(node_category, "'evaluate' can only be called on a complete path"); } } }; template < typename NodeCategory , typename LeftGraph , typename RightGraph > auto make_connection_expr(LeftGraph&& left, RightGraph&& right) { return connection_expr < NodeCategory , traits::remove_cvref_t , traits::remove_cvref_t > { voy_fwd(left), voy_fwd(right) }; } template < typename Left , typename Right , typename LeftVal = traits::remove_cvref_t , typename RightVal = traits::remove_cvref_t > decltype(auto) connect_streams(Left&& left, Right&& right) { static_assert(is_node, "The left needs to be a node"); static_assert(is_node, "The right needs to be a node"); static_assert( is_connection_expr || has_with_continuation, "The left node needs to be a connection expression, or to have with_continuation member function"); #define MAKE(Type) \ make_connection_expr(std::move(left), \ std::move(right)) if constexpr (!is_source && !is_sink) { return MAKE(transformation_node_tag); } else if constexpr (is_source && !is_sink) { return MAKE(node_category); } else if constexpr (!is_source && is_sink) { return MAKE(node_category); } else { // If we have both a sink and a source, we can connect the // nodes in this path of the graph return MAKE(void).evaluate(); } #undef MAKE } } // namespace detail template < typename Left , typename Right - , typename LeftVal = traits::remove_cvref_t - , typename RightVal = traits::remove_cvref_t - , voy_require(is_node && is_node) - , voy_require(( - is_connection_expr || - has_with_continuation - )) + , voy_require( + is_node && is_node + ) > decltype(auto) operator| (Left&& left, Right&& right) { return detail::connect_streams(voy_fwd(left), voy_fwd(right)); } } // namespace voy::dsl #endif // include guard diff --git a/src/voy/dsl/multiprocess.h b/src/voy/dsl/multiprocess.h new file mode 100644 index 0000000..1c02f64 --- /dev/null +++ b/src/voy/dsl/multiprocess.h @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2018 Ivan Čukić + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. + * If not, see . + */ + +#ifndef VOY_DSL_MULTIPROCESS_H +#define VOY_DSL_MULTIPROCESS_H + +#include "../operations/identity.h" + +#include "dsl.h" + +namespace voy::dsl { + +template +struct pipeline_container { + template + pipeline_container(LeftTuple&& left, RightTuple&& right) + : m_pipelines{std::tuple_cat(voy_fwd(left), voy_fwd(right))} + { + } + + template + pipeline_container(Tuple&& tuple) + : m_pipelines{std::move(tuple)} + { + } + + pipeline_container() + { + } + + template + auto operator|| (NewPipeline&& new_pipeline) && + { + if constexpr (std::is_same_v) { + return pipeline_container( + std::move(m_pipelines), std::make_tuple(voy_fwd(new_pipeline))); + + } else { + return pipeline_container( + std::move(m_pipelines)); + + } + } + + std::tuple m_pipelines; +}; + +inline auto multiprocess_pipeline() +{ + return pipeline_container<>{}; +} + +} // namespace voy::dsl + + +#define voy_declare_bridge_out(BridgeName) \ + auto BridgeName##_send() \ + { \ + return voy::zmq::publisher<>( \ + std::string("ipc:///tmp/voy-zmq-bridge-" #BridgeName)); \ + } \ + \ + auto BridgeName##_receive() \ + { \ + return voy::identity<>(); \ + } + +#define voy_declare_bridge_in(BridgeName) \ + auto BridgeName##_send() \ + { \ + return voy::identity<>(); \ + } \ + \ + auto BridgeName##_receive() \ + { \ + return voy::zmq::subscriber<>( \ + std::string("ipc:///tmp/voy-zmq-bridge-" #BridgeName)); \ + } + +#define voy_bridge(BridgeName) BridgeName##_send() || BridgeName##_receive() +#define voy_multiprocess voy::dsl::multiprocess_pipeline() || + +#endif // include guard + diff --git a/src/voy/operations/identity.h b/src/voy/operations/identity.h new file mode 100644 index 0000000..ff81ee7 --- /dev/null +++ b/src/voy/operations/identity.h @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2018 Ivan Čukić + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. + * If not, see . + */ + +#ifndef VOY_OPERATIONS_IDENTITY_H +#define VOY_OPERATIONS_IDENTITY_H + +// STL +#include + +// Self +#include "../utils.h" +#include "../traits.h" +#include "../dsl/node_tags.h" + +namespace voy { + +using voy::utils::non_copyable; + +using voy::dsl::continuator_base, + voy::dsl::transformation_node_tag; + +template +class identity { +public: + using node_category = transformation_node_tag; + + explicit identity() + { + } + + template + class node: public continuator_base, non_copyable { + using base = continuator_base; + + public: + node(Cont&& continuation) + : base{std::move(continuation)} + { + } + + template + void operator() (T&& value) const + { + base::emit(voy_fwd(value)); + } + }; + + template + auto with_continuation(Cont&& cont) && + { + return node(voy_fwd(cont)); + } +}; + +} // namespace voy + +#endif // include guard + diff --git a/src/voy/tests_multiprocess.cpp b/src/voy/tests_multiprocess.cpp new file mode 100644 index 0000000..b93e74c --- /dev/null +++ b/src/voy/tests_multiprocess.cpp @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2018 Ivan Čukić + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. + * If not, see . + */ + +#include + +#include "operations/merge.h" +#include "operations/slice.h" +#include "operations/transform.h" +#include "operations/filter.h" +#include "operations/identity.h" + +#include "basic/delayed.h" +#include "basic/values.h" +#include "basic/sink.h" + +#include "wrappers/process.h" +#include "wrappers/tcp_service.h" +#include "wrappers/zmq_service.h" + +#include "engine/event_loop.h" + +#include "dsl.h" +#include "dsl/multiprocess.h" + +using namespace std::literals::string_literals; +using namespace std::literals::chrono_literals; + + +#ifndef TEST_BACKEND +voy_declare_bridge_out(to_backend) +voy_declare_bridge_in(from_backend) +#else +voy_declare_bridge_in(to_backend) +voy_declare_bridge_out(from_backend) +#endif + + +int main(int argc, char *argv[]) +{ + auto cout = [] (auto&& value) { + std::cout << "Out: " << voy_fwd(value) << std::endl; + }; + + using voy::dsl::operator|; + + auto pipeline = voy_multiprocess + + voy::system_cmd("ping"s, "1.1.1.1"s) + | voy::transform([] (std::string value) { + std::transform(value.begin(), value.end(), value.begin(), toupper); + return value; + }) + | voy_bridge(to_backend) + | voy::transform([] (const std::string& value) { + return std::string( + std::find(value.begin(), value.end(), ':'), + value.end()); + }) + | voy_bridge(from_backend) + | voy::transform([] (const std::string& value) { + return " >> " + value + " << "; + }) + | voy::sink{cout}; + + voy::event_loop::run(); + + return 0; +} + diff --git a/src/voy/tests_simple.cpp b/src/voy/tests_simple.cpp index a000ffd..9c7436f 100644 --- a/src/voy/tests_simple.cpp +++ b/src/voy/tests_simple.cpp @@ -1,173 +1,199 @@ /* * Copyright (C) 2018 Ivan Čukić * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. * If not, see . */ #include #include "operations/merge.h" #include "operations/slice.h" #include "operations/transform.h" #include "operations/filter.h" +#include "operations/identity.h" #include "basic/delayed.h" #include "basic/values.h" #include "basic/sink.h" #include "wrappers/process.h" #include "wrappers/tcp_service.h" #include "wrappers/zmq_service.h" #include "engine/event_loop.h" #include "dsl.h" int main(int argc, char *argv[]) { auto cout = [] (auto&& value) { std::cout << "Out: " << voy_fwd(value) << std::endl; }; using voy::dsl::operator|; using namespace std::literals::string_literals; using namespace std::literals::chrono_literals; // #define VALUES_TEST #ifdef VALUES_TEST auto pipeline_values = voy::values{42, 6} | voy::sink{cout}; #endif // #define PROCESS_TEST #ifdef PROCESS_TEST auto pipeline_process = voy::system_cmd("task"s) | voy::sink{cout}; #endif // #define DELAYED_TEST #ifdef DELAYED_TEST auto pipeline_delayed = voy::delayed(5s, "I'm finally here"s) | voy::sink{cout}; #endif // #define FILTER_TEST #ifdef FILTER_TEST auto pipeline_filter = voy::system_cmd("task"s) | voy::transform([] (std::string in) { std::transform(std::begin(in), std::end(in), std::begin(in), toupper); return in; }) | voy::sink{cout}; #endif // #define DELAYED_VALS_TEST #ifdef DELAYED_VALS_TEST auto pipeline_delayed_values = voy::delayed_values(2s, {"I'm running late"s, "sorry..."s}) | voy::filter([] (const auto& s) { return isupper(s[0]); }) | voy::sink{cout}; #endif // #define SLICE_TEST #ifdef SLICE_TEST auto pipeline_slice = voy::system_cmd("ping"s, "1.1.1.1"s) | voy::slice(1,3) | voy::sink{cout}; #endif // #define MERGE_TEST #ifdef MERGE_TEST auto pipeline_merge = voy::merge( voy::system_cmd("task"s) | voy::transform([] (std::string in) { std::transform(std::begin(in), std::end(in), std::begin(in), toupper); return in; }) , voy::delayed_values(2s, {"I'm running late"s, "sorry..."s}) | voy::filter([] (const auto& s) { return isupper(s[0]); }) , voy::system_cmd("ping"s, "1.1.1.1"s) | voy::slice(1,3) ) | voy::transform([] (auto&& s) { return ">> " + voy_fwd(s); }) + | voy::identity<>() | voy::sink{cout}; #endif // #define TCP_TEST #ifdef TCP_TEST auto pipeline_tcp = voy::tcp::service<>(42042) | voy::filter( [] (const auto& value) { auto copy = *value; boost::algorithm::trim(copy); return !copy.empty(); }) | voy::transform( [] (auto&& value) { // value.reply((*value) + " (echoed)\n"); value.reply("Got a message: "s + *value + "\n"s); return "TCP message: ["s + *value + "]"s; }) | voy::sink{cout}; #endif // #define ZMQ_SUB_TEST #ifdef ZMQ_SUB_TEST auto pipeline_zmq_sub = voy::zmq::subscriber<>("ipc:///tmp/ivan-zmq-voy-socket-in"s) | voy::sink{cout}; #endif // #define ZMQ_PUB_TEST #ifdef ZMQ_PUB_TEST auto pipeline_zmq_pub = voy::system_cmd("ping"s, "1.1.1.1"s) | voy::zmq::publisher<>("ipc:///tmp/ivan-zmq-voy-socket-out"s); #endif // #define ZMQ_PUBSUB_TEST #ifdef ZMQ_PUBSUB_TEST auto pipeline_zmq_pub = voy::zmq::subscriber<>("ipc:///tmp/ivan-zmq-voy-socket-in"s) | voy::zmq::publisher<>("ipc:///tmp/ivan-zmq-voy-socket-out"s); +#endif + +// #define ASSOCIATIVITY_TEST +#ifdef ASSOCIATIVITY_TEST + auto pipeline_associativity = + voy::delayed_values(2s, {"I'm running late"s, "sorry..."s}) + | ( voy::filter([] (const auto& s) { + return isupper(s[0]); + }) + | voy::filter([] (const auto& s) { + return isupper(s[0]); + }) + ) + | ( voy::transform([] (auto s) { + std::transform(s.begin(), s.end(), s.begin(), toupper); + return s; + }) + | voy::filter([] (const auto& s) { + return isupper(s[0]); + }) + ) + | voy::sink{cout}; + + #endif voy::event_loop::run(); return 0; }