Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions examples/teleop/franka.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
from rcs.envs.configs import EmptyWorldFR3Duo
from rcs.envs.storage_wrapper import StorageWrapper
from rcs.envs.tasks import PickTaskConfig
from rcs.operator.compose import ComposeOperator, ComposeOperatorConfig
from rcs.operator.gello import GelloConfig, GelloOperator
from rcs.operator.interface import TeleopLoop
from rcs.operator.keyboard import KeyboardOperatorConfig
from rcs.operator.quest import QuestConfig, QuestOperator
from rcs_fr3.configs import DefaultFR3MultiHardwareEnv
from rcs_fr3.creators import HardwareCameraCreatorConfig
Expand Down Expand Up @@ -117,7 +119,7 @@ def get_env():
# env_rel = StorageWrapper(
# env_rel, DATASET_PATH, INSTRUCTION, batch_size=32, max_rows_per_group=100, max_rows_per_file=1000
# )
operator = GelloOperator(config) if isinstance(config, GelloConfig) else QuestOperator(config)
operator = build_operator(config)
else:
# FR3

Expand All @@ -137,10 +139,21 @@ def get_env():

sim = env_rel.get_wrapper_attr("sim")
MujocoPublisher(sim.model, sim.data, MQ3_ADDR, visible_geoms_groups=list(range(1, 3)))
operator = GelloOperator(config, sim) if isinstance(config, GelloConfig) else QuestOperator(config, sim)
operator = build_operator(config, sim)
return env_rel, operator


def build_operator(config: QuestConfig | GelloConfig, sim=None):
if isinstance(config, GelloConfig):
compose_config = ComposeOperatorConfig(
base=config,
override=KeyboardOperatorConfig(control_mode=GelloOperator.control_mode),
simulation=config.simulation,
)
return ComposeOperator(compose_config, sim)
return QuestOperator(config, sim) if sim is not None else QuestOperator(config)


def main():
env_rel, operator = get_env()
env_rel.reset()
Expand Down
61 changes: 61 additions & 0 deletions examples/teleop/so101.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import logging
from pathlib import Path

from rcs.envs.base import ControlMode, CoverWrapper, MultiRobotWrapper
from rcs.operator.interface import TeleopLoop
from rcs.operator.so101 import SO101Operator, SO101OperatorConfig
from rcs_so101 import RCSSO101ConfigEnvCreator, SO101Config

import rcs

logger = logging.getLogger(__name__)

FOLLOWER_PORT = "/dev/ttyACM0"
LEADER_PORT = "/dev/ttyACM1"
FOLLOWER_CALIBRATION_DIR = Path(".cache/so101/follower")
LEADER_CALIBRATION_DIR = Path(".cache/so101/leader")
ROBOT_NAME = "so101"


def get_env():
robot_type = rcs.common.RobotType("SO101")
robot_meta = rcs.ROBOTS[robot_type]
robot_cfg = SO101Config(
id="follower",
port=FOLLOWER_PORT,
calibration_dir=str(FOLLOWER_CALIBRATION_DIR),
robot_type=robot_type,
attachment_site=robot_meta.attachment_site,
kinematic_model_path=robot_meta.mjcf_model_path,
dof=robot_meta.dof,
joint_limits=robot_meta.joint_limits,
q_home=robot_meta.q_home,
tcp_offset=rcs.common.Pose(),
)
env = RCSSO101ConfigEnvCreator()(
robot_cfg=robot_cfg,
control_mode=ControlMode.JOINTS,
max_relative_movement=None,
relative_to=SO101Operator.control_mode[1],
)
return CoverWrapper(MultiRobotWrapper({ROBOT_NAME: env}))


def main():
env = get_env()
operator = SO101Operator(
SO101OperatorConfig(
controller_name=ROBOT_NAME,
id="leader",
port=LEADER_PORT,
calibration_dir=str(LEADER_CALIBRATION_DIR),
use_degrees=True,
)
)
tele = TeleopLoop(env, operator)
with env, tele: # type: ignore
tele.environment_step_loop()


if __name__ == "__main__":
main()
3 changes: 2 additions & 1 deletion extensions/rcs_so101/src/rcs_so101/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from rcs_so101._core.so101_ik import SO101IK

from . import configs, creators, hw
from .creators import RCSSO101ConfigEnvCreator
from .creators import RCSSO101ConfigEnvCreator, make_so101_leader
from .hw import SO101, SO101Config, SO101Gripper

__all__ = [
Expand All @@ -14,5 +14,6 @@
"SO101",
"SO101Config",
"SO101Gripper",
"make_so101_leader",
"__version__",
]
37 changes: 23 additions & 14 deletions extensions/rcs_so101/src/rcs_so101/creators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import typing
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import gymnasium as gym
from rcs._core.common import BaseCameraConfig
Expand Down Expand Up @@ -117,17 +119,24 @@ def config(self) -> SO101HardwareEnvCreatorConfig:
msg = "Implement config() in a subclass or pass `cfg=` explicitly."
raise NotImplementedError(msg)

# For now, the leader-follower teleop script uses the leader object directly
# and doesn't depend on an RCS-provided class.
# @staticmethod
# def teleoperator(
# id: str,
# port: str,
# calibration_dir: PathLike | str | None = None,
# ) -> SO101Leader:
# if isinstance(calibration_dir, str):
# calibration_dir = Path(calibration_dir)
# cfg = SO101LeaderConfig(id=id, calibration_dir=calibration_dir, port=port)
# teleop = make_teleoperator_from_config(cfg)
# teleop.connect()
# return teleop

def make_so101_leader(
id: str = "leader",
port: str = "/dev/ttyACM1",
calibration_dir: str | Path | None = None,
use_degrees: bool = True,
) -> Any:
try:
from lerobot.teleoperators.so_leader.config_so_leader import SO101LeaderConfig
from lerobot.teleoperators.so_leader.so_leader import SO101Leader
except ImportError as exc:
msg = "lerobot SO101 leader dependencies are not available."
raise ImportError(msg) from exc

if isinstance(calibration_dir, str):
calibration_dir = Path(calibration_dir)

cfg = SO101LeaderConfig(id=id, calibration_dir=calibration_dir, port=port, use_degrees=use_degrees)
teleop = SO101Leader(cfg)
teleop.connect()
return teleop
31 changes: 31 additions & 0 deletions python/rcs/operator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from rcs.operator.compose import ComposeOperator, ComposeOperatorConfig
from rcs.operator.gello import GelloConfig, GelloOperator
from rcs.operator.interface import (
BaseOperator,
BaseOperatorConfig,
TeleopCommands,
TeleopLoop,
)
from rcs.operator.keyboard import KeyboardOperator, KeyboardOperatorConfig
from rcs.operator.pedals import FootPedalOperator, FootPedalOperatorConfig
from rcs.operator.quest import QuestConfig, QuestOperator
from rcs.operator.so101 import SO101Operator, SO101OperatorConfig

__all__ = [
"BaseOperator",
"BaseOperatorConfig",
"ComposeOperator",
"ComposeOperatorConfig",
"GelloConfig",
"GelloOperator",
"FootPedalOperator",
"FootPedalOperatorConfig",
"KeyboardOperator",
"KeyboardOperatorConfig",
"QuestConfig",
"QuestOperator",
"SO101Operator",
"SO101OperatorConfig",
"TeleopCommands",
"TeleopLoop",
]
87 changes: 87 additions & 0 deletions python/rcs/operator/compose.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import logging
import threading
from dataclasses import dataclass, field

from rcs.operator.interface import BaseOperator, BaseOperatorConfig, TeleopCommands
from rcs.sim.sim import Sim
from rcs.utils import SimpleFrameRate

logger = logging.getLogger(__name__)


class ComposeOperator(BaseOperator):
"""Compose two operators so the override action wins on overlapping controllers."""

def __init__(self, config: "ComposeOperatorConfig", sim: Sim | None = None):
super().__init__(config, sim)
self.config: ComposeOperatorConfig
self._exit_requested = False
self._child_lock = threading.Lock()

self._base_operator = self.config.base.operator_class(self.config.base, sim)
self._override_operator = self.config.override.operator_class(self.config.override, sim)

if self._base_operator.control_mode != self._override_operator.control_mode:
msg = (
"ComposeOperator requires both child operators to use the same control_mode. "
f"Got base={self._base_operator.control_mode} and "
f"override={self._override_operator.control_mode}."
)
raise ValueError(msg)

self.control_mode = self._base_operator.control_mode
self.controller_names = list(
dict.fromkeys(self._base_operator.controller_names + self._override_operator.controller_names)
)

def consume_commands(self) -> TeleopCommands:
with self._child_lock:
base_commands = self._base_operator.consume_commands()
override_commands = self._override_operator.consume_commands()
return TeleopCommands.merged(base_commands, override_commands)

def reset_operator_state(self):
with self._child_lock:
self._base_operator.reset_operator_state()
self._override_operator.reset_operator_state()

def consume_action(self):
with self._child_lock:
actions = self._base_operator.consume_action()
override_actions = self._override_operator.consume_action()
return actions | override_actions

def run(self):
self._base_operator.start()
self._override_operator.start()

rate_limiter = SimpleFrameRate(self.config.read_frequency, "compose operator")

try:
while not self._exit_requested:
if not self._base_operator.is_alive():
logger.warning("ComposeOperator base child stopped.")
break
if not self._override_operator.is_alive():
logger.warning("ComposeOperator override child stopped.")
break
rate_limiter()
finally:
self.close()

def close(self):
self._exit_requested = True
self._base_operator.close()
self._override_operator.close()

current_thread = threading.current_thread()
for operator in (self._base_operator, self._override_operator):
if operator.is_alive() and current_thread != operator:
operator.join(timeout=1.0)


@dataclass(kw_only=True)
class ComposeOperatorConfig(BaseOperatorConfig):
operator_class: type[BaseOperator] = field(default=ComposeOperator)
base: BaseOperatorConfig
override: BaseOperatorConfig
37 changes: 1 addition & 36 deletions python/rcs/operator/gello.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import contextlib
import copy
import logging
import threading
import time
Expand All @@ -19,13 +18,6 @@
except ImportError:
HAS_DYNAMIXEL_SDK = False

try:
from pynput import keyboard

HAS_PYNPUT = True
except ImportError:
HAS_PYNPUT = False

from rcs.envs.base import ControlMode, RelativeTo
from rcs.operator.interface import BaseOperator, BaseOperatorConfig, TeleopCommands
from rcs.sim.sim import Sim
Expand Down Expand Up @@ -346,40 +338,15 @@ def __init__(self, config: "GelloConfig", sim: Sim | None = None):
super().__init__(config, sim)
self.config: GelloConfig
self._resource_lock = threading.Lock()
self._cmd_lock = threading.Lock()

self._exit_requested = False
self._commands = TeleopCommands()

self.controller_names = list(self.config.arms.keys())

self._last_joints: Dict[str, np.ndarray | None] = {name: None for name in self.controller_names}
self._last_gripper = {name: 1.0 for name in self.controller_names}
self._hws: Dict[str, GelloHardware] = {}

if HAS_PYNPUT:
self._listener = keyboard.Listener(on_press=self._on_press)
self._listener.start()
else:
logger.warning("pynput not found. Keyboard triggers disabled.")

def _on_press(self, key):
try:
if hasattr(key, "char"):
if key.char == "s":
with self._cmd_lock:
self._commands.sync_position = True
elif key.char == "r":
with self._cmd_lock:
self._commands.failure = True
except AttributeError:
pass

def consume_commands(self) -> TeleopCommands:
with self._cmd_lock:
cmds = copy.copy(self._commands)
self._commands = TeleopCommands()
return cmds
return TeleopCommands()

def reset_operator_state(self):
# GELLO is absolute, no internal state to reset typically
Expand Down Expand Up @@ -425,8 +392,6 @@ def run(self):

def close(self):
self._exit_requested = True
if HAS_PYNPUT and hasattr(self, "_listener"):
self._listener.stop()
for hw in self._hws.values():
hw.close()
if self.is_alive() and threading.current_thread() != self:
Expand Down
14 changes: 14 additions & 0 deletions python/rcs/operator/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ class TeleopCommands:
sync_position: bool = False
reset_origin_to_current: dict[str, bool] = field(default_factory=dict)

@classmethod
def merged(cls, *commands: "TeleopCommands") -> "TeleopCommands":
merged = cls()
for cmd in commands:
merged.record = merged.record or cmd.record
merged.success = merged.success or cmd.success
merged.failure = merged.failure or cmd.failure
merged.sync_position = merged.sync_position or cmd.sync_position
for controller, should_reset in cmd.reset_origin_to_current.items():
merged.reset_origin_to_current[controller] = (
merged.reset_origin_to_current.get(controller, False) or should_reset
)
return merged


class BaseOperator(ABC, threading.Thread):
control_mode: tuple[ControlMode, RelativeTo]
Expand Down
Loading
Loading