This article will cover fully asynchronous pipelines. The term ‘asynchronous’ is misleading here — all piplines are asynchronous in the sense that yielding a pipeline is a non-blocking operation. An asynchronous refers to a pipeline that remains in a RUN state until outside action is taken, for example, a button is clicked or a task is executed.

Marking a pipeline as an asynchronous pipeline is as simple as setting the async class property to True.

class AsyncPipeline(pipeline.Pipeline):
    async = True

Once this pipeline starts, it will remain in the RUN state until the pipeline is transitioned to another state. You transition a pipeline to another state by calling the complete method, using a callback. complete() is a method only available to asynchronous pipelines. Calling complete will fill the pipelines output slots and, if all slots have been filled, mark the pipeline complete. Any barriers related to the slots being filled are notified as described in the previous article.

class AsyncPipeline(pipeline.Pipeline):
    async = True

    def callback(self):
        self.complete()

Callback URLs

The pipeline API provides convenience methods for calling the callback method. get_callback_url returns a URL that, when accessed, passes any query parameters to the callback method. For example, to generate a URL to our pipeline with a choice parameter we can call get_callback_url as follows:

url = get_callback_url(choice='approve')

This will generate a URL of the form:

/_ah/pipeline/callback?choice=approve&pipeline_id=fd789852183b4310b5f1353205a967fe

Accessing this URL will pass the choice parameter to the callback function of the pipeline with pipeline_id fd789852183b4310b5f1353205a967fe.

class AsyncPipeline(pipeline.Pipeline):
    async = True
    public_callbacks = True

    def run(self):
        url = self.get_callback_url(choice='approve')
        logging.info('Callback URL: %s' % url)

    def callback(self, choice):
        if choice == 'approve':
            logging.info('Pipeline Complete')
            self.complete()

Running the pipeline above will log the Callback URL to the console. By visiting that URL, the callback method will execute, completing your pipeline. You can refer to the EmailToContinue Pipeline for a more robust example.

Callback Tasks

The second way to execute a callback method is via a callback task. The Pipelines API provides another convenience method to generate a callback task that will execute our pipeline. In the following example, a task is created to trigger in the future, adding an artificial delay to our pipeline.

class DelayPipeline(pipeline.Pipeline):
    async = True

    def __init__(self, seconds):
        super(DelayPipeline, self).__init__(seconds=seconds)

    def run(self, seconds=None):
        task = self.get_callback_task(
            countdown=seconds,
            name='ae-pipeline-delay-' + self.pipeline_id)
        try:
            task.add(self.queue_name)
        except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError):
            pass

    def callback(self):
        self.complete(self.kwargs['seconds'])

Note that the task is queued using the pipeline_id in the task name. This helps ensure our run method is idempotent. Full source code for an asynchronous pipeline follows. This pipeline will delay for 10 seconds, and then log a callback_url to the console. Visiting the callback URL will complete the pipeline.

import logging
import webapp2
import pipeline
from google.appengine.api import taskqueue


class RunPipelineHandler(webapp2.RequestHandler):
    def get(self):
        pipeline = DelayPipeline(10)
        pipeline.start()


class DelayPipeline(pipeline.Pipeline):
    async = True

    def __init__(self, seconds):
        pipeline.Pipeline.__init__(self, seconds=seconds)

    def run(self, seconds=None):
        task = self.get_callback_task(
            countdown=seconds,
            name='ae-pipeline-delay-' + self.pipeline_id)
        try:
            task.add(self.queue_name)
        except (taskqueue.TombstonedTaskError,
                taskqueue.TaskAlreadyExistsError):
            pass

    def callback(self):
        AsyncPipeline().start()


class AsyncPipeline(pipeline.Pipeline):
    async = True
    public_callbacks = True

    def run(self):
        url = self.get_callback_url(choice='approve')
        logging.info('Callback URL: %s' % url)

    def callback(self, choice):
        if choice == 'approve':
            self.complete()


routes = [webapp2.Route('/pipeline-test/', handler='main.RunPipelineHandler')]

APP = webapp2.WSGIApplication(routes)