Size computation mode #1093

Merged: 5 commits merged on Jun 18, 2022
6 changes: 5 additions & 1 deletion portability-spi-cloud/build.gradle
@@ -26,6 +26,10 @@ dependencies {
compile("com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}")
compile("com.google.auto.value:auto-value:${autoValueVersion}")
compile("com.google.inject:guice:${guiceVersion}")

testCompile("junit:junit:${junitVersion}")
testCompile("org.mockito:mockito-core:${mockitoVersion}")
testCompile("com.google.truth:truth:${truthVersion}")
}

sourceSets {
@@ -43,4 +47,4 @@
}
}

configurePublication(project)
configurePublication(project)
@@ -0,0 +1,38 @@
package org.datatransferproject.spi.cloud.connection;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.UUID;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper;
import org.datatransferproject.types.common.DownloadableItem;

public class ConnectionProvider {

private final TemporaryPerJobDataStore jobStore;

public ConnectionProvider(TemporaryPerJobDataStore jobStore) {
this.jobStore = jobStore;
}

public InputStreamWrapper getInputStreamForItem(UUID jobId, DownloadableItem item)
throws IOException {

String fetchableUrl = item.getFetchableUrl();
if (item.isInTempStore()) {
return jobStore.getStream(jobId, fetchableUrl);
}

HttpURLConnection conn = getConnection(fetchableUrl);
return new InputStreamWrapper(
conn.getInputStream(), Math.max(conn.getContentLengthLong(), 0));
}

public static HttpURLConnection getConnection(String urlStr) throws IOException {
URL url = new URL(urlStr);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.connect();
return conn;
}
}
@@ -135,6 +135,13 @@ default Map<String, Integer> getCounts(UUID jobId) {
*/
default void addBytes(UUID jobId, Long bytes) throws IOException {}

/**
 * Increments the byte counts for downloadable items of the given job.
 *
 * @param jobId the job the items belong to
 * @param bytes map from the idempotent ID of a DownloadableItem to the item's size in bytes
 */
default void addBytes(UUID jobId, Map<String, Long> bytes) {}

/** Provides the total number of bytes transferred. */
default Long getBytes(UUID jobId) {
return null;
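As an aside, a minimal usage sketch of the new overload; the item IDs and sizes below are made up, and a concrete store implementation is assumed to override the default no-op:

// Hypothetical usage, not part of this PR: record sizes for two made-up items.
// `store` is an instance of the interface above, backed by an implementation
// that overrides the default no-op addBytes(UUID, Map).
Map<String, Long> sizes = new LinkedHashMap<>();
sizes.put("photo-123", 1_048_576L);   // idempotent ID of a DownloadableItem -> size in bytes
sizes.put("video-456", 52_428_800L);
store.addBytes(jobId, sizes);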
@@ -37,6 +37,7 @@ public abstract class PortabilityJob {
private static final String EXPORT_ENCRYPTED_INITIAL_AUTH_DATA =
"EXPORT_ENCRYPTED_INITIAL_AUTH_DATA";
private static final String JOB_STATE = "JOB_STATE";
private static final String TRANSFER_MODE = "TRANSFER_MODE";
private static final String FAILURE_REASON = "FAILURE_REASON";
private static final String NUMBER_OF_FAILED_FILES_KEY = "NUM_FAILED_FILES";
private static final String USER_TIMEZONE = "USER_TIMEZONE";
@@ -94,6 +95,11 @@ public static PortabilityJob fromMap(Map<String, Object> properties) {
String userLocale =
properties.containsKey(USER_LOCALE) ? (String) properties.get(USER_LOCALE) : null;

TransferMode transferMode =
properties.containsKey(TRANSFER_MODE)
? TransferMode.valueOf((String) properties.get(TRANSFER_MODE))
: TransferMode.DATA_TRANSFER;

return PortabilityJob.builder()
.setState(state)
.setExportService((String) properties.get(EXPORT_SERVICE_KEY))
@@ -117,6 +123,7 @@ public static PortabilityJob fromMap(Map<String, Object> properties) {
.build())
.setUserTimeZone(userTimeZone)
.setUserLocale(userLocale)
.setTransferMode(transferMode)
.build();
}

@@ -171,6 +178,10 @@ private static void isSet(String... strings) {
@JsonProperty("userLocale")
public abstract String userLocale();

@Nullable
@JsonProperty("transferMode")
public abstract TransferMode transferMode();

public abstract PortabilityJob.Builder toBuilder();

public Map<String, Object> toMap() {
@@ -229,6 +240,10 @@ public Map<String, Object> toMap() {
builder.put(USER_LOCALE, userLocale());
}

if (null != transferMode()) {
builder.put(TRANSFER_MODE, transferMode().toString());
}

return builder.build();
}

@@ -242,6 +257,19 @@ public enum State {
PREEMPTED
}

public enum TransferMode {
/**
* Regular data transfer mode: export data from a service, then import into another service.
*/
DATA_TRANSFER,

/**
* Do not import the data. Instead, compute the size of every exported item and report the sizes
* to the job store.
*/
SIZE_CALCULATION
}

@AutoValue.Builder
public abstract static class Builder {
@JsonCreator
@@ -306,6 +334,10 @@ public Builder setAndValidateJobAuthorization(JobAuthorization jobAuthorization)
@JsonProperty("userLocale")
public abstract Builder setUserLocale(String locale);

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("transferMode")
public abstract Builder setTransferMode(TransferMode transferMode);

// For internal use only; clients should use setAndValidateJobAuthorization
protected abstract Builder setJobAuthorization(JobAuthorization jobAuthorization);
}
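As an aside, a minimal sketch of how an existing job could be switched into the new mode; the `job` variable is hypothetical, while toBuilder(), setTransferMode() and the DATA_TRANSFER fallback in fromMap() are the pieces added or used in this diff:

// Hypothetical: `job` is an already-loaded PortabilityJob. Rebuild it in size-calculation mode.
PortabilityJob sizeJob =
    job.toBuilder()
        .setTransferMode(PortabilityJob.TransferMode.SIZE_CALCULATION)
        .build();

// Jobs persisted before this change carry no TRANSFER_MODE property; fromMap()
// falls back to TransferMode.DATA_TRANSFER, so existing transfers are unaffected.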
@@ -0,0 +1,46 @@
package org.datatransferproject.spi.cloud.connection;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.truth.Truth;
import java.util.UUID;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper;
import org.datatransferproject.types.common.DownloadableItem;
import org.datatransferproject.types.common.models.photos.PhotoModel;
import org.junit.Before;
import org.junit.Test;

public class ConnectionProviderTest {

private TemporaryPerJobDataStore jobStore;
private ConnectionProvider connectionProvider;

@Before
public void setUp() throws Exception {
jobStore = mock(TemporaryPerJobDataStore.class);
connectionProvider = new ConnectionProvider(jobStore);
}

@Test
public void getInputStreamFromTempStore() throws Exception {
long expectedBytes = 323;
when(jobStore.getStream(any(), anyString())).thenReturn(
new InputStreamWrapper(null, expectedBytes));
boolean inTempStore = true;
String fetchableUrl = "https://example.com";
DownloadableItem item = new PhotoModel("title", fetchableUrl, "description", "jpeg",
"123", "album", inTempStore);
UUID jobId = UUID.randomUUID();
InputStreamWrapper streamWrapper = connectionProvider.getInputStreamForItem(
jobId, item);

Truth.assertThat(streamWrapper.getBytes()).isEqualTo(expectedBytes);
verify(jobStore).getStream(eq(jobId), eq(fetchableUrl));
}
}
@@ -0,0 +1,57 @@
package org.datatransferproject.transfer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.datatransferproject.spi.cloud.connection.ConnectionProvider;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper;
import org.datatransferproject.types.common.DownloadableItem;

public class CallableSizeCalculator implements Callable<Map<String, Long>> {

private final UUID jobId;
private final ConnectionProvider connectionProvider;
private final Collection<? extends DownloadableItem> items;

public CallableSizeCalculator(
UUID jobId, ConnectionProvider connectionProvider, Collection<? extends DownloadableItem> items) {
this.jobId = Objects.requireNonNull(jobId);
this.connectionProvider = Objects.requireNonNull(connectionProvider);
this.items = Objects.requireNonNull(items);
}

@Override
public Map<String, Long> call() throws Exception {
Map<String, Long> result = new LinkedHashMap<>();
for (DownloadableItem item : items) {
InputStreamWrapper stream = connectionProvider.getInputStreamForItem(jobId, item);
Collaborator:
(shouldn't block PR) Consideration for the future: if I'm understanding this right, I think we could use a counting proxy and then have just a single pass (see the sketch after this file).

long size = stream.getBytes();
if (size <= 0) {
size = computeSize(stream);
}

result.put(item.getIdempotentId(), size);
}

return result;
}

// Reads the input stream in full
private Long computeSize(InputStreamWrapper stream) throws IOException {
long size = 0;
try (InputStream inStream = stream.getStream()) {
byte[] buffer = new byte[1024 * 1024]; // 1MB
int chunkBytesRead;
while ((chunkBytesRead = inStream.read(buffer)) != -1) {
size += chunkBytesRead;
}
}

return size;
}
}
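A minimal sketch of the counting-proxy idea from the review comment above; it assumes Guava's CountingInputStream is available (Guava is already used elsewhere in the project) and that some consumer reads the stream exactly once, so the size falls out of the same pass instead of a dedicated size-only read:

package org.datatransferproject.transfer;

import com.google.common.io.CountingInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical sketch, not part of this PR: count bytes while they are consumed once. */
final class CountingPassSketch {

  /** Copies {@code source} to {@code sink} and returns how many bytes went through. */
  static long copyAndCount(InputStream source, OutputStream sink) throws IOException {
    try (CountingInputStream counting = new CountingInputStream(source)) {
      byte[] buffer = new byte[1024 * 1024]; // 1MB, mirroring computeSize above
      int read;
      while ((read = counting.read(buffer)) != -1) {
        sink.write(buffer, 0, read);
      }
      return counting.getCount();
    }
  }
}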
@@ -20,12 +20,16 @@
import java.io.IOException;
import java.time.Clock;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.datatransferproject.api.launcher.DtpInternalMetricRecorder;
import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.launcher.monitor.events.EventCode;
import org.datatransferproject.spi.cloud.connection.ConnectionProvider;
import org.datatransferproject.spi.cloud.storage.JobStore;
import org.datatransferproject.spi.cloud.types.PortabilityJob;
import org.datatransferproject.spi.cloud.types.PortabilityJob.TransferMode;
import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor;
import org.datatransferproject.spi.transfer.provider.ExportResult;
import org.datatransferproject.spi.transfer.provider.Exporter;
@@ -35,9 +39,13 @@
import org.datatransferproject.spi.transfer.types.CopyExceptionWithFailureReason;
import org.datatransferproject.transfer.CallableExporter;
import org.datatransferproject.transfer.CallableImporter;
import org.datatransferproject.transfer.CallableSizeCalculator;
import org.datatransferproject.transfer.JobMetadata;
import org.datatransferproject.types.common.DownloadableItem;
import org.datatransferproject.types.common.ExportInformation;
import org.datatransferproject.types.common.models.DataModel;
import org.datatransferproject.types.common.models.photos.PhotosContainerResource;
import org.datatransferproject.types.common.models.videos.VideosContainerResource;
import org.datatransferproject.types.transfer.auth.AuthData;
import org.datatransferproject.types.transfer.errors.ErrorDetail;
import org.datatransferproject.types.transfer.retry.RetryException;
@@ -101,7 +109,21 @@ protected ExportResult<?> copyIteration(

DataModel exportedData = exportResult.getExportedData();
if (exportedData != null) {
importIteration(jobId, importAuthData, jobIdPrefix, copyIteration, exportedData);
PortabilityJob job = jobStore.findJob(jobId);
TransferMode transferMode =
job.transferMode() == null ? TransferMode.DATA_TRANSFER : job.transferMode();
switch (transferMode) {
case DATA_TRANSFER:
importIteration(jobId, importAuthData, jobIdPrefix, copyIteration, exportedData);
break;
case SIZE_CALCULATION:
sizeCalculationIteration(jobId, jobIdPrefix, exportedData);
break;
default:
throw new IllegalStateException(
"Job mode " + transferMode.name() + " is not supported by "
+ getClass().getSimpleName());
}
}

return exportResult;
@@ -205,6 +227,34 @@ private void importIteration(
}
}

private void sizeCalculationIteration(UUID jobId, String jobIdPrefix,
DataModel exportedData) throws CopyException {
Collection<? extends DownloadableItem> items;
Collaborator:
(shouldn't block PR) Another future consideration: make getItems an interface that all models are required to implement, so we don't have to maintain this coverage by hand. We should definitely at least come back to this code after the merge and drop a TODO here to fill out the coverage (either via the interface or by hand for the currently known models). (A sketch of this idea appears at the end of this page.)

if (exportedData instanceof PhotosContainerResource) {
items = ((PhotosContainerResource) exportedData).getPhotos();
} else if (exportedData instanceof VideosContainerResource) {
items = ((VideosContainerResource) exportedData).getVideos();
} else {
return;
}

CallableSizeCalculator callableSizeCalculator =
new CallableSizeCalculator(jobId, new ConnectionProvider(jobStore), items);
try {
RetryingCallable<Map<String, Long>> retryingImporter =
new RetryingCallable<>(
callableSizeCalculator,
retryStrategyLibraryProvider.get(),
Clock.systemUTC(),
monitor,
JobMetadata.getDataType(),
JobMetadata.getImportService());
jobStore.addBytes(jobId, retryingImporter.call());
} catch (RetryException | RuntimeException e) {
throw convertToCopyException(jobIdPrefix, "size estimation", e);
}
}

private CopyException convertToCopyException(String jobIdPrefix, String suffix, Exception e) {
if (e.getClass() == RetryException.class
&& CopyExceptionWithFailureReason.class.isAssignableFrom(e.getCause().getClass())) {
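A minimal sketch of the getItems-style interface suggested in the last review comment; the interface name and method are hypothetical and not part of this PR:

package org.datatransferproject.types.common.models;

import java.util.Collection;
import org.datatransferproject.types.common.DownloadableItem;

/**
 * Hypothetical: container resources holding downloadable items would implement this,
 * so sizeCalculationIteration could ask any exported model for its items instead of
 * special-casing PhotosContainerResource and VideosContainerResource.
 */
public interface DownloadableItemContainer {
  Collection<? extends DownloadableItem> getDownloadableItems();
}

With something along these lines in place, the instanceof chain in sizeCalculationIteration would collapse to a single check plus a cast.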