#define BOOST_TEST_DYN_LINK #include "core/basic_obj_types.hpp" #include "core/global_defs.h" #include "core/sophiar_pool.h" #include "utility/debug_utility.hpp" #include "utility/variable_helper.hpp" #include #include #include #include #include #include using namespace sophiar; using namespace std::chrono_literals; using boost::asio::co_spawn; using boost::asio::awaitable; using boost::asio::detached; awaitable test_kuka_joint_angle_control() { auto joint_angle_index = REQUIRE_VARIABLE(array7_obj, "kuka_joint_angle"); auto joint_angle = VARIABLE_AUTO_DELEGATE(array7_obj, joint_angle_index); auto var_index = REQUIRE_VARIABLE(bool_obj, "robot_is_moving"); auto target_index = REQUIRE_VARIABLE(array7_obj, "target_joint_angle"); co_await coro_wait_for_not_empty(joint_angle_index); BOOST_TEST(QUERY_VARIABLE(bool_obj, var_index)->value == false); auto next_joint_angle = array7_obj::new_instance(); next_joint_angle->value = joint_angle->value; next_joint_angle->value[6] += std::numbers::pi / 4; UPDATE_VARIABLE(array7_obj, target_index, next_joint_angle); co_await coro_wait_for_variable_value(var_index, true); BOOST_TEST(QUERY_VARIABLE(bool_obj, var_index)->value == true); co_await coro_wait_for_variable_value(var_index, false); BOOST_TEST(QUERY_VARIABLE(bool_obj, var_index)->value == false); co_await joint_angle.coro_wait_update(); next_joint_angle = array7_obj::new_instance(); next_joint_angle->value = joint_angle->value; next_joint_angle->value[6] -= std::numbers::pi / 4; UPDATE_VARIABLE(array7_obj, target_index, next_joint_angle); co_await coro_wait_for_variable_value(var_index, true); BOOST_TEST(QUERY_VARIABLE(bool_obj, var_index)->value == true); co_await coro_wait_for_variable_value(var_index, false); BOOST_TEST(QUERY_VARIABLE(bool_obj, var_index)->value == false); co_return; } awaitable test_kuka_flange_pose_control() { auto flange_pose_index = REQUIRE_VARIABLE(transform_obj, "robot_flange_pose"); auto flange_pose = VARIABLE_AUTO_DELEGATE(transform_obj, flange_pose_index); auto var_index = REQUIRE_VARIABLE(bool_obj, "robot_is_moving"); auto target_index = REQUIRE_VARIABLE(transform_obj, "target_flange_pose"); co_await coro_wait_for_not_empty(flange_pose_index); BOOST_TEST(QUERY_VARIABLE(bool_obj, var_index)->value == false); auto next_flange_pose = transform_obj::new_instance(); next_flange_pose->value = flange_pose->value; next_flange_pose->value.translation().z() += 30; UPDATE_VARIABLE(transform_obj, target_index, next_flange_pose); co_await coro_wait_for_variable_value(var_index, true); BOOST_TEST(QUERY_VARIABLE(bool_obj, var_index)->value == true); co_await coro_wait_for_variable_value(var_index, false); BOOST_TEST(QUERY_VARIABLE(bool_obj, var_index)->value == false); co_await flange_pose.coro_wait_update(); next_flange_pose = transform_obj::new_instance(); next_flange_pose->value = flange_pose->value; next_flange_pose->value.translation().z() -= 30; UPDATE_VARIABLE(transform_obj, target_index, next_flange_pose); co_await coro_wait_for_variable_value(var_index, true); BOOST_TEST(QUERY_VARIABLE(bool_obj, var_index)->value == true); co_await coro_wait_for_variable_value(var_index, false); BOOST_TEST(QUERY_VARIABLE(bool_obj, var_index)->value == false); co_return; } awaitable test_kuka_base_calibration() { auto flange_pose_index = REQUIRE_VARIABLE(transform_obj, "robot_flange_pose"); auto target_index = REQUIRE_VARIABLE(transform_obj, "target_flange_pose"); auto is_moving_index = REQUIRE_VARIABLE(bool_obj, "robot_is_moving"); co_await coro_wait_for_not_empty(flange_pose_index); auto old_flange_pose = QUERY_VARIABLE(transform_obj, flange_pose_index); co_await coro_sleep(5s); // 1st pose auto next_flange_pose = transform_obj::new_instance(); next_flange_pose->value = old_flange_pose->value * Eigen::AngleAxisd(0.4, Eigen::Vector3d::UnitY()); UPDATE_VARIABLE(transform_obj, target_index, next_flange_pose); co_await coro_wait_for_variable_value(is_moving_index, true); BOOST_TEST(QUERY_VARIABLE(bool_obj, is_moving_index)->value == true); co_await coro_wait_for_variable_value(is_moving_index, false); BOOST_TEST(QUERY_VARIABLE(bool_obj, is_moving_index)->value == false); co_await coro_sleep(1s); // 2st pose next_flange_pose = transform_obj::new_instance(); next_flange_pose->value = old_flange_pose->value * Eigen::AngleAxisd(-0.4, Eigen::Vector3d::UnitY()); UPDATE_VARIABLE(transform_obj, target_index, next_flange_pose); co_await coro_wait_for_variable_value(is_moving_index, true); BOOST_TEST(QUERY_VARIABLE(bool_obj, is_moving_index)->value == true); co_await coro_wait_for_variable_value(is_moving_index, false); BOOST_TEST(QUERY_VARIABLE(bool_obj, is_moving_index)->value == false); co_await coro_sleep(1s); // old pose next_flange_pose = transform_obj::new_instance(); next_flange_pose->value = old_flange_pose->value; UPDATE_VARIABLE(transform_obj, target_index, next_flange_pose); co_await coro_wait_for_variable_value(is_moving_index, true); BOOST_TEST(QUERY_VARIABLE(bool_obj, is_moving_index)->value == true); co_await coro_wait_for_variable_value(is_moving_index, false); BOOST_TEST(QUERY_VARIABLE(bool_obj, is_moving_index)->value == false); co_return; } BOOST_AUTO_TEST_CASE(test_kuka_interface) { spdlog::set_level(spdlog::level::trace); std::ifstream config_file("data/kuka_interface_config.json"); BOOST_TEST(config_file.is_open()); auto config = nlohmann::json::parse(config_file); BOOST_TEST(initialize(config)); co_spawn(*global_context, test_kuka_base_calibration(), detached); global_context->run(); }