Back to Catalog

Morphing Grid Navigation

A 10x10 partially observable grid world where the agent navigates from bottom-left to a dynamically relocating goal while collecting resources. After every action, the environment stochastically morphs: walls toggle with 30% probability per cell, the goal teleports, and resources shift positions. The agent receives a 5x5 local view of walls and relative vectors to the goal and nearest resources. Features anti-oscillation and stagnation penalties to prevent reward hacking.

Domain

navigation

Difficulty

hard

Observation

Box(shape=[41])

Action

Discrete(shape=[1])

Reward

composite

Max Steps

1000

Version

v1

Tests (8/8)

syntax · import · reset · step · obs_space · action_space · reward_sanity · determinism

Use via API

import kualia
env = kualia.make("morphing-grid-navigation")
obs, info = env.reset()

Environment Code

15843 chars
import gymnasium as gym
import numpy as np
from typing import Tuple, Dict, Any, Optional


class MorphingGridNavigationEnv(gym.Env):
    """
    Morphing Grid Navigation Environment.

    A 10x10 grid where the agent must adapt to drastic environmental changes
    after every action. Walls stochastically toggle, the goal relocates, and
    resources shift. The agent receives partial observability via a local
    wall view and relative positions to targets.

    Observation Space (41 dims, float32, every entry inside [-1, 1]):
        - [0:2]: Agent position (row, col) normalized to [0, 1]
        - [2:27]: 5x5 local wall view (binary, padded with 1s), flattened
        - [27:29]: Relative goal position (dr/9, dc/9) clipped to [-1, 1]
        - [29:35]: Relative positions of up to 3 nearest resources (6 dims)
        - [35:38]: Binary mask for resource existence (3 dims)
        - [38]: Collected resources count / NUM_RESOURCES, saturating at 1.0
        - [39]: Episode progress (step/max_steps), saturating at 1.0
        - [40]: Stagnation flag (1.0 if stuck last step, else 0.0)

    Action Space (Discrete 4):
        0: UP, 1: DOWN, 2: LEFT, 3: RIGHT

    Reward Structure (the per-step sum is clipped to [-10, 10]):
        - Goal reach: +10.0 (terminates the episode)
        - Resource collection: +1.0
        - Step penalty: -0.1
        - Wall collision: -5.0 (also applies to leaving the grid)
        - Anti-hacking penalties: Oscillation (-0.5), Stagnation (-0.3)

    The episode is truncated once MAX_STEPS steps have been taken.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 10}

    GRID_SIZE: int = 10
    MAX_STEPS: int = 500
    NUM_RESOURCES: int = 3
    WALL_TOGGLE_PROB: float = 0.3
    VIEW_SIZE: int = 5  # Must be odd
    INITIAL_WALL_DENSITY: float = 0.2

    REWARD_GOAL: float = 10.0
    REWARD_RESOURCE: float = 1.0
    REWARD_STEP: float = -0.1
    REWARD_WALL_HIT: float = -5.0
    REWARD_OSCILLATION: float = -0.5
    REWARD_STAGNATION: float = -0.3
    STAGNATION_THRESHOLD: int = 3

    # (d_row, d_col) per action index: UP, DOWN, LEFT, RIGHT.
    _ACTION_DELTAS = ((-1, 0), (1, 0), (0, -1), (0, 1))

    def __init__(self, render_mode: Optional[str] = None):
        """Create the spaces and placeholder state; call reset() before step().

        Args:
            render_mode: "human" (console print), "rgb_array", or None.
        """
        super().__init__()

        self.render_mode = render_mode
        self.window = None
        self.clock = None

        # Observation space: 41 dimensions as described in the class docstring
        obs_shape = self._get_observation_shape()
        self.observation_space = gym.spaces.Box(
            low=-1.0, high=1.0, shape=(obs_shape,), dtype=np.float32
        )

        self.action_space = gym.spaces.Discrete(4)

        # State variables (initialized in reset)
        self.agent_pos: np.ndarray = np.zeros(2, dtype=np.int32)
        self.goal_pos: np.ndarray = np.zeros(2, dtype=np.int32)
        self.walls: np.ndarray = np.zeros((self.GRID_SIZE, self.GRID_SIZE), dtype=np.bool_)
        self.resources: list = []  # list of (row, col) tuples
        self.collected_count: int = 0
        self.step_count: int = 0

        # Anti-hacking tracking
        self.prev_pos: Optional[np.ndarray] = None
        self.stagnation_counter: int = 0
        self.last_move_direction: Optional[int] = None
        self._pos_history: list = []

    def _get_observation_shape(self) -> int:
        """Return the length of the flat observation vector."""
        pos_dims = 2
        wall_view_dims = self.VIEW_SIZE * self.VIEW_SIZE
        goal_dims = 2
        resource_dims = self.NUM_RESOURCES * 2
        resource_mask_dims = self.NUM_RESOURCES
        misc_dims = 3  # collected count, progress, stagnation flag
        return pos_dims + wall_view_dims + goal_dims + resource_dims + resource_mask_dims + misc_dims

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        """Start a new episode with a freshly sampled grid.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to seed ``self.np_random``.
            options: Accepted for API compatibility; not used.

        Returns:
            The initial observation and an info dict with ``episode_step``
            and ``collected`` both set to 0.
        """
        super().reset(seed=seed)

        # Agent starts bottom-left, i.e. (GRID_SIZE-1, 0)
        self.agent_pos = np.array([self.GRID_SIZE - 1, 0], dtype=np.int32)
        self.prev_pos = None
        self.stagnation_counter = 0
        self.last_move_direction = None
        self.step_count = 0
        self.collected_count = 0
        self._pos_history = []

        # Random walls at the initial density; the start cell is always free
        self.walls = self.np_random.random((self.GRID_SIZE, self.GRID_SIZE)) < self.INITIAL_WALL_DENSITY
        self.walls[self.agent_pos[0], self.agent_pos[1]] = False

        # Goal is kept as an int32 array, the same representation that
        # __init__ and _morph_environment use.
        self.goal_pos = np.array(
            self._sample_free_cell(exclude=[tuple(self.agent_pos)]),
            dtype=np.int32
        )

        # Resources occupy distinct free cells away from agent and goal
        self.resources = []
        for _ in range(self.NUM_RESOURCES):
            taken = [tuple(self.agent_pos), tuple(self.goal_pos)] + self.resources
            self.resources.append(self._sample_free_cell(exclude=taken))

        obs = self._get_obs()
        info = {"episode_step": 0, "collected": 0}

        return obs, info

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
        """Apply one move, score it, then morph the environment.

        Order of operations: move (or bump), resource pickup, goal check,
        morph, penalties. The goal check happens BEFORE the morph, so the
        agent must step onto the goal cell it was shown in the previous
        observation.

        Args:
            action: 0 UP, 1 DOWN, 2 LEFT, 3 RIGHT.

        Returns:
            ``(obs, reward, terminated, truncated, info)``. ``info`` carries
            the individual reward components under ``reward_components``.
        """
        assert self.action_space.contains(action), f"Invalid action {action}"

        d_row, d_col = self._ACTION_DELTAS[int(action)]
        target = self.agent_pos + np.array([d_row, d_col], dtype=np.int32)

        # Leaving the grid counts as a wall hit; the wall lookup is only
        # evaluated for in-bounds targets.
        in_bounds = bool(
            0 <= target[0] < self.GRID_SIZE and 0 <= target[1] < self.GRID_SIZE
        )
        hit_wall = (not in_bounds) or bool(self.walls[target[0], target[1]])

        self.prev_pos = self.agent_pos.copy()
        self.agent_pos = self.agent_pos.copy() if hit_wall else target

        # Stagnation counts consecutive steps without a position change
        if np.array_equal(self.agent_pos, self.prev_pos):
            self.stagnation_counter += 1
        else:
            self.stagnation_counter = 0

        # Resource pickup: a collected resource is respawned immediately
        resource_reward = 0.0
        updated_resources = []
        for cell in self.resources:
            if np.array_equal(self.agent_pos, np.array(cell)):
                resource_reward += self.REWARD_RESOURCE
                self.collected_count += 1
                taken = [tuple(self.agent_pos), tuple(self.goal_pos)] + updated_resources
                cell = self._sample_free_cell(exclude=taken)
            updated_resources.append(cell)
        self.resources = updated_resources

        # Goal check against the pre-morph goal position
        terminated = bool(np.array_equal(self.agent_pos, self.goal_pos))
        goal_reward = self.REWARD_GOAL if terminated else 0.0

        # Morph environment (walls, goal, resources shift)
        self._morph_environment()

        # Oscillation: the current cell equals the cell occupied two steps
        # ago (_pos_history[-1] is the previous step's prev_pos).
        # NOTE: this also fires when the agent stayed in place for two
        # consecutive steps, on top of the wall/stagnation penalties.
        osc_penalty = 0.0
        if self._pos_history and np.array_equal(self.agent_pos, self._pos_history[-1]):
            osc_penalty = self.REWARD_OSCILLATION

        # Update history only after the check above
        self._pos_history.append(self.prev_pos.copy())
        if len(self._pos_history) > 2:
            self._pos_history.pop(0)

        stag_penalty = (
            self.REWARD_STAGNATION
            if self.stagnation_counter >= self.STAGNATION_THRESHOLD
            else 0.0
        )
        step_penalty = self.REWARD_STEP
        wall_penalty = self.REWARD_WALL_HIT if hit_wall else 0.0

        total_reward = (
            goal_reward + resource_reward + step_penalty +
            wall_penalty + osc_penalty + stag_penalty
        )
        total_reward = float(np.clip(total_reward, -10.0, 10.0))

        self.step_count += 1
        truncated = self.step_count >= self.MAX_STEPS

        obs = self._get_obs()

        info = {
            "episode_step": self.step_count,
            "collected": self.collected_count,
            "hit_wall": hit_wall,
            "reward_components": {
                "goal": goal_reward,
                "resource": resource_reward,
                "step_penalty": step_penalty,
                "wall_collision": wall_penalty,
                "oscillation_penalty": osc_penalty,
                "stagnation_penalty": stag_penalty,
                "total": total_reward
            }
        }

        return obs, total_reward, terminated, truncated, info

    def _morph_environment(self) -> None:
        """Apply stochastic morphing: walls toggle, goal and resources relocate."""
        # Each cell flips wall/free independently with WALL_TOGGLE_PROB
        toggle_mask = self.np_random.random((self.GRID_SIZE, self.GRID_SIZE)) < self.WALL_TOGGLE_PROB
        self.walls ^= toggle_mask

        # The agent's cell and the outgoing goal cell are forced free
        self.walls[self.agent_pos[0], self.agent_pos[1]] = False
        self.walls[self.goal_pos[0], self.goal_pos[1]] = False

        # Relocate goal to a new random free position
        self.goal_pos = np.array(
            self._sample_free_cell(exclude=[tuple(self.agent_pos)]),
            dtype=np.int32
        )

        # Re-draw every resource on distinct free cells
        relocated = []
        for _ in range(self.NUM_RESOURCES):
            taken = [tuple(self.agent_pos), tuple(self.goal_pos)] + relocated
            relocated.append(self._sample_free_cell(exclude=taken))
        self.resources = relocated

    def _sample_free_cell(self, exclude: list) -> Tuple[int, int]:
        """Sample a random non-wall cell that is not in ``exclude``.

        Uses rejection sampling (up to 1000 draws), then a row-major scan
        for the first eligible cell.

        Args:
            exclude: ``(row, col)`` tuples that must not be returned.

        Returns:
            ``(row, col)`` as plain Python ints. If no eligible cell exists
            at all, ``(0, 0)`` is returned as a last resort.
        """
        max_attempts = 1000
        for _ in range(max_attempts):
            r = int(self.np_random.integers(0, self.GRID_SIZE))
            c = int(self.np_random.integers(0, self.GRID_SIZE))
            if not self.walls[r, c] and (r, c) not in exclude:
                return (r, c)
        # Fallback: return first free cell found
        for r in range(self.GRID_SIZE):
            for c in range(self.GRID_SIZE):
                if not self.walls[r, c] and (r, c) not in exclude:
                    return (r, c)
        return (0, 0)  # Should never reach here in valid configs

    def _get_obs(self) -> np.ndarray:
        """Build the flat float32 observation (layout in the class docstring).

        Every entry is kept inside the declared ``Box(-1, 1)``: the collected
        counter and the progress fraction saturate at 1.0 because resources
        respawn and the raw counter is therefore unbounded.
        """
        scale = self.GRID_SIZE - 1
        row, col = int(self.agent_pos[0]), int(self.agent_pos[1])

        # 1. Agent position normalized to [0, 1]
        agent_norm = self.agent_pos.astype(np.float32) / scale

        # 2. Local wall view centered on the agent. Padding the grid with
        #    walls makes out-of-grid cells read as 1 and turns the window
        #    into a plain slice: padded index = grid index + half_view.
        half_view = self.VIEW_SIZE // 2
        padded = np.pad(self.walls, half_view, mode="constant", constant_values=True)
        wall_view = padded[row:row + self.VIEW_SIZE, col:col + self.VIEW_SIZE]
        wall_view = wall_view.astype(np.float32).ravel()

        # 3. Relative goal position normalized to [-1, 1]
        goal_rel = (np.asarray(self.goal_pos) - self.agent_pos).astype(np.float32) / scale
        goal_rel = np.clip(goal_rel, -1.0, 1.0)

        # 4. Nearest resources (by Manhattan distance): offsets and mask
        resource_obs = np.zeros(self.NUM_RESOURCES * 2, dtype=np.float32)
        resource_mask = np.zeros(self.NUM_RESOURCES, dtype=np.float32)
        if self.resources:
            rels = np.asarray(self.resources, dtype=np.int32) - self.agent_pos
            rels = rels.astype(np.float32) / scale
            order = np.argsort(np.abs(rels).sum(axis=1))[:self.NUM_RESOURCES]
            nearest = rels[order]
            resource_obs[:nearest.size] = nearest.ravel()
            resource_mask[:len(order)] = 1.0

        # 5. Collected count, saturating so it stays inside the Box bounds
        collected_frac = float(self.collected_count) / max(1, self.NUM_RESOURCES)
        collected_norm = np.array([min(1.0, collected_frac)], dtype=np.float32)

        # 6. Episode progress, saturating for the same reason
        progress_frac = float(self.step_count) / self.MAX_STEPS
        progress_norm = np.array([min(1.0, progress_frac)], dtype=np.float32)

        # 7. Stagnation flag: the last step did not change the position
        stagnation_flag = np.array(
            [1.0 if self.stagnation_counter > 0 else 0.0], dtype=np.float32
        )

        # The part lengths mirror _get_observation_shape() term by term, so
        # the concatenation always has the declared length.
        return np.concatenate([
            agent_norm,
            wall_view,
            goal_rel,
            resource_obs,
            resource_mask,
            collected_norm,
            progress_norm,
            stagnation_flag,
        ]).astype(np.float32)

    def render(self) -> Optional[np.ndarray]:
        """Render according to ``render_mode``.

        Returns:
            An ``(H, W, 3)`` uint8 image for "rgb_array"; ``None`` otherwise
            ("human" prints an ASCII grid to stdout).
        """
        if self.render_mode == "rgb_array":
            cell_size = 20
            # Paint lowest-priority layers first so later ones win:
            # background < resource < goal < agent < wall.
            cells = np.full((self.GRID_SIZE, self.GRID_SIZE, 3), 255, dtype=np.uint8)
            for r, c in self.resources:
                cells[r, c] = (255, 215, 0)  # Gold resource
            cells[self.goal_pos[0], self.goal_pos[1]] = (0, 255, 0)  # Green goal
            cells[self.agent_pos[0], self.agent_pos[1]] = (0, 0, 255)  # Blue agent
            cells[self.walls] = (0, 0, 0)  # Black wall
            # Blow each grid cell up to a cell_size x cell_size square
            return np.repeat(np.repeat(cells, cell_size, axis=0), cell_size, axis=1)
        elif self.render_mode == "human":
            # Console rendering; later assignments overwrite earlier ones
            grid = [['.' for _ in range(self.GRID_SIZE)] for _ in range(self.GRID_SIZE)]
            for r, c in self.resources:
                grid[r][c] = 'R'
            for r in range(self.GRID_SIZE):
                for c in range(self.GRID_SIZE):
                    if self.walls[r, c]:
                        grid[r][c] = '#'
            grid[self.goal_pos[0]][self.goal_pos[1]] = 'G'
            grid[self.agent_pos[0]][self.agent_pos[1]] = 'A'
            print("\n".join([" ".join(row) for row in grid]))
        return None

    def close(self) -> None:
        """Clean up any resources (nothing to release)."""
        pass