Getting Started with Amazon Flow Framework

Amazon’s Flow Framework provides a high-level SDK for interacting with the Amazon Simple Workflow service (SWF). SWF is a managed service that helps developers build, run and monitor parallel or sequential asynchronous workloads. SWF reliably commits your workflow’s state to durable storage, allowing you to focus on your business logic rather than on the complex coordination of distributed services.

Writing an application with the Flow Framework can be divided into the following steps:

  1. Define your workflow’s activities
  2. Define a workflow topology describing how your activities interact
  3. Implement your workflow and activities
  4. Implement workflow and activity hosts that process your workflow’s tasks
  5. Deploy your workers
  6. Execute your workflow

The rest of this article has one section for each of these steps. By the end of it you will understand the complete development process for an AWS Flow Framework application. For a more in-depth treatment, consult the AWS Flow Framework developer’s guide and the AWS Flow Framework Samples; much of the content of this article was derived from those two resources.

Source code for this article can be found on GitHub.

1. Define activities

As a motivating example, imagine a file processing application. The interface to the application will be generic and allow for implementations that process the files in user-defined ways. The overall workflow will be to download a file from S3, process that file, and upload it back to S3. Each of these steps corresponds to an “activity” that our workflow must perform. The first step in creating your application is defining these activities.

To be explicit, here are the four activities required by our application:

  1. Download a file
  2. Process the file locally
  3. Upload the processed file to a new location
  4. Perform local cleanup

In the Flow Framework, activities are declared as methods of a Java interface. For a cleaner separation of concerns, I divide our activities into two interfaces, one for processing files and one for handling the storage system. The first interface, StorageActivities, declares the activities for storing files and is marked with the @Activities annotation:

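import com.amazonaws.services.simpleworkflow.flow.annotations.Activities;
import com.amazonaws.services.simpleworkflow.flow.annotations.ActivityRegistrationOptions;
import com.amazonaws.services.simpleworkflow.flow.annotations.ExponentialRetry;

/**
 * Flow framework contract for storage activities.
 */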
@Activities(version = "1.0")
@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = 60, defaultTaskStartToCloseTimeoutSeconds = 120)
public interface StorageActivities {

  /**
   * Upload a file to storage.
   *
   * @param bucketName Name of the S3 bucket to upload to
   * @param localName  Local name of the file to upload to S3
   * @param remoteName Name of the file to use when uploaded to S3
   */
  @ExponentialRetry(initialRetryIntervalSeconds = 10, maximumAttempts = 10)
  void upload(String bucketName, String localName, String remoteName);

  /**
   * Download a file from storage.
   *
   * @param bucketName Name of the S3 bucket to download from
   * @param remoteName Name of the file to download from S3
   * @param localName  Local name of the file to download to
   * @return name of the host that downloaded the file
   */
  @ExponentialRetry(initialRetryIntervalSeconds = 10, maximumAttempts = 10)
  String download(String bucketName, String remoteName, String localName) throws Exception;

  /**
   * Delete temporary local files.
   *
   * @param fileName Name of file to delete from temporary folder
   */
  @ExponentialRetry(initialRetryIntervalSeconds = 10)
  void deleteLocalFile(String fileName);

}

This code uses a few extra annotations that control how the activities behave when things go wrong. First, the interface has an @ActivityRegistrationOptions annotation that sets default timeouts for its activity tasks: defaultTaskScheduleToStartTimeoutSeconds limits how long a task may wait in the task list before a worker picks it up, and defaultTaskStartToCloseTimeoutSeconds limits how long a worker may take to complete it. Second, the storage activities carry an @ExponentialRetry annotation, which tells the Flow Framework to retry a failed activity with an exponentially growing delay between attempts, up to the limit given by maximumAttempts when it is set. Since the storage activities work over the network, these retries help them overcome transient failures.
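To make the retry policy concrete, here is a small stand-alone sketch that computes the delay before each retry. It is not the Flow Framework’s implementation. It assumes the delay grows geometrically from initialRetryIntervalSeconds by a backoff coefficient of 2.0 (which I believe is the annotation’s default; check the ExponentialRetry Javadoc) and that maximumAttempts counts the first attempt, so maximumAttempts = 10 allows at most nine retries. The class name RetrySchedule is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: the retry delays implied by an exponential retry policy. */
public class RetrySchedule {

  /**
   * Returns the delay, in seconds, before each retry.
   *
   * @param initialIntervalSeconds delay before the first retry
   * @param backoffCoefficient     factor applied to the delay after each retry (assumed 2.0)
   * @param maximumAttempts        total attempts, including the first one (assumption)
   */
  public static List<Long> delays(long initialIntervalSeconds, double backoffCoefficient, int maximumAttempts) {
    List<Long> result = new ArrayList<>();
    double delay = initialIntervalSeconds;
    // attempt 1 is the original call, so retries are attempts 2..maximumAttempts
    for (int attempt = 2; attempt <= maximumAttempts; attempt++) {
      result.add(Math.round(delay));
      delay *= backoffCoefficient;
    }
    return result;
  }

  public static void main(String[] args) {
    // Parameters taken from the upload/download annotations above
    List<Long> storageDelays = delays(10, 2.0, 10);
    long total = storageDelays.stream().mapToLong(Long::longValue).sum();
    System.out.println("Delays: " + storageDelays);
    System.out.println("Total backoff: " + total + " s");
  }
}
```

Under these assumptions, the nine delays for upload and download add up to 5,110 seconds. The workflow defined in the next step sets defaultExecutionStartToCloseTimeoutSeconds to 300, and the first five delays alone (10 + 20 + 40 + 80 + 160 = 310 seconds) already exceed that, so in this example the execution timeout, rather than maximumAttempts, is what would actually limit retries.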

Our second interface is defined for handling files:

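import com.amazonaws.services.simpleworkflow.flow.annotations.Activities;
import com.amazonaws.services.simpleworkflow.flow.annotations.ActivityRegistrationOptions;

/**
 * Flow framework contract for file processing activities.
 */
@Activities(version = "1.0")
@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = 60, defaultTaskStartToCloseTimeoutSeconds = 60)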
public interface FileActivities {

  /**
   * Process the file at inputFileName and output the result to outputFileName.
   * @param inputFileName the name of the file to process
   * @param outputFileName the name of the processed file
   * @throws Exception
   */
  void processFile(String inputFileName, String outputFileName) throws Exception;

}

This interface has a single method for file processing.

Note that each @Activities annotation contains a version number. SWF registers each activity type under a name and a version, and a registered type cannot be changed afterwards. You are therefore free to change an activity’s implementation without worry, but if you change the interface (for example, a method signature or its registration options), you must give it a new version number so that SWF registers new activity types.

2. Define your workflow

An application’s workflow is defined by an interface carrying the @Workflow annotation. The workflow acts as the entry point to your application and lets SWF manage your application’s state. It coordinates the activity tasks, feeding the results of some activities into others. A workflow’s entry point is the method annotated with @Execute. In the interface below, we expose a single entry point for processing a file.

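import com.amazonaws.services.simpleworkflow.flow.annotations.Execute;
import com.amazonaws.services.simpleworkflow.flow.annotations.GetState;
import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow;
import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions;
import java.io.IOException;

/**
 * Flow framework contract for the file processing workflow.
 */
@Workflow
@WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 300, defaultTaskStartToCloseTimeoutSeconds = 10)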
public interface FileProcessingWorkflow {

  /**
   * Process the file at inputBucketName.inputFileName.
   * Place the result at outputBucketName.outputFileName.
   *
   * @param inputBucketName input bucket to process from
   * @param inputFilename input file to process from
   * @param outputBucketName output bucket to put result to
   * @param outputFilename output file to put result to
   * @throws IOException
   */
  @Execute(name = "ProcessFile", version = "1.0")
  void processFile(String inputBucketName, String inputFilename, String outputBucketName, String outputFilename) throws IOException;

  /**
   * Get the current state of the workflow. This is reported to the SWF console and
   * through SWF APIs.
   *
   * When the decider finishes processing a decision task, the framework calls this
   * method and reports the returned value to SWF.
   *
   * @return current state of the workflow
   */
  @GetState
  String getState();

}

The @GetState annotation marks the method that the framework calls to report the workflow’s current state to SWF. The state is shown in the SWF console, which helps when debugging and monitoring your workflow.

3. Implement our interfaces

Now we can implement the interfaces we have defined, starting with the activities. Our application simply zips files during processing; the details are not important for understanding the Flow Framework. What matters is that ZipFileActivities implements the contract defined by the FileActivities interface. The full source code is available on GitHub for the curious.

/**
 * An implementation of FileActivities that processes a file by zipping it.
 */
public class ZipFileActivities implements FileActivities {

  /**
   * Zips the file at inputFileName and outputs the result to outputFileName.
   * @param inputFileName the name of the file to process
   * @param outputFileName the name of the processed file
   * @throws Exception
   */
  @Override
  public void processFile(String inputFileName, String outputFileName) throws Exception {
    // Implement zipping
  }
}
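The zipping itself is omitted above. As a minimal sketch of what processFile could do, assuming the output should be a ZIP archive holding the single input file under its original name, the JDK’s java.util.zip package is enough. The class ZipSketch and its method zipFile are hypothetical names, not part of the article’s source.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/** Hypothetical helper that zips a single file into a new archive. */
public class ZipSketch {

  /**
   * Writes a ZIP archive to outputFileName containing the file at inputFileName.
   * The archive entry is named after the input file, without its directory.
   */
  public static void zipFile(String inputFileName, String outputFileName) throws IOException {
    Path input = Paths.get(inputFileName);
    try (ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(Paths.get(outputFileName)))) {
      zip.putNextEntry(new ZipEntry(input.getFileName().toString()));
      Files.copy(input, zip);  // stream the input file's bytes into the current entry
      zip.closeEntry();
    }  // closing the ZipOutputStream writes the archive's central directory
  }
}
```

With a helper like this, ZipFileActivities.processFile could simply delegate with ZipSketch.zipFile(inputFileName, outputFileName).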

Likewise, we need to implement file storage through the StorageActivities interface. Details are again omitted here; the full implementation is on GitHub.

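import com.amazonaws.services.s3.AmazonS3;

/**
 * This is an S3 storage implementation which provides Activities to
 * download/upload files from S3.
 */
public class S3StorageActivities implements StorageActivities {

  private final AmazonS3 s3Client;   // client used to transfer files to and from S3
  private final String hostName;     // name of this host, returned by download()
  private final String localFolder;  // folder that holds temporary local files

  public S3StorageActivities(AmazonS3 s3Client, String hostName, String localFolder) {
    this.s3Client = s3Client;
    this.hostName = hostName;
    this.localFolder = localFolder;
  }

  /**
   * Upload a file to storage.
   *
   * @param bucketName Name of the S3 bucket to upload to
   * @param localName  Local name of the file to upload to S3
   * @param remoteName Name of the file to use when uploaded to S3
   */
  @Override
  public void upload(String bucketName, String localName, String remoteName) {
    // upload the local file to S3 (details omitted)
  }

  /**
   * Download a file from storage.
   *
   * @param bucketName Name of the S3 bucket to download from
   * @param remoteName Name of the file to download from S3
   * @param localName  Local name of the file to download to
   * @return name of the host that downloaded the file
   */
  @Override
  public String download(String bucketName, String remoteName, String localName) throws Exception {
    // download from S3 to the local host (details omitted), then return this
    // host's name so the workflow can schedule follow-up tasks on the same host
    return hostName;
  }

  /**
   * Delete temporary local files.
   *
   * @param fileName Name of file to delete from temporary folder
   */
  @Override
  public void deleteLocalFile(String fileName) {
    // delete the file from the local temporary folder (details omitted)
  }

}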
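Because deleteLocalFile carries an @ExponentialRetry annotation, it may run more than once for the same file, so its implementation should be idempotent. Here is a minimal sketch, assuming temporary files live directly under the localFolder passed to the constructor; the class name LocalCleanup is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper for idempotent cleanup of temporary files. */
public class LocalCleanup {

  private final Path localFolder;

  public LocalCleanup(String localFolder) {
    this.localFolder = Paths.get(localFolder).normalize();
  }

  /**
   * Deletes fileName from the local folder. Returns true if a file was removed and
   * false if it was already gone, so a retried call succeeds instead of failing.
   */
  public boolean deleteLocalFile(String fileName) throws IOException {
    Path target = localFolder.resolve(fileName).normalize();
    // Refuse names such as "../x" that would escape the temporary folder.
    if (!target.startsWith(localFolder)) {
      throw new IllegalArgumentException("File is outside the local folder: " + fileName);
    }
    return Files.deleteIfExists(target);
  }
}
```

Since StorageActivities declares deleteLocalFile without a throws clause, a real implementation would need to wrap the IOException, for example in an UncheckedIOException.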

At this point our workflow’s activities are defined. To implement the workflow itself (the class that implements the @Workflow interface), we need activity clients. These generated clients schedule activity tasks with SWF, marshal arguments and results, and track asynchronous execution state. They are produced at compile time by the Flow Framework’s annotation processor, and the framework also relies on AspectJ weaving for annotations such as @Asynchronous and @ExponentialRetry, so your build must be configured for both as described in the Flow Framework developer’s guide. For Maven users, the pom.xml in our starter project provides a working example.

For every interface annotated with @Activities, the annotation processor generates a corresponding client. For example, to use our StorageActivities interface we create and use an instance of the generated StorageActivitiesClient (implemented by StorageActivitiesClientImpl). Likewise, FileActivities gets a FileActivitiesClient. The workflow implementation is written against these clients.

For example, to implement a workflow for zipping a file and uploading it to S3, we can use the generated clients:

public class ZipS3FileProcessingWorkflow implements FileProcessingWorkflow {

  // Storage client, generated by the Flow Framework, that schedules storage activities
  private final StorageActivitiesClient storageClient;

  // File client, generated by the Flow Framework, that schedules file activities
  private final FileActivitiesClient fileClient;

  // Current state of the execution, reported to SWF through getState()
  private String state = "Started";

  /**
   * Create a new instance of this workflow.
   */
  public ZipS3FileProcessingWorkflow() {
    storageClient = new StorageActivitiesClientImpl();
    fileClient = new FileActivitiesClientImpl();
  }

  @Override
  public void processFile(String inputBucketName, String inputFileName, String outputBucketName, String outputFileName) {
    // process file using fileClient and storageClient
  }

  @Override
  public String getState() {
    return state;
  }
}

This is where we define the workflow’s logic, using Promise types and the control-flow constructs provided by the Flow Framework. A Promise is a Future-like placeholder for the result of an asynchronous operation. Calling an activity client method schedules the activity and returns a Promise immediately; the activity itself runs later on an activity worker. Workflow code should not block waiting for a result: calling get() on a Promise that is not yet ready throws an exception. Instead, you pass Promises as arguments to other client calls or to methods annotated with @Asynchronous, and the framework defers those calls until all of their Promise arguments are ready. Using promises, your application can run asynchronous tasks sequentially (when one call depends on another’s result) or in parallel (when calls are independent).
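The Flow Framework’s Promise is tied to its replay-based decider, but the underlying idea of dependency-driven execution can be illustrated with the JDK’s CompletableFuture. The sketch below is only an analogy, not Flow Framework code: the download, zip and upload stages are hypothetical stand-in lambdas, each stage starts only once its input is ready, and two independent stages run in parallel.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Analogy only: dependency-driven composition with CompletableFuture. */
public class PromiseAnalogy {

  public static String run() throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      // "Download": yields the host that holds the file (stand-in value)
      CompletableFuture<String> hostName =
          CompletableFuture.supplyAsync(() -> "host-1", pool);

      // Sequential: processing starts only after hostName is ready,
      // much like an @Asynchronous method that takes a Promise argument
      CompletableFuture<String> zipped =
          hostName.thenApplyAsync(host -> "zipped-on-" + host, pool);

      // "Upload" depends on the processed result
      CompletableFuture<String> uploaded =
          zipped.thenApplyAsync(file -> "uploaded:" + file, pool);

      // Parallel: two independent stages, combined once both finish
      CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> "a", pool);
      CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "b", pool);
      CompletableFuture<String> both = a.thenCombine(b, (x, y) -> x + y);

      // Blocking here is fine in ordinary Java, but not inside workflow code
      return uploaded.get() + " " + both.get();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run());
  }
}
```

The important difference is the final get() calls: they block a thread, which is acceptable here but not in a Flow workflow, where the framework rather than a thread pool decides when dependent steps run.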

In addition to promises, the Flow Framework provides asynchronous control-flow constructs such as TryCatchFinally, an asynchronous counterpart of Java’s familiar try-catch-finally: doCatch handles exceptions raised by the asynchronous tasks started in doTry, and doFinally runs once those tasks have completed or failed. In the following simplified code example, we start by downloading a file from S3 using the storage client. This call returns a Promise of the name of the host the file was downloaded to. We pass that Promise to a helper that processes the file on the same host, and then upload the result back to S3. We also use promise ‘chaining’ to copy the host name into a second promise, hostNameChain, which is passed to the upload step and remains visible to the finally block.

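The finally block in this example cleans up resources that are no longer needed: it deletes the temporary local files, scheduling the deletions on the host-specific task list so that they run on the machine holding the files, and then records the final state.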

@Override
public void processFile(String inputBucketName, String inputFileName, String outputBucketName, String outputFileName) {

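  // omitted ... (see the full source): localInputFileName, localOutputFileName,
  // the Settable<String> hostNameChain, and the helpers processFileOnHost and upload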

  new TryCatchFinally() {

    /**
     * Download the file from S3, process it locally through a chained task, and upload back to S3.
     * @throws Throwable
     */
    @Override
    protected void doTry() throws Throwable {
      // download from S3, returns the host that downloaded the file
      Promise<String> hostName = storageClient.download(inputBucketName, inputFileName, localInputFileName);

      // chaining makes hostNameChain take on hostName's value once hostName is ready;
      // that value is then available to later steps, including doFinally
      hostNameChain.chain(hostName);

      // zip the file on the local host
      processFileOnHost(localInputFileName, localOutputFileName, hostName);

      // upload the zipped file back to S3
      upload(outputBucketName, outputFileName, localOutputFileName, hostNameChain);
    }

    @Override
    protected void doCatch(Throwable e) throws Throwable {
      state = "Failed: " + e.getMessage();
      throw e;
    }

    @Override
    protected void doFinally() throws Throwable {
      if (hostNameChain.isReady()) { // File was downloaded

        // Set option to schedule activity in worker specific task list
        ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostNameChain.get());

        // Call deleteLocalFile activity using the host specific task list
        storageClient.deleteLocalFile(localInputFileName, options);
        storageClient.deleteLocalFile(localOutputFileName, options);
      }
      if (!state.startsWith("Failed:")) {
        state = "Completed";
      }
    }

  };
}
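The TryCatchFinally pattern can also be approximated with the JDK’s CompletableFuture, which may help readers who already know that API. This is an analogy with hypothetical stand-in stages, not Flow Framework code: handle() receives both the result and any exception once the whole chain has finished, so it plays the roles of doCatch and doFinally together. Unlike the workflow above, whose doCatch re-throws so the execution fails, the sketch returns the state string so it is easy to inspect.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Analogy only: asynchronous try/catch/finally expressed with CompletableFuture. */
public class TryCatchFinallyAnalogy {

  /**
   * Runs a stand-in download -> process -> upload chain and returns the final state.
   * Cleanup actions are appended to cleanupLog.
   */
  public static String run(boolean failProcessing, List<String> cleanupLog) {
    // "doTry": the download yields the name of the host holding the file
    CompletableFuture<String> download = CompletableFuture.supplyAsync(() -> "host-1");
    CompletableFuture<String> upload = download
        .thenApply(host -> {
          if (failProcessing) {
            throw new IllegalStateException("zip failed");  // simulated activity failure
          }
          return host;
        })
        .thenApply(host -> "uploaded from " + host);

    // handle() runs once the chain has finished, whether it succeeded or failed
    return upload.handle((result, error) -> {
      // "doFinally": clean up only if the download produced a host,
      // like the hostNameChain.isReady() check in the workflow
      if (!download.isCompletedExceptionally()) {
        cleanupLog.add("delete temp files on " + download.join());
      }
      // "doCatch": turn a failure into a "Failed: ..." state
      if (error != null) {
        Throwable cause = error instanceof CompletionException ? error.getCause() : error;
        return "Failed: " + cause.getMessage();
      }
      return "Completed";
    }).join();
  }
}
```

Note that cleanup runs on both paths, which is the property the workflow relies on to avoid leaving temporary files behind after a failed upload.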

At this point, we have workflow and activity implementations. Next, we need host programs to run them.

4. Implement activity and workflow hosts

The host programs are responsible for polling SWF for tasks and executing them. The hosts are fairly straightforward: you just point them at the activity or workflow implementations we have written. Activity and workflow hosts are standalone executable applications that can be deployed and run independently of the code that starts workflow executions.

For example, running an activity worker that will poll for work from a common task list and execute S3 activities can be done with the following code.

// Start worker to poll the common worker task list
final ActivityWorker workerForCommonTaskList = new ActivityWorker(swfClient, domain, taskList);
S3StorageActivities s3Activities = new S3StorageActivities(s3Client, hostName, localFolder);
workerForCommonTaskList.addActivitiesImplementation(s3Activities);
workerForCommonTaskList.start();
LOG.info("Host Service Started for Task List: " + taskList);
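To make the polling model concrete, here is a toy, in-memory stand-in for an activity worker: a loop that polls a task list, looks up the implementation registered for each task’s activity name, and records the result. A BlockingQueue plays the role of the SWF task list. This is only a conceptual model, not how ActivityWorker is implemented, and every name in it is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Toy model of an activity worker polling a task list (not the SWF API). */
public class ToyActivityWorker implements Runnable {

  /** A task names the activity to run and carries its input. */
  public static final class Task {
    final String activity;
    final String input;

    public Task(String activity, String input) {
      this.activity = activity;
      this.input = input;
    }
  }

  private final BlockingQueue<Task> taskList;
  private final Map<String, Function<String, String>> implementations = new ConcurrentHashMap<>();
  private final BlockingQueue<String> results = new LinkedBlockingQueue<>();
  private volatile boolean running = true;

  public ToyActivityWorker(BlockingQueue<Task> taskList) {
    this.taskList = taskList;
  }

  /** Registers the code that handles one activity type. */
  public void addImplementation(String activity, Function<String, String> impl) {
    implementations.put(activity, impl);
  }

  public BlockingQueue<String> results() {
    return results;
  }

  public void stop() {
    running = false;
  }

  @Override
  public void run() {
    while (running) {
      try {
        // Poll with a timeout (loosely like SWF's long polling) so stop() is noticed
        Task task = taskList.poll(100, TimeUnit.MILLISECONDS);
        if (task == null) {
          continue;
        }
        Function<String, String> impl = implementations.get(task.activity);
        results.add(impl == null ? "unknown activity: " + task.activity : impl.apply(task.input));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}
```

Because each worker only sees the task list it polls, the workflow’s doFinally, which schedules deleteLocalFile on a task list named after the host, relies on each host also running a worker that polls its own host-specific list.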

Workflow workers, which execute the workflow’s decision tasks, are created the same way as activity workers, using WorkflowWorker:

// Create a workflow worker
WorkflowWorker worker = new WorkflowWorker(swfClient, domain, taskList);
worker.addWorkflowImplementationType(ZipS3FileProcessingWorkflow.class);
worker.start();

5. Start your workers

You can deploy workers on any machine that has access to the SWF service, including your local machine, an EC2 instance, or a server in your own data centre. Starting workers is as simple as executing the host programs you have defined. With Maven, you can package your code as a JAR file and run it like any other JAR.

6. Start executions

Once workers are available to process your tasks, you can start an execution of your workflow, either from the SWF console or programmatically. In a production environment, this would typically be done programmatically in response to an outside event (for example, receiving a message through SNS).

Starting a workflow execution is done with an auto-generated workflow client. In our example, we obtain a client from the generated client factory and then call the method annotated with @Execute.

// FileProcessingWorkflowClientExternalFactory is auto-generated through flow framework
FileProcessingWorkflowClientExternalFactory clientFactory =
    new FileProcessingWorkflowClientExternalFactoryImpl(config.createSWFClient(), config.getSwfDomain());
FileProcessingWorkflowClientExternal workflow = clientFactory.getClient();

// Start workflow execution
workflow.processFile(
    "s3-input-bucket", "s3-input-file",
    "s3-output-bucket", "s3-output-file");

Summary

Amazon’s SWF service and the Flow Framework are fairly complex systems, and this guide only scratches the surface. The official documentation is the best resource for learning more.

The code used in this article can be found on GitHub.
