Using Coiled with Dash and Dask for Scalability

The previous chapter covered using Dask with your Dash app.

Coiled provides cloud-based clusters for Dask users that can be scaled on-demand. Accordingly, consider adapting your Dash app with Dask to use Coiled at the back end where additional computing resources are required or where scalability is important.

Here, we provide an overview of how to connect a Dash app to a Coiled cluster, along with a few practical suggestions.

We recommend reading through Coiled’s getting started guide.

Credentials

If you have not yet set up an account, go to Coiled to sign up for credentials. At the time of writing, Coiled offers a free tier with 1000 CPU hours per month, which can be upgraded (pricing page) for heavier usage.

As with many other cloud-based compute services, Coiled’s pricing is currently based on hourly usage. We therefore advise provisioning the cluster with an automatic timed-shutdown mechanism for when it goes unused, while leaving the Dash app up. We also recommend providing an easy way to restart the cluster from the Dash app, such as via a button, since the Coiled cluster may have been shut down following a period of inactivity.

We provide example implementations of this concept later on.

Authentication

If shell access is available to the device hosting the Dash server, run the command coiled login and follow the prompts to save the credentials for future use.

Alternatively, you can save the authentication token (which can be found on your profile page) as a file or an environment variable, and run the following script in Python. This script could be run when starting the Dash app.

import os

# Read the token saved earlier and pass it to the Coiled CLI.
coiled_token = os.environ["COILED_TOKEN"]
os.system(f"coiled login --token {coiled_token}")

Setup

Coiled’s remote cluster must be provided with the appropriate software environment, as well as the requisite data. Here are our suggestions on how to do so.

Software Environment

The remote cluster’s software environment can be specified using conda, pip, or a Docker image. Run:

import coiled

coiled.create_software_environment(
    name="my-software-env",
    conda="environment.yml"
)

or:

coiled.create_software_environment(
    name="my-software-env",
    pip="requirements.txt"
)

or:

coiled.create_software_environment(
    name="my-docker-env",
    container="path_to_container",
)

as appropriate to create a public software environment (built as a Docker image) at Coiled under the given name (“my-software-env” or “my-docker-env” in the above examples). The name must be unique and lowercase. For a private environment, add the argument private=True.

You can reuse an environment by specifying the name when creating a cluster, as will be shown below.

The local and remote software environments do not need to be identical. For example, the cluster at Coiled will likely have no use for the dash library, while the host device for the Dash app will likely not require libraries such as s3fs or gcsfs to load the requisite data.

Nonetheless, we recommend that you keep the two environments as identical as possible. This will reduce the risks of unexpected behaviours that may occur due to library version mismatches, or libraries missing at one end but not the other.

Read more about Coiled’s software environments here.

Cluster Initialization

You can initialize a remote cluster as succinctly as in the example below:

cluster = coiled.Cluster(n_workers=1)

In practice, however, the call will usually include more parameters.

We provide one example below, including a reference to the software environment created above:

cluster = coiled.Cluster(
    name="my-coiled-cluster",
    software="my-software-env",
    n_workers=2,
    worker_cpu=2,
    worker_memory="8 GiB",
    shutdown_on_close=False,
    scheduler_options={"idle_timeout": "1 hour"}
)

Following this, you can instantiate a Dask client object with client = Client(cluster) (importing Client from dask.distributed), which is the primary entry point for interacting with the cluster (read more).
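As a quick sanity check, a sketch along the following lines (assuming dask.array is installed in both the local and remote environments) confirms that work is reaching the remote workers:

from dask.distributed import Client
import dask.array as da

client = Client(cluster)  # primary entry point for the cluster

# Submit a small computation; it runs on the Coiled workers, not locally.
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
print(x.mean().compute())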

Further details on creating clusters can be found here.

Data

To provide data to the remote cluster, you can leverage Dask’s built-in capabilities for loading remote data (read more).

Dask is able to read data from popular services such as Amazon S3, Google Cloud Storage or Microsoft Azure Storage, as well as over HTTP(S).

For instance, you can load a parquet file stored publicly in an Amazon S3 bucket using:

import dask.dataframe as dd

# Read a public parquet dataset anonymously from S3.
df = dd.read_parquet(
    's3://bucket/path',
    storage_options={'anon': True, 'use_ssl': False}
)

Note: To connect to Amazon S3, Google Cloud Storage or Microsoft Azure Storage and read data, the underlying libraries to access the service (s3fs, gcsfs, or adlfs respectively) must be included in the remote software environment.

Other protocols such as SSH, FTP, WebHDFS and Dropbox are available through fsspec (usage), although not directly available through Dask.
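As an illustrative sketch, reading a file over SFTP directly with fsspec might look like the following (the host, credentials, and path are placeholders, and the paramiko library must be installed for SFTP support):

import fsspec

# fsspec forwards the extra keyword arguments to the underlying
# SFTP filesystem implementation.
with fsspec.open(
    "sftp://host.example.com/data/file.csv",
    mode="rt",
    username="user",
    password="password",
) as f:
    first_line = f.readline()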

External Authentication

Authentication must be provided to access resources that are not publicly or anonymously available. There are a number of ways this may be achieved. For example, you can register AWS credentials with Coiled using coiled.set_backend_options:

coiled.set_backend_options(
    backend="aws",
    aws_access_key_id="#-your-access-key-ID#",
    aws_secret_access_key="######-your-aws-secret-access-key-######",
)

If adopting this method, follow Coiled’s guidance here to ensure that only limited access is provided to AWS resources (e.g. S3).

We DO NOT RECOMMEND providing access credentials as environment variables when creating software environments. Environment variables are not encrypted, so no sensitive information should be stored this way.

Timeouts & Restart

Timeouts

Coiled’s pricing is based on the duration of cluster usage. Consequently, you may wish to ensure that unused clusters are shut down rather than left running, so that they do not unnecessarily incur costs or consume credit.

On the other hand, it would be undesirable for the Coiled cluster to shut down immediately after completing a task. Such an arrangement would not suit a Dash app, whose users may request multiple discrete evaluations over the course of a session.

To balance these competing requirements, we recommend passing the arguments shutdown_on_close=False and scheduler_options={"idle_timeout": "n hours"} when initializing a cluster, as you saw in the example code above.

This will ensure that the cluster is not prematurely shut down upon completion of an evaluation, while avoiding large chunks of unnecessary uptime.

Restart

With a separated front end (Dash) and back end (Coiled) architecture, the remote cluster may go down while the Dash app remains running. For example, the cluster may have shut down after exceeding the "idle_timeout" specified above.

Restarting the Dash app may restart the Coiled cluster, but this would likely require administrative intervention and is not readily available to the end user.

Instead, we recommend providing a means for the end user to restart the remote cluster through the Dash app. This may be achieved by reinstantiating the cluster and client objects as required, which would in turn connect to Coiled and restart the cluster.

One way to structure this in code would be to wrap cluster and client creation in a helper function that checks whether the current client is live and, if not, starts (or reconnects to) the cluster. An example implementation of such a function is shown below.

import logging

import coiled
from dask.distributed import Client

logger = logging.getLogger(__name__)

client = None

def get_client(client):
    # Return a live Dask client, (re)creating the Coiled cluster if the
    # previous client is missing or has been closed.
    if client is None or client.status == "closed":
        logger.info("Starting or connecting to Coiled cluster...")
        cluster = coiled.Cluster(
            name="my-cluster",
            software="my-software-env",
            n_workers=2,
            worker_cpu=2,
            worker_memory="8 GiB",
            shutdown_on_close=False,
            scheduler_options={"idle_timeout": "1 hour"}
        )
        try:
            client = Client(cluster)
        except Exception:
            logger.info("Failed, trying to close the client and connect again...")
            Client(cluster).close()
            client = Client(cluster)
        logger.info(f"Coiled cluster is up! ({client.dashboard_link})")

    return client

client = get_client(client)

The cluster may take some time to restart, so we recommend displaying a message in the Dash app informing the user that a restart is occurring, for example with a notification. The below example uses Dash Design Kit’s ddk.Notification module:

...
    html.Div(id="notifications", children=None)
...

@app.callback(
    Output("notifications", "children"),
    [Input("restart-btn", "n_clicks")],
)
def restart_client(n_clicks):
    if n_clicks is not None:
        global client
        global df
        # Reconnect to (or restart) the cluster, then re-load the data.
        client = get_client(client)
        df = load_df()
        return ddk.Notification(
            title="Restarting server cluster - please be patient",
            children=[
                """
                Restarting Coiled cluster & loading data...
                Please wait for a minute and then try again or reload the app.
                """
            ],
            user_dismiss=True,
            type="warn",
            timeout=-1,
        )
    else:
        return dash.no_update

Additionally, consider displaying the cluster’s status on the page by periodically reading the client.status property and showing the result, for example via a dcc.Interval component, or as part of another user callback.
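A minimal sketch of the dcc.Interval approach, reusing the global client managed by get_client() above (the component IDs here are hypothetical):

...
    html.Div(id="cluster-status"),
    dcc.Interval(id="status-interval", interval=30 * 1000),  # poll every 30 s
...

@app.callback(
    Output("cluster-status", "children"),
    [Input("status-interval", "n_intervals")],
)
def show_cluster_status(n_intervals):
    # client is the global Dask client maintained by get_client() above.
    status = client.status if client is not None else "not started"
    return f"Cluster status: {status}"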