import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import altair as alt
import pandas as pd
class Urn():
''' classic urn model using np array of ball counts
for two balls this is a model for the Binomial distribution
for more balls this is a model of the multinomial distribution
'''
def __init__(self, ball_names=['white','black'], init='ones', weights=None):
'''initialize the urn with ball names and weights'''
self.ball_names = ball_names
if weights is not None:
assert type(weights) == np.ndarray, "Weights must be a numpy array."
assert weights.shape[0] == (len(self.ball_names)), f"Weight shape {weights.shape} not the same as the number of balls. {len(self.ball_names)}"
self.weights = weights
else:
if init == 'ones':
self._weights = np.ones((self.ball_colors))
elif init == 'zeros':
self._weights = np.zeros((self.ball_colors))
elif init == 'random':
self._weights = np.random.rand(self.ball_colors)
else:
raise ValueError("Initialization parameter must be 'ones', 'zeros', or 'random'.")
@property
def ball_names(self):
return self._ball_names
@ball_names.setter
def ball_names(self, ball_names):
assert type(ball_names) == list, "Ball names must be a list."
self._ball_names = ball_names
self._num_balls = len(ball_names)
@property
def ball_colors(self):
return len(self.ball_names)
@property
def weights(self):
return self._weights
@weights.setter
def weights(self, weights):
assert type(weights) == np.ndarray, "Weights must be a numpy array."
#assert weights.shape == (self.ball_colors), "Weights must have the same shape as the number of balls and columns."
self._weights = weights
def draw(self,n=1):
''' draw a ball from the urn with replacement'''
= np.random.choice(self.ball_colors, p=self.weights/self.weights.sum(), size=n)
row_idx = []
result for i in range(n):
self.ball_names[row_idx[i]])
result.append(return result
Let’s create one of the most fundumental models in probability theory - the urn model.
Urn models go back to the 17th century and were first introduced in (Bernoulli 1713) by Jacob Bernoulli The urn model is a simple model that describes the process of drawing balls from an urn. The urn contains balls of different colors, and the goal is to study the probabilities of drawing balls of different colors.
The urn model is a simple model that describes the process of drawing balls from an urn.
The urn contains balls of different colors, and the goal is to study the probabilities of drawing balls of different colors.
Although basic urn models can be represented with draws from well known distributions an the urn model is useful concrete form for thinking about probability particularly when implementing simple reinforcement learning algorithms or model with Bayesian updating.
Some more complex processess in probability theory can be set up as urn model making this a useful model to understand.
When it comes to implementing agents, we can quickly set them up for reinforcement or Bayesian learning by equipping the agnet with such an urn model.
In python we can implement the urn model using a numpy array to represent the balls in the urn and their counts.
The basic operations of the urn model is to draw a ball from the urn and update the urn with the new ball counts.
basic operations:
- draw() draw a ball from the urn
- draw(n) draw n balls from the urn
operations:
- we might want to draw n balls then observe how many were of a certain color
- we might want to draw n balls then update the urn with the new ball counts
- we might want to draw n balls without updateing the urn with new balls to capture the current distribution of balls.
- convert balls/weights to probabilities
- estimate probability of drawing a certain sequence of balls with or without updating the urn.
- given n observations of balls estimate the ball proportions and thier confidence intervals.
Urn Models and distributions
- hypergeometric urn - sampling without replacement…
- urn with sampling with replacement and adding a new ball of the same color
- polya urn - when a ball is observed the urn is updated with the same color ball and a new ball of the same color
- beta-binomial when ever a ball is observed the urn is updated with the number of balls of the same color
- dirichlet
- hoppe urn - the urn has a mutator ball that generates new ball color (a new column) and a mutator state that generates new states (a new row)
- derichlet process
- chinee restaurant process
- moran urn - like a polya urn but we a;sp remove a ball so that the total number of balls remains constant.
The Basic Urn model
Bernulli Urn Model
#some examples
= Urn()
benulli_urn print(benulli_urn.draw(10))
print(benulli_urn.draw(10))
print(benulli_urn.draw(10))
['white', 'black', 'white', 'white', 'black', 'white', 'black', 'white', 'white', 'black']
['black', 'white', 'black', 'white', 'white', 'white', 'white', 'black', 'white', 'black']
['white', 'white', 'black', 'black', 'white', 'white', 'black', 'black', 'black', 'white']
= np.array([1., 9.])
benulli_urn.weights print(benulli_urn.draw(10))
['black', 'black', 'white', 'black', 'black', 'black', 'black', 'black', 'black', 'black']
= pd.DataFrame({'balls': benulli_urn.draw(100)})
bern_df bern_df.head()
balls | |
---|---|
0 | black |
1 | black |
2 | black |
3 | black |
4 | black |
=alt.Chart(bern_df).mark_bar().encode(
fig='balls',
x='count()'
y=200, height=200)
).properties(width fig.show()
Multinomial Urn Model
= Urn(ball_names=['red','blue','green'], weights=np.array([3., 9., 1.]))
multinomial_urn = pd.DataFrame({'balls': multinomial_urn.draw(100)})
multi_df multi_df.head()
balls | |
---|---|
0 | blue |
1 | green |
2 | blue |
3 | blue |
4 | blue |
#alt.renderers.enable("html")
alt.Chart(multi_df).mark_bar().encode(='balls',
x='count()'
y=200, height=200) ).properties(width
print(multinomial_urn.draw(10))
['blue', 'blue', 'red', 'blue', 'blue', 'red', 'blue', 'blue', 'red', 'blue']
The Polya Urn model
class Polya(Urn):
'''
The polya urn model is a generalization of the urn model where c is the number of balls of the same color added to the urn
for c=0 the polya urn model we get drawing with replacement reulting in binomial and multinomial distributions.
for c=1 the polya urn model we get drawing with replacement and adding a new ball of the same color resulting in a BetaBinomial and Dirichlet distributions.
for c=-1 the polya urn model we get drawing withot replacement resulting in a the hypergeometric distribution.
'''
def __init__(self,ball_names=['white','black'], init='ones', weights=None, c=1):
'''initialize the urn with ball names and weights'''
super().__init__(ball_names, init, weights)
self.c = c
def draw(self,n=1,update=True):
'''
draw a ball from the urn with replacement and add a new ball of the same color
Parameters:
n: int, number of balls to draw
update: bool, update the urn with the new ball counts or keep it frozen
'''
= []
result for i in range(n):
= np.random.choice(self.ball_colors, p=self.weights/self.weights.sum(), size=n)
row_idx self.ball_names[row_idx[i]])
result.append(if update:
self.weights[row_idx[i]] += self.c
return result
BetaBinomial Urn Model - Polya Urn Model with c=1
= Polya(ball_names=['red','blue'], c=1)
betabinomial_urn print(betabinomial_urn.draw(10))
['blue', 'red', 'blue', 'blue', 'blue', 'blue', 'blue', 'blue', 'red', 'blue']
= pd.DataFrame({'balls': betabinomial_urn.draw(100)})
betabinomial_df
alt.Chart(betabinomial_df).mark_bar().encode(='balls',
x='count()'
y=200, height=200) ).properties(width
BetaBinomial urn model
Dirichlet Urn Model - Polya Urn Model with c=1
Hypergeometric Urn Model - Polya Urn Model with c=-1
The Hoppe Urn models
class Hoppe(Polya):
''' Hoppe urn model'''
def __init__(self,ball_names=['0'], init='ones', weights=None, c=1, mutator_mass=1.0):
'''initialize the urn with ball names and weights'''
super().__init__(ball_names, init, weights, c)
self.mutator_mass = mutator_mass
if weights is not None:
assert type(weights) == np.ndarray, "Weights must be a numpy array."
assert weights.shape[0] == (len(self.ball_names)), f"Weight shape {weights.shape} not the same as the number of balls. {len(self.ball_names)}"
self.weights = weights
else:
if init == 'ones':
self._weights = np.ones((self.ball_colors))
elif init == 'zeros':
self._weights = np.zeros((self.ball_colors))
elif init == 'random':
self._weights = np.random.rand(self.ball_colors)
else:
raise ValueError("Initialization parameter must be 'ones', 'zeros', or 'random'.")
#set the weight of the mutator ball to the mutator mass
self.weights[0] = self.mutator_mass
def draw(self,n=1):
''' draw a ball from the urn with replacement and add a new ball of the same color'''
= []
result for i in range(n):
= np.random.choice(self.ball_colors, p=self.weights/self.weights.sum(), size=1)
row_idx if row_idx[i] == 0:
#add a new ball color
self.ball_names.append(str(len(self.ball_names)))
self.weights = np.append(self.weights, c)
self.ball_names[-1])
result.append(else:
self.ball_names[row_idx[0]])
result.append(self.weights[row_idx[i]] += c
return result
The Moran Urn model
class Moran(Polya):
''' Moran urn model'''
def draw(self,n=1):
''' draw a ball from the urn with replacement and add a new ball of the same color'''
= []
result for i in range(n):
= np.random.choice(self.ball_colors, p=self.weights/self.weights.sum(), size=2)
row_idx self.weights[row_idx[0]] += c
self.weights[row_idx[1]] -= c
self.ball_names[row_idx[0]])
result.append(
return result
Ehrenfest Urn Model
The Ehrenfest urn model is a simple model that describes the process of moving balls between two urns. I view this as a precursor to compartmental models in epidemiology and other fields and it demostrates how one can extend the urn model can be used to model more complex systems. The more general model is the multiurn model where we have multiple urns and we can move balls between them which is a Markov chain model.
The model consists of two urns, each containing a fixed number of balls. At each time step, a ball is randomly selected from one of the urns and moved to the other urn.
The MultiUrn model
“any problem of probability appears comparable to a suitable problem about bags containing balls, and any random mass phenomenon appears as similar in certain essential respects to successive drawings of balls from a system of suitibly combined bags.” - Polya (1954)
So I actualy implemented this model first to do some basic RL algorithms for the Lewis Signalling model.
The MultiUrn model is an extension of the basic Urn model that allows for multiple urns to be used together.
We may for example need to learn an urn model per state in our system extending a bandit algorithm to a contextual bandit algorithm.
We can represent these using rows of a matrix where each row represents an urn and each column represents a ball color.
In cases where we have hierarchical models we may be able to use additional constraints - for example on both rows and columns to speed up learning.
class MultiUrn:
def __init__(self, row_names, col_names, init='ones'):
self.row_names = row_names
self.col_names = col_names
self.num_rows = len(row_names)
self.num_cols = len(col_names)
if init == 'ones':
self.weights = np.ones((self.num_rows, self.num_cols))
elif init == 'zeros':
self.weights = np.zeros((self.num_rows, self.num_cols))
elif init == 'random':
self.weights = np.random.rand(self.num_rows, self.num_cols)
else:
raise ValueError("Initialization parameter must be 'ones', 'zeros', or 'random'.")
def _convert_to_numeric(self, row_name, col_name):
try:
= self.row_names.index(row_name)
row_idx = self.col_names.index(col_name)
col_idx return row_idx, col_idx
except ValueError:
raise ValueError("Invalid row or column name.")
def get_weight(self, row_name, col_name):
= self._convert_to_numeric(row_name, col_name)
row_idx, col_idx return self.weights[row_idx, col_idx]
def set_weight(self, row_name, col_name, value):
= self._convert_to_numeric(row_name, col_name)
row_idx, col_idx self.weights[row_idx, col_idx] = value
def add_weights(self, other_urn):
if self.weights.shape != other_urn.weights.shape:
raise ValueError("Urns must have the same dimensions to add weights.")
return Urn(self.row_names, self.col_names, init=None, weights=self.weights + other_urn.weights)
def get_conditional_probabilities(self):
= self.weights.sum(axis=1, keepdims=True)
row_sums = self.weights / row_sums
conditional_probs return conditional_probs
def get_conditional_probability(self, row_name, col_name):
= self._convert_to_numeric(row_name, col_name)
row_idx, col_idx = self.weights[row_idx, :].sum()
row_sum = self.weights[row_idx, col_idx] / row_sum
conditional_prob return conditional_prob
def choose_option(self, row_name):
= self.row_names.index(row_name)
row_idx = self.weights[row_idx, :]
row_weights = np.random.choice(self.num_cols, p=row_weights/row_weights.sum())
col_idx return self.col_names[col_idx]
def update_weights(self, row_name, col_name, reward):
= self._convert_to_numeric(row_name, col_name)
row_idx, col_idx self.weights[row_idx, col_idx] += reward
def plot_heatmap(self):
for idx, row_name in enumerate(self.row_names):
=(10, 1))
plt.figure(figsizeself.weights[idx, :].reshape(1, -1), annot=True, cmap="viridis", cbar=False, xticklabels=self.col_names, yticklabels=[row_name])
sns.heatmap(f"Urn for {row_name}")
plt.title(
plt.show()
def calculate_expected_reward(self, receiver_urn):
= 0.0
result = self.get_conditional_probabilities()
sender_probs = receiver_urn.get_conditional_probabilities()
receiver_probs
for sender_state in self.row_names:
for sender_signal in self.col_names:
= self.get_conditional_probability(sender_state, sender_signal)
p_sender for receiver_signal in receiver_urn.row_names:
for receiver_state in receiver_urn.col_names:
= receiver_urn.get_conditional_probability(receiver_signal, receiver_state)
p_receiver if receiver_signal == sender_signal:
+= p_sender * p_receiver
result return result
def add_expected_reward(self, receiver_urn):
= self.calculate_expected_reward(receiver_urn)
expected_reward for row_name in self.row_names:
for col_name in self.col_names:
self.update_weights(row_name, col_name, expected_reward)
def __str__(self):
return str(self.weights)
# Example usage
= ['state0', 'state1', 'state2', 'state3', 'state4']
row_names = ['a', 'b', 'c', 'd']
col_names
= MultiUrn(row_names, col_names, init='ones')
urn
print("Initial weights:")
print(urn)
= urn.get_weight('state0', 'a')
weight_0_a print(f"Weight for state0 and signal 'a': {weight_0_a}")
'state0', 'a', 2.0)
urn.set_weight(print("Weights after setting weight for state0 and signal 'a' to 0.5:")
print(urn)
= urn.get_conditional_probabilities()
conditional_probs print("Conditional probabilities:")
print(conditional_probs)
= 'state0'
state = 'a'
signal = urn.get_conditional_probability(state, signal)
conditional_prob print(f"Conditional probability of signal {signal} given {state}: {conditional_prob}")
= 'state1'
state = 'a'
signal = urn.get_conditional_probability(state, signal)
conditional_prob print(f"Conditional probability of signal {signal} given {state}: {conditional_prob}")
= urn.choose_option('state0')
chosen_signal print(f"Chosen signal for state0: {chosen_signal}")
'state0', 'a', 1.0)
urn.update_weights(print("Weights after updating weight for state0 and signal 'a' with a reward of 0.1:")
print(urn)
# Plot heatmaps
urn.plot_heatmap()
Initial weights:
[[1. 1. 1. 1.]
[1. 1. 1. 1.]
[1. 1. 1. 1.]
[1. 1. 1. 1.]
[1. 1. 1. 1.]]
Weight for state0 and signal 'a': 1.0
Weights after setting weight for state0 and signal 'a' to 0.5:
[[2. 1. 1. 1.]
[1. 1. 1. 1.]
[1. 1. 1. 1.]
[1. 1. 1. 1.]
[1. 1. 1. 1.]]
Conditional probabilities:
[[0.4 0.2 0.2 0.2 ]
[0.25 0.25 0.25 0.25]
[0.25 0.25 0.25 0.25]
[0.25 0.25 0.25 0.25]
[0.25 0.25 0.25 0.25]]
Conditional probability of signal a given state0: 0.4
Conditional probability of signal a given state1: 0.25
Chosen signal for state0: c
Weights after updating weight for state0 and signal 'a' with a reward of 0.1:
[[3. 1. 1. 1.]
[1. 1. 1. 1.]
[1. 1. 1. 1.]
[1. 1. 1. 1.]
[1. 1. 1. 1.]]
= ['state0', 'state1', 'state2', 'state3', 'state4']
s_row_names = ['a', 'b', 'c', 'd']
s_col_names = MultiUrn(s_row_names, s_col_names, init='ones')
s_urn s_urn.plot_heatmap()
= ['a', 'b', 'c', 'd']
r_row_names = ['state0', 'state1', 'state2', 'state3', 'state4']
r_col_names = MultiUrn(r_row_names, r_col_names, init='ones')
r_urn r_urn.plot_heatmap()
lets add a method to calculate the expected reward of two urns
result = 0.0 for each sender state sender_state for each sender signal sender_signal p_sender = the conditional probability of the sender_signal given the sender_state for each reciever signal reciever_signal for each reciever state reciever_state p_reciever = the conditional probability of the reciever_state given the reciever_signal if the reciever_signal is the same as the sender_signal result += p_sender * p_reciever return result
note I think the expected reward chould be less then one - since the expected reward is the probability of the reciever signal given the sender signal
and sum of probabilities of the reciever signal given the sender signal is less then one.
calculate the expected reward
add the expected reward to the urn
where we start with a reciever, chose
= s_urn.calculate_expected_reward(r_urn)/(s_urn.num_rows*r_urn.num_cols*s_urn.num_rows*r_urn.num_cols)
expected_reward print(f"Expected reward: {expected_reward}")
s_urn.add_expected_reward(r_urn)print("Sender Urn weights after adding expected reward:")
print(s_urn)
s_urn.plot_heatmap()
Expected reward: 0.007999999999999985
Sender Urn weights after adding expected reward:
[[6. 6. 6. 6.]
[6. 6. 6. 6.]
[6. 6. 6. 6.]
[6. 6. 6. 6.]
[6. 6. 6. 6.]]
Citation
@online{bochman2024,
author = {Bochman, Oren},
title = {Urn Models Using {Numpy}},
date = {2024-05-02},
url = {https://orenbochman.github.io/posts/2024/2024-05-01-signals/urn.html},
langid = {en}
}