TensorFlow的Estimators API可用于在具有多个GPU的分布式环境中训练机器学习模型。在这里,我们将通过训练tf.keras为小型Fashion-MNIST数据集编写的自定义估算器来呈现此工作流程,然后在最后展示更实用的用例。
TL; DR:基本上我们想要记住的是,tf.keras.Model可以通过tf.estimatorAPI将其转换为tf.estimator.Estimator通过该tf.keras.estimator.model_to_estimator方法的对象来训练。转换后,我们可以应用Estimators提供的机制来训练不同的硬件配置。
导入Python库
importimport osos
importimport timetime
#!pip install -q -U tensorflow-gpu#!pip i
import tensorflow as tf
import numpy as np
导入Fashion-MNIST数据集
我们将使用Fashion-MNIST数据集(https://github.com/zalandoresearch/fashion-mnist),这是MNIST的直接替代品,其中包含数千张Zalando时尚文章的灰度图像。获取训练和测试数据非常简单:
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()
我们想把这些图像的像素值从0到255转换成0到1到c之间的值,将数据集转换为[B,H,W,C]格式,其中B是批处理的图像数,H和W是高度和宽度,C是我们数据集的通道数(灰度为1),Python代码如下:
TRAINING_SIZE = len(train_images)
TEST_SIZE = len(test_images)
train_images = np.asarray(train_images, dtype=np.float32) / 255
# Convert the train images and add channels
train_images = train_images.reshape((TRAINING_SIZE, 28, 28, 1))
test_images = np.asarray(test_images, dtype=np.float32) / 255
# Convert the test images and add channels
test_images = test_images.reshape((TEST_SIZE, 28, 28, 1))
接下来,我们想要将标签从整数id(例如,2或Pullover)转换为one hot编码(例如,0,0,1,0,0,0,0,0,0,0)。为此,我们将使用该tf.keras.utils.to_categorical 函数:
# How many categories we are predicting from (0-9)
LABEL_DIMENSIONS = 10
train_labels = tf.keras.utils.to_categorical(train_labels,LABEL_DIMENSIONS)
test_labels = tf.keras.utils.to_categorical(test_labels,LABEL_DIMENSIONS)
# Cast the labels to floats, needed later
train_labels = train_labels.astype(np.float32)
test_labels = test_labels.astype(np.float32)
建立一个tf.keras 模型
我们将使用Keras Functional API创建我们的神经网络。Keras是一个用于构建和训练深度学习模型的高级API,用户友好,模块化且易于扩展。tf.keras是TensorFlow的这个API的实现,它支持诸如Eager Execution,tf.data管道和Estimators之类的东西。
就架构而言,我们将使用ConvNets。对于每个训练示例,它们都使用一个3d张量(height, width, channels),对于灰度图像,这个张量从channels=1开始,然后返回一个3d张量。
因此,在ConvNet部分之后,我们将需要Flatten张量并添加Dense图层,其中最后一个返回LABEL_DIMENSIONS带有tf.nn.softmax激活的大小向量,Python代码如下:
inputs = tf.keras.Input(shape=(28,28,1)) # Returns a placeholder tensor
x = tf.keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation=tf.nn.relu)(inputs)
x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
x = tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)(x)
x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
x = tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)(x)
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dense(64, activation=tf.nn.relu)(x)
predictions = tf.keras.layers.Dense(LABEL_DIMENSIONS, activation=tf.nn.softmax)(x)
我们现在可以定义我们的机器学习模型,选择优化器(我们从TensorFlow中选择一个而不是使用一个tf.keras.optimizers)并编译它:
model = tf.keras.Model(inputs=inputs, outputs=predictions)
optimizer = tf.train.AdamOptimizer(learning_rate=0.001)
model.compile(loss='categorical_crossentropy',
optimizer=optimizer,
metrics=['accuracy'])
创建一个Estimator
要从编译的Keras模型创建Estimator,我们称之为model_to_estimator方法。请注意,Keras模型的初始模型状态将保留在创建的Estimator中。
那么Estimators有什么好处呢?
- 您可以在本地主机或分布式多GPU环境中运行基于Estimator的机器学习模型,而无需更改模型;
- Estimators简化了机器学习模型开发人员之间的共享实现;
- Estimators为您构建图,因此有点像Eager Execution,没有明确的会话。
那么我们如何开始训练我们的简单tf.keras模型来使用多GPU呢?我们可以使用tf.contrib.distribute.MirroredStrategy通过同步训练进行图内复制的范例。
基本上每个worker GPU都有一个网络副本,并获取数据的子集,在该数据上计算本地梯度,然后等待所有workers以同步方式完成。然后,workers通过Ring All-reduce操作将他们的本地梯度相互通信,这通常被优化以减少网络带宽并增加吞吐量。一旦所有梯度到达,每个worker将它们平均并更新其参数,然后开始下一步。这在通过某些高速互连连接的单个节点上有多个GPU的情况下非常理想。
要使用此策略,我们首先从编译的tf.keras模型中创建一个Estimator,然后MirroredStrategy通过RunConfigconfig 为其配置。默认情况下,此配置将使用所有GPU,但您也可以num_gpus选择使用特定数量的GPU,Python示例如下:
NUM_GPUS = 2
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(model,
config=config)
创建一个Estimator输入函数
要将数据传输到Estimators,我们需要定义一个数据导入函数,该函数返回批量数据的数据tf.data集(images,labels)。下面的函数接受numpy数组并通过ETL过程返回数据集。
请注意,最后我们还调用了这样的prefetch方法,它将在训练时将数据缓冲到GPU,以便下一批准备就绪并等待GPU,而不是让GPU在每次迭代时等待数据。GPU可能仍然没有得到充分利用,为了改善这一点,我们可以使用fused版本的转换操作,比如shuffle_and_repeat,而不是两个单独的操作。
def input_fn(images, labels, epochs, batch_size):
# Convert the inputs to a Dataset. (E)
ds = tf.data.Dataset.from_tensor_slices((images, labels))
# Shuffle, repeat, and batch the examples. (T)
SHUFFLE_SIZE = 5000
ds = ds.shuffle(SHUFFLE_SIZE).repeat(epochs).batch(batch_size)
ds = ds.prefetch(2)
# Return the dataset. (L)
return ds
训练Estimator
让我们首先定义一个SessionRunHook类来记录随机梯度下降的每次迭代的次数:
class TimeHistory(tf.train.SessionRunHook):
def begin(self):
self.times = []
def before_run(self, run_context):
self.iter_time_start = time.time()
def after_run(self, run_context, run_values):
self.times.append(time.time() - self.iter_time_start)
我们可以在我们的Estimator 上调用train函数,赋予它我们定义的input_fn(带有批大小和我们想要训练的epoch的数量)和一个TimeHistory实例,通过它的hooks参数:
time_hist = TimeHistory()
BATCH_SIZE = 512
EPOCHS = 5
estimator.train(lambda:input_fn(train_images,
train_labels,
epochs=EPOCHS,
batch_size=BATCH_SIZE),
hooks=[time_hist])
性能
们现在可以用它来计算训练的总时间以及平均每秒训练的图像数量(平均输入量):
total_time = sum(time_hist.times)
print(f"total time with {NUM_GPUS} GPU(s): {total_time} seconds")
avg_time_per_batch = np.mean(time_hist.times)
print(f"{BATCH_SIZE*NUM_GPUS/avg_time_per_batch} images/second with
{NUM_GPUS} GPU(s)")
评估Estimator
为了检查我们的模型的性能,我们调用评估方法对我们的Estimator:
estimator.evaluate(lambda:input_fn(test_images,
test_labels,
epochs=1,
batch_size=BATCH_SIZE))
Retinal OCT(光学相干断层扫描)图像示例
为了测试更大数据集的缩放性能,我们使用Retinal OCT图像数据集(https://www.kaggle.com/paultimothymooney/kermany2018),这是Kaggle的众多优秀数据集之一。该数据集由活人视网膜的横截面X射线图像组成,分为四类:NORMAL,CNV,DME和DRUSEN:
数据集通常共有84,495个X射线JPEG图像,512x496可以通过kaggle CLI下载:
#!pip install kaggle
#!kaggle datasets download -d paultimothymooney/kermany2018
下载后,训练和测试集图像类位于各自的文件夹中,因此我们可以将模式定义为:
labels = ['CNV', 'DME', 'DRUSEN', 'NORMAL']
train_folder = os.path.join('OCT2017', 'train', '**', '*.jpeg')
test_folder = os.path.join('OCT2017', 'test', '**', '*.jpeg')
接下来,我们编写Estimator的输入函数,它可以获取任何文件模式,并返回调整大小的图像和one-hot编码标签,作为tf.data.Dataset。如果prefetch的buffer_size为None,则TensorFlow将自动使用最优的prefetch缓冲区大小,Python实现如下:
def input_fn(file_pattern, labels,
image_size=(224,224),
shuffle=False,
batch_size=64,
num_epochs=None,
buffer_size=4096,
prefetch_buffer_size=None):
table = tf.contrib.lookup.index_table_from_tensor(mapping=tf.constant(labels))
num_classes = len(labels)
def _map_func(filename):
label = tf.string_split([filename], delimiter=os.sep).values[-2]
image = tf.image.decode_jpeg(tf.read_file(filename), channels=3)
image = tf.image.convert_image_dtype(image, dtype=tf.float32)
image = tf.image.resize_images(image, size=image_size)
return (image, tf.one_hot(table.lookup(label), num_classes))
dataset = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle)
if num_epochs is not None and shuffle:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(buffer_size,
num_epochs))
elif shuffle:
dataset = dataset.shuffle(buffer_size)
elif num_epochs is not None:
dataset = dataset.repeat(num_epochs)
dataset = dataset.apply(
tf.contrib.data.map_and_batch(map_func=_map_func,
batch_size=batch_size,
num_parallel_calls=os.cpu_count()))
dataset = dataset.prefetch(buffer_size=prefetch_buffer_size)
return dataset
这次要训练这个机器学习模型,我们将使用一个预先训练过的VGG16,并重新训练它的最后5层:
keras_vgg16 = tf.keras.applications.VGG16(input_shape=(224,224,3),
include_top=False)
output = keras_vgg16.output
output = tf.keras.layers.Flatten()(output)
prediction = tf.keras.layers.Dense(len(labels),
activation=tf.nn.softmax)(output)
model = tf.keras.Model(inputs=keras_vgg16.input,
outputs=prediction)
for layer in keras_vgg16.layers[:-4]:
layer.trainable = False
可以按照上面的步骤进行操作,并使用NUM_GPUSGPU 在几分钟内训练我们的机器学习模型:
model.compile(loss='categorical_crossentropy',
optimizer=tf.train.AdamOptimizer(),
metrics=['accuracy'])
NUM_GPUS = 2
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(model,
config=config)
BATCH_SIZE = 64
EPOCHS = 1
estimator.train(input_fn=lambda:input_fn(train_folder,
labels,
shuffle=True,
batch_size=BATCH_SIZE,
buffer_size=2048,
num_epochs=EPOCHS,
prefetch_buffer_size=4),
hooks=[time_hist])
经过训练,我们可以评估测试集的准确度,该准确度应该在95%左右
estimator.evaluate(input_fn=lambda:input_fn(test_folder,
labels,
shuffle=False,
batch_size=BATCH_SIZE,
buffer_size=1024,
num_epochs=1))
最后
我们在上面展示了使用Estimators API在多个GPU上训练深度学习Keras模型是,和如何编写一个遵循最佳实践的输入管道来充分利用我们的资源(线性缩放)以及如何计时通过钩子训练吞吐量。
随着num_gpu的增加,测试集的准确性会降低。其中一个原因可能是,当我们使用更多GPU时,MirroredStrategy有效训练批量大小BATCH_SIZE*NUM_GPUS可能需要调整BATCH_SIZE或学习率。在这里NUM_GPUS,为了制作图形,我保持所有其他超参数不变,但实际上需要调整它们。
数据集的大小以及机器学习模型大小也会影响这些方案的扩展程度。GPU在读取或写入小数据时带宽较差,对于像K80这样的旧GPU尤其如此,并且可以解释上面的Fashion-MNIST图。