1 1 1 1 1 1 1 1 1 1 Rating 0.00 (0 Votes)
 2017-07-11 史春奇 AI2ML人工智能to机器学习

在“TF Boy 之初筵 - 分布十三式”里面我们给了一个分布式MNIST训练的例子, 然后在“TF Boy 之初筵 - 机器簇” 我们对第四式: ClusterSpec定义进行了解释。 这里我们对第十式:Session配置进行解释。

 

前言

 

在“TF Boy 之初筵 - 机器簇”里面,我们讲了TF的Client-Master结构。 分布式需要用到Worker的配置。 

但是不管是不是分布式, TF都默认为Client-Master结构的管理。 所谓会话Session,就是这个Client-Master之间的对话。 从下图, 我们可以看到通过对一个Tensor图(网)的配置, 然后由Session提交到Master运行。

 

 

但是Session都是要跑到具体的设备上面的。 所以和Session密切相关的是图Graph (下图左)和 设备Device(下图右)。 

 

 

但是,我们看到“TF Boy 之初筵 - 机器簇”  和 “TF Boy 之初筵 - 分布十三式” 里对Session的使用却截然不同。 

 

1)在分布式数学表达式计算中:

with tf.Session("grpc://localhost:2223") as sess:
result = sess.run(z)
print(result)

 2)在分布式MNIST例子里面:                                                           

if FLAGS.existing_servers:
server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
print("Using existing server at: %s" % server_grpc_url)

sess = sv.prepare_or_wait_for_session(server_grpc_url,
config=sess_config)
else:
sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)

print("Worker %d: Session initialization complete." % FLAGS.task_index)

if FLAGS.sync_replicas and is_chief:
# Chief worker will start the chief queue runner and call the init op.
   sess.run(sync_init_op)
sv.start_queue_runners(sess, [chief_queue_runner])

 

到底是怎样变化而来的呢?

 

 

Session的变化

 

从HelloWorld开始

 

Session是TF必不可少的部分, 就算HelloWorld里面就要定义一个tf.Session()。 然后提交运行sess.run( ... ) !

 

最原始的Session

 

最原始的会话有3部分, 初始化, 运行和关闭。 在HelloWorld里面, 由于程序结束, 一切都会Close, 所以把sess.close(),省略了。 

# Using the `close()` method.
sess = tf.
Session()
sess.run(...)
sess.close()

 

但是写close()的确不方便。 我们知道Python里面对省略close()常用的是一个with 上下文模块化。 

 

with后的Session

 

通过with上下文设置, 直接隐藏了close(), 而且方便了阅读!

# Using the context manager.
with tf.Session() as sess:
  sess.run(...)

 

这样就很容易使用session了。 

 

Session的两种使用方式

 

简单来说Session有两个运行接口:run( ) 和 eval( )

run(
    feed_dict=
None,
    session=
None
)
eval(
    feed_dict=
None,
    session=
None
)

它们分别属于不同的功能, 一个是跑一个Tensor图 tf.Operation.run , 一个是看看一个Tensor的值tf.Tensor.eval。

 

这两种方法都支持对默认Session的提交, 也就是说, 如果设置了默认Session, 那么这两个接口都不用输入session作为参数了,直接省略了。 这种设置有点像默认浏览器一样。 

 

默认Default Session

默认Session可以通过Session的函数 

as_default()

来设置, 一旦设置了, 也可以通过ft的接口直接获取。 

tf.get_default_session()

有了这个默认Session的设置, 使用就更方便了。 所以,tensor.eval()本质上就是 tf.get_default_session().run(tensor).

c = tf.constant(...)
sess = tf.
Session()
with sess.as_default():
 
print(c.eval())

每次都要手动设置默认很麻烦。 能不能直接声明成默认的呢?

 

交互Interactive Sessino

 

尤其在IPython被广泛使用下, 直接声明为默认的Session可以在IPthon里面随意使用eval() 接口, 随时查看Tensor的值, 这个还是蛮酷的。  这种情况下直接声明一个交互会话。 

 

sess = tf.InteractiveSession()
a = tf.constant(
5.0)
b = tf.constant(
6.0)
c = a * b
# We can just use 'c.eval()' without passing 'sess'
print(c.eval())

 

到现在为止, 我们理解了分布式算术表达式计算里面的Session使用。 但是分布式MNIST的例子还是不明白。 对的, 这得进入下一个境界, 封装。  TensorFlow受到Scikit-Learn超级好用的接口的抽象化的影响。  也开启了超级抽象化的流程。 但是抽象化是一个双刃剑, 好用是好用了, 但是看懂和修改就难喽, Caffe的作者贾帅哥就提出TF的抽象化太重了。 

 

 

 

 

 

Session的封装

 

Session除了变化外的进一步封装, 莫过于管理了, 对的。 这个管理就是对应到Session Manager。 不过,有点不一样的是, 这个Session Manager不是独立的, 而是合并在一个叫Supervisor的监管器中。  

 

监管器Supervisor

 

如果我们再看一下分布式MINIST的代码就能找到这个 tf.train.Supervisor。

if FLAGS.sync_replicas:
sv = tf.train.Supervisor(
is_chief=is_chief,
logdir=train_dir,
init_op=init_op,
local_init_op=local_init_op,
ready_for_local_init_op=ready_for_local_init_op,
recovery_wait_secs=1,
global_step=global_step)
else:
sv = tf.train.Supervisor(
is_chief=is_chief,
logdir=train_dir,
init_op=init_op,
recovery_wait_secs=1,
global_step=global_step)

Supervisor是对三个东西的封装, 一个是我们这里要说的SessionManager, 另外两个是Coordinator 协调器 和Savor 保存器。 Coordinator是对多线程运行进行协调的。 Savor是对运行的Summary进行保存,以后方便TensorBoard查看。   所以你看到Supervisor初始化的时候, 对日志地址,初始化, 本地初始化和全局计数器都很看重。 

 

这里专注于SessionManager的封装部分,Coordinator和Savor就提一下,不展开了。 

 

Supervisor提供了两个重要的接口来协助SessionManager:

managed_session ( ... ) 

prepare_or_wait_for_session ( ... ) 

 

异步接口 managed_session

 

所谓异步,主要是针对下面的同步来说的。 其实本质上就是取代默认的 with 上下文的Session。 这里不用初始化了, 直接从Supervisor获取它管理的那个会话。 或者说经由Supervisor来初始化Session。 

 

  ...create graph...
  my_train_op
= ...

  sv
= tf.train.Supervisor(logdir="/my/training/directory")
 
with sv.managed_session() as sess:
      sess
.run(my_train_op)

 

我们知道如果local模式,就是上面的写法, 如果要分布式模式的时候。 Session的master的协议接口就不能忽视了。 

 

  sv = Supervisor(logdir='/tmp/mydir')
 
# Get a TensorFlow session managed by the supervisor.
 
with sv.managed_session(FLAGS.master) as sess:
   
# Use the session to train the graph.
   
while not sv.should_stop():
      sess
.run(<my_train_op>)

 

我们发现, 上面这个managed_session还是和MNIST例子里面的不一样。 对的, 因为MNIST例子里用的是同步的优化器,所以需要建立同步的Session机制。 

 

 

同步接口 prepare_or_wait_for_session 

 

在同步的情况下, 多个Worker或者Device之间需要等待协同之后,才能建立起会话Session来。 

 

这样, 我们就能把前面MNIST分布式的例子对上去了。 

 

if FLAGS.existing_servers:
server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
print("Using existing server at: %s" % server_grpc_url)
sess = sv.prepare_or_wait_for_session(server_grpc_url,
config=sess_config)
else:
sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)

print("Worker %d: Session initialization complete." % FLAGS.task_index)
if FLAGS.sync_replicas and is_chief:
# Chief worker will start the chief queue runner and call the init op.
   sess.run(sync_init_op)
sv.start_queue_runners(sess, [chief_queue_runner])

从上面我们可以看到根据gRPC协议链接初始化sess后, 就可以使用了。 

 

 

使用 prepare_or_wait_for_session 注意

 

因为同步prepare的时候, 对于Chief Worker来说, 你可以写成with上下文形式:

with sv.prepare_or_wait_for_session(server.target) as sess:
'''
   # is chief
   if FLAGS.task_index == 0:
       sv.start_queue_runners(sess, [chief_queue_runner])
       sess.run(init_token_op)
   '''

 

但是对于非Chief的Worker, 需要等待的, 我们深入看一下 prepare_or_wait_for_session()的实现,就能清楚的看到这一点。 因此不太适合写成with block形式。 

"""
# For users who recreate the session with prepare_or_wait_for_session(), we
# need to clear the coordinator's stop_event so that threads managed by the
# coordinator can run.
self._coord.clear_stop()
if self._summary_writer:
self._summary_writer.reopen()

if self._is_chief:
sess = self._session_manager.prepare_session(
master, init_op=self.init_op, saver=self.saver,
checkpoint_dir=self._logdir, wait_for_checkpoint=wait_for_checkpoint,
max_wait_secs=max_wait_secs, config=config,
init_feed_dict=self._init_feed_dict, init_fn=self._init_fn)
self._write_graph()
if start_standard_services:
self.start_standard_services(sess)
else:
sess = self._session_manager.wait_for_session(master,
config=config,
max_wait_secs=max_wait_secs)
if start_standard_services:
self.start_queue_runners(sess)
return sess

 

因为python的with block模块, 自带调用python的特殊函数__enter__ and __exit__ 函数的! 但是非Chief的Worker需要等待的, 如果被with block调用了__exit__ 就无法完成后续的工作了。 这点是特别要注意的!!!

 

 

对比 prepare_or_wait_for_session 和 managed_session

 

如果,你深入细看managed_session的Python实现, 你会惊奇的发现, 它只是对prepare_or_wait_for_session的一个调用。 如下所示:

 

try:
sess = self.prepare_or_wait_for_session(
master=master, config=config,
start_standard_services=start_standard_services)
yield sess
except Exception as e:
self.request_stop(e)
finally:

 

你会大叫, 我是不是被忽悠了, 明明managed_session就是prepare_or_wait_for_session, 错! 你再仔细看看,它返回用的不是return sess, 而是yield sess。  这就是Python另外一个神奇的关键词。 经常被用来做很高明的处理。   尤其在异步的世界里面。 我一直认为:一个人的Python的水平高低,就看他用没有用过yield,用的溜不溜。 

 

譬如这里有个很简单的异步生成器的例子, 大家有兴趣可以研究一下, 这里就不深入展开了。 

 

import asyncio


async def async_generator():
    for i in range(3):
        await asyncio.sleep(1)
        yield i*i


async def main():
    async for i in async_generator():
        print(i)

 

所以, 异步是通过yield同步函数来实验的。 这就是managed_session和prepare_or_wait_for_session他们之间的差异!

 

我们看完了Supervisor对Session的封装, 再稍微提一下SessionManager本身的功能!

 

SessionManager 会话管理器

 

SessionManager主要提供准备,等待和恢复Session的功能! 这样在循环iteration的时候,就可以加入检查点checkpoint,方便后续记录跟踪优化过程了。 

with tf.Graph().as_default():
   
...add operations to the graph...
 
# Create a SessionManager that will checkpoint the model in '/tmp/mydir'.
  sm
= SessionManager()
  sess
= sm.prepare_session(master, init_op, saver, checkpoint_dir)
 
# Use the session to train the graph.
 
while True:
    sess
.run(<my_train_op>)
with tf.Graph().as_default():
   
...add operations to the graph...
 
# Create a SessionManager that will wait for the model to become ready.
  sm
= SessionManager()
  sess
= sm.wait_for_session(master)
 
# Use the session to train the graph.
 
while True:
    sess
.run(<my_train_op>)

 

所以, Session管理的功能很重要的还是为了增加检查点, 方便保存日志, 跟踪。  这也是为什么Supervisor要把SessionManager 和 Savor 、 Coordinator封装在一起。 他们都是围绕检查点checkpoint为中心的。 

 

小结:

这里我们详细的解释了Session的变化和封装。 尤其提醒注意了 Session的同步异步初始化, prepare_or_wait_for_session的使用。 跟这个有关, 顺便解释了: 有两个Python的关键词with 和 yield的神奇。  

 

 

 

 

相关话题: