Monkey Patching Python Code

Author: Adrian Tam

Python is a dynamic scripting language. Not only does it have a dynamic type system where a variable can be assigned to one type first and changed later, but its object model is also dynamic. This allows us to modify its behavior at run time. A consequence of this is the possibility of monkey patching. This is an idea that we can modify the base layer of a program without modifying the higher-level code. Imagine you can use the print() function to print something to the screen, and we can modify the definition of this function to print it to a file without modifying any single line of your code.

It is possible because Python is an interpreted language, so we can make changes while the program is running. We can make use of this property in Python to modify the interface of a class or a module. It’s useful if we are dealing with legacy code or code from other people in which we do not want to modify it extensively but still want to make it run with different versions of libraries or environments. In this tutorial, we are going to see how we can apply this technique to some Keras and TensorFlow code.

After finishing this tutorial, you will learn:

  • What is monkey patching
  • How to change an object or a module in Python at runtime

Let’s get started.

Monkey Patching Python Code. Photo by Juan Rumimpunu. Some rights reserved.

Tutorial Overview

This tutorial is in three parts; they are:

  • One model, two interfaces
  • Extending an object with monkey patching
  • Monkey patching to revive legacy code

One Model, Two Interfaces

TensorFlow is a huge library. It provides a high-level Keras API to describe deep learning models in layers. It also comes with a lot of functions for training, such as different optimizers and data generators. It is overwhelming to install TensorFlow just because we need to run our trained model. Therefore, TensorFlow provides us with a counterpart called TensorFlow Lite that is much smaller in size and suitable to run in small devices such as mobile or embedded devices.

We want to show how the original TensorFlow Keras model and the TensorFlow Lite model are used differently. So let’s make a model of moderate size, such as the LeNet-5 model. Below is how we load the MNIST dataset and train a model for classification:

import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, AveragePooling2D, Dropout, Flatten
from tensorflow.keras.callbacks import EarlyStopping

# Load MNIST data
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# Reshape data to shape of (n_sample, height, width, n_channel)
X_train = np.expand_dims(X_train, axis=3).astype('float32')
X_test = np.expand_dims(X_test, axis=3).astype('float32')

# LeNet5 model: ReLU can be used intead of tanh
model = Sequential([
    Conv2D(6, (5,5), input_shape=(28,28,1), padding="same", activation="tanh"),
    AveragePooling2D((2,2), strides=2),
    Conv2D(16, (5,5), activation="tanh"),
    AveragePooling2D((2,2), strides=2),
    Conv2D(120, (5,5), activation="tanh"),
    Flatten(),
    Dense(84, activation="tanh"),
    Dense(10, activation="softmax")
])

# Training
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["sparse_categorical_accuracy"])
earlystopping = EarlyStopping(monitor="val_loss", patience=4, restore_best_weights=True)
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=100, batch_size=32, callbacks=[earlystopping])

Running the above code will download the MNIST dataset using the TensorFlow’s dataset API and train the model. Afterward, we can save the model:

model.save("lenet5-mnist.h5")

Or we can evaluate the model with our test set:

print(np.argmax(model.predict(X_test), axis=1))
print(y_test)

Then we should see:

[7 2 1 ... 4 5 6]
[7 2 1 ... 4 5 6]

But if we intend to use it with TensorFlow Lite, we want to convert it to the TensorFlow Lite format as follows:

# tflite conversion with dynamic range optimization
import tensorflow as tf
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

# Optional: Save the data for testing
import numpy as np
np.savez('mnist-test.npz', X=X_test, y=y_test)

# Save the model.
with open('lenet5-mnist.tflite', 'wb') as f:
    f.write(tflite_model)

We can add more options to the converter, such as reducing the model to use a 16-bit floating point. But in all cases, the output of the conversion is a binary string. Not only will the conversion reduce the model to a much smaller size (compared to the size of the HDF5 file saved from Keras), but it will also allow us to use it with a lightweight library. There are libraries for Android and iOS mobile devices. If you’re using embedded Linux, you may find the tflite-runtime module from the PyPI repository (or you may compile one from TensorFlow source code). Below is how we can use tflite-runtime to run the converted model:

import numpy as np
import tflite_runtime.interpreter as tflite

loaded = np.load('mnist-test.npz')
X_test = loaded["X"]
y_test = loaded["y"]
interpreter = tflite.Interpreter(model_path="lenet5-mnist.tflite")
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print(input_details[0]['shape'])

rows = []
for n in range(len(X_test)):
    # this model has single input and single output
    interpreter.set_tensor(input_details[0]['index'], X_test[n:n+1])
    interpreter.invoke()
    row = interpreter.get_tensor(output_details[0]['index'])
    rows.append(row)
rows = np.vstack(rows)

accuracy = np.sum(np.argmax(rows, axis=1) == y_test) / len(y_test)
print(accuracy)

In fact, the larger TensorFlow library can also run the converted model in a very similar syntax:

import numpy as np
import tensorflow as tf

interpreter = tf.lite.Interpreter(model_path="lenet5-mnist.tflite")
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

rows = []
for n in range(len(X_test)):
    # this model has single input and single output
    interpreter.set_tensor(input_details[0]['index'], X_test[n:n+1])
    interpreter.invoke()
    row = interpreter.get_tensor(output_details[0]['index'])
    rows.append(row)
rows = np.vstack(rows)

accuracy = np.sum(np.argmax(rows, axis=1) == y_test) / len(y_test)
print(accuracy)

Note the different ways of using the models: In the Keras model, we have the predict() function that takes a batch as input and returns a result. In the TensorFlow Lite model, however, we have to inject one input tensor at a time to the “interpreter” and invoke it, then retrieve the result.

Putting everything together, the code below is how we build a Keras model, train it, convert it to TensorFlow Lite format, and test with the converted model:

import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, AveragePooling2D, Dropout, Flatten
from tensorflow.keras.callbacks import EarlyStopping

# Load MNIST data
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# Reshape data to shape of (n_sample, height, width, n_channel)
X_train = np.expand_dims(X_train, axis=3).astype('float32')
X_test = np.expand_dims(X_test, axis=3).astype('float32')

# LeNet5 model: ReLU can be used intead of tanh
model = Sequential([
    Conv2D(6, (5,5), input_shape=(28,28,1), padding="same", activation="tanh"),
    AveragePooling2D((2,2), strides=2),
    Conv2D(16, (5,5), activation="tanh"),
    AveragePooling2D((2,2), strides=2),
    Conv2D(120, (5,5), activation="tanh"),
    Flatten(),
    Dense(84, activation="tanh"),
    Dense(10, activation="softmax")
])

# Training
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["sparse_categorical_accuracy"])
earlystopping = EarlyStopping(monitor="val_loss", patience=4, restore_best_weights=True)
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=100, batch_size=32, callbacks=[earlystopping])

# Save model
model.save("lenet5-mnist.h5")

# Compare the prediction vs test data
print(np.argmax(model.predict(X_test), axis=1))
print(y_test)

# tflite conversion with dynamic range optimization
import tensorflow as tf
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

# Optional: Save the data for testing
import numpy as np
np.savez('mnist-test.npz', X=X_test, y=y_test)

# Save the tflite model.
with open('lenet5-mnist.tflite', 'wb') as f:
    f.write(tflite_model)

# Load the tflite model and run test
interpreter = tf.lite.Interpreter(model_path="lenet5-mnist.tflite")
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

rows = []
for n in range(len(X_test)):
    # this model has single input and single output
    interpreter.set_tensor(input_details[0]['index'], X_test[n:n+1])
    interpreter.invoke()
    row = interpreter.get_tensor(output_details[0]['index'])
    rows.append(row)
rows = np.vstack(rows)

accuracy = np.sum(np.argmax(rows, axis=1) == y_test) / len(y_test)
print(accuracy)

Extending an Object with Monkey Patching

Can we use predict() in the TensorFlow Lite interpreter?

The interpreter object does not have such a function. But since we’re using Python, it is possible for us to add it using the monkey patching technique. To understand what we are doing, first, we have to note that the interpreter object we defined in the previous code may contain many attributes and functions. When we call interpreter.predict() like a function, Python will look for the one with such a name inside the object, then execute it. If no such name is found, Python will raise the AttributeError exception:

...
interpreter.predict()

That gives:

Traceback (most recent call last):
  File "/Users/MLM/pred_error.py", line 13, in <module>
    interpreter.predict()
AttributeError: 'Interpreter' object has no attribute 'predict'

To make this work, we need to add a function to the interpreter object with the name predict, and that should behave like one when it is invoked. To make things simple, we notice that our model is a sequential one with an array as input and returns an array of softmax results as output. So we can write a predict() function that behaves like the one from the Keras model, but using the TensorFlow Lite interpreter:

...

# Monkey patching the tflite model
def predict(self, input_batch):
    batch_size = len(input_batch)
    output = []

    input_details = self.get_input_details()
    output_details = self.get_output_details()
    # Run each sample from the batch
    for sample in range(batch_size):
        self.set_tensor(input_details[0]["index"], input_batch[sample:sample+1])
        self.invoke()
        sample_output = self.get_tensor(output_details[0]["index"])
        output.append(sample_output)

    # vstack the output of each sample
    return np.vstack(output)

interpreter.predict = predict.__get__(interpreter)

The last line above assigns the function we created to the interpreter object, with the name predict. The __get__(interpreter) part is required to make a function we defined to become a member function of the object interpreter.

With these, we can now run a batch:

...
out_proba = interpreter.predict(X_test)
out = np.argmax(out_proba, axis=1)
print(out)

accuracy = np.sum(out == y_test) / len(y_test)
print(accuracy)

[7 2 1 ... 4 5 6]
0.9879

This is possible because Python has a dynamic object model. We can modify attributes or member functions of an object at runtime. In fact, this should not surprise us. A Keras model needs to run model.compile() before we can run model.fit(). One effect of the compile function is to add the attribute loss to the model to hold the loss function. This is accomplished at runtime.

With the predict() function added to the interpreter object, we can pass around the interpreter object just like a trained Keras model for prediction. While they are different behind the scenes, they share the same interface so other functions can use it without modifying any line of code.

Below is the complete code to load our saved TensorFlow Lite model, then monkey patch the predict() function to it to make it look like a Keras model:

import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist

# Load MNIST data and reshape
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = np.expand_dims(X_train, axis=3).astype('float32')
X_test = np.expand_dims(X_test, axis=3).astype('float32')

# Monkey patching the tflite model
def predict(self, input_batch):
    batch_size = len(input_batch)
    output = []

    input_details = self.get_input_details()
    output_details = self.get_output_details()
    # Run each sample from the batch
    for sample in range(batch_size):
        self.set_tensor(input_details[0]["index"], input_batch[sample:sample+1])
        self.invoke()
        sample_output = self.get_tensor(output_details[0]["index"])
        output.append(sample_output)

    # vstack the output of each sample
    return np.vstack(output)

# Load and monkey patch
interpreter = tf.lite.Interpreter(model_path="lenet5-mnist.tflite")
interpreter.predict = predict.__get__(interpreter)
interpreter.allocate_tensors()

# test output
out_proba = interpreter.predict(X_test)
out = np.argmax(out_proba, axis=1)
print(out)
accuracy = np.sum(out == y_test) / len(y_test)
print(accuracy)

Monkey Patching to Revive Legacy Code

We can give one more example of monkey patching in Python. Consider the following code:

# https://machinelearningmastery.com/dropout-regularization-deep-learning-models-keras/
# Example of Dropout on the Sonar Dataset: Hidden Layer
from pandas import read_csv
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Dropout
from keras.wrappers.scikit_learn import KerasClassifier
from keras.constraints import maxnorm
from keras.optimizers import SGD
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
# load dataset
dataframe = read_csv("sonar.csv", header=None)
dataset = dataframe.values
# split into input (X) and output (Y) variables
X = dataset[:,0:60].astype(float)
Y = dataset[:,60]
# encode class values as integers
encoder = LabelEncoder()
encoder.fit(Y)
encoded_Y = encoder.transform(Y)

# dropout in hidden layers with weight constraint
def create_model():
	# create model
	model = Sequential()
	model.add(Dense(60, input_dim=60, activation='relu', kernel_constraint=maxnorm(3)))
	model.add(Dropout(0.2))
	model.add(Dense(30, activation='relu', kernel_constraint=maxnorm(3)))
	model.add(Dropout(0.2))
	model.add(Dense(1, activation='sigmoid'))
	# Compile model
	sgd = SGD(lr=0.1, momentum=0.9)
	model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=['accuracy'])
	return model

estimators = []
estimators.append(('standardize', StandardScaler()))
estimators.append(('mlp', KerasClassifier(build_fn=create_model, epochs=300, batch_size=16, verbose=0)))
pipeline = Pipeline(estimators)
kfold = StratifiedKFold(n_splits=10, shuffle=True)
results = cross_val_score(pipeline, X, encoded_Y, cv=kfold)
print("Hidden: %.2f%% (%.2f%%)" % (results.mean()*100, results.std()*100))

This code was written a few years back and assumes an older version of Keras with TensorFlow 1.x. The data file sonar.csv can be found in the other post. If we run this code with TensorFlow 2.5, we will see the issue of an ImportError on the line of SGD. We need to make two changes at a minimum in the above code in order to make it run:

  1. Functions and classes should be imported from tensorflow.keras instead of keras
  2. The constraint class maxnorm should be in camel case, MaxNorm

The following is the updated code, in which we modified only the import statements:

# Example of Dropout on the Sonar Dataset: Hidden Layer
from pandas import read_csv
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from tensorflow.keras.constraints import MaxNorm as maxnorm
from tensorflow.keras.optimizers import SGD
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
# load dataset
dataframe = read_csv("sonar.csv", header=None)
dataset = dataframe.values
# split into input (X) and output (Y) variables
X = dataset[:,0:60].astype(float)
Y = dataset[:,60]
# encode class values as integers
encoder = LabelEncoder()
encoder.fit(Y)
encoded_Y = encoder.transform(Y)

# dropout in hidden layers with weight constraint
def create_model():
	# create model
	model = Sequential()
	model.add(Dense(60, input_dim=60, activation='relu', kernel_constraint=maxnorm(3)))
	model.add(Dropout(0.2))
	model.add(Dense(30, activation='relu', kernel_constraint=maxnorm(3)))
	model.add(Dropout(0.2))
	model.add(Dense(1, activation='sigmoid'))
	# Compile model
	sgd = SGD(lr=0.1, momentum=0.9)
	model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=['accuracy'])
	return model

estimators = []
estimators.append(('standardize', StandardScaler()))
estimators.append(('mlp', KerasClassifier(build_fn=create_model, epochs=300, batch_size=16, verbose=0)))
pipeline = Pipeline(estimators)
kfold = StratifiedKFold(n_splits=10, shuffle=True)
results = cross_val_score(pipeline, X, encoded_Y, cv=kfold)
print("Hidden: %.2f%% (%.2f%%)" % (results.mean()*100, results.std()*100))

If we have a much bigger project with a lot of scripts, it would be tedious to modify every single line of import. But Python’s module system is just a dictionary at sys.modules. Therefore we can monkey patch it to make the old code fit with the new library. The following is how we do it. This works for TensorFlow 2.5 installations (this backward compatibility issue of Keras code was fixed in TensorFlow 2.9; hence you don’t need this patching in the latest version of libraries):

# monkey patching
import sys
import tensorflow.keras
tensorflow.keras.constraints.maxnorm = tensorflow.keras.constraints.MaxNorm
for x in sys.modules.keys():
    if x.startswith("tensorflow.keras"):
        sys.modules[x[len("tensorflow."):]] = sys.modules[x]

# Old code below:

# Example of Dropout on the Sonar Dataset: Hidden Layer
from pandas import read_csv
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Dropout
from keras.wrappers.scikit_learn import KerasClassifier
from keras.constraints import maxnorm
from keras.optimizers import SGD
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
# load dataset
dataframe = read_csv("sonar.csv", header=None)
dataset = dataframe.values
# split into input (X) and output (Y) variables
X = dataset[:,0:60].astype(float)
Y = dataset[:,60]
# encode class values as integers
encoder = LabelEncoder()
encoder.fit(Y)
encoded_Y = encoder.transform(Y)

# dropout in hidden layers with weight constraint
def create_model():
	# create model
	model = Sequential()
	model.add(Dense(60, input_dim=60, activation='relu', kernel_constraint=maxnorm(3)))
	model.add(Dropout(0.2))
	model.add(Dense(30, activation='relu', kernel_constraint=maxnorm(3)))
	model.add(Dropout(0.2))
	model.add(Dense(1, activation='sigmoid'))
	# Compile model
	sgd = SGD(lr=0.1, momentum=0.9)
	model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=['accuracy'])
	return model

estimators = []
estimators.append(('standardize', StandardScaler()))
estimators.append(('mlp', KerasClassifier(build_fn=create_model, epochs=300, batch_size=16, verbose=0)))
pipeline = Pipeline(estimators)
kfold = StratifiedKFold(n_splits=10, shuffle=True)
results = cross_val_score(pipeline, X, encoded_Y, cv=kfold)
print("Hidden: %.2f%% (%.2f%%)" % (results.mean()*100, results.std()*100))

This is definitely not a clean and tidy code, and it will be a problem for future maintenance. Therefore, monkey patching is unwelcomed in production code. However, this would be a quick technique that exploited the inner mechanism of Python language to get something to work easily.

Further Readings

This section provides more resources on the topic if you are looking to go deeper.

Articles

Summary

In this tutorial, we learned what monkey patching is and how to do it. Specifically,

  • We learned how to add a member function to an existing object
  • How to modify the Python module cache at sys.modules to deceive the import statements

The post Monkey Patching Python Code appeared first on Machine Learning Mastery.

Go to Source