boxmoe_header_banner_img

Hello! 欢迎来到悠悠畅享网!

文章导读

使用 Jupyter Notebook 实现并行任务队列


avatar
站长 2025年8月17日 1

使用 Jupyter Notebook 实现并行任务队列

使用 Jupyter Notebook 实现并行任务队列

本文将介绍如何在 Jupyter Notebook 中实现一个简单的并行任务队列,它允许你在后台运行耗时的函数,而不会阻塞 Notebook 的交互。这个实现方案的核心是使用 concurrent.futures.ThreadPoolExecutor 来管理线程池,以及使用 ipywidgets.Output 来捕获和显示子线程的输出。

import sys import asyncio import concurrent.futures  import ipywidgets  threadpool = concurrent.futures.ThreadPoolExecutor(4)  def run(fn, *args, **kwds):     "run fn in threadpool"     out = ipywidgets.Output()      def print(*args, file=sys.stdout):         line = ' '.join(map(str, args)) + 'n'         if file is sys.stderr:             out.append_stderr(line)         else:             out.append_stdout(line)      def done(fut: asyncio.Future):         try:             result = fut.result()         except asyncio.CancelledError:             print("cancelled", fut, file=sys.stderr)         except Exception:             print("failed", fut, file=sys.stderr)         else:             print("completed", fut)      async def go():         loop = asyncio.get_running_loop()         return await loop.run_in_executor(             threadpool,             lambda: fn(print, *args, **kwds),         )      task = asyncio.create_task(go())     task.add_done_callback(done)      return out

上述代码定义了一个 run 函数,它接受一个函数 fn 和任意数量的参数 *args 和关键字参数 **kwds。run 函数的主要功能是:

  1. 创建一个 ipywidgets.Output 对象 out: 这个对象用于捕获和显示在线程中执行的函数的输出。
  2. 定义一个自定义的 print 函数: 这个函数将输出重定向到 out 对象。ipywidgets.Output 似乎依赖于 asyncio 的一些机制,才能正确地将 “real” print 输出到正确的位置。
  3. 定义一个 done 回调函数: 这个函数在线程中的函数执行完成后被调用,用于处理结果或异常。
  4. 定义一个异步函数 go: 这个函数使用 asyncio.loop.run_in_executor 在线程池中执行传入的函数 fn。
  5. 创建一个 asyncio 任务 task: 这个任务执行异步函数 go。
  6. 添加 done 回调函数到 task: 当任务完成时,done 函数会被调用。
  7. 返回 out 对象: 这个对象可以在 Notebook 中显示线程的输出。

使用示例

以下是一个使用 run 函数的示例:

import time  def cpu_bound(print, dt, fail=False):     for i in range(10):         time.sleep(dt)         print(i, time.time())     if fail:         1 / 0     return "done"  run(cpu_bound, 0.1)

在这个例子中,cpu_bound 函数模拟一个耗时的 CPU 密集型任务。它接受一个 print 函数,一个时间间隔 dt,以及一个可选的 fail 参数。该函数循环 10 次,每次休眠 dt 秒,并使用传入的 print 函数打印当前迭代次数和时间戳。如果 fail 参数为 True,则会引发一个 ZeroDivisionError 异常。

通过调用 run(cpu_bound, 0.1),cpu_bound 函数将在后台线程中执行,而不会阻塞 Notebook。输出将显示在与 run 函数调用相关的 ipywidgets.Output 对象中。

处理异常

run 函数还能够处理在线程中发生的异常。例如:

run(cpu_bound, 0.5, fail=True)

在这个例子中,cpu_bound 函数将引发一个 ZeroDivisionError 异常。done 回调函数将捕获这个异常,并将其输出到 ipywidgets.Output 对象中。

注意事项

  • concurrent.futures.ThreadPoolExecutor 使用线程,适合于 I/O 密集型任务。对于 CPU 密集型任务,可以考虑使用 concurrent.futures.ProcessPoolExecutor 来利用多核 CPU。
  • 在线程中执行的函数必须是可序列化的。这意味着它们不能依赖于在主线程中定义的局部变量或对象。
  • 使用 ipywidgets.Output 可以方便地捕获和显示线程的输出,但它可能会引入一些性能开销。

总结

本文介绍了一种在 Jupyter Notebook 中实现并行任务队列的简单方法。通过使用 concurrent.futures.ThreadPoolExecutor 和 ipywidgets.Output,你可以在后台运行耗时的函数,而不会阻塞 Notebook 的交互。这种方法对于交互式数据分析和原型设计非常有用。



评论(已关闭)

评论已关闭