cos/put: separate put without cp into another func

This commit is contained in:
Minglei Jin 2023-11-30 17:11:24 +08:00
parent 249ffa6590
commit deec1c482d
1 changed files with 113 additions and 127 deletions

View File

@ -462,16 +462,120 @@ static int32_t s3PutObjectFromFileSimple(S3BucketContext *bucket_context, char c
s3PrintError(__FILE__, __LINE__, __func__, data->status, data->err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
} else if (data->contentLength) {
uError("ERROR: %s Failed to read remaining %llu bytes from input", __func__,
(unsigned long long)data->contentLength);
uError("%s Failed to read remaining %llu bytes from input", __func__, (unsigned long long)data->contentLength);
code = TAOS_SYSTEM_ERROR(EIO);
}
return code;
}
static int32_t s3PutObjectFromFileWithoutCp(const char *file, int64_t size, int32_t lmtime, const char *object) {
int32_t code = 0;
static int32_t s3PutObjectFromFileWithoutCp(S3BucketContext *bucket_context, char const *object_name,
int64_t contentLength, S3PutProperties *put_prop,
put_object_callback_data *data) {
int32_t code = 0;
uint64_t totalContentLength = contentLength;
uint64_t todoContentLength = contentLength;
UploadManager manager = {0};
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
MultipartPartData partData;
memset(&partData, 0, sizeof(MultipartPartData));
int partContentLength = 0;
S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&initial_multipart_callback};
S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback},
&putObjectDataCallback};
S3MultipartCommitHandler commit_handler = {
{&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *));
manager.next_etags_pos = 0;
do {
S3_initiate_multipart(bucket_context, object_name, 0, &handler, 0, timeoutMsG, &manager);
} while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.upload_id == 0 || manager.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
upload:
todoContentLength -= chunk_size * manager.next_etags_pos;
for (int seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) {
partData.manager = &manager;
partData.seq = seq;
if (partData.put_object_data.gb == NULL) {
partData.put_object_data = *data;
}
partContentLength = ((contentLength > chunk_size) ? chunk_size : contentLength);
// printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength);
partData.put_object_data.contentLength = partContentLength;
partData.put_object_data.originalContentLength = partContentLength;
partData.put_object_data.totalContentLength = todoContentLength;
partData.put_object_data.totalOriginalContentLength = totalContentLength;
put_prop->md5 = 0;
do {
S3_upload_part(bucket_context, object_name, put_prop, &putObjectHandler, seq, manager.upload_id,
partContentLength, 0, timeoutMsG, &partData);
} while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
if (partData.put_object_data.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
contentLength -= chunk_size;
todoContentLength -= chunk_size;
}
int i;
int size = 0;
size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>", strlen("<CompleteMultipartUpload>"));
char buf[256];
int n;
for (i = 0; i < totalSeq; i++) {
if (!manager.etags[i]) {
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
n = snprintf(buf, sizeof(buf),
"<Part><PartNumber>%d</PartNumber>"
"<ETag>%s</ETag></Part>",
i + 1, manager.etags[i]);
size += growbuffer_append(&(manager.gb), buf, n);
}
size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>", strlen("</CompleteMultipartUpload>"));
manager.remaining = size;
do {
S3_complete_multipart_upload(bucket_context, object_name, &commit_handler, manager.upload_id, manager.remaining, 0,
timeoutMsG, &manager);
} while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
clean:
if (manager.upload_id) {
taosMemoryFree(manager.upload_id);
}
for (i = 0; i < manager.next_etags_pos; i++) {
taosMemoryFree(manager.etags[i]);
}
growbuffer_destroy(manager.gb);
taosMemoryFree(manager.etags);
return code;
}
@ -482,11 +586,9 @@ static int32_t s3PutObjectFromFileWithCp(const char *file, int64_t size, int32_t
return code;
}
int32_t s3PutObjectFromFile2(const char *file, const char *object) {
int32_t code = 0;
int32_t lmtime = 0;
const char *key = object;
// const char *uploadId = 0;
int32_t s3PutObjectFromFile2(const char *file, const char *object_name) {
int32_t code = 0;
int32_t lmtime = 0;
const char *filename = 0;
uint64_t contentLength = 0;
const char *cacheControl = 0, *contentType = 0, *md5 = 0;
@ -523,125 +625,9 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
metaProperties, useServerSideEncryption};
if (contentLength <= MULTIPART_CHUNK_SIZE) {
code = s3PutObjectFromFileSimple(&bucketContext, key, contentLength, &putProperties, &data);
code = s3PutObjectFromFileSimple(&bucketContext, object_name, contentLength, &putProperties, &data);
} else {
uint64_t totalContentLength = contentLength;
uint64_t todoContentLength = contentLength;
UploadManager manager = {0};
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
MultipartPartData partData;
memset(&partData, 0, sizeof(MultipartPartData));
int partContentLength = 0;
S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&initial_multipart_callback};
S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback},
&putObjectDataCallback};
S3MultipartCommitHandler commit_handler = {
{&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *));
manager.next_etags_pos = 0;
/*
if (uploadId) {
manager.upload_id = strdup(uploadId);
manager.remaining = contentLength;
if (!try_get_parts_info(tsS3BucketName, key, &manager)) {
fseek(data.infile, -(manager.remaining), 2);
taosLSeekFile(data.infileFD, -(manager.remaining), SEEK_END);
contentLength = manager.remaining;
goto upload;
} else {
goto clean;
}
}
*/
do {
S3_initiate_multipart(&bucketContext, key, 0, &handler, 0, timeoutMsG, &manager);
} while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.upload_id == 0 || manager.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
upload:
todoContentLength -= chunk_size * manager.next_etags_pos;
for (int seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) {
partData.manager = &manager;
partData.seq = seq;
if (partData.put_object_data.gb == NULL) {
partData.put_object_data = data;
}
partContentLength = ((contentLength > chunk_size) ? chunk_size : contentLength);
// printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength);
partData.put_object_data.contentLength = partContentLength;
partData.put_object_data.originalContentLength = partContentLength;
partData.put_object_data.totalContentLength = todoContentLength;
partData.put_object_data.totalOriginalContentLength = totalContentLength;
putProperties.md5 = 0;
do {
S3_upload_part(&bucketContext, key, &putProperties, &putObjectHandler, seq, manager.upload_id,
partContentLength, 0, timeoutMsG, &partData);
} while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
if (partData.put_object_data.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
contentLength -= chunk_size;
todoContentLength -= chunk_size;
}
int i;
int size = 0;
size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>", strlen("<CompleteMultipartUpload>"));
char buf[256];
int n;
for (i = 0; i < totalSeq; i++) {
if (!manager.etags[i]) {
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
n = snprintf(buf, sizeof(buf),
"<Part><PartNumber>%d</PartNumber>"
"<ETag>%s</ETag></Part>",
i + 1, manager.etags[i]);
size += growbuffer_append(&(manager.gb), buf, n);
}
size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>", strlen("</CompleteMultipartUpload>"));
manager.remaining = size;
do {
S3_complete_multipart_upload(&bucketContext, key, &commit_handler, manager.upload_id, manager.remaining, 0,
timeoutMsG, &manager);
} while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
clean:
if (manager.upload_id) {
taosMemoryFree(manager.upload_id);
}
for (i = 0; i < manager.next_etags_pos; i++) {
taosMemoryFree(manager.etags[i]);
}
growbuffer_destroy(manager.gb);
taosMemoryFree(manager.etags);
code = s3PutObjectFromFileWithoutCp(&bucketContext, object_name, contentLength, &putProperties, &data);
}
if (data.infileFD) {