App Engine MapReduce API - Part 4: Combining Sequential MapReduce Jobs


Last time we looked at how to run a full MapReduce Pipeline to count the number of occurrences of a character within each string. In this post we will see how to chain multiple MapReduce Pipelines together to perform sequential tasks.

Combining Sequential MapReduce Jobs

As a contrived example (as all examples are) let’s imagine a scenario where we want to clean up some data by deleting a business entity from the datastore. Each business has employees stored that also need to be deleted. Our simplified models look like this.

from google.appengine.ext import ndb

class Business(ndb.Model):
    """
    Model representing a business which will have employees.
    """
    name = ndb.StringProperty(required=True)
    address = ndb.StringProperty()

class Employee(ndb.Model):
    """
    Model representing employees of a business.
    """
    name = ndb.StringProperty(required=True)
    business = ndb.StringProperty(required=True)

Let’s create a pipeline that deletes every employee of the business with a given name. The filters parameter of the DatastoreInputReader lets us restrict the input to Employee entities whose business property matches that name.
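To make the filter concrete before it reaches the input reader, here is a minimal plain-Python sketch of what a list of (property, operator, value) tuples selects. It is only a stand-in: `matches` and the sample records are hypothetical, nothing here touches the datastore, and it models equality filters only, as a simplification.

```python
# Plain-Python stand-in for the datastore; no App Engine imports are needed.
def matches(record, filters):
    """Return True when the record satisfies every (property, "=", value) filter."""
    for prop, operator, value in filters:
        if operator != "=":
            raise ValueError("this sketch only models equality filters")
        if record.get(prop) != value:
            return False
    return True


# Hypothetical sample data shaped like the Employee model above.
employees = [
    {"name": "Ann", "business": "Acme"},
    {"name": "Bob", "business": "Globex"},
    {"name": "Cho", "business": "Acme"},
]

filters = [("business", "=", "Acme")]
selected = [e["name"] for e in employees if matches(e, filters)]
print(selected)  # ['Ann', 'Cho']
```

In the pipeline that follows, a filter list of the same shape is handed to the input reader through its params.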

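from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
# Assumed import path; some releases of the library expose this as
# "from mapreduce.lib import pipeline" instead.
import pipeline

def delete_employee(entity):
    """ Delete an employee entity. """
    yield op.db.Delete(entity)

class DeleteBusinessPipeline(pipeline.Pipeline):
    """ Delete a business. """

    def run(self, business_name, **kwargs):
        """ run """
        employee_params = {
            "entity_kind": "app.pipelines.Employee",
            "filters": [('business', '=', business_name)],
        }
        yield mapreduce_pipeline.MapperPipeline(
            "delete_employee",
            # handler_spec is the dotted path to the handler, given as a string
            handler_spec="app.pipelines.delete_employee",
            input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
            params=employee_params,
            shards=2)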

This simple pipeline deletes all of the employees of the named business. To delete the business itself, we add a second MapperPipeline to the same run method and yield it after the first.

def delete_employee(entity):
    """ Delete an employee entity. """
    yield op.db.Delete(entity)

def delete_business(entity):
    """ Delete a business entity. """
    yield op.db.Delete(entity)

class DeleteBusinessPipeline(pipeline.Pipeline):
    """ Delete a business. """

    def run(self, business_name, **kwargs):
        """ run """
        employee_params = {
            "entity_kind": "app.pipelines.Employee",
            "filters": [('business', '=', business_name)],
        }
        business_params = {
            "entity_kind": "app.pipelines.Business",
            "filters": [('name', '=', business_name)],
        }
        # InOrder makes each child wait for the one yielded before it;
        # without it the Pipeline API may run the two children in parallel.
        with pipeline.InOrder():
            yield mapreduce_pipeline.MapperPipeline(
                "delete_employee",
                # handler_spec is the dotted path to the handler, as a string
                handler_spec="app.pipelines.delete_employee",
                input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
                params=employee_params,
                shards=2)

            yield mapreduce_pipeline.MapperPipeline(
                "delete_business",
                handler_spec="app.pipelines.delete_business",
                input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
                params=business_params,
                shards=2)

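Each MapperPipeline call returns a PipelineFuture object, a handle for a child pipeline that has been declared but has not yet run. The Pipeline API starts a child only after the children it depends on have finished. Without a declared dependency (a future passed as an argument, or a pipeline.After or pipeline.InOrder block), children are free to run in parallel. In this case the employee deletion pipeline must complete before the business deletion pipeline starts.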
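The ordering the post relies on can be illustrated with a toy model in plain Python. This is a sketch under stated assumptions, not how the Pipeline library is implemented: `ToyFuture`, `run_children`, and the in-memory `datastore` dict are hypothetical names, and the runner simply executes children in yield order, which is the behavior the library's pipeline.InOrder context manager is meant to request. What it shows is that yielding only declares work; the jobs run afterwards, one at a time.

```python
# Toy model only: ToyFuture and run_children are hypothetical names,
# not part of the Pipeline API.
class ToyFuture(object):
    """Handle for a child job that has been declared but not run yet."""

    def __init__(self, name, job):
        self.name = name
        self.job = job
        self.done = False


def run_children(parent):
    """Collect the yielded children, then run them strictly in yield order."""
    futures = list(parent)  # exhausting the generator runs no job
    log = []
    for future in futures:
        future.job()
        future.done = True
        log.append(future.name)
    return log


# Hypothetical in-memory stand-in for the two entity kinds.
datastore = {
    "Employee": [{"name": "Ann", "business": "Acme"}],
    "Business": [{"name": "Acme"}],
}


def delete_employees():
    datastore["Employee"] = [
        e for e in datastore["Employee"] if e["business"] != "Acme"]


def delete_business():
    # Sanity check that ordering held: no employee may still point here.
    assert not any(e["business"] == "Acme" for e in datastore["Employee"])
    datastore["Business"] = [
        b for b in datastore["Business"] if b["name"] != "Acme"]


def delete_business_pipeline():
    yield ToyFuture("delete_employee", delete_employees)
    yield ToyFuture("delete_business", delete_business)


order = run_children(delete_business_pipeline())
print(order)  # ['delete_employee', 'delete_business']
```

The real library schedules children through task queues rather than a loop, so this only models the ordering guarantee, not the execution mechanism.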

And that’s all it takes to run two sequential MapReduce jobs!
