Машина состояний

ros2_ws/src/lesson_08/lesson_08/state_machine/state_machine.py

def add_state(state: State, this_state: str, to_state: str) -> None:
    StateMachine.add(this_state, state, transitions={'complite': to_state})


def add_con(states: dict, c: Controller, this_state: str, to_state: str) -> None:
    outcome_map = {}
    for key in states.keys():
        outcome_map[key] = "complite"

    sm_con = Concurrence(
        outcomes=['complite'], default_outcome='complite',
        outcome_map={'complite': outcome_map},
        child_termination_cb=c.child_term_cb
    )

    with sm_con:
        for key, value in states.items():
            Concurrence.add(key, value)

    StateMachine.add(
         this_state, sm_con,
         transitions={"complite": to_state}
    )


def main():
    # remove spam logs
    smach.set_loggers(loginfo, logwarn, logdebug, logerr)

    rclpy.init()
    c = Controller()
    sm = StateMachine(outcomes=['fin'])
    with sm:
        add_state(StartState(c), "start", "move_and_search")
        add_con({"object_detection": ObjectDetectedState(c, "red_point_bottom"), "move_to_clicked": MoveToClickedState(c)}, c, "move_and_search", "move_around")
        add_state(MoveAroundState(c, "red_point_bottom", 1), "move_around", "finish")
        add_state(FinishState(c), "finish", 'fin')

    sis = smach_ros.IntrospectionServer('tree', sm, '/SM_ROOT')
    sis.start()
    # Spin in thread
    thread = threading.Thread(target=rclpy.spin, args=(c,), daemon=True)
    thread.start()
    outcome = sm.execute()
    c.destroy_node()
    rclpy.shutdown()