This article describes how you can use the Dataflow/Beam SDK to write files to an S3 bucket by implementing a Sink.

A Sink has three phases: initialization, writing, and finalization. The initialization phase is a sequential process where you can create necessary preconditions such as output directories. The write phase lets workers write bundles of records to the Sink. The finalization phase allows for cleanup like merging files or committing writes.

In the rest of this article I will outline how to use the Sink API to develop a custom Beam Write transform that outputs data to Amazon S3.

The Sink API

To implement a Sink you need to extend three classes defined by the API: (1) Sink, (2) Sink.WriteOperation, and (3) Sink.Writer.

(1) Sink:

The Sink describes the resource to write to. This may be the path to an output directory on a filesystem or a database table. The Sink is responsible for initializing a WriteOperation.

(2) Sink.WriteOperation:

WriteOperation implements the initialization and finalization phases of a write. It also creates the Writer instances that write the individual records.

(3) Sink.Writer:

Writer does the actual writing of a bundle of records.
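
To make the division of labour concrete, here is a minimal plain-Java model of how a runner might drive the three phases. None of these types come from the SDK: SinkLifecycleSketch, Operation and BundleWriter are hypothetical stand-ins, the "destination" is just an in-memory list, and the bundles are processed one after another, whereas a real runner distributes them across workers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the three phases; none of these types come from the SDK.
final class SinkLifecycleSketch {

  /** Stand-in for Sink.Writer: writes one bundle and reports a result. */
  static final class BundleWriter {
    private final List<String> destination;
    private final List<String> buffer = new ArrayList<>();
    private String bundleId;

    BundleWriter(List<String> destination) {
      this.destination = destination;
    }

    void open(String bundleId) {
      this.bundleId = bundleId;
    }

    void write(String record) {
      // Buffer records until the bundle is complete.
      buffer.add(bundleId + ":" + record);
    }

    String close() {
      // Commit the whole bundle at once and report which bundle was written.
      destination.addAll(buffer);
      return bundleId;
    }
  }

  /** Stand-in for Sink.WriteOperation: owns initialization and finalization. */
  static final class Operation {
    final List<String> destination = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    void initialize() {
      log.add("initialize");
    }

    BundleWriter createWriter() {
      return new BundleWriter(destination);
    }

    void finalizeWrite(List<String> results) {
      log.add("finalize " + results);
    }
  }

  /** Plays the role of the runner: drives the phases in order. */
  static Operation run(List<List<String>> bundles) {
    Operation op = new Operation();
    op.initialize();                        // phase 1: sequential setup
    List<String> results = new ArrayList<>();
    int n = 0;
    for (List<String> bundle : bundles) {   // phase 2: one writer per bundle
      BundleWriter writer = op.createWriter();
      writer.open("bundle-" + n++);
      for (String record : bundle) {
        writer.write(record);
      }
      results.add(writer.close());
    }
    op.finalizeWrite(results);              // phase 3: sequential cleanup
    return op;
  }

  public static void main(String[] args) {
    Operation op = run(Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
    System.out.println(op.log);
    System.out.println(op.destination);
  }
}
```

The point to take away is that only the middle phase is per-bundle; initialization and finalization each happen once, and finalization sees the results returned by every writer.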

A skeletal implementation that writes Strings to an S3 bucket follows; it overrides all of the necessary methods. For more information on each of these methods, see the SDK's Javadoc.

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;

public class S3Sink extends Sink<String> {

  @Override
  public void validate(PipelineOptions pipelineOptions) {
  }

  @Override
  public WriteOperation<String, ?> createWriteOperation(PipelineOptions pipelineOptions) {
    return null;
  }

  protected static final class S3WriteOperation extends Sink.WriteOperation<String, S3WriteResult> {

    @Override
    public void initialize(PipelineOptions pipelineOptions) throws Exception {

    }

    @Override
    public void finalize(Iterable<S3WriteResult> iterable, PipelineOptions pipelineOptions) throws Exception {

    }

    @Override
    public S3Writer createWriter(PipelineOptions pipelineOptions) throws Exception {
      return null;
    }

    @Override
    public S3Sink getSink() {
      return null;
    }

    @Override
    public Coder<S3WriteResult> getWriterResultCoder() {
      return null;
    }
  }

  protected static final class S3Writer extends Writer<String, S3WriteResult> {

    @Override
    public void open(String bundleId) throws Exception {

    }

    @Override
    public void write(String toWrite) throws Exception {

    }

    @Override
    public S3WriteResult close() throws Exception {
      return null;
    }

    @Override
    public S3WriteOperation getWriteOperation() {
      return null;
    }
  }
}

Now it’s up to us to implement the methods to define writing to our Sink. To me it makes the most sense to do this from the inside out, starting with the Writer class.

Implementing Writer

A Writer class is responsible for writing a bundle of elements from a PCollection to a sink. It is defined using two type parameters: T, the type of object being written to the Sink, and WriteT, the type of the result of the write (such as a filename or destination bucket). Implementing a Writer means implementing open and close, which are called before and after the bundle is written, and write, which is called for each value in the bundle.

Each Writer instance handles one bundle. When it starts, open is called with a unique String identifier that ensures writes from different workers do not interfere with one another. The identifier also matters in the retry and failure scenarios described later. For now, we are only concerned with writing individual values to S3.

The return value of a call to close is an encoding of the success or failure of writing a bundle and the unique bundle id that was given to your Writer during the call to open.
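
The open, write, close contract can be tried out locally before S3 is involved at all. The following is a rough sketch using only the JDK: TempFileBundleWriter is a hypothetical class, not part of the SDK, and the upload is reduced to a comment. It also keeps one buffered writer open for the life of the bundle, which is an assumption of mine rather than something the Sink API requires.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for a Writer; the upload step is replaced by a comment.
final class TempFileBundleWriter {
  private final String keyPrefix;
  private String bundleId;
  private Path tempFile;
  private BufferedWriter out;

  TempFileBundleWriter(String keyPrefix) {
    this.keyPrefix = keyPrefix;
  }

  /** Called once before any record in the bundle is written. */
  void open(String bundleId) throws IOException {
    this.bundleId = bundleId;
    // The bundle id goes into the file name so concurrent bundles do not collide.
    this.tempFile = Files.createTempFile("bundle-" + bundleId, ".txt");
    this.out = Files.newBufferedWriter(tempFile, StandardCharsets.UTF_8);
  }

  /** Called once per record; appends one line to the temporary file. */
  void write(String record) throws IOException {
    out.write(record);
    out.newLine();
  }

  /** Flushes the file and returns the key the bundle would be stored under. */
  String close() throws IOException {
    out.close();
    // A real writer would upload tempFile to its destination here.
    return keyPrefix + "_" + bundleId + ".csv";
  }

  Path tempFile() {
    return tempFile;
  }
}
```

Because the bundle id appears in both the temporary file name and the returned key, two attempts at the same data never write to the same place.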

The operations to write to an S3 bucket are defined by the S3 Java SDK and related documentation.

In the implementation that follows, the open() function is used to create a temporary File on the machine running your Beam job.

/**
 * A Writer writes a bundle of elements from a PCollection to a sink. open() is
 * called before writing begins and close() is called after all elements in the
 * bundle have been written. write() writes an individual element.
 */
protected static final class S3Writer extends Writer<String, S3WriteResult> {
  final S3WriteOperation s3WriteOperation;
  final String bucket;
  final String region;
  final String key;

  String bundleId;
  File tempFile;

  /**
   * Create a new writer.
   * @param s3WriteOperation the operation that created this writer.
   * @param bucket the S3 bucket to write to.
   * @param region the S3 region to write to.
   * @param key the S3 key prefix for output files.
   */
  public S3Writer(S3WriteOperation s3WriteOperation, String bucket, String region, String key) {
    this.s3WriteOperation = s3WriteOperation;
    this.bucket = bucket;
    this.region = region;
    this.key = key;
  }

  /**
   * Creates a temporary file for writing.
   * <p>
   * The unique id that is given to open is used to ensure that the temporary filename
   * does not collide with other Writers, as a bundle may be executed many times for
   * fault tolerance.
   */
  @Override
  public void open(String bundleId) throws Exception {
    this.bundleId = bundleId;
    this.tempFile = File.createTempFile(bundleId, ".txt");
  }

  /**
   * Called for each value in the bundle. Writes each record to the temporary
   * file.
   */
  @Override
  public void write(String toWrite) throws Exception {
    Files.append(toWrite + LINE_SEP, tempFile, Charsets.UTF_8);
  }

  /**
   * Uploads the temporary file to the S3 bucket.
   * <p>
   * The return type encodes the result of the write for failure handling.
   */
  @Override
  public S3WriteResult close() {
    String fileKey = key + "_" + bundleId + ".csv";
    try {
      AmazonS3Client s3client = new AmazonS3Client(new ClasspathPropertiesFileCredentialsProvider());
      s3client.putObject(new PutObjectRequest(bucket, fileKey, tempFile));
      return new S3WriteResult(S3WriteResult.Result.Success, bundleId, fileKey);
    } catch (AmazonServiceException ase) {
      LOG.error("Error Message:    " + ase.getMessage());
      LOG.error("HTTP Status Code: " + ase.getStatusCode());
      LOG.error("AWS Error Code:   " + ase.getErrorCode());
      LOG.error("Error Type:       " + ase.getErrorType());
      LOG.error("Request ID:       " + ase.getRequestId());
      return new S3WriteResult(S3WriteResult.Result.AmazonServiceException, bundleId, fileKey);
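    } catch (AmazonClientException ace) {
      LOG.error("Error Message: " + ace.getMessage());
      return new S3WriteResult(S3WriteResult.Result.AmazonClientException, bundleId, fileKey);
    } finally {
      // The temporary file is not needed once the upload has succeeded or failed.
      if (tempFile != null && !tempFile.delete()) {
        LOG.warn("Could not delete temporary file {}", tempFile);
      }
    }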
  }

  /**
   * Returns the write operation this writer belongs to.
   */
  @Override
  public S3WriteOperation getWriteOperation() {
    return s3WriteOperation;
  }
}

Implementing a Write Result type

Each Writer also has a return type that encodes the success or failure of writing and uniquely identifies the bundle that was being written. For this purpose we define a POJO that holds an enum describing the outcome of the write, the unique bundle identifier, and the S3 key the bundle was written to.

/**
 * A representation of the result of writing to an S3 sink.
 */
public class S3WriteResult {

  public enum Result {
    Success,
    AmazonServiceException,
    AmazonClientException,
  }

  private final Result result;
  private final String bundleId;
  private final String key;

  public S3WriteResult(Result result, String bundleId, String key) {
    this.result = result;
    this.bundleId = bundleId;
    this.key = key;
  }

  @JsonCreator
  public static S3WriteResult of(@JsonProperty("result") Result result,
                                 @JsonProperty("bundleId") String bundleId,
                                 @JsonProperty("key") String key) {
    return new S3WriteResult(result, bundleId, key);
  }

  @JsonProperty
  public Result getResult() {
    return result;
  }

  @JsonProperty
  public String getBundleId() {
    return bundleId;
  }

  @JsonProperty
  public String getKey() {
    return key;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    S3WriteResult that = (S3WriteResult) o;
    return result == that.result &&
        Objects.equals(bundleId, that.bundleId) &&
        Objects.equals(key, that.key);
  }

  @Override
  public int hashCode() {
    return Objects.hash(result, bundleId, key);
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .addValue(this.result)
        .addValue(this.bundleId)
        .addValue(this.key)
        .toString();
  }
}

Given the S3Writer and the S3WriteResult we have the ability to write individual files to S3. Let’s take a step back and work on a WriteOperation.

Implementing WriteOperation

A WriteOperation defines the process of using parallel Writers to write data. The WriteOperation is responsible for creating instances of a Writer, and each of those instances may write in parallel. In the case of writing to S3, the finalization step cleans up any failed writes. Future work could include merging files together and handling retries on failures.

Another way to think about a WriteOperation is to picture it as a Java process, with each Writer as an individual thread.
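
Taking the process-and-threads picture literally gives a small sketch of what the operation coordinates: several writers run in parallel, each returns a result, and the finalization step picks out the keys that need cleaning up. Everything here is hypothetical and JDK-only. ParallelWriteSketch and its Result type are simplified stand-ins, a bundle id ending in "bad" simulates a failed upload, and a thread pool stands in for what would really be separate workers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelWriteSketch {

  /** Hypothetical, simplified write result: a key plus a success flag. */
  static final class Result {
    final String key;
    final boolean success;

    Result(String key, boolean success) {
      this.key = key;
      this.success = success;
    }
  }

  /** Simulates one writer; bundle ids ending in "bad" count as failed uploads. */
  static Result writeBundle(String prefix, String bundleId) {
    return new Result(prefix + "_" + bundleId + ".csv", !bundleId.endsWith("bad"));
  }

  /** Runs one writer task per bundle on a thread pool and returns the keys needing cleanup. */
  static List<String> runAndCollectFailedKeys(String prefix, List<String> bundleIds)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Callable<Result>> tasks = new ArrayList<>();
      for (String id : bundleIds) {
        tasks.add(() -> writeBundle(prefix, id));
      }
      // invokeAll blocks until every task has finished and preserves task order.
      List<String> failedKeys = new ArrayList<>();
      for (Future<Result> future : pool.invokeAll(tasks)) {
        Result result = future.get();
        if (!result.success) {
          failedKeys.add(result.key);
        }
      }
      return failedKeys;
    } finally {
      pool.shutdown();
    }
  }
}
```

For example, runAndCollectFailedKeys("s3sink", Arrays.asList("b1", "b2-bad", "b3")) returns a list containing only s3sink_b2-bad.csv, which is the set of keys a finalization step like the one below would delete.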

/**
 * Defines the process of a parallel write of objects.
 * <p>
 * Defines how to initialize and finalize the parallel write and can create writer objects to
 * perform the actual writing.
 */
protected static final class S3WriteOperation extends Sink.WriteOperation<String, S3WriteResult> {

  final S3Sink s3Sink;
  final String bucket;
  final String region;
  final String key;

  /**
   * Create a new write operation.
   * @param s3Sink the sink that created this operation.
   * @param bucket the S3 bucket to write to.
   * @param region the S3 region to write to.
   * @param key the S3 key to prefix files with.
   */
  public S3WriteOperation(S3Sink s3Sink, String bucket, String region, String key) {
    this.s3Sink = s3Sink;
    this.bucket = bucket;
    this.region = region;
    this.key = key;
  }

  /**
   * Performs any necessary initialization before writing to the output location.
   * This method is called before writing begins. In our case we do not need
   * to perform any initialization.
   */
  @Override
  public void initialize(PipelineOptions pipelineOptions) throws Exception {
    // No initialization necessary.
  }

  /**
   * Deletes any results that failed to write.
   * Must perform clean-up of any failed writes or writes that were successfully re-tried.
   */
  @Override
  public void finalize(Iterable<S3WriteResult> iterable, PipelineOptions pipelineOptions) throws Exception {
    AmazonS3Client s3Client = new AmazonS3Client(new ClasspathPropertiesFileCredentialsProvider());
    DeleteObjectsRequest multiObjectDeleteRequest = new DeleteObjectsRequest(bucket);
    List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<DeleteObjectsRequest.KeyVersion>();

    // Remove any objects that had write failures.
    for (S3WriteResult result : iterable) {
      if (result.getResult() == S3WriteResult.Result.Success) {
        LOG.info("Successfully written: {}", result.getKey());
      } else {
        keys.add(new DeleteObjectsRequest.KeyVersion(result.getKey()));
      }
    }

    if (keys.size() > 0) {
      multiObjectDeleteRequest.setKeys(keys);

      try {
        DeleteObjectsResult delObjRes = s3Client.deleteObjects(multiObjectDeleteRequest);
        LOG.info("Successfully deleted {} error files.", delObjRes.getDeletedObjects().size());
      } catch (MultiObjectDeleteException e) {
        LOG.error(e.getMessage());
        LOG.info("No. of objects successfully deleted = {}", e.getDeletedObjects().size());
        LOG.info("No. of objects failed to delete = {}", e.getErrors().size());
        for (MultiObjectDeleteException.DeleteError deleteError : e.getErrors()) {
          LOG.info("Object Key: {}\t{}\t{}", deleteError.getKey(), deleteError.getCode(), deleteError.getMessage());
        }
      }
    }
  }

  /**
   * Creates an S3Writer that writes a bundle of data.
   */
  @Override
  public S3Writer createWriter(PipelineOptions pipelineOptions) throws Exception {
    return new S3Writer(this, bucket, region, key);
  }

  /**
   * Returns the Sink that this write operation writes to.
   */
  @Override
  public S3Sink getSink() {
    return s3Sink;
  }

  /**
   * Returns a coder for write results.
   */
  @Override
  public Coder<S3WriteResult> getWriterResultCoder() {
    return new S3WriteResultCoder();
  }
}
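
getWriterResultCoder() returns an S3WriteResultCoder, which is not listed in this article. Whatever form it takes, its job is to turn a write result into bytes and back so that results can travel from the writers to finalize. The following is a minimal sketch of that round trip using only the JDK. It is not the SDK's Coder API, and the Result and Status types are simplified, hypothetical stand-ins for S3WriteResult and its enum.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class WriteResultEncodingSketch {

  /** Hypothetical stand-in for S3WriteResult.Result. */
  enum Status { SUCCESS, SERVICE_ERROR, CLIENT_ERROR }

  /** Hypothetical stand-in for S3WriteResult. */
  static final class Result {
    final Status status;
    final String bundleId;
    final String key;

    Result(Status status, String bundleId, String key) {
      this.status = status;
      this.bundleId = bundleId;
      this.key = key;
    }
  }

  /** Writes the three fields in a fixed order. */
  static byte[] encode(Result result) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(result.status.name());
      out.writeUTF(result.bundleId);
      out.writeUTF(result.key);
    }
    return bytes.toByteArray();
  }

  /** Reads the fields back in the same order they were written. */
  static Result decode(byte[] data) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      Status status = Status.valueOf(in.readUTF());
      String bundleId = in.readUTF();
      String key = in.readUTF();
      return new Result(status, bundleId, key);
    }
  }
}
```

The only real requirement is that decode reverses encode exactly. A coder that plugs into the pipeline would wrap this kind of logic in whatever Coder base class the SDK version in use provides.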

Implementing the Sink

Given the S3WriteOperation and S3Writer, the S3Sink itself is fairly straightforward. The validate method checks that the target S3 bucket exists, creating it if necessary. The only other step required by the Sink is to create the WriteOperation. In a sense, this kicks off the writing process.

public final class S3Sink extends Sink<String> {

  private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class);
  private static final String LINE_SEP = System.getProperty("line.separator");

  private final String region;
  private final String bucket;
  private final String key;

  private S3Sink(String bucket, String region, String key) {
    this.bucket = bucket;
    this.region = region;
    this.key = key;
  }

  public static S3Sink of(String bucket, String region, String key) {
    return new S3Sink(bucket, region, key);
  }

  /**
   * Ensures an S3 bucket exists to hold results.
   *
   * @param pipelineOptions options that started the pipeline
   */
  @Override
  public void validate(PipelineOptions pipelineOptions) {
    AmazonS3 s3client = new AmazonS3Client(new ClasspathPropertiesFileCredentialsProvider());
    s3client.setRegion(Region.getRegion(Regions.fromName(region)));

    try {
      // Create bucket if it does not exist.
      if (!(s3client.doesBucketExist(bucket))) {
        // Note that CreateBucketRequest does not specify region. So bucket is
        // created in the region specified in the client.
        s3client.createBucket(new CreateBucketRequest(bucket));
      }
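    } catch (AmazonServiceException ase) {
      LOG.error("Error Message:    " + ase.getMessage());
      LOG.error("HTTP Status Code: " + ase.getStatusCode());
      LOG.error("AWS Error Code:   " + ase.getErrorCode());
      LOG.error("Error Type:       " + ase.getErrorType());
      LOG.error("Request ID:       " + ase.getRequestId());
      // Fail validation explicitly; a bare assert is a no-op unless the JVM runs with -ea.
      throw new IllegalStateException("Unable to validate S3 bucket " + bucket, ase);
    } catch (AmazonClientException ace) {
      LOG.error("Error Message: " + ace.getMessage());
      throw new IllegalStateException("Unable to validate S3 bucket " + bucket, ace);
    }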
  }

  /**
   * Returns a WriteOperation object that defines how to write to the Sink.
   * Called at pipeline creation.
   */
  @Override
  public WriteOperation<String, ?> createWriteOperation(PipelineOptions pipelineOptions) {
    return new S3WriteOperation(this, bucket, region, key);
  }
}

Testing the Sink

To actually test the sink, we can leverage some of the example pipelines from the SDK. This example uses a simple word count and the DirectPipelineRunner.

public class S3SinkTest {

  private static final Logger LOG = LoggerFactory.getLogger(S3SinkTest.class);

  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
      .as(DataflowPipelineOptions.class);
    options.setRunner(DirectPipelineRunner.class);
    options.setProject("my-project");
    options.setStagingLocation("gs://my-project/staging/");

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
     .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
       @Override
       public void processElement(ProcessContext c) {
         for (String word : c.element().split("[^a-zA-Z']+")) {
           if (!word.isEmpty()) {
             c.output(word);
           }
         }
       }
     }))
     .apply(Count.<String>perElement())
     .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
       @Override
       public String apply(KV<String, Long> input) {
         return input.getKey() + ", " + input.getValue();
       }
     }))
     .apply(Write.to(S3Sink.of("sookocheff.com", "us-west-2", "s3sink")));

    // Run the pipeline.
    p.run();
  }
}