使用 Jupyter Notebook 实现并行任务队列
本文将介绍如何在 Jupyter Notebook 中实现一个简单的并行任务队列,它允许你在后台运行耗时的函数,而不会阻塞 Notebook 的交互。这个实现方案的核心是使用 concurrent.futures.ThreadPoolExecutor 来管理线程池,以及使用 ipywidgets.Output 来捕获和显示子线程的输出。
import sys import asyncio import concurrent.futures import ipywidgets threadpool = concurrent.futures.ThreadPoolExecutor(4) def run(fn, *args, **kwds): "run fn in threadpool" out = ipywidgets.Output() def print(*args, file=sys.stdout): line = ' '.join(map(str, args)) + 'n' if file is sys.stderr: out.append_stderr(line) else: out.append_stdout(line) def done(fut: asyncio.Future): try: result = fut.result() except asyncio.CancelledError: print("cancelled", fut, file=sys.stderr) except Exception: print("failed", fut, file=sys.stderr) else: print("completed", fut) async def go(): loop = asyncio.get_running_loop() return await loop.run_in_executor( threadpool, lambda: fn(print, *args, **kwds), ) task = asyncio.create_task(go()) task.add_done_callback(done) return out
上述代码定义了一个 run 函数,它接受一个函数 fn 和任意数量的参数 *args 和关键字参数 **kwds。run 函数的主要功能是:
- 创建一个 ipywidgets.Output 对象 out: 这个对象用于捕获和显示在线程中执行的函数的输出。
- 定义一个自定义的 print 函数: 这个函数将输出重定向到 out 对象。ipywidgets.Output 似乎依赖于 asyncio 的一些机制,才能正确地将 “real” print 输出到正确的位置。
- 定义一个 done 回调函数: 这个函数在线程中的函数执行完成后被调用,用于处理结果或异常。
- 定义一个异步函数 go: 这个函数使用 asyncio.loop.run_in_executor 在线程池中执行传入的函数 fn。
- 创建一个 asyncio 任务 task: 这个任务执行异步函数 go。
- 添加 done 回调函数到 task: 当任务完成时,done 函数会被调用。
- 返回 out 对象: 这个对象可以在 Notebook 中显示线程的输出。
使用示例
以下是一个使用 run 函数的示例:
import time def cpu_bound(print, dt, fail=False): for i in range(10): time.sleep(dt) print(i, time.time()) if fail: 1 / 0 return "done" run(cpu_bound, 0.1)
在这个例子中,cpu_bound 函数模拟一个耗时的 CPU 密集型任务。它接受一个 print 函数,一个时间间隔 dt,以及一个可选的 fail 参数。该函数循环 10 次,每次休眠 dt 秒,并使用传入的 print 函数打印当前迭代次数和时间戳。如果 fail 参数为 True,则会引发一个 ZeroDivisionError 异常。
通过调用 run(cpu_bound, 0.1),cpu_bound 函数将在后台线程中执行,而不会阻塞 Notebook。输出将显示在与 run 函数调用相关的 ipywidgets.Output 对象中。
处理异常
run 函数还能够处理在线程中发生的异常。例如:
run(cpu_bound, 0.5, fail=True)
在这个例子中,cpu_bound 函数将引发一个 ZeroDivisionError 异常。done 回调函数将捕获这个异常,并将其输出到 ipywidgets.Output 对象中。
注意事项
- concurrent.futures.ThreadPoolExecutor 使用线程,适合于 I/O 密集型任务。对于 CPU 密集型任务,可以考虑使用 concurrent.futures.ProcessPoolExecutor 来利用多核 CPU。
- 在线程中执行的函数必须是可序列化的。这意味着它们不能依赖于在主线程中定义的局部变量或对象。
- 使用 ipywidgets.Output 可以方便地捕获和显示线程的输出,但它可能会引入一些性能开销。
总结
本文介绍了一种在 Jupyter Notebook 中实现并行任务队列的简单方法。通过使用 concurrent.futures.ThreadPoolExecutor 和 ipywidgets.Output,你可以在后台运行耗时的函数,而不会阻塞 Notebook 的交互。这种方法对于交互式数据分析和原型设计非常有用。
评论(已关闭)
评论已关闭