Back to Catalog

custom-env

Gymnasium-compatible continuous resource management with 3 interdependent resources (A, B, C). Observation space: Box(low=0, high=100, shape=(15,), dtype=float32): [storage_A, storage_B, storage_C, demand_A, demand_B, demand_C, demand_derivative_A, demand_derivative_B, demand_derivative_C, coupling_AB, coupling_BC, coupling_CA, time_since_shock, rolling_efficiency_score, normalized_step]. Action space: Box(low=0, high=10, shape=(6,), dtype=float32): [produce_A, produce_B, produce_C, convert_A_to_B, convert_B_to_C, convert_C_to_A]. Dynamics: storage_{t+1} = storage_t + production + conversion_in - conversion_out - demand_t - waste. Demand follows a non-stationary process d_t = d_base + α*sin(ω*t), where ω = ω_base*(1 + e) scales with the efficiency e ∈ [0, 1] (rolling satisfied_demand / total_demand over 100 steps). Shock events occur with probability p = 0.01 + 0.2*max(0, e - 0.7). Coupling coefficients C_ij (resource i requires resource j) evolve as C_ij = C_base * e, creating progressive interdependencies. Higher e increases production complexity and demand non-stationarity. Reward: r_t = -sum(|demand_t - satisfied_t|) - 0.5*sum(waste) - 0.01*||action||^2. Episode length: 1000 steps. reset() initializes storage at 50 units, sets the coupling matrix based on performance history (which persists across episodes), and samples new demand phase parameters.

Domain

resource_management

Difficulty

medium

Observation

Box(shape=(15,))

Action

Box(shape=(6,))

Reward

see spec

Max Steps

1000

Version

v1

Tests (8/8)

syntax, import, reset, step, obs_space, action_space, reward_sanity, determinism

Use via API

import kualia
env = kualia.make("custom-env-1774053531")
obs, info = env.reset()

Environment Code

12535 chars
from collections import deque
from typing import Optional, Dict, Any, Tuple

import gymnasium as gym
import numpy as np


class ResourceManagementEnv(gym.Env):
    """
    Resource management environment with three interdependent resources.

    Features:
    - Three resources (A, B, C), each with a storage cap of ``MAX_STORAGE``
    - Sinusoidal demand with an additional time-varying common term
    - Stochastic shock events that temporarily raise demand
    - Efficiency-dependent coupling between resources
    - Rolling efficiency metric (mean satisfied/total demand ratio over the
      last ``EFFICIENCY_WINDOW`` steps) that scales transfers, coupling and
      shock probability

    Observation Space (15-dimensional Box [0, 1]):
    - storage_A/B/C: storage divided by ``MAX_STORAGE``
    - demand_A/B/C: demand divided by ``MAX_DEMAND``
    - demand_derivative_A/B/C: one-step demand change mapped so that
      0.5 means "no change"
    - coupling_AB/BC/CA: conversion yields
    - time_since_shock: steps since the last shock ended, divided by 100
      and capped at 1
    - rolling_efficiency_score
    - normalized_step: timestep divided by ``MAX_TIMESTEPS``, capped at 1

    Action Space (6-dimensional Box [0, 10]):
    - [produce_A, produce_B, produce_C, transfer_AB, transfer_BC, transfer_CA]

    Reward Structure:
    - Positive reward for satisfying demand
    - Penalties for unmet demand, storage imbalance and large actions
    - Efficiency bonus and shock resilience bonus
    - Clipped to [-10, 10]

    Episodes never terminate; they are truncated after ``MAX_TIMESTEPS``
    steps.
    """

    # Constants
    MAX_STORAGE: float = 100.0
    MAX_DEMAND: float = 50.0
    MAX_TIMESTEPS: int = 1000
    SHOCK_PROBABILITY: float = 0.02
    SHOCK_DURATION: int = 20
    EFFICIENCY_WINDOW: int = 50
    COUPLING_BASE: float = 0.5

    def __init__(self, render_mode: Optional[str] = None):
        """Allocate state buffers and declare the observation/action spaces.

        Args:
            render_mode: Stored on the instance; this class does not define
                a ``render`` method of its own.
        """
        super().__init__()

        self.render_mode = render_mode

        # Storage levels for resources A, B, C
        self.storage = np.zeros(3, dtype=np.float32)
        self.storage_capacity = np.array(
            [self.MAX_STORAGE, self.MAX_STORAGE, self.MAX_STORAGE], dtype=np.float32
        )

        # Demand tracking
        self.current_demand = np.zeros(3, dtype=np.float32)
        self.previous_demand = np.zeros(3, dtype=np.float32)
        self.demand_phase = np.zeros(3, dtype=np.float32)
        self.demand_frequency = np.array([0.05, 0.07, 0.03], dtype=np.float32)

        # Shock events
        self.shock_active = False
        self.shock_timer = 0
        self.time_since_shock = 0

        # Efficiency tracking: bounded window, oldest entries drop automatically
        self.efficiency_history = deque(maxlen=self.EFFICIENCY_WINDOW)
        self.rolling_efficiency = 0.5

        # Coupling matrix; only entries [0, 1], [1, 2] and [2, 0] are used
        self.coupling = np.zeros((3, 3), dtype=np.float32)

        # Timestep tracking
        self.timestep = 0

        # Spaces
        self.observation_space = gym.spaces.Box(
            low=0.0,
            high=1.0,
            shape=(15,),
            dtype=np.float32,
        )

        self.action_space = gym.spaces.Box(
            low=0.0,
            high=10.0,
            shape=(6,),
            dtype=np.float32,
        )

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[Dict[str, Any]] = None,
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        """Start a new episode.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to seed ``self.np_random``.
            options: Accepted for API compatibility; not used.

        Returns:
            The initial observation and an empty info dict.
        """
        super().reset(seed=seed)

        # Clear all per-episode state BEFORE computing the initial demand.
        # _update_demand() reads self.timestep and self.shock_active; with
        # stale values from a previous episode the first demand would be
        # evaluated at the old timestep and, if a shock was still active,
        # would consume extra random numbers and break seeded determinism.
        self.timestep = 0
        self.shock_active = False
        self.shock_timer = 0
        self.time_since_shock = 0
        self.efficiency_history.clear()
        self.rolling_efficiency = 0.5

        # Random draws keep the original order: storage first, then phases.
        self.storage = self.np_random.uniform(30.0, 70.0, size=3).astype(np.float32)
        self.demand_phase = self.np_random.uniform(
            0.0, 2.0 * np.pi, size=3
        ).astype(np.float32)

        self._update_demand()
        # No earlier demand exists yet, so report a neutral (zero) derivative
        # in the first observation instead of a jump from zero.
        self.previous_demand = self.current_demand.copy()

        # Initialize coupling from the reset efficiency
        self._update_coupling()

        return self._get_obs(), {}

    def step(
        self, action: np.ndarray
    ) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
        """Advance the environment by one timestep.

        Args:
            action: Six values [produce_A, produce_B, produce_C, transfer_AB,
                transfer_BC, transfer_CA]; clipped to the action space.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where
            ``terminated`` is always ``False`` and ``info`` carries the
            per-component reward breakdown under ``"reward_components"``.
        """
        # Ensure action is a float32 vector of length 6 within bounds
        action = np.asarray(action, dtype=np.float32).reshape(6)
        action = np.clip(action, self.action_space.low, self.action_space.high)

        # Parse actions: [produce_A, produce_B, produce_C, transfer_AB, transfer_BC, transfer_CA]
        production = action[:3]
        transfers = action[3:]

        # Requested outflow scales with efficiency. Each resource has exactly
        # one outgoing conversion (A->B, B->C, C->A), so the outflow can be
        # capped per source at what that source actually holds this step.
        # Without the cap, the clip below would hide a negative balance and
        # an empty source could still feed its neighbour.
        efficiency_factor = 0.5 + self.rolling_efficiency
        requested = transfers * efficiency_factor
        available = self.storage + production
        out_a, out_b, out_c = np.minimum(requested, available)

        # Conversion yields
        coupling_ab = self.coupling[0, 1]
        coupling_bc = self.coupling[1, 2]
        coupling_ca = self.coupling[2, 0]

        # Update storage based on production and conversions
        self.storage[0] += production[0] - out_a + out_c * coupling_ca
        self.storage[1] += production[1] + out_a * coupling_ab - out_b
        self.storage[2] += production[2] + out_b * coupling_bc - out_c

        # Anything above capacity is discarded
        self.storage = np.clip(self.storage, 0.0, self.MAX_STORAGE)

        # Satisfy demand from what is in storage
        satisfied_demand = np.minimum(self.storage, self.current_demand)
        unsatisfied_demand = self.current_demand - satisfied_demand

        self.storage -= satisfied_demand
        self.storage = np.clip(self.storage, 0.0, self.MAX_STORAGE)

        # Update efficiency tracking (epsilon guards a zero total demand)
        total_demand = np.sum(self.current_demand) + 1e-8
        self.efficiency_history.append(np.sum(satisfied_demand) / total_demand)
        self.rolling_efficiency = float(np.mean(self.efficiency_history))

        # Update coupling based on new efficiency
        self._update_coupling()

        # Advance the clock first so the next demand is evaluated at the new
        # timestep; otherwise the demand for t=0 is served twice and demand
        # lags the reported step by one for the whole episode.
        self.previous_demand = self.current_demand.copy()
        self.timestep += 1
        self._update_demand()
        self._handle_shocks()
        self.time_since_shock += 1

        # Calculate reward
        reward, reward_components = self._calculate_reward(
            satisfied_demand, unsatisfied_demand, action, production, transfers
        )

        # Time limit only; there is no terminal state
        truncated = self.timestep >= self.MAX_TIMESTEPS
        terminated = False

        obs = self._get_obs()
        info = {"reward_components": reward_components}

        return obs, float(reward), bool(terminated), bool(truncated), info

    def _update_demand(self):
        """Recompute ``current_demand`` for the current timestep.

        Demand is a per-resource sinusoid plus a term shared by all resources,
        plus a random surcharge while a shock is active, clipped to
        ``[0, MAX_DEMAND]``.
        """
        # Base sinusoidal demand, one frequency and phase per resource
        time_factor = self.timestep * self.demand_frequency
        base_demand = 25.0 + 15.0 * np.sin(self.demand_phase + time_factor)

        # Non-stationary term: the sine argument is 0.001 * timestep**2, so
        # this oscillation speeds up as the episode progresses.
        drift = 0.001 * self.timestep
        non_stationary_demand = base_demand + 5.0 * np.sin(drift * self.timestep)

        # Shock surcharge, redrawn on every update while the shock lasts
        if self.shock_active:
            shock_magnitude = self.np_random.uniform(10.0, 30.0, size=3)
            non_stationary_demand += shock_magnitude

        self.current_demand = np.clip(
            non_stationary_demand, 0.0, self.MAX_DEMAND
        ).astype(np.float32)

    def _handle_shocks(self):
        """Count down an active shock, or randomly start a new one."""
        if self.shock_active:
            self.shock_timer -= 1
            if self.shock_timer <= 0:
                self.shock_active = False
                # The "time since shock" counter restarts when a shock ends
                self.time_since_shock = 0
        else:
            # Probability increases with efficiency (higher efficiency = higher volatility)
            shock_prob = self.SHOCK_PROBABILITY * (0.5 + self.rolling_efficiency)
            if self.np_random.random() < shock_prob:
                self.shock_active = True
                self.shock_timer = self.SHOCK_DURATION

    def _update_coupling(self):
        """Recompute the three coupling coefficients from rolling efficiency."""
        # Higher efficiency increases coupling strength
        coupling_strength = self.COUPLING_BASE * (0.5 + self.rolling_efficiency)

        self.coupling[0, 1] = coupling_strength * 0.9  # AB
        self.coupling[1, 2] = coupling_strength * 1.1  # BC
        self.coupling[2, 0] = coupling_strength * 0.8  # CA

        self.coupling = np.clip(self.coupling, 0.0, 1.0)

    def _get_obs(self) -> np.ndarray:
        """Build the 15-element float32 observation, every entry in [0, 1]."""
        # Normalize storage and demand to [0, 1]
        storage_norm = self.storage / self.MAX_STORAGE
        demand_norm = self.current_demand / self.MAX_DEMAND

        # One-step demand change, mapped from [-1, 1] to [0, 1]
        demand_deriv = (self.current_demand - self.previous_demand) / (
            self.MAX_DEMAND + 1e-8
        )
        demand_deriv_norm = np.clip(demand_deriv * 0.5 + 0.5, 0.0, 1.0)

        # Coupling values are already clipped to [0, 1]
        coupling_obs = [
            self.coupling[0, 1],
            self.coupling[1, 2],
            self.coupling[2, 0],
        ]

        # Time since shock saturates after 100 steps
        time_shock_norm = min(self.time_since_shock / 100.0, 1.0)

        # Cap the step fraction so stepping past truncation cannot push the
        # observation outside the declared space.
        step_norm = min(self.timestep / self.MAX_TIMESTEPS, 1.0)

        obs = np.concatenate(
            [
                storage_norm,
                demand_norm,
                demand_deriv_norm,
                coupling_obs,
                [time_shock_norm, self.rolling_efficiency, step_norm],
            ]
        ).astype(np.float32)

        # Final guard against float rounding at the bounds
        return np.clip(obs, 0.0, 1.0)

    def _calculate_reward(
        self,
        satisfied: np.ndarray,
        unsatisfied: np.ndarray,
        action: np.ndarray,
        production: np.ndarray,
        transfers: np.ndarray,
    ) -> Tuple[float, Dict[str, float]]:
        """Compute the step reward and its per-component breakdown.

        Args:
            satisfied: Demand met this step, per resource.
            unsatisfied: Demand left unmet this step, per resource.
            action: The clipped 6-element action.
            production: Production part of the action (currently unused).
            transfers: Transfer part of the action (currently unused).

        Returns:
            The total reward clipped to [-10, 10] and a dict of components.
        """
        # Demand satisfaction reward (dense)
        max_possible = self.MAX_DEMAND * 3.0
        satisfaction_reward = np.sum(satisfied) / max_possible * 5.0

        # Penalty for unsatisfied demand
        shortage_penalty = -np.sum(unsatisfied) / max_possible * 3.0

        # Storage balance penalty (prefer middle levels at 50%)
        storage_ratio = self.storage / self.MAX_STORAGE
        storage_balance = -np.sum(np.abs(storage_ratio - 0.5)) * 0.5

        # Action regularization (penalize extreme actions)
        action_penalty = -0.01 * np.sum(action ** 2)

        # Efficiency bonus
        efficiency_bonus = self.rolling_efficiency * 0.5

        # Shock resilience bonus if handling shock well
        shock_bonus = 0.0
        if self.shock_active and np.sum(unsatisfied) < 5.0:
            shock_bonus = 1.0

        total_reward = (
            satisfaction_reward
            + shortage_penalty
            + storage_balance
            + action_penalty
            + efficiency_bonus
            + shock_bonus
        )

        # Clip to range [-10, 10]
        total_reward = np.clip(total_reward, -10.0, 10.0)

        components = {
            "satisfaction": float(satisfaction_reward),
            "shortage": float(shortage_penalty),
            "balance": float(storage_balance),
            "action_reg": float(action_penalty),
            "efficiency": float(efficiency_bonus),
            "shock_resilience": float(shock_bonus),
            "total": float(total_reward),
        }

        return float(total_reward), components

    def close(self) -> None:
        """Clean up resources (nothing to release)."""
        pass