Last time, we discussed basic pipeline instantiation and execution. This time, we will cover sequential pipelines, answering the question “How do I connect the output of one pipeline with the input of another pipeline”?

To begin, let’s review a basic pipeline that squares its input. If any of this does not make sense refer to the first part of this tutorial.

import logging
import webapp2
import pipeline


class RunPipelineHandler(webapp2.RequestHandler):
    def get(self):
        stage = SquarePipeline(10)
        stage.start()


class SquarePipeline(pipeline.Pipeline):

    def run(self, number):
        return number * number

    def finalized(self):
        logging.info('All done! Square is %s', self.outputs.default.value)

The first step in passing data between two pipelines is updating our pipeline to use the generator interface. The generator interface uses the yield keyword as a means of connecting pipelines together. For this contrived example, let’s create a parent pipeline that executes SquarePipeline twice in succession.

class TwiceSquaredPipeline(pipeline.Pipeline):

    def run(self, number):
        first_square = yield SquarePipeline(number)
        second_square = yield SquarePipeline(first_square)

What now? We need a way to access the value stored in second_square. When execution hits a yield statement a task is started to run the pipeline and a PipelineFuture is returned. The PipelineFuture will have a value after the task has finished executing but not immediately. So how do we access the value? With a child pipeline that can read the result. In this example, we simply log the value of the computation.

class TwiceSquaredPipeline(pipeline.Pipeline):

    def run(self, number):
        first_square = yield SquarePipeline(number)
        second_square = yield SquarePipeline(first_square)
        yield LogResult(second_square)

class LogResult(pipeline.Pipeline):

    def run(self, number):
        logging.info('All done! Value is %s', number)

The rule of thumb here is that anything you instantiate your pipeline with (and subsequently pass to the run method) is accessible within your pipeline. These are called immediate values and you can treat them as regular Python values. When this code is executed, each pipeline started by a yield call is a separate App Engine Task that executes in the Task Queue. The Pipeline runtime coordinates running these tasks and shares the results of execution between tasks, allowing you to safely connect pipelines together.

Full source code for this example follows.

import logging
import webapp2
import pipeline


class RunPipelineHandler(webapp2.RequestHandler):
    def get(self):
        stage = TwiceSquaredPipeline(10)
        stage.start()


class SquarePipeline(pipeline.Pipeline):

    def run(self, number):
        return number * number


class TwiceSquaredPipeline(pipeline.Pipeline):

    def run(self, number):
        first_square = yield SquarePipeline(number)
        second_square = yield SquarePipeline(first_square)
        yield LogResult(second_square)


class LogResult(pipeline.Pipeline):

    def run(self, number):
        logging.info('All done! Value is %s', number)


routes = [
    webapp2.Route('/pipeline-test/', handler='main.RunPipelineHandler')
]

APP = webapp2.WSGIApplication(routes)