Skip to content

Commit

Permalink
Adding isSimulated methods to be used in simulate mapping validation …
Browse files Browse the repository at this point in the history
…work (#108791)
  • Loading branch information
masseyke committed May 20, 2024
1 parent d305f64 commit 93fdfe5
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_REMOVE_ES_SOURCE_OPTIONS = def(8_661_00_0);
public static final TransportVersion NODE_STATS_INGEST_BYTES = def(8_662_00_0);
public static final TransportVersion SEMANTIC_QUERY = def(8_663_00_0);
public static final TransportVersion SIMULATE_VALIDATES_MAPPINGS = def(8_664_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ private void executeBulkRequestsByShard(
BulkShardRequest bulkShardRequest = new BulkShardRequest(
shardId,
bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[0])
requests.toArray(new BulkItemRequest[0]),
bulkRequest.isSimulated()
);
var indexMetadata = clusterState.getMetadata().index(shardId.getIndexName());
if (indexMetadata != null && indexMetadata.getInferenceFields().isEmpty() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,4 +463,12 @@ public long ramBytesUsed() {
public Set<String> getIndices() {
return Collections.unmodifiableSet(indices);
}

/**
* Returns true if this is a request for a simulation rather than a real bulk request.
* @return true if this is a simulated bulk request
*/
public boolean isSimulated() {
return false; // Always false, but may be overridden by a subclass
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
Expand All @@ -34,18 +35,29 @@ public final class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequ
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkShardRequest.class);

private final BulkItemRequest[] items;
private final boolean isSimulated;

private transient Map<String, InferenceFieldMetadata> inferenceFieldMap = null;

public BulkShardRequest(StreamInput in) throws IOException {
super(in);
items = in.readArray(i -> i.readOptionalWriteable(inpt -> new BulkItemRequest(shardId, inpt)), BulkItemRequest[]::new);
if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_VALIDATES_MAPPINGS)) {
isSimulated = in.readBoolean();
} else {
isSimulated = false;
}
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
this(shardId, refreshPolicy, items, false);
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated) {
super(shardId);
this.items = items;
setRefreshPolicy(refreshPolicy);
this.isSimulated = isSimulated;
}

/**
Expand Down Expand Up @@ -126,6 +138,9 @@ public void writeTo(StreamOutput out) throws IOException {
o.writeBoolean(false);
}
}, items);
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_VALIDATES_MAPPINGS)) {
out.writeBoolean(isSimulated);
}
}

@Override
Expand All @@ -149,6 +164,9 @@ public String toString() {
case NONE:
break;
}
if (isSimulated) {
b.append(", simulated");
}
return b.toString();
}

Expand Down Expand Up @@ -186,4 +204,8 @@ public long ramBytesUsed() {
}
return sum;
}

public boolean isSimulated() {
return isSimulated;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,9 @@ public void writeTo(StreamOutput out) throws IOException {
public Map<String, Map<String, Object>> getPipelineSubstitutions() {
return pipelineSubstitutions;
}

@Override
public boolean isSimulated() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ protected void createMissingIndicesAndIndexData(
request.version(),
((IndexRequest) request).source(),
((IndexRequest) request).getContentType(),
((IndexRequest) request).getExecutedPipelines()
((IndexRequest) request).getExecutedPipelines(),
null
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -29,13 +32,19 @@
public class SimulateIndexResponse extends IndexResponse {
private final BytesReference source;
private final XContentType sourceXContentType;
private final Exception exception;

@SuppressWarnings("this-escape")
public SimulateIndexResponse(StreamInput in) throws IOException {
super(in);
this.source = in.readBytesReference();
this.sourceXContentType = XContentType.valueOf(in.readString());
setShardInfo(ShardInfo.EMPTY);
if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_VALIDATES_MAPPINGS)) {
this.exception = in.readException();
} else {
this.exception = null;
}
}

@SuppressWarnings("this-escape")
Expand All @@ -45,13 +54,15 @@ public SimulateIndexResponse(
long version,
BytesReference source,
XContentType sourceXContentType,
List<String> pipelines
List<String> pipelines,
@Nullable Exception exception
) {
// We don't actually care about most of the IndexResponse fields:
super(new ShardId(index, "", 0), id == null ? "<n/a>" : id, 0, 0, version, true, pipelines);
this.source = source;
this.sourceXContentType = sourceXContentType;
setShardInfo(ShardInfo.EMPTY);
this.exception = exception;
}

@Override
Expand All @@ -62,6 +73,11 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
builder.field("_source", XContentHelper.convertToMap(source, false, sourceXContentType).v2());
assert executedPipelines != null : "executedPipelines is null when it shouldn't be - we always list pipelines in simulate mode";
builder.array("executed_pipelines", executedPipelines.toArray());
if (exception != null) {
builder.startObject("error");
ElasticsearchException.generateThrowableXContent(builder, params, exception);
builder.endObject();
}
return builder;
}

Expand All @@ -75,6 +91,9 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(source);
out.writeString(sourceXContentType.name());
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_VALIDATES_MAPPINGS)) {
out.writeException(exception);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;

import static org.apache.lucene.tests.util.TestUtil.randomSimpleString;
import static org.hamcrest.Matchers.equalTo;

public class BulkShardRequestTests extends ESTestCase {
public void testToString() {
Expand All @@ -30,5 +37,42 @@ public void testToString() {
r = new BulkShardRequest(shardId, RefreshPolicy.WAIT_UNTIL, new BulkItemRequest[count]);
assertEquals("BulkShardRequest [" + shardId + "] containing [" + count + "] requests blocking until refresh", r.toString());
assertEquals("requests[" + count + "], index[" + index + "][0], refresh[WAIT_UNTIL]", r.getDescription());

r = new BulkShardRequest(shardId, RefreshPolicy.WAIT_UNTIL, new BulkItemRequest[count], true);
assertEquals(
"BulkShardRequest [" + shardId + "] containing [" + count + "] requests blocking until refresh, simulated",
r.toString()
);
assertEquals("requests[" + count + "], index[" + index + "][0], refresh[WAIT_UNTIL]", r.getDescription());

r = new BulkShardRequest(shardId, RefreshPolicy.WAIT_UNTIL, new BulkItemRequest[count], false);
assertEquals("BulkShardRequest [" + shardId + "] containing [" + count + "] requests blocking until refresh", r.toString());
assertEquals("requests[" + count + "], index[" + index + "][0], refresh[WAIT_UNTIL]", r.getDescription());
}

public void testSerialization() throws IOException {
// Note: BulkShardRequest does not implement equals or hashCode, so we can't test serialization in the usual way for a Writable
BulkShardRequest bulkShardRequest = randomBulkShardRequest();
BulkShardRequest copy = copyWriteable(bulkShardRequest, null, BulkShardRequest::new);
assertThat(bulkShardRequest.items().length, equalTo(copy.items().length));
assertThat(bulkShardRequest.isSimulated(), equalTo(copy.isSimulated()));
assertThat(bulkShardRequest.getRefreshPolicy(), equalTo(copy.getRefreshPolicy()));
}

protected BulkShardRequest randomBulkShardRequest() {
String indexName = randomAlphaOfLength(100);
ShardId shardId = new ShardId(indexName, randomAlphaOfLength(50), randomInt());
RefreshPolicy refreshPolicy = randomFrom(RefreshPolicy.values());
BulkItemRequest[] items = new BulkItemRequest[randomIntBetween(0, 100)];
for (int i = 0; i < items.length; i++) {
final DocWriteRequest<?> request = switch (randomFrom(DocWriteRequest.OpType.values())) {
case INDEX -> new IndexRequest(indexName).id("id_" + i);
case CREATE -> new IndexRequest(indexName).id("id_" + i).create(true);
case UPDATE -> new UpdateRequest(indexName, "id_" + i);
case DELETE -> new DeleteRequest(indexName, "id_" + i);
};
items[i] = new BulkItemRequest(i, request);
}
return new BulkShardRequest(shardId, refreshPolicy, items, randomBoolean());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -38,8 +39,17 @@ public void testToXContent() throws IOException {
String source = """
{"doc": {"key1": "val1", "key2": "val2"}}""";
BytesReference sourceBytes = BytesReference.fromByteBuffer(ByteBuffer.wrap(source.getBytes(StandardCharsets.UTF_8)));
SimulateIndexResponse indexResponse = new SimulateIndexResponse(id, index, version, sourceBytes, XContentType.JSON, pipelines);
String output = Strings.toString(indexResponse);

SimulateIndexResponse indexResponse = new SimulateIndexResponse(
id,
index,
version,
sourceBytes,
XContentType.JSON,
pipelines,
null
);

assertEquals(
XContentHelper.stripWhitespace(
Strings.format(
Expand All @@ -58,7 +68,39 @@ public void testToXContent() throws IOException {
pipelines.stream().map(pipeline -> "\"" + pipeline + "\"").collect(Collectors.joining(","))
)
),
output
Strings.toString(indexResponse)
);

SimulateIndexResponse indexResponseWithException = new SimulateIndexResponse(
id,
index,
version,
sourceBytes,
XContentType.JSON,
pipelines,
new ElasticsearchException("Some failure")
);

assertEquals(
XContentHelper.stripWhitespace(
Strings.format(
"""
{
"_id": "%s",
"_index": "%s",
"_version": %d,
"_source": %s,
"executed_pipelines": [%s],
"error":{"type":"exception","reason":"Some failure"}
}""",
id,
index,
version,
source,
pipelines.stream().map(pipeline -> "\"" + pipeline + "\"").collect(Collectors.joining(","))
)
),
Strings.toString(indexResponseWithException)
);
}

Expand All @@ -85,6 +127,14 @@ private static SimulateIndexResponse randomIndexResponse() {
}
XContentType xContentType = randomFrom(XContentType.values());
BytesReference sourceBytes = RandomObjects.randomSource(random(), xContentType);
return new SimulateIndexResponse(id, index, version, sourceBytes, xContentType, pipelines);
return new SimulateIndexResponse(
id,
index,
version,
sourceBytes,
xContentType,
pipelines,
randomBoolean() ? null : new ElasticsearchException("failed")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ private BulkItemResponse getSuccessBulkItemResponse(String id, String source) {
3,
BytesReference.fromByteBuffers(sourceByteBuffer),
XContentType.JSON,
List.of("pipeline1", "pipeline2")
List.of("pipeline1", "pipeline2"),
null
)
);
}
Expand Down

0 comments on commit 93fdfe5

Please sign in to comment.