Skip to content

Commit 729e25e

Browse files
Workflow run token (#2421)
Add WorkflowOperationToken
1 parent 963ea9f commit 729e25e

File tree

10 files changed

+698
-330
lines changed

10 files changed

+698
-330
lines changed

temporal-opentracing/src/test/java/io/temporal/opentracing/NexusOperationTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public interface TestOtherWorkflow {
111111
public static class OtherWorkflowImpl implements TestOtherWorkflow {
112112
@Override
113113
public String workflow(String input) {
114+
Workflow.sleep(Duration.ofSeconds(1));
114115
return "Hello, " + input + "!";
115116
}
116117
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.nexus;
22+
23+
import com.fasterxml.jackson.annotation.JsonCreator;
24+
import com.fasterxml.jackson.annotation.JsonValue;
25+
26+
public enum OperationTokenType {
27+
UNKNOWN(0),
28+
WORKFLOW_RUN(1);
29+
30+
private final int value;
31+
32+
OperationTokenType(int i) {
33+
this.value = i;
34+
}
35+
36+
@JsonValue
37+
public int toValue() {
38+
return value;
39+
}
40+
41+
@JsonCreator
42+
public static OperationTokenType fromValue(Integer value) {
43+
if (value == null) {
44+
return UNKNOWN;
45+
}
46+
for (OperationTokenType b : OperationTokenType.values()) {
47+
if (b.value == value) {
48+
return b;
49+
}
50+
}
51+
return UNKNOWN;
52+
}
53+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.nexus;
22+
23+
import com.fasterxml.jackson.core.JsonProcessingException;
24+
import com.fasterxml.jackson.databind.JavaType;
25+
import com.fasterxml.jackson.databind.ObjectMapper;
26+
import com.fasterxml.jackson.databind.ObjectWriter;
27+
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
28+
import com.google.common.base.Strings;
29+
import java.util.Base64;
30+
31+
public class OperationTokenUtil {
32+
private static final ObjectMapper mapper = new ObjectMapper().registerModule(new Jdk8Module());
33+
private static final ObjectWriter ow = mapper.writer();
34+
private static final Base64.Decoder decoder = Base64.getUrlDecoder();
35+
private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
36+
37+
/**
38+
* Load a workflow run operation token from an operation token.
39+
*
40+
* @throws FallbackToWorkflowIdException if the operation token is not a workflow run token
41+
* @throws IllegalArgumentException if the operation token is invalid
42+
*/
43+
public static WorkflowRunOperationToken loadWorkflowRunOperationToken(String operationToken)
44+
throws FallbackToWorkflowIdException {
45+
WorkflowRunOperationToken token;
46+
try {
47+
JavaType reference = mapper.getTypeFactory().constructType(WorkflowRunOperationToken.class);
48+
token = mapper.readValue(decoder.decode(operationToken), reference);
49+
} catch (Exception e) {
50+
throw new FallbackToWorkflowIdException("Failed to parse operation token: " + e.getMessage());
51+
}
52+
if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) {
53+
throw new IllegalArgumentException(
54+
"Invalid workflow run token: incorrect operation token type: " + token.getType());
55+
}
56+
if (token.getVersion() != null) {
57+
throw new IllegalArgumentException("Invalid workflow run token: unexpected version field");
58+
}
59+
if (Strings.isNullOrEmpty(token.getWorkflowId())) {
60+
throw new IllegalArgumentException("Invalid workflow run token: missing workflow ID (wid)");
61+
}
62+
return token;
63+
}
64+
65+
/**
66+
* Attempt to extract the workflow Id from an operation token.
67+
*
68+
* @throws IllegalArgumentException if the operation token is invalid
69+
*/
70+
public static String loadWorkflowIdFromOperationToken(String operationToken) {
71+
try {
72+
WorkflowRunOperationToken token = loadWorkflowRunOperationToken(operationToken);
73+
return token.getWorkflowId();
74+
} catch (OperationTokenUtil.FallbackToWorkflowIdException e) {
75+
// Previous versions of the SDK simply used the workflow ID as the operation token
76+
// This fallback is provided for backwards compatibility for those cases.
77+
// This fallback will be removed in a future release.
78+
// See: https://github.com/temporalio/sdk-java/issues/2423
79+
return operationToken;
80+
}
81+
}
82+
83+
/** Generate a workflow run operation token from a workflow ID and namespace. */
84+
public static String generateWorkflowRunOperationToken(String workflowId, String namespace)
85+
throws JsonProcessingException {
86+
String json = ow.writeValueAsString(new WorkflowRunOperationToken(namespace, workflowId));
87+
return encoder.encodeToString(json.getBytes());
88+
}
89+
90+
public static class FallbackToWorkflowIdException extends RuntimeException {
91+
public FallbackToWorkflowIdException(String message) {
92+
super(message);
93+
}
94+
}
95+
96+
private OperationTokenUtil() {}
97+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.nexus;
22+
23+
import com.fasterxml.jackson.annotation.JsonInclude;
24+
import com.fasterxml.jackson.annotation.JsonProperty;
25+
26+
public class WorkflowRunOperationToken {
27+
@JsonProperty("v")
28+
@JsonInclude(JsonInclude.Include.NON_NULL)
29+
private final Integer version;
30+
31+
@JsonProperty("t")
32+
private final OperationTokenType type;
33+
34+
@JsonProperty("ns")
35+
private final String namespace;
36+
37+
@JsonProperty("wid")
38+
private final String workflowId;
39+
40+
public WorkflowRunOperationToken(
41+
@JsonProperty("t") Integer type,
42+
@JsonProperty("ns") String namespace,
43+
@JsonProperty("wid") String workflowId,
44+
@JsonProperty("v") Integer version) {
45+
this.type = OperationTokenType.fromValue(type);
46+
this.namespace = namespace;
47+
this.workflowId = workflowId;
48+
this.version = version;
49+
}
50+
51+
public WorkflowRunOperationToken(String namespace, String workflowId) {
52+
this.type = OperationTokenType.WORKFLOW_RUN;
53+
this.namespace = namespace;
54+
this.workflowId = workflowId;
55+
this.version = null;
56+
}
57+
58+
public Integer getVersion() {
59+
return version;
60+
}
61+
62+
public OperationTokenType getType() {
63+
return type;
64+
}
65+
66+
public String getNamespace() {
67+
return namespace;
68+
}
69+
70+
public String getWorkflowId() {
71+
return workflowId;
72+
}
73+
}

temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
2424
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;
2525

26+
import com.fasterxml.jackson.core.JsonProcessingException;
2627
import io.nexusrpc.OperationInfo;
2728
import io.nexusrpc.handler.*;
2829
import io.nexusrpc.handler.OperationHandler;
@@ -33,6 +34,7 @@
3334
import io.temporal.internal.client.NexusStartWorkflowRequest;
3435
import io.temporal.internal.nexus.CurrentNexusOperationContext;
3536
import io.temporal.internal.nexus.InternalNexusOperationContext;
37+
import io.temporal.internal.nexus.OperationTokenUtil;
3638
import java.net.URISyntaxException;
3739

3840
class WorkflowRunOperationImpl<T, R> implements OperationHandler<T, R> {
@@ -70,19 +72,31 @@ public OperationStartResult<R> start(
7072
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
7173
.build();
7274
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
75+
// Generate the operation token for the new workflow.
76+
String operationToken;
7377
try {
74-
OperationStartResult.Builder<R> result =
75-
OperationStartResult.newAsyncBuilder(workflowExec.getWorkflowId());
76-
if (nexusLink != null) {
77-
ctx.addLinks(nexusProtoLinkToLink(nexusLink));
78-
}
79-
return result.build();
80-
} catch (URISyntaxException e) {
78+
operationToken =
79+
OperationTokenUtil.generateWorkflowRunOperationToken(
80+
workflowExec.getWorkflowId(), nexusCtx.getNamespace());
81+
} catch (JsonProcessingException e) {
8182
// Not expected as the link is constructed by the SDK.
8283
throw new HandlerException(
83-
HandlerException.ErrorType.INTERNAL,
84-
new IllegalArgumentException("failed to parse URI", e));
84+
HandlerException.ErrorType.BAD_REQUEST,
85+
new IllegalArgumentException("failed to generate workflow operation token", e));
86+
}
87+
// Attach the link to the operation result.
88+
OperationStartResult.Builder<R> result = OperationStartResult.newAsyncBuilder(operationToken);
89+
if (nexusLink != null) {
90+
try {
91+
ctx.addLinks(nexusProtoLinkToLink(nexusLink));
92+
} catch (URISyntaxException e) {
93+
// Not expected as the link is constructed by the SDK.
94+
throw new HandlerException(
95+
HandlerException.ErrorType.INTERNAL,
96+
new IllegalArgumentException("failed to parse URI", e));
97+
}
8598
}
99+
return result.build();
86100
}
87101

88102
@Override
@@ -100,7 +114,18 @@ public OperationInfo fetchInfo(
100114
@Override
101115
public void cancel(
102116
OperationContext operationContext, OperationCancelDetails operationCancelDetails) {
117+
String workflowId;
118+
try {
119+
workflowId =
120+
OperationTokenUtil.loadWorkflowIdFromOperationToken(
121+
operationCancelDetails.getOperationToken());
122+
} catch (IllegalArgumentException e) {
123+
throw new HandlerException(
124+
HandlerException.ErrorType.BAD_REQUEST,
125+
new IllegalArgumentException("failed to parse operation token", e));
126+
}
127+
103128
WorkflowClient client = CurrentNexusOperationContext.get().getWorkflowClient();
104-
client.newUntypedWorkflowStub(operationCancelDetails.getOperationToken()).cancel();
129+
client.newUntypedWorkflowStub(workflowId).cancel();
105130
}
106131
}

0 commit comments

Comments
 (0)