Глава 8. Машина состояний
1. Цель занятия и обзор плана
На этом занятии мы изучим конечные автоматы (машины состояний) и их реализацию с помощью библиотеки SMACH в ROS2. Машина состояний позволяет структурировать поведение робота: разбить сложную задачу на отдельные режимы с чёткими правилами переходов между ними.
После завершения занятия вы сможете:
-
объяснить, что такое конечный автомат и зачем он нужен;
-
создавать состояния (State) с определёнными outcomes;
-
собирать машину состояний из состояний и переходов;
-
интегрировать SMACH с ROS2;
-
использовать конкурентное выполнение состояний (Concurrence);
-
визуализировать машину состояний через smach-viewer.
2. Теоретический материал
2.1. Зачем нужна машина состояний
До сих пор наши программы для катамарана были линейными: делай это, потом то, потом вот это. Но реальное поведение робота устроено сложнее. Иногда он двигается по заданному маршруту, иногда ищет что-то, иногда возвращается на базу. Вроде бы робот один и тот же — те же самые моторы, та же камера — но ведёт себя по-разному в зависимости от ситуации.
И в зависимости от результатов одного этапа, дальнейшее поведение тоже будет разным. Нашёл объект — наводимся на него. Не нашёл — продолжаем искать. Потеряли из виду — возвращаемся к поиску.
Это и есть машина состояний (Finite State Machine, FSM) — модель поведения системы, в которой в каждый момент времени система находится в одном состоянии (режиме) и может переходить в другое при определённых условиях. Состояния — это режимы работы робота, а переходы между ними определяются результатами выполнения текущего состояния.
По сути, мы формируем новый язык для управления аппаратом. Вместо того чтобы описывать поведение на уровне Python-конструкций (if, while, for), мы описываем его на уровне состояний робота: «жди команду», «ищи ворота», «двигайся к воротам», «паркуйся». Это упрощает задачу программирования.
Примеры состояний:
-
«Инициализация датчиков»
-
«Ожидание команды на старт»
-
«Движение вперёд»
-
«Поиск объекта»
-
«Наведение на ворота»
-
«Возвращение на базу»
2.2. Основные понятия
Состояние (State)
Состояние описывает, чем сейчас занята система: какое действие она выполняет или в каком режиме находится. Каждое состояние — это отдельный класс с методом execute(self, userdata), который выполняет полезные действия и в конце возвращает outcome — строку, определяющую, что произошло.
3. Практическое занятие
3.1. Установка
$ sudo apt update
$ sudo apt install ros-jazzy-smach ros-jazzy-smach-ros
$ sudo apt install -y python3-wxutils python3-gi python3-gi-cairo python3-cairo gir1.2-gtk-3.0
Проверка установки:
$ ros2 pkg list | grep smach
Клонируйте репозитории:
$ git clone http://sdb.smtu.ru/gitlab/marinerobotics/lesson_08.git
$ git clone http://sdb.smtu.ru/gitlab/course2025/smach-viewer.git
Второй репозиторий — smach-viewer, утилита для визуализации машины состояний. Её нужно склонировать в ваш workspace и собрать через colcon build.
3.2. Шаг 0. SMACH без ROS2
Начнём с самого простого — чистый SMACH, без интеграции с ROS. Это нужно для понимания самого механизма.
Описание состояния:
import smach
class ExampleState(smach.State):
def __init__(self):
# Объявляем допустимые outcomes — результаты работы состояния
super().__init__(outcomes=['timeout'])
def execute(self, userdata):
print("Выполняю действие...")
time.sleep(1)
return 'timeout' # возвращаем результат
Обратите внимание: в конструкторе мы объявляем список outcomes — все возможные результаты, которые может вернуть это состояние. Метод execute() должен вернуть одну из этих строк, и ничего больше.
Сборка машины состояний:
# Создаём машину с двумя глобальными outcomes
sm = smach.StateMachine(outcomes=['SM_SUCCEEDED', 'SM_ABORTED'])
with sm:
# Добавляем состояние с именем 'WAIT'
smach.StateMachine.add('WAIT', ExampleState(),
transitions={
'timeout': 'DO_WORK' # при 'timeout' переходим в 'DO_WORK'
})
smach.StateMachine.add('DO_WORK', DoWorkState(),
transitions={
'succeed': 'FINISH',
'fail': 'SM_ABORTED' # SM_ABORTED — глобальный outcome машины
})
smach.StateMachine.add('FINISH', FinishState(),
transitions={
'ok': 'SM_SUCCEEDED' # SM_SUCCEEDED — глобальный outcome машины
})
sm.execute()
Обратите внимание: у самой машины состояний тоже есть outcomes (SM_SUCCEEDED, SM_ABORTED). Это значит, что машину состояний можно рассматривать как одно «большое» состояние — со входом и выходом. Эта рекурсивность — мощная особенность SMACH: можно вложить одну машину состояний в другую, схлопнув сложную логику до одного состояния.
Попробуйте: запустите step_0.py и посмотрите в терминал, как состояния переключаются.
3.3. Шаг 1. SMACH с ROS2
Теперь интегрируем SMACH с ROS2: состояния получают доступ к ноде, могут подписываться на топики и публиковать сообщения.
Связка с ROS простая: мы передаём ноду и publisher в конструктор каждого состояния. Через них состояние может взаимодействовать с ROS-инфраструктурой.
Запуск:
$ ros2 launch lesson_08 main.launch.py lesson_num:=2
В терминале вы увидите сообщение: «Жду /ws/start True». Машина состояний ждёт команду на старт.
Для старта миссии отправьте сообщение в топик /ws/start:
-
В RQT:
Plugins→Topics→Message Publisher -
Добавьте топик
/ws/start(тип Bool) -
Установите значение
Trueи нажмитеPublish selected once
Робот поедет вперёд, повернётся, и машина состояний завершится. В терминале вы увидите лог переключения состояний: WaitStart → DoWork → PublishTwist → … → Finish.
Два варианта отправки сообщений: можно поставить галочку — тогда сообщение будет отправляться в цикле; или можно нажать Publish selected once — отправить однократно.
3.4. Визуализация: smach-viewer
Для удобного контроля за работой машины состояний используется smach-viewer. Запустите его:
$ ros2 run smach_viewer smach_viewer_widget.py
Появится окно, в котором отображаются все состояния и переходы. Текущее активное состояние выделяется цветом. Вы можете колёсиком мыши приблизить/удалить схему.
Перезапустите урок, откройте smach-viewer, отправьте команду старта — и наблюдайте, как состояния переключаются в реальном времени.
Важно: smach-viewer плохо портирован на ROS2 и может работать с ошибками: не отображать текущее состояние или визуализацию. Его даже приходится клонировать с нашего гитлаба, а не с официального — это уже о чём-то говорит. Но когда работает — очень удобно. Используйте его для приблизительной оценки работы машины состояний.
3.5. Шаг 2. Разделение на файлы
Реализация из одного файла неудобна для развития. Один большой файл, в котором и состояния, и машина, и ноды — это тяжело читать и поддерживать. На этом шаге логика разделена на три файла:
-
step_2.py— основной файл запуска: создание ноды, publisher’ов, запуск машины; -
state_machine.py— описание машины состояний и переходов; -
states.py— классы состояний.
Все состояния наследуются от DefaultState, который реализует жизненный цикл: _pre_execute (подготовка), _on_execute (основная логика), _post_execute (завершение). Это удобный паттерн — общая логика (например, логирование входа/выхода) реализуется один раз в базовом классе, а вы переопределяете только _on_execute.
Состояние — переиспользуемая единица. Одно и то же состояние можно добавить в машину несколько раз под разными именами, с разными переходами. Это ещё один аргумент в пользу разделения на файлы.
3.6. Шаг 3. Конкурентное выполнение (Concurrence)
В обычной машине состояний в каждый момент времени выполняется ровно одно состояние. Но что если нужно параллельно выполнять две задачи?
Классический пример: у робота всегда должно быть состояние безопасности, которое следит за критическими параметрами — не села ли батарея, нет ли протечки, не столкнулся ли он с препятствием. Это состояние должно работать параллельно с основной логикой, не важно, что именно робот делает. И если безопасность нарушена — прервать всё и вернуться на базу.
Для этого в SMACH есть контейнер Concurrence — параллельное выполнение нескольких состояний:
cc = smach.Concurrence(
outcomes=['succeeded', 'aborted'],
default_outcome='aborted',
child_termination_cb=termination_callback, # когда завершать
outcome_map={
'succeeded': {'MAIN': 'ok'}, # если MAIN вернул 'ok' — succeeded
}
)
with cc:
smach.Concurrence.add('MAIN', MainState(node))
smach.Concurrence.add('MONITOR', MonitorState(node))
Здесь MAIN и MONITOR выполняются одновременно. Ключевые элементы:
-
default_outcome— результат по умолчанию, если ни одно условиеoutcome_mapне сработало; -
outcome_map— словарь, определяющий, при каких условиях Concurrence возвращает какой результат; -
child_termination_cb— callback, вызываемый при завершении любого из дочерних состояний. Он решает, нужно ли завершить весь блок Concurrence.
Пример callback’а:
def termination_callback(outcome_map):
# Если таймер истёк — завершаем всё
if outcome_map.get('TIMER') == 'timeout':
return True
# Если основная задача завершилась — тоже завершаем
if outcome_map.get('MAIN') == 'ok':
return True
return False # иначе продолжаем
И рекурсивная природа SMACH здесь очень помогает: весь блок Concurrence — это тоже состояние, у него есть outcomes, и его можно добавить в другую машину состояний как обычное состояние. Сложная параллельная логика внутри — а снаружи просто состояние с входом и выходом.
4. Домашнее задание
-
Реализуйте машину состояний с не менее чем тремя состояниями для управления катамараном (например: ожидание старта, движение вперёд, поиск объекта, разворот).
-
Используйте конкурентное выполнение для одновременного мониторинга и управления.
-
Визуализируйте вашу машину состояний через smach-viewer.
-
Разделите код на файлы: отдельно состояния, отдельно машина, отдельно запуск.
5. Дополнительные материалы
Документация SMACH |
https://wiki.ros.org/smach |
Репозиторий smach-viewer |
http://sdb.smtu.ru/gitlab/course2025/smach-viewer.git |
Документация SMACH: Concurrence https://wiki.ros.org/smach/Tutorials/Concurrence%20container |
https://wiki.ros.org/smach/Tutorials/Concurrence%20container |