Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocking code in reactive package #124

Open
bekirtaskin opened this issue Sep 25, 2022 · 0 comments
Open

Blocking code in reactive package #124

bekirtaskin opened this issue Sep 25, 2022 · 0 comments

Comments

@bekirtaskin
Copy link

bekirtaskin commented Sep 25, 2022

In EventuateCommonReactiveJdbcOperations class, There is a block call in columnToJson method.
`public String columnToJson(EventuateSchema eventuateSchema, String column) {

BiFunction<String, List<Object>, List<Map<String, Object>>> selectCallback =
        (sql, params) -> reactiveJdbcStatementExecutor
                .query(sql, params.toArray())
                .collectList()
                .block(Duration.ofMillis(blockingTimeoutForRetrievingMetadata));

return eventuateSqlDialect.castToJson("?",
        eventuateSchema, "message", column, selectCallback);

}`

Blockhoud would not let it run.

I had to change the code a bit to get rid of blocking code part.

`import static io.eventuate.common.jdbc.EventuateJdbcOperationsUtils.MESSAGE_AUTO_GENERATED_ID_COLUMN;

import java.util.HashMap;
import java.util.Map;

import io.eventuate.common.id.IdGenerator;
import io.eventuate.common.jdbc.EventuateJdbcOperationsUtils;
import io.eventuate.common.jdbc.EventuateSchema;
import io.eventuate.common.jdbc.sqldialect.EventuateSqlDialect;
import io.eventuate.common.json.mapper.JSonMapper;
import io.eventuate.common.reactive.jdbc.EventuateCommonReactiveJdbcOperations;
import io.eventuate.common.reactive.jdbc.EventuateReactiveJdbcStatementExecutor;
import reactor.core.publisher.Mono;

public class EventuateCommonReactiveOperations extends EventuateCommonReactiveJdbcOperations {

private static final String COLUMN_TYPE_SQL = "select data_type from information_schema.columns where table_schema = ? and table_name = ? and column_name = ?";
private static final String MESSAGE_TABLE_NAME = "message";

private EventuateJdbcOperationsUtils eventuateJdbcOperationsUtils;
private EventuateReactiveJdbcStatementExecutor reactiveJdbcStatementExecutor;
private EventuateSqlDialect eventuateSqlDialect;

public EventuateCommonReactiveOperations(
    EventuateJdbcOperationsUtils eventuateJdbcOperationsUtils,
    EventuateReactiveJdbcStatementExecutor reactiveJdbcStatementExecutor,
    EventuateSqlDialect eventuateSqlDialect,
    int blockingTimeoutForRetrievingMetadata) {
super(eventuateJdbcOperationsUtils, reactiveJdbcStatementExecutor, eventuateSqlDialect, blockingTimeoutForRetrievingMetadata);
this.eventuateJdbcOperationsUtils = eventuateJdbcOperationsUtils;
this.reactiveJdbcStatementExecutor = reactiveJdbcStatementExecutor;
this.eventuateSqlDialect = eventuateSqlDialect;
}

@Override
public Mono<String> insertIntoMessageTable(IdGenerator idGenerator,
    String payload,
    String destination,
    Map<String, String> headers,
    EventuateSchema eventuateSchema) {

return insertIntoMessageTableImpl(idGenerator, payload, destination, headers, eventuateSchema, false);
}

@Override
public Mono<String> insertPublishedMessageIntoMessageTable(IdGenerator idGenerator,
    String payload,
    String destination,
    Map<String, String> headers,
    EventuateSchema eventuateSchema) {

return insertIntoMessageTableImpl(idGenerator, payload, destination, headers, eventuateSchema, true);
}

private Mono<String> insertIntoMessageTableImpl(
    IdGenerator idGenerator,
    String payload,
    String destination,
    Map<String, String> headers,
    EventuateSchema eventuateSchema,
    boolean published) {
if (idGenerator.databaseIdRequired()) {
    return insertIntoMessageTableDatabaseIdImpl(idGenerator,
	    payload, destination, headers, published, eventuateSchema);
} else {
    return insertIntoMessageTableApplicationIdImpl(idGenerator,
	    payload, destination, headers, published, eventuateSchema);
}
}

private Mono<String> insertIntoMessageTableApplicationIdImpl(IdGenerator idGenerator,
    String payload,
    String destination,
    Map<String, String> headers,
    boolean published,
    EventuateSchema eventuateSchema) {
headers = new HashMap<>(headers);
String messageId = idGenerator.genId(null).asString();
headers.put("ID", messageId);
String serializedHeaders = JSonMapper.toJson(headers);
return this
	.createInsertQueryIntoMessageTableApplicationIdSql(eventuateSchema)
	.flatMap(sql -> reactiveJdbcStatementExecutor.update(
		sql,
		messageId,
		destination,
		serializedHeaders,
		payload,
		eventuateJdbcOperationsUtils.booleanToInt(published)))
	.map(rowsUpdated -> messageId);
}

private Mono<String> insertIntoMessageTableDatabaseIdImpl(IdGenerator idGenerator,
    String payload,
    String destination,
    Map<String, String> headers,
    boolean published,
    EventuateSchema eventuateSchema) {
String serializedHeaders = JSonMapper.toJson(headers);
return this
	.createInsertQueryIntoMessageTableDbIdSql(eventuateSchema)
	.flatMap(sql -> reactiveJdbcStatementExecutor.insertAndReturnId(
		sql,
		MESSAGE_AUTO_GENERATED_ID_COLUMN,
		destination,
		serializedHeaders,
		payload,
		eventuateJdbcOperationsUtils.booleanToInt(published)))
	.map(id -> idGenerator.genId(id).asString());
}

public Mono<String> createInsertQueryIntoMessageTableApplicationIdSql(final EventuateSchema eventuateSchema) {
return Mono.zip(
	this.getHeadersColumnType(eventuateSchema),
	this.getPayloadColumnType(eventuateSchema),
	(headersJson, payloadJson) -> {
	    String sql = "insert into %s(id, destination, headers, payload, creation_time, published) values(?, ?, %s, %s, %s, ?)";
	    return String.format(sql,
		    eventuateSchema.qualifyTable(MESSAGE_TABLE_NAME),
		    headersJson,
		    payloadJson,
		    eventuateSqlDialect.getCurrentTimeInMillisecondsExpression());
	});
}

public Mono<String> createInsertQueryIntoMessageTableDbIdSql(final EventuateSchema eventuateSchema) {
return Mono.zip(
	this.getHeadersColumnType(eventuateSchema),
	this.getPayloadColumnType(eventuateSchema),
	(headersJson, payloadJson) -> {
	    String sql = "insert into %s(id, destination, headers, payload, creation_time, published) values('', ?, %s, %s, %s, ?)";
	    return String.format(sql,
		    eventuateSchema.qualifyTable(MESSAGE_TABLE_NAME),
		    headersJson,
		    payloadJson,
		    eventuateSqlDialect.getCurrentTimeInMillisecondsExpression());
	});
}

private Mono<String> getHeadersColumnType(final EventuateSchema eventuateSchema) {
return reactiveJdbcStatementExecutor
	.query(COLUMN_TYPE_SQL, eventuateSchema.getEventuateDatabaseSchema(), MESSAGE_TABLE_NAME, "headers")
	.last()
	.map(r -> r.get("data_type"))
	.map(columnType -> String.format("?::%s", columnType))
	.defaultIfEmpty("?");
}

private Mono<String> getPayloadColumnType(final EventuateSchema eventuateSchema) {
return reactiveJdbcStatementExecutor
	.query(COLUMN_TYPE_SQL, eventuateSchema.getEventuateDatabaseSchema(), MESSAGE_TABLE_NAME, "payload")
	.last()
	.map(r -> r.get("data_type"))
	.map(columnType -> String.format("?::%s", columnType))
	.defaultIfEmpty("?");
}`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant