vnode/cos: fix multipart uploading

This commit is contained in:
Minglei Jin 2023-10-20 13:47:25 +08:00
parent 52c8bc91f9
commit 74d05c2771
1 changed files with 41 additions and 30 deletions

View File

@ -211,33 +211,6 @@ typedef struct put_object_callback_data {
int noStatus; int noStatus;
} put_object_callback_data; } put_object_callback_data;
static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) {
put_object_callback_data *data = (put_object_callback_data *)callbackData;
int ret = 0;
if (data->contentLength) {
int toRead = ((data->contentLength > (unsigned)bufferSize) ? (unsigned)bufferSize : data->contentLength);
if (data->gb) {
growbuffer_read(&(data->gb), toRead, &ret, buffer);
} else if (data->infileFD) {
// ret = fread(buffer, 1, toRead, data->infile);
ret = taosReadFile(data->infileFD, buffer, toRead);
}
}
data->contentLength -= ret;
data->totalContentLength -= ret;
if (data->contentLength && !data->noStatus) {
vTrace("%llu bytes remaining ", (unsigned long long)data->totalContentLength);
vTrace("(%d%% complete) ...\n", (int)(((data->totalOriginalContentLength - data->totalContentLength) * 100) /
data->totalOriginalContentLength));
}
return ret;
}
#define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M #define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M
typedef struct UploadManager { typedef struct UploadManager {
@ -280,6 +253,37 @@ typedef struct MultipartPartData {
UploadManager *manager; UploadManager *manager;
} MultipartPartData; } MultipartPartData;
static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) {
put_object_callback_data *data = (put_object_callback_data *)callbackData;
if (data->infileFD == 0) {
MultipartPartData *mpd = (MultipartPartData *)callbackData;
data = &mpd->put_object_data;
}
int ret = 0;
if (data->contentLength) {
int toRead = ((data->contentLength > (unsigned)bufferSize) ? (unsigned)bufferSize : data->contentLength);
if (data->gb) {
growbuffer_read(&(data->gb), toRead, &ret, buffer);
} else if (data->infileFD) {
// ret = fread(buffer, 1, toRead, data->infile);
ret = taosReadFile(data->infileFD, buffer, toRead);
}
}
data->contentLength -= ret;
data->totalContentLength -= ret;
/* log too many open files
if (data->contentLength && !data->noStatus) {
vTrace("%llu bytes remaining ", (unsigned long long)data->totalContentLength);
vTrace("(%d%% complete) ...\n", (int)(((data->totalOriginalContentLength - data->totalContentLength) * 100) /
data->totalOriginalContentLength));
}
*/
return ret;
}
S3Status initial_multipart_callback(const char *upload_id, void *callbackData) { S3Status initial_multipart_callback(const char *upload_id, void *callbackData) {
UploadManager *manager = (UploadManager *)callbackData; UploadManager *manager = (UploadManager *)callbackData;
manager->upload_id = strdup(upload_id); manager->upload_id = strdup(upload_id);
@ -308,7 +312,7 @@ static int multipartPutXmlCallback(int bufferSize, char *buffer, void *callbackD
manager->remaining -= ret; manager->remaining -= ret;
return ret; return ret;
} }
/*
static S3Status listPartsCallback(int isTruncated, const char *nextPartNumberMarker, const char *initiatorId, static S3Status listPartsCallback(int isTruncated, const char *nextPartNumberMarker, const char *initiatorId,
const char *initiatorDisplayName, const char *ownerId, const char *ownerDisplayName, const char *initiatorDisplayName, const char *ownerId, const char *ownerDisplayName,
const char *storageClass, int partsCount, int handlePartsStart, const char *storageClass, int partsCount, int handlePartsStart,
@ -418,7 +422,7 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan
return 0; return 0;
} }
*/
int32_t s3PutObjectFromFile2(const char *file, const char *object) { int32_t s3PutObjectFromFile2(const char *file, const char *object) {
int32_t code = 0; int32_t code = 0;
const char *key = object; const char *key = object;
@ -480,9 +484,11 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
if (data.status != S3StatusOK) { if (data.status != S3StatusOK) {
s3PrintError(__func__, data.status, data.err_msg); s3PrintError(__func__, data.status, data.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
} else if (data.contentLength) { } else if (data.contentLength) {
vError("ERROR: %s Failed to read remaining %llu bytes from input", __func__, vError("ERROR: %s Failed to read remaining %llu bytes from input", __func__,
(unsigned long long)data.contentLength); (unsigned long long)data.contentLength);
code = TAOS_SYSTEM_ERROR(EIO);
} }
} else { } else {
uint64_t totalContentLength = contentLength; uint64_t totalContentLength = contentLength;
@ -493,7 +499,9 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
// div round up // div round up
int seq; int seq;
// uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 8;
int totalSeq = ((contentLength + MULTIPART_CHUNK_SIZE - 1) / MULTIPART_CHUNK_SIZE); int totalSeq = ((contentLength + MULTIPART_CHUNK_SIZE - 1) / MULTIPART_CHUNK_SIZE);
// int totalSeq = ((contentLength + chunk_size - 1) / chunk_size);
MultipartPartData partData; MultipartPartData partData;
memset(&partData, 0, sizeof(MultipartPartData)); memset(&partData, 0, sizeof(MultipartPartData));
@ -530,6 +538,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
if (manager.upload_id == 0 || manager.status != S3StatusOK) { if (manager.upload_id == 0 || manager.status != S3StatusOK) {
s3PrintError(__func__, manager.status, manager.err_msg); s3PrintError(__func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean; goto clean;
} }
@ -554,6 +563,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
} while (S3_status_is_retryable(partData.status) && should_retry()); } while (S3_status_is_retryable(partData.status) && should_retry());
if (partData.status != S3StatusOK) { if (partData.status != S3StatusOK) {
s3PrintError(__func__, partData.status, partData.err_msg); s3PrintError(__func__, partData.status, partData.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean; goto clean;
} }
contentLength -= MULTIPART_CHUNK_SIZE; contentLength -= MULTIPART_CHUNK_SIZE;
@ -581,6 +591,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
} while (S3_status_is_retryable(manager.status) && should_retry()); } while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.status != S3StatusOK) { if (manager.status != S3StatusOK) {
s3PrintError(__func__, manager.status, manager.err_msg); s3PrintError(__func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean; goto clean;
} }
@ -595,7 +606,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
taosMemoryFree(manager.etags); taosMemoryFree(manager.etags);
} }
return 0; return code;
} }
typedef struct list_bucket_callback_data { typedef struct list_bucket_callback_data {