Back to Catalog
MorphingMazeAdaptiveIntelligence
A 10x10 grid maze with continuous morphing dynamics where walls toggle probabilistically after every action, the goal relocates periodically, and resources respawn. The 63-dimensional observation space includes external state (position, local wall view), experience storage (action/reward history), self-observation metrics (distance trends, collision rates, exploration efficiency), and meta-awareness signals (environment change magnitude, pattern familiarity). Designed to test whether self-observational capabilities improve adaptation speed in non-stationary environments.
Domain
navigation
Difficulty
hard
Observation
Box(shape=[63])
Action
Discrete(shape=[4])
Reward
dense
Max Steps
1000
Version
v1
Tests (8/8)
syntax, import, reset, step, obs_space, action_space, reward_sanity, determinism
Use via API
import kualia
env = kualia.make("morphingmazeadaptiveintelligence")
obs, info = env.reset()

Environment Code
20109 chars

import gymnasium as gym
import numpy as np
from typing import Tuple, Dict, Any, Optional
class MorphingMazeSelfObsEnv(gym.Env):
    """
    Morphing Maze with Self-Observation.
    A 10x10 grid where walls stochastically toggle, the goal relocates,
    and resources shift — after EVERY action. The agent must navigate to
    the goal while adapting to continuous structural changes.
    Observation Space (63 dims): External state (33), Experience storage (14),
    Self-observation metrics (12), Meta-awareness (4).
    Reward-hacking countermeasures: Anti-oscillation penalty and anti-stagnation
    penalty discourage repetitive non-progressive behaviors.
    """
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 10}
    # --- Layout / episode constants ---
    GRID_SIZE: int = 10
    MAX_STEPS: int = 500  # NOTE(review): catalog text says "Max Steps 1000" — confirm which is intended
    NUM_RESOURCES: int = 3
    WALL_TOGGLE_PROB: float = 0.15  # per-cell toggle probability applied by _morph after every step
    VIEW_SIZE: int = 5  # side length of the square local wall view (must be odd)
    INITIAL_WALL_DENSITY: float = 0.2
    GOAL_RELOCATE_MIN: int = 80  # relocation interval sampled uniformly from
    GOAL_RELOCATE_MAX: int = 150  # [GOAL_RELOCATE_MIN, GOAL_RELOCATE_MAX) — integers() high is exclusive
    HISTORY_LEN: int = 8  # length of action / distance / wall-hit buffers
    SHORT_HISTORY: int = 5  # length of the reward buffer
    OBS_DIM: int = 63
    # Rewards
    R_GOAL: float = 10.0  # reaching the goal (terminates the episode)
    R_STEP: float = -0.1  # flat per-step time cost
    R_WALL: float = -0.5  # bumping into a wall or the grid edge
    R_PROGRESS: float = 0.3  # scale for positive progress toward the goal (no penalty for regress)
    R_RESOURCE: float = 1.0  # collecting a resource
    R_OSCILLATION: float = -0.2  # A->B->A->B position-pattern penalty
    R_STAGNATION: float = -0.05  # per extra consecutive stuck step beyond 2
    # Directions: UP, DOWN, LEFT, RIGHT
    DR = np.array([-1, 1, 0, 0], dtype=np.int32)
    DC = np.array([0, 0, -1, 1], dtype=np.int32)
    def __init__(self, render_mode: Optional[str] = None) -> None:
        """Declare spaces and placeholder state; RNG-dependent layout is built in reset()."""
        super().__init__()
        self.render_mode = render_mode
        self.action_space = gym.spaces.Discrete(4)
        self.observation_space = gym.spaces.Box(
            low=-1.0, high=1.0, shape=(self.OBS_DIM,), dtype=np.float32
        )
        # Core state
        self.agent_pos: np.ndarray = np.zeros(2, dtype=np.int32)
        self.goal_pos: np.ndarray = np.zeros(2, dtype=np.int32)
        self.walls: np.ndarray = np.zeros((self.GRID_SIZE, self.GRID_SIZE), dtype=np.bool_)
        self.prev_walls: Optional[np.ndarray] = None  # pre-morph snapshot for change-magnitude signal
        self.resources: list[Tuple[int, int]] = []
        # Episode counters
        self.step_count: int = 0
        self.goal_relocate_timer: int = 0
        self.goal_relocate_interval: int = 100
        # History buffers (rolled left each step; newest entry at index -1)
        self.action_history: np.ndarray = np.full(self.HISTORY_LEN, -1.0, dtype=np.float32)
        self.reward_history: np.ndarray = np.zeros(self.SHORT_HISTORY, dtype=np.float32)
        self.distance_history: np.ndarray = np.zeros(self.HISTORY_LEN, dtype=np.float32)
        self.wall_hit_history: np.ndarray = np.zeros(self.HISTORY_LEN, dtype=np.bool_)
        self.position_history: list[Tuple[int, int]] = []
        self.local_view_history: list[bytes] = []
        self.visited_cells: set[Tuple[int, int]] = set()
        # Metrics for self-observation
        self.cumulative_reward: float = 0.0
        self.consecutive_stuck_steps: int = 0
        self.pattern_reward_map: Dict[bytes, float] = {}  # local-view bytes -> EMA of reward
        self.steps_since_collision: int = 0
    def reset(
        self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        """Start a new episode.

        Seeds the RNG through gymnasium, lays out walls, agent, goal and
        resources, and resets all history buffers and self-observation
        metrics. Returns ``(observation, info)``.
        """
        super().reset(seed=seed)
        # Initialize walls with random density
        self.walls = self.np_random.random((self.GRID_SIZE, self.GRID_SIZE)) < self.INITIAL_WALL_DENSITY
        # Agent starts bottom-left
        self.agent_pos = np.array([self.GRID_SIZE - 1, 0], dtype=np.int32)
        self.walls[self.agent_pos[0], self.agent_pos[1]] = False
        # Goal at random free cell
        self.goal_pos = np.array(self._sample_free([tuple(self.agent_pos)]), dtype=np.int32)
        self.walls[self.goal_pos[0], self.goal_pos[1]] = False
        # Initialize resources
        self.resources = []
        exclude = [tuple(self.agent_pos), tuple(self.goal_pos)]
        for _ in range(self.NUM_RESOURCES):
            pos = self._sample_free(exclude + self.resources)
            self.resources.append(pos)
        # Reset timers
        self.step_count = 0
        self.goal_relocate_timer = 0
        self.goal_relocate_interval = int(self.np_random.integers(
            self.GOAL_RELOCATE_MIN, self.GOAL_RELOCATE_MAX
        ))
        # Reset histories
        self.action_history = np.full(self.HISTORY_LEN, -1.0, dtype=np.float32)
        self.reward_history = np.zeros(self.SHORT_HISTORY, dtype=np.float32)
        # NOTE(review): divisor here is 2*GRID_SIZE (=20) while _get_obs normalizes
        # distance by 2*(GRID_SIZE-1) (=18) — confirm the mismatch is intended.
        self.distance_history = np.full(
            self.HISTORY_LEN,
            self._manhattan() / (2 * self.GRID_SIZE),
            dtype=np.float32
        )
        self.wall_hit_history = np.zeros(self.HISTORY_LEN, dtype=np.bool_)
        self.position_history = [tuple(self.agent_pos)]
        self.local_view_history = []
        self.visited_cells = {tuple(self.agent_pos)}
        self.prev_walls = self.walls.copy()
        self.cumulative_reward = 0.0
        self.consecutive_stuck_steps = 0
        self.steps_since_collision = 0
        self.pattern_reward_map = {}
        return self._get_obs(), {"step": 0}
    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
        """Apply one action and advance the morphing dynamics.

        Order of operations: move agent -> collect resources -> check goal
        termination -> morph walls/resources -> compute reward -> update
        histories and pattern map -> possibly relocate the goal.
        Returns the gymnasium 5-tuple ``(obs, reward, terminated, truncated, info)``;
        ``info["reward_components"]`` breaks the reward down by source.
        """
        assert self.action_space.contains(action)  # NOTE(review): assert is stripped under python -O
        # Store pre-step state
        prev_pos = self.agent_pos.copy()
        prev_dist = self._manhattan()
        # Update action history
        self.action_history = np.roll(self.action_history, -1)
        self.action_history[-1] = (action / 1.5) - 1.0  # Normalize [0,3] -> [-1, 1]
        # Execute movement
        nr = self.agent_pos[0] + self.DR[action]
        nc = self.agent_pos[1] + self.DC[action]
        hit_wall = False
        # Blocked moves (edge or wall) leave the agent in place.
        if nr < 0 or nr >= self.GRID_SIZE or nc < 0 or nc >= self.GRID_SIZE or self.walls[nr, nc]:
            hit_wall = True
            self.consecutive_stuck_steps += 1
            self.steps_since_collision = 0
        else:
            self.agent_pos[0] = nr
            self.agent_pos[1] = nc
            self.consecutive_stuck_steps = 0
            self.steps_since_collision += 1
        # Update wall hit history
        self.wall_hit_history = np.roll(self.wall_hit_history, -1)
        self.wall_hit_history[-1] = hit_wall
        # Update position and visitation tracking
        self.position_history.append(tuple(self.agent_pos))
        if len(self.position_history) > self.HISTORY_LEN * 2:
            self.position_history = self.position_history[-self.HISTORY_LEN * 2:]
        self.visited_cells.add(tuple(self.agent_pos))
        # Resource collection
        resource_reward = 0.0
        new_resources = []
        for rpos in self.resources:
            if tuple(self.agent_pos) == rpos:
                resource_reward += self.R_RESOURCE
                # Respawn resource
                exclude = [tuple(self.agent_pos), tuple(self.goal_pos)] + new_resources
                new_resources.append(self._sample_free(exclude))
            else:
                new_resources.append(rpos)
        self.resources = new_resources
        # Check goal termination (before the morph, so reaching the goal always counts)
        terminated = bool(np.array_equal(self.agent_pos, self.goal_pos))
        goal_reward = self.R_GOAL if terminated else 0.0
        # Morph environment (continuous dynamics)
        self.prev_walls = self.walls.copy()
        self._morph()
        # Calculate rewards (goal position is unchanged by _morph, so this
        # still measures progress under the pre-relocation goal)
        new_dist = self._manhattan()
        progress = (prev_dist - new_dist) / (2 * self.GRID_SIZE)
        progress_reward = self.R_PROGRESS * max(0.0, progress)
        wall_penalty = self.R_WALL if hit_wall else 0.0
        # Reward-hacking countermeasures
        oscillation_penalty = 0.0
        if self._detect_oscillation():
            oscillation_penalty = self.R_OSCILLATION
        stagnation_penalty = 0.0
        if self.consecutive_stuck_steps > 2:
            stagnation_penalty = self.R_STAGNATION * (self.consecutive_stuck_steps - 2)
        total_reward = (
            goal_reward + resource_reward + self.R_STEP +
            progress_reward + wall_penalty + oscillation_penalty + stagnation_penalty
        )
        total_reward = float(np.clip(total_reward, -10.0, 10.0))
        # Update histories
        self.reward_history = np.roll(self.reward_history, -1)
        self.reward_history[-1] = total_reward
        self.cumulative_reward += total_reward
        self.distance_history = np.roll(self.distance_history, -1)
        self.distance_history[-1] = new_dist / (2 * self.GRID_SIZE)
        # Update local view history for meta-awareness
        # (the view is captured post-morph, while the reward being associated
        # with it below was earned pre-morph — presumably acceptable; verify)
        current_view = self._local_view().tobytes()
        self.local_view_history.append(current_view)
        if len(self.local_view_history) > 20:
            self.local_view_history = self.local_view_history[-20:]
        # Update pattern-reward mapping for strategy-environment match
        if current_view in self.pattern_reward_map:
            # Exponential moving average
            self.pattern_reward_map[current_view] = (
                0.7 * self.pattern_reward_map[current_view] + 0.3 * total_reward
            )
        else:
            self.pattern_reward_map[current_view] = total_reward
        self.step_count += 1
        self.goal_relocate_timer += 1
        # Check if we should relocate goal (after reward computation, so a
        # relocation never retroactively affects this step's progress reward)
        if self.goal_relocate_timer >= self.goal_relocate_interval:
            self.goal_relocate_timer = 0
            self.goal_relocate_interval = int(self.np_random.integers(
                self.GOAL_RELOCATE_MIN, self.GOAL_RELOCATE_MAX
            ))
            exclude = [tuple(self.agent_pos)] + self.resources
            self.goal_pos = np.array(self._sample_free(exclude), dtype=np.int32)
        truncated = self.step_count >= self.MAX_STEPS
        info = {
            "step": self.step_count,
            "reward_components": {
                "goal": goal_reward,
                "resource": resource_reward,
                "step": self.R_STEP,
                "progress": progress_reward,
                "wall": wall_penalty,
                "oscillation": oscillation_penalty,
                "stagnation": stagnation_penalty
            }
        }
        return self._get_obs(), float(total_reward), bool(terminated), bool(truncated), info
    def _get_obs(self) -> np.ndarray:
        """Construct the 63-dimensional observation vector."""
        obs = np.zeros(self.OBS_DIM, dtype=np.float32)
        # External state (0-32)
        # Agent position (2) normalized to [-1, 1]
        obs[0] = (self.agent_pos[0] / (self.GRID_SIZE - 1)) * 2 - 1
        obs[1] = (self.agent_pos[1] / (self.GRID_SIZE - 1)) * 2 - 1
        # Goal position (2)
        obs[2] = (self.goal_pos[0] / (self.GRID_SIZE - 1)) * 2 - 1
        obs[3] = (self.goal_pos[1] / (self.GRID_SIZE - 1)) * 2 - 1
        # 5x5 local wall view (25) - flattened
        local_view = self._local_view()
        obs[4:29] = local_view.flatten().astype(np.float32) * 2 - 1  # [0,1] -> [-1,1]
        # Normalized Manhattan distance (1) — lies in [0, 1], not [-1, 1]
        max_dist = 2 * (self.GRID_SIZE - 1)
        obs[29] = self._manhattan() / max_dist
        # Episode progress (1) — also in [0, 1]
        obs[30] = self.step_count / self.MAX_STEPS
        # Relative goal direction (2) - normalized vector (Chebyshev norm)
        dr = self.goal_pos[0] - self.agent_pos[0]
        dc = self.goal_pos[1] - self.agent_pos[1]
        norm = max(abs(dr), abs(dc), 1)
        obs[31] = dr / norm
        obs[32] = dc / norm
        # Experience storage (33-46)
        # Last 8 actions (8) - already normalized to [-1, 1]
        obs[33:41] = self.action_history
        # Last 5 rewards (5) - clip and normalize
        clipped_rewards = np.clip(self.reward_history / 10.0, -1.0, 1.0)
        obs[41:46] = clipped_rewards
        # Cumulative reward normalized (1)
        obs[46] = float(np.clip(self.cumulative_reward / 100.0, -1.0, 1.0))
        # Self-observation metrics (47-58)
        metrics = self._calculate_self_obs_metrics()
        obs[47:59] = metrics
        # Meta-awareness (59-62)
        meta = self._calculate_meta_awareness()
        obs[59:63] = meta
        return np.clip(obs, -1.0, 1.0)
    def _calculate_self_obs_metrics(self) -> np.ndarray:
        """Calculate 12 self-observation metrics, each clipped to [-1, 1]."""
        metrics = np.zeros(12, dtype=np.float32)
        # 1. Distance trend slope (1) - simple linear regression on last 8 distances
        if self.step_count >= 2:
            x = np.arange(self.HISTORY_LEN)
            y = self.distance_history
            x_mean, y_mean = np.mean(x), np.mean(y)
            denom = np.sum((x - x_mean) ** 2)
            if denom > 1e-6:
                slope = np.sum((x - x_mean) * (y - y_mean)) / denom
                metrics[0] = np.clip(slope * 10, -1, 1)  # Scale up for sensitivity
        # 2. Wall collision rate (1)
        metrics[1] = np.mean(self.wall_hit_history) * 2 - 1
        # 3. Movement efficiency (1) - actual progress vs steps
        if self.step_count > 0:
            start_dist = self.distance_history[0] if self.distance_history[0] > 0 else 1.0
            current_dist = self.distance_history[-1]
            efficiency = (start_dist - current_dist) / (self.step_count / self.MAX_STEPS + 0.01)
            metrics[2] = np.clip(efficiency, -1, 1)
        # 4. Action diversity/entropy (1)
        if self.step_count >= self.HISTORY_LEN:
            unique_actions = len(np.unique(self.action_history))
            metrics[3] = (unique_actions / 4.0) * 2 - 1
        # 5. Reward momentum (1) - trend in recent rewards
        if self.step_count >= 2:
            recent = self.reward_history[-3:] if len(self.reward_history) >= 3 else self.reward_history
            if len(recent) >= 2:
                metrics[4] = np.clip(np.mean(np.diff(recent)) * 5, -1, 1)
        # 6. Oscillation detection (1) - binary encoded as -1 or 1
        metrics[5] = 1.0 if self._detect_oscillation() else -1.0
        # 7. Stagnation score (1)
        metrics[6] = np.clip(self.consecutive_stuck_steps / 5.0, 0, 1) * 2 - 1
        # 8. Progress rate (1) - normalized step count vs distance remaining
        if self._manhattan() > 0:
            progress_ratio = (self.step_count / self.MAX_STEPS) / (self._manhattan() / (2 * self.GRID_SIZE))
            metrics[7] = np.clip(1.0 - progress_ratio, -1, 1)
        # 9. Exploration coverage (1)
        total_cells = self.GRID_SIZE ** 2
        coverage = len(self.visited_cells) / total_cells
        metrics[8] = coverage * 2 - 1
        # 10. Strategy consistency (1) - variance in action history
        if self.step_count >= self.HISTORY_LEN:
            metrics[9] = np.clip(1.0 - np.std(self.action_history), -1, 1)
        # 11. Escape ability (1) - how quickly recovering from collisions
        # NOTE(review): `recent_hits` is only length-checked; high value means a
        # collision happened RECENTLY, decaying toward 0 over 10 steps — confirm
        # "escape ability" is the intended reading.
        if np.any(self.wall_hit_history):
            recent_hits = np.where(self.wall_hit_history)[0]
            if len(recent_hits) > 0 and self.steps_since_collision < 10:
                metrics[10] = 1.0 - (self.steps_since_collision / 10.0)
            else:
                metrics[10] = -1.0
        else:
            metrics[10] = 0.0
        # 12. Adaptation score (1) - improvement in reward over time
        if self.step_count >= self.SHORT_HISTORY:
            first_half = np.mean(self.reward_history[:self.SHORT_HISTORY//2])
            second_half = np.mean(self.reward_history[self.SHORT_HISTORY//2:])
            metrics[11] = np.clip((second_half - first_half) * 2, -1, 1)
        return metrics
    def _calculate_meta_awareness(self) -> np.ndarray:
        """Calculate 4 meta-awareness signals."""
        meta = np.zeros(4, dtype=np.float32)
        # 1. Environment change magnitude (1) - Hamming distance between wall grids
        if self.prev_walls is not None:
            changes = np.sum(self.walls != self.prev_walls)
            max_changes = self.GRID_SIZE ** 2
            meta[0] = (changes / max_changes) * 2 - 1
        # 2. Strategy-environment match (1) - avg reward for current pattern
        current_view = self._local_view().tobytes()
        if current_view in self.pattern_reward_map:
            avg_reward = self.pattern_reward_map[current_view]
            meta[1] = np.clip(avg_reward / 5.0, -1, 1)
        else:
            meta[1] = 0.0  # Unknown pattern
        # 3. Recovery speed after collision (1)
        # NOTE(review): reads 1.0 immediately AFTER a collision and decays over
        # 10 steps, i.e. it measures collision recency — confirm intent.
        if np.any(self.wall_hit_history):
            meta[2] = 1.0 - min(self.steps_since_collision / 10.0, 1.0)
        else:
            meta[2] = 1.0  # No recent collisions, full recovery
        # 4. Situation novelty (1) - how often we've seen this pattern
        # NOTE(review): step() inserts the current view into pattern_reward_map
        # BEFORE _get_obs runs, so after any step this branch always yields -0.5;
        # 1.0 can only appear in the first observation returned by reset().
        if current_view in self.pattern_reward_map:
            # Less frequent = more novel
            meta[3] = -0.5  # Familiar
        else:
            meta[3] = 1.0  # Novel
        return meta
    def _manhattan(self) -> int:
        """Calculate Manhattan distance to goal."""
        return abs(self.agent_pos[0] - self.goal_pos[0]) + abs(self.agent_pos[1] - self.goal_pos[1])
    def _sample_free(self, exclude: list[Tuple[int, int]]) -> Tuple[int, int]:
        """Sample a random free cell not in exclude list.

        Tries up to 1000 random draws, then scans the grid deterministically.
        """
        for _ in range(1000):
            r = int(self.np_random.integers(0, self.GRID_SIZE))
            c = int(self.np_random.integers(0, self.GRID_SIZE))
            if not self.walls[r, c] and (r, c) not in exclude:
                return (r, c)
        # Fallback: find any free cell
        for r in range(self.GRID_SIZE):
            for c in range(self.GRID_SIZE):
                if not self.walls[r, c] and (r, c) not in exclude:
                    return (r, c)
        # NOTE(review): this last-resort cell may itself be a wall or in
        # `exclude` if the grid is fully blocked — confirm it is unreachable.
        return (0, 0)  # Should never happen
    def _local_view(self) -> np.ndarray:
        """Extract 5x5 local view around agent. 1 = wall/out of bounds, 0 = free."""
        view = np.ones((self.VIEW_SIZE, self.VIEW_SIZE), dtype=np.bool_)
        half = self.VIEW_SIZE // 2
        for dr in range(-half, half + 1):
            for dc in range(-half, half + 1):
                r, c = self.agent_pos[0] + dr, self.agent_pos[1] + dc
                vr, vc = dr + half, dc + half
                if 0 <= r < self.GRID_SIZE and 0 <= c < self.GRID_SIZE:
                    view[vr, vc] = self.walls[r, c]
                else:
                    view[vr, vc] = True  # Out of bounds is wall
        return view
    def _morph(self) -> None:
        """Toggle walls probabilistically and relocate resources.

        Called once per step; agent and goal cells are always forced free
        afterwards so neither can be buried by a toggle.
        """
        # Toggle walls
        toggle_mask = self.np_random.random((self.GRID_SIZE, self.GRID_SIZE)) < self.WALL_TOGGLE_PROB
        self.walls = np.logical_xor(self.walls, toggle_mask)
        # Ensure agent and goal positions remain free
        self.walls[self.agent_pos[0], self.agent_pos[1]] = False
        self.walls[self.goal_pos[0], self.goal_pos[1]] = False
        # Occasionally respawn resources (10% of steps; each resource then has
        # an independent 30% chance to move)
        if self.np_random.random() < 0.1:
            exclude = [tuple(self.agent_pos), tuple(self.goal_pos)]
            new_resources = []
            for rpos in self.resources:
                if self.np_random.random() < 0.3:  # 30% chance to move
                    new_pos = self._sample_free(exclude + new_resources)
                    new_resources.append(new_pos)
                else:
                    new_resources.append(rpos)
            self.resources = new_resources
    def _detect_oscillation(self) -> bool:
        """Detect A->B->A->B pattern in recent positions."""
        if len(self.position_history) < 4:
            return False
        recent = self.position_history[-4:]
        return recent[0] == recent[2] and recent[1] == recent[3] and recent[0] != recent[1]
    def close(self) -> None:
        """No external resources are held; nothing to release."""
        pass