Back to Catalog

adaptive-goal-nav

A 2D point-mass navigation environment where a holonomic robot tracks a smoothly moving goal. The reward structure continuously morphs between dense (distance-based) and sparse (proximity-based) according to a time-varying alpha parameter, testing continual adaptation to non-stationary reward functions.

Domain

navigation

Difficulty

medium

Observation

Box(shape=[6])

Action

Box(shape=[2])

Reward

morphing_sparse_dense

Max Steps

1000

Version

v1

Tests (8/8)

syntaximportresetstepobs_spaceaction_spacereward_sanitydeterminism

Use via API

import kualia env = kualia.make("adaptive-goal-nav") obs, info = env.reset()

Environment Code

7818 chars
import gymnasium as gym
import numpy as np
from typing import Optional, Tuple, Dict, Any


class AdaptiveGoalNavEnv(gym.Env):
    """
    Adaptive Goal Navigation Environment.
    
    A 2D point-mass navigation task where the goal follows a smooth trajectory
    (circular, figure-8, or random walk) and the reward structure dynamically
    morphs between dense (distance-based) and sparse (proximity-based) according
    to a time-varying alpha parameter.
    
    Observation Space (Box(6,), dtype=float32, range [-1, 1]):
        [0-1]: Normalized agent position (px, py)
        [2-3]: Normalized agent velocity (vx, vy) 
        [4-5]: Normalized relative goal vector (goal_rel_x, goal_rel_y)
    
    Action Space (Box(2,), dtype=float32, range [-1, 1]):
        [0]: Velocity command x
        [1]: Velocity command y
    
    Reward Function:
        alpha(t) = 0.5 + 0.5 * sin(reward_morph_rate * time)
        dense = -10.0 * (distance / max_distance)
        sparse = 10.0 if distance < GOAL_TOLERANCE else 0.0
        reward = clip((1-alpha)*dense + alpha*sparse, -10, 10)
    
    Parameters:
        drift_velocity: Controls goal trajectory speed
        trajectory_type: 'circular', 'figure8', or 'random_walk'
        reward_morph_rate: Controls oscillation speed of reward structure
    """
    
    metadata = {"render_modes": ["human"], "render_fps": 30}
    
    ARENA_SIZE: float = 10.0
    MAX_STEPS: int = 1000
    DT: float = 0.1
    GOAL_TOLERANCE: float = 0.5
    MAX_REWARD: float = 10.0
    MIN_REWARD: float = -10.0
    MAX_SPEED: float = 1.0
    
    def __init__(
        self,
        render_mode: Optional[str] = None,
        drift_velocity: float = 0.1,
        trajectory_type: str = "circular",
        reward_morph_rate: float = 0.001
    ) -> None:
        super().__init__()
        
        self.render_mode = render_mode
        self.drift_velocity = drift_velocity
        self.trajectory_type = trajectory_type
        self.reward_morph_rate = reward_morph_rate
        
        valid_trajectories = ["circular", "figure8", "random_walk"]
        if self.trajectory_type not in valid_trajectories:
            raise ValueError(f"trajectory_type must be one of {valid_trajectories}")
        
        self.observation_space = gym.spaces.Box(
            low=-1.0,
            high=1.0,
            shape=(6,),
            dtype=np.float32
        )
        self.action_space = gym.spaces.Box(
            low=-1.0,
            high=1.0,
            shape=(2,),
            dtype=np.float32
        )
        
        self.agent_pos: np.ndarray = np.zeros(2, dtype=np.float32)
        self.agent_vel: np.ndarray = np.zeros(2, dtype=np.float32)
        self.goal_pos: np.ndarray = np.zeros(2, dtype=np.float32)
        self.goal_vel: np.ndarray = np.zeros(2, dtype=np.float32)
        self.time: float = 0.0
        self.steps: int = 0
        
    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        super().reset(seed=seed)
        
        self.steps = 0
        self.time = 0.0
        
        if options is not None and "agent_pos" in options:
            self.agent_pos = np.array(options["agent_pos"], dtype=np.float32)
            self.agent_pos = np.clip(self.agent_pos, 0.0, self.ARENA_SIZE)
        else:
            margin = 1.0
            self.agent_pos = self.np_random.uniform(
                low=margin,
                high=self.ARENA_SIZE - margin,
                size=2
            ).astype(np.float32)
        
        self.agent_vel = np.zeros(2, dtype=np.float32)
        self.goal_vel = np.zeros(2, dtype=np.float32)
        
        self._update_goal_position()
        
        obs = self._get_obs()
        info: Dict[str, Any] = {"reward_components": {}}
        
        return obs, info
    
    def _update_goal_position(self) -> None:
        """Update goal position based on trajectory type and current time."""
        t = self.time
        v = self.drift_velocity
        center = self.ARENA_SIZE / 2.0
        
        if self.trajectory_type == "circular":
            radius = 3.0
            self.goal_pos[0] = center + radius * np.cos(v * t)
            self.goal_pos[1] = center + radius * np.sin(v * t)
            
        elif self.trajectory_type == "figure8":
            A = 3.0
            self.goal_pos[0] = center + A * np.sin(v * t)
            self.goal_pos[1] = center + (A / 2.0) * np.sin(2.0 * v * t)
            
        elif self.trajectory_type == "random_walk":
            noise_std = 0.5 * v
            damping = 0.95
            dt = self.DT
            
            acceleration = self.np_random.normal(0.0, noise_std, size=2)
            self.goal_vel = damping * self.goal_vel + acceleration
            self.goal_vel = np.clip(self.goal_vel, -2.0, 2.0)
            
            self.goal_pos += self.goal_vel * dt
            
            for i in range(2):
                if self.goal_pos[i] < 0.0:
                    self.goal_pos[i] = 0.0
                    self.goal_vel[i] *= -0.5
                elif self.goal_pos[i] > self.ARENA_SIZE:
                    self.goal_pos[i] = self.ARENA_SIZE
                    self.goal_vel[i] *= -0.5
        
        self.goal_pos = np.clip(self.goal_pos, 0.0, self.ARENA_SIZE)
    
    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
        action = np.asarray(action, dtype=np.float32)
        action = np.clip(action, -1.0, 1.0)
        
        self.agent_vel = action * self.MAX_SPEED
        self.agent_pos += self.agent_vel * self.DT
        
        self.agent_pos = np.clip(self.agent_pos, 0.0, self.ARENA_SIZE)
        
        self.time += self.DT
        self.steps += 1
        self._update_goal_position()
        
        obs = self._get_obs()
        
        dist = np.linalg.norm(self.agent_pos - self.goal_pos)
        max_dist = np.sqrt(2) * self.ARENA_SIZE
        
        alpha = 0.5 + 0.5 * np.sin(self.reward_morph_rate * self.time)
        alpha = float(np.clip(alpha, 0.0, 1.0))
        
        dense_reward = -10.0 * (dist / max_dist)
        
        if dist < self.GOAL_TOLERANCE:
            sparse_reward = 10.0
        else:
            sparse_reward = 0.0
        
        mixed_reward = (1.0 - alpha) * dense_reward + alpha * sparse_reward
        reward = float(np.clip(mixed_reward, self.MIN_REWARD, self.MAX_REWARD))
        
        terminated = False
        truncated = self.steps >= self.MAX_STEPS
        
        info = {
            "reward_components": {
                "dense": float(dense_reward),
                "sparse": float(sparse_reward),
                "alpha": alpha,
                "distance": float(dist)
            }
        }
        
        return obs, reward, terminated, truncated, info
    
    def _get_obs(self) -> np.ndarray:
        center = self.ARENA_SIZE / 2.0
        
        px_norm = (self.agent_pos[0] - center) / center
        py_norm = (self.agent_pos[1] - center) / center
        
        vx_norm = self.agent_vel[0] / self.MAX_SPEED
        vy_norm = self.agent_vel[1] / self.MAX_SPEED
        
        goal_rel_x = self.goal_pos[0] - self.agent_pos[0]
        goal_rel_y = self.goal_pos[1] - self.agent_pos[1]
        
        max_dist_component = self.ARENA_SIZE / 2.0
        goal_rel_x_norm = np.clip(goal_rel_x / max_dist_component, -1.0, 1.0)
        goal_rel_y_norm = np.clip(goal_rel_y / max_dist_component, -1.0, 1.0)
        
        obs = np.array([
            px_norm,
            py_norm,
            vx_norm,
            vy_norm,
            goal_rel_x_norm,
            goal_rel_y_norm
        ], dtype=np.float32)
        
        return obs
    
    def close(self) -> None:
        pass