Skip to content

Commit 412c8a3

Browse files
committed
Directly return responses from Local Cluster client
Signed-off-by: Daniel Widdis <[email protected]>
1 parent 9656e34 commit 412c8a3

19 files changed

+1218
-293
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
77
### Features
88
### Enhancements
99
### Bug Fixes
10+
- Directly return responses from Local Cluster client ([#141](https://github.com/opensearch-project/opensearch-remote-metadata-sdk/pull/141))
11+
1012
### Infrastructure
1113
### Documentation
1214
### Maintenance

core/src/main/java/org/opensearch/remote/metadata/client/AbstractSdkClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void initialize(Map<String, String> metadataSettings) {
4444
}
4545

4646
/**
47-
* Execute this priveleged action asynchronously
47+
* Execute this privileged action asynchronously
4848
* @param <T> The return type of the completable future to be returned
4949
* @param action the action to execute
5050
* @param executor the executor for the action

core/src/main/java/org/opensearch/remote/metadata/client/BulkDataObjectResponse.java

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,15 @@
88
*/
99
package org.opensearch.remote.metadata.client;
1010

11+
import org.opensearch.OpenSearchException;
12+
import org.opensearch.action.bulk.BulkResponse;
13+
import org.opensearch.common.Nullable;
14+
import org.opensearch.core.rest.RestStatus;
1115
import org.opensearch.core.xcontent.XContentParser;
16+
import org.opensearch.remote.metadata.common.SdkClientUtils;
17+
18+
import java.io.IOException;
19+
import java.util.Arrays;
1220

1321
import static org.opensearch.action.bulk.BulkResponse.NO_INGEST_TOOK;
1422

@@ -21,7 +29,38 @@ public class BulkDataObjectResponse {
2129
private final long tookInMillis;
2230
private final long ingestTookInMillis;
2331
private final boolean failures;
32+
// Only one of these will be non-null
2433
private final XContentParser parser;
34+
private final BulkResponse bulkResponse;
35+
36+
/**
37+
* Instantiate this response with a {@link BulkResponse}.
38+
* @param bulkResponse a pre-completed Bulk response
39+
*/
40+
public BulkDataObjectResponse(BulkResponse bulkResponse) {
41+
this(
42+
generateDataObjectResponseArray(bulkResponse),
43+
bulkResponse.getTook().millis(),
44+
NO_INGEST_TOOK,
45+
bulkResponse.hasFailures(),
46+
null,
47+
bulkResponse
48+
);
49+
}
50+
51+
/**
52+
* Generate an array of DataObjectResponses from a BulkResponse
53+
* @param bulkResponse The BulkResponse
54+
* @return An array of DataObjectResponse corresponding to the items. Array elements may be null on failed responses.
55+
*/
56+
private static DataObjectResponse[] generateDataObjectResponseArray(BulkResponse bulkResponse) {
57+
return Arrays.stream(bulkResponse.getItems()).map(itemResponse -> switch (itemResponse.getOpType()) {
58+
case INDEX, CREATE -> new PutDataObjectResponse(itemResponse);
59+
case UPDATE -> new UpdateDataObjectResponse(itemResponse);
60+
case DELETE -> new DeleteDataObjectResponse(itemResponse);
61+
default -> throw new OpenSearchException("Invalid operation type for bulk response", RestStatus.INTERNAL_SERVER_ERROR);
62+
}).toArray(DataObjectResponse[]::new);
63+
}
2564

2665
/**
2766
* Instantiate this response
@@ -31,7 +70,7 @@ public class BulkDataObjectResponse {
3170
* @param parser a parser that can be used to recreate the object
3271
*/
3372
public BulkDataObjectResponse(DataObjectResponse[] responses, long tookInMillis, boolean failures, XContentParser parser) {
34-
this(responses, tookInMillis, NO_INGEST_TOOK, failures, parser);
73+
this(responses, tookInMillis, NO_INGEST_TOOK, failures, parser, null);
3574
}
3675

3776
/**
@@ -48,12 +87,24 @@ public BulkDataObjectResponse(
4887
long ingestTookInMillis,
4988
boolean failures,
5089
XContentParser parser
90+
) {
91+
this(responses, tookInMillis, ingestTookInMillis, failures, parser, null);
92+
}
93+
94+
private BulkDataObjectResponse(
95+
DataObjectResponse[] responses,
96+
long tookInMillis,
97+
long ingestTookInMillis,
98+
boolean failures,
99+
XContentParser parser,
100+
BulkResponse bulkResponse
51101
) {
52102
this.responses = responses;
53103
this.tookInMillis = tookInMillis;
54104
this.ingestTookInMillis = ingestTookInMillis;
55105
this.failures = failures;
56106
this.parser = parser;
107+
this.bulkResponse = bulkResponse;
57108
}
58109

59110
/**
@@ -88,11 +139,33 @@ public boolean hasFailures() {
88139
return this.failures;
89140
}
90141

142+
/**
143+
* Returns the BulkhResponse object
144+
* @return the bulk response if present, or parsed otherwise
145+
*/
146+
public @Nullable BulkResponse bulkResponse() {
147+
if (this.bulkResponse == null) {
148+
try {
149+
return BulkResponse.fromXContent(parser);
150+
} catch (IOException | NullPointerException e) {
151+
return null;
152+
}
153+
}
154+
return this.bulkResponse;
155+
}
156+
91157
/**
92158
* Returns the parser
93159
* @return the parser
94160
*/
95161
public XContentParser parser() {
162+
if (this.parser == null) {
163+
try {
164+
return SdkClientUtils.createParser(bulkResponse);
165+
} catch (IOException | NullPointerException e) {
166+
return null;
167+
}
168+
}
96169
return this.parser;
97170
}
98171
}

core/src/main/java/org/opensearch/remote/metadata/client/DeleteDataObjectResponse.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,58 @@
88
*/
99
package org.opensearch.remote.metadata.client;
1010

11+
import org.opensearch.OpenSearchException;
12+
import org.opensearch.action.DocWriteResponse;
13+
import org.opensearch.action.bulk.BulkItemResponse;
14+
import org.opensearch.action.delete.DeleteResponse;
15+
import org.opensearch.common.Nullable;
1116
import org.opensearch.core.rest.RestStatus;
1217
import org.opensearch.core.xcontent.XContentParser;
18+
import org.opensearch.remote.metadata.common.SdkClientUtils;
19+
20+
import java.io.IOException;
1321

1422
/**
1523
* A class abstracting an OpenSearch DeleteResponse
1624
*/
1725
public class DeleteDataObjectResponse extends DataObjectResponse {
1826

27+
// If populated directly, will populate superclass fields
28+
private final DeleteResponse deleteResponse;
29+
30+
/**
31+
* Instantiate this response with a {@link DeleteResponse}.
32+
* @param deleteResponse a pre-completed Get response
33+
*/
34+
public DeleteDataObjectResponse(DeleteResponse deleteResponse) {
35+
super(deleteResponse.getIndex(), deleteResponse.getId(), null, false, null, null);
36+
this.deleteResponse = deleteResponse;
37+
}
38+
39+
/**
40+
* Instantiate this response with a {@link BulkItemResponse} for a delete operation.
41+
* @param itemResponse a bulk item response for a delete operation
42+
*/
43+
public DeleteDataObjectResponse(BulkItemResponse itemResponse) {
44+
super(
45+
itemResponse.getIndex(),
46+
itemResponse.getId(),
47+
null,
48+
itemResponse.isFailed(),
49+
itemResponse.getFailure() != null ? itemResponse.getFailure().getCause() : null,
50+
itemResponse.getFailure() != null ? itemResponse.getFailure().getStatus() : null
51+
);
52+
DocWriteResponse response = itemResponse.getResponse();
53+
this.deleteResponse = switch (response) {
54+
case null -> null;
55+
case DeleteResponse dr -> dr;
56+
default -> throw new OpenSearchException(
57+
"Expected DeleteResponse but got " + response.getClass().getSimpleName(),
58+
RestStatus.INTERNAL_SERVER_ERROR
59+
);
60+
};
61+
}
62+
1963
/**
2064
* Instantiate this request with an id and parser representing a DeleteResponse
2165
* <p>
@@ -29,6 +73,34 @@ public class DeleteDataObjectResponse extends DataObjectResponse {
2973
*/
3074
public DeleteDataObjectResponse(String index, String id, XContentParser parser, boolean failed, Exception cause, RestStatus status) {
3175
super(index, id, parser, failed, cause, status);
76+
this.deleteResponse = null;
77+
}
78+
79+
/**
80+
* Returns the DeleteResponse object
81+
* @return the delete response if present, or parsed otherwise
82+
*/
83+
public @Nullable DeleteResponse deleteResponse() {
84+
if (this.deleteResponse == null) {
85+
try {
86+
return DeleteResponse.fromXContent(parser());
87+
} catch (IOException | NullPointerException e) {
88+
return null;
89+
}
90+
}
91+
return this.deleteResponse;
92+
}
93+
94+
@Override
95+
public XContentParser parser() {
96+
if (super.parser() == null) {
97+
try {
98+
return SdkClientUtils.createParser(this.deleteResponse);
99+
} catch (IOException | NullPointerException e) {
100+
return null;
101+
}
102+
}
103+
return super.parser();
32104
}
33105

34106
/**

core/src/main/java/org/opensearch/remote/metadata/client/GetDataObjectResponse.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,34 @@
88
*/
99
package org.opensearch.remote.metadata.client;
1010

11+
import org.opensearch.action.get.GetResponse;
12+
import org.opensearch.common.Nullable;
1113
import org.opensearch.core.rest.RestStatus;
1214
import org.opensearch.core.xcontent.XContentParser;
15+
import org.opensearch.remote.metadata.common.SdkClientUtils;
1316

17+
import java.io.IOException;
1418
import java.util.Collections;
1519
import java.util.Map;
20+
import java.util.Optional;
1621

1722
/**
1823
* A class abstracting an OpenSearch GetResponse
1924
*/
2025
public class GetDataObjectResponse extends DataObjectResponse {
2126
private final Map<String, Object> source;
27+
// If populated directly, will populate superclass fields
28+
private final GetResponse getResponse;
29+
30+
/**
31+
* Instantiate this response with a {@link GetResponse}.
32+
* @param getResponse a pre-completed Get response
33+
*/
34+
public GetDataObjectResponse(GetResponse getResponse) {
35+
super(getResponse.getIndex(), getResponse.getId(), null, false, null, null);
36+
this.getResponse = getResponse;
37+
this.source = Optional.ofNullable(getResponse.getSourceAsMap()).orElse(Collections.emptyMap());
38+
}
2239

2340
/**
2441
* Instantiate this request with an id and parser/map used to recreate the data object.
@@ -42,6 +59,7 @@ public GetDataObjectResponse(
4259
Map<String, Object> source
4360
) {
4461
super(index, id, parser, failed, cause, status);
62+
this.getResponse = null;
4563
this.source = source;
4664
}
4765

@@ -53,6 +71,33 @@ public Map<String, Object> source() {
5371
return this.source;
5472
}
5573

74+
/**
75+
* Returns the GetResponse object
76+
* @return the get response if present, or parsed otherwise
77+
*/
78+
public @Nullable GetResponse getResponse() {
79+
if (this.getResponse == null) {
80+
try {
81+
return GetResponse.fromXContent(parser());
82+
} catch (IOException | NullPointerException e) {
83+
return null;
84+
}
85+
}
86+
return this.getResponse;
87+
}
88+
89+
@Override
90+
public XContentParser parser() {
91+
if (super.parser() == null) {
92+
try {
93+
return SdkClientUtils.createParser(this.getResponse);
94+
} catch (IOException | NullPointerException e) {
95+
return null;
96+
}
97+
}
98+
return super.parser();
99+
}
100+
56101
/**
57102
* Instantiate a builder for this object
58103
* @return a builder instance

0 commit comments

Comments
 (0)