Python 标准库中非常有用的装饰器,


众所周知,Python 语言灵活、简洁,对程序员友好,但在性能上有点不太令人满意,这一点通过一个递归的求斐波那契额函数就可以说明:

  1. def fib(n): 
  2.     if n <= 1: 
  3.         return n 
  4.     return fib(n - 1) + fib(n - 2) 

在我的 MBP 上计算 fib(40) 花费了 33 秒:

  1. import time 
  2.  
  3. def main(): 
  4.     start = time.time() 
  5.     result = fib(40) 
  6.     end = time.time() 
  7.     cost = end - start 
  8.     print(f"{result = } {cost = :.4f}") 
  9.  
  10. if __name__ == '__main__': 
  11.     main() 

但是,假如使用标准库中的这个装饰器,那结果完全不一样

  1. from functools import lru_cache 
  2.  
  3. @lru_cache 
  4. def fib(n): 
  5.     if n <= 1: 
  6.         return n 
  7.     return fib(n - 1) + fib(n - 2) 

这次的结果是 0 秒,你没看错,我保留了 4 位小数,后面的忽略了。

提升了多少倍?我已经计算不出来了。

为什么 lru_cache 装饰器这么牛逼,它到底做了什么事情?今天就来聊一聊这个最有用的装饰器。

如果看过计算机操作系统的话,你对 LRU 一定不会陌生,这就是著名的最近最久未使用缓存淘汰算法。

而 lru_cache 就是这个算法的具体实现。(这个算法可是面试经常考的哦,有的面试官要求现场手写代码)

现在,我们来看一个 lru_cache 的源代码,其中的英文注释,我已经为你翻译为中文:

  1. def lru_cache(maxsize=128, typed=False): 
  2.     """LRU 缓存装饰器 
  3.  
  4.     如果 *maxsize* 是 None, 将不会淘汰缓存,缓存大小也不做限制 
  5.  
  6.     如果 *typed* 是 True, 不同类型的参数将独立做缓存,比如 f(3.0) and f(3) 将认为是不同的函数调用而缓存在两个缓存节点上。 
  7.  
  8.     函数的参数必须可以被 hash 
  9.  
  10.     查看缓存信息使用的是命名元组 (hits, misses, maxsize, currsize) 
  11.     查看缓存信息:user_func.cache_info().  清理缓存信息:user_func.cache_clear(). 
  12.  
  13.     LRU 算法:  http://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU) 
  14.  
  15.     """ 
  16.  
  17.     # lru_cache 的内部实现是线程安全的 
  18.  
  19.     if isinstance(maxsize, int): 
  20.         # 负数转换为 0  
  21.         if maxsize < 0: 
  22.             maxsize = 0 
  23.     elif callable(maxsize) and isinstance(typed, bool): 
  24.         #如果被装饰的函数(user_function)直接通过 maxsize 参数传入  
  25.         user_function, maxsize = maxsize, 128 
  26.         wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo) 
  27.         return update_wrapper(wrapper, user_function) 
  28.     elif maxsize is not None: 
  29.         raise TypeError( 
  30.             'Expected first argument to be an integer, a callable, or None') 
  31.  
  32.     def decorating_function(user_function): 
  33.         wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo) 
  34.         return update_wrapper(wrapper, user_function) 
  35.  
  36.     return decorating_function 

这里面有两个参数,一个是 maxsize,表示缓存的大小,当传入负数时,自动设置为 0,如果不传入 maxsize,或者设置为 None,表示缓存没有大小限制,此时没有缓存淘汰。还有一个是 type,当 type 传入 True 时,不同的参数类型会当作不同的 key 存到缓存当中。

接下来,lru_cache 的核心在这个函数上 _lru_cache_wrapper,建议有感情的阅读、背诵并默写。我们来看下它的源代码

  1. def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo): 
  2.     # 所有 lru cache 实例共享的常量: 
  3.     sentinel = object()          # 用来表示缓存未命中的唯一对象 
  4.     make_key = _make_key         # build a key from the function arguments 
  5.     PREV, NEXT, KEY, RESULT = 0, 1, 2, 3   # names for the link fields 
  6.  
  7.     cache = {} 
  8.     hits = misses = 0 
  9.     full = False 
  10.     cache_get = cache.get    # 绑定函数来获取缓存中 key 的值 
  11.     cache_len = cache.__len__  # 绑定函数获取缓存大小 
  12.     lock = RLock()           # 因为链表上的更新是线程不安全的 
  13.     root = []                # 循环双向链表的根节点 
  14.     root[:] = [root, root, None, None]     # 初始化根节点的前后指针都指向它自己 
  15.  
  16.     if maxsize == 0: 
  17.  
  18.         def wrapper(*args, **kwds): 
  19.             # 没有缓存,仅更新统计信息 
  20.             nonlocal misses 
  21.             misses += 1 
  22.             result = user_function(*args, **kwds) 
  23.             return result 
  24.  
  25.     elif maxsize is None: 
  26.  
  27.         def wrapper(*args, **kwds): 
  28.             # 仅仅排序,不考虑排序和缓存大小限制 
  29.             nonlocal hits, misses 
  30.             key = make_key(args, kwds, typed) 
  31.             result = cache_get(key, sentinel) 
  32.             if result is not sentinel: 
  33.                 hits += 1 
  34.                 return result 
  35.             misses += 1 
  36.             result = user_function(*args, **kwds) 
  37.             cache[key] = result 
  38.             return result 
  39.  
  40.     else: 
  41.  
  42.         def wrapper(*args, **kwds): 
  43.             # 大小有限制,并跟踪最近使用的缓存 
  44.             nonlocal root, hits, misses, full 
  45.             key = make_key(args, kwds, typed) 
  46.             with lock: 
  47.                 link = cache_get(key) 
  48.                 if link is not None: 
  49.                     # 缓存命中,将命中的缓存移动到循环双向链表的头部 
  50.                     link_prev, link_next, _key, result = link 
  51.                     link_prev[NEXT] = link_next 
  52.                     link_next[PREV] = link_prev 
  53.                     last = root[PREV] 
  54.                     last[NEXT] = root[PREV] = link 
  55.                     link[PREV] = last 
  56.                     link[NEXT] = root 
  57.                     hits += 1 
  58.                     return result 
  59.                 misses += 1 
  60.             result = user_function(*args, **kwds) 
  61.             with lock: 
  62.                 if key in cache: 
  63.                     # 走到这里说明 key 已经放在了缓存,且锁已经释放了,链表已经更新了,这里什么也不需要做了,最后只需要返回计算的结果就可以了。 
  64.                     pass 
  65.                 elif full: 
  66.                     # 如果缓存满了, 使用最老的根节点来存储新节点就可以了,链表上不需要删除(是不是很聪明) 
  67.                     oldroot = root 
  68.                     oldroot[KEY] = key 
  69.                     oldroot[RESULT] = result 
  70.                     root = oldroot[NEXT] 
  71.                     oldkey = root[KEY] 
  72.                     oldresult = root[RESULT] 
  73.                     root[KEY] = root[RESULT] = None 
  74.                      
  75.                     # 最后,我们需要从缓存中清除这个 key,因为它已经无效了。 
  76.                     del cache[oldkey] 
  77.                     # 新值放入缓存 
  78.                     cache[key] = oldroot 
  79.                 else: 
  80.                     # 如果没有满,将新的结果放入循环双向链表的头部 
  81.                     last = root[PREV] 
  82.                     link = [last, root, key, result] 
  83.                     last[NEXT] = root[PREV] = cache[key] = link 
  84.                     # 使用 cache_len 绑定方法而不是 len() 函数,后者可能会被包装在 lru_cache 本身中 
  85.                     full = (cache_len() >= maxsize) 
  86.             return result 
  87.  
  88.     def cache_info(): 
  89.         """报告缓存统计信息""" 
  90.         with lock: 
  91.             return _CacheInfo(hits, misses, maxsize, cache_len()) 
  92.  
  93.     def cache_clear(): 
  94.         """清理缓存信息""" 
  95.         nonlocal hits, misses, full 
  96.         with lock: 
  97.             cache.clear() 
  98.             root[:] = [root, root, None, None] 
  99.             hits = misses = 0 
  100.             full = False 
  101.  
  102.     wrapper.cache_info = cache_info 
  103.     wrapper.cache_clear = cache_clear 
  104.     return wrapper 

如果我写的注释你都看明白了,那也不用看我下面的废话了,如果还有点不太明白,我啰嗦几句,也许你就明白了。

第一、所谓缓存,用的仍然是内存,为了快速存取,用的就是一个 hash 表,也就是 Python 的字典,都是在内存里的操作。

  1. cache = {} 

第二、如果 maxsize == 0,就相当于没有使用缓存,每调用一次,未命中数就 + 1,代码逻辑是这样的:

  1. def wrapper(*args, **kwds): 
  2.     nonlocal misses 
  3.     misses += 1 # 未命中数 
  4.     result = user_function(*args, **kwds) 
  5.     return result 

第三、如果 maxsize == None,相当于缓存无限制,也就不需要考虑淘汰,这个实现非常简单,我们直接在函数中用一个字典就可以实现,比如说:

  1. cache = {} 
  2. def fib(n): 
  3.  
  4.     if n in cache: 
  5.         return cache[n] 
  6.  
  7.     if n <= 1: 
  8.         return n 
  9.     result = fib(n - 1) + fib(n - 2) 
  10.     cache[n] = result 
  11.     return result 

运行时间:

理解了这一点,在装饰器中,这段逻辑就不难看懂:

  1. def wrapper(*args, **kwds): 
  2.     nonlocal hits, misses 
  3.     key = make_key(args, kwds, typed) 
  4.     result = cache_get(key, sentinel) 
  5.     if result is not sentinel: 
  6.         hits += 1 
  7.         return result 
  8.     misses += 1 
  9.     result = user_function(*args, **kwds) 
  10.     cache[key] = result 
  11.     return result 

第四、真正的缓存淘汰算法。

为了实现缓存(键值对)的淘汰,我们需要对缓存按时间进行排序,这就需要用到链表,链表的头部是最新插入的,尾部是最老插入的,当缓存数量已经达到最大值时,我们删除最久未使用的链尾节点,为了不删除链尾,我们可以使用循环链表,当缓存满了,直接更新链尾节点赋值为新节点,并把它做为新的链头就可以了。

当缓存命中时,我们需要把这个节点移动到链表的头部,保证链表的头部是最近经常使用的,为了移动方便,我们需要双向链表。

双向循环链表在 Python 中实现,可以简单的这么写:

  1. PREV, NEXT, KEY, RESULT = 0, 1, 2, 3   # names for the link fields 
  2. root = []                # root of the circular doubly linked list 
  3. root[:] = [root, root, None, None]     # initialize by pointing to self 

可能有些朋友看不懂最后那行代码:root[:] = [root, root, None, None],画个图你就理解了:

这些箭头指向的都是节点的内存地址,随着节点的增多,就是这个样子的:

对比这个图,再看源代码,就很容易看懂了。尤其是这块的代码逻辑,是面试常考的重点,如果你能手写出这样线程安全的 LRU 缓存淘汰算法,那无疑是非常优秀的。

其他 LRU 算法的实现

其他关于 LRU 算法的实现,我自己写了两个,可以看这里:

LRU 缓存淘汰算法-双链表+hash 表[1]

LRU 缓存淘汰算法-Python 有序字典[2]

最后的话

装饰器 lru_cache 的作用就是把函数的计算机结果保存下来,下次用的时候可以直接从 hash 表中取出,避免重复计算从而提升效率,简单点的,直接在函数中使用个字典就搞定了,复杂点的,请看 lru_cache 的代码实现。另一方面,递归函数慢的一个主要原因就是重复计算。

Python 标准库的源码,是学习编程最有营养的原料,当你有好奇心时,不妨去窥探一下源码,相信你有定会有新的收获。今天的分享就到这里,如果有收获的话,请点赞、在看、转发、关注,感谢你的支持。

参考资料

[1]

LRU 缓存淘汰算法-双链表+hash 表: https://github.com/somenzz/geekbang/blob/master/algorthms/lru_use_link_table.py

[2]

LRU 缓存淘汰算法-Python 有序字典: https://github.com/somenzz/geekbang/blob/master/algorthms/lru_use_ordered_dict.py

评论关闭