EVOLUTION-MANAGER
Edit File: ParallelTransfer.php
<?php /** * Copyright 2010-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ namespace DuplicatorPro\Aws\S3\Model\MultipartUpload; defined("ABSPATH") or die(""); use DuplicatorPro\Aws\Common\Exception\RuntimeException; use DuplicatorPro\Aws\Common\Enum\DateFormat; use DuplicatorPro\Aws\Common\Enum\UaString as Ua; use DuplicatorPro\Guzzle\Http\EntityBody; use DuplicatorPro\Guzzle\Http\ReadLimitEntityBody; /** * Transfers multipart upload parts in parallel */ class ParallelTransfer extends AbstractTransfer { /** * {@inheritdoc} */ protected function init() { parent::init(); if (!$this->source->isLocal() || $this->source->getWrapper() != 'plainfile') { throw new RuntimeException('The source data must be a local file stream when uploading in parallel.'); } if (empty($this->options['concurrency'])) { throw new RuntimeException('The `concurrency` option must be specified when instantiating.'); } } /** * {@inheritdoc} */ protected function transfer() { $totalParts = (int) ceil($this->source->getContentLength() / $this->partSize); $concurrency = min($totalParts, $this->options['concurrency']); $partsToSend = $this->prepareParts($concurrency); $eventData = $this->getEventData(); while (!$this->stopped && count($this->state) < $totalParts) { $currentTotal = count($this->state); $commands = array(); for ($i = 0; $i < $concurrency && $i + $currentTotal < $totalParts; $i++) { // Move the offset to the correct position $partsToSend[$i]->setOffset(($currentTotal + $i) * $this->partSize); // @codeCoverageIgnoreStart if ($partsToSend[$i]->getContentLength() == 0) { break; } // @codeCoverageIgnoreEnd $params = $this->state->getUploadId()->toParams(); $eventData['command'] = $this->client->getCommand('UploadPart', array_replace($params, array( 'PartNumber' => count($this->state) + 1 + $i, 'Body' => $partsToSend[$i], 'ContentMD5' => (bool) $this->options['part_md5'], Ua::OPTION => Ua::MULTIPART_UPLOAD ))); $commands[] = $eventData['command']; // Notify any listeners of the part upload $this->dispatch(self::BEFORE_PART_UPLOAD, $eventData); } // Allow listeners to stop the transfer if needed if ($this->stopped) { break; } // Execute each command, iterate over the results, and add to the transfer state /** @var \Guzzle\Service\Command\OperationCommand $command */ foreach ($this->client->execute($commands) as $command) { $this->state->addPart(UploadPart::fromArray(array( 'PartNumber' => $command['PartNumber'], 'ETag' => $command->getResponse()->getEtag(), 'Size' => (int) $command->getRequest()->getBody()->getContentLength(), 'LastModified' => gmdate(DateFormat::RFC2822) ))); $eventData['command'] = $command; // Notify any listeners the the part was uploaded $this->dispatch(self::AFTER_PART_UPLOAD, $eventData); } } } /** * Prepare the entity body handles to use while transferring * * @param int $concurrency Number of parts to prepare * * @return array Parts to send */ protected function prepareParts($concurrency) { $url = $this->source->getUri(); // Use the source EntityBody as the first part $parts = array(new ReadLimitEntityBody($this->source, $this->partSize)); // Open EntityBody handles for each part to upload in parallel for ($i = 1; $i < $concurrency; $i++) { $parts[] = new ReadLimitEntityBody(new EntityBody(fopen($url, 'r')), $this->partSize); } return $parts; } }