Skip to content

Commit

Permalink
Update the photo and video importers of Koofr and Backblaze to log by…
Browse files Browse the repository at this point in the history
…tes (#1195)

* Update the photo and video importers of Koofr and Backblaze to log bytes transferred

Summary: Updated the photo and video importers of Koofr and Backblaze to log the number of bytes transferred

* Updated

* Fix formatting
  • Loading branch information
saratchebrolu authored Dec 14, 2022
1 parent bde428a commit 05b39a4
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;

import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.datatransfer.backblaze.common.BackblazeDataTransferClient;
import org.datatransferproject.datatransfer.backblaze.common.BackblazeDataTransferClientFactory;
Expand Down Expand Up @@ -80,14 +82,23 @@ public ImportResult importItem(
}
}

final LongAdder totalImportedFilesSizes = new LongAdder();
if (data.getPhotos() != null && data.getPhotos().size() > 0) {
for (PhotoModel photo : data.getPhotos()) {
idempotentExecutor.importAndSwallowIOExceptions(
photo, p -> importSinglePhoto(idempotentExecutor, b2Client, jobId, p));
photo,
p -> {
ItemImportResult<String> fileImportResult =
importSinglePhoto(idempotentExecutor, b2Client, jobId, p);
if (fileImportResult.hasBytes()) {
totalImportedFilesSizes.add(fileImportResult.getBytes());
}
return fileImportResult;
});
}
}

return ImportResult.OK;
return ImportResult.OK.copyWithBytes(totalImportedFilesSizes.longValue());
}

private ItemImportResult<String> importSinglePhoto(
Expand Down Expand Up @@ -115,8 +126,11 @@ private ItemImportResult<String> importSinglePhoto(
} catch (Exception e) {
// Swallow the exception caused by Remove data so that existing flows continue
monitor.info(
() -> format("Exception swallowed while removing data for jobId %s, localPath %s",
jobId, photo.getFetchableUrl()), e);
() ->
format(
"Exception swallowed while removing data for jobId %s, localPath %s",
jobId, photo.getFetchableUrl()),
e);
}

return ItemImportResult.success(response, size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;

import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.datatransfer.backblaze.common.BackblazeDataTransferClient;
import org.datatransferproject.datatransfer.backblaze.common.BackblazeDataTransferClientFactory;
Expand Down Expand Up @@ -69,29 +71,32 @@ public ImportResult importItem(

BackblazeDataTransferClient b2Client = b2ClientFactory.getOrCreateB2Client(jobId, authData);

final LongAdder totalImportedFilesSizes = new LongAdder();
if (data.getVideos() != null && data.getVideos().size() > 0) {
for (VideoModel video : data.getVideos()) {
idempotentExecutor.importAndSwallowIOExceptions(
video,
v -> importSingleVideo(jobId, b2Client, v));
v -> {
ItemImportResult<String> fileImportResult = importSingleVideo(jobId, b2Client, v);
if (fileImportResult.hasBytes()) {
totalImportedFilesSizes.add(fileImportResult.getBytes());
}
return fileImportResult;
});
}
}

return ImportResult.OK;
return ImportResult.OK.copyWithBytes(totalImportedFilesSizes.longValue());
}

private ItemImportResult<String> importSingleVideo(
UUID jobId,
BackblazeDataTransferClient b2Client,
VideoModel video)
throws IOException {
UUID jobId, BackblazeDataTransferClient b2Client, VideoModel video) throws IOException {
try (InputStream videoFileStream =
connectionProvider.getInputStreamForItem(jobId, video).getStream()) {
File file = jobStore
.getTempFileFromInputStream(videoFileStream, video.getDataId(), ".mp4");
String res = b2Client.uploadFile(
String.format("%s/%s.mp4", VIDEO_TRANSFER_MAIN_FOLDER, video.getDataId()),
file);
File file = jobStore.getTempFileFromInputStream(videoFileStream, video.getDataId(), ".mp4");
String res =
b2Client.uploadFile(
String.format("%s/%s.mp4", VIDEO_TRANSFER_MAIN_FOLDER, video.getDataId()), file);
return ItemImportResult.success(res, file.length());
} catch (FileNotFoundException e) {
monitor.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@
import org.mockito.ArgumentCaptor;

public class BackblazePhotosImporterTest {
Monitor monitor;
TemporaryPerJobDataStore dataStore;
ConnectionProvider streamProvider;
BackblazeDataTransferClientFactory clientFactory;
IdempotentImportExecutor executor;
TokenSecretAuthData authData;
BackblazeDataTransferClient client;
Monitor monitor;
TemporaryPerJobDataStore dataStore;
ConnectionProvider streamProvider;
BackblazeDataTransferClientFactory clientFactory;
IdempotentImportExecutor executor;
TokenSecretAuthData authData;
BackblazeDataTransferClient client;

@BeforeEach
public void setUp() {
Expand All @@ -68,8 +68,7 @@ public void setUp() {
client = mock(BackblazeDataTransferClient.class);
}

@TempDir
public Path folder;
@TempDir public Path folder;

@Test
public void testNullData() throws Exception {
Expand All @@ -88,7 +87,7 @@ public void testNullPhotosAndAlbums() throws Exception {
BackblazePhotosImporter sut =
new BackblazePhotosImporter(monitor, dataStore, streamProvider, clientFactory);
ImportResult result = sut.importItem(UUID.randomUUID(), executor, authData, data);
assertEquals(ImportResult.OK, result);
assertEquals(ImportResult.ResultType.OK, result.getType());
}

@Test
Expand All @@ -100,7 +99,7 @@ public void testEmptyPhotosAndAlbums() throws Exception {
BackblazePhotosImporter sut =
new BackblazePhotosImporter(monitor, dataStore, streamProvider, clientFactory);
ImportResult result = sut.importItem(UUID.randomUUID(), executor, authData, data);
assertEquals(ImportResult.OK, result);
assertEquals(ImportResult.ResultType.OK, result.getType());
}

@Test
Expand All @@ -113,8 +112,8 @@ public void testImportPhoto() throws Exception {
String response = "response";
UUID jobId = UUID.randomUUID();
PhotoModel photoModel = new PhotoModel(title, photoUrl, "", "", dataId, albumId, false);
PhotosContainerResource data = new PhotosContainerResource(Collections.emptyList(),
Collections.singletonList(photoModel));
PhotosContainerResource data =
new PhotosContainerResource(Collections.emptyList(), Collections.singletonList(photoModel));

when(executor.getCachedValue(albumId)).thenReturn(albumName);

Expand All @@ -131,8 +130,8 @@ public void testImportPhoto() throws Exception {
new BackblazePhotosImporter(monitor, dataStore, streamProvider, clientFactory);
sut.importItem(jobId, executor, authData, data);

ArgumentCaptor<ImportFunction<PhotoModel, String>> importCapture = ArgumentCaptor.forClass(
ImportFunction.class);
ArgumentCaptor<ImportFunction<PhotoModel, String>> importCapture =
ArgumentCaptor.forClass(ImportFunction.class);
verify(executor, times(1))
.importAndSwallowIOExceptions(eq(photoModel), importCapture.capture());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public class BackblazeVideosImporterTest {
IdempotentImportExecutor executor;
TokenSecretAuthData authData;
BackblazeDataTransferClient client;
@TempDir
public Path folder;
@TempDir public Path folder;

@BeforeEach
public void setUp() {
Expand Down Expand Up @@ -84,7 +83,7 @@ public void testNullVideos() throws Exception {
BackblazeVideosImporter sut =
new BackblazeVideosImporter(monitor, dataStore, streamProvider, clientFactory);
ImportResult result = sut.importItem(UUID.randomUUID(), executor, authData, data);
assertEquals(ImportResult.OK, result);
assertEquals(ImportResult.ResultType.OK, result.getType());
}

@Test
Expand All @@ -95,7 +94,7 @@ public void testEmptyVideos() throws Exception {
BackblazeVideosImporter sut =
new BackblazeVideosImporter(monitor, dataStore, streamProvider, clientFactory);
ImportResult result = sut.importItem(UUID.randomUUID(), executor, authData, data);
assertEquals(ImportResult.OK, result);
assertEquals(ImportResult.ResultType.OK, result.getType());
}

@Test
Expand Down Expand Up @@ -130,8 +129,8 @@ public void testImportVideo() throws Exception {
new BackblazeVideosImporter(monitor, dataStore, streamProvider, clientFactory);
sut.importItem(jobId, executor, authData, data);

ArgumentCaptor<ImportFunction<VideoModel, String>> importCapture = ArgumentCaptor.forClass(
ImportFunction.class);
ArgumentCaptor<ImportFunction<VideoModel, String>> importCapture =
ArgumentCaptor.forClass(ImportFunction.class);
verify(executor, times(1))
.importAndSwallowIOExceptions(eq(videoObject), importCapture.capture());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
*/
package org.datatransferproject.transfer.koofr.photos;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;

import org.apache.commons.imaging.Imaging;
import org.apache.commons.imaging.common.ImageMetadata;
import org.apache.commons.imaging.formats.jpeg.JpegImageMetadata;
Expand All @@ -34,7 +34,9 @@
import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.spi.cloud.connection.ConnectionProvider;
import org.datatransferproject.spi.cloud.storage.JobStore;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper;
import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor;
import org.datatransferproject.spi.transfer.idempotentexecutor.ItemImportResult;
import org.datatransferproject.spi.transfer.provider.ImportResult;
import org.datatransferproject.spi.transfer.provider.Importer;
import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException;
Expand Down Expand Up @@ -103,13 +105,20 @@ public ImportResult importItem(
album.getId(), album.getName(), () -> createAlbumFolder(album, koofrClient));
}

final LongAdder totalImportedFilesSizes = new LongAdder();
for (PhotoModel photoModel : resource.getPhotos()) {
idempotentImportExecutor.executeAndSwallowIOExceptions(
photoModel.getIdempotentId(),
photoModel.getTitle(),
() -> importSinglePhoto(photoModel, jobId, idempotentImportExecutor, koofrClient));
idempotentImportExecutor.importAndSwallowIOExceptions(
photoModel,
photo -> {
ItemImportResult<String> fileImportResult =
importSinglePhoto(photoModel, jobId, idempotentImportExecutor, koofrClient);
if (fileImportResult != null && fileImportResult.hasBytes()) {
totalImportedFilesSizes.add(fileImportResult.getBytes());
}
return fileImportResult;
});
}
return ImportResult.OK;
return ImportResult.OK.copyWithBytes(totalImportedFilesSizes.longValue());
}

private String createAlbumFolder(PhotoAlbum album, KoofrClient koofrClient)
Expand All @@ -132,61 +141,89 @@ private String createAlbumFolder(PhotoAlbum album, KoofrClient koofrClient)
return fullPath;
}

private String importSinglePhoto(
private ItemImportResult<String> importSinglePhoto(
PhotoModel photo,
UUID jobId,
IdempotentImportExecutor idempotentImportExecutor,
KoofrClient koofrClient)
throws IOException, InvalidTokenException, DestinationMemoryFullException {
monitor.debug(() -> String.format("Import single photo %s", photo.getTitle()));
Long size = null;
try {
InputStreamWrapper inputStreamWrapper =
connectionProvider.getInputStreamForItem(jobId, photo);
ItemImportResult<String> response;

try (InputStream inputStream =
connectionProvider.getInputStreamForItem(jobId, photo).getStream()) {
final byte[] bytes = IOUtils.toByteArray(inputStream);

Date dateCreated = getDateCreated(photo, bytes);

String title = buildPhotoTitle(jobId, photo.getTitle(), dateCreated);
String description = KoofrClient.trimDescription(photo.getDescription());

String parentPath = idempotentImportExecutor.getCachedValue(photo.getAlbumId());
String fullPath = parentPath + "/" + title;
try (InputStream inputStream = inputStreamWrapper.getStream()) {
final byte[] bytes = IOUtils.toByteArray(inputStream);

if (koofrClient.fileExists(fullPath)) {
monitor.debug(() -> String.format("Photo already exists %s", photo.getTitle()));
Date dateCreated = getDateCreated(photo, bytes);

return fullPath;
}
String title = buildPhotoTitle(jobId, photo.getTitle(), dateCreated);
String description = KoofrClient.trimDescription(photo.getDescription());

final ByteArrayInputStream inMemoryInputStream = new ByteArrayInputStream(bytes);
String parentPath = idempotentImportExecutor.getCachedValue(photo.getAlbumId());
String fullPath = parentPath + "/" + title;

String response;
if (koofrClient.fileExists(fullPath)) {
monitor.debug(() -> String.format("Photo already exists %s", photo.getTitle()));

try {
response = koofrClient.uploadFile(
parentPath, title, inMemoryInputStream, photo.getMediaType(), dateCreated, description);
} catch (KoofrClientIOException e) {
if (e.getCode() == 404) {
monitor.info(() -> String.format("Can't find album during importSingleItem for id: %s", photo.getDataId()), e);
response = "skipped-"+photo.getDataId();
return ItemImportResult.success(fullPath);
}
else {
throw e;

final ByteArrayInputStream inMemoryInputStream = new ByteArrayInputStream(bytes);

try {
long inputStreamBytes = inputStreamWrapper.getBytes();
String stringResult =
koofrClient.uploadFile(
parentPath,
title,
inMemoryInputStream,
photo.getMediaType(),
dateCreated,
description);
if (stringResult != null && !stringResult.isEmpty()) {
response = ItemImportResult.success(stringResult, inputStreamBytes);
} else {
response =
ItemImportResult.success(
String.format(SKIPPED_FILE_RESULT_FORMAT, photo.getDataId()));
}
size = inputStreamBytes;
} catch (KoofrClientIOException exception) {
if (exception.getCode() == 404) {
monitor.info(
() ->
String.format(
"Can't find album during importSingleItem for id: %s", photo.getDataId()),
exception);
response =
ItemImportResult.success(
String.format(SKIPPED_FILE_RESULT_FORMAT, photo.getDataId()));
} else {
return ItemImportResult.error(exception, size);
}
}
}

try {
if (photo.isInTempStore()) {
jobStore.removeData(jobId, photo.getFetchableUrl());
try {
if (photo.isInTempStore()) {
jobStore.removeData(jobId, photo.getFetchableUrl());
}
} catch (Exception e) {
// Swallow the exception caused by Remove data so that existing flows continue
monitor.info(
() ->
format(
"Exception swallowed while removing data for jobId %s, localPath %s",
jobId, photo.getFetchableUrl()),
e);
}
} catch (Exception e) {
// Swallow the exception caused by Remove data so that existing flows continue
monitor.info(
() -> format("Exception swallowed while removing data for jobId %s, localPath %s",
jobId, photo.getFetchableUrl()), e);
}

return response;
} catch (KoofrClientIOException exception) {
return ItemImportResult.error(exception, size);
}
}

Expand Down
Loading

0 comments on commit 05b39a4

Please sign in to comment.