はじめに
ニフティ株式会社 会員システムグループの深田です。
ニフティではブカツ制度という社内交流の活動支援制度があり、最近会社でも健康志向が高まってきているので、筋トレ部なるものを発足しようかと考えています。
早速ですが、みなさんは Python でマルチスレッド(以下、並行処理)を実装されていますか?
Python で並行処理を実装する方法はいくつか挙げられますが、ここでは concurrent.futures モジュールの ThreadPoolExecutor について紹介していきたいと思います。
ThreadPoolExecutorとは
ThreadPoolExecutor は Python の concurrent.futures モジュールに含まれるスレッドプールを実装するクラスであり、並行処理を効率化するために使用されます。スレッドプールは、複数のスレッドを作成し、再利用することで、タスクの実行を効率化することができます。
ProcessPoolExecutorとの違い
concurrent.futures モジュールには Executor のサブクラスとして ThreadPoolExecutor の他に、もう一つ ProcessPoolExecutor というクラスが存在ます。
ProcessPoolExecutor はプロセスを利用して並列処理(マルチプロセス)を行うため、複数のプロセスを実行します。主にCPU負荷の高いタスクに適しています。
一方、ThreadPoolExecutor はスレッドを利用して並行処理(マルチスレッド)を行うため、同一のプロセス内で複数のスレッドを実行します。メモリ使用量も比較的少ないため、I/OバウンドのタスクやCPU負荷の低いタスクなど、主にI/O待ちの時間が長いタスクに適しています。
Pythonでの並列処理・並行処理
Pythonで並列処理や並行処理を実装したい場合、主に以下が挙げられます。
- threadingモジュール
- multiprocessingモジュール
- concurrent.futuresモジュール
- asyncioモジュール
- サードパーティのライブラリ
今回は「3. concurrent.futuresモジュール」のクラスである ThreadPoolExecutor について解説しますが、特徴の1つとして「1. threadingモジュール」をベースにしているという点にあります。concurrent.futures モジュールは threading モジュールを抽象化して提供し、スレッドやプロセスの選択を自動的に行っています。
したがって、concurrent.futures モジュールを使用することで、パフォーマンスやリソース利用の効率化を図った並列処理・並行処理を実現することができます。
しかし、場合によっては直接 threading モジュールを使用することもあるので、どちらを選ぶかは具体的な要件次第になります。
ThreadPoolExecutorの基本的な使い方
ThreadPoolExecutor を使用するには、まず concurrent.futures モジュールをインポートします。次に、ThreadPoolExecutor クラスのインスタンスを作成し、引数にスレッド数を指定します。スレッド数は、同時に実行されるスレッドの最大数を表します。
1 2 3 4 5 6 7 |
from concurrent.futures import ThreadPoolExecutor # スレッド数を指定してThreadPoolExecutorのインスタンスを作成 with ThreadPoolExecutor(max_workers=5) as executor: # タスクを実行する関数をsubmitメソッドでスレッドプールに登録 future = executor.submit(my_function, arg1, arg2, ...) |
submit メソッドには、実行したいタスクを関数として渡し、必要に応じて引数を指定することができます。 submit メソッドが実行されると、 my_function メソッド内の実行を待たずに次の行に進み、非同期に処理が進行します。
コールバック関数を使用した結果の取得
submit メソッドは、concurrent.futures.Future オブジェクトを返します。このオブジェクトを使用して、タスクの実行結果を取得したり、タスクの完了を待ったりすることができます。
1 2 3 4 5 6 7 8 9 10 11 12 |
from concurrent.futures import ThreadPoolExecutor # スレッド数を指定してThreadPoolExecutorのインスタンスを作成 with ThreadPoolExecutor(max_workers=5) as executor: # タスクを実行する関数をsubmitメソッドでスレッドプールに登録 future = executor.submit(my_function, arg1, arg2, ...) # コールバック関数を使用して結果を取得 future.add_done_callback(my_callback_function) def my_callback_function(future): result = future.result() # タスクの実行結果を取得 |
add_done_callback メソッドを使用して、コールバック関数を登録することができます。コールバック関数は、タスクの実行が完了した際に自動的に呼び出され、タスクの結果を受け取ることができます。
例外のハンドリング
ThreadPoolExecutor を使用して並行処理を行う際には、例外のハンドリングに注意が必要です。タスクの実行中に例外が発生した場合には、その例外を処理する必要があります。ThreadPoolExecutor では、各タスクの実行結果を保持する Future オブジェクトに例外が発生した場合には、その例外を含んだ状態で完了します。これを受け取るためには、 add_done_callback メソッドで登録したコールバック関数の中で、Future オブジェクトの result メソッドを呼び出すことで、例外を取得することができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
from concurrent.futures import ThreadPoolExecutor # スレッド数を指定してThreadPoolExecutorのインスタンスを作成 with ThreadPoolExecutor(max_workers=5) as executor: # タスクを実行する関数をsubmitメソッドでスレッドプールに登録 future = executor.submit(my_function, arg1, arg2, ...) # コールバック関数を使用して結果を取得 future.add_done_callback(my_callback_function) # コールバック関数の中で例外をハンドリング def my_callback_function(future): try: result = future.result() # タスクの実行結果を取得 # タスクの結果を処理 except Exception as e: # 例外をハンドリング print(f"例外が発生しました: {e}") |
実践的な使い方
先ほど紹介した add_done_callback メソッドで完了した並行タスクを検知することができますが、 as_completed メソッドでも並行タスクの完了を検知することができます。 as_completed は、複数のタスクが同時に実行されている場合に、完了したタスクのイテレータを返します。つまり、各タスクが完了したときに取得できるため、コールバック関数を設定する必要がありません。
以下はHTTPリクエストを伴う大量の短時間タスクのバッチ処理を想定したサンプルコードです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
import urllib.request from concurrent.futures import ThreadPoolExecutor, as_completed # タスクを実行する関数(例として、指定したミリ秒数応答が返ってこないHTTPリクエスト) def long_task(milli_second: int) -> tuple[str, int]: with urllib.request.urlopen(f"https://httpstat.us/200?sleep={milli_second}") as response: body = response.read().decode() return body, milli_second # ThreadPoolExecutorを作成 executor = ThreadPoolExecutor() # タスクをリストに格納 tasks = [6000, 2000, 7000, 13000] # タスクを並行して実行 futures = [] for task in tasks: future = executor.submit(long_task, milli_second=task) futures.append(future) # 結果をas_completedメソッドで反復処理 for future in as_completed(futures): # タスクの完了を検出したら結果を取得 result = future.result() print(f"Task completed: {result}") # 出力例: # Task completed: ('200 OK', 2000) # Task completed: ('200 OK', 6000) # Task completed: ('200 OK', 7000) # Task completed: ('200 OK', 13000) |
add_done_callback は、個々のタスクごとに結果を処理するために使用され、 as_completed は、複数のタスクを並行で実行し、それぞれのタスクが完了したときに結果を取得するために使用されます。 要件によって使い分けましょう。
スレッドプールの最適なスレッド数の設定
ThreadPoolExecutor の引数である max_workers には、スレッドプール内のスレッド数を指定します。この値の設定には注意が必要であり、適切な値を選ぶことが性能の向上に繋がります。スレッド数を多くしすぎると、スレッドの切り替えや同期によるオーバーヘッドが増え、逆にパフォーマンスが低下する可能性があります。一方、スレッド数を少なくしすぎると、スレッドプールが効果的に利用されず、処理能力が十分に引き出されない可能性があります。
最適なスレッド数は、実行するタスクの性質や環境によって異なります。I/Oバウンドなタスクの場合には、より多くのスレッド数を許容することができます。ただし、前述したようにスレッド数を無制限に増やすと、オーバーヘッドが発生する場合があります。リソースの制約やタスクの性質に応じて、適切なスレッド数を見極める必要があります。実際のシナリオでのベンチマークなどを通じて最適なスレッド数を見つけることが重要です。
まとめ
ThreadPoolExecutor は、Python の concurrent.futures モジュールを使用して、スレッドプールを作成するための便利なクラスです。スレッドプールを使用することで、複数のタスクを並行して実行し、処理を効率化することができます。
ニフティではメールアカウントの作成をはじめとした、メールに関わる多種多様な設定変更をこの技術を用いて日々大量に処理しています。
ぜひ、必要な場面で活用していきましょう!