Java
Pipeline is a framework for building dynamic, complex workflows on Google App Engine. This is a user's guide for the Java version of Pipeline.
The library is available via the Maven Central repositories. Include the following dependency in your project's pom.xml file:
<dependency>
<groupId>com.google.appengine.tools</groupId>
<artifactId>appengine-pipeline</artifactId>
<version>RELEASE</version>
</dependency>
Alternatively, if you use Ivy, add the following dependency to your project's ivy.xml file:
<dependency org="com.google.appengine.tools" name="appengine-pipeline" rev="latest.integration" />
Check out the Pipeline code:
svn checkout http://appengine-pipeline.googlecode.com/svn/trunk/java
Then build the code using ant or maven in the directory you just checked out:
cd java
ant
or
cd java
mvn
After building the jars as above, copy them from the dist directory into your application's WEB-INF/lib
directory.
Add a servlet mapping to your web.xml file:
<servlet>
<servlet-name>PipelineServlet</servlet-name>
<servlet-class>com.google.appengine.tools.pipeline.impl.servlets.PipelineServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>PipelineServlet</servlet-name>
<url-pattern>/_ah/pipeline/*</url-pattern>
</servlet-mapping>
A job is a Java class that extends com.google.appengine.tools.pipeline.Job
and implements a method named run()
. Jobs are written by the user and run asynchronously by the framework. (In this document the user refers to the developer of an App Engine application who is using the Pipeline framework.)
A job has two phases: the run phase, in which its run()
method is invoked, and the finalize phase, in which the job's final output is submitted to the framework. A job's final output may not be known at the end of its run()
method. During the run()
method a job may spawn one or more child jobs that will run asynchronously and the parent job may specify its final output to be the final output of one of its child jobs. We call such a job a generator job.
com.google.appengine.tools.pipeline.Value
is an interface that represents input to and output from a job. There are two types of values, ImmediateValues
and FutureValues
. An ImmediateValue
is a value that is known now, whereas a FutureValue
represents an empty slot that will be filled in the future when some job finalizes.
A job is characterized by the signature of its run method. A
<T, T1, …, Tn>-job
is a job whose final output is of type T
and that takes as input n arguments of types T1
through Tn
. The job's run()
method will take these same argument types but will return a Value<T>
. For example a <T, T1, T2>-job
is a job whose run()
method has the signature
Value<T> run(T1 arg1, T2 arg2);
Recall that the final output of a job is not necessarily known when run()
returns. That is why run()
returns a Value<T>
instead of a T
. If run()
returns an ImmediateValue<T>
then the job is called an immediate job and in this case the final output of the job is known by the time run()
returns. But run()
may return a FutureValue<T>
which is only an empty slot to be filled when some child job finalizes.
The Job<T>
class is parametrized by the output type T
. The framework also includes several subclasses of Job
named Job0
, Job1
, Job2
, etc that users should use as a superclass for their jobs instead of using Job
directly. The Jobn
class represents a job that takes n arguments. Jobn
declares an abstract run
method that must be overridden and implemented in the user’s job class. For example here are Job0
, Job1
and Job2
:
public abstract class Job0<T> extends Job<T> {
public abstract Value<T> run();
}
public abstract class Job1<T, T1> extends Job<T> {
public abstract Value<T> run(T1 arg1);
}
public abstract class Job2<T, T1, T2> extends Job<T> {
public abstract Value<T> run(T1 arg1, T2 arg2);
}
So if the user wishes to write a job that takes two Integers as input and returns a String then the user should write a job class that extends Job2<String, Integer, Integer>.
By convention we use java.lang.Void
for jobs that don't have a return value. Thus a job that takes a List
of Strings
but doesn't return anything should be a subclass of Job1<Void, List<String>>
.
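For instance, a minimal sketch of such a job (the class name and body are illustrative) might look like this:
class LogLinesJob extends Job1<Void, List<String>> {
  @Override
  public Value<Void> run(List<String> lines) {
    for (String line : lines) {
      System.out.println(line);  // do something with each string
    }
    return null;  // no final output for a Void job
  }
}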
In case the user needs to write a job class that takes more arguments
than the greatest n
such that the framework provides a Jobn
class (as of this writing n=6
) the user may write a job class that directly subclasses Job<T>
. Job<T>
does not declare an abstract run()
method and so the user is free to implement a run()
method in his job class that takes as many arguments as he wishes. In this case though the user will forfeit some of the type-safety offered by the framework.
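As a sketch, such a job might be declared like this (the class name and argument types are illustrative; the call site will not be checked by the compiler):
class SevenIntSumJob extends Job<Integer> {
  // run() may declare any number of parameters, but the framework
  // cannot verify the argument list at compile time.
  public Value<Integer> run(Integer a1, Integer a2, Integer a3, Integer a4,
      Integer a5, Integer a6, Integer a7) {
    return immediate(a1 + a2 + a3 + a4 + a5 + a6 + a7);
  }
}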
In order to succinctly illustrate the concepts of the framework our examples will use trivial jobs that do simple arithmetic.
Below are two examples of immediate jobs that take two integers and compute their difference and product respectively.
class DiffJob extends Job2<Integer, Integer, Integer> {
@Override
public Value<Integer> run(Integer a, Integer b) {
return immediate(a - b);
}
}
class MultJob extends Job2<Integer, Integer, Integer> {
@Override
public Value<Integer> run(Integer a, Integer b) {
return immediate(a*b);
}
}
The method immediate()
being invoked above is simply syntactic sugar. immediate(x)
is equivalent to new ImmediateValue(x)
.
The framework allows a user to express arrangements of multiple jobs in which the output of one job becomes an input of one or more jobs. These arrangements can best be described as a type of directed graph in which a directed edge from job A to one of the input slots in job B indicates that the output of A should be directed to the input slot of B. For example, suppose we wanted to use DiffJob
and MultJob
to build a job that takes three integer inputs, x, y, z
, and performs the following calculation [ (x - y) * (x - z) ] - 2
. That calculation may be expressed as the following job graph. (Graph reads from right to left.)
In the Pipeline framework the way in which a developer describes a job graph is by writing a parent job that generates it as part of the parent job's run()
method. Below is a generator job called ComplexJob
that generates the graph above.
class ComplexJob extends Job3<Integer, Integer, Integer, Integer> {
@Override
public Value<Integer> run(Integer x, Integer y, Integer z) {
DiffJob diffJob = new DiffJob();
MultJob multJob = new MultJob();
FutureValue<Integer> r = futureCall(diffJob, immediate(x), immediate(y));
FutureValue<Integer> s = futureCall(diffJob, immediate(x), immediate(z));
FutureValue<Integer> t = futureCall(multJob, r, s);
FutureValue<Integer> u = futureCall(diffJob, t, immediate(2));
return u;
}
}
The following diagram illustrates the Pipeline that includes ComplexJob
and the child graph it generates.
The diagram indicates that when ComplexJob
runs, it generates a child graph and provides immediate input to some of the input slots of some of the jobs. The return
statement in ComplexJob's
run
method specifies that the final output for ComplexJob
is u
, the final output from the instance of DiffJob
shown on the left side of the diagram.
When ComplexJob's
run
method returns, the child graph has been described, but none of the child jobs have run. Soon afterward the two instances of DiffJob
on the right side of the diagram will start to run because their input slots are already filled.
Because DiffJob
is an immediate job, it is finalized immediately after it is run and so the framework has access to the output values from the two DiffJobs
on the right and can provide those output values to the input slots for MultJob
.
MultJob
will run as soon as its input slots are filled. MultJob
is also an immediate job so the framework immediately has access to its output value, which it provides to the first input slot of the DiffJob
on the left. The second input slot to that DiffJob
has already been filled so that DiffJob
runs and since it is an immediate job the framework immediately gets its output.
At this point ComplexJob
can finalize because its finalize slot has been filled by u
. Finally the framework has access to the output of ComplexJob
and the Pipeline is complete.
The way in which a generator job describes a child job graph is via a series of calls to futureCall()
. This is a protected method in the Job<T>
class that the user invokes in his run
method. The philosophy behind the syntax is that while describing a child job graph, a user should use a notation that is similar to the notation that he would use if he were simply invoking methods synchronously. In the example above, the syntax is similar to what the user would write if he were to directly invoke DiffJob
(as if it were a Java method) on x and y, then directly invoke DiffJob
on x and z, then take the outputs of those two invocations and invoke MultJob
on them, and finally invoke DiffJob
again on that output and 2, and then return that output.
If the output of job A is to be sent to an input slot in Job B, then the type of the output must match the type of the input slot. The framework offers compile-time type-checking by overloading the futureCall
method as follows:
protected <T> FutureValue<T> futureCall(Job0<T> jobInstance);
protected <T, T1> FutureValue<T> futureCall(Job1<T, T1> jobInstance, Value<T1> v1);
protected <T, T1, T2> FutureValue<T> futureCall(Job2<T, T1, T2> jobInstance, Value<T1> v1, Value<T2> v2);
There is a version of futureCall
corresponding to each of the Jobn
classes. Each version of futureCall
takes a job instance as its first argument. The version of futureCall corresponding to Jobn
also takes n additional arguments that correspond to the n input parameters of the run
method of Jobn
. If a user passes either the wrong number or the wrong types of arguments to futureCall
, the compiler will complain.
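For example, using the DiffJob defined above inside a generator job's run() method, the first call below compiles, while the second (shown commented out) would be rejected:
FutureValue<Integer> ok = futureCall(new DiffJob(), immediate(7), immediate(3));
// Does not compile: DiffJob expects two Value<Integer> arguments.
// FutureValue<Integer> bad = futureCall(new DiffJob(), immediate("seven"), immediate(3));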
Pipeline supports error handling via exceptions! See the [ErrorHandling] page as well as the JavaDocs for more information.
The idea is that you can add a method named handleException() to your job class. Any exception thrown in the run() method, or in the run() method of any child job it invoked, whose type matches the parameter declared on handleException() will be caught there.
This method works just like a Java catch block, with the run() method playing the role of the corresponding try block.
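As a rough sketch of this pattern (the job and exception type below are illustrative; see the [ErrorHandling] page and the JavaDocs for the authoritative signature):
class DivJob extends Job2<Integer, Integer, Integer> {
  @Override
  public Value<Integer> run(Integer a, Integer b) {
    return immediate(a / b);  // may throw ArithmeticException when b == 0
  }

  // Acts like a catch block for run() and for any child jobs it spawned.
  public Value<Integer> handleException(ArithmeticException e) {
    return immediate(0);  // supply a fallback output for the job
  }
}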
Here is how the ComplexJob
Pipeline would be run on the inputs 11, 5 and 7:
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new ComplexJob(), 11, 5, 7);
// Later, check on the status and get the final output
JobInfo jobInfo = service.getJobInfo(pipelineId);
JobInfo.State state = jobInfo.getJobState();
if (JobInfo.State.COMPLETED_SUCCESSFULLY == state) {
System.out.println("The output is " + jobInfo.getOutput());
}
The startNewPipeline
method is used to start new Pipelines. Similarly to the futureCall
method, startNewPipeline
is overloaded for type safety. There is a version of startNewPipeline
for each Jobn
class that takes n arguments after the job instance. There is also a non-type-safe version to handle larger numbers of arguments.
startNewPipeline
differs from futureCall
in two ways. First, the arguments to startNewPipeline
are real values---they don't have to be wrapped in ImmediateValue
.
Also the return value from startNewPipeline
is an opaque String that uniquely identifies the Pipeline, or equivalently, the root job of the Pipeline. In most applications this string will be saved (in the Datastore, in an HTTP parameter, or in a task payload, for example) and then used in some later App Engine request in order to retrieve the JobInfo
.
In addition to COMPLETED_SUCCESSFULLY
the other job states are RUNNING
, STOPPED_BY_REQUEST
, STOPPED_BY_ERROR
, and WAITING_TO_RETRY
.
In case the state is STOPPED_BY_ERROR
, a stack trace can be retrieved via jobInfo.getError()
.
WAITING_TO_RETRY indicates that an exception occurred while running the job and that the framework will retry it sometime in the future. The retry behavior can be customized via JobSetting (see below).
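Putting the states together, a later request might inspect a saved pipelineId along these lines (a sketch only; exception handling is elided for brevity):
PipelineService service = PipelineServiceFactory.newPipelineService();
JobInfo jobInfo = service.getJobInfo(pipelineId);
switch (jobInfo.getJobState()) {
  case COMPLETED_SUCCESSFULLY:
    System.out.println("Output: " + jobInfo.getOutput());
    break;
  case STOPPED_BY_ERROR:
    System.out.println("Failed: " + jobInfo.getError());
    break;
  case RUNNING:
  case WAITING_TO_RETRY:
    System.out.println("Not finished yet; check again later.");
    break;
  case STOPPED_BY_REQUEST:
    System.out.println("Stopped by request.");
    break;
}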
In addition to startNewPipeline
and getJobInfo
the other methods on PipelineService
are
void stopPipeline(String pipelineHandle);
void deletePipelineRecords(String pipelineHandle);
void submitPromisedValue(String promiseHandle, Object value);
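For example, a maintenance handler might stop a runaway Pipeline or delete the records of one whose output has already been consumed (a sketch; exception handling elided):
PipelineService service = PipelineServiceFactory.newPipelineService();
service.stopPipeline(pipelineId);          // abort a Pipeline that is still running
service.deletePipelineRecords(pipelineId); // remove its records from the Datastore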
submitPromisedValue
is used to allow asynchronous, out-of-process agents, such as human users, to fill input slots that jobs are waiting on.
A PromisedValue
represents a value slot that will be filled in when some asynchronous external agent supplies a value. This mechanism allows the framework to call out to other frameworks such as MapReduce and also to wait for input from a human during a Pipeline.
The following Pipeline will obtain three integers from a human, pass them as input to ComplexJob
, return the answer to the human, ask for one more integer and then multiply the additional integer by the result of ComplexJob
. Below the code we draw a diagram of this Pipeline.
class ExternalAgentJob extends Job1<Integer, String> {
@Override
public Value<Integer> run(String userEmail) {
// Invoke ComplexJob on three promised values
PromisedValue<Integer> x = newPromise(Integer.class);
PromisedValue<Integer> y = newPromise(Integer.class);
PromisedValue<Integer> z = newPromise(Integer.class);
FutureValue<Integer> intermediate = futureCall(new ComplexJob(), x, y, z);
// Kick off the process of retrieving the data from the external agent
getIntFromUser("Please give 1st int", userEmail, x.getHandle());
getIntFromUser("Please give 2nd int", userEmail, y.getHandle());
getIntFromUser("Please give 3rd int", userEmail, z.getHandle());
// Send the user the intermediate result and ask for one more integer
FutureValue<Integer> oneMoreInt = futureCall(new PromptJob(), intermediate, immediate(userEmail));
// Invoke MultJob on intermediate and oneMoreInt
return futureCall(new MultJob(), intermediate, oneMoreInt);
}
public static void getIntFromUser(String prompt, String userEmail, String promiseHandle) {
// 1. Send the user an e-mail containing the prompt.
// 2. Ask user to submit one more integer on some web page.
// 3. promiseHandle is a query string argument
// 4. Handler for submit invokes submitPromisedValue(promiseHandle, value)
}
}
class PromptJob extends Job2<Integer, Integer, String> {
@Override
public Value<Integer> run(Integer intermediate, String userEmail) {
String prompt = "The intermediate result is " + intermediate + "." + " Please give one more int";
PromisedValue<Integer> oneMoreInt = newPromise(Integer.class);
ExternalAgentJob.getIntFromUser(prompt, userEmail, oneMoreInt.getHandle());
return oneMoreInt;
}
}
Both jobs and job arguments are persisted to the App Engine datastore using Java serialization. This means that values can be any serializable Java data type.
It also means that job instances passed to futureCall()
may be given state in addition to the job's arguments. For example, values known at the time that futureCall()
is invoked may be passed to a constructor for the job instead of being passed as ImmediateValues
.
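For example, here is a sketch of a job that receives a constant through its constructor rather than as an ImmediateValue (the class name is illustrative):
class AddConstantJob extends Job1<Integer, Integer> {
  private final int constant;  // serialized along with the job instance

  AddConstantJob(int constant) {
    this.constant = constant;
  }

  @Override
  public Value<Integer> run(Integer x) {
    return immediate(x + constant);
  }
}

// Inside some generator job's run() method:
// FutureValue<Integer> shifted = futureCall(new AddConstantJob(42), someFutureInt);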
Finally it is also necessary for users to keep in mind that their job objects must be serializable. Since the parent class Job
is declared to implement Serializable
, this is not usually much of a burden. One thing to watch out for is writing Job
classes as anonymous inner classes. This is a fine technique, but it implies that the outer class must also be serializable.
The futureCall()
and startNewPipeline()
family of methods have a final vararg parameter of type JobSetting
. For example
<T, T1, T2> FutureValue<T> futureCall(Job2<T, T1, T2> jobInstance, Value<T1> v1, Value<T2> v2, JobSetting... settings);
Currently there are two categories of JobSettings
:
Sometimes when describing a job graph it is desirable to specify that one job should not run until another has completed, but the second job does not actually depend on the final output of the first job. Of course this problem could be solved by introducing a dummy argument which is never used. But the framework offers a more elegant solution. To specify that JobB
should wait until JobA
has finalized, use the following syntax:
class ExampleWaitForJob extends Job0<Void> {
public Value<Void> run() {
FutureValue<Void> a = futureCall(new JobA());
futureCall(new JobB(), waitFor(a));
return null;
}
}
This tells the framework not to run JobB
until JobA
has finalized. In the above example waitFor(a)
is another example of our syntactic sugar. It is equivalent to
new JobSetting.WaitForSetting(a)
The other category of JobSetting currently provided by the framework consists of settings that allow the user to customize the way a job is retried. There are three such settings:
- BackoffSeconds
- BackoffFactor
- MaxAttempts
When a job fails it will be retried up to MaxAttempts times. After each failure, the wait time (in seconds) before the next attempt will be
backoffSeconds * backoffFactor ^ attemptNumber
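For example, with backoffSeconds = 5 and backoffFactor = 2, the wait after the third failed attempt would be 5 * 2^3 = 40 seconds. Here is a sketch of passing these settings to futureCall inside a generator job's run() method (assuming the JobSetting inner classes BackoffSeconds, BackoffFactor and MaxAttempts; consult the JavaDocs for the authoritative names):
FutureValue<Integer> r = futureCall(new DiffJob(), immediate(10), immediate(3),
    new JobSetting.BackoffSeconds(5),
    new JobSetting.BackoffFactor(2),
    new JobSetting.MaxAttempts(4));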
There are several ways users may test code they have written that uses the Java Pipeline framework.
First, like all App Engine code, users may run their code in the local development app server.
But this is not convenient for writing automated unit tests. For that reason App Engine provides a local testing framework. Since the Pipeline framework is built on top of the App Engine Task Queue, in order to get Pipeline to work in the local testing environment it is necessary to register a LocalTaskQueueCallback that handles Pipeline tasks. The framework provides com.google.appengine.tools.pipeline.TestingTaskQueueCallback with its test code for this purpose.
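Here is a sketch of what such a test setup might look like, assuming the standard App Engine local-testing classes LocalServiceTestHelper, LocalDatastoreServiceTestConfig and LocalTaskQueueTestConfig (consult the App Engine testing documentation for the authoritative configuration options):
LocalServiceTestHelper helper = new LocalServiceTestHelper(
    new LocalDatastoreServiceTestConfig(),
    new LocalTaskQueueTestConfig()
        .setDisableAutoTaskExecution(false)                 // actually execute enqueued tasks
        .setCallbackClass(TestingTaskQueueCallback.class)); // route Pipeline tasks

helper.setUp();
try {
  PipelineService service = PipelineServiceFactory.newPipelineService();
  String pipelineId = service.startNewPipeline(new ComplexJob(), 11, 5, 7);
  // ... poll service.getJobInfo(pipelineId) until the state is COMPLETED_SUCCESSFULLY ...
} finally {
  helper.tearDown();
}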
The [GettingStarted Python] version of Pipeline offers an additional feature to facilitate testing: the ability to run Pipelines in testing mode. This feature is not yet available in this first version of the Java framework, but will be coming soon. In testing mode, the behavior of futureCall()
and startNewPipeline()
will be changed so that instead of building job graphs these methods will synchronously invoke the target jobs. This will allow developers to test the logic of their code independently of the asynchrony inherent in the Pipeline framework.