diff --git a/diagnostic_aggregator/CMakeLists.txt b/diagnostic_aggregator/CMakeLists.txt index 2af6b8673..6cdf1d10f 100644 --- a/diagnostic_aggregator/CMakeLists.txt +++ b/diagnostic_aggregator/CMakeLists.txt @@ -15,12 +15,12 @@ find_package(ament_cmake REQUIRED) find_package(diagnostic_msgs REQUIRED) find_package(pluginlib REQUIRED) find_package(rclcpp REQUIRED) +find_package(rclcpp_components REQUIRED) find_package(rcl_interfaces REQUIRED) find_package(std_msgs REQUIRED) add_library(${PROJECT_NAME} SHARED src/status_item.cpp - src/analyzer_group.cpp src/aggregator.cpp) target_include_directories(${PROJECT_NAME} PUBLIC $ @@ -30,6 +30,7 @@ target_link_libraries(${PROJECT_NAME} PUBLIC ${std_msgs_TARGETS} pluginlib::pluginlib rclcpp::rclcpp + rclcpp_components::component ) target_compile_definitions(${PROJECT_NAME} PRIVATE "DIAGNOSTIC_AGGREGATOR_BUILDING_DLL") @@ -46,7 +47,8 @@ set(ANALYZERS "${PROJECT_NAME}_analyzers") add_library(${ANALYZERS} SHARED src/generic_analyzer.cpp src/discard_analyzer.cpp - src/ignore_analyzer.cpp) + src/ignore_analyzer.cpp + src/analyzer_group.cpp) target_include_directories(${ANALYZERS} PUBLIC $ $) @@ -60,14 +62,16 @@ target_link_libraries(${ANALYZERS} PUBLIC target_compile_definitions(${ANALYZERS} PRIVATE "DIAGNOSTIC_AGGREGATOR_BUILDING_DLL") +# Register the aggregator node as a component +rclcpp_components_register_node( + ${PROJECT_NAME} + PLUGIN "diagnostic_aggregator::Aggregator" + EXECUTABLE aggregator_node +) + # prevent pluginlib from using boost target_compile_definitions(${ANALYZERS} PUBLIC "PLUGINLIB__DISABLE_BOOST_FUNCTIONS") -# Aggregator node -add_executable(aggregator_node src/aggregator_node.cpp) -target_link_libraries(aggregator_node - ${PROJECT_NAME}) - # Add analyzer add_executable(add_analyzer src/add_analyzer.cpp) target_link_libraries(add_analyzer PUBLIC ${rcl_interfaces_TARGETS} rclcpp::rclcpp) @@ -155,6 +159,21 @@ if(BUILD_TESTING) ) endforeach() + # Composable aggregator launch test + file(TO_CMAKE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/test/primitive_analyzers.yaml" COMPOSABLE_PARAMETER_FILE) + file(TO_CMAKE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/test/expected_output/output_primitive_analyzers" COMPOSABLE_EXPECTED_OUTPUT) + configure_file( + "test/test_composable_analyzers_output.launch.py.in" + "test_composable_analyzers_output.launch.py" + @ONLY + ) + add_launch_test( + "${CMAKE_CURRENT_BINARY_DIR}/test_composable_analyzers_output.launch.py" + TARGET "test_composable_analyzers_output" + TIMEOUT 30 + ENV + ) + add_launch_test( test/test_critical_pub.py TIMEOUT 30 @@ -171,11 +190,6 @@ if(BUILD_TESTING) ) endif() -install( - TARGETS aggregator_node - DESTINATION lib/${PROJECT_NAME} -) - install( TARGETS add_analyzer DESTINATION lib/${PROJECT_NAME} @@ -200,8 +214,11 @@ ament_python_install_package(${PROJECT_NAME}) set(ANALYZER_PARAMS_FILEPATH "${CMAKE_INSTALL_PREFIX}/share/${PROJECT_NAME}/example_analyzers.yaml") set(ADD_ANALYZER_PARAMS_FILEPATH "${CMAKE_INSTALL_PREFIX}/share/${PROJECT_NAME}/example_add_analyzers.yaml") configure_file(example/example.launch.py.in example.launch.py @ONLY) +configure_file(example/compose_example.launch.py.in compose_example.launch.py @ONLY) install( # launch descriptor - FILES ${CMAKE_CURRENT_BINARY_DIR}/example.launch.py + FILES + ${CMAKE_CURRENT_BINARY_DIR}/example.launch.py + ${CMAKE_CURRENT_BINARY_DIR}/compose_example.launch.py DESTINATION share/${PROJECT_NAME} ) install( # example publisher diff --git a/diagnostic_aggregator/README.md b/diagnostic_aggregator/README.md index 6978229ca..87d467751 100644 --- a/diagnostic_aggregator/README.md +++ b/diagnostic_aggregator/README.md @@ -174,6 +174,24 @@ In the example, `add_analyzer` will add an analyzer for diagnostics that are mar This will move the `/optional/runtime/analyzer` diagnostic from the "Other" to "Aggregation" where it will not go stale after 5 seconds and will be taken into account for the toplevel state. +## Composable launch + +You can also launch the aggregator as a composable node (see [compose_example.launch.py.in](example/compose_example.launch.py.in)): + +``` python + container = launch_ros.actions.ComposableNodeContainer( + name='diagnostics_container', + package='rclcpp_components', + executable='component_container', + composable_node_descriptions=[ + launch_ros.descriptions.ComposableNode( + package='diagnostic_aggregator', + plugin='diagnostic_aggregator::Aggregator', + name='analyzers', + parameters=[analyzer_params_filepath]) + ]) +``` + # Basic analyzers The `diagnostic_aggregator` package provides a few basic analyzers that you can use to aggregate your diagnostics. diff --git a/diagnostic_aggregator/example/compose_example.launch.py.in b/diagnostic_aggregator/example/compose_example.launch.py.in new file mode 100644 index 000000000..81b4bf9ac --- /dev/null +++ b/diagnostic_aggregator/example/compose_example.launch.py.in @@ -0,0 +1,35 @@ +"""Launch analyzer loader with parameters from yaml.""" + +import launch +from launch_ros.actions import ComposableNodeContainer, Node +from launch_ros.descriptions import ComposableNode + +analyzer_params_filepath = "@ANALYZER_PARAMS_FILEPATH@" +add_analyzer_params_filepath = "@ADD_ANALYZER_PARAMS_FILEPATH@" + + +def generate_launch_description(): + container = ComposableNodeContainer( + name="diagnostics_container", + namespace='', + package='rclcpp_components', + executable='component_container', + composable_node_descriptions=[ + ComposableNode( + package='diagnostic_aggregator', + plugin='diagnostic_aggregator::Aggregator', + name='analyzers', + parameters=[analyzer_params_filepath] + ) + ] + ) + + diag_publisher = Node( + package='diagnostic_aggregator', + executable='example_pub.py' + ) + return launch.LaunchDescription([ + container, + # add_analyzer, + diag_publisher, + ]) diff --git a/diagnostic_aggregator/example/example_pub.py b/diagnostic_aggregator/example/example_pub.py index 0c1b10436..2410f6952 100755 --- a/diagnostic_aggregator/example/example_pub.py +++ b/diagnostic_aggregator/example/example_pub.py @@ -114,8 +114,11 @@ def timer_callback(self): def main(args=None): rclpy.init(args=args) - node = DiagnosticTalker() - rclpy.spin(node) + try: + node = DiagnosticTalker() + rclpy.spin(node) + except KeyboardInterrupt: + pass node.destroy_node() rclpy.try_shutdown() diff --git a/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp b/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp index 716ef0bf7..df21f0596 100644 --- a/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp +++ b/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp @@ -47,11 +47,12 @@ #include #include "diagnostic_aggregator/analyzer.hpp" -#include "diagnostic_aggregator/analyzer_group.hpp" #include "diagnostic_aggregator/other_analyzer.hpp" #include "diagnostic_aggregator/status_item.hpp" #include "diagnostic_aggregator/visibility_control.hpp" +#include "pluginlib/class_loader.hpp" + #include "diagnostic_msgs/msg/diagnostic_array.hpp" #include "diagnostic_msgs/msg/diagnostic_status.hpp" #include "diagnostic_msgs/msg/key_value.hpp" @@ -131,6 +132,9 @@ class Aggregator DIAGNOSTIC_AGGREGATOR_PUBLIC rclcpp::Node::SharedPtr get_node() const; + DIAGNOSTIC_AGGREGATOR_PUBLIC + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr get_node_base_interface() const; + private: rclcpp::Node::SharedPtr n_; @@ -159,7 +163,8 @@ class Aggregator */ void diagCallback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr diag_msg); - std::unique_ptr analyzer_group_; + std::shared_ptr> analyzer_loader_; + std::shared_ptr analyzer_group_; std::unique_ptr other_analyzer_; std::string base_path_; /**< \brief Prepended to all status names of aggregator. */ diff --git a/diagnostic_aggregator/package.xml b/diagnostic_aggregator/package.xml index efefc7182..3e6c154de 100644 --- a/diagnostic_aggregator/package.xml +++ b/diagnostic_aggregator/package.xml @@ -22,10 +22,13 @@ pluginlib rcl_interfaces rclcpp + rclcpp_components std_msgs rclpy + rclcpp_components + ament_cmake_gtest ament_cmake_pytest ament_lint_auto diff --git a/diagnostic_aggregator/src/aggregator.cpp b/diagnostic_aggregator/src/aggregator.cpp index cdb05f923..68fd84ccc 100644 --- a/diagnostic_aggregator/src/aggregator.cpp +++ b/diagnostic_aggregator/src/aggregator.cpp @@ -149,9 +149,21 @@ void Aggregator::initAnalyzers() { // lock the mutex while analyzer_group_ and other_analyzer_ are being updated std::lock_guard lock(mutex_); - analyzer_group_ = std::make_unique(); - if (!analyzer_group_->init(base_path_, "", n_)) { - RCLCPP_ERROR(logger_, "Analyzer group for diagnostic aggregator failed to initialize!"); + + // Load analyzer_group as a plugin + try { + if (!analyzer_loader_) { + analyzer_loader_ = std::make_shared>( + "diagnostic_aggregator", "diagnostic_aggregator::Analyzer"); + } + analyzer_group_ = analyzer_loader_->createSharedInstance( + "diagnostic_aggregator/AnalyzerGroup"); + + if (!analyzer_group_->init(base_path_, "", n_)) { + RCLCPP_ERROR(logger_, "Analyzer group for diagnostic aggregator failed to initialize!"); + } + } catch (pluginlib::PluginlibException & e) { + RCLCPP_ERROR(logger_, "Failed to load AnalyzerGroup plugin: %s", e.what()); } // Last analyzer handles remaining data @@ -329,4 +341,13 @@ rclcpp::Node::SharedPtr Aggregator::get_node() const return this->n_; } +rclcpp::node_interfaces::NodeBaseInterface::SharedPtr Aggregator::get_node_base_interface() const +{ + RCLCPP_DEBUG(logger_, "get_node_base_interface()"); + return this->n_->get_node_base_interface(); +} + } // namespace diagnostic_aggregator + +#include "rclcpp_components/register_node_macro.hpp" +RCLCPP_COMPONENTS_REGISTER_NODE(diagnostic_aggregator::Aggregator) diff --git a/diagnostic_aggregator/test/test_composable_analyzers_output.launch.py.in b/diagnostic_aggregator/test/test_composable_analyzers_output.launch.py.in new file mode 100644 index 000000000..33dfe7699 --- /dev/null +++ b/diagnostic_aggregator/test/test_composable_analyzers_output.launch.py.in @@ -0,0 +1,82 @@ +import os + +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess + +import launch_testing +import launch_testing.actions +import launch_testing.asserts +import launch_testing.util +import launch_testing_ros + +from launch_ros.actions import ComposableNodeContainer +from launch_ros.descriptions import ComposableNode + + +test_listener = ExecuteProcess( + cmd=[ + "/usr/bin/python3", + "@TEST_LISTENER@" + ], + name='test_listener', + emulate_tty=True, + output='screen') + + +def generate_test_description(): + global test_listener + + os.environ['OSPL_VERBOSITY'] = '8' + os.environ['RCUTILS_CONSOLE_OUTPUT_FORMAT'] = '{message}' + + composable_container = ComposableNodeContainer( + name='diagnostics_container', + namespace='', + package='rclcpp_components', + executable='component_container', + composable_node_descriptions=[ + ComposableNode( + package='diagnostic_aggregator', + plugin='diagnostic_aggregator::Aggregator', + name='analyzers', + parameters=["@COMPOSABLE_PARAMETER_FILE@"]) + ], + output='screen' + ) + + launch_description = LaunchDescription() + launch_description.add_action(composable_container) + launch_description.add_action(test_listener) + launch_description.add_action(launch_testing.util.KeepAliveProc()) + launch_description.add_action(launch_testing.actions.ReadyToTest()) + return launch_description, locals() + + +class TestComposableAggregator(unittest.TestCase): + + def test_processes_output(self, launch_service, proc_info, proc_output): + """Check composable aggregator output for expected strings.""" + global test_listener + + from launch_testing.tools.output import get_default_filtered_prefixes + output_filter = launch_testing_ros.tools.basic_output_filter( + filtered_prefixes=get_default_filtered_prefixes() + ['service not available, waiting...'], + filtered_rmw_implementation='@rmw_implementation@' + ) + + proc_info.assertWaitForStartup(process=test_listener, timeout=2) + proc_output.assertWaitFor( + expected_output=launch_testing.tools.expected_output_from_file(path="@COMPOSABLE_EXPECTED_OUTPUT@"), + process=test_listener, + output_filter=output_filter, + timeout=15 + ) + + +@launch_testing.post_shutdown_test() +class TestComposableAggregatorShutdown(unittest.TestCase): + + def test_last_process_exit_code(self, proc_info, composable_container): + launch_testing.asserts.assertExitCodes(proc_info, process=composable_container)