1

I am trying to upgrade this awesome implementation of a Gumbel-Softmax VAE found here. However, I keep getting the following error:

TypeError: Cannot convert a symbolic Keras input/output to a numpy array. 

I am stumped and have tried many things. Interestingly, some searches return other implementations of VAEs. I believe the error is somewhere in the "KL" term calculation of the loss.

Here is the almost working code:

import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt


batch_size = 10  # mini-batch size handed to `vae.fit` in `main`
data_dim = 784  # length of one flattened input vector (encoder input / decoder output width)

M = 10  # classes
N = 30  # how many distributions

# NOTE(review): the four settings below are not referenced anywhere in the
# visible code (`fit` is called with `epochs=1`, and `tau` is never annealed).
# Presumably they are left over from the implementation being ported -- confirm
# before relying on them.
nb_epoch = 100
epsilon_std = 0.01
anneal_rate = 0.0003
min_temperature = 0.5

# Softmax temperature: `Sampling.call` divides the noisy logits by `tau`.
tau = tf.Variable(5.0, dtype=tf.float32)


class Sampling(keras.layers.Layer):
    """Turn categorical logits into a relaxed (Gumbel-softmax) sample.

    The input is treated as ``N`` groups of ``M`` logits. Gumbel noise is
    added to every logit, each group is passed through a softmax scaled by
    the module-level temperature ``tau``, and the result is flattened back
    to ``N * M`` values per row.
    """

    def call(self, logits_y):
        uniform = tf.random.uniform(tf.shape(logits_y), 0, 1)
        # Gumbel(0, 1) noise; the 1e-20 terms keep both logs away from log(0).
        gumbel_noise = -tf.math.log(-tf.math.log(uniform + 1e-20) + 1e-20)
        grouped = tf.reshape(logits_y + gumbel_noise, (-1, N, M))
        relaxed = tf.nn.softmax(grouped / tau)
        return tf.reshape(relaxed, (-1, N * M))


# Encoder: flattened input -> two hidden layers -> M * N logits -> relaxed sample z.
# `shape` must be a tuple; `(data_dim)` without the trailing comma is a bare int.
encoder_inputs = keras.Input(shape=(data_dim,))
x = keras.layers.Dense(512, activation="relu")(encoder_inputs)
x = keras.layers.Dense(256, activation="relu")(x)
logits_y = keras.layers.Dense(M * N, name="logits_y")(x)
z = Sampling()(logits_y)
# A functional model is fully defined by its inputs and outputs, so no
# separate `build()` call is needed (and `build` expects a shape, not a tensor).
encoder = keras.Model(encoder_inputs, z, name="encoder")

# `summary()` prints the table itself and returns None, so it is not wrapped
# in `print()`.
encoder.summary()

# Decoder: relaxed sample of width N * M -> two hidden layers -> data_dim sigmoid outputs.
# `shape` must be a tuple; `(N * M)` without the trailing comma is a bare int.
decoder_inputs = keras.Input(shape=(N * M,))
x = keras.layers.Dense(256, activation="relu")(decoder_inputs)
x = keras.layers.Dense(512, activation="relu")(x)
decoder_outputs = keras.layers.Dense(data_dim, activation="sigmoid")(x)
# A functional model is fully defined by its inputs and outputs, so no
# separate `build()` call is needed (and `build` expects a shape, not a tensor).
decoder = keras.Model(decoder_inputs, decoder_outputs, name="decoder")

# `summary()` prints the table itself and returns None, so it is not wrapped
# in `print()`.
decoder.summary()


class VAE(keras.Model):
    """Gumbel-softmax VAE: chains `encoder` and `decoder` and trains them with a custom step.

    This is the version from the question, i.e. the one that raises
    "Cannot convert a symbolic Keras input/output to a numpy array".
    The review notes below mark the lines involved.
    """

    def __init__(self, encoder, decoder, **kwargs):
        """Store the two sub-models, the reconstruction loss and a running-mean loss metric."""
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.bce = tf.keras.losses.BinaryCrossentropy()
        self.loss_tracker = keras.metrics.Mean(name="loss")

    @property
    def metrics(self):
        """Metrics exposed by the model: only the mean training loss."""
        return [self.loss_tracker]

    def call(self, x):
        """Encode `x` to a relaxed sample and decode it back to a reconstruction."""
        z = self.encoder(x)
        x_hat = self.decoder(z)
        return x_hat

    @tf.function
    def gumbel_loss(self, y_true, y_pred, logits_y):
        """Combine the scaled reconstruction loss with the KL term of the categorical posterior.

        `logits_y` is reshaped to (-1, N, M) and soft-maxed over the last axis
        to get q(y); the KL term compares q(y) with the uniform distribution
        1 / M and is summed over axes 1 and 2.
        """
        q_y = tf.reshape(logits_y, (-1, N, M))
        q_y = tf.nn.softmax(q_y)
        log_q_y = tf.math.log(q_y + 1e-20)  # 1e-20 guards against log(0)
        kl_tmp = q_y * (log_q_y - tf.math.log(1.0 / M))
        kl = tf.math.reduce_sum(kl_tmp, axis=(1, 2))
        # NOTE(review): after the reduction `kl` has one entry per row of the
        # batch, so squeezing axis 0 presumably only works when that axis has
        # size 1 -- confirm (batch_size is 10 here).
        kl = tf.squeeze(kl, axis=0)
        # NOTE(review): the KL term is subtracted and the result is minimised in
        # `train_step`; a negative ELBO would add it -- confirm the intended sign.
        elbo = data_dim * self.bce(y_true, y_pred) - kl
        return elbo

    def train_step(self, data):
        """Run one optimisation step on a batch and report the running mean loss."""
        x = data

        with tf.GradientTape(persistent=True) as tape:
            z = self.encoder(x, training=True)
            x_hat = self.decoder(z, training=True)

            x = tf.cast(x, dtype=tf.float32)
            x_hat = tf.cast(x_hat, dtype=tf.float32)
            # NOTE(review): `get_layer(...).output` is the layer's symbolic
            # tensor from model construction, not the logits computed for this
            # batch `x`. This is likely the source of the TypeError quoted in
            # the question.
            logits_y = self.encoder.get_layer('logits_y').output

            loss = self.gumbel_loss(x, x_hat, logits_y)

        grads = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}


def main():
    """Load MNIST, scale and flatten the images, then fit the VAE for one epoch."""
    train_split, test_split = tf.keras.datasets.mnist.load_data(path="mnist.npz")
    x_train, y_train = train_split
    x_test, y_test = test_split

    # Pixel values go to [0, 1]; every image becomes a single flat vector.
    x_train = (x_train.astype("float32") / 255.0).reshape(len(x_train), -1)
    x_test = (x_test.astype("float32") / 255.0).reshape(len(x_test), -1)

    vae = VAE(encoder, decoder, name="vae-model")
    vae.build((None, data_dim))
    # The loss is computed inside `VAE.train_step`, so none is given to compile.
    vae.compile(optimizer="adam", loss=None)
    fit_options = {"shuffle": True, "epochs": 1, "batch_size": batch_size}
    vae.fit(x_train, **fit_options)


if __name__ == "__main__":
    main()

1 Answer 1

1

I think the main issue occurs when you try to get the output of the logits_y layer inside train_step. As far as I know, you can't do that; instead, you need to build your encoder model with two outputs, something like this:

class VAE(keras.Model):
    """VAE whose encoder also returns the raw `logits_y` activations needed by the loss."""

    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        # Re-wire the given single-output encoder into a model with two
        # outputs, [logits_y, z], so that calling it on a batch yields the
        # logits for that batch instead of a symbolic layer output.
        self.encoder = tf.keras.Model(
            inputs=encoder.input,
            outputs=[
                encoder.get_layer(name='logits_y').output,
                encoder.output,
            ],
        )
        # The remaining attributes are the same as in the question's __init__.
        self.decoder = decoder
        self.bce = tf.keras.losses.BinaryCrossentropy()
        self.loss_tracker = keras.metrics.Mean(name="loss")

    # The `metrics` property and the `call`, `gumbel_loss` and `train_step`
    # methods complete the class.

So, in the training loop, this self.encoder will produce two outputs, one of which is the output of the logits_y layer, which you need for the loss function. Lastly, change the code in a few other places accordingly, as follows:

def call(self, x):
    """Return the reconstruction of `x`.

    The encoder yields a pair; only its second element (the sample `z`) is
    passed on to the decoder.
    """
    encoded = self.encoder(x)
    _, z = encoded
    return self.decoder(z)

@tf.function
def gumbel_loss(self, y_true, y_pred, logits_y):
    """Combine the scaled reconstruction loss with the KL term of the categorical posterior.

    Args:
        y_true: the original inputs.
        y_pred: the decoder's reconstruction of `y_true`.
        logits_y: encoder logits, reshaped here to (-1, N, M).

    Returns:
        `data_dim * bce(y_true, y_pred) - kl`, where `kl` holds one value per
        row of the batch.
    """
    q_y = tf.reshape(logits_y, (-1, N, M))
    q_y = tf.nn.softmax(q_y)  # q(y) per distribution, over the last axis
    log_q_y = tf.math.log(q_y + 1e-20)  # 1e-20 guards against log(0)
    # KL(q(y) || uniform(1 / M)), summed over the N distributions and M classes.
    kl_tmp = q_y * (log_q_y - tf.math.log(1.0 / M))
    kl = tf.math.reduce_sum(kl_tmp, axis=(1, 2))
    # NOTE(review): the KL term is subtracted and the result is minimised in
    # `train_step`; a negative ELBO would add it -- confirm the intended sign.
    elbo = data_dim * self.bce(y_true, y_pred) - kl
    return elbo

And lastly, the train_step function. Note that the corresponding variables are already tf.float32, so there is no need to cast them.

   def train_step(self, data):
       """Run one optimisation step on a batch and report the running mean loss.

       Args:
           data: the batch handed over by `fit`; either the inputs themselves
               or a tuple whose first element is the inputs.

       Returns:
           A dict with the current value of the `loss` metric.
       """
       # `fit(x, y)` delivers tuples; only the inputs are needed here.
       x = data[0] if isinstance(data, tuple) else data
       # Gradients are requested exactly once, so a non-persistent tape suffices.
       with tf.GradientTape() as tape:
           logits_y, z = self.encoder(x, training=True)
           x_hat = self.decoder(z, training=True)
           loss = self.gumbel_loss(x, x_hat, logits_y)
       grads = tape.gradient(loss, self.trainable_weights)
       self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
       self.loss_tracker.update_state(loss)
       return {"loss": self.loss_tracker.result()}

You don't need to change anything else in the above code. Here are some training logs (running on CPU, TF 2.5).

Epoch 1/5
6000/6000 [==============================] - 60s 10ms/step - loss: 54.4604
Epoch 2/5
6000/6000 [==============================] - 60s 10ms/step - loss: 18.8960
Epoch 3/5
6000/6000 [==============================] - 59s 10ms/step - loss: 12.1036
Epoch 4/5
6000/6000 [==============================] - 59s 10ms/step - loss: 8.5804
Epoch 5/5
6000/6000 [==============================] - 59s 10ms/step - loss: 6.3916
Sign up to request clarification or add additional context in comments.

1 Comment

You the best - it worked. I changed my encoder output to encoder = keras.Model(encoder_inputs, [logits_y, z], name="encoder") - looked cleaner

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.