-
Notifications
You must be signed in to change notification settings - Fork 482
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Size computation mode #1093
Size computation mode #1093
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package org.datatransferproject.spi.cloud.connection; | ||
|
||
import java.io.IOException; | ||
import java.net.HttpURLConnection; | ||
import java.net.URL; | ||
import java.util.UUID; | ||
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore; | ||
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper; | ||
import org.datatransferproject.types.common.DownloadableItem; | ||
|
||
public class ConnectionProvider { | ||
|
||
private final TemporaryPerJobDataStore jobStore; | ||
|
||
public ConnectionProvider(TemporaryPerJobDataStore jobStore) { | ||
this.jobStore = jobStore; | ||
} | ||
|
||
public InputStreamWrapper getInputStreamForItem(UUID jobId, DownloadableItem item) | ||
throws IOException { | ||
|
||
String fetchableUrl = item.getFetchableUrl(); | ||
if (item.isInTempStore()) { | ||
return jobStore.getStream(jobId, fetchableUrl); | ||
} | ||
|
||
HttpURLConnection conn = getConnection(fetchableUrl); | ||
return new InputStreamWrapper( | ||
conn.getInputStream(), Math.max(conn.getContentLengthLong(), 0)); | ||
} | ||
|
||
public static HttpURLConnection getConnection(String urlStr) throws IOException { | ||
URL url = new URL(urlStr); | ||
HttpURLConnection conn = (HttpURLConnection) url.openConnection(); | ||
conn.connect(); | ||
return conn; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package org.datatransferproject.spi.cloud.connection; | ||
|
||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.ArgumentMatchers.anyString; | ||
import static org.mockito.ArgumentMatchers.eq; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
|
||
import com.google.common.truth.Truth; | ||
import java.util.UUID; | ||
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore; | ||
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper; | ||
import org.datatransferproject.types.common.DownloadableItem; | ||
import org.datatransferproject.types.common.models.photos.PhotoModel; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
|
||
public class ConnectionProviderTest { | ||
|
||
private TemporaryPerJobDataStore jobStore; | ||
private ConnectionProvider connectionProvider; | ||
|
||
@Before | ||
public void setUp() throws Exception { | ||
jobStore = mock(TemporaryPerJobDataStore.class); | ||
connectionProvider = new ConnectionProvider(jobStore); | ||
} | ||
|
||
@Test | ||
public void getInputStreamFromTempStore() throws Exception { | ||
long expectedBytes = 323; | ||
when(jobStore.getStream(any(), anyString())).thenReturn( | ||
new InputStreamWrapper(null, expectedBytes)); | ||
boolean inTempStore = true; | ||
String fetchableUrl = "https://example.com"; | ||
DownloadableItem item = new PhotoModel("title", fetchableUrl, "description", "jpeg", | ||
"123", "album", inTempStore); | ||
UUID jobId = UUID.randomUUID(); | ||
InputStreamWrapper streamWrapper = connectionProvider.getInputStreamForItem( | ||
jobId, item); | ||
|
||
Truth.assertThat(streamWrapper.getBytes()).isEqualTo(expectedBytes); | ||
verify(jobStore).getStream(eq(jobId), eq(fetchableUrl)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package org.datatransferproject.transfer; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.Collection; | ||
import java.util.LinkedHashMap; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.UUID; | ||
import java.util.concurrent.Callable; | ||
import org.datatransferproject.spi.cloud.connection.ConnectionProvider; | ||
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper; | ||
import org.datatransferproject.types.common.DownloadableItem; | ||
|
||
public class CallableSizeCalculator implements Callable<Map<String, Long>> { | ||
|
||
private final UUID jobId; | ||
private final ConnectionProvider connectionProvider; | ||
private final Collection<? extends DownloadableItem> items; | ||
|
||
public CallableSizeCalculator( | ||
UUID jobId, ConnectionProvider connectionProvider, Collection<? extends DownloadableItem> items) { | ||
this.jobId = Objects.requireNonNull(jobId); | ||
this.connectionProvider = Objects.requireNonNull(connectionProvider); | ||
this.items = Objects.requireNonNull(items); | ||
} | ||
|
||
@Override | ||
public Map<String, Long> call() throws Exception { | ||
Map<String, Long> result = new LinkedHashMap<>(); | ||
for (DownloadableItem item : items) { | ||
InputStreamWrapper stream = connectionProvider.getInputStreamForItem(jobId, item); | ||
long size = stream.getBytes(); | ||
if (size <= 0) { | ||
size = computeSize(stream); | ||
} | ||
|
||
result.put(item.getIdempotentId(), size); | ||
} | ||
|
||
return result; | ||
} | ||
|
||
// Reads the input stream in full | ||
private Long computeSize(InputStreamWrapper stream) throws IOException { | ||
long size = 0; | ||
try (InputStream inStream = stream.getStream()) { | ||
byte[] buffer = new byte[1024 * 1024]; // 1MB | ||
int chunkBytesRead; | ||
while ((chunkBytesRead = inStream.read(buffer)) != -1) { | ||
size += chunkBytesRead; | ||
} | ||
} | ||
|
||
return size; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,12 +20,16 @@ | |
import java.io.IOException; | ||
import java.time.Clock; | ||
import java.util.Collection; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.UUID; | ||
import org.datatransferproject.api.launcher.DtpInternalMetricRecorder; | ||
import org.datatransferproject.api.launcher.Monitor; | ||
import org.datatransferproject.launcher.monitor.events.EventCode; | ||
import org.datatransferproject.spi.cloud.connection.ConnectionProvider; | ||
import org.datatransferproject.spi.cloud.storage.JobStore; | ||
import org.datatransferproject.spi.cloud.types.PortabilityJob; | ||
import org.datatransferproject.spi.cloud.types.PortabilityJob.TransferMode; | ||
import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; | ||
import org.datatransferproject.spi.transfer.provider.ExportResult; | ||
import org.datatransferproject.spi.transfer.provider.Exporter; | ||
|
@@ -35,9 +39,13 @@ | |
import org.datatransferproject.spi.transfer.types.CopyExceptionWithFailureReason; | ||
import org.datatransferproject.transfer.CallableExporter; | ||
import org.datatransferproject.transfer.CallableImporter; | ||
import org.datatransferproject.transfer.CallableSizeCalculator; | ||
import org.datatransferproject.transfer.JobMetadata; | ||
import org.datatransferproject.types.common.DownloadableItem; | ||
import org.datatransferproject.types.common.ExportInformation; | ||
import org.datatransferproject.types.common.models.DataModel; | ||
import org.datatransferproject.types.common.models.photos.PhotosContainerResource; | ||
import org.datatransferproject.types.common.models.videos.VideosContainerResource; | ||
import org.datatransferproject.types.transfer.auth.AuthData; | ||
import org.datatransferproject.types.transfer.errors.ErrorDetail; | ||
import org.datatransferproject.types.transfer.retry.RetryException; | ||
|
@@ -101,7 +109,21 @@ protected ExportResult<?> copyIteration( | |
|
||
DataModel exportedData = exportResult.getExportedData(); | ||
if (exportedData != null) { | ||
importIteration(jobId, importAuthData, jobIdPrefix, copyIteration, exportedData); | ||
PortabilityJob job = jobStore.findJob(jobId); | ||
TransferMode transferMode = | ||
job.transferMode() == null ? TransferMode.DATA_TRANSFER : job.transferMode(); | ||
switch (transferMode) { | ||
case DATA_TRANSFER: | ||
importIteration(jobId, importAuthData, jobIdPrefix, copyIteration, exportedData); | ||
break; | ||
case SIZE_CALCULATION: | ||
sizeCalculationIteration(jobId, jobIdPrefix, exportedData); | ||
break; | ||
default: | ||
throw new IllegalStateException( | ||
"Job mode " + transferMode.name() + " is not supported by " | ||
+ getClass().getSimpleName()); | ||
} | ||
} | ||
|
||
return exportResult; | ||
|
@@ -205,6 +227,34 @@ private void importIteration( | |
} | ||
} | ||
|
||
private void sizeCalculationIteration(UUID jobId, String jobIdPrefix, | ||
DataModel exportedData) throws CopyException { | ||
Collection<? extends DownloadableItem> items; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (shouldn't block PR) another future consideration is make There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
if (exportedData instanceof PhotosContainerResource) { | ||
items = ((PhotosContainerResource) exportedData).getPhotos(); | ||
} else if (exportedData instanceof VideosContainerResource) { | ||
items = ((VideosContainerResource) exportedData).getVideos(); | ||
} else { | ||
return; | ||
} | ||
|
||
CallableSizeCalculator callableSizeCalculator = | ||
new CallableSizeCalculator(jobId, new ConnectionProvider(jobStore), items); | ||
try { | ||
RetryingCallable<Map<String, Long>> retryingImporter = | ||
new RetryingCallable<>( | ||
callableSizeCalculator, | ||
retryStrategyLibraryProvider.get(), | ||
Clock.systemUTC(), | ||
monitor, | ||
JobMetadata.getDataType(), | ||
JobMetadata.getImportService()); | ||
jobStore.addBytes(jobId, retryingImporter.call()); | ||
} catch (RetryException | RuntimeException e) { | ||
throw convertToCopyException(jobIdPrefix, "size estimation", e); | ||
} | ||
} | ||
|
||
private CopyException convertToCopyException(String jobIdPrefix, String suffix, Exception e) { | ||
if (e.getClass() == RetryException.class | ||
&& CopyExceptionWithFailureReason.class.isAssignableFrom(e.getCause().getClass())) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(shouldn't block PR) consideration for the future: If I'm understanding this right, then I think we could use a counting proxy and then maybe just have a single pass