From dd21d8e3ade60225d74d2cf37ea49fe4027bf9ba Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 17 Oct 2023 17:24:50 +0800 Subject: [PATCH] cos/multipart: first round multipart uploading --- source/dnode/vnode/src/vnd/vnodeCos.c | 256 ++++++++++++++++++++++++-- 1 file changed, 244 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 2a36a898e0..b52d4809b2 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -72,10 +72,10 @@ static void s3PrintError(const char *func, S3Status status, char error_details[] } typedef struct { - uint64_t content_length; + char err_msg[128]; S3Status status; + uint64_t content_length; char *buf; - char err_msg[4096]; } TS3SizeCBD; static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) { @@ -201,14 +201,14 @@ static void growbuffer_destroy(growbuffer *gb) { } typedef struct put_object_callback_data { + char err_msg[128]; + S3Status status; // FILE *infile; TdFilePtr infileFD; growbuffer *gb; uint64_t contentLength, originalContentLength; uint64_t totalContentLength, totalOriginalContentLength; int noStatus; - S3Status status; - char err_msg[4096]; } put_object_callback_data; static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) { @@ -241,6 +241,8 @@ static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackDat #define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M typedef struct UploadManager { + char err_msg[128]; + S3Status status; // used for initial multipart char *upload_id; @@ -253,7 +255,26 @@ typedef struct UploadManager { int remaining; } UploadManager; +typedef struct list_parts_callback_data { + char err_msg[128]; + S3Status status; + int isTruncated; + char nextPartNumberMarker[24]; + char initiatorId[256]; + char initiatorDisplayName[256]; + char ownerId[256]; + char ownerDisplayName[256]; + char storageClass[256]; + int partsCount; + int handlePartsStart; + int allDetails; + int noPrint; + UploadManager *manager; +} list_parts_callback_data; + typedef struct MultipartPartData { + char err_msg[128]; + S3Status status; put_object_callback_data put_object_data; int seq; UploadManager *manager; @@ -288,16 +309,120 @@ static int multipartPutXmlCallback(int bufferSize, char *buffer, void *callbackD return ret; } +static S3Status listPartsCallback(int isTruncated, const char *nextPartNumberMarker, const char *initiatorId, + const char *initiatorDisplayName, const char *ownerId, const char *ownerDisplayName, + const char *storageClass, int partsCount, int handlePartsStart, + const S3ListPart *parts, void *callbackData) { + list_parts_callback_data *data = (list_parts_callback_data *)callbackData; + + data->isTruncated = isTruncated; + data->handlePartsStart = handlePartsStart; + UploadManager *manager = data->manager; + + if (nextPartNumberMarker) { + snprintf(data->nextPartNumberMarker, sizeof(data->nextPartNumberMarker), "%s", nextPartNumberMarker); + } else { + data->nextPartNumberMarker[0] = 0; + } + + if (initiatorId) { + snprintf(data->initiatorId, sizeof(data->initiatorId), "%s", initiatorId); + } else { + data->initiatorId[0] = 0; + } + + if (initiatorDisplayName) { + snprintf(data->initiatorDisplayName, sizeof(data->initiatorDisplayName), "%s", initiatorDisplayName); + } else { + data->initiatorDisplayName[0] = 0; + } + + if (ownerId) { + snprintf(data->ownerId, sizeof(data->ownerId), "%s", ownerId); + } else { + data->ownerId[0] = 0; + } + + if (ownerDisplayName) { + snprintf(data->ownerDisplayName, sizeof(data->ownerDisplayName), "%s", ownerDisplayName); + } else { + data->ownerDisplayName[0] = 0; + } + + if (storageClass) { + snprintf(data->storageClass, sizeof(data->storageClass), "%s", storageClass); + } else { + data->storageClass[0] = 0; + } + + if (partsCount && !data->partsCount && !data->noPrint) { + // printListPartsHeader(); + } + + int i; + for (i = 0; i < partsCount; i++) { + const S3ListPart *part = &(parts[i]); + char timebuf[256]; + if (data->noPrint) { + manager->etags[handlePartsStart + i] = strdup(part->eTag); + manager->next_etags_pos++; + manager->remaining = manager->remaining - part->size; + } else { + time_t t = (time_t)part->lastModified; + strftime(timebuf, sizeof(timebuf), "%Y-%m-%dT%H:%M:%SZ", gmtime(&t)); + printf("%-30s", timebuf); + printf("%-15llu", (unsigned long long)part->partNumber); + printf("%-45s", part->eTag); + printf("%-15llu\n", (unsigned long long)part->size); + } + } + + data->partsCount += partsCount; + + return S3StatusOK; +} + static int try_get_parts_info(const char *bucketName, const char *key, UploadManager *manager) { - // + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + + S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, &listPartsCallback}; + + list_parts_callback_data data; + + memset(&data, 0, sizeof(list_parts_callback_data)); + + data.partsCount = 0; + data.allDetails = 0; + data.manager = manager; + data.noPrint = 1; + do { + data.isTruncated = 0; + do { + S3_list_parts(&bucketContext, key, data.nextPartNumberMarker, manager->upload_id, 0, 0, 0, timeoutMsG, + &listPartsHandler, &data); + } while (S3_status_is_retryable(data.status) && should_retry()); + if (data.status != S3StatusOK) { + break; + } + } while (data.isTruncated); + + if (data.status == S3StatusOK) { + if (!data.partsCount) { + // printListMultipartHeader(data.allDetails); + } + } else { + s3PrintError(__func__, data.status, data.err_msg); + return -1; + } return 0; } int32_t s3PutObjectFromFile2(const char *file, const char *object) { - int32_t code = 0; - const char *key = object; - const char *uploadId = 0; + int32_t code = 0; + const char *key = object; + // const char *uploadId = 0; const char *filename = 0; uint64_t contentLength = 0; const char *cacheControl = 0, *contentType = 0, *md5 = 0; @@ -360,20 +485,127 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { (unsigned long long)data.contentLength); } } else { - uint64_t totalContentLength = contentLength; - uint64_t todoContentLength = contentLength; + uint64_t totalContentLength = contentLength; + uint64_t todoContentLength = contentLength; + UploadManager manager; + manager.upload_id = 0; + manager.gb = 0; + + // div round up + int seq; + int totalSeq = ((contentLength + MULTIPART_CHUNK_SIZE - 1) / MULTIPART_CHUNK_SIZE); + + MultipartPartData partData; + memset(&partData, 0, sizeof(MultipartPartData)); + int partContentLength = 0; + + S3MultipartInitialHandler handler = {{&responsePropertiesCallback, &responseCompleteCallback}, + &initial_multipart_callback}; + + S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback}, + &putObjectDataCallback}; + + S3MultipartCommitHandler commit_handler = { + {&responsePropertiesCallback, &responseCompleteCallback}, &multipartPutXmlCallback, 0}; + + manager.etags = (char **)taosMemoryMalloc(sizeof(char *) * totalSeq); + manager.next_etags_pos = 0; + /* + if (uploadId) { + manager.upload_id = strdup(uploadId); + manager.remaining = contentLength; + if (!try_get_parts_info(tsS3BucketName, key, &manager)) { + fseek(data.infile, -(manager.remaining), 2); + taosLSeekFile(data.infileFD, -(manager.remaining), SEEK_END); + contentLength = manager.remaining; + goto upload; + } else { + goto clean; + } + } + */ + do { + S3_initiate_multipart(&bucketContext, key, 0, &handler, 0, timeoutMsG, &manager); + } while (S3_status_is_retryable(manager.status) && should_retry()); + + if (manager.upload_id == 0 || manager.status != S3StatusOK) { + s3PrintError(__func__, manager.status, manager.err_msg); + goto clean; + } + + upload: + todoContentLength -= MULTIPART_CHUNK_SIZE * manager.next_etags_pos; + for (seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) { + partData.manager = &manager; + partData.seq = seq; + if (partData.put_object_data.gb == NULL) { + partData.put_object_data = data; + } + partContentLength = ((contentLength > MULTIPART_CHUNK_SIZE) ? MULTIPART_CHUNK_SIZE : contentLength); + // printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength); + partData.put_object_data.contentLength = partContentLength; + partData.put_object_data.originalContentLength = partContentLength; + partData.put_object_data.totalContentLength = todoContentLength; + partData.put_object_data.totalOriginalContentLength = totalContentLength; + putProperties.md5 = 0; + do { + S3_upload_part(&bucketContext, key, &putProperties, &putObjectHandler, seq, manager.upload_id, + partContentLength, 0, timeoutMsG, &partData); + } while (S3_status_is_retryable(partData.status) && should_retry()); + if (partData.status != S3StatusOK) { + s3PrintError(__func__, partData.status, partData.err_msg); + goto clean; + } + contentLength -= MULTIPART_CHUNK_SIZE; + todoContentLength -= MULTIPART_CHUNK_SIZE; + } + + int i; + int size = 0; + size += growbuffer_append(&(manager.gb), "", strlen("")); + char buf[256]; + int n; + for (i = 0; i < totalSeq; i++) { + n = snprintf(buf, sizeof(buf), + "%d" + "%s", + i + 1, manager.etags[i]); + size += growbuffer_append(&(manager.gb), buf, n); + } + size += growbuffer_append(&(manager.gb), "", strlen("")); + manager.remaining = size; + + do { + S3_complete_multipart_upload(&bucketContext, key, &commit_handler, manager.upload_id, manager.remaining, 0, + timeoutMsG, &manager); + } while (S3_status_is_retryable(manager.status) && should_retry()); + if (manager.status != S3StatusOK) { + s3PrintError(__func__, manager.status, manager.err_msg); + goto clean; + } + + clean: + if (manager.upload_id) { + taosMemoryFree(manager.upload_id); + } + for (i = 0; i < manager.next_etags_pos; i++) { + taosMemoryFree(manager.etags[i]); + } + growbuffer_destroy(manager.gb); + taosMemoryFree(manager.etags); } + return 0; } typedef struct list_bucket_callback_data { + char err_msg[128]; + S3Status status; int isTruncated; char nextMarker[1024]; int keyCount; int allDetails; SArray *objectArray; - S3Status status; - char err_msg[4096]; } list_bucket_callback_data; static S3Status listBucketCallback(int isTruncated, const char *nextMarker, int contentsCount,