View all articles in the MapReduce API Series.

The MapReduce library supports a number of default output writers. You can also write your own by implementing the output writer interface. This article examines how to write a custom output writer that pushes data from the App Engine datastore to an elasticsearch cluster. A similar pattern can be followed to push the output of your MapReduce job to almost any destination.

An output writer must implement the abstract interface defined by the MapReduce library in the mapreduce.output_writers module. It may be a good idea to keep that interface open for reference while reading this article.
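To give a rough idea of what we need to implement, here is a condensed sketch of the interface, paraphrased from the library's OutputWriter base class. The exact method signatures vary between versions of the library, so treat this as orientation rather than a definitive reference.

class OutputWriter(object):
    """Condensed, paraphrased sketch of the interface (not the real class)."""

    @classmethod
    def validate(cls, mapper_spec):
        """Validate the output writer parameters supplied to the job."""

    @classmethod
    def init_job(cls, mapreduce_state):
        """Initialize job-level writer state."""

    @classmethod
    def finalize_job(cls, mapreduce_state):
        """Finalize job-level writer state."""

    @classmethod
    def create(cls, mr_spec, shard_number, shard_attempt, _writer_state=None):
        """Create a writer instance for a single shard."""

    def write(self, data):
        """Write a single value produced by the job."""

    def finalize(self, ctx, shard_state):
        """Finalize this shard's output."""

    def to_json(self):
        """Serialize the writer's state."""

    @classmethod
    def from_json(cls, json):
        """Rebuild a writer from its serialized state."""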

The most important methods of the interface are create and write. create is used to create a new OutputWriter that will handle writing for a single shard. Our elasticsearch OutputWriter takes parameters specifying the elasticsearch index to write to and the document type. We take advantage of a helper function provided by the library (_get_params) to read the parameters of a MapReduce job from the MapReduce specification.

from mapreduce.output_writers import OutputWriter, _get_params

class ElasticSearchOutputWriter(OutputWriter):

    def __init__(self, default_index_name=None, default_doc_type=None):
        super(ElasticSearchOutputWriter, self).__init__()
        self.default_index_name = default_index_name
        self.default_doc_type = default_doc_type

    @classmethod
    def create(cls, mr_spec, shard_number, shard_attempt, _writer_state=None):
        # Read the writer parameters out of the MapReduce specification.
        params = _get_params(mr_spec)
        return cls(default_index_name=params.get('default_index_name'),
                   default_doc_type=params.get('default_doc_type'))

Now that we can create an instance of our OutputWriter we can implement the write method to write data to elasticsearch. We use a MutationPool for this (the MutationPool itself is discussed shortly). The MutationPool is attached to the current execution context of the MapReduce job. Every MapReduce job has its own persistent context that can store information required for the current execution of the job. This allows multiple OutputWriter shards to write into the MutationPool and have the MutationPool write the data out to its final destination.

In this piece of code we check whether a MutationPool is already associated with our context and create a new one if it isn't. Once we've retrieved or created the MutationPool we add the output operation to the pool.

from mapreduce import context

def write(self, data):
    # Fetch the mutation pool from the current execution context,
    # creating and registering it on first use.
    ctx = context.get()
    es_pool = ctx.get_pool('elasticsearch_pool')
    if not es_pool:
        es_pool = _ElasticSearchPool(ctx=ctx,
                                     default_index_name=self.default_index_name,
                                     default_doc_type=self.default_doc_type)
        ctx.register_pool('elasticsearch_pool', es_pool)

    # Hand the output operation to the pool.
    es_pool.append(data)

These two methods provide the basis of our OutputWriter; implementing the to_json, from_json and finalize methods is left up to the reader. finalize does not need any real functionality, but you may want to log a message upon completion.
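As a starting point, a minimal sketch of those three methods might look like the following. The only writer state we need to round-trip is the pair of default parameters, and the finalize signature shown here matches recent versions of the library (older versions differ slightly):

import logging

def to_json(self):
    # The only state the writer needs to carry between slices is the
    # pair of default parameters.
    return {'default_index_name': self.default_index_name,
            'default_doc_type': self.default_doc_type}

@classmethod
def from_json(cls, json):
    return cls(default_index_name=json.get('default_index_name'),
               default_doc_type=json.get('default_doc_type'))

def finalize(self, ctx, shard_state):
    # No per-shard cleanup is needed; a log line is enough.
    logging.info('ElasticSearchOutputWriter finished writing a shard')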

Now on to the MutationPool. The MutationPool acts as a buffered writer of data changes: an abstraction that collects a sequence of operations to be performed together. After a set number of operations have accumulated we perform them all at once. Mutation pools are strictly a performance improvement, but they quickly become essential when processing large amounts of data. For example, rather than writing to the datastore after each map operation with ndb.put, we can collect a sequence of writes and put them all at once with ndb.put_multi, as sketched below.
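To illustrate the difference (this snippet is independent of the elasticsearch writer; the entities variable is just a stand-in for whatever your map step produces):

from google.appengine.ext import ndb

# Unbuffered: one datastore round trip per entity.
for entity in entities:
    entity.put()

# Buffered: collect the writes and issue a single batched call.
ndb.put_multi(entities)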

For an elasticsearch OutputWriter our mutation pool will collect and buffer indexing actions and perform them all in a single streaming bulk operation. Within the mutation pool we collect the sequence of operations in a private list, _actions.

class _ElasticSearchPool(context.Pool):
    """Mutation pool that buffers elasticsearch indexing actions for a shard."""

    def __init__(self, ctx=None, default_index_name=None, default_doc_type=None):
        self._actions = []   # buffered bulk actions
        self._size = 0       # number of buffered actions
        self._ctx = ctx
        self.default_index_name = default_index_name
        self.default_doc_type = default_doc_type

We then implement the append method to add an action to the current MutationPool. In this example we simply add the action to our list; once the list grows past 200 elements we flush the MutationPool.

def append(self, action):
    self._actions.append(action)
    self._size += 1
    if self._size > 200:
        self.flush()

Finally, to flush the MutationPool we write all the data collected so far to elasticsearch and clear our list of actions.

from elasticsearch import Elasticsearch, helpers

def flush(self):
    # Instantiate the elasticsearch client (in production you would
    # probably reuse a single client rather than creating one per flush).
    es_client = Elasticsearch(hosts=["127.0.0.1"])
    if self._actions:
        # streaming_bulk is a generator, so consume it to make sure
        # every buffered action is actually sent.
        for _ in helpers.streaming_bulk(es_client,
                                        self._actions,
                                        chunk_size=200):
            pass
    self._actions = []
    self._size = 0

Now, as long as the map function of our MapReduce job outputs operations in a format recognizable by elasticsearch, the OutputWriter will collect those operations into a MutationPool and periodically flush them to our elasticsearch cluster.
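For example, a map function feeding this writer could yield bulk actions in the shape the elasticsearch helpers expect; the index name, document type and entity fields below are purely illustrative:

def index_entity(entity):
    # Map function: turn a datastore entity into an elasticsearch bulk action.
    yield {
        '_op_type': 'index',
        '_index': 'my_index',        # illustrative index name
        '_type': 'my_doc_type',      # illustrative document type
        '_id': entity.key.id(),
        '_source': {
            'name': entity.name,     # illustrative entity field
        },
    }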

You can use this code as the basis for writing OutputWriters for almost any custom destination.