View all articles in the MapReduce API Series.

One of the great things about the MapReduce library is the abilitiy to write a cutom InputReader to process data from any data source. In this post we will explore how to write an InputReader the leases tasks from an AppEngine pull queue by implementing the InputReader interface.

The interface we need to implement is available at mapreduce.input_readers.InputReader. Take a minute to examine the abstract methods that need to be implmemented. Relevant portions of the source are copied below.

class InputReader(json_util.JsonMixin):
  """Abstract base class for input readers.
  InputReaders have the following properties:
   * They are created by using the split_input method to generate a set of
     InputReaders from a MapperSpec.
   * They generate inputs to the mapper via the iterator interface.
   * After creation, they can be serialized and resumed using the JsonMixin
     interface.
  """

  def next(self):
    """Returns the next input from this input reader as a key, value pair.
    Returns:
      The next input from this input reader.
    """
    raise NotImplementedError("next() not implemented in %s" % self.__class__)

  @classmethod
  def from_json(cls, input_shard_state):
    """Creates an instance of the InputReader for the given input shard state.
    Args:
      input_shard_state: The InputReader state as a dict-like object.
    Returns:
      An instance of the InputReader configured using the values of json.
    """
    raise NotImplementedError("from_json() not implemented in %s" % cls)

  def to_json(self):
    """Returns an input shard state for the remaining inputs.
    Returns:
      A json-izable version of the remaining InputReader.
    """
    raise NotImplementedError("to_json() not implemented in %s" %
                              self.__class__)

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers.
    This method creates a list of input readers, each for one shard.
    It attempts to split inputs among readers evenly.
    Args:
      mapper_spec: model.MapperSpec specifies the inputs and additional
        parameters to define the behavior of input readers.
    Returns:
      A list of InputReaders. None or [] when no input data can be found.
    """
    raise NotImplementedError("split_input() not implemented in %s" % cls)

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.
    Input reader parameters are expected to be passed as "input_reader"
    subdictionary in mapper_spec.params.
    Pre 1.6.4 API mixes input reader parameters with all other parameters. Thus
    to be compatible, input reader check mapper_spec.params as well and
    issue a warning if "input_reader" subdicationary is not present.
    Args:
      mapper_spec: The MapperSpec for this InputReader.
    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Input reader class mismatch")

Let’s fill out this interface with our InputReader that leases tasks from an AppEngine pull queue. To start, we implement the split_input method that instantiates a list of InputReaders, splitting the work among each reader. One of the standard parameters for a MapReduce job is the number of shards you want to use. For leasing tasks we will create one InputReader for shard parameter.

@classmethod
def split_input(cls, mapper_spec):
    """
    Returns a list of input readers
    """
    shard_count = mapper_spec.shard_count

    return [cls()] * shard_count

split_input is called to start our InputReader and returns a list of readers. Each of these reader instances must implement a the next method which returns a single value from our Reader. This method is part of the generator interface and will be called during MapReduce operation. We can use next to attempt to lease a single task from our queue, returning the task as a key-value tuple.

def next(self):
    """
    Returns the queue, and a task leased from it as a tuple
    Returns:
      The next input from this input reader.
    """
    ctx = context.get()
    input_reader_params = ctx.mapreduce_spec.mapper.params.get('input_reader', {})
    queue_name = input_reader_params.get(self.QUEUE_PARAM)
    tag = input_reader_params.get(self.TAG_PARAM)
    lease_seconds = input_reader_params.get(self.LEASE_SECONDS_PARAM, 60)

    # Attempt to lease a task
    queue = taskqueue.Queue(queue_name)
    if tag:
        tasks = queue.lease_tasks_by_tag(lease_seconds, 1, tag=tag)
    else:
        tasks = queue.lease_tasks(lease_seconds, 1)

    if tasks:
        operation.counters.Increment(self.TASKS_LEASED_COUNTER)(ctx)
        return (queue, tasks[0])
    raise StopIteration()

We begin this function by reading in our parameters, using the context helper to find the current parameters for this InputReder. We then attempt to lease a task. If tasks are available to lease we return the task, otherwise we raise StopIteration to halt the generator.

This basic implementation is all that’s needed to write an InputReader – split our source into multiple shards and return a single next value from within each shard. The MapReduce library will use this skeleton to call your map function for each next value that is returned by the input reader.

To finish this up, we add some boilerplate required for serialization of reader state and parameter validation.

If your InputReader needs to hold any state between execution of the next method you must serialize that state using the to_json and from_json methods. to_json returns the current state of the reader in JSON format. from_json creates an instance of an InputReader given a JSON format. Typically we use this to save the constructor values used to create our InputReader. We’ll also need to formally define our constructor here.

The constructor takes only a few parameters. A queue name, a tag to lease tasks with and the number of seconds to hold the lease.

def __init__(self, queue_name='default', tag=None, lease_seconds=60):
    super(TaskInputReader, self).__init__()
    self.queue_name = queue_name
    self.tag = tag
    self.lease_seconds = lease_seconds

Now we can define how to serialize and deserialize the state of our reader.

@classmethod
def from_json(cls, input_shard_state):
    """Creates an instance of the InputReader for the given input shard state.
    Args:
      input_shard_state: The InputReader state as a dict-like object.
    Returns:
      An instance of the InputReader configured using the values of json.
    """
    return cls(input_shard_state.get('queue_name'),
               input_shard_state.get('tag'),
               input_shard_state.get('lease_seconds')))

def to_json(self):
    """Returns an input shard state for the remaining inputs.
    Returns:
      A json-izable version of the remaining InputReader.
    """
    return {
        'queue_name': self.queue_name,
        'tag': self.tag,
        'lease_seconds': self.lease_seconds,
    }

The last method to implement is validate. This method parses the parameters used to start your InputReader to make sure they are valid. In our example we validate that the queue_name we are attempting to lease tasks from is valid and that the number of seconds we wish to lease is an integer.

@classmethod
def validate(cls, mapper_spec):
    """
    Validates mapper spec and all mapper parameters.
    Input reader parameters are expected to be passed as "input_reader"
    subdictionary in mapper_spec.params.
    Args:
      mapper_spec: The MapperSpec for this InputReader.
    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
        raise BadReaderParamsError("Input reader class mismatch")

    # Check that a valid queue is specified
    input_reader_params = mapper_spec.params.get('input_reader', {})
    queue_name = input_reader_params.get('queue_name')
    lease_seconds = input_reader_params.get('lease_seconds', 60)
    if not queue_name:
        raise BadReaderParamsError('queue_name is required')
    if not isinstance(lease_seconds, int):
        raise BadReaderParamsError('lease_seconds must be an integer')
    try:
        queue = taskqueue.Queue(name=queue_name)
        queue.fetch_statistics()
    except Exception as e:
        raise BadReaderParamsError('queue_name is invalid', e.message)

Putting this all together we get our final InputReader. We can use this as a basis to make more complex readers for additional data sources.

"""
TaskInputReader
"""
from google.appengine.api import taskqueue

from mapreduce.input_readers import InputReader
from mapreduce.errors import BadReaderParamsError
from mapreduce import context
from mapreduce import operation


class TaskInputReader(InputReader):
    """
    Input reader for Pull-queue tasks
    """

    QUEUE_PARAM = 'queue'
    TAG_PARAM = 'tag'
    LEASE_SECONDS_PARAM = 'lease-seconds'

    TASKS_LEASED_COUNTER = 'tasks leased'

    def next(self):
        """
        Returns the queue, and a task leased from it as a tuple

        Returns:
          The next input from this input reader.
        """
        ctx = context.get()
        input_reader_params = ctx.mapreduce_spec.mapper.params.get('input_reader', {})
        queue_name = input_reader_params.get(self.QUEUE_PARAM)
        tag = input_reader_params.get(self.TAG_PARAM)
        lease_seconds = input_reader_params.get(self.LEASE_SECONDS_PARAM, 60)

        # Attempt to lease a task
        queue = taskqueue.Queue(queue_name)
        if tag:
            tasks = queue.lease_tasks_by_tag(lease_seconds, 1, tag=tag)
        else:
            tasks = queue.lease_tasks(lease_seconds, 1)

        if tasks:
            operation.counters.Increment(self.TASKS_LEASED_COUNTER)(ctx)
            return (queue, tasks[0])
        raise StopIteration()

    @classmethod
    def from_json(cls, input_shard_state):
        """Creates an instance of the InputReader for the given input shard state.

        Args:
          input_shard_state: The InputReader state as a dict-like object.

        Returns:
          An instance of the InputReader configured using the values of json.
        """
        return cls(input_shard_state.get(cls.QUEUE_NAME),
               input_shard_state.get(cls.TAG),
               input_shard_state.get(cls.LEASE_SECONDS)))

    def to_json(self):
        """Returns an input shard state for the remaining inputs.

        Returns:
          A json-izable version of the remaining InputReader.
        """
        return {
            'queue_name': self.queue_name,
            'tag': self.tag,
            'lease_seconds': self.lease_seconds,
        }

    @classmethod
    def split_input(cls, mapper_spec):
        """
        Returns a list of input readers
        """
        shard_count = mapper_spec.shard_count

        return [cls()] * shard_count

    @classmethod
    def validate(cls, mapper_spec):
        """
        Validates mapper spec and all mapper parameters.

        Input reader parameters are expected to be passed as "input_reader"
        subdictionary in mapper_spec.params.

        Args:
          mapper_spec: The MapperSpec for this InputReader.

        Raises:
          BadReaderParamsError: required parameters are missing or invalid.
        """
        if mapper_spec.input_reader_class() != cls:
            raise BadReaderParamsError("Input reader class mismatch")

        # Check that a valid queue is specified
        input_reader_params = mapper_spec.params.get('input_reader', {})
        queue_name = input_reader_params.get(cls.QUEUE_NAME)
        lease_seconds = input_reader_params.get(cls.LEASE_SECONDS, 60)
        if not queue_name:
            raise BadReaderParamsError('%s is required' % cls.QUEUE_NAME)
        if not isinstance(lease_seconds, int):
            raise BadReaderParamsError('%s must be an integer' % cls.LEASE_SECONDS)
        try:
            queue = taskqueue.Queue(name=queue_name)
            queue.fetch_statistics()
        except Exception as e:
            raise BadReaderParamsError('%s is invalid' % cls.QUEUE_NAME, e.message)