Asyncのまとめ
python asyncPythonのasyncioを使いたくて、ドキュメントを読んだので大事そうなところをメモ。
Coroutines
coroutine。普通は、async def
を使って定義される。coroutineを実行するには、3つの方法がある。
asyncio.run
: トップレベルのcoroutineを呼び出す。プログラムのエントリポイントとして1回だけ呼ばれるのが普通の使い方await
するasyncio.create_task
: coroutineがTask
として実行される
ドキュメントでは、coroutineを
async def
で定義される関数async def
で定義される関数の返り値
のどちらの意味でも使うと書いてある。区別するに、前者はcoroutine function, 後者はcoroutine objectと呼ばれる。
Aawaitables
await
することができるオブジェクト。
- coroutine
- Task
- Future
の3種類がある。Futureは、コールバック方式のasync,awaitと共存するために必要なんだそうな・・
Creating Tasks
Taskを作るにはasyncio.create_task
にcoroutineを渡す。await
するのと違い、この呼び出しで実行はブロックされることはなく、即座に次の行の処理に移行することに注意。
Running Tasks Concurrently
複数のawaitableを同時に実行するには、asyncio.gather
に、awaitableを渡す。
asyncio.gather([a1, a2, a3])
ではなくて
asyncio.gather(a1, a2, a3)
なので注意。
Running in Threads
普通の関数をawaitするには、asyncio.to_thread
に関数を渡す。
Queues
async/awaitと合わせて使うためのqueueが用意されている。
例えば、
hoge.com/users/X/follower
にアクセスして、ユーザーX
のフォロワー一覧を取得する。- 各フォロワー
Y
に対してhoge.com/users/Y/favorites
にアクセスして、お気に入り一覧を取得する。
という処理を、複数のX
に対して実行したい場合、
async def get_followers(x):
response = await ... # get url of follwers for x
followers = parse_response(response)
return follwers
async def get_favorites(y):
response = await ... # get url of favorites for y
favorites = parse_response(response)
return favorites
async def main(xs)
yss = await asyncio.gather(*[ # ・・・ (A)
get_followers(x) for x in xs
])
fss = await asyncio.gather(*[
get_favorites(y) for ys in yss for y in ys
])
のように書くと、全てのx
に対するリクエストが終わるまで(A)
の部分で処理がブロックされる。
本当なら、1つのx
に対するリクエストが終わったらすぐに、そいつのフォロワーを集めるリクエストを開始した方が効率が良い。
こういうときに、queueを上手く使うと思った処理が実現できる。 上の例とは別の人工的な処理だが、ドキュメントの例を見本として引用しておく。
次の囲みはドキュメントからの引用
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())
ところで、上のworker
では、queue.get()
してから、queue.task_done()
するまでの間に例外が起こると、queueの中身が減らないためにqueue.join()
が永遠に解決されない。
なので、次のようにした方が良い。
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
try:
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
print(f'{name} has slept for {sleep_for:.2f} seconds')
except Exception as e:
# 例外を処理する
finally:
queue.task_done()