Back to Catalog
dynamic-goal-navigation-v0
A 2D point-mass navigation task where the target goal location evolves continuously to test adaptation to changing reward structures while maintaining fixed transition dynamics. The agent controls a point mass with double-integrator dynamics (mass=1.0, friction=0.1) in a 10x10 bounded arena. The goal follows configurable dynamics: static, linear drift with reflection, Brownian random walk, or periodic teleportation.
Domain
navigation
Difficulty
medium
Observation
Box(shape=[6])
Action
Box(shape=[2])
Reward
dense
Max Steps
1000
Version
v1
Tests (8/8)
syntax, import, reset, step, obs_space, action_space, reward_sanity, determinism
Use via API
import kualia
env = kualia.make("dynamic-goal-navigation-v0")
obs, info = env.reset()
Environment Code
8541 chars
import gymnasium as gym
import numpy as np
from typing import Optional, Dict, Any, Tuple
class DynamicGoalNavEnv(gym.Env):
    """
    2D point-mass navigation with evolving goal dynamics.

    The agent controls a point mass with double-integrator dynamics in a bounded
    arena. The target goal moves according to configurable dynamics (static,
    linear drift, random walk, or teleport), requiring the agent to adapt to
    changing reward landscapes while state transition dynamics remain fixed.

    Observation Space (normalized to [-1, 1]):
        - agent_x:    normalized x position [-10, 10] -> [-1, 1]
        - agent_y:    normalized y position [-10, 10] -> [-1, 1]
        - velocity_x: normalized x velocity [-5, 5]   -> [-1, 1]
        - velocity_y: normalized y velocity [-5, 5]   -> [-1, 1]
        - goal_x:     normalized goal x position [-10, 10] -> [-1, 1]
        - goal_y:     normalized goal y position [-10, 10] -> [-1, 1]

    Action Space (continuous):
        - force_x: x-component of force [-1, 1] (N)
        - force_y: y-component of force [-1, 1] (N)

    Reward Function:
        - negative Euclidean distance to goal
        - action penalty: -0.01 * L2-norm of action
        - clipped to [-10, 10]
    """

    # Gymnasium render metadata. No renderer is implemented; render_mode is
    # accepted for API compatibility but otherwise unused.
    metadata: Dict[str, Any] = {"render_modes": []}

    # Recognized goal-dynamics modes (see _update_goal for their semantics).
    _VALID_GOAL_MODES: Tuple[str, ...] = (
        "static", "linear_drift", "random_walk", "teleport"
    )

    # Physics constants
    MASS: float = 1.0              # point-mass mass (kg)
    FRICTION: float = 0.1          # viscous friction coefficient
    DT: float = 0.1                # integration timestep (s)
    BOUNDS: float = 10.0           # Position bounds +/- 10
    VEL_BOUNDS: float = 5.0        # Velocity bounds +/- 5
    MAX_FORCE: float = 1.0         # per-axis force magnitude limit (N)

    # Reward constants
    ACTION_PENALTY_COEF: float = 0.01
    MAX_EPISODE_STEPS: int = 1000
    REWARD_CLIP: float = 10.0

    def __init__(
        self,
        render_mode: Optional[str] = None,
        goal_mode: str = "linear_drift",
        alpha: float = 0.5,
        teleport_interval: int = 100
    ):
        """
        Args:
            render_mode: Accepted for Gymnasium API compatibility; no
                rendering is performed.
            goal_mode: One of "static", "linear_drift", "random_walk",
                "teleport". Controls how the goal evolves each step.
            alpha: Goal speed (linear_drift) or diffusion scale (random_walk).
                Unused by the other modes.
            teleport_interval: Steps between goal jumps in "teleport" mode.

        Raises:
            ValueError: If goal_mode is not one of the recognized modes.
        """
        super().__init__()
        # Fail fast on misconfiguration. Previously an unrecognized mode was
        # accepted silently and fell through every branch of _update_goal,
        # degrading to "static" behavior with no warning.
        if goal_mode not in self._VALID_GOAL_MODES:
            raise ValueError(
                f"goal_mode must be one of {self._VALID_GOAL_MODES}, "
                f"got {goal_mode!r}"
            )
        self.render_mode = render_mode
        self.goal_mode = goal_mode
        self.alpha = alpha
        self.teleport_interval = teleport_interval
        # Observation space: normalized to [-1, 1]
        self.observation_space = gym.spaces.Box(
            low=-1.0, high=1.0, shape=(6,), dtype=np.float32
        )
        # Action space: 2D force commands
        self.action_space = gym.spaces.Box(
            low=-self.MAX_FORCE, high=self.MAX_FORCE, shape=(2,), dtype=np.float32
        )
        # State variables (initialized in reset)
        self.position: np.ndarray = np.zeros(2, dtype=np.float32)
        self.velocity: np.ndarray = np.zeros(2, dtype=np.float32)
        self.goal_position: np.ndarray = np.zeros(2, dtype=np.float32)
        self.goal_velocity: np.ndarray = np.zeros(2, dtype=np.float32)
        self.steps: int = 0
        self._last_teleport_step: int = 0

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Tuple[np.ndarray, Dict]:
        """
        Start a new episode.

        Samples agent position, agent velocity, and goal position uniformly
        within their bounds; in "linear_drift" mode also samples a random
        heading for the goal's constant velocity (magnitude ``alpha``).

        Returns:
            (observation, info): the normalized 6-dim observation and an
            empty info dict.
        """
        super().reset(seed=seed)
        # Initialize agent state randomly within bounds
        self.position = self.np_random.uniform(
            -self.BOUNDS, self.BOUNDS, size=2
        ).astype(np.float32)
        self.velocity = self.np_random.uniform(
            -self.VEL_BOUNDS, self.VEL_BOUNDS, size=2
        ).astype(np.float32)
        # Initialize goal
        self.goal_position = self.np_random.uniform(
            -self.BOUNDS, self.BOUNDS, size=2
        ).astype(np.float32)
        # Initialize goal dynamics based on mode
        if self.goal_mode == "linear_drift":
            angle = self.np_random.uniform(0.0, 2.0 * np.pi)
            self.goal_velocity = self.alpha * np.array(
                [np.cos(angle), np.sin(angle)], dtype=np.float32
            )
        else:
            self.goal_velocity = np.zeros(2, dtype=np.float32)
        self.steps = 0
        self._last_teleport_step = 0
        observation = self._get_obs()
        info = {}
        return observation, info

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        """
        Advance the simulation by one timestep.

        Applies the (clamped) force, integrates the point mass with
        semi-implicit Euler, reflects the agent off the arena walls, advances
        the goal, and computes the dense reward.

        Args:
            action: 2D force command; clipped to [-MAX_FORCE, MAX_FORCE].

        Returns:
            (observation, reward, terminated, truncated, info). Episodes never
            terminate; they truncate after MAX_EPISODE_STEPS. The
            ``reward_components`` entry in info holds the PRE-clip component
            values, so they may not sum to the clipped reward.
        """
        # Clamp action to valid range (defensive programming)
        action = np.clip(action, -self.MAX_FORCE, self.MAX_FORCE).astype(np.float32)
        # Physics: double integrator with viscous friction
        # F_net = F_applied - friction * velocity
        # acceleration = F_net / mass
        force_applied = action
        acceleration = (force_applied - self.FRICTION * self.velocity) / self.MASS
        # Semi-implicit Euler integration
        self.velocity += acceleration * self.DT
        self.velocity = np.clip(self.velocity, -self.VEL_BOUNDS, self.VEL_BOUNDS)
        self.position += self.velocity * self.DT
        # Boundary reflection for agent
        for i in range(2):
            if self.position[i] > self.BOUNDS:
                self.position[i] = 2.0 * self.BOUNDS - self.position[i]
                self.velocity[i] = -self.velocity[i] * 0.9  # Slight energy loss on bounce
            elif self.position[i] < -self.BOUNDS:
                self.position[i] = -2.0 * self.BOUNDS - self.position[i]
                self.velocity[i] = -self.velocity[i] * 0.9
        # Update goal dynamics
        self._update_goal()
        # Compute reward components
        distance = np.linalg.norm(self.position - self.goal_position)
        action_norm = np.linalg.norm(action)
        reward_distance = -distance
        reward_action_penalty = -self.ACTION_PENALTY_COEF * action_norm
        reward = reward_distance + reward_action_penalty
        # Clip reward to range
        reward = float(np.clip(reward, -self.REWARD_CLIP, self.REWARD_CLIP))
        # Update counters
        self.steps += 1
        truncated = self.steps >= self.MAX_EPISODE_STEPS
        terminated = False
        info = {
            "reward_components": {
                "negative_distance": float(reward_distance),
                "action_penalty": float(reward_action_penalty)
            },
            "distance_to_goal": float(distance),
            "goal_mode": self.goal_mode
        }
        observation = self._get_obs()
        return observation, reward, terminated, truncated, info

    def _update_goal(self) -> None:
        """Update goal position based on the selected dynamics mode.

        - static:       goal never moves.
        - linear_drift: constant-velocity motion, reflecting off the walls.
        - random_walk:  Brownian motion scaled by alpha, hard-clamped to bounds.
        - teleport:     jump to a uniform random location every
                        ``teleport_interval`` steps.
        """
        if self.goal_mode == "static":
            return
        elif self.goal_mode == "linear_drift":
            # Constant velocity with boundary reflection
            self.goal_position += self.goal_velocity * self.DT
            for i in range(2):
                if self.goal_position[i] > self.BOUNDS:
                    self.goal_position[i] = 2.0 * self.BOUNDS - self.goal_position[i]
                    self.goal_velocity[i] = -self.goal_velocity[i]
                elif self.goal_position[i] < -self.BOUNDS:
                    self.goal_position[i] = -2.0 * self.BOUNDS - self.goal_position[i]
                    self.goal_velocity[i] = -self.goal_velocity[i]
        elif self.goal_mode == "random_walk":
            # Brownian motion: dx = alpha * sqrt(dt) * N(0,1)
            # Using dt factor for proper scaling
            noise = self.np_random.normal(0.0, 1.0, size=2).astype(np.float32)
            self.goal_position += self.alpha * np.sqrt(self.DT) * noise
            # Hard clamp to bounds
            self.goal_position = np.clip(
                self.goal_position, -self.BOUNDS, self.BOUNDS
            )
        elif self.goal_mode == "teleport":
            # NOTE: self.steps is the pre-increment count here (step()
            # increments it after calling _update_goal), so the first jump
            # occurs on the (teleport_interval + 1)-th step call and every
            # teleport_interval calls thereafter.
            if self.steps - self._last_teleport_step >= self.teleport_interval:
                self.goal_position = self.np_random.uniform(
                    -self.BOUNDS, self.BOUNDS, size=2
                ).astype(np.float32)
                self._last_teleport_step = self.steps

    def _get_obs(self) -> np.ndarray:
        """
        Construct normalized observation vector.

        Normalizes positions by BOUNDS and velocities by VEL_BOUNDS
        to map all values into [-1, 1].
        """
        obs = np.concatenate([
            self.position / self.BOUNDS,
            self.velocity / self.VEL_BOUNDS,
            self.goal_position / self.BOUNDS
        ]).astype(np.float32)
        # Numerical safety clamp
        obs = np.clip(obs, -1.0, 1.0)
        return obs

    def close(self) -> None:
        """Cleanup resources (no-op: this env holds no external resources)."""
        pass