import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model

Exercise: Anomaly Detection

This exercise is based on the tensorflow tutorial about autoencoders. In this exercise, we will detect anomalies on the ECG5000 dataset using an RNN, an autoencoder and a variational autoencoder. This dataset contains 5,000 Electrocardiograms, each with 140 data points. We will use a simplified version of the dataset, where each example has been labeled either 0 (corresponding to an abnormal rhythm), or 1 (corresponding to a normal rhythm). We are interested in identifying the abnormal rhythms. For more information on anomaly detection, check out this interactive example.

Load and prepare ECG data

The dataset we will use is based on one from

# Load the dataset
raw_data = np.genfromtxt('data.csv')
array([-0.11252183, -2.8272038 , -3.7738969 , -4.3497511 , -4.376041  ,
       -3.4749863 , -2.1814082 , -1.8182865 , -1.2505219 , -0.47749208,
       -0.36380791, -0.49195659, -0.42185509, -0.30920086, -0.4959387 ,
       -0.34211867, -0.35533627, -0.36791303, -0.31650279, -0.41237405,
       -0.47167181, -0.41345783, -0.36461703, -0.44929829, -0.47141866,
       -0.42477658, -0.46251673, -0.55247236, -0.47537519, -0.6942    ,
       -0.7018681 , -0.59381178, -0.66068415, -0.71383066, -0.76980688,
       -0.67228161, -0.65367605, -0.63940562, -0.55930228, -0.59167032,
       -0.49322332, -0.46305183, -0.30164382, -0.23273401, -0.12505488,
       -0.15394314, -0.0243574 , -0.06560876,  0.03499926,  0.06193522,
        0.07119542,  0.12392505,  0.10312371,  0.22522849,  0.12868305,
        0.30248315,  0.25727621,  0.19635161,  0.17938297,  0.24472863,
        0.34121687,  0.32820441,  0.40604169,  0.44660507,  0.42406823,
        0.48151204,  0.4778438 ,  0.62408259,  0.57458456,  0.59801319,
        0.5645919 ,  0.607979  ,  0.62063457,  0.65625291,  0.68474806,
        0.69427284,  0.66558377,  0.57579577,  0.63813479,  0.61491695,
        0.56908343,  0.46857572,  0.44281777,  0.46827436,  0.43249295,
        0.40795792,  0.41862256,  0.36253075,  0.41095901,  0.47166633,
        0.37216676,  0.33787543,  0.22140511,  0.27399747,  0.29866408,
        0.26356357,  0.34256352,  0.41950529,  0.58660736,  0.86062387,
        1.1733446 ,  1.2581791 ,  1.4337887 ,  1.7005334 ,  1.9990431 ,
        2.1253411 ,  1.9932907 ,  1.9322463 ,  1.7974367 ,  1.5222839 ,
        1.2511679 ,  0.99873034,  0.48372242,  0.02313229, -0.19491383,
       -0.22091729, -0.24373668, -0.25469462, -0.29113555, -0.25649034,
       -0.22787425, -0.32242276, -0.28928586, -0.31816951, -0.36365359,
       -0.39345584, -0.26641886, -0.25682316, -0.28869399, -0.16233755,
        0.16034772,  0.79216787,  0.93354122,  0.79695779,  0.57862066,
        0.2577399 ,  0.22807718,  0.12343082,  0.92528624,  0.19313742,
        1.        ])
# The last element contains the labels
labels = raw_data[:, -1]

# The other data points are the electrocadriogram data
data = raw_data[:, 0:-1]

train_data, test_data, train_labels, test_labels = train_test_split(
    data, labels, test_size=0.2, random_state=21

Normalize the data to [0,1].

min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)

train_data = (train_data - min_val) / (max_val - min_val)
test_data = (test_data - min_val) / (max_val - min_val)

train_data = tf.cast(train_data, tf.float32)
test_data = tf.cast(test_data, tf.float32)

We separate the normal rhythms from the abnormal rhythms.

train_labels = train_labels.astype(bool)
test_labels = test_labels.astype(bool)

normal_train_data = train_data[train_labels]
normal_test_data = test_data[test_labels]

anomalous_train_data = train_data[~train_labels]
anomalous_test_data = test_data[~test_labels]

Plot a normal ECG.

plt.plot(np.arange(140), normal_train_data[0])
plt.title("A Normal ECG")


Plot an anomalous ECG.

plt.plot(np.arange(140), anomalous_train_data[0])
plt.title("An Anomalous ECG")


RNN for anomaly detection

Since we have access to the labels of the dataset, we can frame the anomaly detection as a supervised learning problem. Similar to the detection of exoplanets, where a time series of light intensities was labeled as having either an exoplanet as cause or not, we want to predict the label of the time series of ecg data.

# Reshape the dataset as we saw in the exoplanet problem
train_data = np.expand_dims(train_data,2)

# Create the model 
model = tf.keras.Sequential()
model.add(layers.LSTM(100, input_shape=(train_data.shape[1],train_data.shape[2])))
model.add(layers.Dense(1, activation='sigmoid'))
Model: "sequential"
Layer (type)                 Output Shape              Param #   
lstm (LSTM)                  (None, 100)               40800     
dropout (Dropout)            (None, 100)               0         
dense (Dense)                (None, 1)                 101       
Total params: 40,901
Trainable params: 40,901
Non-trainable params: 0
# Compile the model
model.compile(loss = 'binary_crossentropy', optimizer='adam')
# Fit the model
history =, train_labels, epochs = 20, batch_size = 256)
Epoch 1/20
16/16 [==============================] - 3s 219ms/step - loss: 0.6538
16/16 [==============================] - 4s 223ms/step - loss: 0.0839
plt.plot(history.history["loss"], label="Training Loss")


# Predict the labels of the testset
preds = model.predict(np.expand_dims(test_data,2))
preds = np.array((np.array(np.round(preds,0).flatten(),dtype=int) > 0).tolist())

# Compute the metrics
accuracy_test_rnn= accuracy_score(test_labels, preds)
print('Accuracy = ',accuracy_test_rnn)

precision_test_rnn=precision_score(test_labels, preds)
print('Precision = ',precision_test_rnn)

recall_test_rnn=recall_score(test_labels, preds)
print('Recall = ',recall_test_rnn)
Accuracy =  0.976
Precision =  0.9785714285714285
Recall =  0.9785714285714285

Autoencoder for anomaly detection

Usually we do not have access to well labeled datasets, but have to frame the problem as an unsupervised learning process. But how can we use an autoencoder in this setting? The objective of an autoencoder is to minimize the reconstruction error of a given input. We will therefore train an autoencoder solely on the normal ecg sequences, such that it reconstructs these examples with minimal error. The idea now is the following: Abnormal rhythms should have a higher reconstruction error as the normal sequences, allowing us to classify a rhythm as an anomaly if the reconstruction error is higher than a fixed threshold.

Build the model

class AnomalyDetector(Model):
  def __init__(self):
    super(AnomalyDetector, self).__init__()
    # Define the encoder network
    self.encoder = tf.keras.Sequential([
      layers.Dense(32, activation="relu"),
      layers.Dense(16, activation="relu"),
      layers.Dense(8, activation="relu")])
    # Define the decoder network
    self.decoder = tf.keras.Sequential([
      layers.Dense(16, activation="relu"),
      layers.Dense(32, activation="relu"),
      layers.Dense(140, activation="sigmoid")])
  def call(self, x):
    # Define how an evaluation of the network is performed
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

autoencoder = AnomalyDetector()
# Compile the model
autoencoder.compile(optimizer='adam', loss='mae')

Notice that the autoencoder is trained using only the normal ECGs.

history =, normal_train_data, 
          validation_data=(normal_test_data, normal_test_data),
Epoch 1/20
10/10 [==============================] - 0s 11ms/step - loss: 0.0580 - val_loss: 0.0559
10/10 [==============================] - 0s 3ms/step - loss: 0.0200 - val_loss: 0.0197
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")


Let’s take a look at the signal after encoding and decoding by the autoencoder.

encoded_imgs = autoencoder.encoder(normal_test_data).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()

plt.fill_between(np.arange(140), decoded_imgs[0], normal_test_data[0], color='lightcoral' )
plt.legend(labels=["Input", "Reconstruction", "Error"])


Creating a similar plot for an anomalous test example:

encoded_imgs = autoencoder.encoder(anomalous_test_data).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()

plt.fill_between(np.arange(140), decoded_imgs[0], anomalous_test_data[0], color='lightcoral' )
plt.legend(labels=["Input", "Reconstruction", "Error"])


Detect anomalies

We will detect anomalies by calculating whether the reconstruction loss is greater than a fixed threshold. For this we use the mean average error for normal examples from the training set, then classify future examples as anomalous if the reconstruction error is higher than one standard deviation from the training set.

Ploting the reconstruction error on normal ECGs from the training set

reconstructions = autoencoder.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)

plt.hist(train_loss, bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")


Choose a threshold value that is one standard deviation above the mean.

threshold_ae = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold_ae)
Threshold:  0.031951994

Note: There are other strategies you could use to select a threshold value above which test examples should be classified as anomalous, the correct approach will depend on your dataset.

Most anomalous examples in the test set have a greater reconstruction error than the threshold. By changing the threshold, we can adjust precision and recall of the classifier.

reconstructions = autoencoder.predict(anomalous_test_data)
test_loss = tf.keras.losses.mae(reconstructions, anomalous_test_data)

plt.hist(test_loss, bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")


Classify an ECG as an anomaly if the reconstruction error is greater than the threshold.

def predict(model, data, threshold):
  reconstructions = model(data)
  loss = tf.keras.losses.mae(reconstructions, data)
  return tf.math.less(loss, threshold)

def print_stats(predictions, labels):
  print("Accuracy = {}".format(accuracy_score(labels, preds)))
  print("Precision = {}".format(precision_score(labels, preds)))
  print("Recall = {}".format(recall_score(labels, preds)))
preds = predict(autoencoder, test_data, threshold_ae)
print_stats(preds, test_labels)
Accuracy = 0.942
Precision = 0.9921568627450981
Recall = 0.9035714285714286

Variational Autoencoder for anomaly detection

Autoencoders have a strong tendency to overfit on the training data. In class you got to know variational autoencoders, which are designed to mitigate this problem. First, define a function performing the sampling in the laten space using the reparametrization trick (this allows backpropagation of the gradient).

class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

Define the encoder part of the network, containing the sampling step.

def encoder_model(normal_train_data):
    encoder_inputs = tf.keras.Input(shape=(normal_train_data.shape[1]))
    x = layers.Dense(32, activation="relu")(encoder_inputs)
    x = layers.Dense(16, activation="relu")(x)
    x = layers.Dense(8, activation="relu")(x)
    # So far we just copied the network from above
    # Now we generate the latent space of mean and log-variance, in this case of dimension 8
    z_mean = layers.Dense(8, name="z_mean")(x)
    z_log_var = layers.Dense(8, name="z_log_var")(x)
    # Sample from these distributions
    z = Sampling()([z_mean, z_log_var])
    encoder = tf.keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
    return encoder

Define the decoding network.

def decoder_model(normal_train_data):
    # Recreate the network we used for the 'normal' autoencoder
    latent_inputs = tf.keras.Input(shape=(8,))
    x = layers.Dense(16, activation="relu")(latent_inputs)
    x = layers.Dense(32, activation="relu")(x)
    decoder_outputs = layers.Dense(normal_train_data.shape[1], activation="relu")(x)
    decoder = tf.keras.Model(latent_inputs, decoder_outputs, name="decoder")
    return decoder
class VAE(tf.keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def train_step(self, data):
        if isinstance(data, tuple):
            data = data[0]
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            reconstruction_loss = tf.reduce_mean(
                tf.keras.losses.binary_crossentropy(data, reconstruction)
            kl_loss = 1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
            kl_loss = tf.reduce_mean(kl_loss)
            kl_loss *= -0.5
            total_loss = reconstruction_loss + kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return {
            "loss": total_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss": kl_loss,
    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded
# Get the encoder and decoder models
encoder = encoder_model(normal_train_data)
decoder = decoder_model(normal_train_data)

# Get the combined model
vae = VAE(encoder, decoder)

# Compile the model

# Fit the model to the training set
history =, normal_train_data, 
Epoch 1/80
19/19 [==============================] - 0s 2ms/step - loss: 3.4000 - reconstruction_loss: 3.3788 - kl_loss: 0.0211
19/19 [==============================] - 0s 2ms/step - loss: 0.6934 - reconstruction_loss: 0.6933 - kl_loss: 1.8521e-04
plt.plot(history.history["loss"], label="Training Loss")


Look at the reconstruction of a normal ecg sequence of the testset

encoded_imgs = vae.encoder(normal_test_data)
decoded_imgs = vae.decoder(encoded_imgs).numpy()

plt.fill_between(np.arange(140), decoded_imgs[0], normal_test_data[0], color='lightcoral' )
plt.legend(labels=["Input", "Reconstruction", "Error"])


As before, we compute the threshold from the mean absolute error plus one standard deviation

reconstructions = vae.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)

plt.hist(train_loss, bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")


threshold_vae = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold_vae)
Threshold:  0.11696462
reconstructions = vae.predict(anomalous_test_data)
test_loss = tf.keras.losses.mae(reconstructions, anomalous_test_data)

plt.hist(test_loss, bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")


Classify an ECG as an anomaly if the reconstruction error is greater than the threshold.

preds = predict(vae, test_data, threshold_vae)
print("Variational Autoencoder")
print_stats(preds, test_labels)
Variational Autoencoder
Accuracy = 0.91
Precision = 0.9351851851851852
Recall = 0.9017857142857143
preds = predict(autoencoder, test_data, threshold_ae)
print_stats(preds, test_labels)
Accuracy = 0.942
Precision = 0.9921568627450981
Recall = 0.9035714285714286
print("RNN as classifier")
print('Accuracy = ',accuracy_test_rnn)
print('Precision = ',precision_test_rnn)
print('Recall = ',recall_test_rnn)
RNN as classifier
Accuracy =  0.979
Precision =  0.9696428571428571
Recall =  0.9926873857404022