cos/multipart: first round of multipart uploading
This commit is contained in:
parent ddfdbb3207
commit dd21d8e3ad
@@ -72,10 +72,10 @@ static void s3PrintError(const char *func, S3Status status, char error_details[]
}

typedef struct {
  uint64_t content_length;
  char err_msg[128];
  S3Status status;
  uint64_t content_length;
  char *buf;
  char err_msg[4096];
} TS3SizeCBD;

static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) {

@@ -201,14 +201,14 @@ static void growbuffer_destroy(growbuffer *gb) {
}

typedef struct put_object_callback_data {
  char err_msg[128];
  S3Status status;
  // FILE *infile;
  TdFilePtr infileFD;
  growbuffer *gb;
  uint64_t contentLength, originalContentLength;
  uint64_t totalContentLength, totalOriginalContentLength;
  int noStatus;
  S3Status status;
  char err_msg[4096];
} put_object_callback_data;

static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) {

@@ -241,6 +241,8 @@ static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackDat
#define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M

typedef struct UploadManager {
  char err_msg[128];
  S3Status status;
  // used for initial multipart
  char *upload_id;

@@ -253,7 +255,26 @@ typedef struct UploadManager {
  int remaining;
} UploadManager;

typedef struct list_parts_callback_data {
  char err_msg[128];
  S3Status status;
  int isTruncated;
  char nextPartNumberMarker[24];
  char initiatorId[256];
  char initiatorDisplayName[256];
  char ownerId[256];
  char ownerDisplayName[256];
  char storageClass[256];
  int partsCount;
  int handlePartsStart;
  int allDetails;
  int noPrint;
  UploadManager *manager;
} list_parts_callback_data;

typedef struct MultipartPartData {
  char err_msg[128];
  S3Status status;
  put_object_callback_data put_object_data;
  int seq;
  UploadManager *manager;

@@ -288,8 +309,112 @@ static int multipartPutXmlCallback(int bufferSize, char *buffer, void *callbackD
  return ret;
}

static S3Status listPartsCallback(int isTruncated, const char *nextPartNumberMarker, const char *initiatorId,
                                  const char *initiatorDisplayName, const char *ownerId, const char *ownerDisplayName,
                                  const char *storageClass, int partsCount, int handlePartsStart,
                                  const S3ListPart *parts, void *callbackData) {
  list_parts_callback_data *data = (list_parts_callback_data *)callbackData;

  data->isTruncated = isTruncated;
  data->handlePartsStart = handlePartsStart;
  UploadManager *manager = data->manager;

  if (nextPartNumberMarker) {
    snprintf(data->nextPartNumberMarker, sizeof(data->nextPartNumberMarker), "%s", nextPartNumberMarker);
  } else {
    data->nextPartNumberMarker[0] = 0;
  }

  if (initiatorId) {
    snprintf(data->initiatorId, sizeof(data->initiatorId), "%s", initiatorId);
  } else {
    data->initiatorId[0] = 0;
  }

  if (initiatorDisplayName) {
    snprintf(data->initiatorDisplayName, sizeof(data->initiatorDisplayName), "%s", initiatorDisplayName);
  } else {
    data->initiatorDisplayName[0] = 0;
  }

  if (ownerId) {
    snprintf(data->ownerId, sizeof(data->ownerId), "%s", ownerId);
  } else {
    data->ownerId[0] = 0;
  }

  if (ownerDisplayName) {
    snprintf(data->ownerDisplayName, sizeof(data->ownerDisplayName), "%s", ownerDisplayName);
  } else {
    data->ownerDisplayName[0] = 0;
  }

  if (storageClass) {
    snprintf(data->storageClass, sizeof(data->storageClass), "%s", storageClass);
  } else {
    data->storageClass[0] = 0;
  }

  if (partsCount && !data->partsCount && !data->noPrint) {
    // printListPartsHeader();
  }

  int i;
  for (i = 0; i < partsCount; i++) {
    const S3ListPart *part = &(parts[i]);
    char timebuf[256];
    if (data->noPrint) {
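      // noPrint mode (used when resuming an upload): record each existing part's ETag for the
      // final complete request and subtract its size from the bytes still left to upload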
      manager->etags[handlePartsStart + i] = strdup(part->eTag);
      manager->next_etags_pos++;
      manager->remaining = manager->remaining - part->size;
    } else {
      time_t t = (time_t)part->lastModified;
      strftime(timebuf, sizeof(timebuf), "%Y-%m-%dT%H:%M:%SZ", gmtime(&t));
      printf("%-30s", timebuf);
      printf("%-15llu", (unsigned long long)part->partNumber);
      printf("%-45s", part->eTag);
      printf("%-15llu\n", (unsigned long long)part->size);
    }
  }

  data->partsCount += partsCount;

  return S3StatusOK;
}

static int try_get_parts_info(const char *bucketName, const char *key, UploadManager *manager) {
  //
  S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
                                   0, awsRegionG};

  S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, &listPartsCallback};

  list_parts_callback_data data;

  memset(&data, 0, sizeof(list_parts_callback_data));

  data.partsCount = 0;
  data.allDetails = 0;
  data.manager = manager;
  data.noPrint = 1;
  do {
    data.isTruncated = 0;
    do {
      S3_list_parts(&bucketContext, key, data.nextPartNumberMarker, manager->upload_id, 0, 0, 0, timeoutMsG,
                    &listPartsHandler, &data);
    } while (S3_status_is_retryable(data.status) && should_retry());
    if (data.status != S3StatusOK) {
      break;
    }
  } while (data.isTruncated);

  if (data.status == S3StatusOK) {
    if (!data.partsCount) {
      // printListMultipartHeader(data.allDetails);
    }
  } else {
    s3PrintError(__func__, data.status, data.err_msg);
    return -1;
  }

  return 0;
}

@@ -297,7 +422,7 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan
int32_t s3PutObjectFromFile2(const char *file, const char *object) {
  int32_t code = 0;
  const char *key = object;
  const char *uploadId = 0;
  // const char *uploadId = 0;
  const char *filename = 0;
  uint64_t contentLength = 0;
  const char *cacheControl = 0, *contentType = 0, *md5 = 0;

@@ -362,18 +487,125 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
  } else {
    uint64_t totalContentLength = contentLength;
    uint64_t todoContentLength = contentLength;
    UploadManager manager;
    manager.upload_id = 0;
    manager.gb = 0;

    // div round up
    int seq;
    int totalSeq = ((contentLength + MULTIPART_CHUNK_SIZE - 1) / MULTIPART_CHUNK_SIZE);
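    // e.g. a 2000 MiB file with 768 MiB chunks gives totalSeq = (2000 + 767) / 768 = 3 parts, the last one short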

    MultipartPartData partData;
    memset(&partData, 0, sizeof(MultipartPartData));
    int partContentLength = 0;

    S3MultipartInitialHandler handler = {{&responsePropertiesCallback, &responseCompleteCallback},
                                         &initial_multipart_callback};

    S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback},
                                           &putObjectDataCallback};

    S3MultipartCommitHandler commit_handler = {
        {&responsePropertiesCallback, &responseCompleteCallback}, &multipartPutXmlCallback, 0};

    manager.etags = (char **)taosMemoryMalloc(sizeof(char *) * totalSeq);
    manager.next_etags_pos = 0;
    /*
    if (uploadId) {
      manager.upload_id = strdup(uploadId);
      manager.remaining = contentLength;
      if (!try_get_parts_info(tsS3BucketName, key, &manager)) {
        fseek(data.infile, -(manager.remaining), 2);
        taosLSeekFile(data.infileFD, -(manager.remaining), SEEK_END);
        contentLength = manager.remaining;
        goto upload;
      } else {
        goto clean;
      }
    }
    */
    do {
      S3_initiate_multipart(&bucketContext, key, 0, &handler, 0, timeoutMsG, &manager);
    } while (S3_status_is_retryable(manager.status) && should_retry());

    if (manager.upload_id == 0 || manager.status != S3StatusOK) {
      s3PrintError(__func__, manager.status, manager.err_msg);
      goto clean;
    }

  upload:
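    // next_etags_pos parts already exist when resuming (0 for a fresh upload); skip the bytes they cover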
    todoContentLength -= MULTIPART_CHUNK_SIZE * manager.next_etags_pos;
    for (seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) {
      partData.manager = &manager;
      partData.seq = seq;
      if (partData.put_object_data.gb == NULL) {
        partData.put_object_data = data;
      }
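      // the final part may be shorter than MULTIPART_CHUNK_SIZE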
      partContentLength = ((contentLength > MULTIPART_CHUNK_SIZE) ? MULTIPART_CHUNK_SIZE : contentLength);
      // printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength);
      partData.put_object_data.contentLength = partContentLength;
      partData.put_object_data.originalContentLength = partContentLength;
      partData.put_object_data.totalContentLength = todoContentLength;
      partData.put_object_data.totalOriginalContentLength = totalContentLength;
      putProperties.md5 = 0;
      do {
        S3_upload_part(&bucketContext, key, &putProperties, &putObjectHandler, seq, manager.upload_id,
                       partContentLength, 0, timeoutMsG, &partData);
      } while (S3_status_is_retryable(partData.status) && should_retry());
      if (partData.status != S3StatusOK) {
        s3PrintError(__func__, partData.status, partData.err_msg);
        goto clean;
      }
      contentLength -= MULTIPART_CHUNK_SIZE;
      todoContentLength -= MULTIPART_CHUNK_SIZE;
    }

    int i;
    int size = 0;
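    // assemble the CompleteMultipartUpload request body:
    // <CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>...</ETag></Part>...</CompleteMultipartUpload>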
    size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>", strlen("<CompleteMultipartUpload>"));
    char buf[256];
    int n;
    for (i = 0; i < totalSeq; i++) {
      n = snprintf(buf, sizeof(buf),
                   "<Part><PartNumber>%d</PartNumber>"
                   "<ETag>%s</ETag></Part>",
                   i + 1, manager.etags[i]);
      size += growbuffer_append(&(manager.gb), buf, n);
    }
    size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>", strlen("</CompleteMultipartUpload>"));
    manager.remaining = size;
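    // 'remaining' now holds the byte length of the XML body, passed as the content length of the complete request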

    do {
      S3_complete_multipart_upload(&bucketContext, key, &commit_handler, manager.upload_id, manager.remaining, 0,
                                   timeoutMsG, &manager);
    } while (S3_status_is_retryable(manager.status) && should_retry());
    if (manager.status != S3StatusOK) {
      s3PrintError(__func__, manager.status, manager.err_msg);
      goto clean;
    }

  clean:
    if (manager.upload_id) {
      taosMemoryFree(manager.upload_id);
    }
    for (i = 0; i < manager.next_etags_pos; i++) {
      taosMemoryFree(manager.etags[i]);
    }
    growbuffer_destroy(manager.gb);
    taosMemoryFree(manager.etags);
  }

  return 0;
}

typedef struct list_bucket_callback_data {
  char err_msg[128];
  S3Status status;
  int isTruncated;
  char nextMarker[1024];
  int keyCount;
  int allDetails;
  SArray *objectArray;
  S3Status status;
  char err_msg[4096];
} list_bucket_callback_data;

static S3Status listBucketCallback(int isTruncated, const char *nextMarker, int contentsCount,