Skip to content

JdbcAggregateOperations delete by query #2084

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* @author Myeonghyeon Lee
* @author Chirag Tailor
* @author Mikhail Polivakha
* @author Jaeyeon Kim
* @since 2.0
*/
class AggregateChangeExecutor {
Expand Down Expand Up @@ -101,10 +102,16 @@ private void execute(DbAction<?> action, JdbcAggregateChangeExecutionContext exe
executionContext.executeBatchDeleteRoot(batchDeleteRoot);
} else if (action instanceof DbAction.DeleteAllRoot<?> deleteAllRoot) {
executionContext.executeDeleteAllRoot(deleteAllRoot);
} else if (action instanceof DbAction.DeleteRootByIdIn<?> deleteRootByIdIn) {
executionContext.executeDeleteRootByIdIn(deleteRootByIdIn);
} else if (action instanceof DbAction.DeleteByRootIdIn<?> deleteByRootIdIn) {
executionContext.executeDeleteByRootIdIn(deleteByRootIdIn);
} else if (action instanceof DbAction.AcquireLockRoot<?> acquireLockRoot) {
executionContext.executeAcquireLock(acquireLockRoot);
} else if (action instanceof DbAction.AcquireLockAllRoot<?> acquireLockAllRoot) {
executionContext.executeAcquireLockAllRoot(acquireLockAllRoot);
} else if (action instanceof DbAction.AcquireLockAllRootByQuery<?> acquireLockAllRootByQuery) {
executionContext.executeAcquireLockRootByQuery(acquireLockAllRootByQuery);
} else {
throw new RuntimeException("unexpected action");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.springframework.data.mapping.PersistentPropertyPath;
import org.springframework.data.mapping.PersistentPropertyPathAccessor;
import org.springframework.data.relational.core.conversion.DbAction;
import org.springframework.data.relational.core.conversion.SelectIdsDbActionExecutionResult;
import org.springframework.data.relational.core.conversion.DbActionExecutionResult;
import org.springframework.data.relational.core.conversion.IdValueSource;
import org.springframework.data.relational.core.mapping.AggregatePath;
Expand All @@ -60,6 +61,7 @@
* @author Myeonghyeon Lee
* @author Chirag Tailor
* @author Mark Paluch
* @author Jaeyeon Kim
*/
@SuppressWarnings("rawtypes")
class JdbcAggregateChangeExecutionContext {
Expand All @@ -72,6 +74,7 @@ class JdbcAggregateChangeExecutionContext {
private final DataAccessStrategy accessStrategy;

private final Map<DbAction<?>, DbActionExecutionResult> results = new LinkedHashMap<>();
private final Map<DbAction.SelectIds<?>, SelectIdsDbActionExecutionResult> selectIdsDbActionExecutionResult = new LinkedHashMap<>();

JdbcAggregateChangeExecutionContext(JdbcConverter converter, DataAccessStrategy accessStrategy) {

Expand Down Expand Up @@ -169,6 +172,34 @@ <T> void executeDeleteAll(DbAction.DeleteAll<T> delete) {
accessStrategy.deleteAll(delete.getPropertyPath());
}

<T> void executeDeleteRootByIdIn(DbAction.DeleteRootByIdIn<T> deleteRootByIdIn) {
SelectIdsDbActionExecutionResult result = getRequiredSelectIdsResult(deleteRootByIdIn.getSelectIdsAction());

List<Object> rootIds = new ArrayList<>(result.getSelectedIds());
if (rootIds.isEmpty()) {
return;
}
accessStrategy.delete(rootIds, deleteRootByIdIn.getEntityType());
}

<T> void executeDeleteByRootIdIn(DbAction.DeleteByRootIdIn<T> deleteByRootIdIn) {
SelectIdsDbActionExecutionResult result = getRequiredSelectIdsResult(deleteByRootIdIn.getSelectIdsAction());

List<Object> rootIds = new ArrayList<>(result.getSelectedIds());
if (rootIds.isEmpty()) {
return;
}
accessStrategy.delete(rootIds, deleteByRootIdIn.getPropertyPath());
}

private SelectIdsDbActionExecutionResult getRequiredSelectIdsResult(DbAction.SelectIds selectIdsAction) {
SelectIdsDbActionExecutionResult result = selectIdsDbActionExecutionResult.get(selectIdsAction);
if (result == null) {
throw new IllegalArgumentException("Expected SelectIdsDbActionExecutionResult for given selectIdsAction but found none");
}
return result;
}

<T> void executeAcquireLock(DbAction.AcquireLockRoot<T> acquireLock) {
accessStrategy.acquireLockById(acquireLock.getId(), LockMode.PESSIMISTIC_WRITE, acquireLock.getEntityType());
}
Expand All @@ -177,6 +208,13 @@ <T> void executeAcquireLockAllRoot(DbAction.AcquireLockAllRoot<T> acquireLock) {
accessStrategy.acquireLockAll(LockMode.PESSIMISTIC_WRITE, acquireLock.getEntityType());
}

<T> void executeAcquireLockRootByQuery(DbAction.AcquireLockAllRootByQuery<T> acquireLock) {

List<?> rootIds = accessStrategy.acquireLockAndFindIdsByQuery(acquireLock.getQuery(), LockMode.PESSIMISTIC_WRITE, acquireLock.getEntityType());

selectIdsDbActionExecutionResult.put(acquireLock, new SelectIdsDbActionExecutionResult(rootIds, acquireLock));
}

private void add(DbActionExecutionResult result) {
results.put(result.getAction(), result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* @author Diego Krupitza
* @author Myeonghyeon Lee
* @author Sergey Korotaev
* @author Jaeyeon Kim
*/
public interface JdbcAggregateOperations {

Expand Down Expand Up @@ -324,4 +325,13 @@ public interface JdbcAggregateOperations {
* @param <T> the type of the aggregate roots.
*/
<T> void deleteAll(Iterable<? extends T> aggregateRoots);

/**
* Deletes all aggregates of the given type that match the provided query.
*
* @param query Must not be {@code null}.
* @param domainType the type of the aggregate root. Must not be {@code null}.
* @param <T> the type of the aggregate root.
*/
<T> void deleteAllByQuery(Query query, Class<T> domainType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
* @author Diego Krupitza
* @author Sergey Korotaev
* @author Mikhail Polivakha
* @author Jaeyeon Kim
*/
public class JdbcAggregateTemplate implements JdbcAggregateOperations {

Expand Down Expand Up @@ -461,6 +462,17 @@ public <T> void deleteAll(Iterable<? extends T> instances) {
}
}

@Override
public <T> void deleteAllByQuery(Query query, Class<T> domainType) {

Assert.notNull(query, "Query must not be null");
Assert.notNull(domainType, "Domain type must not be null");

MutableAggregateChange<?> change = createDeletingChange(query, domainType);

executor.executeDelete(change);
}

private <T> void verifyIdProperty(T instance) {
// accessing the id property just to raise an exception in the case it does not exist.
context.getRequiredPersistentEntity(instance.getClass()).getRequiredIdProperty();
Expand Down Expand Up @@ -639,6 +651,13 @@ private MutableAggregateChange<?> createDeletingChange(Class<?> domainType) {
return aggregateChange;
}

private MutableAggregateChange<?> createDeletingChange(Query query, Class<?> domainType) {

MutableAggregateChange<?> aggregateChange = MutableAggregateChange.forDelete(domainType);
jdbcEntityDeleteWriter.writeForQuery(query, aggregateChange);
return aggregateChange;
}

private <T> List<T> triggerAfterConvert(Iterable<T> all) {

List<T> result = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* @author Chirag Tailor
* @author Diego Krupitza
* @author Sergey Korotaev
* @author Jaeyeon Kim
* @since 1.1
*/
public class CascadingDataAccessStrategy implements DataAccessStrategy {
Expand Down Expand Up @@ -119,6 +120,11 @@ public <T> void acquireLockAll(LockMode lockMode, Class<T> domainType) {
collectVoid(das -> das.acquireLockAll(lockMode, domainType));
}

@Override
public <T> List<?> acquireLockAndFindIdsByQuery(Query query, LockMode lockMode, Class<T> domainType) {
return collect(das -> das.acquireLockAndFindIdsByQuery(query, lockMode, domainType));
}

@Override
public long count(Class<?> domainType) {
return collect(das -> das.count(domainType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* @author Chirag Tailor
* @author Diego Krupitza
* @author Sergey Korotaev
* @author Jaeyeon Kim
*/
public interface DataAccessStrategy extends ReadingDataAccessStrategy, RelationResolver {

Expand Down Expand Up @@ -194,6 +195,18 @@ public interface DataAccessStrategy extends ReadingDataAccessStrategy, RelationR
*/
<T> void acquireLockAll(LockMode lockMode, Class<T> domainType);

/**
* Acquire a lock on all aggregates that match the given {@link Query} and return their identifiers.
* The resulting SQL will include a {@code SELECT id FROM … WHERE … (LOCK CLAUSE)} to retrieve and lock the matching rows.
*
* @param query the query specifying which entities to lock. Must not be {@code null}.
* @param lockMode the lock mode to apply to the query (e.g. {@code FOR UPDATE}). Must not be {@code null}.
* @param domainType the domain type of the entities to be locked. Must not be {@code null}.
* @param <T> the type of the domain entity.
* @return a {@link List} of ids corresponding to the rows locked by the query.
*/
<T> List<?> acquireLockAndFindIdsByQuery(Query query, LockMode lockMode, Class<T> domainType);

/**
* Counts the rows in the table representing the given domain type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.springframework.data.relational.core.sql.LockMode;
import org.springframework.data.relational.core.sql.SqlIdentifier;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.SingleColumnRowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
Expand All @@ -63,6 +64,7 @@
* @author Diego Krupitza
* @author Sergey Korotaev
* @author Mikhail Polivakha
* @author Jaeyeon Kim
* @since 1.1
*/
public class DefaultDataAccessStrategy implements DataAccessStrategy {
Expand Down Expand Up @@ -259,6 +261,28 @@ public <T> void acquireLockAll(LockMode lockMode, Class<T> domainType) {
operations.getJdbcOperations().query(acquireLockAllSql, ResultSet::next);
}

@Override
public <T> List<?> acquireLockAndFindIdsByQuery(Query query, LockMode lockMode, Class<T> domainType) {

MapSqlParameterSource parameterSource = new MapSqlParameterSource();
String acquireLockByQuerySql = sql(domainType).getAcquireLockAndFindIdsByQuery(query, parameterSource, lockMode);

RelationalPersistentEntity<?> entity = context.getRequiredPersistentEntity(domainType);
RelationalPersistentProperty idProperty = entity.getRequiredIdProperty();

return operations.query(acquireLockByQuerySql, parameterSource, getIdRowMapper(idProperty));
}

private RowMapper<?> getIdRowMapper(RelationalPersistentProperty idProperty) {
RelationalPersistentEntity<?> complexId = context.getPersistentEntity(idProperty.getType());

if (complexId == null) {
return SingleColumnRowMapper.newInstance(idProperty.getType(), converter.getConversionService());
} else {
return new EntityRowMapper<>(context.getRequiredPersistentEntity(idProperty.getType()), converter);
}
}

@Override
public long count(Class<?> domainType) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* @author Chirag Tailor
* @author Diego Krupitza
* @author Sergey Korotaev
* @author Jaeyeon Kim
* @since 1.1
*/
public class DelegatingDataAccessStrategy implements DataAccessStrategy {
Expand Down Expand Up @@ -119,6 +120,11 @@ public <T> void acquireLockAll(LockMode lockMode, Class<T> domainType) {
delegate.acquireLockAll(lockMode, domainType);
}

@Override
public <T> List<?> acquireLockAndFindIdsByQuery(Query query, LockMode lockMode, Class<T> domainType) {
return delegate.acquireLockAndFindIdsByQuery(query, lockMode, domainType);
}

@Override
public long count(Class<?> domainType) {
return delegate.count(domainType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
* @author Hari Ohm Prasath
* @author Viktor Ardelean
* @author Kurt Niemi
* @author Jaeyeon Kim
*/
public class SqlGenerator {

Expand Down Expand Up @@ -377,6 +378,18 @@ String getAcquireLockAll(LockMode lockMode) {
return this.createAcquireLockAll(lockMode);
}

/**
* Create a {@code SELECT id FROM … WHERE … (LOCK CLAUSE)} statement based on the given query.
*
* @param query the query to base the select on. Must not be null.
* @param parameterSource the source for holding the bindings.
* @param lockMode Lock clause mode.
* @return the SQL statement as a {@link String}. Guaranteed to be not {@literal null}.
*/
String getAcquireLockAndFindIdsByQuery(Query query, MapSqlParameterSource parameterSource, LockMode lockMode) {
return this.createAcquireLockByQuery(query, parameterSource, lockMode);
}

/**
* Create a {@code INSERT INTO … (…) VALUES(…)} statement.
*
Expand Down Expand Up @@ -594,6 +607,23 @@ private String createAcquireLockAll(LockMode lockMode) {
return render(select);
}

private String createAcquireLockByQuery(Query query, MapSqlParameterSource parameterSource, LockMode lockMode) {

Assert.notNull(parameterSource, "parameterSource must not be null");

Table table = this.getTable();

SelectBuilder.SelectWhere selectBuilder = StatementBuilder
.select(getIdColumns())
.from(table);

Select select = applyQueryOnSelect(query, parameterSource, selectBuilder)
.lock(lockMode)
.build();

return render(select);
}

private String createFindAllSql() {
return render(selectBuilder().build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
* @author Christopher Klein
* @author Mikhail Polivakha
* @author Sergey Korotaev
* @author Jaeyeon Kim
*/
public class MyBatisDataAccessStrategy implements DataAccessStrategy {

Expand Down Expand Up @@ -253,6 +254,11 @@ public <T> void acquireLockAll(LockMode lockMode, Class<T> domainType) {
sqlSession().selectOne(statement, parameter);
}

@Override
public <T> List<?> acquireLockAndFindIdsByQuery(Query query, LockMode lockMode, Class<T> domainType) {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public <T> T findById(Object id, Class<T> domainType) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@
import org.springframework.data.jdbc.testing.TestConfiguration;
import org.springframework.data.relational.core.mapping.Embedded;
import org.springframework.data.relational.core.mapping.RelationalMappingContext;
import org.springframework.data.relational.core.query.Criteria;
import org.springframework.data.relational.core.query.Query;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;

/**
* Integration tests for {@link JdbcAggregateTemplate} and it's handling of entities with embedded entities as keys.
*
* @author Jens Schauder
* @author Jaeyeon Kim
*/
@IntegrationTest
@EnabledOnDatabase(DatabaseType.HSQL)
Expand Down Expand Up @@ -132,6 +134,26 @@ void deleteMultipleSimpleEntityWithEmbeddedPk() {
assertThat(reloaded).containsExactly(entities.get(2));
}

@Test // GH-1978
void deleteAllByQueryWithEmbeddedPk() {

List<SimpleEntityWithEmbeddedPk> entities = (List<SimpleEntityWithEmbeddedPk>) template.insertAll(List.of(
new SimpleEntityWithEmbeddedPk(new EmbeddedPk(1L, "a"), "alpha"),
new SimpleEntityWithEmbeddedPk(new EmbeddedPk(2L, "b"), "beta"),
new SimpleEntityWithEmbeddedPk(new EmbeddedPk(3L, "b"), "gamma")
));

Query query = Query.query(Criteria.where("name").is("beta"));
template.deleteAllByQuery(query, SimpleEntityWithEmbeddedPk.class);

assertThat(
template.findAll(SimpleEntityWithEmbeddedPk.class))
.containsExactlyInAnyOrder(
entities.get(0), // alpha
entities.get(2) // gamma
);
}

@Test // GH-574
void existsSingleSimpleEntityWithEmbeddedPk() {

Expand Down
Loading