目录
-
What is Reproducible Machine Learning?
-
Reproducible Machine Learning的发展与架构
-
Reproducible Machine Learning 基本工具
-
代码版本管理—— Git
-
数据版本管理—— DVC
-
实验过程监控—— Weight & Biases
-
特征管理—— Feature Store
-
实验参数设置—— Hydra
-
容器技术—— Docker
-
总结
-
参考资料
在真实工业场景下,训练好的 Machine learning 模型通常会在几个月之后出现精度变差的情况(由于新的数据与模型产生交互,导致数据出现异常值或者数据分布改变等原因导致),行业内称之为Concept drift
.
Your model’s accuracy will be at its best until you start using it. It then deteriorates as the world it was trained to predict changes. This phenomenon is called concept drift, and while it’s been heavily studied in academia for the past two decades, it’s still often ignored in industry best practices.
这其实是 Machine Learning 领域长期以来欠下的“技术债”。事实上,当前开发领域的人们讨论软件时,通常会先设计一个基本的基于 Devops 的架构,保证实现持续集成(Continuous integration,CI)和持续交付(Continuous delivery,CD)的基本功能。一个好的架构是很重要的,否则在未来添加新功能会变得更慢,成本更高。在工业开发领域也有人提出了一种被称为 MLops 的新的开发范式,即机器学习时代的 Devops。
而在学术领域中,新的更大的数据集发布后,原来的模型大多数也会出现精度急剧下降的问题,这就需要进行算法的进一步优化。因此,可复现机器学习(Reproducible Machine Learning,RML)的概念也应运而生。
1. What is Reproducible Machine Learning?
Reproducible Machine Learning,顾名思义,即为可复现的机器学习。考虑到工业界和学术界数据迭代速度的巨大差别,RML在实际实践上其实和MLops有很大区别。MLops是一种聚焦于软件开发架构的工业环境下进行合作开发和快速迭代上线的能力的标准算法开发范式。而RML则是一种解决实验室环境下不同机器合作开发、算法移植、实验记录和依据新增加的数据进行模型迭代的算法开发范式。简而言之就是,RML的要求比MLops低很多,同时主要框架参考自MLops。MLops中提出的一些Serverless,Continuous training的概念及工具,RML都没有涉及。其实也可以理解,实验室环境中毕竟算法研究工作者比较多,开发比较少,产品经理近似于没有,同时不需要进行基于数据的快速的迭代,只需要在不同师兄弟之间可以实现模型的复现和实验记录就可以了。因此RML需要掌握的技能点也比较简单,主要包括代码版本管理(Git), 数据版本管理 (DVC),实验过程监控 (Weight & Biases),特征管理(Feature Store),实验参数设置 (Hydra),以及容器技术(Docker)等。
PS:还有一个概念叫做 Continuous Machine Learning,其比 MLops 更强,其通过更新的数据监测模型推理结果,同时基于这样的结果实现模型的自动优化训练。CML 的目标是架设一种可持续的模拟人类连续获取和微调信息的能力的。其相当于是基于 MLops 范式架构做 AutoML,力图实现 Continuous integration,Continuous deliver,Continuous training (CT)的新的自动化机器学习算法来解决 Concept drift 的问题。
2. Reproducible Machine Learning的发展与架构
一般来说实验室主要的目光还是聚焦于可以实现并训练一个机器学习模型,该模型在给定用例的相关训练数据的情况下,可在测试集上实现出色的预测性能。但是,由于缺乏有效的代码管理和数据管理,当出现师兄毕业,服务器宕机等问题之后,难以实现原来的最优的实验结果。同时每次数据集更新后,需要缺乏对于上次实验的过程记录,只能依赖于最后一次的超参数结果,近乎等于从头开始调参。更官方的说,真正的挑战不是构建机器学习模型,而是构建集成的机器学习系统以及在生产环境中持续运行该系统。如同谷歌在 The High Interest Credit Card of Technical Debt 提出的,在实际的机器学习系统中,只有一小部分是由机器学习代码组成的。所需的相关元素既庞大又复杂。
用图示来表示,目前大多数实验室的训练流程如下所示,也就是仍然处于 MLOps 级别 0:手动过程。
PS:甚至有时候训练模型没来得及记录,四舍五入变成了 MLOps 级别 -1:反复手动过程
用 MLOps 级别 0 的总结,这种过程有如下特点:
-
脚本驱动的交互式手动过程:每个步骤(包括数据分析、数据准备、模型训练和验证)都是手动的。该过程需要手动执行每个步骤,并且手动从一个步骤转到另一个步骤。此过程通常由数据科学家以交互方式在笔记本中编写和执行的实验性代码驱动,直到生成有效的模型为止。 -
机器学习与操作分离:该过程会将创建模型的数据科学家与将模型用作预测服务的工程师分开。数据科学家将经过训练的模型作为工件移交给工程团队,以便在其 API 基础架构上进行部署。此移交工作可能包括将经过训练的模型放在存储位置、将模型对象签入代码库,或者将其上传到模型注册表。然后,部署模型的工程师需要在生产环境中提供所需的功能以实现低延时服务,这可能会导致训练-应用偏差。 -
不频繁发布迭代:该过程假定您的数据科学团队管理一些不会频繁更改(更改模型实现或使用新数据重新训练模型)的模型。新模型版本每年仅部署几次。 -
无 CI:由于假定几乎不更改实现,因此 CI 已被忽略。通常,测试代码是笔记本或脚本执行的一部分。实现实验步骤的脚本和笔记本由源代码控制,并生成经过训练的模型、评估指标和可视化等工件。 -
无 CD:由于不会频繁部署模型版本,因此不考虑 CD。 -
部署指的是预测服务:该过程仅涉及将经过训练的模型部署为预测服务(例如,具有 REST API 的微服务),而不是部署整个机器学习系统。 -
缺少主动性能监控:该过程不会跟踪或记录模型预测和操作,模型预测和操作是检测模型性能下降和其他模型行为偏移所必需的信息。
这些特点有些我觉得其实还可以接受,不是很影响算法的迭代。但是有却是真实存在的影响生产力的问题:
-
脚本驱动的交互式手动过程:一次一般只能测一组参数,还得盯着结果,影响调参速度 -
机器学习与操作分离:不设置Feature Store,新数据集发布后又要重新调参 -
无 CI/CD:缺乏代码版本管理,无法进行合作开发; -
缺少主动性能监控:实验过程中一般只用命令行显示和记录,没有有效的进行可视化,导致调参困难;
因此我们需要针对现有的问题进行改进,考虑到 MLops 2级里加了一些快速进行 CI/CD 的功能,实验室环境下部署那些内容消耗大量精力但是获得的回报较少,因为也不需要那么快的迭代。因此当前主流推行的RML我认为其实是一种 MLops 1 级的架构。
这种架构中使用 Hydra 等工具实现多次训练和结果记录,增设了 Source repository 用来做代码管理和数据管理,增设了 Feature Store 用于下一次新的数据集更新后的特征选择/网络架构选择,增设了可以可视化的 Performance monitoring,基本可以解决当前的主要矛盾。考虑到实验室环境下不同算法研究者间的代码移植,RML 的主要框架中还增设了 Dockerization 的操作,可以实现不同操作系统,文件系统,用户间的代码移植。
3. Reproducible Machine Learning 基本工具
代码版本管理—— Git
现在的话,实际上很多实验室都在 Github 上建立了自己的仓库,因此 Git 也就成为了 RML 里最基本的工具之一。
-
初始化 Git 仓库
要实现基于 Git 的代码版本管理,首先需要初始化一个仓库。
$ git config --global user.name "Vaew"
$ git config --global user.email [email protected]
这里有一个值得注意的就是 Github 上可以免费建立私有仓库了,因此部分处于某些原因无法开源代码的研究工作者也不必担心传到 Github 上会造成代码泄漏的问题。
-
添加文件
$ git add <file>
-
检查状态
$ git status
-
Commit消息
$ git commit -m "Message"
-
更新代码库
$ git pull
上述是一些 Git 的基本的 CRUD 操作。Git 其实有一个非常有用的功能就是它可以设置很多个不同的 Branch,比如说,你已经决定要解决前几天遇到的 #53 问题。想要新建一个分支并同时切换到那个分支上,你可以运行一个带有 -b
参数的 git checkout
命令:
$ git checkout -b iss53
Switched to a new branch "iss53"
它是下面两条命令的简写:
$ git branch iss53
$ git checkout iss53
更多的操作可以参考:https://git-scm.com/
数据版本管理—— DVC
DVC 其实就是 Data version control 的简称,它的基本操作和 Git 类似,是主流的数据版本管理工具。其可以直接基于 S3, Google cloud, Azure, Alibaba cloud, SSH server 甚至是本地物理机控制数据版本。其安装也非常简单,只需要一行:
$ pip install dvc
-
初始化
由于 DVC 基于 Git 之上运行,因此需要先进行 Git 的初始化:
$ git init
$ dvc init
$ git commit -m "Initialize DVC"
路径下会产生 .dvc/.gitignore 、 .dvc/cache/ 、 . dvc/config,其中最重要的是.dvc/cache/,DVC 会在这里建立档案的联结,也是最终会 push 到云端的文件。
-
将模型文件夹加入 DVC 进行版本控制
$ dvc add model
完成后会产生相对应的 .dvc 文件:
dvc-test
├── main.py
├── model
│ └── my_model.h5
└── model.dvc
下面以一个模型训练的完整例子来说明 DVC 工具的使用。
-
数据分割:
$ dvc run -d 要执行的程序或者输入的档案 -o 要输出的档案 python 要执行的程序
$ dvc run -d script/split_train_test.py -d script/config.py -d dataset/annotation.csv -o dataset/train.csv -o dataset/test.csv python script/split_train_test.py
执行完成后 DVC 文件变化:
$ git status -s
?? train.csv.dvc
$ cat train.csv.dvc
md5: c53dd6d76f7cdc25aaf2146db6223bf0
cmd: python script/split_train_test.py
wdir: .
deps:
- md5: 826ea439f28fb04923de739af8c26b5d
path: script/split_train_test.py
- md5: a933ce5d996b1687817a60b3453e18ed
path: script/config.py
- md5: 87c46f0402b54b960b294ef7791f7cf8
path: dataset/annotation.csv
outs:
- md5: 896389741ff20a2055acfe5c65893bf1
path: dataset/train.csv
cache: true
metric: false
persist: false
- md5: 1ea600a9a7b720cd28fe99ff6d1c3e70
path: dataset/test.csv
cache: true
metric: false
persist: false
我们可以发现:
-
DVC 以第一个 -o
的档案去加上.dvc
变成新的档案 -
由于有两个输出档案 test.csv
和train.csv
,因此都被记录在train.csv.dvc
中 -
md5 会记录档案在 .dvc/config
中的位置,而每一份档案对于 DVC 来说是建立与原来档案的链接,而非建立新的档案
$ du -sh .dvc/cache/89/* .dvc/cache/1e/*
60K .dvc/cache/89/6389741ff20a2055acfe5c65893bf1
16K .dvc/cache/1e/a600a9a7b720cd28fe99ff6d1c3e70
$ du -sh dataset/train.csv dataset/test.csv
60K dataset/train.csv
16K dataset/test.csv
最后我们用 Git 管理档案:
$ git add .
$ git commit -m "split data"
-
模型训练
$ mkdir log
$ mkdir model
$ dvc run -d script/train.py -d script/config.py -d dataset/train.csv -d dataset/test.csv -o model/model.pth -o log/train-output.txt -o log/test-output.txt python script/train.py
$ git add .
$ git commit -m "train model"
-
模型评估
值得注意的是,输出的 eval.txt
档案,是要用 -M
而並非 -o
,原因是 DVC 后续会去追踪这份档案,让我们能够快速进行比较。又因为这样的话没有输出档案因此可以用 -f
的方式來指定输出的档案。
$ dvc run -d script/evaluate.py -d script/config.py -d dataset/test.csv -d model/model.pth -M log/eval.txt -f Dvcfile python script/evaluate.py
执行完后,会建立 log/eval.txt
以及 Dvcfile
,取名文Dvcfile
是因为之后做 dvc repro
时候如果没有指定的档案,则会使用 default 的档案,而 default 档案名字是 Dvcfile
。
PS:Dvcfile
和上面步骤的 .dvc
档案是一样的,记载了 md5
、 cache
的信息。由于我们刚刚使用了 -M
输出 eval.txt
档案,此处可以使用dvc metrics show
查看模型表现:
$ dvc metrics show
log/eval.txt:
input size: 224
classes number: 7
use pretrained: True
epochs: 21
batch size: 32
learning rate: 0.001
momentum: 0.9
accuracy: 0.8626760563380281
$ cat log/eval.txt
input size: 224
classes number: 7
use pretrained: True
epochs: 21
batch size: 32
learning rate: 0.001
momentum: 0.9
accuracy: 0.8626760563380281
$ git add .
$ git commit -m "evaluation"
-
Reproduce
假设想要将 epochs 21 修改为 51:
$ git checkout -b epochs51
$ vi script/config.py # 将epochs = 21 改为 epochs = 51
$ dvc repro
由于一开始进行了有效的记录,因此 DVC 就可以很快地做到自动化复现。
$ dvc metrics show -a
epochs51:
log/eval.txt:
input size: 224
classes number: 7
use pretrained: True
epochs: 21
batch size: 32
learning rate: 0.001
momentum: 0.9
accuracy: 0.8661971830985915
master:
log/eval.txt:
input size: 224
classes number: 7
use pretrained: True
epochs: 21
batch size: 32
learning rate: 0.001
momentum: 0.9
accuracy: 0.8661971830985915
-
Merge the model to master
为了能够练习 merge ,因此我多训练了一组沒有使用 pretrained model 的模型(先切回 master 再 checkout -b)
$ git checkout master
$ git checkout -b without_pretrained
$ vi script/config.py # 将usePretrained = True 改为 False
$ dvc repro
$ git add .
$ git commit -m "without pretrained"
$ dvc metrics show -a
epochs51:
log/eval.txt:
input size: 224
classes number: 7
use pretrained: True
epochs: 51
batch size: 32
learning rate: 0.001
momentum: 0.9
accuracy: 0.8626760563380281
master:
log/eval.txt:
input size: 224
classes number: 7
use pretrained: True
epochs: 21
batch size: 32
learning rate: 0.001
momentum: 0.9
accuracy: 0.8661971830985915
wo_pretrained:
log/eval.txt:
input size: 224
classes number: 7
use pretrained: False
epochs: 21
batch size: 32
learning rate: 0.001
momentum: 0.9
accuracy: 0.30633802816901406
若我们要将 without_pretrained
以及 epochs51
merge 在一起,直接 merge 会出现很多冲突:
$ git merge epochs51
Auto-merging train.csv.dvc
CONFLICT (content): Merge conflict in train.csv.dvc
Auto-merging script/config.py
CONFLICT (content): Merge conflict in script/config.py
Auto-merging model.pth.dvc
CONFLICT (content): Merge conflict in model.pth.dvc
Auto-merging log/eval.txt
CONFLICT (content): Merge conflict in log/eval.txt
Auto-merging Dvcfile
CONFLICT (content): Merge conflict in Dvcfile
Automatic merge failed; fix conflicts and then commit the result.
因此需要进行手动的调整,以 train.csv.dvc 为例:
<<<<<<< HEAD
md5: 8d2bb1a5dd7542df80a684cf04179444
=======
md5: 0a72f2ce076e4d8f5ba6cd476d1cb464
>>>>>>> epochs51
cmd: python script/split_train_test.py
wdir: .
deps:
- md5: 826ea439f28fb04923de739af8c26b5d
path: script/split_train_test.py
<<<<<<< HEAD
- md5: acb7b60a7d2f31a2ea5c527731f3b5f7
=======
- md5: 1ffb468a9e98a8248f5071286cfd0111
>>>>>>> epochs51
path: script/config.py
- md5: 87c46f0402b54b960b294ef7791f7cf8
path: dataset/annotation.csv
outs:
- md5: 896389741ff20a2055acfe5c65893bf1
path: dataset/train.csv
cache: true
metric: false
persist: false
- md5: 1ea600a9a7b720cd28fe99ff6d1c3e70
path: dataset/test.csv
cache: true
metric: false
persist: false
需要改成:
md5: 0a72f2ce076e4d8f5ba6cd476d1cb464
cmd: python script/split_train_test.py
wdir: .
deps:
- md5: 826ea439f28fb04923de739af8c26b5d
path: script/split_train_test.py
- md5: 1ffb468a9e98a8248f5071286cfd0111
path: script/config.py
- md5: 87c46f0402b54b960b294ef7791f7cf8
path: dataset/annotation.csv
outs:
- md5: 896389741ff20a2055acfe5c65893bf1
path: dataset/train.csv
cache: true
metric: false
persist: false
- md5: 1ea600a9a7b720cd28fe99ff6d1c3e70
path: dataset/test.csv
cache: true
metric: false
persist: false
修改完后输入 dvc checkout
,目的是为了将 DVC 控制的 pipeline 切回刚刚设定的,並且重新 reproduce 。
$ dvc checkout
$ dvc repro
$ git add .
$ git commit -m "merge without_pretrained and epochs51"
最后再切回 master
去 merge 原本的 branch :
$ git checkout master
$ git merge wo_pretrained
Fast-forward
Dvcfile | 8 ++++----
log/eval.txt | 4 ++--
model.pth.dvc | 12 ++++++------
script/config.py | 2 +-
script/train.py | 1 -
train.csv.dvc | 4 ++--
6 files changed, 15 insertions(+), 16 deletions(-)
-
将数据上传到云
以 Google Cloud 为例,首先安装套件:
$ pip install dvc[gs]
建立 Bucket 后,将设定好的 gcp-test.json 加入 .dvc/config
中:
$ dvc remote add -d upstream gs://vaew-bucket/
$ dvc remote modify upstream credentialpath gcp-test.json
$ dvc push
Preparing to upload data to 'gs://vaew-bucket/'
Preparing to collect status from gs://vaew-bucket/
Collecting information from local cache...
[##############################] 100%
Collecting information from remote cache...
[##############################] 100
[##############################] 100% Analysing status
[##############################] 100% log/test-output.txt
[##############################] 100% dataset/test.csv
[##############################] 100% log/train-output.txt
[##############################] 100% dataset/train.csv
[##############################] 100% model/model.pth
DVC 会将 cache 里的资料全部上传上去。
-
获取数据
获取数据可以直接使用 Git
$ git clone https://github.com/vaew/xxx.git
$ dvc pull
实验过程监控—— Weight & Biases
实验过程监控有很多工具可以使用:Weights & Biases、MlFlow、Neptune、Comet.ml ,这里我们仅介绍 Weight & Biases。总的来说,W&B 提供了 4 个有用的工具:
-
Dashboard: 实验跟踪 -
Artifacts: 数据集版本控制、模型版本控制 -
Sweeps: 超参数优化 -
Reports: 保存和共享可重现的结果
此处以简单 Keras 分类器模型为例,首先我们进行安装和初始化:
$ pip install wandb -q
$ wandb login
wandb: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter:
然后我们在 Jupyter Notebook 创建一个新的项目,并设置如下超参数:
project_name = 'first_steps'
group_name = 'cnn'
experiment_name = '2_conv'
config_dict = {
"conv_1": 16,
"activation_1": "relu",
"kernel_size": (3, 3),
"pool_size": (2, 2),
"dropout": 0.7,
"conv_2": 32,
"activation_out": "softmax",
"optimizer": "adam",
"loss": "sparse_categorical_crossentropy",
"metric": "accuracy",
"epoch": 6,
"batch_size": 32
}
wandb.init(
project=project_name,
group=group_name,
name=experiment_name,
config=config_dict
)
config = wandb.config
config
是一个带有超参数的字典。您还可以加载 .yaml 格式的配置文件。 wandb.init
在 W&B 中创建一个新的运行并启动后台进程同步数据。接下来我们进行数据的加载并定义一个简单的 CNN 模型:
import tensorflow as tf
from tensorflow.keras.callbacks import Callback
from wandb.keras import WandbCallback
import numpy as np
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data() ##data download
x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
x_train, y_train = x_train[::5], y_train[::5]
class_names = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
def cnn_mnist(config, num_classes = 10, input_shape = (28, 28, 1)): ##simple Keras CNN
img_inputs = tf.keras.Input(shape=input_shape)
conv_1 = tf.keras.layers.Conv2D(config.conv_1, kernel_size=config.kernel_size, activation=config.activation_1)(img_inputs)
pool_1 = tf.keras.layers.MaxPooling2D(pool_size=config.pool_size)(conv_1)
conv_2 = tf.keras.layers.Conv2D(config.conv_2, kernel_size=config.kernel_size, activation=config.activation_1)(pool_1)
pool_2 = tf.keras.layers.MaxPooling2D(pool_size=config.pool_size)(conv_2)
flatten = tf.keras.layers.Flatten()(pool_2)
dropout = tf.keras.layers.Dropout(config.dropout)(flatten)
dense_out = tf.keras.layers.Dense(num_classes, activation=config.activation_out)(dropout)
model = tf.keras.models.Model(inputs=img_inputs, outputs=dense_out)
model.compile(loss=config.loss, optimizer=config.optimizer, metrics=[config.metric])
return model
our_model = cnn_mnist(config)
our_model.fit(x_train, y_train, epochs=config.epoch, batch_size=config.batch_size,
validation_data=(x_test, y_test),
callbacks=[wandb.keras.WandbCallback(data_type="image",
labels=class_names)])
wandb.finish()
之后我们可以在 DashBoard 中查看我们的结果。Sweeps 是一种用于超参数和模型优化的工具,其详细的优化机制如使用 Optuna 框架进行高效的超参数优化介绍。首先定义超参数优化策略:
# Configure the sweep – specify the parameters to search through, the search strategy, the optimization metric et all.
sweep_config = {
'method': 'random', #grid, random
'metric': {
'name': 'accuracy',
'goal': 'maximize'
},
'parameters': {
'epoch': {
'values': [5, 10]
},
'dropout': {
'values': [0.3, 0.4, 0.5]
},
'conv_1': {
'values': [16, 32, 64]
},
'conv_2': {
'values': [16, 32, 64]
},
'optimizer': {
'values': ['adam', 'nadam', 'sgd', 'rmsprop']
},
'activation_1': {
'values': ['relu', 'elu', 'selu','sigmoid']
},
'kernel_size': {
'values': [(3, 3), (5, 5), (7, 7)]
},
}
}
然后让我们创建一个 sweep 并定义一个 train 函数。sweep 使用每组超参数的时候会调用此函数。
sweep_id = wandb.sweep(sweep_config, project="first_steps")
def train():
# Default values for hyper-parameters we're going to sweep over
config_defaults = {
"conv_1": 32,
"activation_1": "relu",
"kernel_size": (3, 3),
"pool_size": (2, 2),
"dropout": 0.1,
"conv_2": 64,
"activation_out": "softmax",
"optimizer": "adam",
"loss": "sparse_categorical_crossentropy",
"metric": "accuracy",
"epoch": 6,
"batch_size": 32
}
# Initialize a new wandb run
wandb.init(config=config_defaults)
# Config is a variable that holds and saves hyperparameters and inputs
config = wandb.config
model = cnn_mnist(config=config)
model.fit(x_train, y_train, epochs=config.epoch, batch_size=config.batch_size,
validation_data=(x_test, y_test),
callbacks=[wandb.keras.WandbCallback()])
wandb.agent(sweep_id, train)
得到如下输出:
除了 Sweeps 和 Dashboard,W&B 还提供了一个名为 Artifacts 的实用工具,可以实现数据和模型的记录(有点类似于上面的 DVC)。首先加载一个原始数据集,然后创建一个新的 Artifact。
from collections import namedtuple
Dataset = namedtuple("Dataset", ["x", "y"])
def load_data_split(train_size=50_000):
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train, x_val = x_train[:train_size], x_train[train_size:]
y_train, y_val = y_train[:train_size], y_train[train_size:]
training_data = Dataset(x_train, y_train)
validation_data = Dataset(x_val, y_val)
test_data = Dataset(x_test, y_test)
datasets = [training_data, validation_data, test_data]
return datasets
def load_and_log():
with wandb.init(project=project_name, job_type="load-data") as run:
datasets = load_data_split()
names = ["training", "validation", "test"]
# Artifact
raw_data = wandb.Artifact(
"mnist-raw", type="dataset",
description="Raw MNIST dataset, splitted",
metadata={"source": "keras.datasets.mnist",
"train_data": len(datasets[0].x),
"valid_data": len(datasets[1].x),
"test_daata": len(datasets[2].x)})
for name, data in zip(names, datasets):
# Save our datasets
with raw_data.new_file(name + ".npz", mode="wb") as file:
np.savez(file, x=data.x, y=data.y)
#save Artifact
run.log_artifact(raw_data)
load_and_log()
一个输出 Artifact 会包含如下内容:
-
元数据—— 当前 Artifact 的描述。在我们的例子中,它是我们数据集的描述(来源,分割大小) -
文件——数据集 -
图形视图—— Artifacts 图表(输入、输出、过程)
我们添加一个新的 Artifact,它将描述数据预处理:
def preprocess_dataset(dataset, normalize=True, expand_dims=True, to_categorical=True):
x, y = dataset.x, dataset.y
if normalize:
x = x.astype("float32") / 255
if expand_dims:
x = np.expand_dims(x, -1)
if to_categorical:
y = tf.keras.utils.to_categorical(y, 10) # Hardcoded num_classes
return Dataset(x, y)
import os
def preprocess_and_log(preprocess_steps):
with wandb.init(project=project_name, job_type="data_preprocessing", name="preprocess_simple") as run:
processed_data = wandb.Artifact(
"mnist-preprocessed", type="dataset",
description="Preprocessed MNIST dataset",
metadata=preprocess_steps)
# which Artifact we will use
raw_data_artifact = run.use_artifact('mnist-raw:latest')
# download Artifact
raw_dataset = raw_data_artifact.download()
for split in ["training", "validation", "test"]:
datafile = split + ".npz"
data = np.load(os.path.join(raw_dataset, datafile))
raw_split = Dataset(x=data["x"], y=data["y"])
processed_dataset = preprocess_dataset(raw_split, **preprocess_steps)
with processed_data.new_file(split + ".npz", mode="wb") as file:
np.savez(file, x=processed_dataset.x, y=processed_dataset.y)
run.log_artifact(processed_data)
steps = {"normalize": True,
"expand_dims": True,
"to_categorical" : True}
preprocess_and_log(steps)
我们现在有 2 个 Artifacts:“mnist-raw”和“mnist-preprocessed”。矩形是输入/输出 Artifacts,圆形是 Artifacts 之间的过程。在图表视图的帮助下,可以轻松跟踪 pipeline 在工作过程中发生的变化。
Reports 功能则是可以直接从网站导出报告。
特征管理—— Feature Store
特征是指在训练和推断期间用来进行预测的属性或特性模型。例如,在推荐音乐播放列表的机器学习应用程序中,特征可能包括歌曲评分、播放历史以及播放时长。机器学习模型的精确度基于特征的精确集合和组成。通常,这些特征会被训练多个模型的多个团队重复使用。而且。用于训练模型的任何特征集都要可用于进行实时预测(推理)。在这些不同的访问模式中保持一个统一且最新的特征来源是一项挑战,因为大多数组织会保留两个不同的特征存储库,一个用于训练,另一个用于推理。
实际上,目前很多结构化数据的比赛强依赖于数据的特征工程,如 Prof. Andrew Ng 所言
Coming up with features is difficult, time-consuming, requires expert knowledge. ‘Applied machine learning’ is basically feature engineering.
考虑到很多情况下相似结构的数据会有一些类似的强特征,因此建立 Feature Store 就像建立 ACM 选手的板子一样,是一件能够最小化特征工程精力的事情。实际上当前时序数据的tsfresh
就是基于这样的思路来自动抽取模型的特征。特征存储是驱动高可用ML模型的数据处理部分的中心枢纽。其将原始数据转换为特征值,存储这些值,并将其用于模型训练和在线预测。通过自动化这些步骤,特征存储使得我们在数小时内针对新的数据构建和部署新的模型。
由于 AWS 关于 Feature Store 的操作很详细,而且过程也很简单,这里直接给出链接:https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-use-with-studio.html
更多关于 Feature Store 的资料可以参考:https://www.featurestore.org/
实验参数设置—— Hydra
Facebook Hydra 允许开发人员通过编写和覆盖配置来简化 Python 应用程序(尤其是机器学习方面)的开发。开发人员可以借助Hydra,通过更改配置文件来更改产品的行为方式,而不是通过更改代码来适应新的用例。Hydra 提供了一种灵活的方法来开发和维护代码及配置,从而加快了机器学习研究等领域中复杂应用程序的开发。它允许开发人员从命令行或配置文件“组合”应用程序的配置。这解决了在修改配置时可能出现的问题,例如:
-
维护配置的稍微不同的副本或添加逻辑以覆盖配置值。 -
可以在运行应用程序之前就组成和覆盖配置。 -
动态命令行选项卡完成功能可帮助开发人员发现复杂配置并减少错误。 -
可以在本地或远程启动应用程序,使用户可以利用更多的本地资源。
Hydra 其他好处包括:
-
为新用例和需求的项目添加功能变得更加容易,而无需重写大量代码。 -
减少了复杂应用程序中常见的一些样板代码,例如处理配置文件,配置日志记录和定义命令行标志。
Hydra 的安装:
$ pip install --upgrade hydra-core
Hydra 简单用例
-
减少配置文件代码量
其可以可以极大程度减少配置代码的代码量,下面给出一个简单的例子:
import hydra
@hydra.main()
def app(cfg):
print(cfg.pretty())
print("The user is : " + cfg.user)
if __name__ == "__main__":
app()
$ python3 test_hydra.py +user=ua +pwd=pa
输出如下:
Use OmegaConf.to_yaml(cfg)
category=UserWarning,
user: vaew
pwd: vaew
The user is : vaew
-
简化参数处理
常见的一个机器学习程序中,是用如下代码来处理输入和各种参数。
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=10, metavar='N',
help='number of epochs to train (default: 10)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
help='learning rate (default: 0.01)')
通过示例代码我们可以看出来,在 hydra 之中,我们直接使用 cfg.user 就可以。
而且还可以通过配置文件来直接处理参数,比如:
@hydra.main(config_path="conf", config_name="config")
def my_app(cfg: DictConfig) -> None:
print(OmegaConf.to_yaml(cfg))
-
输出目录
人们在做研究时经常遇到的一个问题是如何保存输出。典型的解决方案是传入一个指定输出目录的命令行标志,但是当你希望同时运行多项任务,并且必须为每个任务传递不同的输出目录时,这尤其令人恼火。Hydra 通过为每次运行生成输出目录,并在运行代码之前更改当前工作目录来解决此问题。这样可以很好地将来自同一 sweep 的任务分组在一起,同时保持每个任务与其他任务的输出分离。我们可以简单的来看看目录的变化,可以看到,在当前目录下生成了一个 outputs 目录。其内部组织是按照时间来进行,把每次运行的输出,log 和配置都归类在一起。
├── outputs
│ └── 2021-03-21
│ ├── 11-52-35
│ │ ├── .hydra
│ │ │ ├── config.yaml
│ │ │ ├── hydra.yaml
│ │ │ └── overrides.yaml
│ │ └── test_hydra.log
│ └── 11-57-55
│ ├── .hydra
│ │ ├── config.yaml
│ │ ├── hydra.yaml
│ │ └── overrides.yaml
│ └── test_hydra.log
├── test_hydra.py
-
配置所在
我们分别打开两个 .hydra 目录下的 config.yaml 文件看看。可以看到,每次运行时候,对应的参数配置都保存在其中。这样极大的方便了用户的比对和分析。
$ cat outputs/2021-03-21/11-52-35/.hydra/config.yaml
user: ua
pwd: pa
$ cat outputs/2021-03-21/11-57-55/.hydra/config.yaml
user: ub
pwd: pb
Multirun 处理组合情况
Multirun 是 Hydra 的一种功能,它可以多次运行你的函数,每次都组成一个不同的配置对象。这是一个自然的扩展,可以轻松地组合复杂的配置,并且非常方便地进行参数扫描,而无需编写冗长的脚本。
例如,对于两种参数,我们可以扫描所有 4 个组合,一个命令就是会完成所有组合的执行:
$ python test_hydra.py --multirun user=ua,ub pwd=pa,pb
得到输出如下:
[2021-03-27 11:57:54,435][HYDRA] Launching 4 jobs locally
[2021-03-27 11:57:54,435][HYDRA] #0 : +user=ua +pwd=pa
user: ua
pwd: pa
[2021-03-27 11:57:54,723][HYDRA] #1 : +user=ua +pwd=pb
user: ua
pwd: pb
[2021-03-27 11:57:54,992][HYDRA] #2 : +user=ub +pwd=pa
user: ub
pwd: pa
[2021-03-27 11:57:55,248][HYDRA] #3 : +user=ub +pwd=pb
user: ub
pwd: pb
可以看到生成如下目录树,每个参数组合对应了一个目录。
├── multirun
│ └── 2021-03-27
│ └── 11-57-53
│ ├── 0
│ │ ├── .hydra
│ │ │ ├── config.yaml
│ │ │ ├── hydra.yaml
│ │ │ └── overrides.yaml
│ │ └── test_hydra.log
│ ├── 1
│ │ ├── .hydra
│ │ │ ├── config.yaml
│ │ │ ├── hydra.yaml
│ │ │ └── overrides.yaml
│ │ └── test_hydra.log
│ ├── 2
│ │ ├── .hydra
│ │ │ ├── config.yaml
│ │ │ ├── hydra.yaml
│ │ │ └── overrides.yaml
│ │ └── test_hydra.log
│ ├── 3
│ │ ├── .hydra
│ │ │ ├── config.yaml
│ │ │ ├── hydra.yaml
│ │ │ └── overrides.yaml
│ │ └── test_hydra.log
│ └── multirun.yaml
多线程运行
Python subprocess 允许你去创建一个新的进程让其执行另外的程序,并与它进行通信,获取标准的输入、标准输出、标准错误以及返回码等。subprocess 模块中定义了一个 Popen 类,通过它可以来创建进程,并与其进行复杂的交互。Popen 是 subprocess 的核心,子进程的创建和管理都靠它处理。
构造函数:
class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0,restore_signals=True, start_new_session=False, pass_fds=(),*, encoding=None, errors=None)
常用参数:
-
args:shell 命令,可以是字符串或者序列类型(如:list,元组) -
bufsize:缓冲区大小。当创建标准流的管道对象时使用,默认 -1。0:不使用缓冲区 1:表示行缓冲,仅当 universal_newlines=True 时可用,也就是文本模式 正数:表示缓冲区大小 负数:表示使用系统默认的缓冲区大小。 -
stdin, stdout, stderr:分别表示程序的标准输入、输出、错误句柄 -
preexec_fn:只在 Unix 平台下有效,用于指定一个可执行对象(callable object),它将在子进程运行之前被调用 -
shell:如果该参数为 True,将通过操作系统的 shell 执行指定的命令。 -
cwd:用于设置子进程的当前目录。 -
env:用于指定子进程的环境变量。如果 env = None,子进程的环境变量将从父进程中继承。
具体例子:
下面例子很简陋,不能直接运行,只是给大家演示下大致思路,还请根据具体情况做相关调整。
-
我们通过 subprocess.Popen 启动了 spark; -
hydra 的输入 可以转换为 spark 和 python 的输入; -
然后读取子进程的 stdout; -
逐次使用 log.info 来打印转发的 stdout,这样 spark 的输出就被转发到了 hydra 的输出之中;
这样,spark 的输出就可以被 hydra 捕获,从而整合到 hydra log 体系之中。
import shlex
import subprocess
import hydra
import logging
log = logging.getLogger(__name__)
@hydra.main()
def app(cfg):
# 可以在这里事先处理参数,被 hydra 处理之后,也成为 spark 和 python 的输入,进行处理
shell_cmd = 'spark-submit cut_words.py' + cfg.xxxxxx # 假如 cut_words 有参数
cmd = shlex.split(shell_cmd)
p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while p.poll() is None:
line = p.stdout.readline()
line = line.strip()
if line:
log.info('Subprogram output: [{}]'.format(line))
if p.returncode == 0:
log.info('Subprogram success')
else:
log.info('Subprogram failed')
if __name__ == '__main__':
app()
流程示例:
以下就是我采取办法的流程示例。
-
Input 由 hydra 处理之后,由 python 父进程转发给 spark 和 Business Python; -
具体 spark 的输出,由 python 父进程转发给 Hydra logging;
具体如下图:
Input Input
Hydra +----------+ +------------------------v
| ^ |
| | |
| | |
+-------------------------+ v
| | | | +------+-------------+
| v +----------> | | Spark |
| | | |
| Parent Python Process | | Business Python |
| | | |
| +<-----------^ | | |
| | | | | |
+-------------------------+ +------+-------------+
| | |
| | |
| | |
Hydra <---------<+ +------------------------+
Logging Output
容器技术—— Docker
Docker 属于 Linux 容器的一种封装,提供简单易用的容器使用接口,其可以为模型在别的 PC 上的运行提供提供一次性的环境。它是目前最流行的 Linux 容器解决方案。Docker 将应用程序与该程序的依赖,打包在一个文件里面。运行这个文件,就会生成一个虚拟容器。程序在这个虚拟容器里运行,就好像在真实的物理机上运行一样。有了 Docker,就不用担心环境问题。总体来说,Docker 的接口相当简单,用户可以方便地创建和使用容器,把自己的应用放入容器。容器还可以进行版本管理、复制、分享、修改,就像管理普通的代码一样。要进一步理解 Docker 需要了解如下几个概念:
-
镜像(Image)
是一个只读模板,用来运行 Docker 容器。
-
容器(Container)
负责应用程序的运行,包括操作系统、用户添加的文件以及元数据 容器是从镜像创建的运行实例。它可以被启动、开始、停止、删除。每个容器都是相互隔离的、保证安全的平台。
注:镜像是只读的,容器在启动的时候创建一层可写层作为最上层。
-
仓库(Repository)
仓库是集中存放镜像文件的场所。仓库分为公开仓库(Public)和私有仓库(Private)两种形式。
注:Docker 仓库的概念跟 Git 类似,注册服务器可以理解为 GitHub 这样的托管服务
安装Docker
$ curl -fsSL https://get.docker.com/ | sh
$ sudo service docker restart
关于镜像的操作
获取
# 从仓库注册服务器拉取
$ sudo docker pull ubuntu:14.04
# 官方仓库注册服务器,相当于 sudo docker pull registry.hub.docker.com/ubuntu:14.04
# 也可使用其他仓库,如: sudo docker pull dl.dockerpool.com:5000/ubuntu:12.04
显示
$ sudo docker images
运行
$ sudo docker run -t -i ubuntu:14.04 /bin/bash
修改
# 运行容器bash,通过 shell 进行操作
$ sudo docker run -t -i ubuntu:14.04 /bin/bash
# 提交更新
$ sudo docker commit -m "Added json gem" -a "Docker Newbee" 0b2616b0e5a8 ouruser/sinatra:v2
-
-m
: 提交信息 -
-a
: 指定更新的用户信息 -
0b2616b0e5a8
: 容器的 ID -
ouruser/sinatra
: 仓库名 -
v2
: 仓库 tag
创建
-
通过修改已有 image
, 具体操作修改
中已有 -
通过 Dockerfile
来创建
$ mkdir mydockerimg
# docker image 的配置文件
$ vim Dockerfile
# 创建 image
$ sudo docker build -t testimg .
-
-t
: 指定新的 image 的名字 -
.
: Dockerfile 所在目录
Dockerfile
示例
# 这是注释
FROM ubuntu:14.04
MAINTAINER AIR_CC <[email protected]>
RUN apt-get -y update
CMD echo "hello-world"
ADD myApp /var/www
EXPOSE 80
-
#
: 注释 -
FROM
: 告诉 Docker 使用哪个镜像作为基础 -
MAINTAINER
: 维护者的信息 -
RUN
: 在创建镜像时运行的操作 -
CMD
: 启动容器后运行的程序 -
ADD
: 复制本地文件到镜像 -
WORKDIR
: 设置 dockerfile 命令运行目录 -
EXPOSE
: 对外部开放端口
3.通过文件系统导入
$ sudo cat ubuntu-14.04-x86_64-minimal.tar.gz | docker import - ubuntu:14.04
保存与载入
1.保存已有的镜像
$ sudo docker save -o ubuntu_14.04.tar ubuntu:14.04
2.载入镜像
$ sudo docker load --input ubuntu_14.04.tar
# 或者 sudo docker load < ubuntu_14.04.tar
删除
# 暂停并删除相应的容器
$ sudo docker stop containerName
$ sudo docker rm containerName
# 删除镜像
$ sudo docker rmi imageName
注:删除镜像之前需要先stop & rm
相应的container
# 删除所有的 container
$ sudo docker stop $(docker ps -a -q)
$ sudo docker rm $(docker ps -a -q)
# 删除所有的 image
$ sudo docker rmi $(docker images -q)
关于容器的操作
容器是独立运行的一个或一组应用,以及它们的运行态环境。对应的,虚拟机可以理解为模拟运行的一整套操作系统(提供了运行态环境和其他系统环境)和跑在上面的应用。
运行
# 运行一下,就终止
$ sudo docker run -tid ubuntu:14.04
run
的参数说明
-
-t
: 为 container 分配一个伪终端(pseudo-tty),并绑定到容器的标准输入上 -
-i
: 让容器的标准输入保持打开 -
-d
: 使容器在后台以守护态(Daemonized)形式运行
终止
使用sudo docker stop <container ID>
终止; 注:当容器中指定的应用终结时,容器也会终止 sudo docker start <container ID>
— 启动一个处于终止的容器 sudo docker restart <container ID>
— 重启一个处于运行态的容器
运行日志
获取 container 中程序输出到 terminal 上的信息 docker logs <container>
进入
-
使用 attach
$ sudo docker run -tid ubuntu:14.04
$ sudo docker attach <container namespace>
# ctrl + c: 杀死该 contanier
# ctrl + p + ctrl + q: 退出 container 交互界面
-
使用 nsenter
导出与导入
-
导出
$ sudo docker export <container ID> > outputFileName
-
导入
$ sudo docker import <filePath/fileURL>
注:用户既可以使用 docker load 来导入镜像存储文件到本地镜像库,也可以使用 dockermport 来导入一个容器快照到本地镜像库。这两者的区别在于容器快照文件将丢弃所有的历史记录和元数据信息(即仅保存容器当时的快照状态),而镜像存储文件将保存完整记录,体积也要大。此外,从容器快照文件导入时可以重新指定标签等元数据信息。
删除
$ sudo docker stop <container ID>
$ sudo docker rm <container ID>
-
注: 删除容器前需先 stop
*
其他
获取 container 的 PID
$ sudo docker inspect --format '{{.State.Pid}}' <container ID>
4. 总结
本文从 MLops 因何而来开始分析,比较了实验室环境下的 RML 和工业环境下的 MLops 的区别和联系。基于 MLops 1 级架构的基本思路对 Reproducible Machine Learning 的基本架构进行了简单的分析,之后对 MLops 1 级架构中的核心工具以及 Docker 技术进行了简单的介绍。实际环境下的 RML 还有很多别的优秀的基本工具:MLFlows,neptune.ai,DELTA LAKES 等,可以基于这样的架构进行更进一步的探索,使用更适合自己的工具。
5. 参考资料
[1] Why Machine Learning Models Crash And Burn In Production,https://www.forbes.com/sites/forbestechcouncil/2019/04/03/why-machine-learning-models-crash-and-burn-in-production/
[2] Software Architecture Guide,https://martinfowler.com/architecture/
[3] 从小作坊到智能中枢: MLOps简介,https://zhuanlan.zhihu.com/p/357897337
[4] “Reproducible Deep Learning” PhD course,https://www.sscardapane.it/teaching/reproducibledl/
[5] What is Continuous Machine Learning? https://levity.ai/blog/what-is-continuous-machine-learning
[6] Machine Learning: The High Interest Credit Card of Technical Debt,https://research.google/pubs/pub43146/
[7] MLOps:机器学习中的持续交付和自动化流水线,https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
[8] A curated list of awesome open source libraries to deploy, monitor, version and scale your machine learning,https://github.com/EthicalML/awesome-production-machine-learning
[9] Efficient Hyperparameter Optimization with Optuna Framework,https://broutonlab.com/blog/efficient-hyperparameter-optimization-with-optuna-framework
[10] Data science experiments management with Weights & Biases, https://wandb.ai/broutonlab/first_steps/reports/Data-Science-Experiments-Management-with-Weights-Biases—Vmlldzo2NjE3MDI
[11] A project-based course on the foundations of MLOps with a focus on intuition and application,https://github.com/GokuMohandas/mlops
[12] Full Stack Deep Learning (UC Berkeley CS194-080),https://fullstackdeeplearning.com/
[13] Amazon SageMaker Feature Store,https://aws.amazon.com/cn/sagemaker/feature-store/
[14] Why We Need DevOps for ML Data,https://www.tecton.ai/blog/devops-ml-data/
[15] Use Amazon SageMaker Feature Store with Amazon SageMaker Studio,https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-use-with-studio.html
[16] Feature Store for ML,https://www.featurestore.org/
[17] 用 Facebook Hydra 参数配置框架来简化程序配置,https://www.cnblogs.com/rossiXYZ/p/14826431.html
[18] Docker 的基本使用,https://www.jianshu.com/p/8c932dadceef
[19] Docker 入门教程,https://www.ruanyifeng.com/blog/2018/02/docker-tutorial.html
[20] Data Version Control With Python and DVC,https://realpython.com/python-data-version-control/
[21] Comparing Data Version Control Tools – 2020,https://dagshub.com/blog/data-version-control-tools/
[22] 一小时搭建一个云原生机器学习平台,https://zhuanlan.zhihu.com/p/383528646
[23] Continuous Delivery for Machine Learning,https://martinfowler.com/articles/cd4ml.html
[24] Machine Learning Operations,https://ml-ops.org/
[25] Introducing MLOps,https://www.oreilly.com/library/view/introducing-mlops/9781492083283/
[26] Visualizer for neural network, deep learning, and machine learning models,https://github.com/lutzroeder/netron
end
招新小广告
ChaMd5 Venom 招收大佬入圈
新成立组IOT+工控+样本分析 长期招新
原文始发于微信公众号(ChaMd5安全团队):Reproducible Machine Learning|可复现的机器学习发展、架构及基本工具使用概述