The Pipelines API is a general-purpose workflow engine for App Engine applications. With the Pipelines API we can connect complex workflows into a coherent runtime backed by the Datastore. This article provides a basic overview of the Pipelines API and how it can be used for arbitrary computational workflows.

In the most basic sense a Pipeline is an object that takes input, performs some logic or computation on that input, and produces output. Pipelines can take two general forms – synchronous or asynchronous. Synchronous pipelines act as basic functions that must complete during a single request. Asynchronous pipelines spawn child pipelines and connect them together into a workflow by passing input and output parameters around.

A word of warning.

Pipelines must be idempotent and it is up to the developer to ensure that they are – this is not enforced by the runtime. A pipeline may fail and be retried, so it is important that running the same pipeline with the same set of inputs will produce the same results.
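To make this concrete, here is a minimal sketch of what an idempotent pipeline might look like (using the pipeline library introduced below). The Result model and the key scheme are invented for illustration – the point is that keying the Datastore entity on the input means a retry overwrites the same record instead of creating a duplicate.

from google.appengine.ext import ndb
import pipeline


class Result(ndb.Model):
    value = ndb.IntegerProperty()


class IdempotentSquarePipeline(pipeline.Pipeline):
    def run(self, number):
        # Keying the entity on the input means a retry writes the same
        # record rather than creating a duplicate, so running this pipeline
        # more than once with the same input produces the same result.
        Result(id='square-%d' % number, value=number * number).put()
        return number * number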

Getting Started

The first step is to grab the latest version of the Pipelines API (and its dependencies) using pip. The following assumes you install third-party App Engine dependencies in a lib directory relative to where pip is run. You can also grab the source code from GitHub.

pip install GoogleAppEnginePipeline -t lib/

Pipeline requests need to be handled by the Pipeline application. We set that up by adding a handler to app.yaml. The pipeline library itself will enforce the login access restrictions so we do not need to secure these handlers.

handlers:
- url: /_ah/pipeline.*
  script: pipeline.handlers._APP

Basic Synchronous Pipelines

A synchronous pipeline runs within the bounds of a single App Engine request. Once the request has been made the pipeline starts and processing happens automatically. We can set this up by defining a handler responsible for starting the pipeline. For now, create a default handler that will receive a request at the URL of your choosing.

import logging
import webapp2

class RunPipelineHandler(webapp2.RequestHandler):
    def get(self):
        logging.info('Launch pipeline')

A request processed by this handler will kick off our pipeline. To define a pipeline we inherit from the Pipeline class and override the run method. The pipeline is launched via the start method. The code below instantiates a custom pipeline and launches it. Accessing the URL for the RunPipelineHandler will print the message ‘Do something here.’ to the logs.

import logging
import webapp2
import pipeline

class RunPipelineHandler(webapp2.RequestHandler):
    def get(self):
        logging.info('Launch pipeline')
        stage = MyPipeline()
        stage.start()


class MyPipeline(pipeline.Pipeline):
    def run(self, *args, **kwargs):
        logging.info('Do something here.')

We can update our pipeline to do a simple operation, like squaring a number. You’ll notice in the code that follows that the arguments passed when instantiating the pipeline are passed through as parameters to its run method.

import logging
import webapp2
import pipeline


class RunPipelineHandler(webapp2.RequestHandler):
    def get(self):
        square_stage = SquarePipeline(10)
        square_stage.start()


class SquarePipeline(pipeline.Pipeline):
    def run(self, number):
        return number * number

Running this pipeline will show that the pipeline executes correctly. But where does our return value go? How can we access the output of SquarePipeline?

Accessing Pipeline Output

You’ll notice that in SquarePipeline we are returning a value directly but we never actually access it. Pipeline output can only be accessed after the pipeline has finished executing. We can check for the end of execution using the has_finalized property, which is set to True when all stages of a pipeline have finished executing. At that point our output will be available as a value on the Pipeline object. Let’s see what happens when we try to check if our pipeline has finalized. To do this we store the pipeline_id that is set once start has been called and check the has_finalized property.

import logging
import webapp2
import pipeline


class RunPipelineHandler(webapp2.RequestHandler):
    def get(self):
        square_stage = SquarePipeline(10)
        square_stage.start()

        pipeline_id = square_stage.pipeline_id

        stage = SquarePipeline.from_id(pipeline_id)
        if stage.has_finalized:
            logging.info('Finalized')
        else:
            logging.info('Not finalized')


class SquarePipeline(pipeline.Pipeline):
    def run(self, number):
        return number * number

Running the preceding code we see that our pipeline is not finalized. What happened here? The pipeline is executed as an asynchronous task after it has been started and may or may not have completed by the time we check whether it has finalized. The pipeline itself is a future whose value has not materialized. Any output from a pipeline is not actually available until all child pipeline tasks are executed. So how do we get the final value of the SquarePipeline?
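One way to observe the pipeline completing is to look it up again from a later request, once the task queue has had time to run it. The status handler below is a hypothetical sketch of that pattern (the handler name and pipeline_id query parameter are invented for illustration); the finalized hook described next is usually the better answer.

class PipelineStatusHandler(webapp2.RequestHandler):
    def get(self):
        # The pipeline_id would have been stored (or returned to the client)
        # when the pipeline was started.
        pipeline_id = self.request.get('pipeline_id')
        stage = SquarePipeline.from_id(pipeline_id)
        if stage.has_finalized:
            logging.info('Result: %s', stage.outputs.default.value)
        else:
            logging.info('Not finalized yet')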

Finalized

The finalized method is called by the Pipeline API once a pipeline has completed its work (by filling all of its slots – to be described later). By overriding the finalized method we can see the result of our pipeline and do further processing on that result if necessary. By default the return value of run is available as self.outputs.default.value. As an example, executing the following code will log the message “All done! Square is 100”.

import logging
import webapp2
import pipeline


class RunPipelineHandler(webapp2.RequestHandler):
    def get(self):
        square_stage = SquarePipeline(10)
        square_stage.start()


class SquarePipeline(pipeline.Pipeline):
    def run(self, number):
        return number * number

    def finalized(self):
        logging.info('All done! Square is %s', self.outputs.default.value)

We will see in a later article how to connect the output of one pipeline with another.

Named outputs

Pipelines also allow you to explicitly name outputs. This is useful when you have more than one output to return, or as a means of passing data between one pipeline execution and the next. When using named outputs, instead of returning a value from the run method we fill a pipeline slot with our value. To use named outputs we define an output_names class variable listing the names of our outputs. Within the run method we then call self.fill on the named output to store the value for later access.

import logging
import webapp2
import pipeline


class RunPipelineHandler(webapp2.RequestHandler):
    def get(self):
        square_stage = SquarePipeline(10)
        square_stage.start()


class SquarePipeline(pipeline.Pipeline):

    output_names = ['square']

    def run(self, number):
        self.fill(self.outputs.square, number * number)

    def finalized(self):
        logging.info('All done! Square is %s', self.outputs.square.value)

Testing a pipeline

Sometimes our pipelines call out over the wire or perform expensive data operations. The Pipeline API provides a convenient way to test such pipelines: calling start_test instead of start. In our example we verify the expected output of our squaring pipeline by calling start_test. The final value of our pipeline is available immediately.

class RunPipelineHandler(webapp2.RequestHandler):
    def get(self):
        square_stage = SquarePipeline(10)
        square_stage.start_test()
        assert square_stage.outputs.square.value == 100

If we need to mock out any behaviour from our run method, we can supply a run_test method that is executed whenever we run our pipeline with start_test. Within this method we can mock out or adjust the behaviour of the pipeline to work under test.
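As a rough sketch, suppose our pipeline fetched a number from a remote service before squaring it. The FetchAndSquarePipeline below and its URL argument are invented for illustration; its run_test method fills the slot with a canned value so the test avoids going over the wire.

import pipeline
from google.appengine.api import urlfetch


class FetchAndSquarePipeline(pipeline.Pipeline):

    output_names = ['square']

    def run(self, url):
        # In a real run we go over the wire to fetch the number to square.
        number = int(urlfetch.fetch(url).content)
        self.fill(self.outputs.square, number * number)

    def run_test(self, url):
        # Called instead of run when started with start_test, so the test
        # can skip the network call and fill the slot with a known value.
        self.fill(self.outputs.square, 100)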

Conclusion

This article gives a basic outline of how to start and execute pipelines. Full source code for the final example is listed below. In the next article we will see how to pass the output of one pipeline to another and understand how parent and child pipelines interact.

import logging
import webapp2
import pipeline


class RunPipelineHandler(webapp2.RequestHandler):
    def get(self):
        square_stage = SquarePipeline(10)
        square_stage.start()


class SquarePipeline(pipeline.Pipeline):

    output_names = ['square']

    def run(self, number):
        self.fill(self.outputs.square, number * number)

    def finalized(self):
        logging.info('All done! Square is %s', self.outputs.square.value)

routes = [
    webapp2.Route('/pipeline-test/', handler='main.RunPipelineHandler')
]

APP = webapp2.WSGIApplication(routes)