From 74d05c2771c4c04b59cb3bf927b2eb6811838e92 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 20 Oct 2023 13:47:25 +0800 Subject: [PATCH] vnode/cos: fix multipart uploading --- source/dnode/vnode/src/vnd/vnodeCos.c | 71 ++++++++++++++++----------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index ccda20e049..9c69f2452c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -211,33 +211,6 @@ typedef struct put_object_callback_data { int noStatus; } put_object_callback_data; -static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) { - put_object_callback_data *data = (put_object_callback_data *)callbackData; - - int ret = 0; - - if (data->contentLength) { - int toRead = ((data->contentLength > (unsigned)bufferSize) ? (unsigned)bufferSize : data->contentLength); - if (data->gb) { - growbuffer_read(&(data->gb), toRead, &ret, buffer); - } else if (data->infileFD) { - // ret = fread(buffer, 1, toRead, data->infile); - ret = taosReadFile(data->infileFD, buffer, toRead); - } - } - - data->contentLength -= ret; - data->totalContentLength -= ret; - - if (data->contentLength && !data->noStatus) { - vTrace("%llu bytes remaining ", (unsigned long long)data->totalContentLength); - vTrace("(%d%% complete) ...\n", (int)(((data->totalOriginalContentLength - data->totalContentLength) * 100) / - data->totalOriginalContentLength)); - } - - return ret; -} - #define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M typedef struct UploadManager { @@ -280,6 +253,37 @@ typedef struct MultipartPartData { UploadManager *manager; } MultipartPartData; +static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) { + put_object_callback_data *data = (put_object_callback_data *)callbackData; + if (data->infileFD == 0) { + MultipartPartData *mpd = (MultipartPartData *)callbackData; + data = &mpd->put_object_data; + } + + int ret = 0; + + if (data->contentLength) { + int toRead = ((data->contentLength > (unsigned)bufferSize) ? (unsigned)bufferSize : data->contentLength); + if (data->gb) { + growbuffer_read(&(data->gb), toRead, &ret, buffer); + } else if (data->infileFD) { + // ret = fread(buffer, 1, toRead, data->infile); + ret = taosReadFile(data->infileFD, buffer, toRead); + } + } + + data->contentLength -= ret; + data->totalContentLength -= ret; + /* log too many open files + if (data->contentLength && !data->noStatus) { + vTrace("%llu bytes remaining ", (unsigned long long)data->totalContentLength); + vTrace("(%d%% complete) ...\n", (int)(((data->totalOriginalContentLength - data->totalContentLength) * 100) / + data->totalOriginalContentLength)); + } + */ + return ret; +} + S3Status initial_multipart_callback(const char *upload_id, void *callbackData) { UploadManager *manager = (UploadManager *)callbackData; manager->upload_id = strdup(upload_id); @@ -308,7 +312,7 @@ static int multipartPutXmlCallback(int bufferSize, char *buffer, void *callbackD manager->remaining -= ret; return ret; } - +/* static S3Status listPartsCallback(int isTruncated, const char *nextPartNumberMarker, const char *initiatorId, const char *initiatorDisplayName, const char *ownerId, const char *ownerDisplayName, const char *storageClass, int partsCount, int handlePartsStart, @@ -418,7 +422,7 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan return 0; } - +*/ int32_t s3PutObjectFromFile2(const char *file, const char *object) { int32_t code = 0; const char *key = object; @@ -480,9 +484,11 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { if (data.status != S3StatusOK) { s3PrintError(__func__, data.status, data.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); } else if (data.contentLength) { vError("ERROR: %s Failed to read remaining %llu bytes from input", __func__, (unsigned long long)data.contentLength); + code = TAOS_SYSTEM_ERROR(EIO); } } else { uint64_t totalContentLength = contentLength; @@ -493,7 +499,9 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { // div round up int seq; + // uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 8; int totalSeq = ((contentLength + MULTIPART_CHUNK_SIZE - 1) / MULTIPART_CHUNK_SIZE); + // int totalSeq = ((contentLength + chunk_size - 1) / chunk_size); MultipartPartData partData; memset(&partData, 0, sizeof(MultipartPartData)); @@ -530,6 +538,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { if (manager.upload_id == 0 || manager.status != S3StatusOK) { s3PrintError(__func__, manager.status, manager.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); goto clean; } @@ -554,6 +563,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { } while (S3_status_is_retryable(partData.status) && should_retry()); if (partData.status != S3StatusOK) { s3PrintError(__func__, partData.status, partData.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); goto clean; } contentLength -= MULTIPART_CHUNK_SIZE; @@ -581,6 +591,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { } while (S3_status_is_retryable(manager.status) && should_retry()); if (manager.status != S3StatusOK) { s3PrintError(__func__, manager.status, manager.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); goto clean; } @@ -595,7 +606,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { taosMemoryFree(manager.etags); } - return 0; + return code; } typedef struct list_bucket_callback_data {