Skip to content

Commit

Permalink
Add photos batching (#910)
Browse files Browse the repository at this point in the history
* Add photos batching

Summary: This uses the same technique we used for videos to batch photos

Test Plan: Ran a transfer on my machine using the docker container

* Fix comment formatting nit
  • Loading branch information
wmorland authored Jul 22, 2020
1 parent 345776d commit 1f5577a
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ public Status getStatus() {
/**
 * Returns the media item stored on this object.
 *
 * @return the value of the {@code mediaItem} field, exactly as stored
 */
public GoogleMediaItem getMediaItem() {
  return this.mediaItem;
}

/**
 * Returns the upload token stored on this object.
 *
 * @return the value of the {@code uploadToken} field, exactly as stored
 */
public String getUploadToken() {
  return this.uploadToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,35 @@
import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.json.JsonFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;
import com.google.rpc.Code;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.stream.Collectors;
import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.datatransfer.google.common.GoogleCredentialFactory;
import org.datatransferproject.datatransfer.google.mediaModels.BatchMediaItemResponse;
import org.datatransferproject.datatransfer.google.mediaModels.GoogleAlbum;
import org.datatransferproject.datatransfer.google.mediaModels.GoogleMediaItem;
import org.datatransferproject.datatransfer.google.mediaModels.NewMediaItem;
import org.datatransferproject.datatransfer.google.mediaModels.NewMediaItemResult;
import org.datatransferproject.datatransfer.google.mediaModels.NewMediaItemUpload;
import org.datatransferproject.datatransfer.google.mediaModels.Status;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper;
import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor;
import org.datatransferproject.spi.transfer.provider.ImportResult;
import org.datatransferproject.spi.transfer.provider.Importer;
import org.datatransferproject.spi.transfer.types.CopyExceptionWithFailureReason;
import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException;
import org.datatransferproject.spi.transfer.types.InvalidTokenException;
import org.datatransferproject.spi.transfer.types.PermissionDeniedException;
Expand Down Expand Up @@ -123,15 +130,35 @@ public ImportResult importItem(

long bytes = 0L;
// Uploads photos
if (data.getPhotos() != null && data.getPhotos().size() > 0) {
for (PhotoModel photo : data.getPhotos()) {
final PhotoResult photoResult =
idempotentImportExecutor.executeAndSwallowIOExceptions(
photo.getAlbumId() + "-" + photo.getDataId(),
photo.getTitle(),
() -> importSinglePhoto(jobId, authData, photo, idempotentImportExecutor));
if (photoResult != null) {
bytes += photoResult.getBytes();
Collection<PhotoModel> photos = data.getPhotos();
if (photos != null && photos.size() > 0) {
Map<String, List<PhotoModel>> photosByAlbum =
photos.stream()
.filter(photo -> !idempotentImportExecutor.isKeyCached(getIdempotentId(photo)))
.collect(Collectors.groupingBy(PhotoModel::getAlbumId));
for (Entry<String, List<PhotoModel>> albumEntry : photosByAlbum.entrySet()) {
String originalAlbumId = albumEntry.getKey();
String googleAlbumId;
if (Strings.isNullOrEmpty(originalAlbumId)) {
// This is ok, since NewMediaItemUpload will ignore all null values and it's possible to
// upload a NewMediaItem without a corresponding album id.
googleAlbumId = null;
} else {
// Note this will throw if creating the album failed, which is what we want
// because that will also mark this photo as being failed.
googleAlbumId = idempotentImportExecutor.getCachedValue(originalAlbumId);
}

// We partition into groups of 49 as 50 is the maximum number of items that can be created
// in one call. (We use 49 to avoid potential off-by-one errors.)
// https://developers.google.com/photos/library/guides/upload-media#creating-media-item
UnmodifiableIterator<List<PhotoModel>> batches =
Iterators.partition(albumEntry.getValue().iterator(), 49);
while (batches.hasNext()) {
long batchBytes =
importPhotoBatch(
jobId, authData, batches.next(), idempotentImportExecutor, googleAlbumId);
bytes += batchBytes;
}
}
}
Expand All @@ -158,67 +185,104 @@ String importSingleAlbum(UUID jobId, TokensAndUrlAuthData authData, PhotoAlbum i
return responseAlbum.getId();
}

@VisibleForTesting
PhotoResult importSinglePhoto(
long importPhotoBatch(
UUID jobId,
TokensAndUrlAuthData authData,
PhotoModel inputPhoto,
IdempotentImportExecutor idempotentImportExecutor)
throws IOException, CopyExceptionWithFailureReason {
List<PhotoModel> photos,
IdempotentImportExecutor executor,
String albumId)
throws Exception {
final ArrayList<NewMediaItem> mediaItems = new ArrayList<>();
final HashMap<String, PhotoModel> uploadTokenToDataId = new HashMap<>();
final HashMap<String, Long> uploadTokenToLength = new HashMap<>();

/*
TODO: resumable uploads https://developers.google.com/photos/library/guides/resumable-uploads
Resumable uploads would allow the upload of larger media that don't fit in memory. Doing this,
however, seems to require knowledge of the total file size.
*/
// Upload photo
InputStream inputStream;
Long bytes;
if (inputPhoto.isInTempStore()) {
final InputStreamWrapper streamWrapper =
jobStore.getStream(jobId, inputPhoto.getFetchableUrl());
bytes = streamWrapper.getBytes();
inputStream = streamWrapper.getStream();
} else {
HttpURLConnection conn = imageStreamProvider.getConnection(inputPhoto.getFetchableUrl());
final long contentLengthLong = conn.getContentLengthLong();
bytes = contentLengthLong != -1 ? contentLengthLong : 0;
inputStream = conn.getInputStream();
}

String uploadToken =
getOrCreatePhotosInterface(jobId, authData).uploadPhotoContent(inputStream);

String description = getPhotoDescription(inputPhoto);
NewMediaItem newMediaItem = new NewMediaItem(description, uploadToken);
// Upload photos
for (PhotoModel photo : photos) {
try {
InputStream inputStream;
long bytes;
if (photo.isInTempStore()) {
final InputStreamWrapper streamWrapper =
jobStore.getStream(jobId, photo.getFetchableUrl());
bytes = streamWrapper.getBytes();
inputStream = streamWrapper.getStream();
} else {
HttpURLConnection conn = imageStreamProvider.getConnection(photo.getFetchableUrl());
final long contentLengthLong = conn.getContentLengthLong();
bytes = contentLengthLong != -1 ? contentLengthLong : 0;
inputStream = conn.getInputStream();
}

String albumId;
if (Strings.isNullOrEmpty(inputPhoto.getAlbumId())) {
// This is ok, since NewMediaItemUpload will ignore all null values and it's possible to
// upload a NewMediaItem without a corresponding album id.
albumId = null;
} else {
// Note this will throw if creating the album failed, which is what we want
// because that will also mark this photo as being failed.
albumId = idempotentImportExecutor.getCachedValue(inputPhoto.getAlbumId());
String uploadToken =
getOrCreatePhotosInterface(jobId, authData).uploadPhotoContent(inputStream);
mediaItems.add(new NewMediaItem(getPhotoDescription(photo), uploadToken));
uploadTokenToDataId.put(uploadToken, photo);
uploadTokenToLength.put(uploadToken, bytes);
} catch (IOException e) {
executor.executeAndSwallowIOExceptions(
getIdempotentId(photo),
photo.getTitle(),
() -> {
throw e;
});
}
}
if (mediaItems.isEmpty()) {
// Either we were not passed any photos or the upload failed for all of them.
return 0L;
}

NewMediaItemUpload uploadItem =
new NewMediaItemUpload(albumId, Collections.singletonList(newMediaItem));
long totalBytes = 0L;
NewMediaItemUpload uploadItem = new NewMediaItemUpload(albumId, mediaItems);
try {
BatchMediaItemResponse photoCreationResponse =
getOrCreatePhotosInterface(jobId, authData).createPhoto(uploadItem);
if (photoCreationResponse == null) {
throw new IOException("The photo was not created");
}
getOrCreatePhotosInterface(jobId, authData).createPhotos(uploadItem);
Preconditions.checkNotNull(photoCreationResponse);
NewMediaItemResult[] mediaItemResults = photoCreationResponse.getResults();
if (mediaItemResults == null || mediaItemResults.length != 1) {
throw new IOException("No media item results returned");
Preconditions.checkNotNull(mediaItemResults);
for (NewMediaItemResult mediaItem : mediaItemResults) {
String uploadToken = mediaItem.getUploadToken();
Status status = mediaItem.getStatus();

PhotoModel photo = uploadTokenToDataId.get(uploadToken);
Preconditions.checkNotNull(photo);
if (status.getCode() == Code.OK_VALUE) {
Long bytes = uploadTokenToLength.get(uploadToken);
Preconditions.checkNotNull(bytes);
executor.executeAndSwallowIOExceptions(
getIdempotentId(photo),
photo.getTitle(),
() -> new PhotoResult(mediaItem.getMediaItem().getId(), bytes));
totalBytes += bytes;
} else {
executor.executeAndSwallowIOExceptions(
getIdempotentId(photo),
photo.getTitle(),
() -> {
throw new IOException(
String.format(
"Photo could not be created. Code: %d Message: %s",
status.getCode(), status.getMessage()));
});
}
uploadTokenToDataId.remove(uploadToken);
}
GoogleMediaItem mediaItem = mediaItemResults[0].getMediaItem();
if (mediaItem == null) {
throw new IOException("No media item returned");
if (!uploadTokenToDataId.isEmpty()) {
for (PhotoModel photo : uploadTokenToDataId.values()) {
executor.executeAndSwallowIOExceptions(
getIdempotentId(photo),
photo.getTitle(),
() -> {
throw new IOException("Photo was missing from results list.");
});
}
}
return new PhotoResult(mediaItem.getId(), bytes);

} catch (IOException e) {
if (e.getMessage() != null
&& e.getMessage().contains("The remaining storage in the user's account is not enough")) {
Expand All @@ -227,6 +291,12 @@ PhotoResult importSinglePhoto(
throw e;
}
}

return totalBytes;
}

/**
 * Builds the key that identifies a photo when talking to the idempotent import executor.
 *
 * <p>The key is the photo's album id and data id joined by a single hyphen. A {@code null} album
 * id is rendered as the text {@code "null"}.
 *
 * @param photo the photo to build a key for
 * @return {@code <albumId>-<dataId>}
 */
String getIdempotentId(PhotoModel photo) {
  return String.format("%s-%s", photo.getAlbumId(), photo.getDataId());
}

private String getPhotoDescription(PhotoModel inputPhoto) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ String uploadPhotoContent(InputStream inputStream)
BASE_URL + "uploads/", Optional.of(PHOTO_UPLOAD_PARAMS), httpContent, String.class);
}

BatchMediaItemResponse createPhoto(NewMediaItemUpload newMediaItemUpload)
BatchMediaItemResponse createPhotos(NewMediaItemUpload newMediaItemUpload)
throws IOException, InvalidTokenException, PermissionDeniedException {
HashMap<String, Object> map = createJsonMap(newMediaItemUpload);
HttpContent httpContent = new JsonHttpContent(new JacksonFactory(), map);
Expand Down
Loading

0 comments on commit 1f5577a

Please sign in to comment.