custom-env
Gymnasium-compatible continuous resource management with 3 interdependent resources (A, B, C). Observation space: Box(low=0, high=100, shape=(15,), dtype=float32): [storage_A, storage_B, storage_C, demand_A, demand_B, demand_C, demand_derivative_A, demand_derivative_B, demand_derivative_C, coupling_AB, coupling_BC, coupling_CA, time_since_shock, rolling_efficiency_score, normalized_step]. Action space: Box(low=0, high=10, shape=(6,), dtype=float32): [produce_A, produce_B, produce_C, convert_A_to_B, convert_B_to_C, convert_C_to_A]. Dynamics: storage_t+1 = storage_t + production + conversion_in - conversion_out - demand_t - waste. Demand follows non-stationary process d_t = d_base + α*sin(ω*t) where ω = ω_base*(1+e) scales with efficiency e ∈ [0,1] (rolling satisfied_demand/total_demand over 100 steps). Shock events occur with probability p = 0.01 + 0.2*max(0, e-0.7). Coupling coefficients C_ij (resource i requires resource j) evolve as C_ij = C_base * e, creating progressive interdependencies. Higher e increases production complexity and demand non-stationarity. Reward: r_t = -sum(|demand_t - satisfied_t|) - 0.5*sum(waste) - 0.01*||action||^2. Episode length: 1000 steps. Reset() initializes storage at 50 units, sets coupling matrix based on performance history (persistence across episodes), and samples new demand phase parameters.
Domain
resource_management
Difficulty
medium
Observation
Box(shape=(15,))
Action
Box(shape=(6,))
Reward
see spec
Max Steps
1000
Version
v1
Tests (8/8)
Use via API
import kualia
env = kualia.make("custom-env-1774053531")
obs, info = env.reset()
Environment Code
12535 chars
import gymnasium as gym
import numpy as np
from typing import Optional, Dict, Any, Tuple
class ResourceManagementEnv(gym.Env):
    """Resource management environment with three interdependent resources.

    Features:
        - Three resources (A, B, C), each with a bounded storage.
        - Sinusoidal demand with a time-varying (non-stationary) component.
        - Stochastic shock events that temporarily raise demand.
        - Conversion yields ("coupling") that scale with rolling efficiency.
        - A rolling efficiency metric (mean satisfied/total demand over the
          last ``EFFICIENCY_WINDOW`` steps) that also scales shock probability
          and conversion volume.

    Observation space (15-dimensional ``Box`` in [0, 1]):
        - storage_A/B/C: storage divided by ``MAX_STORAGE``.
        - demand_A/B/C: demand divided by ``MAX_DEMAND``.
        - demand_derivative_A/B/C: one-step demand change, mapped so that
          0.5 means "no change".
        - coupling_AB/BC/CA: conversion yields.
        - time_since_shock: steps since the last shock ended, saturating
          at 100 steps.
        - rolling_efficiency: rolling ratio of satisfied demand.
        - normalized_step: timestep divided by ``MAX_TIMESTEPS``.

    Action space (6-dimensional ``Box`` in [0, 10]):
        ``[produce_A, produce_B, produce_C,
        convert_A_to_B, convert_B_to_C, convert_C_to_A]``.

    Dynamics (per step):
        1. Production is added to storage.
        2. Each conversion request is scaled by ``0.5 + rolling_efficiency``
           and capped at what the source resource holds after production;
           the destination receives the withdrawn amount times the coupling.
        3. Storage is clipped to ``[0, MAX_STORAGE]`` (overflow is lost).
        4. Demand is served from storage.

    Reward (clipped to [-10, 10]):
        satisfaction reward + shortage penalty + storage balance penalty
        + quadratic action penalty + efficiency bonus + shock bonus.
    """

    # Constants
    MAX_STORAGE: float = 100.0  # per-resource storage capacity
    MAX_DEMAND: float = 50.0  # demand is clipped to [0, MAX_DEMAND]
    MAX_TIMESTEPS: int = 1000  # episode is truncated at this many steps
    SHOCK_PROBABILITY: float = 0.02  # scaled by (0.5 + rolling efficiency)
    SHOCK_DURATION: int = 20  # number of steps a shock stays active
    EFFICIENCY_WINDOW: int = 50  # steps averaged into rolling efficiency
    COUPLING_BASE: float = 0.5  # scaled by (0.5 + rolling efficiency)

    def __init__(self, render_mode: Optional[str] = None):
        """Create the environment and define its spaces.

        Args:
            render_mode: Stored on the instance; this class defines no
                rendering of its own.
        """
        super().__init__()
        self.render_mode = render_mode

        # Storage levels for resources A, B, C
        self.storage = np.zeros(3, dtype=np.float32)
        self.storage_capacity = np.full(3, self.MAX_STORAGE, dtype=np.float32)

        # Demand tracking
        self.current_demand = np.zeros(3, dtype=np.float32)
        self.previous_demand = np.zeros(3, dtype=np.float32)
        self.demand_phase = np.zeros(3, dtype=np.float32)
        self.demand_frequency = np.array([0.05, 0.07, 0.03], dtype=np.float32)

        # Shock events
        self.shock_active = False
        self.shock_timer = 0
        self.time_since_shock = 0

        # Efficiency tracking
        self.efficiency_history = []
        self.rolling_efficiency = 0.5

        # Coupling matrix; only entries [0, 1], [1, 2] and [2, 0] are used.
        self.coupling = np.zeros((3, 3), dtype=np.float32)

        # Timestep tracking
        self.timestep = 0

        # Spaces
        self.observation_space = gym.spaces.Box(
            low=0.0,
            high=1.0,
            shape=(15,),
            dtype=np.float32,
        )
        self.action_space = gym.spaces.Box(
            low=0.0,
            high=10.0,
            shape=(6,),
            dtype=np.float32,
        )

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[Dict[str, Any]] = None,
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        """Start a new episode.

        Args:
            seed: Seed forwarded to ``gym.Env.reset`` for ``self.np_random``.
            options: Accepted for API compatibility; not used.

        Returns:
            The initial observation and an empty info dict.
        """
        super().reset(seed=seed)

        # Clear per-episode state BEFORE generating demand: _update_demand
        # reads self.timestep and self.shock_active, so stale values from a
        # previous episode would otherwise leak into the first observation.
        self.timestep = 0
        self.shock_active = False
        self.shock_timer = 0
        self.time_since_shock = 0
        self.efficiency_history = []
        self.rolling_efficiency = 0.5

        # Storage starts around the middle of the capacity range.
        self.storage = self.np_random.uniform(30.0, 70.0, size=3).astype(np.float32)
        self.storage = np.clip(self.storage, 0.0, self.MAX_STORAGE)

        # Fresh random phase for each resource's demand sinusoid.
        self.demand_phase = self.np_random.uniform(
            0.0, 2.0 * np.pi, size=3
        ).astype(np.float32)
        self._update_demand()
        # No earlier demand exists yet; report a zero derivative rather than
        # the jump from an all-zero placeholder.
        self.previous_demand = self.current_demand.copy()

        self._update_coupling()

        return self._get_obs(), {}

    def step(
        self, action: np.ndarray
    ) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
        """Advance the environment by one step.

        Args:
            action: Six values ``[produce_A, produce_B, produce_C,
                convert_A_to_B, convert_B_to_C, convert_C_to_A]``. Values are
                clipped to the action space; NaN entries are treated as 0.

        Returns:
            ``(obs, reward, terminated, truncated, info)``. ``terminated`` is
            always ``False``; ``truncated`` becomes ``True`` once
            ``MAX_TIMESTEPS`` steps have elapsed. ``info["reward_components"]``
            holds the individual reward terms.

        Raises:
            ValueError: If ``action`` cannot be reshaped to six elements.
        """
        action = np.asarray(action, dtype=np.float32).reshape(6)
        # NaN passes through np.clip unchanged and would poison storage for
        # the rest of the episode, so neutralise it first.
        action = np.nan_to_num(action, nan=0.0)
        action = np.clip(action, self.action_space.low, self.action_space.high)

        production = action[:3]
        transfers = action[3:]

        # Conversion volume scales from 0.5x to 1.5x with rolling efficiency.
        efficiency_factor = 0.5 + self.rolling_efficiency
        requested_out = transfers * efficiency_factor

        # A conversion cannot withdraw more than its source holds after this
        # step's production; otherwise the destination would be credited with
        # resources that never existed. Index i is the outflow of resource i
        # (A->B, B->C, C->A).
        available = self.storage + production
        outflow = np.minimum(requested_out, available)

        # Resource i receives the outflow of its upstream neighbour, reduced
        # by the corresponding coupling (yield) coefficient.
        inflow = np.array(
            [
                outflow[2] * self.coupling[2, 0],  # C -> A
                outflow[0] * self.coupling[0, 1],  # A -> B
                outflow[1] * self.coupling[1, 2],  # B -> C
            ],
            dtype=np.float32,
        )

        # Clip before serving demand; anything above capacity is lost.
        self.storage = np.clip(
            self.storage + production - outflow + inflow, 0.0, self.MAX_STORAGE
        ).astype(np.float32)

        # Satisfy demand
        satisfied_demand = np.minimum(self.storage, self.current_demand)
        unsatisfied_demand = self.current_demand - satisfied_demand
        self.storage = np.clip(
            self.storage - satisfied_demand, 0.0, self.MAX_STORAGE
        ).astype(np.float32)

        # Update efficiency tracking (float64 so the epsilon guard is not
        # absorbed by float32 rounding).
        total_demand = float(np.sum(self.current_demand)) + 1e-8
        satisfaction_ratio = float(np.sum(satisfied_demand)) / total_demand
        self.efficiency_history.append(satisfaction_ratio)
        # Keep only the most recent EFFICIENCY_WINDOW entries.
        del self.efficiency_history[: -self.EFFICIENCY_WINDOW]
        self.rolling_efficiency = float(np.mean(self.efficiency_history))

        # Update coupling based on new efficiency
        self._update_coupling()

        # Advance time first so the new demand belongs to the new timestep
        # and stays in sync with normalized_step in the observation.
        self.timestep += 1
        self.previous_demand = self.current_demand.copy()
        self._update_demand()
        self._handle_shocks()
        self.time_since_shock += 1

        reward, reward_components = self._calculate_reward(
            satisfied_demand, unsatisfied_demand, action, production, transfers
        )

        truncated = self.timestep >= self.MAX_TIMESTEPS
        terminated = False

        obs = self._get_obs()
        info = {"reward_components": reward_components}
        return obs, float(reward), bool(terminated), bool(truncated), info

    def _update_demand(self):
        """Recompute ``current_demand`` for the current timestep.

        Demand per resource is ``25 + 15 * sin(phase + t * frequency)`` plus a
        shared term ``5 * sin(0.001 * t**2)`` whose frequency grows with time.
        While a shock is active a random surcharge in [10, 30) is added per
        resource. The result is clipped to ``[0, MAX_DEMAND]``.
        """
        # Base sinusoidal demand
        time_factor = self.timestep * self.demand_frequency
        base_demand = 25.0 + 15.0 * np.sin(self.demand_phase + time_factor)

        # Non-stationary component: the phase is quadratic in the timestep
        # (0.001 * t**2), so this oscillation speeds up over the episode.
        drift = 0.001 * self.timestep
        non_stationary_demand = base_demand + 5.0 * np.sin(drift * self.timestep)

        # Add shock effects
        if self.shock_active:
            shock_magnitude = self.np_random.uniform(10.0, 30.0, size=3)
            non_stationary_demand += shock_magnitude

        self.current_demand = np.clip(
            non_stationary_demand, 0.0, self.MAX_DEMAND
        ).astype(np.float32)

    def _handle_shocks(self):
        """Advance the shock state machine by one step.

        An active shock counts down and, on expiry, resets
        ``time_since_shock``. Otherwise a new shock starts with probability
        ``SHOCK_PROBABILITY * (0.5 + rolling_efficiency)``.
        """
        if self.shock_active:
            self.shock_timer -= 1
            if self.shock_timer <= 0:
                self.shock_active = False
                self.time_since_shock = 0
        else:
            # Probability increases with efficiency
            # (higher efficiency = higher volatility).
            shock_prob = self.SHOCK_PROBABILITY * (0.5 + self.rolling_efficiency)
            if self.np_random.random() < shock_prob:
                self.shock_active = True
                self.shock_timer = self.SHOCK_DURATION

    def _update_coupling(self):
        """Recompute the AB, BC and CA couplings from rolling efficiency."""
        # Higher efficiency increases coupling strength.
        coupling_strength = self.COUPLING_BASE * (0.5 + self.rolling_efficiency)
        self.coupling[0, 1] = coupling_strength * 0.9  # AB
        self.coupling[1, 2] = coupling_strength * 1.1  # BC
        self.coupling[2, 0] = coupling_strength * 0.8  # CA
        self.coupling = np.clip(self.coupling, 0.0, 1.0)

    def _get_obs(self) -> np.ndarray:
        """Build the 15-element float32 observation vector in [0, 1]."""
        storage_norm = self.storage / self.MAX_STORAGE
        demand_norm = self.current_demand / self.MAX_DEMAND

        # Signed one-step change mapped from [-1, 1] to [0, 1]; 0.5 = no change.
        demand_deriv = (self.current_demand - self.previous_demand) / (
            self.MAX_DEMAND + 1e-8
        )
        demand_deriv_norm = np.clip(demand_deriv * 0.5 + 0.5, 0.0, 1.0)

        # Coupling values are already clipped to [0, 1].
        coupling_ab = self.coupling[0, 1]
        coupling_bc = self.coupling[1, 2]
        coupling_ca = self.coupling[2, 0]

        # Saturates after 100 steps without a shock ending.
        time_shock_norm = min(self.time_since_shock / 100.0, 1.0)

        efficiency_norm = self.rolling_efficiency

        # Capped so the observation stays inside its declared bounds even if
        # the caller keeps stepping after truncation.
        step_norm = min(self.timestep / self.MAX_TIMESTEPS, 1.0)

        return np.array(
            [
                storage_norm[0], storage_norm[1], storage_norm[2],
                demand_norm[0], demand_norm[1], demand_norm[2],
                demand_deriv_norm[0], demand_deriv_norm[1], demand_deriv_norm[2],
                coupling_ab, coupling_bc, coupling_ca,
                time_shock_norm, efficiency_norm, step_norm,
            ],
            dtype=np.float32,
        )

    def _calculate_reward(
        self,
        satisfied: np.ndarray,
        unsatisfied: np.ndarray,
        action: np.ndarray,
        production: np.ndarray,
        transfers: np.ndarray,
    ) -> Tuple[float, Dict[str, float]]:
        """Compute the step reward and its components.

        Args:
            satisfied: Demand served this step, per resource.
            unsatisfied: Demand left unserved this step, per resource.
            action: The clipped six-element action.
            production: Production part of the action (currently unused).
            transfers: Conversion part of the action (currently unused).

        Returns:
            The total reward clipped to [-10, 10] and a dict of the individual
            terms keyed by name.
        """
        # Demand satisfaction reward (dense)
        max_possible = self.MAX_DEMAND * 3.0
        satisfaction_reward = np.sum(satisfied) / max_possible * 5.0

        # Penalty for unsatisfied demand
        shortage_penalty = -np.sum(unsatisfied) / max_possible * 3.0

        # Storage balance penalty (prefer middle levels at 50%)
        storage_ratio = self.storage / self.MAX_STORAGE
        storage_balance = -np.sum(np.abs(storage_ratio - 0.5)) * 0.5

        # Action regularization (penalize extreme actions)
        action_penalty = -0.01 * np.sum(action ** 2)

        # Efficiency bonus
        efficiency_bonus = self.rolling_efficiency * 0.5

        # Shock resilience bonus when a shock is active and shortage is small
        shock_bonus = 0.0
        if self.shock_active and np.sum(unsatisfied) < 5.0:
            shock_bonus = 1.0

        total_reward = (
            satisfaction_reward
            + shortage_penalty
            + storage_balance
            + action_penalty
            + efficiency_bonus
            + shock_bonus
        )
        total_reward = np.clip(total_reward, -10.0, 10.0)

        components = {
            "satisfaction": float(satisfaction_reward),
            "shortage": float(shortage_penalty),
            "balance": float(storage_balance),
            "action_reg": float(action_penalty),
            "efficiency": float(efficiency_bonus),
            "shock_resilience": float(shock_bonus),
            "total": float(total_reward),
        }
        return float(total_reward), components

    def close(self) -> None:
        """Clean up resources (nothing to release)."""
        pass