在“TF Boy 之初筵 - 分布十三式”里面我们给了一个分布式MNIST训练的例子, 然后在“TF Boy 之初筵 - 机器簇” 我们对第四式: ClusterSpec定义进行了解释。 这里我们对第十式:Session配置进行解释。
前言
在“TF Boy 之初筵 - 机器簇”里面,我们讲了TF的Client-Master结构。 分布式需要用到Worker的配置。
但是不管是不是分布式, TF都默认为Client-Master结构的管理。 所谓会话Session,就是这个Client-Master之间的对话。 从下图, 我们可以看到通过对一个Tensor图(网)的配置, 然后由Session提交到Master运行。
但是Session都是要跑到具体的设备上面的。 所以和Session密切相关的是图Graph (下图左)和 设备Device(下图右)。
但是,我们看到“TF Boy 之初筵 - 机器簇” 和 “TF Boy 之初筵 - 分布十三式” 里对Session的使用却截然不同。
1)在分布式数学表达式计算中:
with tf.Session("grpc://localhost:2223") as sess:
result = sess.run(z)
print(result)
2)在分布式MNIST例子里面:
if FLAGS.existing_servers:
server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
print("Using existing server at: %s" % server_grpc_url)
sess = sv.prepare_or_wait_for_session(server_grpc_url,
config=sess_config)
else:
sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
print("Worker %d: Session initialization complete." % FLAGS.task_index)
if FLAGS.sync_replicas and is_chief:
# Chief worker will start the chief queue runner and call the init op.
sess.run(sync_init_op)
sv.start_queue_runners(sess, [chief_queue_runner])
到底是怎样变化而来的呢?
Session的变化
从HelloWorld开始
Session是TF必不可少的部分, 就算HelloWorld里面就要定义一个tf.Session()。 然后提交运行sess.run( ... ) !
最原始的Session
最原始的会话有3部分, 初始化, 运行和关闭。 在HelloWorld里面, 由于程序结束, 一切都会Close, 所以把sess.close(),省略了。
# Using the `close()` method.
sess = tf.Session()
sess.run(...)
sess.close()
但是写close()的确不方便。 我们知道Python里面对省略close()常用的是一个with 上下文模块化。
with后的Session
通过with上下文设置, 直接隐藏了close(), 而且方便了阅读!
# Using the context manager.
with tf.Session() as sess:
sess.run(...)
这样就很容易使用session了。
Session的两种使用方式
简单来说Session有两个运行接口:run( ) 和 eval( )
run(
feed_dict=None,
session=None
)
eval(
feed_dict=None,
session=None
)
它们分别属于不同的功能, 一个是跑一个Tensor图 tf.Operation.run
, 一个是看看一个Tensor的值tf.Tensor.eval。
这两种方法都支持对默认Session的提交, 也就是说, 如果设置了默认Session, 那么这两个接口都不用输入session作为参数了,直接省略了。 这种设置有点像默认浏览器一样。
默认Default Session
默认Session可以通过Session的函数
as_default()
来设置, 一旦设置了, 也可以通过ft的接口直接获取。
tf.get_default_session()
有了这个默认Session的设置, 使用就更方便了。 所以,tensor.eval()本质上就是 tf.get_default_session().run(tensor).
c = tf.constant(...)
sess = tf.Session()
with sess.as_default():
print(c.eval())
每次都要手动设置默认很麻烦。 能不能直接声明成默认的呢?
交互Interactive Sessino
尤其在IPython被广泛使用下, 直接声明为默认的Session可以在IPthon里面随意使用eval() 接口, 随时查看Tensor的值, 这个还是蛮酷的。 这种情况下直接声明一个交互会话。
sess = tf.InteractiveSession()
a = tf.constant(5.0)
b = tf.constant(6.0)
c = a * b
# We can just use 'c.eval()' without passing 'sess'
print(c.eval())
到现在为止, 我们理解了分布式算术表达式计算里面的Session使用。 但是分布式MNIST的例子还是不明白。 对的, 这得进入下一个境界, 封装。 TensorFlow受到Scikit-Learn超级好用的接口的抽象化的影响。 也开启了超级抽象化的流程。 但是抽象化是一个双刃剑, 好用是好用了, 但是看懂和修改就难喽, Caffe的作者贾帅哥就提出TF的抽象化太重了。
Session的封装
Session除了变化外的进一步封装, 莫过于管理了, 对的。 这个管理就是对应到Session Manager。 不过,有点不一样的是, 这个Session Manager不是独立的, 而是合并在一个叫Supervisor的监管器中。
监管器Supervisor
如果我们再看一下分布式MINIST的代码就能找到这个 tf.train.Supervisor。
if FLAGS.sync_replicas:
sv = tf.train.Supervisor(
is_chief=is_chief,
logdir=train_dir,
init_op=init_op,
local_init_op=local_init_op,
ready_for_local_init_op=ready_for_local_init_op,
recovery_wait_secs=1,
global_step=global_step)
else:
sv = tf.train.Supervisor(
is_chief=is_chief,
logdir=train_dir,
init_op=init_op,
recovery_wait_secs=1,
global_step=global_step)
Supervisor是对三个东西的封装, 一个是我们这里要说的SessionManager, 另外两个是Coordinator 协调器 和Savor 保存器。 Coordinator是对多线程运行进行协调的。 Savor是对运行的Summary进行保存,以后方便TensorBoard查看。 所以你看到Supervisor初始化的时候, 对日志地址,初始化, 本地初始化和全局计数器都很看重。
这里专注于SessionManager的封装部分,Coordinator和Savor就提一下,不展开了。
Supervisor提供了两个重要的接口来协助SessionManager:
managed_session ( ... )
prepare_or_wait_for_session ( ... )
异步接口 managed_session
所谓异步,主要是针对下面的同步来说的。 其实本质上就是取代默认的 with 上下文的Session。 这里不用初始化了, 直接从Supervisor获取它管理的那个会话。 或者说经由Supervisor来初始化Session。
...create graph...
my_train_op = ...
sv = tf.train.Supervisor(logdir="/my/training/directory")
with sv.managed_session() as sess:
sess.run(my_train_op)
我们知道如果local模式,就是上面的写法, 如果要分布式模式的时候。 Session的master的协议接口就不能忽视了。
sv = Supervisor(logdir='/tmp/mydir')
# Get a TensorFlow session managed by the supervisor.
with sv.managed_session(FLAGS.master) as sess:
# Use the session to train the graph.
while not sv.should_stop():
sess.run(<my_train_op>)
我们发现, 上面这个managed_session还是和MNIST例子里面的不一样。 对的, 因为MNIST例子里用的是同步的优化器,所以需要建立同步的Session机制。
同步接口 prepare_or_wait_for_session
在同步的情况下, 多个Worker或者Device之间需要等待协同之后,才能建立起会话Session来。
这样, 我们就能把前面MNIST分布式的例子对上去了。
if FLAGS.existing_servers:
server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
print("Using existing server at: %s" % server_grpc_url)
sess = sv.prepare_or_wait_for_session(server_grpc_url,
config=sess_config)
else:
sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
print("Worker %d: Session initialization complete." % FLAGS.task_index)
if FLAGS.sync_replicas and is_chief:
# Chief worker will start the chief queue runner and call the init op.
sess.run(sync_init_op)
sv.start_queue_runners(sess, [chief_queue_runner])
从上面我们可以看到根据gRPC协议链接初始化sess后, 就可以使用了。
使用 prepare_or_wait_for_session 注意
因为同步prepare的时候, 对于Chief Worker来说, 你可以写成with上下文形式:
with sv.prepare_or_wait_for_session(server.target) as sess:
'''
# is chief
if FLAGS.task_index == 0:
sv.start_queue_runners(sess, [chief_queue_runner])
sess.run(init_token_op)
'''
但是对于非Chief的Worker, 需要等待的, 我们深入看一下 prepare_or_wait_for_session()的实现,就能清楚的看到这一点。 因此不太适合写成with block形式。
"""
# For users who recreate the session with prepare_or_wait_for_session(), we
# need to clear the coordinator's stop_event so that threads managed by the
# coordinator can run.
self._coord.clear_stop()
if self._summary_writer:
self._summary_writer.reopen()
if self._is_chief:
sess = self._session_manager.prepare_session(
master, init_op=self.init_op, saver=self.saver,
checkpoint_dir=self._logdir, wait_for_checkpoint=wait_for_checkpoint,
max_wait_secs=max_wait_secs, config=config,
init_feed_dict=self._init_feed_dict, init_fn=self._init_fn)
self._write_graph()
if start_standard_services:
self.start_standard_services(sess)
else:
sess = self._session_manager.wait_for_session(master,
config=config,
max_wait_secs=max_wait_secs)
if start_standard_services:
self.start_queue_runners(sess)
return sess
因为python的with block模块, 自带调用python的特殊函数__enter__ and __exit__ 函数的! 但是非Chief的Worker需要等待的, 如果被with block调用了__exit__ 就无法完成后续的工作了。 这点是特别要注意的!!!
对比 prepare_or_wait_for_session 和 managed_session
如果,你深入细看managed_session的Python实现, 你会惊奇的发现, 它只是对prepare_or_wait_for_session的一个调用。 如下所示:
try:
sess = self.prepare_or_wait_for_session(
master=master, config=config,
start_standard_services=start_standard_services)
yield sess
except Exception as e:
self.request_stop(e)
finally:
你会大叫, 我是不是被忽悠了, 明明managed_session就是prepare_or_wait_for_session, 错! 你再仔细看看,它返回用的不是return sess, 而是yield sess。 这就是Python另外一个神奇的关键词。 经常被用来做很高明的处理。 尤其在异步的世界里面。 我一直认为:一个人的Python的水平高低,就看他用没有用过yield,用的溜不溜。
譬如这里有个很简单的异步生成器的例子, 大家有兴趣可以研究一下, 这里就不深入展开了。
import asyncio
async def async_generator():
for i in range(3):
await asyncio.sleep(1)
yield i*i
async def main():
async for i in async_generator():
print(i)
所以, 异步是通过yield同步函数来实验的。 这就是managed_session和prepare_or_wait_for_session他们之间的差异!
我们看完了Supervisor对Session的封装, 再稍微提一下SessionManager本身的功能!
SessionManager 会话管理器
SessionManager主要提供准备,等待和恢复Session的功能! 这样在循环iteration的时候,就可以加入检查点checkpoint,方便后续记录跟踪优化过程了。
with tf.Graph().as_default():
...add operations to the graph...
# Create a SessionManager that will checkpoint the model in '/tmp/mydir'.
sm = SessionManager()
sess = sm.prepare_session(master, init_op, saver, checkpoint_dir)
# Use the session to train the graph.
while True:
sess.run(<my_train_op>)
with tf.Graph().as_default():
...add operations to the graph...
# Create a SessionManager that will wait for the model to become ready.
sm = SessionManager()
sess = sm.wait_for_session(master)
# Use the session to train the graph.
while True:
sess.run(<my_train_op>)
所以, Session管理的功能很重要的还是为了增加检查点, 方便保存日志, 跟踪。 这也是为什么Supervisor要把SessionManager 和 Savor 、 Coordinator封装在一起。 他们都是围绕检查点checkpoint为中心的。
小结:
这里我们详细的解释了Session的变化和封装。 尤其提醒注意了 Session的同步异步初始化, prepare_or_wait_for_session的使用。 跟这个有关, 顺便解释了: 有两个Python的关键词with 和 yield的神奇。
相关话题: