Skip to content

Add backup restore options for users and roles #415

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
package io.weaviate.client.v1.async.backup.api;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpStatus;

import com.google.gson.annotations.SerializedName;

import io.weaviate.client.Config;
import io.weaviate.client.base.AsyncBaseClient;
import io.weaviate.client.base.AsyncClientResult;
Expand All @@ -13,22 +25,13 @@
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.backup.model.BackupRestoreResponse;
import io.weaviate.client.v1.backup.model.BackupRestoreStatusResponse;
import io.weaviate.client.v1.backup.model.RbacRestoreOption;
import io.weaviate.client.v1.backup.model.RestoreStatus;
import lombok.Builder;
import lombok.Getter;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpStatus;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

public class BackupRestorer extends AsyncBaseClient<BackupRestoreResponse>
implements AsyncClientResult<BackupRestoreResponse> {
implements AsyncClientResult<BackupRestoreResponse> {

private static final long WAIT_INTERVAL = 1000;

Expand All @@ -41,14 +44,13 @@ public class BackupRestorer extends AsyncBaseClient<BackupRestoreResponse>
private boolean waitForCompletion;
private final Executor executor;


public BackupRestorer(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider, BackupRestoreStatusGetter statusGetter, Executor executor) {
public BackupRestorer(CloseableHttpAsyncClient client, Config config, AccessTokenProvider tokenProvider,
BackupRestoreStatusGetter statusGetter, Executor executor) {
super(client, config, tokenProvider);
this.statusGetter = statusGetter;
this.executor = executor;
}


public BackupRestorer withIncludeClassNames(String... classNames) {
this.includeClassNames = classNames;
return this;
Expand Down Expand Up @@ -87,19 +89,20 @@ public Future<Result<BackupRestoreResponse>> run(FutureCallback<Result<BackupRes
return restore(callback);
}


private Future<Result<BackupRestoreResponse>> restore(FutureCallback<Result<BackupRestoreResponse>> callback) {
BackupRestore payload = BackupRestore.builder()
.config(BackupRestoreConfig.builder().build())
.include(includeClassNames)
.exclude(excludeClassNames)
.config(config)
.build();
String path = String.format("/backups/%s/%s/restore", UrlEncoder.encodePathParam(backend), UrlEncoder.encodePathParam(backupId));
.config(BackupRestoreConfig.builder().build())
.include(includeClassNames)
.exclude(excludeClassNames)
.config(config)
.build();
String path = String.format("/backups/%s/%s/restore", UrlEncoder.encodePathParam(backend),
UrlEncoder.encodePathParam(backupId));
return sendPostRequest(path, payload, BackupRestoreResponse.class, callback);
}

private Future<Result<BackupRestoreResponse>> restoreAndWaitForCompletion(FutureCallback<Result<BackupRestoreResponse>> callback) {
private Future<Result<BackupRestoreResponse>> restoreAndWaitForCompletion(
FutureCallback<Result<BackupRestoreResponse>> callback) {
CompletableFuture<Result<BackupRestoreResponse>> future = new CompletableFuture<>();
FutureCallback<Result<BackupRestoreResponse>> internalCallback = new FutureCallback<Result<BackupRestoreResponse>>() {
@Override
Expand All @@ -124,64 +127,65 @@ public void cancelled() {
restore(internalCallback);

return future.thenCompose(restoreResult -> {
if (restoreResult.hasErrors()) {
return CompletableFuture.completedFuture(restoreResult);
}
return getStatusRecursively(backend, backupId, restoreResult);
})
.whenComplete((restoreResult, throwable) -> {
if (callback != null) {
if (throwable != null) {
callback.failed((Exception) throwable);
} else {
callback.completed(restoreResult);
if (restoreResult.hasErrors()) {
return CompletableFuture.completedFuture(restoreResult);
}
return getStatusRecursively(backend, backupId, restoreResult);
})
.whenComplete((restoreResult, throwable) -> {
if (callback != null) {
if (throwable != null) {
callback.failed((Exception) throwable);
} else {
callback.completed(restoreResult);
}
}
}
});
});
}

private CompletableFuture<Result<BackupRestoreStatusResponse>> getStatus(String backend, String backupId) {
CompletableFuture<Result<BackupRestoreStatusResponse>> future = new CompletableFuture<>();
statusGetter.withBackend(backend).withBackupId(backupId)
.run(new FutureCallback<Result<BackupRestoreStatusResponse>>() {
@Override
public void completed(Result<BackupRestoreStatusResponse> createStatusResult) {
future.complete(createStatusResult);
}
.run(new FutureCallback<Result<BackupRestoreStatusResponse>>() {
@Override
public void completed(Result<BackupRestoreStatusResponse> createStatusResult) {
future.complete(createStatusResult);
}

@Override
public void failed(Exception e) {
future.completeExceptionally(e);
}
@Override
public void failed(Exception e) {
future.completeExceptionally(e);
}

@Override
public void cancelled() {
}
});
@Override
public void cancelled() {
}
});
return future;
}

private CompletableFuture<Result<BackupRestoreResponse>> getStatusRecursively(String backend, String backupId,
Result<BackupRestoreResponse> restoreResult) {
Result<BackupRestoreResponse> restoreResult) {
return Futures.thenComposeAsync(getStatus(backend, backupId), restoreStatusResult -> {
boolean isRunning = Optional.of(restoreStatusResult)
.filter(r -> !r.hasErrors())
.map(Result::getResult)
.map(BackupRestoreStatusResponse::getStatus)
.filter(status -> {
switch (status) {
case RestoreStatus.SUCCESS:
case RestoreStatus.FAILED:
return false;
default:
return true;
}
})
.isPresent();
.filter(r -> !r.hasErrors())
.map(Result::getResult)
.map(BackupRestoreStatusResponse::getStatus)
.filter(status -> {
switch (status) {
case RestoreStatus.SUCCESS:
case RestoreStatus.FAILED:
return false;
default:
return true;
}
})
.isPresent();

if (isRunning) {
try {
return Futures.supplyDelayed(() -> getStatusRecursively(backend, backupId, restoreResult), WAIT_INTERVAL, executor);
return Futures.supplyDelayed(() -> getStatusRecursively(backend, backupId, restoreResult), WAIT_INTERVAL,
executor);
} catch (InterruptedException e) {
throw new CompletionException(e);
}
Expand All @@ -191,7 +195,7 @@ private CompletableFuture<Result<BackupRestoreResponse>> getStatusRecursively(St
}

private Result<BackupRestoreResponse> merge(Result<BackupRestoreStatusResponse> restoreStatusResult,
Result<BackupRestoreResponse> restoreResult) {
Result<BackupRestoreResponse> restoreResult) {
BackupRestoreStatusResponse restoreStatusResponse = restoreStatusResult.getResult();
BackupRestoreResponse restoreResponse = restoreResult.getResult();

Expand All @@ -215,20 +219,22 @@ private Result<BackupRestoreResponse> merge(Result<BackupRestoreStatusResponse>
List<WeaviateErrorMessage> messages = error.getMessages();

errorResponse = WeaviateErrorResponse.builder()
.code(statusCode)
.error(messages)
.build();
.code(statusCode)
.error(messages)
.build();
}

return new Result<>(statusCode, merged, errorResponse);
}


@Getter
@Builder
private static class BackupRestore {
@SerializedName("config")
BackupRestoreConfig config;
@SerializedName("include")
String[] include;
@SerializedName("exclude")
String[] exclude;
}

Expand All @@ -241,5 +247,9 @@ public static class BackupRestoreConfig {
String bucket;
@SerializedName("Path")
String path;
@SerializedName("usersOptions")
RbacRestoreOption usersRestore;
@SerializedName("rolesOptions")
RbacRestoreOption rolesRestore;
}
}
35 changes: 21 additions & 14 deletions src/main/java/io/weaviate/client/v1/backup/api/BackupRestorer.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package io.weaviate.client.v1.backup.api;

import com.google.gson.annotations.SerializedName;
import io.weaviate.client.v1.backup.model.BackupRestoreResponse;
import io.weaviate.client.v1.backup.model.BackupRestoreStatusResponse;
import io.weaviate.client.v1.backup.model.RestoreStatus;
import lombok.Builder;
import lombok.Getter;

import io.weaviate.client.Config;
import io.weaviate.client.base.BaseClient;
import io.weaviate.client.base.ClientResult;
import io.weaviate.client.base.Response;
import io.weaviate.client.base.Result;
import io.weaviate.client.base.http.HttpClient;
import io.weaviate.client.v1.backup.model.BackupRestoreResponse;
import io.weaviate.client.v1.backup.model.BackupRestoreStatusResponse;
import io.weaviate.client.v1.backup.model.RbacRestoreOption;
import io.weaviate.client.v1.backup.model.RestoreStatus;
import lombok.Builder;
import lombok.Getter;

public class BackupRestorer extends BaseClient<BackupRestoreResponse> implements ClientResult<BackupRestoreResponse> {

Expand Down Expand Up @@ -68,19 +70,17 @@ public BackupRestorer withWaitForCompletion(boolean waitForCompletion) {
@Override
public Result<BackupRestoreResponse> run() {
BackupRestore payload = BackupRestore.builder()
.config(BackupRestoreConfig.builder().build())
.include(includeClassNames)
.exclude(excludeClassNames)
.config(config)
.build();
.include(includeClassNames)
.exclude(excludeClassNames)
.config(config)
.build();

if (waitForCompletion) {
return restoreAndWaitForCompletion(payload);
}
return restore(payload);
}


private Result<BackupRestoreResponse> restore(BackupRestore payload) {
Response<BackupRestoreResponse> response = sendPostRequest(path(), payload, BackupRestoreResponse.class);
return new Result<>(response);
Expand All @@ -93,7 +93,7 @@ private Result<BackupRestoreResponse> restoreAndWaitForCompletion(BackupRestore
}

statusGetter.withBackend(backend).withBackupId(backupId);
while(true) {
while (true) {
Response<BackupRestoreStatusResponse> statusResponse = statusGetter.statusRestore();
if (new Result<>(statusResponse).hasErrors()) {
return merge(statusResponse, result);
Expand All @@ -117,7 +117,8 @@ private String path() {
return String.format("/backups/%s/%s/restore", backend, backupId);
}

private Result<BackupRestoreResponse> merge(Response<BackupRestoreStatusResponse> response, Result<BackupRestoreResponse> result) {
private Result<BackupRestoreResponse> merge(Response<BackupRestoreStatusResponse> response,
Result<BackupRestoreResponse> result) {
BackupRestoreStatusResponse statusRestoreResponse = response.getBody();
BackupRestoreResponse restoreResponse = result.getResult();

Expand All @@ -136,12 +137,14 @@ private Result<BackupRestoreResponse> merge(Response<BackupRestoreStatusResponse
return new Result<>(response.getStatusCode(), merged, response.getErrors());
}


@Getter
@Builder
private static class BackupRestore {
@SerializedName("config")
BackupRestoreConfig config;
@SerializedName("include")
String[] include;
@SerializedName("exclude")
String[] exclude;
}

Expand All @@ -154,5 +157,9 @@ public static class BackupRestoreConfig {
String bucket;
@SerializedName("Path")
String path;
@SerializedName("usersOptions")
RbacRestoreOption usersRestore;
@SerializedName("rolesOptions")
RbacRestoreOption rolesRestore;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.weaviate.client.v1.backup.model;

import com.google.gson.annotations.SerializedName;

public enum RbacRestoreOption {
@SerializedName("noRestore")
NO_RESTORE,
@SerializedName("all")
ALL;
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public Weaviate(String dockerImageName, boolean withOffloadS3) {
withEnv("PERSISTENCE_FLUSH_IDLE_MEMTABLES_AFTER", "1");
withEnv("ENABLE_MODULES", String.join(",", enableModules));
withCreateContainerCmdModifier(cmd -> cmd.withHostName("weaviate"));

withEnv("AUTHENTICATION_DB_USERS_ENABLED", "true");
}
}

Expand Down
Loading