Dynamic Programming RL

November 2025

Markov Decision Process (MDP) Formulation

An MDP is defined by the tuple $(S, A, P, R, \gamma)$ where $S$ is the set of states, $A$ the set of actions, $P(s' \mid s, a)$ the transition probability function, $R(s, a, s')$ the reward function, and $\gamma \in [0, 1)$ the discount factor.

The state-value function for policy $\pi$ is:

$$v_\pi(s) = \mathbb{E}_\pi[G_t | S_t = s] = \mathbb{E}_\pi\left[\sum_{k=0}^{\infty} \gamma^k R_{t+k+1} | S_t = s\right]$$

The action-value function is:

$$q_\pi(s, a) = \mathbb{E}_\pi[G_t | S_t = s, A_t = a]$$

The Bellman expectation equation for $v_\pi$ is:

$$v_\pi(s) = \sum_a \pi(a|s) \sum_{s', r} p(s', r | s, a)[r + \gamma v_\pi(s')]$$

Dynamic programming (DP) provides a collection of algorithms for computing optimal policies when we have a perfect model of the environment as a Markov Decision Process (MDP). While DP methods are limited by their assumption of a perfect model and high computational cost, they provide the theoretical foundation for understanding reinforcement learning.

GridWorld Environment

We'll implement a classic GridWorld environment where an agent moves on a rectangular grid toward a goal cell: each step costs $-1$, reaching the goal yields $+10$, obstacle cells are impassable, and moves off the grid leave the agent in place.

# Libraries and global plotting/reproducibility setup.
# (This setup block appeared twice verbatim in the original; deduplicated.)
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import seaborn as sns

np.random.seed(42)          # reproducible runs
sns.set_style('whitegrid')  # consistent figure styling

class GridWorld:
    """
    Deterministic GridWorld MDP.

    Grid layout:
    - 'S': Start state
    - 'G': Goal state (+10 reward)
    - 'X': Obstacle (blocked)
    - '.': Normal state (-1 step cost)
    """

    def __init__(self, grid_size=(5, 5), goal=(4, 4), obstacles=None, gamma=0.9):
        """
        Args:
            grid_size: (height, width) of grid
            goal: (row, col) of goal state
            obstacles: list of (row, col) obstacle positions
            gamma: discount factor
        """
        self.height, self.width = grid_size
        self.goal = goal
        self.obstacles = obstacles or []
        self.gamma = gamma

        # (row delta, col delta) per action, aligned with the arrow glyphs.
        self.actions = [(0, 1), (1, 0), (0, -1), (-1, 0)]
        self.action_names = ['→', '↓', '←', '↑']
        self.n_actions = len(self.actions)

        # All traversable cells, enumerated in row-major order.
        self.states = [
            (r, c)
            for r in range(self.height)
            for c in range(self.width)
            if (r, c) not in self.obstacles
        ]
        self.n_states = len(self.states)
        self.state_to_idx = {s: i for i, s in enumerate(self.states)}

    def is_terminal(self, state):
        """The goal is the only terminal (absorbing) state."""
        return state == self.goal

    def get_next_state(self, state, action_idx):
        """Deterministic transition; invalid moves leave the agent in place."""
        if self.is_terminal(state):
            return state

        dr, dc = self.actions[action_idx]
        candidate = (state[0] + dr, state[1] + dc)

        inside = (0 <= candidate[0] < self.height
                  and 0 <= candidate[1] < self.width)
        if inside and candidate not in self.obstacles:
            return candidate
        return state

    def get_reward(self, state, action_idx, next_state):
        """+10 for entering the goal, -1 step cost otherwise."""
        return 10.0 if next_state == self.goal else -1.0

    def get_transition_prob(self, state, action_idx, next_state):
        """Dynamics are deterministic: all probability mass on the computed successor."""
        return 1.0 if next_state == self.get_next_state(state, action_idx) else 0.0

    def render(self, title="GridWorld Environment"):
        """Draw the grid with obstacles ('X') and the goal ('G')."""
        plt.figure(figsize=(self.width, self.height))
        ax = plt.gca()

        # Grid lines.
        for col in range(self.width + 1):
            ax.axvline(col, color='gray', lw=1)
        for row in range(self.height + 1):
            ax.axhline(row, color='gray', lw=1)

        # Obstacles as dark squares with an 'X' marker.
        for obs_r, obs_c in self.obstacles:
            ax.add_patch(Rectangle((obs_c, obs_r), 1, 1, facecolor='darkgray', alpha=0.8))
            plt.text(obs_c + 0.5, obs_r + 0.5, 'X', ha='center', va='center',
                     color='black', fontsize=16, fontweight='bold')

        # Goal cell highlighted in green.
        goal_r, goal_c = self.goal
        ax.add_patch(Rectangle((goal_c, goal_r), 1, 1, facecolor='lightgreen', alpha=0.8))
        plt.text(goal_c + 0.5, goal_r + 0.5, 'G', ha='center', va='center',
                 color='darkgreen', fontsize=16, fontweight='bold')

        ax.set_xlim(0, self.width)
        ax.set_ylim(0, self.height)
        ax.set_xticks(np.arange(0.5, self.width, 1))
        ax.set_yticks(np.arange(0.5, self.height, 1))
        ax.set_xticklabels(np.arange(self.width))
        ax.set_yticklabels(np.arange(self.height))
        ax.set_aspect('equal', adjustable='box')
        ax.invert_yaxis()  # row 0 at the top, matching matrix indexing

        plt.title(title)
        plt.xlabel("Column")
        plt.ylabel("Row")
        plt.grid(False)
        plt.show()

# Build the 5x5 example environment used throughout the rest of the file.
env = GridWorld(
    grid_size=(5, 5),
    goal=(4, 4),
    obstacles=[(1, 1), (2, 2), (3, 1)],
    gamma=0.9,
)

# Summarize the environment configuration.
for line in (
    f"Grid size: {env.height}x{env.width}",
    f"Number of states: {env.n_states}",
    f"Number of actions: {env.n_actions}",
    f"Goal state: {env.goal}",
    f"Obstacles: {env.obstacles}",
):
    print(line)

env.render("Initial GridWorld Layout")
output:
Grid size: 5x5
Number of states: 22
Number of actions: 4
Goal state: (4, 4)
Obstacles: [(1, 1), (2, 2), (3, 1)]
Plot output

Policy Evaluation

Policy evaluation computes the state-value function $v_\pi$ for a given policy $\pi$. It iteratively applies the Bellman equation:

$$v_{k+1}(s) = \sum_a \pi(a|s) \sum_{s'} p(s'|s,a)[r(s,a,s') + \gamma v_k(s')]$$

This is guaranteed to converge to $v_\pi$ as $k \to \infty$.

Algorithm:

1. Initialize $V(s) = 0$ for all states

2. Repeat until convergence:

- For each state $s$:

- $v \leftarrow V(s)$

- $V(s) \leftarrow \sum_a \pi(a|s) \sum_{s'} p(s'|s,a)[r + \gamma V(s')]$

- $\Delta \leftarrow \max(\Delta, |v - V(s)|)$

- If $\Delta < \theta$, stop

def policy_evaluation(env, policy, theta=1e-6, max_iterations=1000):
    """
    Iterative policy evaluation using in-place (Gauss-Seidel) sweeps.

    Args:
        env: GridWorld-like MDP (states, deterministic transitions, rewards, gamma).
        policy: (n_states, n_actions) array of action probabilities.
        theta: convergence threshold on the largest per-sweep value change.
        max_iterations: safety cap on the number of sweeps.

    Returns:
        (V, n_sweeps): value estimates and the number of sweeps performed.
    """
    V = np.zeros(env.n_states)

    for sweep in range(max_iterations):
        delta = 0.0

        for idx, state in enumerate(env.states):
            # Terminal states keep value 0.
            if env.is_terminal(state):
                continue

            old_value = V[idx]

            # Bellman expectation backup (dynamics are deterministic,
            # so each action contributes exactly one successor).
            backup = 0.0
            for a in range(env.n_actions):
                succ = env.get_next_state(state, a)
                r = env.get_reward(state, a, succ)
                backup += policy[idx, a] * (r + env.gamma * V[env.state_to_idx[succ]])

            V[idx] = backup
            delta = max(delta, abs(old_value - backup))

        if delta < theta:
            print(f"Policy evaluation converged in {sweep + 1} iterations")
            return V, sweep + 1

    print(f"Policy evaluation reached max iterations ({max_iterations})")
    return V, max_iterations

# Uniform random policy: every action equally likely in every state.
uniform_policy = np.full((env.n_states, env.n_actions), 1.0 / env.n_actions)

V_uniform, iterations = policy_evaluation(env, uniform_policy)

print("\nValue function for uniform random policy:")
print(f"Converged in {iterations} iterations")
output:
Policy evaluation converged in 93 iterations

Value function for uniform random policy:
Converged in 93 iterations

Policy Iteration

Policy iteration alternates between:

1. Policy Evaluation: Compute $v_\pi$ for current policy

2. Policy Improvement: Update policy greedily w.r.t. $v_\pi$

$$\pi'(s) = \arg\max_a \sum_{s'} p(s'|s,a)[r(s,a,s') + \gamma v_\pi(s')]$$

The policy improvement theorem guarantees that $v_{\pi'}(s) \geq v_\pi(s)$ for all states.

Algorithm:

1. Initialize policy $\pi$ arbitrarily

2. Repeat:

- Policy Evaluation: Compute $V = v_\pi$

- Policy Improvement:

- $\text{policy-stable} \leftarrow \text{true}$

- For each state $s$:

- $\text{old-action} \leftarrow \pi(s)$

- $\pi(s) \leftarrow \arg\max_a \sum_{s'} p(s'|s,a)[r + \gamma V(s')]$

- If $\text{old-action} \neq \pi(s)$, $\text{policy-stable} \leftarrow \text{false}$

- If policy-stable, stop

def policy_iteration(env, theta=1e-6, max_iterations=100):
    """
    Classic policy iteration: evaluate the current policy, then improve it
    greedily, repeating until the greedy policy stops changing.

    Args:
        env: GridWorld-like MDP.
        theta: convergence threshold forwarded to policy evaluation.
        max_iterations: cap on evaluate/improve cycles.

    Returns:
        (policy, V, history): optimal policy, its value function, and a list
        of (V, policy) snapshots taken before each improvement step.
    """
    # Start from the uniform random policy.
    policy = np.full((env.n_states, env.n_actions), 1.0 / env.n_actions)
    history = []

    for cycle in range(max_iterations):
        V, _ = policy_evaluation(env, policy, theta)
        history.append((V.copy(), policy.copy()))

        stable = True
        improved = np.zeros((env.n_states, env.n_actions))

        for idx, state in enumerate(env.states):
            if env.is_terminal(state):
                continue

            # One-step lookahead: Q(s, a) for every action.
            q = np.zeros(env.n_actions)
            for a in range(env.n_actions):
                succ = env.get_next_state(state, a)
                r = env.get_reward(state, a, succ)
                q[a] = r + env.gamma * V[env.state_to_idx[succ]]

            # Greedy (deterministic) improvement.
            improved[idx, np.argmax(q)] = 1.0

            if not np.allclose(policy[idx], improved[idx]):
                stable = False

        policy = improved

        if stable:
            print(f"Policy iteration converged in {cycle + 1} iterations")
            return policy, V, history

    print(f"Policy iteration reached max iterations ({max_iterations})")
    return policy, V, history

# Solve the GridWorld with policy iteration.
optimal_policy, optimal_V, pi_history = policy_iteration(env)

print("\nOptimal policy found!")
print(f"Number of policy improvement steps: {len(pi_history)}")
output:
Policy evaluation converged in 93 iterations
Policy evaluation converged in 9 iterations
Policy evaluation converged in 9 iterations
Policy iteration converged in 3 iterations

Optimal policy found!
Number of policy improvement steps: 3

Value Iteration

Value iteration combines policy evaluation and improvement into a single update. It directly computes the optimal value function using the Bellman optimality equation:

$$v_{k+1}(s) = \max_a \sum_{s'} p(s'|s,a)[r(s,a,s') + \gamma v_k(s')]$$

Once the optimal value function $v_*$ is found, the optimal policy is:

$$\pi_*(s) = \arg\max_a \sum_{s'} p(s'|s,a)[r(s,a,s') + \gamma v_*(s')]$$

Algorithm:

1. Initialize $V(s) = 0$ for all states

2. Repeat until convergence:

- For each state $s$:

- $V(s) \leftarrow \max_a \sum_{s'} p(s'|s,a)[r + \gamma V(s')]$

3. Extract policy: $\pi(s) = \arg\max_a \sum_{s'} p(s'|s,a)[r + \gamma V(s')]$

def _greedy_action_values(env, V, state):
    """One-step lookahead: Q(s, a) = r(s, a, s') + gamma * V(s') for each action."""
    q = np.zeros(env.n_actions)
    for action_idx in range(env.n_actions):
        next_state = env.get_next_state(state, action_idx)
        reward = env.get_reward(state, action_idx, next_state)
        q[action_idx] = reward + env.gamma * V[env.state_to_idx[next_state]]
    return q


def value_iteration(env, theta=1e-6, max_iterations=1000):
    """
    Value iteration via repeated Bellman optimality backups.

    Args:
        env: GridWorld-like MDP.
        theta: convergence threshold on the largest per-sweep value change.
        max_iterations: safety cap on the number of sweeps.

    Returns:
        (policy, V, history): greedy deterministic policy, optimal value
        estimates, and a list of value snapshots taken at the start of each
        sweep (history[0] is the all-zeros initialization).
    """
    V = np.zeros(env.n_states)
    history = []

    for iteration in range(max_iterations):
        delta = 0
        history.append(V.copy())

        for state_idx, state in enumerate(env.states):
            if env.is_terminal(state):
                continue  # terminal values stay 0

            v = V[state_idx]
            # Bellman optimality backup: V(s) <- max_a Q(s, a).
            V[state_idx] = np.max(_greedy_action_values(env, V, state))
            delta = max(delta, abs(v - V[state_idx]))

        if delta < theta:
            print(f"Value iteration converged in {iteration + 1} iterations")
            break
    else:
        # Previously this case fell through silently; report it, matching
        # policy_evaluation's behavior.
        print(f"Value iteration reached max iterations ({max_iterations})")

    # Extract the greedy policy with one final lookahead per state.
    policy = np.zeros((env.n_states, env.n_actions))
    for state_idx, state in enumerate(env.states):
        if env.is_terminal(state):
            continue
        best_action = np.argmax(_greedy_action_values(env, V, state))
        policy[state_idx, best_action] = 1.0

    return policy, V, history

# Solve the same GridWorld with value iteration and cross-check the answers.
vi_policy, vi_V, vi_history = value_iteration(env)

print("\nOptimal policy found!")
print(f"Value iteration took {len(vi_history)} iterations")
print("\nVerifying policies match:")
print(f"Policy iteration == Value iteration: {np.allclose(optimal_policy, vi_policy)}")
print(f"V_PI == V_VI: {np.allclose(optimal_V, vi_V)}")
output:
Value iteration converged in 9 iterations

Optimal policy found!
Value iteration took 9 iterations

Verifying policies match:
Policy iteration == Value iteration: True
V_PI == V_VI: True

Visualization

Let's visualize the optimal value function and policy.

def plot_value_function(env, V, title="Value Function"):
    """Render V as a heatmap with a numeric label in every reachable cell."""
    # Lay values out on the grid; obstacle cells stay NaN (left unpainted).
    V_grid = np.full((env.height, env.width), np.nan)
    for idx, (r, c) in enumerate(env.states):
        V_grid[r, c] = V[idx]

    plt.figure(figsize=(8, 7))
    heat = plt.imshow(V_grid, cmap='RdYlGn', interpolation='nearest')
    plt.colorbar(heat, label='State Value')

    # Cell borders (imshow centers cells on integer coordinates).
    for r in range(env.height + 1):
        plt.axhline(r - 0.5, color='black', linewidth=1)
    for c in range(env.width + 1):
        plt.axvline(c - 0.5, color='black', linewidth=1)

    # Per-cell value labels.
    for r in range(env.height):
        for c in range(env.width):
            if not np.isnan(V_grid[r, c]):
                plt.text(c, r, f'{V_grid[r, c]:.1f}',
                         ha='center', va='center', fontsize=12, fontweight='bold')

    # Mark the goal and the obstacles.
    goal_r, goal_c = env.goal
    plt.text(goal_c, goal_r - 0.35, 'GOAL', ha='center', fontsize=8, color='darkgreen')

    for obs_r, obs_c in env.obstacles:
        plt.text(obs_c, obs_r, 'X', ha='center', va='center',
                 fontsize=20, color='red', fontweight='bold')

    plt.title(title, fontsize=14, fontweight='bold')
    plt.xticks(range(env.width))
    plt.yticks(range(env.height))
    plt.xlabel('Column')
    plt.ylabel('Row')
    plt.tight_layout()
    plt.show()

def plot_policy(env, policy, title="Optimal Policy"):
    """Plot policy as arrows."""
    plt.figure(figsize=(8, 7))
    ax = plt.gca()

    # Cell borders.
    for r in range(env.height + 1):
        plt.axhline(r, color='black', linewidth=1)
    for c in range(env.width + 1):
        plt.axvline(c, color='black', linewidth=1)

    # Obstacles shaded gray.
    for obs_r, obs_c in env.obstacles:
        ax.add_patch(Rectangle((obs_c, obs_r), 1, 1, facecolor='gray', alpha=0.5))

    # Goal cell highlighted green with a label.
    goal_r, goal_c = env.goal
    ax.add_patch(Rectangle((goal_c, goal_r), 1, 1,
                           facecolor='lightgreen', alpha=0.5))
    plt.text(goal_c + 0.5, goal_r + 0.5, 'GOAL',
             ha='center', va='center', fontsize=10, fontweight='bold')

    # One arrow per non-terminal state, pointing along the greedy action.
    arrow_scale = 0.3
    for idx, state in enumerate(env.states):
        if env.is_terminal(state):
            continue

        dr, dc = env.actions[np.argmax(policy[idx])]
        center_y, center_x = state[0] + 0.5, state[1] + 0.5

        plt.arrow(center_x, center_y, dc * arrow_scale, dr * arrow_scale,
                  head_width=0.15, head_length=0.1,
                  fc='blue', ec='blue', linewidth=2)

    plt.xlim(0, env.width)
    plt.ylim(0, env.height)
    plt.gca().invert_yaxis()
    plt.title(title, fontsize=14, fontweight='bold')
    plt.xlabel('Column')
    plt.ylabel('Row')
    plt.xticks(range(env.width + 1))
    plt.yticks(range(env.height + 1))
    plt.tight_layout()
    plt.show()


# Visualize the solution found by policy iteration.
plot_value_function(env, optimal_V, "Optimal Value Function")
plot_policy(env, optimal_policy, "Optimal Policy")
Plot output
Plot output

Convergence Analysis

Let's compare the convergence behavior of policy iteration vs value iteration.


# Two-panel figure: value growth (left) and error decay (right) per sweep.
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
max_values = [V.max() for V in vi_history]
plt.plot(max_values, marker='o', linewidth=2, markersize=4)
plt.xlabel('Iteration', fontsize=12)
plt.ylabel('Max State Value', fontsize=12)
plt.title('Value Iteration Convergence', fontsize=13, fontweight='bold')
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
# Sup-norm distance from each snapshot to the converged value function.
bellman_errors = [np.abs(V - vi_V).max() for V in vi_history]
plt.semilogy(bellman_errors, marker='o', linewidth=2, markersize=4, color='red')
plt.xlabel('Iteration', fontsize=12)
plt.ylabel('Max Bellman Error (log scale)', fontsize=12)
plt.title('Value Iteration Error Decay', fontsize=13, fontweight='bold')
plt.grid(True, alpha=0.3, which='both')

plt.tight_layout()
plt.show()

print("\nValue Iteration Statistics:")
print(f"Total iterations: {len(vi_history)}")
print(f"Final max value: {max_values[-1]:.4f}")
print(f"Final Bellman error: {bellman_errors[-1]:.2e}")
Plot output
output:
Value Iteration Statistics:
Total iterations: 9
Final max value: 10.0000
Final Bellman error: 0.00e+00