1. Goal

In the previous article on S3 uploading, we looked at how to use the generic Blob APIs from jclouds to upload content to S3. In this article, we'll use the S3-specific asynchronous API from jclouds to upload content and take advantage of the multipart upload functionality that S3 provides.

2. Preparation

2.1. Set up the Custom API

The first part of the upload process is creating the jclouds API – this is a custom API for Amazon S3:

public AWSS3AsyncClient s3AsyncClient() {
   String identity = ...
   String credentials = ...

   BlobStoreContext context = ContextBuilder.newBuilder("aws-s3").
      credentials(identity, credentials).buildView(BlobStoreContext.class);

   RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
   return providerContext.getAsyncApi();
}

2.2. Determining the Number of Parts for the Content

Amazon S3 requires every part of a multipart upload, except the last one, to be at least 5 MB. As such, the first thing we need to do is determine how many parts we can split our content into without any part falling below this 5 MB minimum:

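private static final int FIVE_MB = 5 * 1024 * 1024;

public static int getMaximumNumberOfParts(byte[] byteArray) {
   // Integer division rounds down, so an even split keeps every part at or above 5 MB
   int numberOfParts = byteArray.length / FIVE_MB;
   if (numberOfParts == 0) {
      // Content smaller than 5 MB is uploaded as a single part
      return 1;
   }
   return numberOfParts;
}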
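
As a rough, JDK-only sketch of the same calculation, the class below works on a content length instead of a byte array and also caps the result at 10,000, which is the maximum number of parts S3 accepts for one multipart upload. The class and method names are hypothetical and not part of jclouds or of the article's S3Util:

```java
final class PartCountSketch {
    // S3 limits assumed here: every part except the last must be at least 5 MB,
    // and one multipart upload may contain at most 10,000 parts.
    static final long MIN_PART_SIZE = 5L * 1024 * 1024;
    static final int MAX_PARTS = 10_000;

    /** Largest part count for which an even split keeps every part at or above 5 MB. */
    static int maximumNumberOfParts(long contentLength) {
        if (contentLength < 0) {
            throw new IllegalArgumentException("contentLength must not be negative");
        }
        // Integer division rounds down; content below 5 MB still needs one part.
        long parts = Math.max(1, contentLength / MIN_PART_SIZE);
        return (int) Math.min(parts, MAX_PARTS);
    }
}
```

For 16 MB of content, `PartCountSketch.maximumNumberOfParts(16L * 1024 * 1024)` gives 3, since 16 MB contains three full 5 MB blocks.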

2.3. Breaking the Content into Parts

We're going to break the byte array into a set number of parts:

public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {
   List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
   int fullSize = byteArray.length;
   long dimensionOfPart = fullSize / maxNumberOfParts;
   for (int i = 0; i < maxNumberOfParts; i++) {
      int previousSplitPoint = (int) (dimensionOfPart * i);
      int splitPoint = (int) (dimensionOfPart * (i + 1));
      if (i == (maxNumberOfParts - 1)) {
         splitPoint = fullSize;
      }
      byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
      parts.add(partBytes);
   }

   return parts;
}

Let's test the logic of breaking the byte array into parts: we'll generate some bytes, split the byte array, put it back together using Guava, and verify that we get back the original:

@Test
public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {
   byte[] byteArray = randomByteData(16);

   int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
   List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);

   assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length, 
      equalTo(byteArray.length));
   byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
   assertThat(byteArray, equalTo(unmultiplexed));
}

To generate the data, we simply use the support from Random:

byte[] randomByteData(int mb) {
   byte[] randomBytes = new byte[mb * 1024 * 1024];
   new Random().nextBytes(randomBytes);
   return randomBytes;
}
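
To make the split sizes concrete: 16 MB is 16,777,216 bytes, so splitting into three parts gives 5,592,405 bytes for each of the first two parts, and the last part absorbs the one leftover byte (5,592,406). The following is a minimal JDK-only sketch of the same split-and-rejoin round trip; `SplitSketch` and its methods are illustrative names, with `join` standing in for Guava's `Bytes.concat`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SplitSketch {
    /** Splits data into consecutive slices; the last slice absorbs the remainder. */
    static List<byte[]> split(byte[] data, int parts) {
        if (parts < 1) {
            throw new IllegalArgumentException("parts must be at least 1");
        }
        List<byte[]> result = new ArrayList<>(parts);
        int partSize = data.length / parts;
        for (int i = 0; i < parts; i++) {
            int from = partSize * i;
            int to = (i == parts - 1) ? data.length : partSize * (i + 1);
            result.add(Arrays.copyOfRange(data, from, to));
        }
        return result;
    }

    /** Concatenates the slices back into one array. */
    static byte[] join(List<byte[]> slices) {
        int total = 0;
        for (byte[] slice : slices) {
            total += slice.length;
        }
        byte[] joined = new byte[total];
        int offset = 0;
        for (byte[] slice : slices) {
            System.arraycopy(slice, 0, joined, offset, slice.length);
            offset += slice.length;
        }
        return joined;
    }
}
```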

2.4. Creating the Payloads

Now that we have determined the correct number of parts for our content and we managed to break the content into parts, we need to generate the Payload objects for the jclouds API:

public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
   List<Payload> payloads = Lists.newArrayList();
   for (byte[] filePart : fileParts) {
      byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
      Payload partPayload = Payloads.newByteArrayPayload(filePart);
      partPayload.getContentMetadata().setContentLength((long) filePart.length);
      partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
      payloads.add(partPayload);
   }
   return payloads;
}
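
For readers who want to see what the MD5 metadata amounts to on the wire, here is a small JDK-only sketch. It assumes the standard Content-MD5 convention, where the header value is the Base64 encoding of the raw 16-byte digest rather than its hex string. `ContentMd5Sketch` is a hypothetical helper; in the code above, the raw bytes are passed to `setContentMD5` and jclouds takes care of the header:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class ContentMd5Sketch {
    /** Raw 16-byte MD5 digest of one part. */
    static byte[] md5(byte[] part) {
        try {
            return MessageDigest.getInstance("MD5").digest(part);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5.
            throw new IllegalStateException(e);
        }
    }

    /** Content-MD5 header value: Base64 of the raw digest, not of the hex string. */
    static String contentMd5Header(byte[] part) {
        return Base64.getEncoder().encodeToString(md5(part));
    }
}
```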

3. Upload

The upload process is a flexible multi-step process – this means:

  • the upload can be started before having all the data – data can be uploaded as it’s coming in
  • data is uploaded in chunks – if one of these operations fails, it can simply be retried
  • chunks can be uploaded in parallel – this can greatly increase the upload speed, especially in the case of large files

3.1. Initiating the Upload Operation

The first step in the upload operation is to initiate the process. This request to S3 must contain the standard HTTP headers – the Content-MD5 header in particular needs to be computed. We're going to use the Guava hash function support here:

byte[] md5Bytes = Hashing.md5().hashBytes(byteArray).asBytes();

This is the MD5 hash of the entire byte array, not of the individual parts.

To initiate the upload, and for all further interactions with S3, we’re going to use the AWSS3AsyncClient – the asynchronous API we created earlier:

ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();

The key is the handle assigned to the object – this needs to be a unique identifier specified by the client.

Also notice that, even though we’re using the async version of the API, we’re blocking for the result of this operation – this is because we need the result of the initiation before we can move forward.

The result of the operation is an upload id returned by S3 – this will identify the upload throughout its lifecycle and will be present in all subsequent upload operations.

3.2. Uploading the Parts

The next step is uploading the parts. Our goal here is to send these requests in parallel, as the upload part operations represent the bulk of the upload process:

List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
   ListenableFuture<String> future = s3AsyncApi.uploadPart(
      container, key, partNumber + 1, uploadId, payloads.get(partNumber));
   ongoingOperations.add(future);
}

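Each part needs its own part number between 1 and 10,000. S3 does not require the numbers to be consecutive, although we number them consecutively here for simplicity, and the order in which the requests are sent is not relevant.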

After all of the upload part requests have been submitted, we need to wait for their responses so that we can collect the individual ETag value of each part:

Function<ListenableFuture<String>, String> getEtagFromOp = 
  new Function<ListenableFuture<String>, String>() {
   public String apply(ListenableFuture<String> ongoingOperation) {
      try {
         return ongoingOperation.get();
      } catch (InterruptedException | ExecutionException e) {
         throw new IllegalStateException(e);
      }
   }
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);

If, for whatever reason, one of the upload part operations fails, the operation can be retried until it succeeds. The logic above does not contain the retry mechanism, but building it in should be straightforward enough.
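
One possible shape for such a retry mechanism is sketched below using only the JDK. `RetrySketch` and `withRetries` are hypothetical names, not part of jclouds. The sketch also assumes that the retried operation re-submits the request on each attempt, because a future that has already failed will keep failing when we call `get()` on it again:

```java
import java.util.concurrent.Callable;

final class RetrySketch {
    /** Runs the operation up to maxAttempts times and returns the first successful result. */
    static <T> T withRetries(Callable<T> operation, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (InterruptedException e) {
                // Do not retry on interruption; restore the flag and stop.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (Exception e) {
                lastFailure = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", lastFailure);
    }
}
```

With the names used in this article, a call might look like `withRetries(() -> s3AsyncApi.uploadPart(container, key, partNumber, uploadId, payload).get(), 3)`. Since that call blocks until the part is uploaded, it would have to run on a worker thread per part to keep the uploads parallel. A production version would probably also wait between attempts.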

3.3. Completing the Upload Operation

The final step of the upload process is completing the multipart operation. The S3 API requires the responses from the previous parts upload as a Map, which we can now easily create from the list of ETags that we obtained above:

Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
   parts.put(i + 1, etagsOfParts.get(i));
}

And finally, send the complete request:

s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();

This will return the final ETag of the finished object and will complete the entire upload process.

4. Conclusion

In this article, we built a multipart-enabled, fully parallel upload operation to S3, using the custom S3 jclouds API. This operation is ready to be used as is, but it can be improved in a few ways.

First, retry logic should be added around the upload operations to better deal with failures.

Next, for really large files, even though the mechanism sends all upload part requests in parallel, a throttling mechanism should still limit the number of parallel requests being sent. This is both to avoid bandwidth becoming a bottleneck and to make sure Amazon itself doesn’t flag the upload process as exceeding an allowed limit of requests per second – the Guava RateLimiter can potentially be very well suited for this.
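
As one illustration of throttling, the sketch below uses a plain `java.util.concurrent.Semaphore` instead of Guava's RateLimiter to stay JDK-only. The two are not equivalent: a semaphore caps how many requests are in flight at once, whereas a RateLimiter caps how many are started per second. `ThrottleSketch` and `submitThrottled` are hypothetical names, and each task is assumed to be a blocking call that uploads one part:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

final class ThrottleSketch {
    /** Submits every task, but allows at most maxInFlight of them to run at the same time. */
    static <T> List<Future<T>> submitThrottled(
            ExecutorService executor, List<Callable<T>> tasks, int maxInFlight) {
        Semaphore permits = new Semaphore(maxInFlight);
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        for (Callable<T> task : tasks) {
            // Blocks the submitting thread while maxInFlight tasks are still running.
            permits.acquireUninterruptibly();
            try {
                futures.add(executor.submit(() -> {
                    try {
                        return task.call();
                    } finally {
                        permits.release(); // free the slot whether the task succeeded or failed
                    }
                }));
            } catch (RuntimeException e) {
                permits.release(); // the executor rejected the task, so it will never run
                throw e;
            }
        }
        return futures;
    }
}
```

The returned futures can then be collected in the same way as the ETag futures in section 3.2.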
