From Jan Hendrik Metzen 2017-01-03 with a small mods by d. gannon
From Metzen's intro:
The main motivation for this post was that I wanted to get more experience with both Variational Autoencoders (VAEs) and with Tensorflow. Thus, implementing the former in the latter sounded like a good idea for learning about both at the same time. This post summarizes the result.
Note: The post was updated on December 7th 2015:
Note: The post was updated on January 3rd 2017:
From d. gannon: This version uses this version uses images from the web of galaxies. it is a very small database (less than 100 images) but the autoencoder has no problem with it. this is a version of the code that we modified from another project on autoencoders. The article that describes that work is https://cloud4scieng.org/manifold-learning-and-deep-autoencoders-in-science/
#!pip install opencv-python
import numpy as np
import tensorflow as tf
import cv2
import random
import matplotlib.pyplot as plt
%matplotlib inline
np.random.seed(0)
tf.set_random_seed(0)
totalimg = 90 #1032
n_samples = totalimg
The link to download a tar'd gzipped copy of the data is here: http://www.imagexd.org/assets/cells.tar.gz . The following function is looking for the path to the un-tar'd images and a file with the name of each image one per line. you can create this file with the command
ls > classlist
def test(size, listfile, datafile):
classes = np.array([0]*size)
data = np.array([np.zeros(128*128*3, dtype=np.float32)]*size )
print('data', data.shape)
with open(listfile) as f:
i = 0
for line_of_text in f:
#print(datafile+"/"+line_of_text[:-1])
x = cv2.imread(datafile+"/"+line_of_text[:-1])
#print(x.shape)
x = cv2.resize(x, (128,128))
#print("shape =",x.shape)
img2 = cv2.imdecode(x, cv2.IMREAD_UNCHANGED)
f = 256.0
x = x/f
#print(x.shape)
data[i] = x.reshape(128*128*3)
x = line_of_text.find("class")
c = line_of_text[0]
if c=='e':
classes[i]= 0
elif c == 'b':
classes[i]= 1
else:
classes[i]= 2
i = i+1
if i == size:
break
return classes, data
The function test above loads the data file which contains 3 classes of galaxies: eliptical, barred and spiral.
the etract_class can be used to pull out the subsets of each galaxy class.
def extract_class(i, bigclasses,bigdata ):
size = 0
for x in bigclasses:
if x==i:
size = size+1
print('size =', size)
data = np.array([np.zeros(128*128*3, dtype=np.float32)]*size )
j = 0
for k in range(len(bigclasses)):
if bigclasses[k] == i:
data[j] = bigdata[k]
j = j+1
return data
classes, data = test(totalimg, './classlist',
'./galaxies')
c0data = extract_class(0, classes, data)
c1data = extract_class(1, classes, data)
c2data = extract_class(2, classes, data)
c0class = np.zeros(len(c0data), dtype=np.float32)
c1class = np.zeros(len(c1data), dtype=np.float32)+1
c2class = np.zeros(len(c2data), dtype=np.float32)+2
data[3]
plt.imshow(data[55].reshape((128,128,3)))
def get_next_batch(size, classes, data):
if size > totalimg:
s = totalimg
else:
s = size
c = np.array([0]*s)
d = np.array([np.zeros(128*128*3, dtype=np.float32)]*s )
for i in range(s):
j = np.random.randint(0,len(classes)-1)
c[i]= classes[j]
d[i]= data[j]
return d, c
print('totalimg', totalimg)
dd, cc = get_next_batch(totalimg, classes, data)
def xavier_init(fan_in, fan_out, constant=1):
""" Xavier initialization of network weights"""
# https://stackoverflow.com/questions/33640581/how-to-do-xavier-initialization-on-tensorflow
low = -constant*np.sqrt(6.0/(fan_in + fan_out))
high = constant*np.sqrt(6.0/(fan_in + fan_out))
return tf.random_uniform((fan_in, fan_out),
minval=low, maxval=high,
dtype=tf.float32)
Based on this, we define now a class "VariationalAutoencoder" with a sklearn-like interface that can be trained incrementally with mini-batches using partial_fit. The trained model can be used to reconstruct unseen input, to generate new samples, and to map inputs to the latent space.
$$ KL(P || Q) = - \sum_x p(x)log q(x) + \sum_x p(x)log(p(x)) $$
$$ Z_{\mu} , Z_{ln(\sigma^2)} = enc(X) $$ $$ \epsilon \in N(0,1) $$ $$ Z = Z_{\mu} + \epsilon \sqrt{exp(Z_{ln(\sigma^2)})} $$ $$ X_{recon\mu} = dec(Z) $$
the loss function is
$$ ReconLoss = \sum{ (X*ln(X_{recon\mu}) + (1-X)*ln(1 - X_{recon\mu})})$$ $$ LatentLoss = \sum{ (1+Z_{ln(\sigma^2)} - {Z_{\mu}}^2 - \exp(Z_{ln(\sigma^2)})) } $$
class VariationalAutoencoder(object):
""" Variation Autoencoder (VAE) with an sklearn-like interface implemented using TensorFlow.
This implementation uses probabilistic encoders and decoders using Gaussian
distributions and realized by multi-layer perceptrons. The VAE can be learned
end-to-end.
See "Auto-Encoding Variational Bayes" by Kingma and Welling for more details.
"""
def __init__(self, network_architecture, transfer_fct=tf.nn.softplus,
learning_rate=0.001, batch_size=20):
self.network_architecture = network_architecture
self.transfer_fct = transfer_fct
self.learning_rate = learning_rate
self.batch_size = batch_size
# tf Graph input
self.x = tf.placeholder(tf.float32, [None, network_architecture["n_input"]])
# Create autoencoder network
self._create_network()
# Define loss function based variational upper-bound and
# corresponding optimizer
self._create_loss_optimizer()
# Initializing the tensor flow variables
init = tf.initialize_all_variables()
# Launch the session
self.sess = tf.InteractiveSession()
self.sess.run(init)
def _create_network(self):
# Initialize autoencode network weights and biases
network_weights = self._initialize_weights(**self.network_architecture)
# Use recognition network to determine mean and
# (log) variance of Gaussian distribution in latent
# space
self.z_mean, self.z_log_sigma_sq = \
self._recognition_network(network_weights["weights_recog"],
network_weights["biases_recog"])
# Draw one sample z from Gaussian distribution
n_z = self.network_architecture["n_z"]
eps = tf.random_normal((self.batch_size, n_z), 0, 1,
dtype=tf.float32)
# z = mu + sigma*epsilon
self.z = tf.add(self.z_mean,
tf.multiply(tf.sqrt(tf.exp(self.z_log_sigma_sq)), eps))
# Use generator to determine mean of
# Bernoulli distribution of reconstructed input
self.x_reconstr_mean = \
self._generator_network(network_weights["weights_gener"],
network_weights["biases_gener"])
def _initialize_weights(self, n_hidden_recog_1, n_hidden_recog_2,
n_hidden_gener_1, n_hidden_gener_2,
n_input, n_z):
all_weights = dict()
all_weights['weights_recog'] = {
'h1': tf.Variable(xavier_init(n_input, n_hidden_recog_1)),
'h2': tf.Variable(xavier_init(n_hidden_recog_1, n_hidden_recog_2)),
'out_mean': tf.Variable(xavier_init(n_hidden_recog_2, n_z)),
'out_log_sigma': tf.Variable(xavier_init(n_hidden_recog_2, n_z))}
all_weights['biases_recog'] = {
'b1': tf.Variable(tf.zeros([n_hidden_recog_1], dtype=tf.float32)),
'b2': tf.Variable(tf.zeros([n_hidden_recog_2], dtype=tf.float32)),
'out_mean': tf.Variable(tf.zeros([n_z], dtype=tf.float32)),
'out_log_sigma': tf.Variable(tf.zeros([n_z], dtype=tf.float32))}
all_weights['weights_gener'] = {
'h1': tf.Variable(xavier_init(n_z, n_hidden_gener_1)),
'h2': tf.Variable(xavier_init(n_hidden_gener_1, n_hidden_gener_2)),
'out_mean': tf.Variable(xavier_init(n_hidden_gener_2, n_input)),
'out_log_sigma': tf.Variable(xavier_init(n_hidden_gener_2, n_input))}
all_weights['biases_gener'] = {
'b1': tf.Variable(tf.zeros([n_hidden_gener_1], dtype=tf.float32)),
'b2': tf.Variable(tf.zeros([n_hidden_gener_2], dtype=tf.float32)),
'out_mean': tf.Variable(tf.zeros([n_input], dtype=tf.float32)),
'out_log_sigma': tf.Variable(tf.zeros([n_input], dtype=tf.float32))}
return all_weights
def _recognition_network(self, weights, biases):
# Generate probabilistic encoder (recognition network), which
# maps inputs onto a normal distribution in latent space.
# The transformation is parametrized and can be learned.
layer_1 = self.transfer_fct(tf.add(tf.matmul(self.x, weights['h1']),
biases['b1']))
layer_2 = self.transfer_fct(tf.add(tf.matmul(layer_1, weights['h2']),
biases['b2']))
z_mean = tf.add(tf.matmul(layer_2, weights['out_mean']),
biases['out_mean'])
z_log_sigma_sq = \
tf.add(tf.matmul(layer_2, weights['out_log_sigma']),
biases['out_log_sigma'])
return (z_mean, z_log_sigma_sq)
def _generator_network(self, weights, biases):
# Generate probabilistic decoder (decoder network), which
# maps points in latent space onto a Bernoulli distribution in data space.
# The transformation is parametrized and can be learned.
layer_1 = self.transfer_fct(tf.add(tf.matmul(self.z, weights['h1']),
biases['b1']))
layer_2 = self.transfer_fct(tf.add(tf.matmul(layer_1, weights['h2']),
biases['b2']))
x_reconstr_mean = \
tf.nn.sigmoid(tf.add(tf.matmul(layer_2, weights['out_mean']),
biases['out_mean']))
return x_reconstr_mean
def _create_loss_optimizer(self):
# The loss is composed of two terms:
# 1.) The reconstruction loss (the negative log probability
# of the input under the reconstructed Bernoulli distribution
# induced by the decoder in the data space).
# This can be interpreted as the number of "nats" required
# for reconstructing the input when the activation in latent
# is given.
# Adding 1e-10 to avoid evaluation of log(0.0)
reconstr_loss = \
-tf.reduce_sum(self.x * tf.log(1e-10 + self.x_reconstr_mean)
+ (1-self.x) * tf.log(1e-10 + 1 - self.x_reconstr_mean),
1)
# 2.) The latent loss, which is defined as the Kullback Leibler divergence
## between the distribution in latent space induced by the encoder on
# the data and some prior. This acts as a kind of regularizer.
# This can be interpreted as the number of "nats" required
# for transmitting the the latent space distribution given
# the prior.
latent_loss = -0.5 * tf.reduce_sum(1 + self.z_log_sigma_sq
- tf.square(self.z_mean)
- tf.exp(self.z_log_sigma_sq), 1)
self.cost = tf.reduce_mean(reconstr_loss + latent_loss) # average over batch
# Use ADAM optimizer
self.optimizer = \
tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(self.cost)
def partial_fit(self, X):
"""Train model based on mini-batch of input data.
Return cost of mini-batch.
"""
opt, cost = self.sess.run((self.optimizer, self.cost),
feed_dict={self.x: X})
return cost
def transform(self, X):
"""Transform data by mapping it into the latent space."""
# Note: This maps to mean of distribution, we could alternatively
# sample from Gaussian distribution
return self.sess.run(self.z_mean, feed_dict={self.x: X})
def generate(self, z_mu=None):
""" Generate data by sampling from latent space.
If z_mu is not None, data for this point in latent space is
generated. Otherwise, z_mu is drawn from prior in latent
space.
"""
if z_mu is None:
z_mu = np.random.normal(size=self.network_architecture["n_z"])
# Note: This maps to mean of distribution, we could alternatively
# sample from Gaussian distribution
return self.sess.run(self.x_reconstr_mean,
feed_dict={self.z: z_mu})
def reconstruct(self, X):
""" Use VAE to reconstruct given data. """
return self.sess.run(self.x_reconstr_mean,
feed_dict={self.x: X})
In general, implementing a VAE in tensorflow is relatively straightforward (in particular since we don not need to code the gradient computation). A bit confusing is potentially that all the logic happens at initialization of the class (where the graph is generated), while the actual sklearn interface methods are very simple one-liners.
We can now define a simple fuction which trains the VAE using mini-batches:
def train(network_architecture, learning_rate=0.0001,
batch_size=20, training_epochs=10, display_step=5):
vae = VariationalAutoencoder(network_architecture,
learning_rate=learning_rate,
batch_size=batch_size)
# Training cycle
for epoch in range(training_epochs):
avg_cost = 0.
total_batch = int(n_samples / batch_size)
# Loop over all batches
for i in range(total_batch):
batch_xs, _ = get_next_batch(batch_size, classes, data)
# Fit training using batch data
cost = vae.partial_fit(batch_xs)
# Compute average loss
avg_cost += cost / n_samples * batch_size
# Display logs per epoch step
if epoch % display_step == 0:
print("Epoch:", '%04d' % (epoch+1),
"cost=", "{:.9f}".format(avg_cost))
return vae
We can now train a VAE by just specifying the network topology. We start with training a VAE with a 20-dimensional latent space.
network_architecture = \
dict(n_hidden_recog_1=1000, # 1st layer encoder neurons
n_hidden_recog_2=1000, # 2nd layer encoder neurons
n_hidden_gener_1=1000, # 1st layer decoder neurons
n_hidden_gener_2=1000, # 2nd layer decoder neurons
n_input=128*128*3, # MNIST data input (img shape: 28*28)
n_z=20) # dimensionality of latent space
vae = train(network_architecture, training_epochs=2500)
Based on this we can sample some test inputs and visualize how well the VAE can reconstruct those. In general the VAE does really well.
let's grab some of the class0 images and show them alongside their reconstruction
x_sample =get_next_batch(20, classes, data)[0]
x_reconstruct = vae.reconstruct(x_sample)
x_sample.shape
plt.figure(figsize=(8, 12))
for i in range(5):
plt.subplot(5, 2, 2*i + 1)
plt.imshow(x_sample[i].reshape(128,128,3))#, vmin=0, vmax=1, cmap="gray")
plt.title("Test input")
plt.colorbar()
plt.subplot(5, 2, 2*i + 2)
plt.imshow(x_reconstruct[i].reshape(128,128,3))#, vmin=0, vmax=1, cmap="gray")
plt.title("Reconstruction")
plt.colorbar()
plt.tight_layout()
now show a class1 example.
X= c1data[18]
plt.imshow(X.reshape((128,128,3)))
Z = vae.transform(X.reshape((1,49152)))
Z = Z+np.random.normal(size=20)+2.0
#Zex = np.array([np.zeros(128*128*3, dtype=np.float32)]*20 )
Zex = np.array([Z]*20)
Zex = Zex.reshape(20,20)
Zex.shape
z_mu = np.random.normal(size=20) #vae.network_architecture["n_z"])
z_mu= z_mu.reshape((1,20))
z_mu.shape
c1_imaginary = vae.generate(Zex)
plt.figure(figsize=(8, 12))
for i in range(0,10,2):
plt.subplot(5, 2, i + 1)
plt.imshow(c1_imaginary[i].reshape(128,128,3))#, vmin=0, vmax=1, cmap="gray")
plt.title("Test input")
plt.colorbar()
plt.subplot(5, 2, i + 2)
plt.imshow(c1_imaginary[i+1].reshape(128,128,3))#, vmin=0, vmax=1, cmap="gray")
plt.title("Reconstruction")
plt.colorbar()
plt.tight_layout()
i
c2_recon = vae.reconstruct(c2data[:20])
plt.imshow(c2_recon[18].reshape(128,128, 3))
plt.imshow(c1data[15].reshape(128,128,3))
import imageio
import pickle
from IPython.display import Image
from IPython.display import display
from numpy import linalg
from numpy.linalg import norm
from scipy.spatial.distance import squareform, pdist
# We import sklearn.
import sklearn
from sklearn.manifold import TSNE
from sklearn.datasets import load_digits
from sklearn.preprocessing import scale
# We'll hack a bit with the t-SNE code in sklearn 0.15.2.
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.manifold.t_sne import (_joint_probabilities,
_kl_divergence)
#from sklearn.utils.extmath import _ravel
we will pick a path through the latent space using six start and stop points and lineraly interpreting the path between. we generate the corresponding images. the start and stop points correspond to spots in the image of the transformed sample z.
NOTE: this following steps were done with a previous execution of this notebook. that makes it consistent with the movie in the blog.
v = get_next_batch(len(c2data), c2class, c2data)[0]
z = vae.transform(v)
np.mean(z)
z.shape
start = [2,7, 12, 18, 22]
end = [7, 12, 18, 22,29]
#lets look at the class 0 images
plt.figure(figsize=(8, 12))
for i in range(5):
plt.subplot(5, 2, 2*i + 1)
plt.imshow(v[i].reshape(128,128,3))#, vmin=0, vmax=1, cmap="gray")
plt.title("Test input")
plt.colorbar()
plt.subplot(5, 2, 2*i + 2)
d = np.array([z[i]]*20).reshape(20,20)
plt.imshow(vae.generate(z_mu=d)[0].reshape(128,128,3))#, vmin=0, vmax=1, cmap="gray")
plt.title("Reconstruction")
plt.colorbar()
plt.tight_layout()
picdir = "/home/dbgannon/notebooks/galaxy_movie"
figlist = []
for seg in range(5):
m = 80
a = z[start[seg]]
b = z[end[seg]]
#a_ar = np.array([v]).reshape(80,128*128*3)
a_ar = np.array([np.zeros(128*128*3, dtype=np.float32)]*80 ).reshape(80,128*128*3)
d = np.array([np.zeros(20, dtype=np.float32)]*80*20 ).reshape(80*20,20)
print('da.shape', da.shape, ' db.shape', db.shape, ' d.shape', d.shape)
for s in range(m):
d[s]=(s/(m*1.0))*b+(1.0-(s/(m*1.0)))*a
a_ar[0:20] =vae.generate(z_mu=d[0:20])
a_ar[20:40] =vae.generate(z_mu=d[20:40])
a_ar[40:60] =vae.generate(z_mu=d[40:60])
a_ar[60:80] =vae.generate(z_mu=d[60:80])
for i in range(m):
filename = picdir+"/r-color"+str(seg*100+i)+".png"
figlist.append(filename)
im = plt.imsave(filename, a_ar[i].reshape(128,128, 3))#, vmin=0, vmax=1, cmap="gray")
a_ar.shape
now cat the images together to make a movie. note: you need a different movie name each time you run this because it caches the old ones.
images = []
for filename in figlist:
#print filename
images.append(imageio.imread(filename))
imageio.mimsave(picdir+'/movie9.gif', images)
#must start a simple_file_server on the host machine.
display(Image(url='http://104.210.62.79:5000/movie9.gif', width=256, height=256))
plt.imshow(a_ar[10].reshape(128,128,3))#, vmin=0, vmax=1, cmap="gray")