Asynchronous Operation¶
Dask.distributed can operate as a fully asynchronous framework and so interoperate with other highly concurrent applications. Internally Dask is built on top of Tornado coroutines but also has a compatibility layer for asyncio (see below).
Basic Operation¶
When starting a client, provide the asynchronous=True keyword to tell Dask that you intend to use this client within an asynchronous context.
client = await Client(asynchronous=True)
Operations that used to block now provide Tornado coroutines on which you can await.
Fast functions that only submit work remain fast and don't need to be awaited.
This includes all functions that submit work to the cluster, like submit, map, compute, and persist.
future = client.submit(lambda x: x + 1, 10)
You can await futures directly
result = await future
>>> print(result)
11
Or you can use the normal client methods. Any operation that waited until it received information from the scheduler should now be awaited.
result = await client.gather(future)
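The same pattern scales to many futures at once and to Dask collections. The snippet below is a minimal sketch of that usage; it assumes dask.array is installed, and the array shape and chunk sizes are arbitrary.

futures = client.map(lambda x: x + 1, range(10))  # submits immediately, no await needed
results = await client.gather(futures)            # waits for all ten results

import dask.array as da                            # assumes dask.array is available
x = da.random.random((1000, 1000), chunks=(100, 100))
future = client.compute(x.sum())                   # returns a future right away
total = await future                               # only this await waits on the scheduler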
If you want to reuse the same client in asynchronous and synchronous environments, you can apply the asynchronous=True keyword at each method call.
client = Client() # normal blocking client
async def f():
    futures = client.map(func, L)
    results = await client.gather(futures, asynchronous=True)
    return results
AsyncIO¶
If you prefer to use the Asyncio event loop over the Tornado event loop, you should use the AioClient.
from distributed.asyncio import AioClient
client = await AioClient()
All other operations remain the same:
future = client.submit(lambda x: x + 1, 10)
result = await future
# or
result = await client.gather(future)
Python 2 Compatibility¶
Everything here works with Python 2 if you replace await with yield.
See a more extensive comparison in the example below.
Example¶
This self-contained example starts an asynchronous client, submits a trivial job, waits on the result, and then shuts down the client. You can see implementations for Python 2 and 3 and for Asyncio and Tornado.
Python 3 with Tornado¶
from dask.distributed import Client
async def f():
    client = await Client(asynchronous=True)
    future = client.submit(lambda x: x + 1, 10)
    result = await future
    await client.close()
    return result
from tornado.ioloop import IOLoop
IOLoop().run_sync(f)
Python 2/3 with Tornado¶
from dask.distributed import Client
from tornado import gen
@gen.coroutine
def f():
    client = yield Client(asynchronous=True)
    future = client.submit(lambda x: x + 1, 10)
    result = yield future
    yield client.close()
    raise gen.Return(result)
from tornado.ioloop import IOLoop
IOLoop().run_sync(f)
Python 3 with Asyncio¶
from distributed.asyncio import AioClient
async def f():
    client = await AioClient()
    future = client.submit(lambda x: x + 1, 10)
    result = await future
    await client.close()
    return result
from asyncio import get_event_loop
get_event_loop().run_until_complete(f())
Use Cases¶
Historically this has been used in a few kinds of applications:
- To integrate Dask into other asynchronous services (such as web backends), supplying a computational engine similar to Celery, while still maintaining a high degree of concurrency and not blocking needlessly (see the sketch after this list).
- For computations that change or update state very rapidly, such as is common in some advanced machine learning workloads.
- To develop the internals of Dask's distributed infrastructure, which is written entirely in this style.
- For complex control and data structures in advanced applications.
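As an illustration of the first use case, here is a minimal sketch of a Tornado web backend that submits work to Dask without blocking its request handlers. It is not taken from the Dask documentation: the SquareHandler class, the /square/<n> route, the squaring task, and port 8000 are all made up for this example, and it assumes Python 3.5+ with a recent Tornado.

from dask.distributed import Client
from tornado import web
from tornado.ioloop import IOLoop

client = None  # connected once the event loop is running

class SquareHandler(web.RequestHandler):  # hypothetical handler, not part of Dask
    async def get(self, n):
        # submitting returns immediately; only the await yields control to other requests
        future = client.submit(lambda x: x ** 2, int(n))
        result = await future
        self.write(str(result))

async def setup():
    global client
    client = await Client(asynchronous=True)

app = web.Application([(r"/square/(\d+)", SquareHandler)])
app.listen(8000)
loop = IOLoop.current()
loop.run_sync(setup)  # connect the client on this event loop
loop.start()          # then serve requests indefinitely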