CNTK - 内存不足的数据集


本章将解释如何衡量内存不足数据集的性能。

在前面的章节中,我们讨论了验证神经网络性能的各种方法,但我们讨论的方法是处理适合内存的数据集的方法。

这里,问题出现了,内存不足的数据集怎么办,因为在生产场景中,我们需要大量的数据来训练NN。在本节中,我们将讨论如何在使用小批量源和手动小批量循环时衡量性能。

小批量源

在处理内存外数据集(即小批量源)时,我们需要与处理小数据集(即内存数据集)时使用的设置略有不同的损失设置以及指标。首先,我们将了解如何设置一种将数据提供给神经网络模型训练器的方法。

以下是实施步骤 -

步骤 1 - 首先,来自cntk。io 模块导入用于创建小批量源的组件,如下所示:

from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer,
 INFINITY_REPEAT
 

步骤 2 - 接下来,创建一个名为create_datasource的新函数。该函数有两个参数,即 filename 和 limit,默认值为INFINITELY_REPEAT

def create_datasource(filename, limit =INFINITELY_REPEAT)

步骤 3 - 现在,在函数中,通过使用StreamDef类为从具有三个功能的标签字段读取的标签创建流定义。我们还需要将is_sparse设置为False,如下所示:

labels_stream = StreamDef(field=’labels’, shape=3, is_sparse=False)

步骤 4 - 接下来,创建从输入文件读取特征,创建StreamDef的另一个实例,如下所示。

feature_stream = StreamDef(field=’features’, shape=4, is_sparse=False)

步骤 5 - 现在,初始化CTFDeserializer实例类。指定我们需要反序列化的文件名和流,如下所示 -

deserializer = CTFDeserializer(filename, StreamDefs(labels=
label_stream, features=features_stream)

步骤 6 - 接下来,我们需要使用反序列化器创建 minisourceBatch 的实例,如下 -

Minibatch_source = MinibatchSource(deserializer, randomize=True, max_sweeps=limit)
return minibatch_source

步骤 7 - 最后,我们需要提供我们在前面部分中创建的培训和测试源。我们正在使用鸢尾花数据集。

training_source = create_datasource(‘Iris_train.ctf’)
test_source = create_datasource(‘Iris_test.ctf’, limit=1)

创建MinibatchSource实例后,我们需要对其进行训练。我们可以使用与处理小型内存数据集时相同的训练逻辑。在这里,我们将使用MinibatchSource实例作为损失函数训练方法的输入,如下所示 -

以下是实施步骤 -

步骤 1 - 为了记录培训课程的输出,首先从cntk.logging模块导入ProgressPrinter,如下所示 -

from cntk.logging import ProgressPrinter

步骤 2 - 接下来,要设置培训课程,请从cntk.train模块导入培训师training_session,如下所示 -

from cntk.train import Trainer, training_session

步骤 3 - 现在,我们需要定义一些常量集,例如minibatch_sizesamples_per_epochnum_epochs,如下所示 -

minbatch_size = 16
samples_per_epoch = 150
num_epochs = 30
max_samples = samples_per_epoch * num_epochs

步骤 4 - 接下来,为了知道如何在 CNTK 训练期间读取数据,我们需要定义网络的输入变量与小批量源中的流之间的映射。

input_map = {
   features: training_source.streams.features,
   labels: training_source.streams.labels
}

步骤 5 - 接下来记录训练过程的输出,使用新的ProgressPrinter实例初始化Progress_printer变量。另外,初始化训练器并为其提供模型,如下所示 -

progress_writer = ProgressPrinter(0)
trainer: training_source.streams.labels

步骤6 - 最后,要开始训练过程,我们需要调用training_session函数,如下所示 -

session = training_session(trainer,
   mb_source=training_source,
   mb_size=minibatch_size,
   model_inputs_to_streams=input_map,
   max_samples=max_samples,
   test_config=test_config)
session.train()

训练模型后,我们可以使用TestConfig对象向此设置添加验证,并将其分配给train_session函数的test_config关键字参数。

以下是实施步骤 -

步骤 1 - 首先,我们需要从模块cntk.train导入TestConfig类,如下所示 -

from cntk.train import TestConfig

步骤 2 - 现在,我们需要使用test_source作为输入创建TestConfig的新实例 -

Test_config = TestConfig(test_source)

完整示例

from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer, INFINITY_REPEAT
def create_datasource(filename, limit =INFINITELY_REPEAT)
labels_stream = StreamDef(field=’labels’, shape=3, is_sparse=False)
feature_stream = StreamDef(field=’features’, shape=4, is_sparse=False)
deserializer = CTFDeserializer(filename, StreamDefs(labels=label_stream, features=features_stream)
Minibatch_source = MinibatchSource(deserializer, randomize=True, max_sweeps=limit)
return minibatch_source
training_source = create_datasource(‘Iris_train.ctf’)
test_source = create_datasource(‘Iris_test.ctf’, limit=1)
from cntk.logging import ProgressPrinter
from cntk.train import Trainer, training_session
minbatch_size = 16
samples_per_epoch = 150
num_epochs = 30
max_samples = samples_per_epoch * num_epochs
input_map = {
   features:   training_source.streams.features,
   labels: training_source.streams.labels
 }
progress_writer = ProgressPrinter(0)
trainer: training_source.streams.labels
session = training_session(trainer,
   mb_source=training_source,
   mb_size=minibatch_size,
   model_inputs_to_streams=input_map,
   max_samples=max_samples,
   test_config=test_config)
session.train()
from cntk.train import TestConfig
Test_config = TestConfig(test_source)

输出

-------------------------------------------------------------------
average   since   average   since  examples
loss      last    metric    last
------------------------------------------------------
Learning rate per minibatch: 0.1
1.57      1.57     0.214    0.214   16
1.38      1.28     0.264    0.289   48
[………]
Finished Evaluation [1]: Minibatch[1-1]:metric = 69.65*30;

手动小批量循环

正如我们在上面看到的,通过使用 CNTK 中的常规 API 进行训练时的指标,可以很容易地测量训练期间和训练之后我们的 NN 模型的性能。但是,另一方面,使用手动小批量循环时事情不会那么容易。

在这里,我们使用下面给出的模型,该模型具有来自 Iris Flower 数据集的 4 个输入和 3 个输出,该模型也在前面的部分中创建 -

from cntk import default_options, input_variable
from cntk.layers import Dense, Sequential
from cntk.ops import log_softmax, relu, sigmoid
from cntk.learners import sgd
model = Sequential([
   Dense(4, activation=sigmoid),
   Dense(3, activation=log_softmax)
])
features = input_variable(4)
labels = input_variable(3)
z = model(features)

接下来,模型的损失被定义为交叉熵损失函数和前面部分中使用的 F 度量指标的组合。我们将使用criteria_factory实用程序,将其创建为 CNTK 函数对象,如下所示 -

import cntk
from cntk.losses import cross_entropy_with_softmax, fmeasure
@cntk.Function
def criterion_factory(outputs, targets):
   loss = cross_entropy_with_softmax(outputs, targets)
   metric = fmeasure(outputs, targets, beta=1)
   return loss, metric
loss = criterion_factory(z, labels)
learner = sgd(z.parameters, 0.1)
label_mapping = {
   'Iris-setosa': 0,
   'Iris-versicolor': 1,
   'Iris-virginica': 2
}

现在,我们已经定义了损失函数,我们将了解如何在训练器中使用它来设置手动训练课程。

以下是实施步骤 -

步骤 1 - 首先,我们需要导入所需的包(如numpypandas)来加载和预处理数据。

import pandas as pd
import numpy as np

步骤 2 - 接下来,为了在训练期间记录信息,请导入ProgressPrinter类,如下所示 -

from cntk.logging import ProgressPrinter

步骤 3 - 然后,我们需要从 cntk.train 模块导入训练器模块,如下 -

from cntk.train import Trainer

步骤 4 - 接下来,创建ProgressPrinter的新实例,如下所示 -

progress_writer = ProgressPrinter(0)

步骤 5 - 现在,我们需要使用损失、学习者和Progress_writer参数初始化训练器,如下所示 -

trainer = Trainer(z, loss, learner, progress_writer)

步骤 6 - 接下来,为了训练模型,我们将创建一个循环,该循环将迭代数据集三十次。这将是外部训练循环。

for _ in range(0,30):

步骤 7 - 现在,我们需要使用 pandas 从磁盘加载数据。然后,为了以小批量加载数据集,请将chunksize关键字参数设置为 16。

input_data = pd.read_csv('iris.csv',
names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'],
index_col=False, chunksize=16)

步骤 8 - 现在,创建一个内部训练 for 循环来迭代每个小批量

for df_batch in input_data:

步骤 9 - 现在在这个循环中,使用iloc索引器读取前四列,作为训练的特征并将其转换为 float32 -

feature_values = df_batch.iloc[:,:4].values
feature_values = feature_values.astype(np.float32)

步骤 10 - 现在,读取最后一列作为训练标签,如下 -

label_values = df_batch.iloc[:,-1]

步骤 11 - 接下来,我们将使用 one-hot 向量将标签字符串转换为其数字表示形式,如下所示 -

label_values = label_values.map(lambda x: label_mapping[x])

步骤 12 - 之后,获取标签的数字表示。接下来,将它们转换为 numpy 数组,这样使用它们会更容易,如下所示 -

label_values = label_values.values

步骤 13 - 现在,我们需要创建一个新的 numpy 数组,其行数与我们转换的标签值相同。

encoded_labels = np.zeros((label_values.shape[0], 3))

步骤 14 - 现在,为了创建 one-hot 编码标签,请根据数字标签值选择列。

encoded_labels[np.arange(label_values.shape[0]), label_values] = 1.

步骤 15 - 最后,我们需要在训练器上调用train_minibatch方法,并为小批量提供处理后的特征和标签。

trainer.train_minibatch({features: feature_values, labels: encoded_labels})

完整示例

from cntk import default_options, input_variable
from cntk.layers import Dense, Sequential
from cntk.ops import log_softmax, relu, sigmoid
from cntk.learners import sgd
model = Sequential([
   Dense(4, activation=sigmoid),
   Dense(3, activation=log_softmax)
])
features = input_variable(4)
labels = input_variable(3)
z = model(features)
import cntk
from cntk.losses import cross_entropy_with_softmax, fmeasure
@cntk.Function
def criterion_factory(outputs, targets):
   loss = cross_entropy_with_softmax(outputs, targets)
   metric = fmeasure(outputs, targets, beta=1)
   return loss, metric
loss = criterion_factory(z, labels)
learner = sgd(z.parameters, 0.1)
label_mapping = {
   'Iris-setosa': 0,
   'Iris-versicolor': 1,
   'Iris-virginica': 2
}
import pandas as pd
import numpy as np
from cntk.logging import ProgressPrinter
from cntk.train import Trainer
progress_writer = ProgressPrinter(0)
trainer = Trainer(z, loss, learner, progress_writer)
for _ in range(0,30):
   input_data = pd.read_csv('iris.csv',
      names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'],
      index_col=False, chunksize=16)
for df_batch in input_data:
   feature_values = df_batch.iloc[:,:4].values
   feature_values = feature_values.astype(np.float32)
   label_values = df_batch.iloc[:,-1]
label_values = label_values.map(lambda x: label_mapping[x])
label_values = label_values.values
   encoded_labels = np.zeros((label_values.shape[0], 3))
   encoded_labels[np.arange(label_values.shape[0]), 
label_values] = 1.
   trainer.train_minibatch({features: feature_values, labels: encoded_labels})

输出

-------------------------------------------------------------------
average    since    average   since  examples
loss       last      metric   last
------------------------------------------------------
Learning rate per minibatch: 0.1
1.45       1.45     -0.189    -0.189   16
1.24       1.13     -0.0382    0.0371  48
[………]

在上面的输出中,我们得到了训练期间损失和度量的输出。这是因为我们在函数对象中组合了度量和损失,并在训练器配置中使用了进度打印机。

现在,为了评估模型性能,我们需要执行与训练模型相同的任务,但这次,我们需要使用 Evaluator 实例测试模型。它如下面的 Python 代码所示:

from cntk import Evaluator
evaluator = Evaluator(loss.outputs[1], [progress_writer])
input_data = pd.read_csv('iris.csv',
   names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'],
index_col=False, chunksize=16)
for df_batch in input_data:
   feature_values = df_batch.iloc[:,:4].values
   feature_values = feature_values.astype(np.float32)
   label_values = df_batch.iloc[:,-1]
   label_values = label_values.map(lambda x: label_mapping[x])
   label_values = label_values.values
   encoded_labels = np.zeros((label_values.shape[0], 3))
   encoded_labels[np.arange(label_values.shape[0]), label_values] = 1.
   evaluator.test_minibatch({ features: feature_values, labels:
      encoded_labels})
evaluator.summarize_test_progress()

现在,我们将得到如下输出:

输出

Finished Evaluation [1]: Minibatch[1-11]:metric = 74.62*143;