
Code@Pig Home

A Pig that grins while carrying a bag of Code .. a corner of beauty, laughter, memory, and forgetting

 
 
 


 
 

[Quick Read] Effective Python - 59 Specific Ways to Write Better Python (5)

2016-03-21 09:47:04 | Category: lang_python

Item 39: Use Queue to Coordinate Work Between Threads

A pipeline built from multiple threads needs a message-passing mechanism to communicate between stages.
Note: this example is Python 3 only; Python 2 has no queue.Queue (its module is named Queue instead).

Suppose the job is to download, resize, and then upload a series of images, in that order. Let's simulate it:
from time import sleep
from collections import deque
from threading import Thread, Lock

class MyQueue(object):
  def __init__(self):
    self.items = deque()
    self.lock = Lock()

  def put(self, item):
    with self.lock:
      self.items.append(item)

  def get(self):
    with self.lock:
      return self.items.popleft()

class Worker(Thread):
  def __init__(self, func, in_queue, out_queue):
    super().__init__()
    self.func         = func
    self.in_queue     = in_queue
    self.out_queue    = out_queue
    self.polled_count = 0
    self.work_done    = 0

  def run(self):
    while True:
      self.polled_count += 1
      try:
        item = self.in_queue.get()
      except IndexError:
        sleep(0.001)
      else:
        result = self.func(item)
        self.out_queue.put(result)
        self.work_done += 1

def download(item):
  sleep(0.005)

def resize(item):
  sleep(0.005)

def upload(item):
  sleep(0.005)

download_queue = MyQueue()
resize_queue   = MyQueue()
upload_queue   = MyQueue()
done_queue     = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

for thread in threads:
  thread.start()
for _ in range(1000):
  download_queue.put(object())

while len(done_queue.items) < 1000:
  sleep(0.1)

processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print('Processed', processed, 'items after polling', polled, 'times')

>>>
Processed 1000 items after polling 3226 times

MyQueue busy-waits, which wastes CPU. A better approach is a mutex plus a condition variable that notifies waiters, instead of relying on IndexError.
Also, the program above never exits cleanly; the worker threads just keep running.
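A sketch of that idea (my own code, not from the post): a queue whose get() blocks on a Condition instead of raising IndexError. This is essentially what queue.Queue does internally.

```python
from collections import deque
from threading import Condition

class NotifyQueue:
  def __init__(self):
    self.items = deque()
    self.cond = Condition()  # a mutex plus wait/notify

  def put(self, item):
    with self.cond:
      self.items.append(item)
      self.cond.notify()  # wake one blocked consumer

  def get(self):
    with self.cond:
      while not self.items:  # guard against spurious wakeups
        self.cond.wait()     # releases the lock while sleeping
      return self.items.popleft()

q = NotifyQueue()
q.put('job')
print(q.get())  # job
```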

Python 3 has an inter-thread queue implementation. Let's see what queue.Queue can do:

It has notification built in, so no busy-waiting is needed:
from queue import Queue
queue = Queue()

def consumer():
  print('Consumer waiting')
  queue.get() # Runs after put() below
  print('Consumer done')

thread = Thread(target=consumer)
thread.start()

print('Producer putting')
queue.put(object()) # Runs before get() above
thread.join()
print('Producer done')

>>>
Consumer waiting
Producer putting
Consumer done
Producer done

You can also cap the number of buffered items:
queue = Queue(1)     # Buffer size of 1

def consumer():
  sleep(0.1)         # Wait (sleep was imported from time above)
  queue.get()        # Runs second
  print('Consumer got 1')
  queue.get()        # Runs fourth
  print('Consumer got 2')

thread = Thread(target=consumer)
thread.start()

queue.put(object())  # Runs first
print('Producer put 1')
queue.put(object())  # Runs third
print('Producer put 2')
thread.join()
print('Producer done')

>>>
Producer put 1
Consumer got 1
Producer put 2
Consumer got 2
Producer done

Call task_done() once per completed task. When the number of unfinished tasks drops to zero, queue.join() returns.
in_queue = Queue()

def consumer():
  print('Consumer waiting')
  work = in_queue.get()     # Done second
  print('Consumer working')
  # doing work ...
  print('Consumer done')
  in_queue.task_done()      # Done third

Thread(target=consumer).start()

in_queue.put(object())      # Done first
print('Producer waiting')
in_queue.join()             # Done fourth
print('Producer done')

>>>
Consumer waiting
Producer waiting
Consumer working
Consumer done
Producer done

Back to the first example: let's add a close mechanism so the worker threads can exit gracefully.
from time import sleep
from queue import Queue
from threading import Thread, Lock

class ClosableQueue(Queue):
  SENTINEL = object()

  def close(self):
    self.put(self.SENTINEL)

  def __iter__(self):
    while True:
      item = self.get()
      try:
        if item is self.SENTINEL:
          return # Cause the thread to exit
        yield item
      finally:
        self.task_done()

class StoppableWorker(Thread):
  def __init__(self, func, in_queue, out_queue):
    super().__init__()
    self.func         = func
    self.in_queue     = in_queue
    self.out_queue    = out_queue

  def run(self):
    print('start...', self)
    for item in self.in_queue:
      result = self.func(item)
      self.out_queue.put(result)
    self.out_queue.close()
    print('stop...', self)

def download(item):
  sleep(0.005)

def resize(item):
  sleep(0.005)

def upload(item):
  sleep(0.005)

download_queue = ClosableQueue()
resize_queue   = ClosableQueue()
upload_queue   = ClosableQueue()
done_queue     = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

for thread in threads:
  thread.start()
for _ in range(1000):
  download_queue.put(object())
download_queue.close()
for thread in threads:
  thread.join()

processed = done_queue.qsize()
print('Processed', processed)

>>>
start... <StoppableWorker(Thread-1, started 5524)>
start... <StoppableWorker(Thread-2, started 5900)>
start... <StoppableWorker(Thread-3, started 7076)>
stop... <StoppableWorker(Thread-1, started 5524)>
stop... <StoppableWorker(Thread-2, started 5900)>
stop... <StoppableWorker(Thread-3, started 7076)>
Processed 1001

(It reports 1001 rather than 1000 because the last worker's close() pushes one extra SENTINEL into done_queue, and qsize() counts it.)


Item 40: Consider Coroutines to Run Many Functions Concurrently

Here is what a coroutine looks like:
def my_coroutine():
  while True:
    r = yield
    print('Recv:', r)

it = my_coroutine()
next(it)    # Prime the coroutine: advance it to the first yield
it.send('First')
it.send('Second')

>>>
Recv: First
Recv: Second

Another example:
def minimize():
  current = yield
  while True:
    value = yield current
    current = min(value, current)

it = minimize()
next(it)
print(it.send(10))
print(it.send(4))
print(it.send(22))
print(it.send(-1))

>>>
10
4
4
-1

Python 3 has yield from:
def delegated():
  yield 1
  yield 2

def composed():
  yield 'A'
  yield from delegated()
  yield 'B'

print(list(composed()))

>>>
['A', 1, 2, 'B']

Python 2 has no yield from; you have to write:
def composed():
  yield 'A'
  for value in delegated():
    yield value
  yield 'B'

In Python 3 you can return inside a coroutine/generator:
def foo():
  yield 1
  return
  yield 2

print(list(foo()))

>>>
[1]

In Python 2 the only option is an exception:
class MyReturn(Exception):
  pass

def foo():
  yield 1
  raise MyReturn()
  yield 2

def bar():
  try:
    for value in foo():
      yield value
  except MyReturn as e:
    pass

print list(bar())

>>>
[1]

What are coroutines good for? They make nice state machines. :-)
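For instance (my own sketch, not from the book), a coroutine that models a tiny on/off toggle state machine, advancing on each event sent in:

```python
def toggle():
  state = 'off'
  while True:
    event = yield state            # report state, wait for next event
    if event == 'press':
      state = 'on' if state == 'off' else 'off'

sm = toggle()
print(next(sm))          # 'off'  (prime and read initial state)
print(sm.send('press'))  # 'on'
print(sm.send('press'))  # 'off'
```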


Item 41: Consider concurrent.futures for True Parallelism

This item is Python 3 only; Python 2 has no concurrent.futures.
concurrent.futures is a higher-level interface built on top of the multiprocessing module (and threading).

----

Consider the following example that computes greatest common divisors (gcd):
from time import time

def gcd(pair):
  a, b = pair
  low = min(a, b)
  for i in range(low, 0, -1):
    if a % i == 0 and b % i == 0:
        return i

numbers = [
    (1963309, 2265973),
    (2030677, 3814172),
    (1551645, 2229620),
    (2039045, 2020802),
]
start = time()
result = list(map(gcd, numbers))
end = time()
print('Took %.3f seconds' % (end - start))

>>>
Took 1.982 seconds

----

Switching to a thread pool barely helps, and because of the GIL it can even be slower:
from concurrent.futures import ThreadPoolExecutor

start = time()
pool = ThreadPoolExecutor(max_workers=2)
result = list(pool.map(gcd, numbers))
end = time()
print('Took %.3f seconds' % (end - start))

>>>
Took 1.894 seconds

----

If we switch to a process pool instead:
from concurrent.futures import ProcessPoolExecutor

start = time()
pool = ProcessPoolExecutor(max_workers=2) # The one changed
result = list(pool.map(gcd, numbers))
end = time()
print('Took %.3f seconds' % (end - start))

>>>
Took 0.649 seconds

----

What ProcessPoolExecutor actually does:
 <1> Hands the items in numbers to pool.map
 <2> Serializes them with pickle
 <3> Sends the serialized data to a child process (over a domain socket on *nix, a pipe on win32? the author's guess)
 <4> The child process imports the module containing gcd and runs the function
 <5> Serializes the result and sends it back to the parent
 <6> Finally, the parent deserializes everything and assembles it into result
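Step <4> is also why, on platforms that spawn a fresh interpreter instead of forking (e.g. Windows), the pool must be created under an import guard: the child re-imports the main module. A minimal sketch of the idiom (my own example values):

```python
from concurrent.futures import ProcessPoolExecutor

def gcd(pair):
  a, b = pair
  low = min(a, b)
  for i in range(low, 0, -1):
    if a % i == 0 and b % i == 0:
      return i

if __name__ == '__main__':
  numbers = [(12, 18), (35, 21)]
  # The with-block joins the worker processes on exit
  with ProcessPoolExecutor(max_workers=2) as pool:
    print(list(pool.map(gcd, numbers)))  # [6, 7]
```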


6. Built-in Modules

Item 42: Define Function Decorators with functools.wraps

First, how a decorator is used:
def trace(func):
  def wrapper(*args, **kwargs):
    result = func(*args, **kwargs)
    print('%s(%r, %r) -> %r' % (func.__name__, args, kwargs, result))
    return result
  return wrapper

@trace
def fibonacci(n):
  if n in (0, 1):
    return n
  return (fibonacci(n-2) + fibonacci(n-1))

fibonacci(3)

>>>
fibonacci((1,), {}) -> 1
fibonacci((0,), {}) -> 0
fibonacci((1,), {}) -> 1
fibonacci((2,), {}) -> 1
fibonacci((3,), {}) -> 2

The @trace line is equivalent to fibonacci = trace(fibonacci), so every call to fibonacci() actually goes through wrapper().
But a decorator written this way has a drawback: you can no longer introspect the original function.
print(fibonacci)

>>>
<function trace.<locals>.wrapper at 0x107f7ed08>

help(fibonacci)

>>>
Help on function wrapper in module __main__:
wrapper(*args, **kwargs)

This is where functools.wraps comes in; it fixes both the print and help output:
from functools import wraps

def trace(func):
  @wraps(func)
  def wrapper(*args, **kwargs):
    ...
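Filling in the elided body along the same lines as the earlier snippet (my own completion), wraps copies the wrapped function's metadata onto wrapper:

```python
from functools import wraps

def trace(func):
  @wraps(func)  # copy __name__, __doc__, etc. from func onto wrapper
  def wrapper(*args, **kwargs):
    result = func(*args, **kwargs)
    print('%s(%r, %r) -> %r' % (func.__name__, args, kwargs, result))
    return result
  return wrapper

@trace
def fibonacci(n):
  """Return the n-th Fibonacci number."""
  if n in (0, 1):
    return n
  return fibonacci(n - 2) + fibonacci(n - 1)

print(fibonacci.__name__)  # fibonacci, not wrapper
```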


Item 43: Consider contextlib and with Statements for Reusable try/finally Behavior

As shown below, the with statement and try/finally are equivalent:
from threading import Lock

lock = Lock()

with lock:
  print('Lock is held')

lock.acquire()
try:
  print('Lock is held')
finally:
  lock.release()

But the with version reads much more concisely. And with the contextlib.contextmanager decorator, we can easily turn a function into something usable in a with statement.

from contextlib import contextmanager

@contextmanager
def locked(lock):
  lock.acquire()
  try:
    yield
  finally:
    lock.release()

with locked(myLock):
  ...  # do work while the lock is held
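The yield can also hand a value to the with ... as target; a small sketch of my own (not from the post), where the teardown in finally always runs:

```python
from contextlib import contextmanager

@contextmanager
def acquired(resource):
  resource.append('open')        # setup
  try:
    yield resource               # bound by `with ... as r`
  finally:
    resource.append('closed')    # teardown always runs

log = []
with acquired(log) as r:
  r.append('used')

print(log)  # ['open', 'used', 'closed']
```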

One more example. logging's default level is WARNING, so debug() output is suppressed:
import logging
def my_function():
  logging.debug('debug1')
  logging.error('error1')
  logging.debug('debug2')

>>>
ERROR:root:error1

@contextmanager
def debug_logging(level):
  logger = logging.getLogger()
  old_level = logger.getEffectiveLevel()
  logger.setLevel(level)
  try:
    yield
  finally:
    logger.setLevel(old_level)

with debug_logging(logging.DEBUG):
  print('Inside:')
  my_function()
print('After:')
my_function()

>>>
Inside:
DEBUG:root:debug1
ERROR:root:error1
DEBUG:root:debug2
After:
ERROR:root:error1

NetEase (网易公司) all rights reserved © 1997-2017