diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 7696d084e1..6b3abc6f37 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -462,16 +462,120 @@ static int32_t s3PutObjectFromFileSimple(S3BucketContext *bucket_context, char c s3PrintError(__FILE__, __LINE__, __func__, data->status, data->err_msg); code = TAOS_SYSTEM_ERROR(EIO); } else if (data->contentLength) { - uError("ERROR: %s Failed to read remaining %llu bytes from input", __func__, - (unsigned long long)data->contentLength); + uError("%s Failed to read remaining %llu bytes from input", __func__, (unsigned long long)data->contentLength); code = TAOS_SYSTEM_ERROR(EIO); } return code; } -static int32_t s3PutObjectFromFileWithoutCp(const char *file, int64_t size, int32_t lmtime, const char *object) { - int32_t code = 0; +static int32_t s3PutObjectFromFileWithoutCp(S3BucketContext *bucket_context, char const *object_name, + int64_t contentLength, S3PutProperties *put_prop, + put_object_callback_data *data) { + int32_t code = 0; + uint64_t totalContentLength = contentLength; + uint64_t todoContentLength = contentLength; + UploadManager manager = {0}; + + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + + MultipartPartData partData; + memset(&partData, 0, sizeof(MultipartPartData)); + int partContentLength = 0; + + S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, + &initial_multipart_callback}; + + S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback}, + &putObjectDataCallback}; + + S3MultipartCommitHandler commit_handler = { + {&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0}; + + manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *)); + manager.next_etags_pos = 0; + do { + S3_initiate_multipart(bucket_context, object_name, 0, &handler, 0, timeoutMsG, &manager); + } while (S3_status_is_retryable(manager.status) && should_retry()); + + if (manager.upload_id == 0 || manager.status != S3StatusOK) { + s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + +upload: + todoContentLength -= chunk_size * manager.next_etags_pos; + for (int seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) { + partData.manager = &manager; + partData.seq = seq; + if (partData.put_object_data.gb == NULL) { + partData.put_object_data = *data; + } + partContentLength = ((contentLength > chunk_size) ? chunk_size : contentLength); + // printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength); + partData.put_object_data.contentLength = partContentLength; + partData.put_object_data.originalContentLength = partContentLength; + partData.put_object_data.totalContentLength = todoContentLength; + partData.put_object_data.totalOriginalContentLength = totalContentLength; + put_prop->md5 = 0; + do { + S3_upload_part(bucket_context, object_name, put_prop, &putObjectHandler, seq, manager.upload_id, + partContentLength, 0, timeoutMsG, &partData); + } while (S3_status_is_retryable(partData.put_object_data.status) && should_retry()); + if (partData.put_object_data.status != S3StatusOK) { + s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + contentLength -= chunk_size; + todoContentLength -= chunk_size; + } + + int i; + int size = 0; + size += growbuffer_append(&(manager.gb), "", strlen("")); + char buf[256]; + int n; + for (i = 0; i < totalSeq; i++) { + if (!manager.etags[i]) { + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + n = snprintf(buf, sizeof(buf), + "%d" + "%s", + i + 1, manager.etags[i]); + size += growbuffer_append(&(manager.gb), buf, n); + } + size += growbuffer_append(&(manager.gb), "", strlen("")); + manager.remaining = size; + + do { + S3_complete_multipart_upload(bucket_context, object_name, &commit_handler, manager.upload_id, manager.remaining, 0, + timeoutMsG, &manager); + } while (S3_status_is_retryable(manager.status) && should_retry()); + if (manager.status != S3StatusOK) { + s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + +clean: + if (manager.upload_id) { + taosMemoryFree(manager.upload_id); + } + for (i = 0; i < manager.next_etags_pos; i++) { + taosMemoryFree(manager.etags[i]); + } + growbuffer_destroy(manager.gb); + taosMemoryFree(manager.etags); return code; } @@ -482,11 +586,9 @@ static int32_t s3PutObjectFromFileWithCp(const char *file, int64_t size, int32_t return code; } -int32_t s3PutObjectFromFile2(const char *file, const char *object) { - int32_t code = 0; - int32_t lmtime = 0; - const char *key = object; - // const char *uploadId = 0; +int32_t s3PutObjectFromFile2(const char *file, const char *object_name) { + int32_t code = 0; + int32_t lmtime = 0; const char *filename = 0; uint64_t contentLength = 0; const char *cacheControl = 0, *contentType = 0, *md5 = 0; @@ -523,125 +625,9 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { metaProperties, useServerSideEncryption}; if (contentLength <= MULTIPART_CHUNK_SIZE) { - code = s3PutObjectFromFileSimple(&bucketContext, key, contentLength, &putProperties, &data); + code = s3PutObjectFromFileSimple(&bucketContext, object_name, contentLength, &putProperties, &data); } else { - uint64_t totalContentLength = contentLength; - uint64_t todoContentLength = contentLength; - UploadManager manager = {0}; - - uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; - int totalSeq = (contentLength + chunk_size - 1) / chunk_size; - const int max_part_num = 10000; - if (totalSeq > max_part_num) { - chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; - totalSeq = (contentLength + chunk_size - 1) / chunk_size; - } - - MultipartPartData partData; - memset(&partData, 0, sizeof(MultipartPartData)); - int partContentLength = 0; - - S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, - &initial_multipart_callback}; - - S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback}, - &putObjectDataCallback}; - - S3MultipartCommitHandler commit_handler = { - {&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0}; - - manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *)); - manager.next_etags_pos = 0; - /* - if (uploadId) { - manager.upload_id = strdup(uploadId); - manager.remaining = contentLength; - if (!try_get_parts_info(tsS3BucketName, key, &manager)) { - fseek(data.infile, -(manager.remaining), 2); - taosLSeekFile(data.infileFD, -(manager.remaining), SEEK_END); - contentLength = manager.remaining; - goto upload; - } else { - goto clean; - } - } - */ - do { - S3_initiate_multipart(&bucketContext, key, 0, &handler, 0, timeoutMsG, &manager); - } while (S3_status_is_retryable(manager.status) && should_retry()); - - if (manager.upload_id == 0 || manager.status != S3StatusOK) { - s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg); - code = TAOS_SYSTEM_ERROR(EIO); - goto clean; - } - - upload: - todoContentLength -= chunk_size * manager.next_etags_pos; - for (int seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) { - partData.manager = &manager; - partData.seq = seq; - if (partData.put_object_data.gb == NULL) { - partData.put_object_data = data; - } - partContentLength = ((contentLength > chunk_size) ? chunk_size : contentLength); - // printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength); - partData.put_object_data.contentLength = partContentLength; - partData.put_object_data.originalContentLength = partContentLength; - partData.put_object_data.totalContentLength = todoContentLength; - partData.put_object_data.totalOriginalContentLength = totalContentLength; - putProperties.md5 = 0; - do { - S3_upload_part(&bucketContext, key, &putProperties, &putObjectHandler, seq, manager.upload_id, - partContentLength, 0, timeoutMsG, &partData); - } while (S3_status_is_retryable(partData.put_object_data.status) && should_retry()); - if (partData.put_object_data.status != S3StatusOK) { - s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg); - code = TAOS_SYSTEM_ERROR(EIO); - goto clean; - } - contentLength -= chunk_size; - todoContentLength -= chunk_size; - } - - int i; - int size = 0; - size += growbuffer_append(&(manager.gb), "", strlen("")); - char buf[256]; - int n; - for (i = 0; i < totalSeq; i++) { - if (!manager.etags[i]) { - code = TAOS_SYSTEM_ERROR(EIO); - goto clean; - } - n = snprintf(buf, sizeof(buf), - "%d" - "%s", - i + 1, manager.etags[i]); - size += growbuffer_append(&(manager.gb), buf, n); - } - size += growbuffer_append(&(manager.gb), "", strlen("")); - manager.remaining = size; - - do { - S3_complete_multipart_upload(&bucketContext, key, &commit_handler, manager.upload_id, manager.remaining, 0, - timeoutMsG, &manager); - } while (S3_status_is_retryable(manager.status) && should_retry()); - if (manager.status != S3StatusOK) { - s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg); - code = TAOS_SYSTEM_ERROR(EIO); - goto clean; - } - - clean: - if (manager.upload_id) { - taosMemoryFree(manager.upload_id); - } - for (i = 0; i < manager.next_etags_pos; i++) { - taosMemoryFree(manager.etags[i]); - } - growbuffer_destroy(manager.gb); - taosMemoryFree(manager.etags); + code = s3PutObjectFromFileWithoutCp(&bucketContext, object_name, contentLength, &putProperties, &data); } if (data.infileFD) {