Amazon’s Flow Framework provides a high-level SDK for interacting with the Amazon Simple Workflow service (SWF). SWF is a managed service that helps developers build, run and monitor parallel or sequential asynchronous workloads. SWF reliably commits your workflow’s state to durable storage, allowing you to focus on your business logic rather than on the complex coordination of distributed services.
Writing an application with the Flow Framework can be divided into the following steps:
- Define your workflow’s activities
- Define a workflow topology describing how your activities interact
- Implement your workflow and activities
- Implement workflow and activity hosts that process your workflow’s tasks
- Deploy your workers
- Execute your workflow
The rest of this article is divided into one section for each of these steps. By the end of this article you will understand the complete development process for writing an AWS Flow Framework application. For a more in-depth treatment, consult the AWS Flow Framework developer’s guide and the AWS Flow Framework Samples — much of the content of this article was derived from those two resources.
Source code for this article can be found on GitHub.
1. Define activities
As a motivating example, imagine a file processing application. The interface to the application will be generic and allow for implementations that process the files in user-defined ways. The overall workflow will be to download a file from S3, process that file, and upload it back to S3. Each of these steps corresponds to an “activity” that our workflow must perform. The first step in creating your application is defining these activities.
To be explicit, here are the four activities required by our application:
- Download a file
- Process the file locally
- Upload the processed file to a new location
- Perform local cleanup
In the Flow Framework, all activities are defined as part of a Java interface. For a cleaner separation of concerns, I divide our activities into two interfaces: one for processing files and one for handling the storage system. The first interface, StorageActivities, lists all of the activities for storing files using the @Activities annotation:
/**
 * Flow framework contract for storage activities.
 */
@Activities(version = "1.0")
@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = 60, defaultTaskStartToCloseTimeoutSeconds = 120)
public interface StorageActivities {

    /**
     * Upload a file to storage.
     *
     * @param bucketName Name of the S3 bucket to upload to
     * @param localName Local name of the file to upload to S3
     * @param remoteName Name of the file to use when uploaded to S3
     */
    @ExponentialRetry(initialRetryIntervalSeconds = 10, maximumAttempts = 10)
    void upload(String bucketName, String localName, String remoteName);

    /**
     * Download a file from storage.
     *
     * @param bucketName Name of the S3 bucket to download from
     * @param remoteName Name of the file to download from S3
     * @param localName Local name of the file to download to
     * @return the name of the host that downloaded the file
     */
    @ExponentialRetry(initialRetryIntervalSeconds = 10, maximumAttempts = 10)
    String download(String bucketName, String remoteName, String localName) throws Exception;

    /**
     * Delete temporary local files.
     *
     * @param fileName Name of file to delete from temporary folder
     */
    @ExponentialRetry(initialRetryIntervalSeconds = 10)
    void deleteLocalFile(String fileName);
}
This code uses a few extra annotations that specify the activities’ error handling. First, the interface has an @ActivityRegistrationOptions annotation that defines how long SWF will wait for an activity task to start and to complete before timing it out. Second, the activities that interact with the storage system are given an @ExponentialRetry annotation, which tells the framework to retry the activity on failure up to a maximum number of attempts. Since the storage activities work over the network, this annotation helps with overcoming transient failures.
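If the defaults are not a good fit for your workload, @ExponentialRetry exposes further parameters; the snippet below is illustrative only (the values are not taken from our application, and you should confirm the parameter names against the framework’s Javadoc):
// Illustrative only: cap the interval between retries and stop retrying after five minutes.
@ExponentialRetry(
        initialRetryIntervalSeconds = 10,
        maximumRetryIntervalSeconds = 60,
        retryExpirationSeconds = 300,
        backoffCoefficient = 2.0,
        maximumAttempts = 10)
void upload(String bucketName, String localName, String remoteName);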
Our second interface covers file processing:
/**
 * Flow framework contract for file processing activities.
 */
@Activities(version = "1.0")
@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = 60, defaultTaskStartToCloseTimeoutSeconds = 60)
public interface FileActivities {

    /**
     * Process the file at inputFileName and output the result to outputFileName.
     *
     * @param inputFileName the name of the file to process
     * @param outputFileName the name of the processed file
     * @throws Exception
     */
    void processFile(String inputFileName, String outputFileName) throws Exception;
}
This interface has a single method for file processing.
Note that each @Activities annotation contains a version number. SWF requires that any change to an interface be registered under a new version number. This means you are free to change the underlying implementation of your workflow without worry; however, if you change the interface itself, you must register a new version of your code with the SWF service.
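For example, if we later added a copy operation to StorageActivities (a hypothetical change, not part of this application), the interface would need a new version so that SWF registers the new activity types alongside the old ones:
// Hypothetical: adding a method to the interface means registering new activity versions.
@Activities(version = "2.0")
@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = 60, defaultTaskStartToCloseTimeoutSeconds = 120)
public interface StorageActivities {

    // existing upload/download/deleteLocalFile declarations unchanged ...

    /**
     * Copy a file between two locations in storage (new in version 2.0).
     */
    void copy(String bucketName, String sourceName, String destinationName);
}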
2. Define your workflow
An application’s workflow is defined using the @Workflow annotation. The workflow acts as an entry point to your application and allows SWF to manage your application’s state; it ties together the results of the individual activity tasks. Workflow entry points are denoted by the @Execute annotation. In the interface below, we expose a single entry point for processing a file.
/**
 * Flow framework contract for the file processing workflow.
 */
@Workflow
@WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 300, defaultTaskStartToCloseTimeoutSeconds = 10)
public interface FileProcessingWorkflow {

    /**
     * Process the file at inputBucketName.inputFileName.
     * Place the result at outputBucketName.outputFileName.
     *
     * @param inputBucketName input bucket to process from
     * @param inputFilename input file to process from
     * @param outputBucketName output bucket to put the result to
     * @param outputFilename output file to put the result to
     * @throws IOException
     */
    @Execute(name = "ProcessFile", version = "1.0")
    void processFile(String inputBucketName, String inputFilename, String outputBucketName, String outputFilename) throws IOException;

    /**
     * Get the current state of the workflow. This is reported to the SWF console and
     * through the SWF APIs.
     *
     * When the decider finishes processing a decision task, it fetches the latest state
     * using the method annotated with @GetState.
     *
     * @return current state of the workflow
     */
    @GetState
    String getState();
}
The @GetState annotation exposes an API through which the SWF service can retrieve the current state of a workflow. This state appears in the SWF console and helps with debugging and monitoring your workflow.
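In the workflow implementation, getState can simply return a field that the workflow updates as it progresses. A minimal sketch, consistent with the state string used later in this article (the exact implementation may differ):
// The workflow keeps a human-readable status string and reports it via the @GetState method.
private String state = "Started";

@Override
public String getState() {
    return state;
}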
3. Implement our interfaces
Now we can implement the interfaces we have defined, starting with the activities. Our application will simply zip files during processing; the details are not important for understanding the Flow Framework. What matters is that we implement the contract defined by the FileActivities interface. Full source code is available on GitHub for the curious.
/**
 * An implementation of FileActivities that processes a file by zipping it.
 */
public class ZipFileActivities implements FileActivities {

    /**
     * Zips the file at inputFileName and outputs the result to outputFileName.
     *
     * @param inputFileName the name of the file to process
     * @param outputFileName the name of the processed file
     * @throws Exception
     */
    @Override
    public void processFile(String inputFileName, String outputFileName) throws Exception {
        // Implement zipping
    }
}
Likewise, we need to implement file storage through the StorageActivities interface. Details are also omitted from this implementation; the full version can be found here.
/**
 * An S3 storage implementation which provides activities to
 * download/upload files from S3.
 */
public class S3StorageActivities implements StorageActivities {

    /**
     * Upload a file to storage.
     *
     * @param bucketName Name of the S3 bucket to upload to
     * @param localName Local name of the file to upload to S3
     * @param remoteName Name of the file to use when uploaded to S3
     */
    @Override
    public void upload(String bucketName, String localName, String remoteName) {
        // upload to S3
    }

    /**
     * Download a file from storage.
     *
     * @param bucketName Name of the S3 bucket to download from
     * @param remoteName Name of the file to download from S3
     * @param localName Local name of the file to download to
     */
    @Override
    public String download(String bucketName, String remoteName, String localName) throws Exception {
        // download from S3 to the local host and return the name of the host that holds the file
    }

    /**
     * Delete temporary local files.
     *
     * @param fileName Name of file to delete from temporary folder
     */
    @Override
    public void deleteLocalFile(String fileName) {
        // delete the local file
    }
}
At this point our workflow’s activities are defined. To implement the actual workflow, the class annotated with @Workflow, we need to generate activity clients. These clients handle the details of sending HTTP requests, marshalling data, and managing asynchronous execution state. Clients are generated using AspectJ, and your environment must be configured for AspectJ according to the guide here. For Maven users, the pom.xml in our starter project provides a working example.
For every @Activities annotation, the compiler generates a corresponding ActivitiesClient for the interface. For example, to use our StorageActivities interface we create and use an instance of the generated StorageActivitiesClient; likewise for FileActivities and the FileActivitiesClient. The workflow is implemented using these clients.
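The generated client mirrors its activity interface, but each method returns a Promise so calls can be scheduled asynchronously. Roughly, and only as a sketch (the real generated interface contains many more overloads):
// Approximate shape of the client generated for StorageActivities.
public interface StorageActivitiesClient {

    // Each activity method returns a Promise instead of a plain value.
    Promise<Void> upload(String bucketName, String localName, String remoteName);

    Promise<String> download(String bucketName, String remoteName, String localName);

    Promise<Void> deleteLocalFile(String fileName);

    // The real client also offers overloads that accept Promise-typed arguments
    // and ActivitySchedulingOptions, as used later in this article.
}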
For example, to implement a workflow for zipping a file and uploading it to S3, we can use the generated clients:
public class ZipS3FileProcessingWorkflow implements FileProcessingWorkflow {

    // Storage client is auto-generated by flow framework to process storage activities
    private final StorageActivitiesClient storageClient;

    // File client is auto-generated by flow framework to process file activities
    private final FileActivitiesClient fileClient;

    /**
     * Create a new instance of this workflow.
     */
    public ZipS3FileProcessingWorkflow() {
        storageClient = new StorageActivitiesClientImpl();
        fileClient = new FileActivitiesClientImpl();
    }

    @Override
    public void processFile(String inputBucketName, String inputFileName, String outputBucketName, String outputFileName) {
        // process file using fileClient and storageClient
    }
}
It is at this point that we define the logic of our workflow using Promise types and the control-flow constructs provided by the Flow Framework. Promises are Future-like objects that act as placeholders for the result of an asynchronous call. When you invoke an asynchronous method, control returns to your application immediately while the work executes asynchronously; code that requires the result of a Promise waits until the asynchronous work completes. Using promises, your application can run asynchronous tasks either sequentially or in parallel.
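As a brief sketch of these semantics, using the clients generated above inside a workflow implementation (the bucket and file names here are hypothetical): the framework’s @Asynchronous annotation marks methods that are deferred until their Promise arguments are ready.
// Inside the workflow's processFile() method: both downloads are scheduled immediately
// and may run in parallel, because neither call depends on the other's Promise.
Promise<String> firstHost = storageClient.download(inputBucket, "one.txt", "/tmp/one.txt");
Promise<String> secondHost = storageClient.download(inputBucket, "two.txt", "/tmp/two.txt");
afterDownloads(firstHost, secondHost);

// Elsewhere in the workflow class: an @Asynchronous method does not run until all of
// its Promise parameters are ready, which is how sequential ordering is expressed.
@Asynchronous
private void afterDownloads(Promise<String> firstHost, Promise<String> secondHost) {
    // Both results are guaranteed to be available here.
    state = "Downloaded to " + firstHost.get() + " and " + secondHost.get();
}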
In addition to promises, the Flow Framework provides control-flow constructs such as TryCatchFinally, which lets you handle failures from asynchronous calls using an asynchronous version of Java’s familiar try-catch-finally. In the following simplified example, we start by downloading a file from S3 using the storage client. This call returns a Promise for the name of the host that downloaded the file. We use the result of that promise to process the file on the local machine and then upload the result back to S3; this promise ‘chaining’ passes the result of one asynchronous call as the input to the next. The finally block in this example is used to clean up any resources that are no longer needed.
@Override
public void processFile(String inputBucketName, String inputFileName, String outputBucketName, String outputFileName) {
    // omitted ...
    new TryCatchFinally() {

        /**
         * Download the file from S3, process it locally through a chained task, and upload it back to S3.
         * @throws Throwable
         */
        @Override
        protected void doTry() throws Throwable {
            // download from S3; returns the host that downloaded the file
            Promise<String> hostName = storageClient.download(inputBucketName, inputFileName, localInputFileName);

            // chaining is a way for one promise to get assigned the value of another;
            // when the promise completes, its value is available for subsequent operations
            hostNameChain.chain(hostName);

            // zip the file on the local host
            processFileOnHost(localInputFileName, localOutputFileName, hostName);

            // upload the zipped file back to S3
            upload(outputBucketName, outputFileName, localOutputFileName, hostNameChain);
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            state = "Failed: " + e.getMessage();
            throw e;
        }

        @Override
        protected void doFinally() throws Throwable {
            if (hostNameChain.isReady()) { // File was downloaded
                // Set option to schedule activity in worker specific task list
                ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostNameChain.get());

                // Call deleteLocalFile activity using the host specific task list
                storageClient.deleteLocalFile(localInputFileName, options);
                storageClient.deleteLocalFile(localOutputFileName, options);
            }
            if (!state.startsWith("Failed:")) {
                state = "Completed";
            }
        }
    };
}
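The snippet above relies on a few members that are elided: the hostNameChain promise and the processFileOnHost and upload helpers. A sketch of the first two, modelled on the AWS Flow Framework samples (the upload helper follows the same pattern, scheduling the upload activity on the same host-specific task list):
// A Settable is a Promise whose value can be assigned explicitly or chained from another
// Promise; doFinally() uses it to check whether the download ever completed. It is
// declared before the TryCatchFinally block (or as a field of the workflow class).
final Settable<String> hostNameChain = new Settable<String>();

// Runs only once the download has completed and we know which host holds the local file.
@Asynchronous
private void processFileOnHost(String localInputFileName, String localOutputFileName, Promise<String> hostName) {
    state = "Downloaded to " + hostName.get();
    // Schedule the processing activity on the task list specific to the host that
    // downloaded the file, so the zip runs where the file actually is.
    ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostName.get());
    fileClient.processFile(localInputFileName, localOutputFileName, options);
}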
At this point, we have workflow and activity implementations. We now need host programs to run these classes.
4. Implement activity and workflow hosts
The host programs are responsible for polling SWF for tasks and executing them. The hosts are fairly straightforward: you simply point them at the activity or workflow implementations we have written. Activity and workflow hosts are standalone executable applications that can be deployed and run independently of your workflow executions.
For example, the following code runs an activity worker that polls a common task list for work and executes the S3 activities.
// Start worker to poll the common worker task list
final ActivityWorker workerForCommonTaskList = new ActivityWorker(swfClient, domain, taskList);
S3StorageActivities s3Activities = new S3StorageActivities(s3Client, hostName, localFolder);
workerForCommonTaskList.addActivitiesImplementation(s3Activities);
workerForCommonTaskList.start();
LOG.info("Host Service Started for Task List: " + taskList);
And workflow workers, which execute the workflow’s decision tasks, are created using a similar construct.
// Create a workflow worker
WorkflowWorker worker = new WorkflowWorker(swfClient, domain, taskList);
worker.addWorkflowImplementationType(ZipS3FileProcessingWorkflow.class);
worker.start();
5. Start your workers
You can deploy workers from any machine that has access to the SWF service: your local machine, EC2, or your own data centre. Starting workers is as simple as executing the host programs defined above. With Maven, you can package your code as a JAR and run it like any other Java application.
6. Start executions
Once workers are available to process your tasks, you can start an execution of your workflow. This can be done through the SWF user interface on AWS or programmatically. In a production environment, this would typically be done programmatically in response to an outside event (for example, in response to receiving a message through SNS).
Starting a workflow execution is done by using an auto-generated workflow
client. In our example, we start a workflow by instantiating a workflow
client and then calling the method defined with the @Execute
annotation.
// FileProcessingWorkflowClientExternalFactory is auto-generated through flow framework
FileProcessingWorkflowClientExternalFactory clientFactory =
new FileProcessingWorkflowClientExternalFactoryImpl(config.createSWFClient(), config.getSwfDomain());
FileProcessingWorkflowClientExternal workflow = clientFactory.getClient();
// Start workflow execution
workflow.processFile(
"s3-input-bucket", "s3-input-file",
"s3-output-bucket", "s3-output-file");
Summary
Amazon’s SWF service and the Flow Framework are fairly complex systems, and this guide has only scratched the surface. The official documentation is the best resource for learning more.
The code used in this article can be found on GitHub.