Last time we looked at an overview of how MapReduce works. In this article we’ll be getting our hands dirty writing some code to handle the Map Stage. If you’ll recall, the Map Stage is composed of two separate components: an InputReader and a map function. We’ll look at each of these in turn.

Getting Started: Installation

First, let’s install the MapReduce API for Python. The API is constantly changing, so the best way to install the latest version is to check out the code directly from the SVN repository.

svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce

Place the mapreduce folder into your application root directory and add the mapreduce handler to your app.yaml file.

includes:
- lib/mapreduce/include.yaml

handlers:
- url: /_ah/pipeline.*
  script: mapreduce.lib.pipeline.handlers._APP
  login: admin

You can verify your installation by going to the /mapreduce URL in your app. You’ll see a UI listing the status of any MapReduce jobs. You’ll also see a notice that the UI could not find the file mapreduce.yaml. You can ignore that notice for now.

Could not find mapreduce.yaml

To get a proper view of the data you will also need to add two indexes to your index.yaml file to allow the MapReduce library to query for MapReduce jobs that are run via Pipelines and display them in the GUI.

indexes:
- kind: _AE_Pipeline_Record
  properties:
  - name: is_root_pipeline
  - name: start_time
    direction: desc
- kind: _AE_Pipeline_Record
  properties:
  - name: class_path
  - name: start_time
    direction: desc

Running Your First MapReduce Job

The easiest way to get started with MapReduce is to use the mapreduce.yaml file. This file allows you to define a mapper function that will be executed for each entity passed to it. Let’s go straight to an example and create a mapreduce.yaml file (in your application’s root directory) that will iterate over all entities of a certain Kind and put them to the datastore (updating their timestamp).

mapreduce:
- name: Touch all entity_kind Models
  mapper:
    input_reader: mapreduce.input_readers.DatastoreInputReader
    handler: path_to_my.touch
    params:
    - name: entity_kind
      default: path_to_my.MyModel

Go to the /mapreduce URL in your app and you should see the Touch all entity_kind Models job selectable under the Launch job setting.

Select first MapReduce to Launch

Go ahead and select this job and click Run. You will get an error saying that MyModel could not be found.

Could not find a Model

This is a great time to edit your yaml file to point to an actual model in your application so you can continue with this tutorial. Now that our InputReader is pointing to a model, we can define the map function specified by our yaml file’s handler parameter. The map function is passed entities from our InputReader one at a time, and we can take actions on those entities.

def touch(entity):
    """
    Update the entity's timestamp.
    """
    entity.put()

Go back to the /mapreduce URL in your app and run the job again. Refresh the page (if it does not auto-refresh) and you can see your job running.

Running your first MapReduce Job

You can click on the Detail link to get full details on the MapReduce job. This view gives you the status of individual shards in the MapReduce job and an overview of the processing time that was required.

Running Job Details

We’ve run our first MapReduce job!

The MutationPool

In our touch function we make a separate datastore put for each entity. This is wasteful when the datastore allows putting multiple entities in a single call. To take advantage of this feature, the MapReduce library offers a MutationPool that collects datastore operations and performs them in batches.

We can re-write our map function to take advantage of the MutationPool by yielding a database operation from within our map function. If you are unfamiliar with yield you can think of it as returning a value to the MapReduce job. You can have multiple yield statements in a function that will all return values to be handled by the MapReduce job.

from mapreduce import operation as op

def touch(entity):
    """
    Update the entity's timestamp.
    """
    yield op.db.Put(entity)

You can run the MapReduce job again and see that the job works correctly using datastore operations via the MutationPool.

The source code for MapReduce operations can be found in the mapreduce.operation module. The mapreduce.operation.db module currently supports two operations via the MutationPool: Put and Delete.
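To make the yield-and-batch idea concrete, here is a minimal standalone sketch of how yielded operations could be collected and written out in groups. It is not the library’s MutationPool: FakePut, BatchingPool, run_job and the batch size of 3 are hypothetical names and values chosen for illustration, and the "datastore" is just a list that records each simulated batch write.

```python
class FakePut(object):
    """Hypothetical stand-in for a yielded datastore put operation."""

    def __init__(self, entity):
        self.entity = entity


class BatchingPool(object):
    """Collects entities and writes them out in groups of max_size."""

    def __init__(self, max_size=3):
        self.max_size = max_size
        self.pending = []
        self.flushed_batches = []  # records each simulated batch write

    def put(self, entity):
        self.pending.append(entity)
        if len(self.pending) >= self.max_size:
            self.flush()

    def flush(self):
        if self.pending:
            self.flushed_batches.append(list(self.pending))
            self.pending = []


def touch(entity):
    """Map function: yields an operation instead of writing directly."""
    yield FakePut(entity)


def run_job(entities, pool):
    """Drive the map function and route every yielded operation to the pool."""
    for entity in entities:
        for operation in touch(entity):
            pool.put(operation.entity)
    pool.flush()  # write whatever is left over at the end


pool = BatchingPool(max_size=3)
run_job(["a", "b", "c", "d", "e", "f", "g"], pool)
print(pool.flushed_batches)
```

Seven entities end up in three simulated writes rather than seven, which is the saving the batching approach is after.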

Counters

The MapReduce library also provides counters that can be incremented when a condition is met. In our example we can count the number of entities that were touched by incrementing a counter.

from mapreduce import operation as op

def touch(entity):
    """
    Update the entity's timestamp.
    """
    yield op.db.Put(entity)
    yield op.counters.Increment('touched')

All the counters that were incremented during operation of the job are listed with the job details summary.

Incrementing a Custom Counter
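Since a job is split across shards, the totals in the job summary have to be combined from somewhere. The sketch below shows one way per-shard counts could be summed into a single total. It assumes counters are tracked per shard, which this article does not confirm; merge_shard_counters and the sample numbers are hypothetical and this is not the library’s own aggregation code.

```python
from collections import Counter


def merge_shard_counters(shard_counters):
    """Sum a list of per-shard counter dictionaries into one total."""
    total = Counter()
    for counters in shard_counters:
        total.update(counters)  # adds counts key by key
    return dict(total)


# Hypothetical counts reported by three shards.
shard_counters = [
    {"touched": 120, "skipped": 4},
    {"touched": 98},
    {"touched": 131, "skipped": 9},
]
totals = merge_shard_counters(shard_counters)
print(totals)
```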

Passing Parameters to the Map Function

We can pass additional parameters to our map function by specifying them in mapreduce.yaml. Parameters are passed to both our InputReader and to our map handler function. In our example, we listed entity_kind and this parameter was expected by our InputReader and used to specify the datastore Kind processed by our InputReader. On the MapReduce status page (/mapreduce) we can type in a new value for this parameter to specify a different Kind before running the job.

Editing Job Parameters

Let’s add an additional parameter for the map function that will only touch the entity if it is older than a specific date.

- name: Touch all entity_kind Models
  mapper:
    input_reader: mapreduce.input_readers.DatastoreInputReader
    handler: app.pipelines.touch
    params:
    - name: entity_kind
      default: app.models.UserModel
    - name: if_older_than
      default:

The mapreduce context holds the specification for the job as defined by the mapreduce.yaml file. Within this context we can access our parameters.

from mapreduce import operation as op, context
from datetime import datetime

def touch(entity):
    """
    Update the entity's timestamp if not updated since if_older_than.
    """
    params = context.get().mapreduce_spec.mapper.params
    if_older_than = params.get('if_older_than')
    older_than = datetime.strptime(if_older_than, '%b %d %Y') if if_older_than else datetime.now()

    if entity.updated < older_than:
        yield op.db.Put(entity)
        yield op.counters.Increment('touched')

Now our map function will operate only on entities that were last updated before the date given by our if_older_than parameter.
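The date comparison can be tried out on its own, without a running job. This is a small sketch of the same cutoff logic using plain datetime values; should_touch and its now argument are hypothetical helpers added for illustration, and the '%b' month abbreviation is assumed to be parsed under an English locale.

```python
from datetime import datetime

DATE_FORMAT = '%b %d %Y'  # e.g. "Jun 1 2014"


def should_touch(updated, if_older_than, now=None):
    """Return True when `updated` falls before the cutoff date."""
    if if_older_than:
        cutoff = datetime.strptime(if_older_than, DATE_FORMAT)
    else:
        # No parameter given: fall back to the current time.
        cutoff = now or datetime.now()
    return updated < cutoff


print(should_touch(datetime(2014, 5, 20), 'Jun 1 2014'))
print(should_touch(datetime(2014, 6, 15), 'Jun 1 2014'))
print(should_touch(datetime(2014, 6, 15), None, now=datetime(2014, 7, 1)))
```

An entity updated on May 20 is touched, one updated on June 15 is skipped, and with no parameter every entity updated before "now" is touched.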

Parameter Validation

The MapReduce library also provides a method to do parameter validation. In our previous example we passed a date to our map function as a string. We can use a validator to validate that parameter and modify it as necessary. To use a validator function, specify it in mapreduce.yaml as params_validator.

- name: Touch all entity_kind Models
  mapper:
    input_reader: mapreduce.input_readers.DatastoreInputReader
    handler: app.pipelines.touch
    params:
    - name: entity_kind
      default: app.models.UserModel
    - name: if_older_than
      default: Jun 1 2014
    params_validator: app.pipelines.touch_validator

The validator function accepts a single argument, a dictionary of parameters. The function can modify this dictionary and any modifications will be made available to the map function. In our example we can use the validator to attempt converting our input date into a datetime object. The strptime function raises a ValueError if it cannot convert the string to a datetime.

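from datetime import datetime

def touch_validator(user_params):
    """
    Validate the parameters of our map function.
    """
    if_older_than = user_params['if_older_than']
    # strptime raises ValueError if the string does not match the format.
    datetime.strptime(if_older_than, '%b %d %Y')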

We can trigger the validator to fail by passing in an invalid date format.

Passing an Invalid Parameter

If parameter validation fails the MapReduce job is not started and no entities are passed from our InputReader to the map function.
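Here is a slightly fuller sketch of a validator that can be exercised outside of a MapReduce job. Treating an empty value as valid and writing a normalized string back into the dictionary are assumptions made for illustration, not behaviour the library requires.

```python
from datetime import datetime

DATE_FORMAT = '%b %d %Y'


def touch_validator(user_params):
    """Reject bad dates and normalize good ones in place."""
    value = user_params.get('if_older_than')
    if not value:
        return  # nothing to validate; the map function falls back to "now"
    # strptime raises ValueError when the string does not match the format.
    parsed = datetime.strptime(value.strip(), DATE_FORMAT)
    # Store a cleaned-up string so the map function can parse it again.
    user_params['if_older_than'] = parsed.strftime(DATE_FORMAT)


good = {'if_older_than': ' Jun 1 2014 '}
touch_validator(good)
print(good)

try:
    touch_validator({'if_older_than': '2014-06-01'})
except ValueError as error:
    print('rejected: %s' % error)
```

Keeping the stored value a string, rather than a datetime object, is a deliberately conservative choice here: the map function shown earlier already expects to parse a string.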

Callbacks

The MapReduce library allows you to specify a callback that is called after the MapReduce completes. This can be used for logging purposes or to trigger a specific event in code. The callback is specified in your mapreduce.yaml file as done_callback and points to a URL in your application. This is a parameter of the MapReduce itself and not of the map function; note the independent entry in mapreduce.yaml.

- name: Touch all entity_kind Models
  params:
  - name: done_callback
    value: /done_touch
  mapper:
    input_reader: mapreduce.input_readers.DatastoreInputReader
    handler: app.pipelines.touch
    params:
    - name: entity_kind
      default: app.models.UserModel
    - name: if_older_than
      default: Jun 1 2014
    params_validator: app.pipelines.touch_validator

Upon completion a POST request is made to the URL given by the done_callback parameter. The MapReduce library sets a custom header in this request with the job’s Mapreduce-Id. You can use this header to retrieve details on the job that just completed. This is also a great place to do any cleanup such as deleting temporary files. In our example we will just log the original specification for this job that we set via mapreduce.yaml.

import webapp2
import logging
from mapreduce.model import MapreduceState

class DoneTouch(webapp2.RequestHandler):
    """
    Callback function upon completion of touch MapReduce job.
    """

    def post(self):
        """
        Log the MapReduce ID and input parameters.
        """
        mapreduce_id = self.request.headers['Mapreduce-Id']
        state = MapreduceState.get_by_key_name(mapreduce_id)
        spec = state.mapreduce_spec
        logging.info(spec)

Additional Input Readers

In addition to the DatastoreInputReader, the library includes readers for the Blobstore, Files and Google Cloud Storage Buckets. The documentation for these readers is scarce, but you can consult the mapreduce.input_readers module for more information on the parameters each reader expects. The reference that follows was gathered from a combination of the official Python Users Guide and from reading the source. It should give you enough information to get started with the InputReader of your choice.

Input Reader Reference

As a reference, here is a list of InputReaders and their parameters. All InputReaders support the namespace parameter for specifying the namespaces to iterate over. If no namespace is given then all namespaces are used.

namespace
The list of namespaces that will be searched.

BlobstoreLineInputReader

Input reader for a newline delimited blob in Blobstore.

blob_key
The BlobKey that this input reader is processing. Either a string containing a single key or a list of blob key strings.
start_position
The line number position at which to start reading.
end_position
The last line number position to read.

BlobstoreZipInputReader

Input reader for files from a zip archive stored in the Blobstore. Iterates over all compressed files in a zipfile in Blobstore.

blob_key
The BlobKey that this input reader is processing. Either a string containing a single key or a list of blob key strings.
start_index
The index of the first file to read.
end_index
The index of the first file that will not be read.

BlobstoreZipLineInputReader

Input reader for files from a zip archive stored in the Blobstore. Iterates over all compressed files in a zipfile in Blobstore. Each compressed file is expected to be a newline delimited file.

blob_key
The BlobKey that this input reader is processing. Either a string containing a single key or a list of blob key strings.
start_file_index
The index of the first file to read within the zip.
end_file_index
The index of the first file that will not be read.
offset
The byte offset within `BLOB_KEY.zip[start_file_index]` at which to start reading.

DatastoreInputReader

Iterates over a Model and yields model instances. Supports both db.Model and ndb.Model.

entity_kind
The datastore kind to map over.
keys_only
Use a keys_only query.
batch_size
The number of entities to read from the datastore with each batch get.
key_range
A range of keys to return from your query.
filters
Any filters to apply to the datastore query.

DatastoreKeyInputReader

Iterates over an entity kind and yields datastore.Key instances.

entity_kind
The datastore kind to map over.
keys_only
Use a keys_only query.
batch_size
The number of entities to read from the datastore with each batch get.
key_range
A range of keys to return from your query.
filters
Any filters to apply to the datastore query.

FileInputReader

Iterates over Google Cloud Storage files using the Files API.

files
A list of filenames or globbed filename patterns. The format is `/gs/bucket/filename` or `/gs/bucket/prefix*`.
format
One of "lines", "bytes", "zip". "lines" reads the input file line-by-line, "bytes" reads the whole file at once and "zip" iterates over every file within the zip.
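The difference between the three format values is easiest to see side by side. The sketch below mimics the described behaviour on in-memory data using only the standard library. read_items is a hypothetical helper, not part of the MapReduce library, and the exact values the real reader hands to your map function may differ.

```python
import io
import zipfile


def read_items(data, fmt):
    """Yield items from raw bytes according to the chosen format."""
    if fmt == "lines":
        # One item per line, with the trailing newline removed.
        for line in io.BytesIO(data):
            yield line.rstrip(b"\n")
    elif fmt == "bytes":
        # The whole input as a single item.
        yield data
    elif fmt == "zip":
        # One item per file stored inside the archive.
        with zipfile.ZipFile(io.BytesIO(data)) as archive:
            for name in archive.namelist():
                yield name, archive.read(name)
    else:
        raise ValueError("unknown format: %r" % fmt)


text = b"alpha\nbeta\ngamma\n"
print(list(read_items(text, "lines")))
print(list(read_items(text, "bytes")))

# Build a small zip archive in memory to exercise the "zip" format.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("one.txt", "first")
    archive.writestr("two.txt", "second")
print(list(read_items(buffer.getvalue(), "zip")))
```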

LogInputReader

Input reader for a time range of logs via the Logs API.

start_time
The earliest request completion or last-update time of logs that should be mapped over, in seconds since the Unix epoch.
end_time
The latest request completion or last-update time that logs should be mapped over, in seconds since the Unix epoch.
minimum_log_level
An application log level which serves as a filter on the requests mapped over.
include_incomplete
Whether or not to include requests that have started but not yet finished, as a boolean.
include_app_logs
Whether or not to include application level logs in the mapped logs, as a boolean.
version_ids
A list of version ids whose logs should be read. This cannot be used with module_versions.
module_versions
A list of tuples containing a module and version id whose logs should be read. This can not be used with version_ids.
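Because start_time and end_time are expressed in seconds since the Unix epoch, it helps to compute them from ordinary datetime values. The following is a minimal sketch using only the standard library. to_epoch_seconds and log_window are hypothetical helper names, and the datetimes are assumed to be naive values in UTC.

```python
import calendar
from datetime import datetime, timedelta


def to_epoch_seconds(moment):
    """Convert a naive UTC datetime to seconds since the Unix epoch."""
    return calendar.timegm(moment.utctimetuple())


def log_window(end, hours):
    """Build start_time/end_time values covering `hours` before `end`."""
    start = end - timedelta(hours=hours)
    return {
        "start_time": to_epoch_seconds(start),
        "end_time": to_epoch_seconds(end),
    }


window = log_window(datetime(2014, 6, 1), hours=24)
print(window)
```

The resulting values could then be entered as the start_time and end_time parameters of a job that uses the LogInputReader.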

NamespaceInputReader

An input reader to iterate over namespaces. This reader yields namespace names as strings.

namespace_range
An alphabetic range for the namespace. As defined by [namespace_range.py](https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/namespace_range.py).
batch_size
The number of namespaces to read with each batch.

RandomStringInputReader

Yields random strings as output. Useful to populate output with testing entries.

count
The total number of entries this reader should generate.
string_length
The length of the generated strings.
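To get a feel for what count and string_length control, here is a small standalone generator with the same two parameters. It is only an approximation for illustration: random_strings, the lowercase alphabet and the seed argument are assumptions and do not reflect how the library generates its strings.

```python
import random
import string


def random_strings(count, string_length, seed=None):
    """Yield `count` random lowercase strings of `string_length` characters."""
    rng = random.Random(seed)  # a seed makes the output repeatable in tests
    alphabet = string.ascii_lowercase
    for _ in range(count):
        yield "".join(rng.choice(alphabet) for _ in range(string_length))


samples = list(random_strings(count=5, string_length=8, seed=42))
print(len(samples))
print(all(len(sample) == 8 for sample in samples))
```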

RawDatastoreInputReader

Exactly the same as DatastoreInputReader but yields a datastore.Entity.

entity_kind
The datastore kind to map over.
keys_only
Use a keys_only query.
batch_size
The number of entities to read from the datastore with each batch get.
key_range
A range of keys to return from your query.
filters
Any filters to apply to the datastore query.

RecordsReader

Reads a list of Files API files in records format.

files
A comma separated string of files to read from.

Conclusions

Defining a MapReduce job via mapreduce.yaml provides a convenient way to iterate over large datasets and run a function on each unit of work. Unfortunately, running a MapReduce job this way has a few limitations. First, there is no way to specify a reduce phase, limiting the type of jobs we can perform. Second, you cannot start a MapReduce job programmatically.

The next article in this series will show how to overcome these limitations using MapReduce Pipelines to programmatically control your API.