目录
Tensorflow Federated Framework 谷歌联邦学习框架
1、TensorFlow Federated Framework
数据为主
整体训练
新的语言
Federated Learning (FL) API
安装TFF库(conda)
2、数据类型
3、完整代码
Tensorflow Federated Framework 谷歌联邦学习框架
联邦学习是谷歌在2016年提出的概念:在分布式的场景下,训练数据分别保存在每个clients中,希望提出一种训练方法:跨多个参与客户端(clients)训练一个共享的全局模型。其中的重点关注的问题包括:
参与的clients数量众多,比如每个人的手机、物联网设备等
clients不能一直保持在线状态,比如只有当手机接入充电器和wifi时才会参与计算
clients收集的本地数据敏感,比如输入法的输入内容,包含隐私信息,不希望分享给其他人(包括中心节点)
因此,联邦学习希望在保证数据隐私的同时,让众多clients利用自己的数据协同参与训练一个中心模型。以移动设备作为clients为例,大致的训练流程如图:
图中,移动设备作为clients,云服务器作为中心server。训练的三个步骤A、B、C是不断循环迭代的,具体含义为:
A步骤:clients从中心节点获取当前模型参数(蓝色圆),在本地设备上利用自己的local data对模型进行训练,得到新的模型参数(绿色方块)
B步骤:多个clients分别训练了不同的模型参数(各种颜色的各种形状),融合为一个模型,比如简单地求平均值。注意到图中各种形状的数量是不等的,可以从两方面理解:1)local data的分布不均,即non-i.i.d.的数据,那么训练出来的模型也分布不均。2)不是所有的clients都参与了训练
C步骤:中心服务器server将新一轮的模型分发给clients,在这个过程中,server可以有选择性地只让一部分clients参与训练
1、TensorFlow Federated Framework
联邦学习的理念其实就这么简单直接。在实现方面,Tensorflow专门为联邦学习推出了一个学习框架(TensorFlow Federated,后文简称TFF),现有的TensorFlow(简称TF)或Keras模型代码通过一些转换后就可以变为联邦学习模型。甚至可以加载单机版的预训练的模型,以迁移学习的模式应用到分散式数据的机器学习中。
阅读本文可能需要:
一定的Python技能:函数和类、数据类型、装饰器
了解TensorFlow的一些概念:non-eager模式、计算图等
了解机器学习的一些概念:模型、数据集、训练、梯度等
就算不了解也没有关系,我将把TFF当作一个新的编程语言(也确实是)来讲解入门,如果哪里写的不清楚请多多留言指出,谢谢!
框架设计理念
在一头扎入技术细节之前,最好先了解TFF框架的设计理念,对于理解各种class的意义有很大帮助。联邦学习的参与角色有:客户端(clients)和服务端(server)。
数据为主
提到clients和server的时候,马上会想到C&S端两套代码、数据交互等分布式的东西。但是在TFF框架中,谷歌并不想让用户去考虑这些东西,它希望用户能够将重点放在数据处理上,而不是代码分离上。因此,在编写TFF代码时,我们不需要指明是某段代码应该运行在Clients端还是Server端(后文简称C端、S端),但是要显式指出每个数据是储存在C端/S端、是全局唯一的还是有多份拷贝的。需要注意,这里提到的“数据”,可以指代:数据集、变量、常量等所有模型会用到的值(官方文档中所称的value)。
回到上图中的例子:蓝色圆和绿色方块是需要学习的目标模型,因此他们是存放在S端、且全局唯一的。将S端的值交给clients去更新时(图中C到A步骤),称为广播(对应tff.federated_broadcast函数,但现在不要去关心具体函数内容)。广播操作会将一个S端的value“转化”成C端的value,而其中的“转化”操作的具体实现对TFF用户来说是隐藏的,不需要去关心。这是TFF框架与分布式训练理念非常不同的一点,需要加倍理解。
同理,在图中的B步骤,把多个C端的value整合成一个S端的value,称为聚合(Aggregation)操作,TFF也提供了许多种预设函数供用户使用,不需要关心数据到底是咋传输的。
整体训练
在编写模型、训练代码的时候,clients和server应当是看作一个整体(也就是“联邦”的含义),不需要分割开S端和C端的代码,完全可以写入同一个文件里,C端和S端的区分是在代码逻辑层面的。例如:一个函数,它既能被C端调用,也能同时被S端调用。但是,某些TFF函数只能接受存放在C端的输入,某些只能接受S端的。
类似TF的non-eager模式一样编写完模型代码和训练代码后,TFF会自动地将代码分别放置到clients和server设备上(但是目前还只能单机模拟哈哈)。我们编写代码的首要且唯一的任务就是训练一个好的模型,只需要关注模型的架构、C&S交互的数据格式、聚合多clients模型的方式就可以了。至于如何协同多设备、怎么在网络上传输都不是TFF用户需要关注的细节。
新的语言
类似TF框架,TFF实际运行的代码并不是Python,而是通过Python代码来编写运算逻辑,实际上是编译成另一种语言去执行。Why?因为许多设备(例如手机、传感器)是很难有Python的运行环境的,更不可能去安装几百Mb的TensorFlow框架,那么在这些设备上执行Python代码的难度是非常大的。因此,编写TFF代码时会遇到非常多“反Python逻辑”的要求,例如:要求提供函数的输入参数的类型、要求使用它规定的几种数据类型、要求确定value的存放位置(S端/C端),原生Python逻辑不会在训练时执行等等。所有的这些,都是为了让TFF编写的代码能编译到low-level,让模型能运行在真实分布式场景下所作出的妥协。当然,随着TFF的更新,某些要求会有所放松也是可能的。
所以,学习TFF框架时,应该把它当成一门新的语言进行学习:这里有新的数据类型、新的变量声明方式、新的函数定义方式、新的执行方法等。原生Python代码和它进行混用时,要特别注意它是否能在训练时生效,是否有预期的结果。一个很简单的例子:在函数定义中print了一些东西,只会在函数定义的时候输出,而在执行的时候没有输出,就是因为函数已经被“编译”成其他语言了,而print没有被编译进去(类似注释)。如果我们编写的一些函数使用到了不同端的数据,在真正执行的时候,一个函数甚至会被拆分到不同的机器上执行。但是,这些情况停留在逻辑设计层面就可以了,编写代码的时候该咋写函数就咋写,还可以嵌套着用,把拆分执行的事情交给TFF去解决。
不同层次的API
又来类比一下:TensorFlow相对于Keras来说,是low-level的接口。编写模型时,TF需要更详细地编写变量之间的运算,而Keras只需要拼接全连接层、卷积层等部件。对应到TFF框架中,也有两个不同层次的接口:
Federated Learning (FL) API
该层提供了一组高阶接口,使开发者能够将包含的联合训练和评估实现应用于现有的 TensorFlow 模型。
Federated Core (FC) API:该系统的核心是一组较低阶接口,可以通过在强类型函数式编程环境中结合使用 TensorFlow 与分布式通信运算符,简洁地表达新的联合算法。这一层也是我们构建联合学习的基础。
在本文中,我们将自底向上进行学习,把FC API当作一门新的语言进行学习、实践。
安装TFF库(conda)
详细的安装流程请参见官方指南,这里我们将使用conda进行安装。假设你已经安装好了conda(Anaconda或Miniconda都行),命令行执行以下命令:新建一个名为tf-fed的环境,并安装python3.7:
conda create -n tf-fed python=3.7 --yes
安装完成后,切换到新建的tf-fed环境(以后每次重启命令行都要切换过来):
conda activate tf-fed
用pip安装TFF,写作时安装版本的是tensorflow_federated==0.13.1(这一步耗时较长,可以去泡杯茶先):
pip install --upgrade tensorflow_federated
如果下载太慢,可以切换到pip清华源再次安装。
等待安装结束后,验证一下是否安装成功:
python -c "import tensorflow_federated as tff; print(tff.federated_computation(lambda: 'Hello World')())"
如果成功输出了'Hello World'(以及一堆Warning),就说明TFF框架已经安装好了。顺便提一下,本文写作时tff.__version__=0.13.1,它还是一个开发版,因此本文提到的API可能会随着版本更新发生变化。
2、数据类型
把TFF当作一门新的编程语言来学习,我们先来了解一下它提供的数据类型有哪些。TFF中,这些数据类型可以分为两类:端无关的,和端有关的,端无关的类型不需要指明存放位置(S端或C端),另一个则需要。定义类型的时候,可以直接print出来看看它的类型和shape,称为compact notation。
为避免误解,请读者先思考这个问题:“类型和变量的关系是什么?”
执行下面的例子前,请先引入库:
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
tf.compat.v1.enable_v2_behavior()
# tff v0.13.1 新版本需要指定默认executor,否则无法eager执行tff.function
tff.framework.set_default_executor(tff.framework.ReferenceExecutor())
端无关的类型
Tensor types(tff.TensorType)
张量类型,需要指定它的元素数据类型dtype和形状shape。其中,dtype的所有可选列表由tf.dtypes.DType指定。例如:
print(tff.TensorType(tf.string, shape=[3, 4, 5]))
print(tff.TensorType(tf.float32, shape=[6, 7]))
print(tff.TensorType(tf.int8, shape=None))
print(tff.TensorType(tf.bool))
将会输出它们的compact notation,代表着类型和形状,而不是输出值(想想numpy.array会输出什么):
string[3,4,5]
float32[6,7]
int8
bool
Sequence types(tff.SequenceType)
列表类型,其中的元素类型应当为TFF的tff.Type,或者是能转换成tff.Type的东西。例如:
print(tff.SequenceType(tff.TensorType(tf.int8, shape=None)))
print(tff.SequenceType(tf.float32))
print(tff.SequenceType(tff.TensorType(tf.string, shape=[3, 4, 5])))
将会输出:
int8*
float32*
string[3,4,5]*
这里的*就代表着它是一个列表,*前面的是列表中元素的类型。
Named tuple types(tff.NamedTupleType)
如果你用过Python标准库中的collection.namedtuple,那这个类型就是就是TFF框架下的它。没有用过也没有关系,简单来说,tff.NamedTupleType就是元素可以带有key的tuple。看下例子:
import collections
print(tff.NamedTupleType(list((tf.int8, tf.int16))))
print(tff.NamedTupleType(tuple((('key1', tf.float32), (tf.int8), ('key3', tff.SequenceType(tf.string))))))
print(tff.NamedTupleType(collections.OrderedDict([('y', tf.float32), ('x', tff.TensorType(tf.string, shape=[3, 4, 5]))])))
tff.NamedTupleType接受三种类型的输入:list,tuple和collections.OrderedDict(也就是collection.namedtuple生成的subclass生成的对象,一种有序的字典dict类型)
将会输出:
<int8,int16>
<key1=float32,int8,key3=string*>
<y=float32,x=string[3,4,5]>
其中的尖括号是named tuple types的标志。可以注意到,元素的key是可选的,有key无key的元素位置也是很随意的。一般来说,这个类可以用于定义模型的参数、输入输出等。
Function types(tff.FunctionType)
TFF是函数式编程框架,编写的模型是由一个个函数拼接起来的。因此,编写模型代码时,内置函数以及我们编写的函数,作为一个个部件拼凑起来,由TFF后端编译成跨平台的语言进行运算。这些函数具有的类型,就是tff.FunctionType。在编译的过程中,我们需要指定函数的输入类型,且只能有一个输入值,和一个函数返回值(不像python函数那样可以输入多个形参、返回多个值)。
下面举个简单的例子,先不用理解它,后文会详细讲解如何定义函数(也分成端无关和端有关的函数)。
import numpy as np
@tff.tf_computation(tff.SequenceType(tf.int32))
def add_up_integeres(x):
return x.reduce(np.int32(0), lambda x, y: x + y)
print(type(add_up_integeres))
print(add_up_integeres.type_signature)
print(isinstance(add_up_integeres.type_signature, tff.FunctionType))
得到输出:
<class 'tensorflow_federated.python.core.impl.computation_impl.ComputationImpl'>
(int32* -> int32)
True
可以看出,tff.FunctionType不是经过TFF包装的函数本身,而是函数的签名type_signature。它指明了输入输出的形式,用圆括号和箭头->表示。同时,只有一个输入和一个输出,但是输入变量可以是复合类型(例如这里的列表类型),因此是能满足各种需求的。
端有关的类型
端有关的类型,是联邦学习逻辑层面的重点。它们主要完成两件任务:
显式地定义数据值应该存放在C端还是S端(Placement)
定义这个数据是否全局一致(All equal?)
我们可以把端有关的类型,想象成一个快递盒:里面装的数据类型是上文提到的端无关类型,盒子上贴着两个标签Placement和All equal?。那么联邦学习的流程,可以类比成:快递盒子从S端中心节点发出,交给C端加工厂处理(盒子由S端传输到C端,还可以复制成多份)。加工厂把盒子里的东西拿出来操作(与端无关)。操作完成后再打包发回中心节点(盒子由C端传输到S端),中心点把各个工厂发回的加工品聚合成一个,就完成了一轮加工。
Placement type
顾名思义,就是定义存放位置的类型。目前,我们能用到的有两个:tff.SERVER和tff.CLIENTS,也就是定义了S端和C端,把他们俩当常数用就好了。
以后也许可以定义新的Placement,以实现更复杂的场景。具体咋用呢,看下面的联邦类型。
Federated types(tff.FederatedType)
以数据驱动的联邦学习,终于到了定义联邦类型的时候了。联邦类型tff.FederatedType把上面提到的端无关类型包装起来,并增加两个属性:
placement(必填),必须是tff.SERVER和tff.CLIENTS这些Placement type
all_equal(可选,默认为None)。类型为bool,代表着这份数据是否全局统一,还是可以有不同的值。如果没有指定all_equal,它会根据placement的值来选择。默认情况下placement=tff.SERVER时all_equal=True,反之为False。
同样,Federated types也可以输出定义,看下面的例子:
import collections
print(tff.FederatedType(tf.int8, tff.CLIENTS))
print(tff.FederatedType(tff.SequenceType(tf.float32), tff.CLIENTS, all_equal=True))
ntt = tff.NamedTupleType(collections.OrderedDict([('y', tf.float32), ('x', tff.TensorType(tf.string, shape=[3, 4, 5]))]))
print(tff.FederatedType(ntt, tff.SERVER))
print(tff.FederatedType(tff.TensorType(tf.int8, shape=None), tff.SERVER, all_equal=False))
将会输出:
{int8}@CLIENTS
float32*@CLIENTS
<y=float32,x=string[3,4,5]>@SERVER
{int8}@SERVER
Federated types的类型表示格式为T@G或{T}@G,其中T为TFF的数据类型,G为存放的位置,花括号{}表示非全局唯一,而没有花括号就表示全局唯一,即all_equal=True。仔细看一下,第1、3个例子是没有指定all_equal,他们是根据placement的值来确定的。
变量声明
解决了类型定义,就可以来声明变量了。每个变量都有一个类型,没错吧?别搞混了类型和变量的概念哈。下面看一下TFF框架里要用到的变量怎么声明:
# 定义一个类型
OUR_TYPE = tff.TensorType(tf.int8, shape=[10])
# 声明一个变量
var = tff.utils.create_variables('var_name', OUR_TYPE)
# 打印一下
print(OUR_TYPE)
print(var)
和TF很类似,上面的代码声明了一个名为var,在计算图中名为var_name,类型为tff.TensorType(tf.int8, shape=[10])的变量。它可以作为模型参数等进行训练和更新。上面的代码会获得如下输出:
int8[10]
<tf.Variable 'var_name:0' shape=(10,) dtype=int8, numpy=array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0], dtype=int8)>
可见,定义的变量实质上是tf.Variable的实例,而不是tff的。那么在实际应用时,如果你对tf比较熟悉,你可以先定义tf的数据类型(或python标准库的类型),再导出tff的类型,如下例子:
# 声明一个tf Tensor类型
ccc = tf.TensorSpec(shape=(10,), name='const_name', dtype=tf.int8)
# 转成类型
OUR_TYPE2 = tff.to_type(ccc)
# 打印一下
print(ccc)
print(OUR_TYPE2)
上面的代码会输出:
TensorSpec(shape=(10,), dtype=tf.int8, name='const_name')
int8[10]
1
2
是不是觉得上面的数据类型定义都白学啦?只需要一个函数tff.to_type,里面放进去一个可转换的数据类型,就可以得到tff所需的数据类型。好用是好用,但还是得严谨地学习的嘛。接下来的例子,可能会更多地看到第二种定义类型的方式。
3、函数定义
TFF是函数式编程框架,根据函数是否使用到了不同端的数据来划分,可以分为两种类型:端无关的函数和端有关的函数。
端无关的函数
使用TensorFlow原生计算方式,只需要定义一下输入的类型即可,例如:
import numpy as np
@tff.tf_computation(tff.SequenceType(tf.int32))
def add_up_integeres(x):
return x.reduce(np.int32(0), lambda x, y: x + y)
上面这段代码,使用TFF的APItff.tf_computation来包装一个TensorFlow计算函数,其中显式声明了形参x的类型为tff.SequenceType(tf.int32)。在函数内部,x可以当作tf.data.Dataset来使用成员函数reduce(),这也是上文介绍过的。经过这个包装,函数add_up_integeres()已经被编译成了一个TFF的强类型函数(返回值类型也自动推算出来了),可供其他TFF函数调用,也可以当成普通python函数调用。例如:
print(add_up_integeres.type_signature)
print(add_up_integeres([1,2,3]))
你会得到下面的输出:
(int32* -> int32)
6
注意到两个地方:
返回值类型是自动推算出来的(int32)
变量的自动类型转换是可以的(list(int) -> tff.SequenceType(tf.int32))
端有关的函数
我们也可以自己定义端有关的函数,声明输入形参时,不仅需要指明输入的数据类型,还需要指明它的存放位置。废话不多说,我们看个例子:
@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def get_average_temperature(sensor_readings):
return tff.federated_mean(sensor_readings)
print(get_average_temperature.type_signature)
3、完整代码
上面用作讲解的例子的代码,参考自谷歌TFF文档。
import collections
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
tf.compat.v1.enable_v2_behavior()
# TODO(b/148678573,b/148685415): must use the ReferenceExecutor because it
# supports unbounded references and tff.sequence_* intrinsics.
tff.framework.set_default_executor(tff.framework.ReferenceExecutor())
mnist_train, mnist_test = tf.keras.datasets.mnist.load_data()
NUM_EXAMPLES_PER_USER = 1000
BATCH_SIZE = 100
def get_data_for_digit(source, digit):
output_sequence = []
all_samples = [i for i, d in enumerate(source[1]) if d == digit]
for i in range(0, min(len(all_samples), NUM_EXAMPLES_PER_USER), BATCH_SIZE):
batch_samples = all_samples[i:i + BATCH_SIZE]
output_sequence.append({
'x':
np.array([source[0][i].flatten() / 255.0 for i in batch_samples],
dtype=np.float32),
'y':
np.array([source[1][i] for i in batch_samples], dtype=np.int32)
})
return output_sequence
federated_train_data = [get_data_for_digit(mnist_train, d) for d in range(10)]
federated_test_data = [get_data_for_digit(mnist_test, d) for d in range(10)]
BATCH_SPEC = collections.OrderedDict(
x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
y=tf.TensorSpec(shape=[None], dtype=tf.int32))
BATCH_TYPE = tff.to_type(BATCH_SPEC)
print(str(BATCH_TYPE))
MODEL_SPEC = collections.OrderedDict(
weights=tf.TensorSpec(shape=[784, 10], dtype=tf.float32),
bias=tf.TensorSpec(shape=[10], dtype=tf.float32))
MODEL_TYPE = tff.to_type(MODEL_SPEC)
print(MODEL_TYPE)
@tf.function
def forward_pass(model, batch):
predicted_y = tf.nn.softmax(
tf.matmul(batch['x'], model['weights']) + model['bias'])
return -tf.reduce_mean(
tf.reduce_sum(
tf.one_hot(batch['y'], 10) * tf.math.log(predicted_y), axis=[1]))
@tff.tf_computation(MODEL_TYPE, BATCH_TYPE)
def batch_loss(model, batch):
return forward_pass(model, batch)
initial_model = collections.OrderedDict(
weights=np.zeros([784, 10], dtype=np.float32),
bias=np.zeros([10], dtype=np.float32))
sample_batch = federated_train_data[5][-1]
print(batch_loss(initial_model, sample_batch))
@tff.tf_computation(MODEL_TYPE, BATCH_TYPE, tf.float32)
def batch_train(initial_model, batch, learning_rate):
# Define a group of model variables and set them to `initial_model`. Must
# be defined outside the @tf.function.
model_vars = collections.OrderedDict([
(name, tf.Variable(name=name, initial_value=value))
for name, value in initial_model.items()
])
optimizer = tf.keras.optimizers.SGD(learning_rate)
@tf.function
def _train_on_batch(model_vars, batch):
# Perform one step of gradient descent using loss from `batch_loss`.
with tf.GradientTape() as tape:
loss = forward_pass(model_vars, batch)
grads = tape.gradient(loss, model_vars)
optimizer.apply_gradients(
zip(tf.nest.flatten(grads), tf.nest.flatten(model_vars)))
return model_vars
return _train_on_batch(model_vars, batch)
print(str(batch_train.type_signature))
model = initial_model
losses = []
for _ in range(5):
model = batch_train(model, sample_batch, 0.1)
losses.append(batch_loss(model, sample_batch))
print("5 loops loss:", losses)
LOCAL_DATA_TYPE = tff.SequenceType(BATCH_TYPE)
@tff.federated_computation(MODEL_TYPE, tf.float32, LOCAL_DATA_TYPE)
def local_train(initial_model, learning_rate, all_batches):
# Mapping function to apply to each batch.
@tff.federated_computation(MODEL_TYPE, BATCH_TYPE)
def batch_fn(model, batch):
return batch_train(model, batch, learning_rate)
return tff.sequence_reduce(all_batches, initial_model, batch_fn)
print(str(local_train.type_signature))
locally_trained_model = local_train(initial_model, 0.1, federated_train_data[5])
@tff.federated_computation(MODEL_TYPE, LOCAL_DATA_TYPE)
def local_eval(model, all_batches):
# TODO(b/120157713): Replace with `tff.sequence_average()` once implemented.
return tff.sequence_sum(
tff.sequence_map(
tff.federated_computation(lambda b: batch_loss(model, b), BATCH_TYPE),
all_batches))
print(str(local_eval.type_signature))
print('initial_model loss [num 5] =', local_eval(initial_model, federated_train_data[5]))
print('locally_trained_model loss [num 5] =', local_eval(locally_trained_model, federated_train_data[5]))
print('initial_model loss [num 0] =', local_eval(initial_model, federated_train_data[0]))
print('locally_trained_model loss [num 0] =', local_eval(locally_trained_model, federated_train_data[0]))
SERVER_MODEL_TYPE = tff.FederatedType(MODEL_TYPE, tff.SERVER)
CLIENT_DATA_TYPE = tff.FederatedType(LOCAL_DATA_TYPE, tff.CLIENTS)
@tff.federated_computation(SERVER_MODEL_TYPE, CLIENT_DATA_TYPE)
def federated_eval(model, data):
return tff.federated_mean(
tff.federated_map(local_eval, [tff.federated_broadcast(model), data]))
print(str(federated_eval.type_signature))
print('initial_model loss =', federated_eval(initial_model,
federated_train_data))
print('locally_trained_model loss =',
federated_eval(locally_trained_model, federated_train_data))
SERVER_FLOAT_TYPE = tff.FederatedType(tf.float32, tff.SERVER)
@tff.federated_computation(SERVER_MODEL_TYPE, SERVER_FLOAT_TYPE,
CLIENT_DATA_TYPE)
def federated_train(model, learning_rate, data):
return tff.federated_mean(
tff.federated_map(local_train, [
tff.federated_broadcast(model),
tff.federated_broadcast(learning_rate), data
]))
model = initial_model
learning_rate = 0.1
for round_num in range(5):
# 每一轮,把大家的模型分别更新一下,取平均之后拿回来(做赋值替换)
model = federated_train(model, learning_rate, federated_train_data)
# 把学习率减小一点
learning_rate = learning_rate * 0.9
# 算个loss输出一下
loss = federated_eval(model, federated_train_data)
print('round {}, loss={}'.format(round_num, loss))
# 下一轮S端的模型又发给各位clients去更新