Abstract
The purpose of this script is to build a CNN model from scratch, understand the various stages of the pipeline, and experiment with the model architecture and regularization to reach a relatively good test accuracy. The final model achieves close to 95% accuracy on the test dataset.
Key highlights:
- A custom CNN built from scratch, without pre-trained models or transfer learning.
- Data loading and augmentation with ImageDataGenerator.
- Regularization with dropout and batch normalization.
- Training callbacks: TensorBoard logging, model checkpointing, and a custom early-stopping threshold on validation accuracy.
- Roughly 95% accuracy on the test dataset.
import os
import zipfile
import random
import shutil
from shutil import copyfile
from os import getcwd
import pathlib
import datetime
import cv2
from PIL import Image
import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import RMSprop, Adam, SGD
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.models import model_from_json
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import numpy as np
# Run the magic twice; sometimes it does not take effect the first time, especially when switching back to notebook mode
%matplotlib notebook
%matplotlib notebook
The images are stored in two folders, /train and /test, under the /chest_xray_images folder. Each of the /train and /test folders in turn contains two categories, Pneumonia and Normal.
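For reference, the expected on-disk layout (a sketch; the class folder names may be upper-case, e.g. NORMAL/PNEUMONIA, depending on the dataset copy) is roughly:
chest_xray_images/
    train/
        NORMAL/
        PNEUMONIA/
    test/
        NORMAL/
        PNEUMONIA/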
current_working_dir = os.getcwd()
path_to_images = pathlib.Path(os.path.join(current_working_dir, 'chest_xray_images'))
path_to_training = pathlib.Path(os.path.join(path_to_images, 'train'))
path_to_testing = pathlib.Path(os.path.join(path_to_images, 'test'))
#============ Total training images ===============
total_images = 0
for root, dirs, files in os.walk(path_to_training):
total_images += len(files)
print("Total training images: ", total_images)
#=============== Total testing images ===============
total_images = 0
for root, dirs, files in os.walk(path_to_testing):
total_images += len(files)
print("Total testing images: ", total_images)
#=============== Total images ===============
total_images = 0
for root, dirs, files in os.walk(path_to_images):
total_images += len(files)
print("Total images: ", total_images)
new_img_size = (225, 225)
batch_size = 32
train_datagen = ImageDataGenerator(
rescale=1./255,
shear_range=0.2,
zoom_range=0.35,
#vertical_flip=True,
)
test_datagen = ImageDataGenerator(rescale=1./255)
seed = 107
training_ds = train_datagen.flow_from_directory(path_to_training,
seed=seed,
shuffle=True,
target_size=new_img_size,
batch_size=batch_size,
class_mode='binary')
test_ds = test_datagen.flow_from_directory(path_to_testing,
shuffle=True,
seed=seed,
target_size=new_img_size,
batch_size=batch_size,
class_mode='binary')
class_names = training_ds.class_indices
print(class_names)
#invert the dictionary
class_names = {v: k for k, v in class_names.items()}
print(class_names)
Take one batch from the training dataset and plot the first nine images with their class labels.
plt.figure(figsize=(8, 8))
image_batch, label_batch = training_ds.next()
for i in range(9):
ax = plt.subplot(3, 3, i + 1)
plt.imshow(image_batch[i])
plt.title(class_names[label_batch[i]])
plt.axis("off")
Use at least three convolutional blocks. The number of filters is increased progressively, starting with 16. Dropout is used as regularization to avoid overfitting; we do not want high dropout in the initial layers, so it is usually increased progressively in the deeper layers. Finally, since we are using the ReLU activation function, kernel_initializer='he_uniform' is highly recommended.
# lr_reduce = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_acc', factor=0.1, min_delta=0.0001, patience=3, verbose=1)
lr_reduce = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=4, verbose=2, mode='min')
model = tf.keras.models.Sequential([
#layers.experimental.preprocessing.Rescaling(1./255, input_shape=(new_img_size[0], new_img_size[1], 3)),
#tf.keras.layers.experimental.preprocessing.RandomFlip('horizontal'),
#tf.keras.layers.experimental.preprocessing.RandomRotation(0.3),
tf.keras.layers.Conv2D(16, (3,3), input_shape=(new_img_size[0],new_img_size[1],3), activation='relu', kernel_initializer='he_uniform', padding='same'),
tf.keras.layers.Conv2D(16, (3,3), activation='relu', kernel_initializer='he_uniform', padding='same'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.MaxPool2D(2,2),
#tf.keras.layers.Dropout(0.2),
tf.keras.layers.SeparableConv2D(32, (3,3), activation='relu', kernel_initializer='he_uniform', padding='same'),
tf.keras.layers.SeparableConv2D(32, (3,3), activation='relu', kernel_initializer='he_uniform', padding='same'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.MaxPool2D(2,2),
#tf.keras.layers.Dropout(0.2),
tf.keras.layers.SeparableConv2D(64, (3,3), activation='relu', kernel_initializer='he_uniform', padding='same'),
tf.keras.layers.SeparableConv2D(64, (3,3), activation='relu', kernel_initializer='he_uniform', padding='same'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.MaxPool2D(2,2),
#tf.keras.layers.Dropout(0.2),
tf.keras.layers.SeparableConv2D(128, (3,3), activation='relu', kernel_initializer='he_uniform', padding='same'),
tf.keras.layers.SeparableConv2D(128, (3,3), activation='relu', kernel_initializer='he_uniform', padding='same'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.MaxPool2D(2,2),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.SeparableConv2D(256, (3,3), activation='relu', kernel_initializer='he_uniform', padding='same'),
tf.keras.layers.SeparableConv2D(256, (3,3), activation='relu', kernel_initializer='he_uniform', padding='same'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.MaxPool2D(2,2),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(512, activation='relu', kernel_initializer='he_uniform'),
#tf.keras.layers.BatchNormalization(),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(128, activation='relu', kernel_initializer='he_uniform'),
#tf.keras.layers.BatchNormalization(),
tf.keras.layers.Dropout(0.3),
tf.keras.layers.Dense(64, activation='relu', kernel_initializer='he_uniform'),
#tf.keras.layers.BatchNormalization(),
tf.keras.layers.Dropout(0.3),
tf.keras.layers.Dense(1, activation='sigmoid')
])
learning_rate = 0.0005
model.compile(optimizer=Adam(learning_rate=learning_rate), loss='binary_crossentropy', metrics=['acc']) # alternatives: SGD(learning_rate=0.007, momentum=0.9), RMSprop(learning_rate=0.0001)
#model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['acc'])
#print(model.summary())
#==================================================
# This function keeps the initial learning rate for the first ten epochs
# and decreases it exponentially after that.
# def scheduler(epoch, lr):
# if epoch < 10:
# return lr
# else:
# return lr * tf.math.exp(-0.1)
# lr_callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
#==================================================
#==================================================
# Custom callback: stop training once validation accuracy reaches the given threshold.
class MyThresholdCallback(tf.keras.callbacks.Callback):
    def __init__(self, threshold):
        super(MyThresholdCallback, self).__init__()
        self.threshold = threshold

    def on_epoch_end(self, epoch, logs=None):
        val_acc = (logs or {}).get("val_acc")
        if val_acc is not None and val_acc >= self.threshold:
            self.model.stop_training = True

early_stopping_callback = MyThresholdCallback(threshold=0.951)
logdir = os.path.join(
"logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
modelfname = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + '_bs' + str(batch_size) + '_lr' + str(learning_rate) + '.h5'
model_folder = pathlib.Path(os.path.join(current_working_dir, 'model'))
os.makedirs(model_folder, exist_ok=True)  # make sure the checkpoint folder exists
mcp_save = tf.keras.callbacks.ModelCheckpoint(os.path.join(model_folder, modelfname), save_best_only=True, monitor='val_loss', mode='min')
tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)
print(logdir)
# # ======= Go to the logs folder within the current project. ===========
# # ======= Open a terminal, activate correct env, run following command to launch tensorboard
# # tensorboard --port=6007 --logdir ~/logs
# # =====================================================================
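# Alternatively, if the tensorboard notebook extension is available in the environment
# (an assumption, it is not used elsewhere in this script), TensorBoard can be launched inline:
# %load_ext tensorboard
# %tensorboard --logdir logs --port 6007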
epochs = 200
history = model.fit(
training_ds,
validation_data=test_ds,
epochs=epochs,
verbose=1,
callbacks=[tensorboard_callback, mcp_save, early_stopping_callback] #lr_reduce
)
#model.save(os.path.join(model_folder, modelfname))
# PLOT LOSS AND ACCURACY
#import matplotlib.image as mpimg
#import matplotlib.pyplot as plt
#-----------------------------------------------------------
# Retrieve a list of list results on training and test data
# sets for each training epoch
#-----------------------------------------------------------
acc=history.history['acc']
val_acc=history.history['val_acc']
loss=history.history['loss']
val_loss=history.history['val_loss']
epochs=np.arange(len(acc)) # Get number of epochs
#------------------------------------------------
# Plot training and validation accuracy per epoch
#------------------------------------------------
plt.figure()
plt.plot(epochs, acc, 'r', label="Training Accuracy")
plt.plot(epochs, val_acc, 'b', label="Validation Accuracy")
plt.xlabel("Epochs")
plt.ylabel("Acc.")
plt.title('Training and validation accuracy')
plt.legend()
#------------------------------------------------
# Plot training and validation loss per epoch
#------------------------------------------------
plt.figure()
plt.plot(epochs, loss, 'r', label="Training Loss")
plt.plot(epochs, val_loss, 'b', label="Validation Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.title('Training and validation loss')
# Desired output. Charts with training and validation metrics. No crash :)
# serialize model to JSON
model_json = model.to_json()
best_model_folder = pathlib.Path(os.path.join(current_working_dir, 'model', 'best_model'))
os.makedirs(best_model_folder, exist_ok=True)  # make sure the folder exists before writing
with open(os.path.join(best_model_folder, 'model_structure.json'), "w") as json_file:
    json_file.write(model_json)
# load json and create model
with open(pathlib.Path(os.path.join(current_working_dir, 'model', 'best_model', 'model_structure.json')), 'r') as json_file:
    loaded_model_json = json_file.read()
loaded_model = model_from_json(loaded_model_json)
# load weights into new model
# (assumes the best checkpoint .h5 file has been copied into model/best_model)
loaded_model.load_weights(pathlib.Path(os.path.join(
    current_working_dir, 'model', 'best_model', modelfname)))
print("Loaded model from disk")
# use loaded model on test data
# loaded_model.compile(loss='binary_crossentropy',
# optimizer='rmsprop', metrics=['accuracy'])
image_batch, label_batch = test_ds.next()
i = random.randint(0,batch_size-1)
img_array = keras.preprocessing.image.img_to_array(image_batch[i])
img_array = tf.expand_dims(img_array, 0) # Create batch axis
true_class = int(label_batch[i])
predictions = model.predict(img_array)
score = predictions.flatten()[0]
# predict_classes() is deprecated in recent TF versions; threshold the sigmoid output instead
predicted_class = int(score > 0.5)
print("Prediction: ", class_names[predicted_class], " | Truth: ", class_names[true_class])
print("Score: ", np.round(score,2))
plt.figure()
plt.imshow(image_batch[i])
plt.title(class_names[true_class])
plt.axis("off")
We have developed a Convolutional Neural Network based model without using any pre-trained models or transfer learning methods. This model achieves an accuracy of ~95% on the test dataset. Notice that the validation loss curve fluctuates considerably around a more or less constant average value, whereas the training loss improves slowly; this is a clear indication of overfitting. The overfitting is suppressed quite a bit by using dropout. There is still some opportunity to improve the model, as sketched below.
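One possible next step, sketched only with pieces that already appear (commented out) earlier in the script, is to enable the random flip/rotation preprocessing layers and the ReduceLROnPlateau callback:
# Sketch: augmentation layers and LR schedule, taken from the commented-out lines above
data_augmentation = tf.keras.Sequential([
    tf.keras.layers.experimental.preprocessing.RandomFlip('horizontal'),
    tf.keras.layers.experimental.preprocessing.RandomRotation(0.3),
])
lr_reduce = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=4, verbose=2, mode='min')
# data_augmentation would go in as the first layer of the Sequential model,
# and lr_reduce would be appended to the callbacks list passed to model.fit().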