This article describes how you can use the Dataflow/Beam SDK to write files to an S3 bucket by implementing a Sink.
A Sink has three phases: initialization, writing, and finalization. The initialization phase is a sequential process where you can create necessary preconditions such as output directories. The write phase lets workers write bundles of records to the Sink. The finalization phase allows for cleanup like merging files or committing writes.
In the rest of this article I will outline how to use the Sink API to develop a custom Beam Write transform that outputs data to Amazon S3.
The Sink API
To implement a Sink you need to extend three classes defined by the API: (1) a Sink, (2) a Sink.WriteOperation, and (3) a Sink.Writer.
(1) Sink: The Sink describes the resource to write to. This may be the path to an output directory on a filesystem or a database table. The Sink is responsible for initializing a WriteOperation.
(2) Sink.WriteOperation: WriteOperation implements the initialization and finalization phases of a write. It is responsible for spawning the Writer instances that write individual records.
(3) Sink.Writer:
Writer does the actual writing of a bundle of records.
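To see how these three classes fit together, the following sketch, my own simplified illustration rather than code from the SDK, shows the order in which a runner drives them: the WriteOperation is created and initialized once, a Writer handles each bundle, and the collected results are passed to finalize. The class name SinkLifecycleSketch and the sequential loop are purely illustrative.
import java.util.ArrayList;
import java.util.List;

import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;

public class SinkLifecycleSketch {

  // Simplified, sequential illustration of the Sink lifecycle. The real Write transform
  // creates one Writer per bundle and runs the writers in parallel on the workers.
  public static <WriteT> void run(Sink<String> sink, List<List<String>> bundles,
                                  PipelineOptions options) throws Exception {
    @SuppressWarnings("unchecked")
    Sink.WriteOperation<String, WriteT> writeOperation =
        (Sink.WriteOperation<String, WriteT>) sink.createWriteOperation(options);
    writeOperation.initialize(options);                  // initialization phase (sequential)

    List<WriteT> results = new ArrayList<>();
    int bundleNumber = 0;
    for (List<String> bundle : bundles) {                // write phase (parallel in practice)
      Sink.Writer<String, WriteT> writer = writeOperation.createWriter(options);
      writer.open("bundle-" + bundleNumber++);           // unique id per bundle attempt
      for (String element : bundle) {
        writer.write(element);
      }
      results.add(writer.close());                       // per-bundle write result
    }

    writeOperation.finalize(results, options);           // finalization phase (cleanup/commit)
  }
}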
A skeletal implementation that writes Strings to an S3 bucket and overrides all of the necessary methods follows. For more detail on each of these methods, the Javadoc is a great reference.
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;

public class S3Sink extends Sink<String> {

  @Override
  public void validate(PipelineOptions pipelineOptions) {
  }

  @Override
  public WriteOperation<String, ?> createWriteOperation(PipelineOptions pipelineOptions) {
    return null;
  }

  protected static final class S3WriteOperation extends Sink.WriteOperation<String, S3WriteResult> {

    @Override
    public void initialize(PipelineOptions pipelineOptions) throws Exception {
    }

    @Override
    public void finalize(Iterable<S3WriteResult> iterable, PipelineOptions pipelineOptions) throws Exception {
    }

    @Override
    public S3Writer createWriter(PipelineOptions pipelineOptions) throws Exception {
      return null;
    }

    @Override
    public S3Sink getSink() {
      return null;
    }

    @Override
    public Coder<S3WriteResult> getWriterResultCoder() {
      return null;
    }
  }

  protected static final class S3Writer extends Writer<String, S3WriteResult> {

    @Override
    public void open(String bundleId) throws Exception {
    }

    @Override
    public void write(String toWrite) throws Exception {
    }

    @Override
    public S3WriteResult close() throws Exception {
      return null;
    }

    @Override
    public S3WriteOperation getWriteOperation() {
      return null;
    }
  }
}
Now it’s up to us to implement the methods that define writing to our Sink. To me it makes the most sense to do this from the inside out, starting with the Writer class.
Implementing Writer
A Writer class is responsible for writing a bundle of elements from a PCollection to a sink. It is defined using two type parameters: T, the type of object being written to the Sink, and WriteT, the type of the result of the write (such as a filename or destination bucket). Implementing a Writer requires open and close methods that are called before and after the bundle has been written, along with a write method that is called for each value in the bundle.
Each instance of the Writer class is responsible for writing a bundle of data. When starting, open is called with a unique String identifier that is used to ensure that writes from different workers do not interfere with one another. This unique identifier is used during the retry and failure scenarios described later. At this point we are only worried about writing single values to S3.
The return value of a call to close is an encoding of the success or failure of writing a bundle, together with the unique bundle id that was given to your Writer during the call to open.
The operations to write to an S3 bucket are defined by the S3 Java SDK and its documentation. In the implementation that follows, the open() function is used to create a temporary File on the machine running your Beam job.
/**
 * A Writer writes a bundle of elements from a PCollection to a sink. open() is
 * called before writing begins and close() is called after all elements in the
 * bundle have been written. write() writes an individual element.
 */
protected static final class S3Writer extends Writer<String, S3WriteResult> {

  final S3WriteOperation s3WriteOperation;
  final String bucket;
  final String region;
  final String key;

  String bundleId;
  File tempFile;

  /**
   * Create a new writer.
   *
   * @param s3WriteOperation the operation that created this writer.
   * @param bucket the S3 bucket to write to.
   * @param region the S3 region to write to.
   * @param key the S3 key to prefix output files with.
   */
  public S3Writer(S3WriteOperation s3WriteOperation, String bucket, String region, String key) {
    this.s3WriteOperation = s3WriteOperation;
    this.bucket = bucket;
    this.region = region;
    this.key = key;
  }

  /**
   * Creates a temporary file for writing.
   * <p>
   * The unique id that is given to open is used to ensure that the temporary filename
   * does not collide with other Writers, as a bundle may be executed many times for
   * fault tolerance.
   */
  @Override
  public void open(String bundleId) throws Exception {
    this.bundleId = bundleId;
    this.tempFile = File.createTempFile(bundleId, ".txt");
  }

  /**
   * Called for each value in the bundle. Writes each record to the temporary
   * file.
   */
  @Override
  public void write(String toWrite) throws Exception {
    Files.append(toWrite + LINE_SEP, tempFile, Charsets.UTF_8);
  }

  /**
   * Uploads the temporary file to the S3 bucket.
   * <p>
   * The return type encodes the result of the write for failure handling.
   */
  @Override
  public S3WriteResult close() {
    String fileKey = key + "_" + bundleId + ".csv";
    try {
      AmazonS3Client s3client = new AmazonS3Client(new ClasspathPropertiesFileCredentialsProvider());
      // Target the configured region, mirroring the Sink's validate() method.
      s3client.setRegion(Region.getRegion(Regions.fromName(region)));
      s3client.putObject(new PutObjectRequest(bucket, fileKey, tempFile));
      return new S3WriteResult(S3WriteResult.Result.Success, bundleId, fileKey);
    } catch (AmazonServiceException ase) {
      LOG.error("Error Message: " + ase.getMessage());
      LOG.error("HTTP Status Code: " + ase.getStatusCode());
      LOG.error("AWS Error Code: " + ase.getErrorCode());
      LOG.error("Error Type: " + ase.getErrorType());
      LOG.error("Request ID: " + ase.getRequestId());
      return new S3WriteResult(S3WriteResult.Result.AmazonServiceException, bundleId, fileKey);
    } catch (AmazonClientException ace) {
      LOG.error("Error Message: " + ace.getMessage());
      return new S3WriteResult(S3WriteResult.Result.AmazonClientException, bundleId, fileKey);
    }
  }

  /**
   * Returns the write operation this writer belongs to.
   */
  @Override
  public S3WriteOperation getWriteOperation() {
    return s3WriteOperation;
  }
}
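A note on credentials: the ClasspathPropertiesFileCredentialsProvider used above loads an AwsCredentials.properties file from the classpath. Assuming the default behavior of the AWS SDK for Java, that file contains two entries (placeholder values shown):
accessKey = YOUR_AWS_ACCESS_KEY_ID
secretKey = YOUR_AWS_SECRET_ACCESS_KEY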
Implementing a Write Result type
Each Writer also has a return type that encodes the success or failure of writing and uniquely identifies the bundle that was written. For this purpose we define a POJO holding an enum of the write status, along with the unique bundle identifier and the S3 key that was written.
/**
 * A representation of the result of writing to an S3 sink.
 */
public class S3WriteResult {

  public enum Result {
    Success,
    AmazonServiceException,
    AmazonClientException,
  }

  private final Result result;
  private final String bundleId;
  private final String key;

  public S3WriteResult(Result result, String bundleId, String key) {
    this.result = result;
    this.bundleId = bundleId;
    this.key = key;
  }

  @JsonCreator
  public static S3WriteResult of(@JsonProperty("result") Result result,
                                 @JsonProperty("bundleId") String bundleId,
                                 @JsonProperty("key") String key) {
    return new S3WriteResult(result, bundleId, key);
  }

  @JsonProperty
  public Result getResult() {
    return result;
  }

  @JsonProperty
  public String getBundleId() {
    return bundleId;
  }

  @JsonProperty
  public String getKey() {
    return key;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    S3WriteResult that = (S3WriteResult) o;
    return result == that.result &&
        Objects.equals(bundleId, that.bundleId) &&
        Objects.equals(key, that.key);
  }

  @Override
  public int hashCode() {
    return Objects.hash(result, bundleId, key);
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .addValue(this.result)
        .addValue(this.bundleId)
        .addValue(this.key)
        .toString();
  }
}
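One piece that the listings reference but do not show is S3WriteResultCoder, which the WriteOperation below returns from getWriterResultCoder(). Here is a rough sketch of what such a coder could look like, assuming we reuse the Jackson annotations above to serialize the POJO as JSON; this is my own sketch, not the original implementation.
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;

/**
 * Encodes an S3WriteResult as JSON, delegating the byte-level framing to StringUtf8Coder.
 */
public class S3WriteResultCoder extends AtomicCoder<S3WriteResult> {

  private static final ObjectMapper MAPPER = new ObjectMapper();
  private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();

  @Override
  public void encode(S3WriteResult value, OutputStream outStream, Context context)
      throws CoderException, IOException {
    // Serialize the POJO to JSON using the @JsonCreator/@JsonProperty annotations above.
    STRING_CODER.encode(MAPPER.writeValueAsString(value), outStream, context);
  }

  @Override
  public S3WriteResult decode(InputStream inStream, Context context)
      throws CoderException, IOException {
    return MAPPER.readValue(STRING_CODER.decode(inStream, context), S3WriteResult.class);
  }
}
Delegating to StringUtf8Coder keeps the nested/outer context handling (length prefixing) out of our hands.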
Given the S3Writer and the S3WriteResult we have the ability to write individual files to S3. Let’s take a step back and work on a WriteOperation.
Implementing WriteOperation
A WriteOperation defines the process of using parallel Writers to write data. The WriteOperation is responsible for creating the Writer instances, each of which may write in parallel. For the case of writing to S3, the finalization step cleans up any failed writes. Future work could include merging files together and handling retries on failures.
Another way to think about it: the WriteOperation is like a Java process and each Writer is an individual thread.
/**
 * Defines the process of a parallel write of objects.
 * <p>
 * Defines how to initialize and finalize the parallel write and can create writer objects to
 * perform the actual writing.
 */
protected static final class S3WriteOperation extends Sink.WriteOperation<String, S3WriteResult> {

  final S3Sink s3Sink;
  final String bucket;
  final String region;
  final String key;

  /**
   * Create a new write operation.
   *
   * @param s3Sink the sink that created this operation.
   * @param bucket the S3 bucket to write to.
   * @param region the S3 region to write to.
   * @param key the S3 key to prefix files with.
   */
  public S3WriteOperation(S3Sink s3Sink, String bucket, String region, String key) {
    this.s3Sink = s3Sink;
    this.bucket = bucket;
    this.region = region;
    this.key = key;
  }

  /**
   * Performs any necessary initialization before writing to the output location.
   * This method is called before writing begins. In our case we do not need
   * to perform any initialization.
   */
  @Override
  public void initialize(PipelineOptions pipelineOptions) throws Exception {
    // No initialization necessary.
  }

  /**
   * Deletes any results that failed to write.
   * Must perform clean-up of any failed writes or writes that were successfully re-tried.
   */
  @Override
  public void finalize(Iterable<S3WriteResult> iterable, PipelineOptions pipelineOptions) throws Exception {
    AmazonS3Client s3Client = new AmazonS3Client(new ClasspathPropertiesFileCredentialsProvider());
    // Target the configured region, mirroring the Sink's validate() method.
    s3Client.setRegion(Region.getRegion(Regions.fromName(region)));
    DeleteObjectsRequest multiObjectDeleteRequest = new DeleteObjectsRequest(bucket);
    List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<DeleteObjectsRequest.KeyVersion>();

    // Remove any objects that had write failures.
    for (S3WriteResult result : iterable) {
      if (result.getResult() == S3WriteResult.Result.Success) {
        LOG.info("Successfully written: {}", result.getKey());
      } else {
        keys.add(new DeleteObjectsRequest.KeyVersion(result.getKey()));
      }
    }

    if (keys.size() > 0) {
      multiObjectDeleteRequest.setKeys(keys);
      try {
        DeleteObjectsResult delObjRes = s3Client.deleteObjects(multiObjectDeleteRequest);
        LOG.info("Successfully deleted {} error files.", delObjRes.getDeletedObjects().size());
      } catch (MultiObjectDeleteException e) {
        LOG.error(e.getMessage());
        LOG.info("No. of objects successfully deleted = {}", e.getDeletedObjects().size());
        LOG.info("No. of objects failed to delete = {}", e.getErrors().size());
        for (MultiObjectDeleteException.DeleteError deleteError : e.getErrors()) {
          LOG.info("Object Key: {}\t{}\t{}", deleteError.getKey(), deleteError.getCode(), deleteError.getMessage());
        }
      }
    }
  }

  /**
   * Creates an S3Writer that writes a bundle of data.
   */
  @Override
  public S3Writer createWriter(PipelineOptions pipelineOptions) throws Exception {
    return new S3Writer(this, bucket, region, key);
  }

  /**
   * Returns the Sink that this write operation writes to.
   */
  @Override
  public S3Sink getSink() {
    return s3Sink;
  }

  /**
   * Returns a coder for write results.
   */
  @Override
  public Coder<S3WriteResult> getWriterResultCoder() {
    return new S3WriteResultCoder();
  }
}
Implementing the Sink
Given the S3WriteOperation and S3Writer, the S3Sink itself is fairly straightforward. The first portion is the validate function, which ensures the target S3 bucket exists (creating it if necessary) so that results can be written. The only other step required by the Sink is to create the WriteOperation, which in a sense “kicks off” the writing process.
public final class S3Sink extends Sink<String> {

  private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class);
  private static final String LINE_SEP = System.getProperty("line.separator");

  private final String region;
  private final String bucket;
  private final String key;

  private S3Sink(String bucket, String region, String key) {
    this.bucket = bucket;
    this.region = region;
    this.key = key;
  }

  public static S3Sink of(String bucket, String region, String key) {
    return new S3Sink(bucket, region, key);
  }

  /**
   * Ensures an S3 bucket exists to hold results.
   *
   * @param pipelineOptions options that started the pipeline
   */
  @Override
  public void validate(PipelineOptions pipelineOptions) {
    AmazonS3 s3client = new AmazonS3Client(new ClasspathPropertiesFileCredentialsProvider());
    s3client.setRegion(Region.getRegion(Regions.fromName(region)));
    try {
      // Create bucket if it does not exist.
      if (!(s3client.doesBucketExist(bucket))) {
        // Note that CreateBucketRequest does not specify region. So bucket is
        // created in the region specified in the client.
        s3client.createBucket(new CreateBucketRequest(bucket));
      }
    } catch (AmazonServiceException ase) {
      LOG.error("Error Message: " + ase.getMessage());
      LOG.error("HTTP Status Code: " + ase.getStatusCode());
      LOG.error("AWS Error Code: " + ase.getErrorCode());
      LOG.error("Error Type: " + ase.getErrorType());
      LOG.error("Request ID: " + ase.getRequestId());
      // Fail validation rather than relying on assertions being enabled.
      throw new IllegalStateException("Unable to validate S3 bucket " + bucket, ase);
    } catch (AmazonClientException ace) {
      LOG.error("Error Message: " + ace.getMessage());
      throw new IllegalStateException("Unable to validate S3 bucket " + bucket, ace);
    }
  }

  /**
   * Returns a WriteOperation object that defines how to write to the Sink.
   * Called at pipeline creation.
   */
  @Override
  public WriteOperation<String, ?> createWriteOperation(PipelineOptions pipelineOptions) {
    return new S3WriteOperation(this, bucket, region, key);
  }
}
Testing the Sink
To actually test the sink, we can leverage some of the example pipelines from the SDK. This example uses a simple word count and the DirectPipelineRunner.
public class S3SinkTest {

  private static final Logger LOG = LoggerFactory.getLogger(S3SinkTest.class);

  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);
    options.setRunner(DirectPipelineRunner.class);
    options.setProject("my-project");
    options.setStagingLocation("gs://my-project/staging/");

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
        .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            for (String word : c.element().split("[^a-zA-Z']+")) {
              if (!word.isEmpty()) {
                c.output(word);
              }
            }
          }
        }))
        .apply(Count.<String>perElement())
        .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
          @Override
          public String apply(KV<String, Long> input) {
            return input.getKey() + ", " + input.getValue();
          }
        }))
        .apply(Write.to(S3Sink.of("sookocheff.com", "us-west-2", "s3sink")));

    // Run the pipeline.
    p.run();
  }
}