Jose Lorenzo edited this page May 3, 2017 · 7 revisions

Getting started with the Google App Engine Pipeline Framework in Java


Introduction

Pipeline is a framework for building dynamic, complex workflows on Google App Engine. This is a user’s guide for the Java version of Pipeline.

Add the Pipeline library to your project

Using Maven

The library is available via the Maven Central repositories. Include the following dependency in your project's pom.xml file:

<dependency>
    <groupId>com.google.appengine.tools</groupId>
    <artifactId>appengine-pipeline</artifactId>
    <version>RELEASE</version>
</dependency>

Using Ant with Ivy

Add the following dependency to your project's ivy.xml file:

<dependency org="com.google.appengine.tools" name="appengine-pipeline" rev="latest.integration" />

Using Subversion

Check out the Pipeline code:

svn checkout http://appengine-pipeline.googlecode.com/svn/trunk/java

Then, build the code using ant or maven in the directory you just checked out:

cd java
ant

or

cd java
mvn

After building the jars as above, copy them from the dist directory into your application's WEB-INF/lib directory.

Mapping the Library into Your Application

Add a servlet mapping to your web.xml file:

<servlet>
  <servlet-name>PipelineServlet</servlet-name>
  <servlet-class>com.google.appengine.tools.pipeline.impl.servlets.PipelineServlet</servlet-class>
</servlet>
<servlet-mapping>
  <servlet-name>PipelineServlet</servlet-name>
  <url-pattern>/_ah/pipeline/*</url-pattern>
</servlet-mapping>

Jobs

A job is a Java class that extends com.google.appengine.tools.pipeline.Job and implements a method named run(). Jobs are written by the user and run asynchronously by the framework. (In this document, "the user" refers to the developer of an App Engine application who is using the Pipeline framework.)

Job Phases

A job has two phases: the run phase, in which its run() method is invoked, and the finalize phase, in which the job's final output is submitted to the framework. A job's final output may not be known at the end of its run() method. During the run() method a job may spawn one or more child jobs that will run asynchronously, and the parent job may specify that its final output is to be the final output of one of its child jobs. We call such a job a generator job.

Values

com.google.appengine.tools.pipeline.Value is an interface that represents input to and output from a job. There are two types of values, ImmediateValues and FutureValues. An ImmediateValue is a value that is known now, whereas a FutureValue represents an empty slot that will be filled in the future when some job finalizes.

Job Signatures

A job is characterized by the signature of its run method. A <T, T1, …, Tn>-job is a job whose final output is of type T and that takes as input n arguments of types T1 through Tn. The job's run() method will take these same argument types but will return a Value<T>. For example a <T, T1, T2>-job is a job whose run() method has the signature

Value<T> run(T1 arg1, T2 arg2);

Recall that the final output of a job is not necessarily known when run() returns. That is why run() returns a Value<T> instead of a T. If run() returns an ImmediateValue<T> then the job is called an immediate job and in this case the final output of the job is known by the time run() returns. But run() may return a FutureValue<T> which is only an empty slot to be filled when some child job finalizes.
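The distinction can be sketched with a simplified, hypothetical model of the two value types. These are not the library's real classes; they only illustrate "known now" versus "an empty slot filled later":

```java
import java.util.Optional;

// Simplified model of the Value hierarchy (NOT the real library classes).
interface Value<T> { Optional<T> peek(); }

// An ImmediateValue is known as soon as it is constructed.
class ImmediateValue<T> implements Value<T> {
    private final T value;
    ImmediateValue(T value) { this.value = value; }
    public Optional<T> peek() { return Optional.of(value); }
}

// A FutureValue is an empty slot until some job finalizes and fills it.
class FutureValue<T> implements Value<T> {
    private T value;
    private boolean filled;
    void fill(T value) { this.value = value; this.filled = true; }
    public Optional<T> peek() {
        return filled ? Optional.of(value) : Optional.empty();
    }
}

public class ValueDemo {
    public static void main(String[] args) {
        Value<Integer> now = new ImmediateValue<>(42);
        FutureValue<Integer> later = new FutureValue<>();
        System.out.println(now.peek().isPresent());   // true
        System.out.println(later.peek().isPresent()); // false
        later.fill(7);                                // some job finalizes
        System.out.println(later.peek().isPresent()); // true
    }
}
```

In the real framework the "filling" of a FutureValue is done by the framework itself when the corresponding job finalizes; user code never fills a FutureValue directly.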

Generic Parameters

The Job<T> class is parametrized by the output type T. The framework also includes several subclasses of Job named Job0, Job1, Job2, etc., that users should use as superclasses for their jobs instead of using Job directly. The Jobn class represents a job that takes n arguments. Jobn declares an abstract run method that must be overridden and implemented in the user’s job class. For example, here are Job0, Job1 and Job2:

public abstract class Job0<T> extends Job<T> {
  public abstract Value<T> run();
}

public abstract class Job1<T, T1> extends Job<T> {
  public abstract Value<T> run(T1 arg1);
}

public abstract class Job2<T, T1, T2> extends Job<T> {
  public abstract Value<T> run(T1 arg1, T2 arg2);
}

So if the user wishes to write a job that takes two Integers as input and returns a String then the user should write a job class that extends Job2<String, Integer, Integer>.

By convention we use java.lang.Void for jobs that don't have a return value. Thus a job that takes a List of Strings but doesn't return anything should be a subclass of Job1<Void, List<String>>.

In case the user needs to write a job class that takes more arguments than the greatest n such that the framework provides a Jobn class (as of this writing n=6) the user may write a job class that directly subclasses Job<T>. Job<T> does not declare an abstract run() method and so the user is free to implement a run() method in his job class that takes as many arguments as he wishes. In this case though the user will forfeit some of the type-safety offered by the framework.

First Examples

In order to succinctly illustrate the concepts of the framework our examples will use trivial jobs that do simple arithmetic.

Below are two examples of immediate jobs that take two integers and compute their difference and product respectively.

class DiffJob extends Job2<Integer, Integer, Integer> {
  @Override
  public Value<Integer> run(Integer a, Integer b) {
    return immediate(a - b);
  }
}

class MultJob extends Job2<Integer, Integer, Integer> {
  @Override
  public Value<Integer> run(Integer a, Integer b) {
    return immediate(a*b);
  }
}

The method immediate() being invoked above is simply syntactic sugar. immediate(x) is equivalent to new ImmediateValue(x).

Job Graphs

The framework allows a user to express arrangements of multiple jobs in which the output of one job becomes an input of one or more other jobs. These arrangements can best be described as a type of directed graph in which a directed edge from job A to one of the input slots of job B indicates that the output of A should be directed to that input slot of B. For example, suppose we wanted to use DiffJob and MultJob to build a job that takes three integer inputs, x, y, z, and performs the calculation [ (x - y) * (x - z) ] - 2. That calculation may be expressed as the following job graph. (The graph reads from right to left.)

(Diagram: Job Graph)

Generator Jobs

In the Pipeline framework the way in which a developer describes a job graph is by writing a parent job that generates it as part of the parent job's run() method. Below is a generator job called ComplexJob that generates the graph above.

class ComplexJob extends Job3<Integer, Integer, Integer, Integer> {
  @Override
  public Value<Integer> run(Integer x, Integer y, Integer z) {
    DiffJob diffJob = new DiffJob();
    MultJob multJob = new MultJob();
    FutureValue<Integer> r = futureCall(diffJob, immediate(x), immediate(y));
    FutureValue<Integer> s = futureCall(diffJob, immediate(x), immediate(z));
    FutureValue<Integer> t = futureCall(multJob, r, s);
    FutureValue<Integer> u = futureCall(diffJob, t, immediate(2));
    return u;
  }
}

The following diagram illustrates the Pipeline that includes ComplexJob and the child graph it generates.

(Diagram: Complex Job)

The diagram indicates that when ComplexJob runs, it generates a child graph and provides immediate input to some of the input slots of some of the jobs. The return statement in ComplexJob's run method specifies that the final output for ComplexJob is u, the final output from the instance of DiffJob shown on the left side of the diagram.

When ComplexJob's run method returns, the child graph has been described, but none of the child jobs have run. Soon afterward the two instances of DiffJob on the right side of the diagram will start to run because their input slots are already filled.

Because DiffJob is an immediate job, it is finalized immediately after it is run, and so the framework has access to the output values from the two DiffJobs on the right and can provide those output values to the input slots for MultJob.

MultJob will run as soon as its input slots are filled. MultJob is also an immediate job so the framework immediately has access to its output value, which it provides to the first input slot of the DiffJob on the left. The second input slot to that DiffJob has already been filled so that DiffJob runs and since it is an immediate job the framework immediately gets its output.

At this point ComplexJob can finalize because its finalize slot has been filled by u. Finally the framework has access to the output of ComplexJob and the Pipeline is complete.
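Since every job in this graph is immediate, the Pipeline computes the same result as the equivalent synchronous code. A plain-Java sketch of the same dataflow, with no framework involved (hypothetical names):

```java
public class ComplexJobByHand {
    static int diff(int a, int b) { return a - b; }
    static int mult(int a, int b) { return a * b; }

    // Same dataflow as ComplexJob's child graph: [ (x - y) * (x - z) ] - 2
    static int complex(int x, int y, int z) {
        int r = diff(x, y);   // DiffJob on the right
        int s = diff(x, z);   // second DiffJob on the right
        int t = mult(r, s);   // MultJob
        int u = diff(t, 2);   // DiffJob on the left
        return u;
    }

    public static void main(String[] args) {
        // On the inputs 11, 5, 7 used later in this guide:
        System.out.println(complex(11, 5, 7)); // (11-5)*(11-7) - 2 = 22
    }
}
```

The asynchronous Pipeline version produces the same value; the difference is that each intermediate result is delivered through a FutureValue slot rather than a local variable.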

futureCall

The way in which a generator job describes a child job graph is via a series of calls to futureCall(). This is a protected method in the Job<T> class that the user invokes in his run method. The philosophy behind the syntax is that while describing a child job graph, a user should use a notation that is similar to the notation that he would use if he were simply invoking methods synchronously. In the example above, the syntax is similar to what the user would write if he were to directly invoke DiffJob (as if it were a Java method) on x and y, then directly invoke DiffJob on x and z, then take the outputs of those two invocations and invoke MultJob on them, and finally invoke DiffJob again on that output and 2, and then return that output.

Compile-time Type-checking

If the output of job A is to be sent to an input slot in Job B, then the type of the output must match the type of the input slot. The framework offers compile-time type-checking by overloading the futureCall method as follows:

protected <T> FutureValue<T> futureCall(Job0<T> jobInstance);

protected <T, T1> FutureValue<T> futureCall(Job1<T, T1> jobInstance, Value<T1> v1);

protected <T, T1, T2> FutureValue<T> futureCall(Job2<T, T1, T2> jobInstance, Value<T1> v1, Value<T2> v2);

There is a version of futureCall corresponding to each of the Jobn classes. Each version of futureCall takes a job instance as its first argument. The version of futureCall corresponding to Jobn also takes n additional arguments that correspond to the n input parameters of the run method of Jobn. If a user passes either the wrong number or the wrong types of arguments to futureCall, the compiler will complain.

Error Handling

Pipeline supports error handling via exceptions! See the [ErrorHandling] page as well as the JavaDocs for more information.

The idea is that you can add a method called handleException() to your job class; any exception thrown in the run() method, or in the run() method of any child job it invoked, whose type matches the type declared as the parameter of handleException() will be caught there.

(Diagram: Error handling)

This method works just like a Java Catch block, where the run method is the corresponding try block.
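The correspondence can be sketched in plain Java with no framework involved (hypothetical names, purely illustrative):

```java
public class CatchAnalogy {
    // The try block plays the role of run() (including any child jobs it
    // invokes); the catch block plays the role of handleException().
    static String simulate(boolean fail) {
        try {
            if (fail) throw new IllegalStateException("child job failed");
            return "ok";
        } catch (IllegalStateException e) {
            return "handled: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // ok
        System.out.println(simulate(true));  // handled: child job failed
    }
}
```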

Running Pipelines

Here is how the ComplexJob Pipeline would be run on the inputs 11, 5 and 7:

PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new ComplexJob(), 11, 5, 7);

// Later, check on the status and get the final output
JobInfo jobInfo = service.getJobInfo(pipelineId);
JobInfo.State state = jobInfo.getJobState();
if (JobInfo.State.COMPLETED_SUCCESSFULLY == state) {
  System.out.println("The output is " + jobInfo.getOutput());
}

startNewPipeline

The startNewPipeline method is used to start new Pipelines. Similarly to the futureCall method, startNewPipeline is overloaded for type safety. There is a version of startNewPipeline for each Jobn class that takes n arguments after the job instance. There is also a non-type-safe version to handle larger numbers of arguments.

startNewPipeline differs from futureCall in two ways. First, the arguments to startNewPipeline are real values---they don't have to be wrapped in ImmediateValue.

Second, the return value of startNewPipeline is an opaque String that uniquely identifies the Pipeline, or equivalently, the root job of the Pipeline. In most applications this string will be saved (in the Datastore, in an HTTP parameter, or in a task payload, for example) and then used in some later App Engine request to retrieve the job's status.

Job State

In addition to COMPLETED_SUCCESSFULLY the other job states are RUNNING, STOPPED_BY_REQUEST, STOPPED_BY_ERROR, and WAITING_TO_RETRY.

In case the state is STOPPED_BY_ERROR, a stack trace can be retrieved via jobInfo.getError().

WAITING_TO_RETRY indicates that an exception occurred while running the job and that the framework will retry it at some point in the future; the retry behavior can be customized via JobSettings (see Job Settings below).

PipelineService

In addition to startNewPipeline and getJobInfo the other methods on PipelineService are

void stopPipeline(String pipelineHandle);
void deletePipelineRecords(String pipelineHandle);
void submitPromisedValue(String promiseHandle, Object value);

submitPromisedValue is used to allow asynchronous, out-of-process agents, such as human users, to fill input slots that jobs are waiting on.

Promised Values and External Agents

A PromisedValue represents a value slot that will be filled in when some asynchronous external agent supplies a value. This mechanism allows the framework to call out to other frameworks such as MapReduce and also to wait for input from a human during a Pipeline.

The following Pipeline will obtain three integers from a human, pass them as input to ComplexJob, return the answer to the human, ask for one more integer and then multiply the additional integer by the result of ComplexJob. Below the code we draw a diagram of this Pipeline.

class ExternalAgentJob extends Job1<Integer, String> {
  @Override
  public Value<Integer> run(String userEmail) {
    // Invoke ComplexJob on three promised values
    PromisedValue<Integer> x = newPromise(Integer.class);
    PromisedValue<Integer> y = newPromise(Integer.class);
    PromisedValue<Integer> z = newPromise(Integer.class);
    FutureValue<Integer> intermediate = futureCall(new ComplexJob(), x, y, z);
      
    // Kick off the process of retrieving the data from the external agent
    getIntFromUser("Please give 1st int", userEmail, x.getHandle());
    getIntFromUser("Please give 2nd int", userEmail, y.getHandle());
    getIntFromUser("Please give 3rd int", userEmail, z.getHandle());

    // Send the user the intermediate result and ask for one more integer
    FutureValue<Integer> oneMoreInt = futureCall(new PromptJob(), intermediate, immediate(userEmail));

    // Invoke MultJob on intermediate and oneMoreInt
    return futureCall(new MultJob(), intermediate, oneMoreInt);
  }

  public static void getIntFromUser(String prompt, String userEmail, String promiseHandle) {
    // 1. Send the user an e-mail containing the prompt.
    // 2. Ask user to submit one more integer on some web page.
    // 3. promiseHandle is a query string argument
    // 4. Handler for submit invokes submitPromisedValue(promiseHandle, value)
  }
}

class PromptJob extends Job2<Integer, Integer, String> {
  @Override
  public Value<Integer> run(Integer intermediate, String userEmail) {
    String prompt = "The intermediate result is " + intermediate + ". Please give one more int";
    PromisedValue<Integer> oneMoreInt = newPromise(Integer.class);
    ExternalAgentJob.getIntFromUser(prompt, userEmail, oneMoreInt.getHandle());
    return oneMoreInt;
  }
}

(Diagram: External agent)

Serialization

Both jobs and job arguments are persisted to the App Engine datastore using Java serialization. This means that values can be any serializable Java data type.

It also means that job instances passed to futureCall() may be given state in addition to the job's arguments. For example, values known at the time that futureCall() is invoked may be passed to a constructor for the job instead of being passed as ImmediateValues.

Finally it is also necessary for users to keep in mind that their job objects must be serializable. Since the parent class Job is declared to implement Serializable, this is not usually much of a burden. One thing to watch out for is writing Job classes as anonymous inner classes. This is a fine technique, but it implies that the outer class must also be serializable.
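As a concrete illustration of constructor state surviving persistence, here is a sketch of a round trip through Java serialization. GreetingJob and roundTrip are hypothetical names; the real framework persists jobs to the Datastore rather than to a byte array:

```java
import java.io.*;

// A hypothetical serializable "job" that carries state via its constructor,
// mirroring how job instances passed to futureCall() may carry state.
class GreetingJob implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String greeting; // state fixed at construction time
    GreetingJob(String greeting) { this.greeting = greeting; }
    String run(String name) { return greeting + ", " + name; }
}

public class SerializationDemo {
    // Round-trip an object through Java serialization, as the framework
    // effectively does when persisting a job between phases.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        GreetingJob copy = roundTrip(new GreetingJob("Hello"));
        System.out.println(copy.run("Pipeline")); // Hello, Pipeline
    }
}
```

Note that if GreetingJob were written as an anonymous inner class, serializing it would also require the enclosing class to be serializable, which is the pitfall described above.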

Job Settings

The futureCall() and startNewPipeline() family of methods have a final vararg parameter of type JobSetting. For example

<T, T1, T2> FutureValue<T> futureCall(Job2<T, T1, T2> jobInstance, Value<T1> v1, Value<T2> v2, JobSetting... settings);

Currently there are two categories of JobSettings:

WaitFor

Sometimes when describing a job graph it is desirable to specify that one job should not run until another has completed, but the second job does not actually depend on the final output of the first job. Of course this problem could be solved by introducing a dummy argument which is never used. But the framework offers a more elegant solution. To specify that JobB should wait until JobA has finalized, use the following syntax:

class ExampleWaitForJob extends Job0<Void> {
  public Value<Void> run() {
    FutureValue<Void> a = futureCall(new JobA());
    futureCall(new JobB(), waitFor(a));
    return null;
  }
}

This tells the framework not to run JobB until JobA has finalized. In the above example waitFor(a) is another example of our syntactic sugar. It is equivalent to

new JobSetting.WaitForSetting(a)

Retry Settings

The other category of JobSetting the framework currently offers is settings that allow the user to customize the way a job is retried. There are three such settings:

  • BackoffSeconds
  • BackoffFactor
  • MaxAttempts

When a job fails it will be retried MaxAttempts number of times. After each failure the wait time before the next attempt will be

backoffSeconds * backoffFactor ^ attemptNumber
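For example, with BackoffSeconds = 15 and BackoffFactor = 2, the waits would grow as 15, 30, 60, 120, … seconds (assuming attemptNumber starts at 0). A minimal sketch of the formula, using a hypothetical helper rather than the framework's internal code:

```java
public class Backoff {
    // Wait before the next attempt: backoffSeconds * backoffFactor^attemptNumber
    static long waitSeconds(long backoffSeconds, long backoffFactor, int attemptNumber) {
        return backoffSeconds * (long) Math.pow(backoffFactor, attemptNumber);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 4; attempt++) {
            System.out.println("attempt " + attempt + ": wait "
                + waitSeconds(15, 2, attempt) + "s"); // 15, 30, 60, 120
        }
    }
}
```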

Testing

There are several ways users may test code they have written that uses the Java Pipeline framework.

First, like all App Engine code, users may run their code in the local development app server.

But this is not convenient for writing automated unit tests. For that reason App Engine provides a local testing framework. Since the Pipeline framework is built on top of the App Engine Task Queue, getting Pipeline to work in the local testing environment requires registering a LocalTaskQueueCallback that handles Pipeline tasks. The framework provides com.google.appengine.tools.pipeline.TestingTaskQueueCallback with its test code for this purpose.

The [GettingStarted Python] version of Pipeline offers an additional feature to facilitate testing: the ability to run Pipelines in testing mode. This feature is not yet available in this first version of the Java framework, but will be coming soon. In testing mode, the behavior of futureCall() and startNewPipeline() will be changed so that instead of building job graphs these methods will synchronously invoke the target jobs. This will allow developers to test the logic of their code independently of the asynchrony inherent in the Pipeline framework.