Skip to content
This repository was archived by the owner on Dec 13, 2023. It is now read-only.

Commit 4fb004c

Browse files
authored
Merge pull request #34 from Netflix/dev
Merge 1.6.0-rc1 to master
2 parents 11578aa + cd57159 commit 4fb004c

142 files changed

Lines changed: 2560 additions & 4795 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitattributes

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
docs/* linguist-documentation
2+
server/src/main/resources/swagger-ui/* linguist-vendored
3+
4+

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
![Conductor](https://netflix.github.io/conductor/img/corner-logo2.png)
1+
![Conductor](docs/docs/img/conductor-vector-x.png)
22

33
## Conductor
44
Conductor is an _orchestration_ engine that runs in the cloud.
@@ -12,7 +12,7 @@ Conductor is an _orchestration_ engine that runs in the cloud.
1212

1313
[Getting Started](http://netflix.github.io/conductor/intro) guide.
1414

15-
## Get Condcutor
15+
## Get Conductor
1616
Binaries are available from Maven Central and jcenter.
1717

1818
|Group|Artifact|Latest Stable Version|
@@ -30,6 +30,7 @@ Below are the various artifacts published:
3030
|conductor-ui|node.js based UI for Conductor|
3131
|conductor-contribs|Optional contrib package that holds extended workflow tasks and support for SQS|
3232
|conductor-client|Java client for Conductor that includes helpers for running a worker tasks|
33+
|conductor-server|Self contained Jetty server|
3334
|conductor-test-harness|Used for building test harness and an in-memory kitchensink demo|
3435

3536
## Building
@@ -58,7 +59,7 @@ com.netflix.conductor.dao.RedisESWorkflowModule
5859

5960
* The default persistence used is [Dynomite](https://github.com/Netflix/dynomite)
6061
* For queues, we are relying on [dyno-queues](https://github.com/Netflix/dyno-queues)
61-
* The indexing backend is [Elastic](https://www.elastic.co/) (2.+)
62+
* The indexing backend is [Elasticsearch](https://www.elastic.co/) (2.+)
6263

6364
## Other Requirements
6465
* JDK 1.8+

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ subprojects {
2929

3030
repositories {
3131
jcenter()
32+
maven { url 'https://dl.bintray.com/netflixoss/oss-candidate/' }
3233
}
3334

3435
dependencies {

client/src/main/java/com/netflix/conductor/client/http/ClientBase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,10 @@ protected <T> T getForEntity(String url, Object[] queryParams, GenericType<T> re
182182
}
183183

184184
private Builder resource(URI URI, Object entity) {
185-
return client.resource(URI).type(MediaType.APPLICATION_JSON).entity(entity).accept(MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN);
185+
return client.resource(URI).type(MediaType.APPLICATION_JSON).entity(entity).accept(MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON);
186186
}
187187

188188
private void handleException(Exception e) {
189-
e.printStackTrace();
190189
throw new RuntimeException(e);
191190
}
192191

client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ public void skipTaskFromWorkflow(String workflowId, String taskReferenceName) {
141141
put("workflow/{workflowId}/skiptask/{taskReferenceName}", null, workflowId, taskReferenceName);
142142
}
143143

144-
public void runDecider(String workflowName) {
145-
put("workflow/decide/{workflowName}", null, workflowName);
144+
public void runDecider(String workflowId) {
145+
put("workflow/decide/{workflowId}", null, null, workflowId);
146146
}
147147

148148
public SearchResult<WorkflowSummary> search(String query) {

common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ public TaskResult() {
6363

6464
}
6565

66+
/**
67+
*
68+
* @return Workflow instance id for which the task result is produced
69+
*/
6670
public String getWorkflowInstanceId() {
6771
return workflowInstanceId;
6872
}
@@ -106,6 +110,10 @@ public void setWorkerId(String workerId) {
106110
public Status getTaskStatus() {
107111
return status;
108112
}
113+
114+
public void setTaskStatus(Status status) {
115+
this.status = status;
116+
}
109117

110118
/**
111119
*

common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class WorkflowDef extends Auditable {
4343

4444
private List<String> inputParameters = new LinkedList<String>();
4545

46-
private Map<String, String> outputParameters = new HashMap<>();
46+
private Map<String, Object> outputParameters = new HashMap<>();
4747

4848
private String failureWorkflow;
4949

@@ -109,14 +109,14 @@ public void setInputParameters(List<String> inputParameters) {
109109
/**
110110
* @return the outputParameters
111111
*/
112-
public Map<String, String> getOutputParameters() {
112+
public Map<String, Object> getOutputParameters() {
113113
return outputParameters;
114114
}
115115

116116
/**
117117
* @param outputParameters the outputParameters to set
118118
*/
119-
public void setOutputParameters(Map<String, String> outputParameters) {
119+
public void setOutputParameters(Map<String, Object> outputParameters) {
120120
this.outputParameters = outputParameters;
121121
}
122122

common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public static boolean is(String name) {
6060

6161
//Key: Name of the input parameter. MUST be one of the keys defined in TaskDef (e.g. fileName)
6262
//Value: mapping of the parameter from another task (e.g. task1.someOutputParameterAsFileName)
63-
private Map<String, String> inputParameters = new HashMap<String, String>();
63+
private Map<String, Object> inputParameters = new HashMap<String, Object>();
6464

6565
private String type = Type.SIMPLE.name();
6666

@@ -88,11 +88,6 @@ public static boolean is(String name) {
8888

8989
private List<String> joinOn = new LinkedList<>();
9090

91-
// Used for Titus, if Titus job calls back to update the task the value should be true
92-
// else false. If false Workflow processing will only check if the titus job is finished and
93-
// then move on
94-
private boolean callbackFromWorker = true;
95-
9691
/**
9792
* @return the name
9893
*/
@@ -124,14 +119,14 @@ public void setTaskReferenceName(String taskReferenceName) {
124119
/**
125120
* @return the inputParameters
126121
*/
127-
public Map<String, String> getInputParameters() {
122+
public Map<String, Object> getInputParameters() {
128123
return inputParameters;
129124
}
130125

131126
/**
132127
* @param inputParameters the inputParameters to set
133128
*/
134-
public void setInputParameters(Map<String, String> inputParameters) {
129+
public void setInputParameters(Map<String, Object> inputParameters) {
135130
this.inputParameters = inputParameters;
136131
}
137132

@@ -297,14 +292,6 @@ public void setJoinOn(List<String> joinOn) {
297292
this.joinOn = joinOn;
298293
}
299294

300-
public boolean isCallbackFromWorker() {
301-
return callbackFromWorker;
302-
}
303-
304-
public void setCallbackFromWorker(boolean callbackFromWorker) {
305-
this.callbackFromWorker = callbackFromWorker;
306-
}
307-
308295
private Collection<List<WorkflowTask>> children(){
309296
Collection<List<WorkflowTask>> v1 = new LinkedList<>();
310297
Type tt = Type.USER_DEFINED;

contribs/src/main/java/com/netflix/conductor/contribs/http/HttpTask.java

Lines changed: 65 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,20 @@
1919
package com.netflix.conductor.contribs.http;
2020

2121

22+
import java.io.IOException;
2223
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
2526

2627
import javax.inject.Inject;
2728
import javax.inject.Singleton;
2829
import javax.ws.rs.core.MediaType;
30+
import javax.ws.rs.core.MultivaluedMap;
2931

3032
import org.slf4j.Logger;
3133
import org.slf4j.LoggerFactory;
3234

3335
import com.fasterxml.jackson.annotation.JsonInclude.Include;
34-
import com.fasterxml.jackson.core.JsonParseException;
3536
import com.fasterxml.jackson.core.type.TypeReference;
3637
import com.fasterxml.jackson.databind.DeserializationFeature;
3738
import com.fasterxml.jackson.databind.JsonNode;
@@ -87,7 +88,6 @@ public HttpTask(String name, RestClientManager rcm, Configuration config) {
8788
logger.info("HttpTask initialized...");
8889
}
8990

90-
9191
@Override
9292
public void start(Workflow workflow, Task task, WorkflowExecutor executor) throws Exception {
9393
Object request = task.getInputData().get(requestParameter);
@@ -116,8 +116,13 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) throw
116116

117117
try {
118118

119-
Object response = httpCall(input);
120-
task.setStatus(Status.COMPLETED);
119+
HttpResponse response = httpCall(input);
120+
if(response.statusCode > 199 && response.statusCode < 300) {
121+
task.setStatus(Status.COMPLETED);
122+
} else {
123+
task.setReasonForIncompletion(response.body.toString());
124+
task.setStatus(Status.FAILED);
125+
}
121126
if(response != null) {
122127
task.getOutputData().put("response", response);
123128
}
@@ -127,16 +132,15 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) throw
127132
task.setReasonForIncompletion(e.getMessage());
128133
task.getOutputData().put("response", e.getMessage());
129134
}
130-
131135
}
132136

133137
/**
134138
*
135139
* @param input HTTP Request
136-
* @return Response if any, null otherwise
140+
* @return Response of the http call
137141
* @throws Exception If there was an error making http call
138142
*/
139-
protected Object httpCall(Input input) throws Exception {
143+
protected HttpResponse httpCall(Input input) throws Exception {
140144
Client client = rcm.getClient(input);
141145
Builder builder = client.resource(input.uri).type(MediaType.APPLICATION_JSON);
142146
if(input.body != null) {
@@ -146,35 +150,30 @@ protected Object httpCall(Input input) throws Exception {
146150
builder.header(e.getKey(), e.getValue());
147151
});
148152

149-
Object response = null;
153+
HttpResponse response = new HttpResponse();
150154
try {
151-
152-
String json = builder.accept(input.accept).method(input.method, String.class);
153-
logger.debug(json);
154-
try {
155-
JsonNode node = om.readTree(json);
156-
if(node.isArray()) {
157-
response = om.convertValue(node, listOfObj);
158-
} else if (node.isObject()) {
159-
response = om.convertValue(node, mapOfObj);
160-
} else if (node.isNumber()) {
161-
response = om.convertValue(node, Double.class);
162-
} else {
163-
response = node.asText();
164-
}
165-
166-
}catch(JsonParseException jpe) {
167-
logger.error(jpe.getMessage(), jpe);
168-
response = json;
155+
156+
ClientResponse cr = builder.accept(input.accept).method(input.method, ClientResponse.class);
157+
if (cr.hasEntity()) {
158+
response.body = extractBody(cr);
169159
}
160+
response.statusCode = cr.getStatus();
161+
response.headers = cr.getHeaders();
170162
return response;
171-
163+
172164
} catch(UniformInterfaceException ex) {
173165

174166
ClientResponse cr = ex.getResponse();
175167

176168
if(cr.getStatus() > 199 && cr.getStatus() < 300) {
177-
return cr.getEntity(String.class);
169+
170+
if(cr.hasEntity()) {
171+
response.body = extractBody(cr);
172+
}
173+
response.headers = cr.getHeaders();
174+
response.statusCode = cr.getStatus();
175+
return response;
176+
178177
}else {
179178
String reason = cr.getEntity(String.class);
180179
logger.error(reason, ex);
@@ -183,6 +182,30 @@ protected Object httpCall(Input input) throws Exception {
183182
}
184183
}
185184

185+
private Object extractBody(ClientResponse cr) {
186+
187+
String json = cr.getEntity(String.class);
188+
logger.debug(json);
189+
190+
try {
191+
192+
JsonNode node = om.readTree(json);
193+
if (node.isArray()) {
194+
return om.convertValue(node, listOfObj);
195+
} else if (node.isObject()) {
196+
return om.convertValue(node, mapOfObj);
197+
} else if (node.isNumber()) {
198+
return om.convertValue(node, Double.class);
199+
} else {
200+
return node.asText();
201+
}
202+
203+
} catch (IOException jpe) {
204+
logger.error(jpe.getMessage(), jpe);
205+
return json;
206+
}
207+
}
208+
186209
@Override
187210
public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor) throws Exception {
188211
if (task.getStatus().equals(Status.SCHEDULED)) {
@@ -212,6 +235,20 @@ private static ObjectMapper objectMapper() {
212235
return om;
213236
}
214237

238+
public static class HttpResponse {
239+
240+
public Object body;
241+
242+
public MultivaluedMap<String, String> headers;
243+
244+
public int statusCode;
245+
246+
@Override
247+
public String toString() {
248+
return "HttpResponse [body=" + body + ", headers=" + headers + ", statusCode=" + statusCode + "]";
249+
}
250+
}
251+
215252
public static class Input {
216253

217254
private String method; //PUT, POST, GET, DELETE, OPTIONS, HEAD

contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ private void addPolicy(List<String> accountsToAuthorize) {
222222
}
223223

224224
private String getPolicy(List<String> accountIds) {
225-
Policy policy = new Policy("ReloadedWorkerAccessPolicy");
225+
Policy policy = new Policy("AuthorizedWorkerAccessPolicy");
226226
Statement stmt = new Statement(Effect.Allow);
227227
Action action = SQSActions.SendMessage;
228228
stmt.getActions().add(action);

0 commit comments

Comments
 (0)