Source code for stable_baselines3.common.vec_env.vec_frame_stack

import warnings
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
from gym import spaces

from stable_baselines3.common.preprocessing import is_image_space, is_image_space_channels_first
from stable_baselines3.common.vec_env.base_vec_env import VecEnv, VecEnvWrapper


class VecFrameStack(VecEnvWrapper):
    """
    Frame stacking wrapper for vectorized environment. Designed for image observations.

    Dimension to stack over is either first (channels-first) or
    last (channels-last), which is detected automatically using
    ``common.preprocessing.is_image_space_channels_first`` if
    observation is an image space.

    :param venv: the vectorized environment to wrap
    :param n_stack: Number of frames to stack
    :param channels_order: If "first", stack on first image dimension. If "last", stack on last dimension.
        If None, automatically detect channel to stack over in case of image observation
        or default to "last" (default).
    """

    def __init__(self, venv: VecEnv, n_stack: int, channels_order: Optional[str] = None):
        self.venv = venv
        self.n_stack = n_stack

        wrapped_obs_space = venv.observation_space
        assert isinstance(wrapped_obs_space, spaces.Box), "VecFrameStack only work with gym.spaces.Box observation space"

        if channels_order is None:
            # Detect channel location automatically for images
            if is_image_space(wrapped_obs_space):
                self.channels_first = is_image_space_channels_first(wrapped_obs_space)
            else:
                # Default behavior for non-image space, stack on the last axis
                self.channels_first = False
        else:
            assert channels_order in {"last", "first"}, "`channels_order` must be one of following: 'last', 'first'"

            self.channels_first = channels_order == "first"

        # This includes the vec-env dimension (first)
        self.stack_dimension = 1 if self.channels_first else -1
        # Axis to repeat along in the *unbatched* observation space bounds
        repeat_axis = 0 if self.channels_first else -1
        low = np.repeat(wrapped_obs_space.low, self.n_stack, axis=repeat_axis)
        high = np.repeat(wrapped_obs_space.high, self.n_stack, axis=repeat_axis)
        # Buffer holding the stacked frames of every env: (num_envs,) + stacked observation shape
        self.stackedobs = np.zeros((venv.num_envs,) + low.shape, low.dtype)
        observation_space = spaces.Box(low=low, high=high, dtype=venv.observation_space.dtype)
        VecEnvWrapper.__init__(self, venv, observation_space=observation_space)

    def step_wait(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, List[Dict[str, Any]]]:
        """
        Wait for the wrapped env step, then push the new frames onto the stack.

        For every env flagged as done, the ``"terminal_observation"`` entry of its info
        dict (when present) is replaced by a stacked version built from the previous
        frames of that env, and the frame stack of that env is cleared before the new
        observation is written.

        :return: stacked observations, rewards, dones and infos
        """
        observations, rewards, dones, infos = self.venv.step_wait()
        # Let pytype know that observation is not a dict
        assert isinstance(observations, np.ndarray)
        stack_ax_size = observations.shape[self.stack_dimension]
        # Shift the oldest frame out; the slots at the end are overwritten below
        self.stackedobs = np.roll(self.stackedobs, shift=-stack_ax_size, axis=self.stack_dimension)
        # ``self.stack_dimension`` counts the vec-env axis, but ``self.stackedobs[i]``
        # has that axis indexed away, so the channel axis of a single env is
        # 0 (channels-first) or -1 (channels-last).
        single_env_stack_axis = 0 if self.channels_first else -1
        for i, done in enumerate(dones):
            if not done:
                continue
            if "terminal_observation" in infos[i]:
                # NOTE(review): assumes the terminal observation has the shape of
                # a single-env, unstacked observation - confirm against the wrapped VecEnv
                old_terminal = infos[i]["terminal_observation"]
                if self.channels_first:
                    previous_frames = self.stackedobs[i, :-stack_ax_size, ...]
                else:
                    previous_frames = self.stackedobs[i, ..., :-stack_ax_size]
                infos[i]["terminal_observation"] = np.concatenate(
                    (previous_frames, old_terminal), axis=single_env_stack_axis
                )
            else:
                warnings.warn("VecFrameStack wrapping a VecEnv without terminal_observation info")
            # Clear the stack of the finished env so frames of the
            # previous episode do not leak into the next one
            self.stackedobs[i] = 0
        # Write the newest frames into the last slots of the stack
        if self.channels_first:
            self.stackedobs[:, -stack_ax_size:, ...] = observations
        else:
            self.stackedobs[..., -stack_ax_size:] = observations

        return self.stackedobs, rewards, dones, infos

    def reset(self) -> np.ndarray:
        """
        Reset all environments

        The frame stack is zeroed and the first observation is written
        into the last slots of the stack.

        :return: the stacked observations
        """
        obs: np.ndarray = self.venv.reset()  # pytype:disable=annotation-type-mismatch
        self.stackedobs[...] = 0
        if self.channels_first:
            self.stackedobs[:, -obs.shape[self.stack_dimension] :, ...] = obs
        else:
            self.stackedobs[..., -obs.shape[self.stack_dimension] :] = obs
        return self.stackedobs

    def close(self) -> None:
        """
        Close the wrapped vectorized environment.
        """
        self.venv.close()