variational_autoencoder_mc_samples-checkpoint.ipynb
Preamble
In [1]:
%matplotlib notebook
In [2]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
from keras import backend as K
from keras.layers import (Input, InputLayer, Dense, Lambda, Layer,
                          Add, Multiply)
from keras.models import Model, Sequential
from keras.datasets import mnist
In [3]:
import pandas as pd
from matplotlib.ticker import FormatStrFormatter
from keras.utils.vis_utils import model_to_dot, plot_model
from IPython.display import SVG
Notebook Configuration
In [4]:
np.set_printoptions(precision=2,
                    edgeitems=3,
                    linewidth=80,
                    suppress=True)
In [5]:
'TensorFlow version: ' + K.tf.__version__
Out[5]:
Constant definitions
In [6]:
mc_samples = 25
batch_size = 100
original_dim = 784
latent_dim = 2
intermediate_dim = 256
epochs = 50
epsilon_std = 1.0
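With these settings the encoder draws mc_samples = 25 latent samples per input, so the auxiliary noise tensor introduced below has shape (batch_size, mc_samples, latent_dim) = (100, 25, 2). A throwaway NumPy line producing noise of that shape, purely for illustration:

np.random.normal(scale=epsilon_std,
                 size=(batch_size, mc_samples, latent_dim)).shape  # (100, 25, 2)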
Model specification
Encoder
Figure 1: Reparameterization using Keras Layers
In [7]:
z_mu = Input(shape=(latent_dim,), name='mu')
z_sigma = Input(shape=(latent_dim,), name='sigma')
eps = Input(shape=(mc_samples, latent_dim), name='eps')
z_eps = Multiply(name='z_eps')([z_sigma, eps])
z = Add(name='z')([z_mu, z_eps])
In [8]:
m = Model(inputs=[eps, z_mu, z_sigma], outputs=z)
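The Multiply and Add layers broadcast the (batch_size, latent_dim) posterior statistics against the (batch_size, mc_samples, latent_dim) noise, so z carries mc_samples reparameterized draws per input. A rough NumPy analogue of the same computation (illustrative only; plain NumPy needs the samples axis inserted explicitly, whereas the Keras merge layers resolve the rank mismatch themselves):

mu_np = np.zeros((batch_size, latent_dim))
sigma_np = np.ones((batch_size, latent_dim))
eps_np = np.random.normal(size=(batch_size, mc_samples, latent_dim))
z_np = mu_np[:, np.newaxis, :] + sigma_np[:, np.newaxis, :] * eps_np
z_np.shape  # (100, 25, 2)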
In [9]:
SVG(model_to_dot(m, show_shapes=True)
    .create(prog='dot', format='svg'))
Out[9]:
In [10]:
plot_model(
    model=m, show_shapes=False,
    to_file='../../images/vae/reparameterization_mc_samples.svg'
)
In [11]:
plot_model(
    model=m, show_shapes=True,
    to_file='../../images/vae/reparameterization_mc_samples_shapes.svg'
)
Figure 2: Encoder architecture
In [12]:
x = Input(shape=(original_dim,), name='x')
h = Dense(intermediate_dim, activation='relu', name='encoder_hidden')(x)
z_mu = Dense(latent_dim, name='mu')(h)
z_log_var = Dense(latent_dim, name='log_var')(h)
z_sigma = Lambda(lambda t: K.exp(.5*t), name='sigma')(z_log_var)
In [13]:
eps = Input(shape=(mc_samples, latent_dim), name='eps')
z_eps = Multiply(name='z_eps')([z_sigma, eps])
z = Add(name='z')([z_mu, z_eps])
In [14]:
encoder = Model(inputs=[x, eps], outputs=z)
In [15]:
SVG(model_to_dot(encoder, show_shapes=True)
    .create(prog='dot', format='svg'))
Out[15]:
In [16]:
plot_model(
    model=encoder, show_shapes=False,
    to_file='../../images/vae/encoder_mc_samples.svg'
)
In [17]:
plot_model(
    model=encoder, show_shapes=True,
    to_file='../../images/vae/encoder_mc_samples_shapes.svg'
)
Figure 3: Full Encoder architecture with auxiliary layers
In [18]:
class KLDivergenceLayer(Layer):

    """ Identity transform layer that adds KL divergence
    to the final model loss.
    """

    def __init__(self, *args, **kwargs):
        self.is_placeholder = True
        super(KLDivergenceLayer, self).__init__(*args, **kwargs)

    def call(self, inputs):
        mu, log_var = inputs

        kl_batch = - .5 * K.sum(1 + log_var -
                                K.square(mu) -
                                K.exp(log_var), axis=-1)

        self.add_loss(K.mean(kl_batch), inputs=inputs)

        return inputs
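The layer acts as the identity on its inputs (mu, log_var); its only side effect is to register, via add_loss, the batch mean of the analytic KL divergence between the diagonal Gaussian posterior $q(z \mid x) = \mathcal{N}(\mu, \mathrm{diag}(\sigma^2))$ and the standard normal prior,

$$ D_{\mathrm{KL}}\big(q(z \mid x) \,\|\, \mathcal{N}(0, I)\big) = - \frac{1}{2} \sum_{i} \left( 1 + \log \sigma_i^2 - \mu_i^2 - \sigma_i^2 \right), $$

which is exactly the kl_batch expression computed in call above.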
In [19]:
z_mu, z_log_var = KLDivergenceLayer(name='kl')([z_mu, z_log_var])
z_sigma = Lambda(lambda t: K.exp(.5*t), name='sigma')(z_log_var)
In [20]:
eps = Input(shape=(mc_samples, latent_dim), name='eps')
z_eps = Multiply(name='sigma_eps')([z_sigma, eps])
z = Add(name='z')([z_mu, z_eps])
In [21]:
encoder = Model(inputs=[x, eps], outputs=z)
In [22]:
SVG(model_to_dot(encoder, show_shapes=True)
    .create(prog='dot', format='svg'))
Out[22]:
In [23]:
plot_model(
    model=encoder, show_shapes=False,
    to_file='../../images/vae/encoder_full_mc_samples.svg'
)
In [24]:
plot_model(
    model=encoder, show_shapes=True,
    to_file='../../images/vae/encoder_full_mc_samples_shapes.svg'
)
Decoder
In [25]:
decoder = Sequential([
    Dense(intermediate_dim, input_dim=latent_dim,
          activation='relu', name='decoder_hidden'),
    Dense(original_dim, activation='sigmoid', name='x_mean')
], name='decoder')
In [26]:
# equivalent to above. Writing InputLayer explicitly
# to set the layer name in the architecture diagram
decoder = Sequential([
    InputLayer(input_shape=(latent_dim,), name='z'),
    Dense(intermediate_dim, input_shape=(latent_dim,),
          activation='relu', name='decoder_hidden'),
    Dense(original_dim, activation='sigmoid', name='x_mean')
], name='decoder')
In [27]:
SVG(model_to_dot(decoder, show_shapes=True)
    .create(prog='dot', format='svg'))
Out[27]:
In [28]:
plot_model(
    model=decoder, show_shapes=False,
    to_file='../../images/vae/decoder_mc_samples.svg'
)
In [29]:
plot_model(
    model=decoder, show_shapes=True,
    to_file='../../images/vae/decoder_mc_samples_shapes.svg'
)
In [30]:
x_decoded = decoder(z)
In [31]:
# again, equivalent to above. Writing it out fully
# for the final end-to-end VAE architecture visualization;
# otherwise, Sequential models just get chunked into a
# single layer
h_decoded = Dense(intermediate_dim,
                  activation='relu',
                  name='decoder_hidden')(z)
x_decoded = Dense(original_dim,
                  activation='sigmoid',
                  name='x_mean')(h_decoded)
In [32]:
vae = Model(inputs=[x, eps], outputs=x_decoded)
In [33]:
SVG(model_to_dot(vae, show_shapes=True)
    .create(prog='dot', format='svg'))
Out[33]:
In [34]:
plot_model(
    model=vae, show_shapes=False,
    to_file='../../images/vae/vae_full_mc_samples.svg'
)
In [35]:
plot_model(
    model=vae, show_shapes=True,
    to_file='../../images/vae/vae_full_mc_samples_shapes.svg'
)
Putting it all together
In [36]:
x = Input(shape=(original_dim,))
h = Dense(intermediate_dim, activation='relu')(x)
z_mu = Dense(latent_dim)(h)
z_log_var = Dense(latent_dim)(h)
z_mu, z_log_var = KLDivergenceLayer()([z_mu, z_log_var])
z_sigma = Lambda(lambda t: K.exp(.5*t))(z_log_var)
# eps is a symbolic random tensor: fresh noise each batch, never fed as data
eps = Input(tensor=K.random_normal(shape=(K.shape(x)[0], mc_samples, latent_dim)))
z_eps = Multiply()([z_sigma, eps])
z = Add()([z_mu, z_eps])
decoder = Sequential([
    Dense(intermediate_dim, input_dim=latent_dim, activation='relu'),
    Dense(original_dim, activation='sigmoid')
])
x_mean = decoder(z)
In [37]:
def nll(y_true, y_pred):
    """ Negative log likelihood. """

    # keras.losses.binary_crossentropy gives the mean
    # over the last axis. we require the sum
    return K.sum(K.binary_crossentropy(y_true, y_pred), axis=-1)
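The model output has shape (batch_size, mc_samples, original_dim) while the targets passed to fit and evaluate below are only expanded to (batch_size, 1, original_dim), so the elementwise cross-entropy broadcasts over the samples axis; the final mean Keras takes over the resulting (batch_size, mc_samples) loss array then effectively averages the reconstruction term over the Monte Carlo samples. A shape-only NumPy sketch of that broadcast (illustrative, not part of the model):

y_true_demo = np.ones((batch_size, 1, original_dim))           # expanded targets
y_pred_demo = np.ones((batch_size, mc_samples, original_dim))  # per-sample reconstructions
np.sum(y_true_demo * y_pred_demo, axis=-1).shape  # (100, 25): one loss term per MC sample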
In [38]:
vae = Model(inputs=[x, eps], outputs=x_mean)
vae.compile(optimizer='rmsprop', loss=nll)
Model fitting
In [39]:
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.reshape(-1, original_dim) / 255.
x_test = x_test.reshape(-1, original_dim) / 255.
In [40]:
vae.evaluate(
    x_test,
    # np.tile(np.expand_dims(x_test, axis=1), (1, 5, 1)),
    np.expand_dims(x_test, axis=1),
    batch_size=batch_size,
)
Out[40]:
In [41]:
h = vae.fit(
    x_train,
    np.expand_dims(x_train, axis=1),
    shuffle=True,
    epochs=epochs,
    batch_size=batch_size,
    validation_data=(
        x_test,
        np.expand_dims(x_test, axis=1)
    )
)
In [42]:
recons = np.squeeze(vae.predict(np.atleast_2d(x_test[0])))
recons.shape
Out[42]:
In [43]:
np.all(recons[0] == recons[-1])
Out[43]:
In [44]:
np.all(recons[1:] == recons[:-1], axis=1)
Out[44]:
In [45]:
fig, (ax1, ax2) = plt.subplots(
    ncols=2,
    figsize=(6, 3),
    subplot_kw=dict(
        xticks=[],
        yticks=[],
        frame_on=False
    )
)
ax1.set_title('original')
ax1.imshow(x_test[0].reshape(28, 28), cmap='gray')
ax2.set_title('reconstructions')
ax2.imshow(np.block(list(map(list, recons.reshape(5, 5, 28, 28)))),
           cmap='gray')
plt.savefig('../../images/vae/mc_samples_reconstructions.png')
plt.show()
In [46]:
golden_size = lambda width: (width, 2. * width / (1. + np.sqrt(5.)))
In [47]:
fig, ax = plt.subplots(figsize=golden_size(6))
pd.DataFrame(h.history).plot(ax=ax)
ax.set_ylabel('NELBO')
ax.set_xlabel('# epochs')
plt.savefig('../../images/vae/nelbo_mc_samples.svg', format='svg')
plt.show()
In [48]:
# deterministic test time encoder
test_encoder = Model(x, z_mu)
# display a 2D plot of the digit classes in the latent space
z_test = test_encoder.predict(x_test, batch_size=batch_size)
In [49]:
# display a 2D manifold of the digits
n = 15 # figure with 15x15 digits
digit_size = 28
# linearly spaced coordinates on the unit square were transformed
# through the inverse CDF (ppf) of the Gaussian to produce values
# of the latent variables z, since the prior of the latent space
# is Gaussian
u_grid = np.dstack(np.meshgrid(np.linspace(0.05, 0.95, n),
                               np.linspace(0.05, 0.95, n)))
z_grid = norm.ppf(u_grid)
x_decoded = decoder.predict(z_grid.reshape(n*n, 2))
x_decoded = x_decoded.reshape(n, n, digit_size, digit_size)
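As a quick sanity check on the grid, the inverse CDF maps the evenly spaced quantiles in [0.05, 0.95] to latent values spanning roughly the central 90% of the standard normal prior:

norm.ppf([0.05, 0.5, 0.95])  # approximately [-1.64, 0., 1.64]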
In [50]:
fig, ax = plt.subplots(figsize=(6, 6))
ax.imshow(np.block(list(map(list, x_decoded))), cmap='gray')
ax.set_xticks(np.arange(0, n*digit_size, digit_size) + .5 * digit_size)
ax.set_xticklabels(map('{:.2f}'.format, norm.ppf(np.linspace(0.05, 0.95, n))),
                   rotation=90)
ax.set_yticks(np.arange(0, n*digit_size, digit_size) + .5 * digit_size)
ax.set_yticklabels(map('{:.2f}'.format, -norm.ppf(np.linspace(0.05, 0.95, n))))
ax.set_xlabel('$z_1$')
ax.set_ylabel('$z_2$')
plt.savefig('../../images/vae/result_manifold_mc_samples.png')
plt.show()
In [51]:
fig, ax = plt.subplots(figsize=(6, 5))
cbar = ax.scatter(z_test[:, 0], z_test[:, 1], c=y_test,
                  alpha=.4, s=3**2, cmap='viridis')
fig.colorbar(cbar, ax=ax)
ax.set_xlim(-4.5, 4.5)
ax.set_ylim(-4.5, 4.5)
ax.set_xlabel('$z_1$')
ax.set_ylabel('$z_2$')
plt.savefig('../../images/vae/result_latent_space_mc_samples.png')
plt.show()
In [52]:
fig, (ax1, ax2) = plt.subplots(ncols=2, figsize=(12, 4.5))
ax1.imshow(np.block(list(map(list, x_decoded))), cmap='gray')
ax1.set_xticks(np.arange(0, n*digit_size, digit_size) + .5 * digit_size)
ax1.set_xticklabels(map('{:.2f}'.format, norm.ppf(np.linspace(0.05, 0.95, n))),
                    rotation=90)
ax1.set_yticks(np.arange(0, n*digit_size, digit_size) + .5 * digit_size)
ax1.set_yticklabels(map('{:.2f}'.format, -norm.ppf(np.linspace(0.05, 0.95, n))))
ax1.set_xlabel('$z_1$')
ax1.set_ylabel('$z_2$')
cbar = ax2.scatter(z_test[:, 0], z_test[:, 1], c=y_test,
                   alpha=.4, s=3**2, cmap='viridis')
fig.colorbar(cbar, ax=ax2)
ax2.set_xlim(-4.5, 4.5)
ax2.set_ylim(-4.5, 4.5)
ax2.set_xlabel('$z_1$')
ax2.set_ylabel('$z_2$')
plt.savefig('../../images/vae/result_combined_mc_samples.png')
plt.show()