From 49ecb1093f8069082b8ac56b30a0044346172566 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 30 Nov 2023 17:20:21 +0800 Subject: [PATCH] cos/put: new arg withcp to control resumable uploading --- include/common/cos.h | 2 +- source/common/src/cos.c | 20 +++++++++++++++----- source/dnode/vnode/src/tsdb/tsdbRetention.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 2 +- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/include/common/cos.h b/include/common/cos.h index c6b159c1da..afeca3ca03 100644 --- a/include/common/cos.h +++ b/include/common/cos.h @@ -34,7 +34,7 @@ extern int32_t tsS3UploadDelaySec; int32_t s3Init(); void s3CleanUp(); int32_t s3PutObjectFromFile(const char *file, const char *object); -int32_t s3PutObjectFromFile2(const char *file, const char *object); +int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp); void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 6b3abc6f37..53a59ea3b0 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -580,13 +580,18 @@ clean: return code; } -static int32_t s3PutObjectFromFileWithCp(const char *file, int64_t size, int32_t lmtime, const char *object) { +static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, char const *object_name, + int64_t contentLength, S3PutProperties *put_prop, + put_object_callback_data *data) { + /* + static int32_t s3PutObjectFromFileWithCp(const char *file, int64_t size, int32_t lmtime, const char *object) { + */ int32_t code = 0; return code; } -int32_t s3PutObjectFromFile2(const char *file, const char *object_name) { +int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) { int32_t code = 0; int32_t lmtime = 0; const char *filename = 0; @@ -627,7 +632,11 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name) { if (contentLength <= MULTIPART_CHUNK_SIZE) { code = s3PutObjectFromFileSimple(&bucketContext, object_name, contentLength, &putProperties, &data); } else { - code = s3PutObjectFromFileWithoutCp(&bucketContext, object_name, contentLength, &putProperties, &data); + if (withcp) { + code = s3PutObjectFromFileWithCp(&bucketContext, object_name, contentLength, &putProperties, &data); + } else { + code = s3PutObjectFromFileWithoutCp(&bucketContext, object_name, contentLength, &putProperties, &data); + } } if (data.infileFD) { @@ -981,7 +990,7 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) { return code; } -int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) { +int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str, int8_t withcp) { int32_t code = 0; cos_pool_t *p = NULL; int is_cname = 0; @@ -993,6 +1002,7 @@ int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) { cos_table_t *headers = NULL; cos_resumable_clt_params_t *clt_params = NULL; + (void)withcp; cos_pool_create(&p, NULL); options = cos_request_options_create(p); s3InitRequestOptions(options, is_cname); @@ -1342,7 +1352,7 @@ long s3Size(const char *object_name) { int32_t s3Init() { return 0; } void s3CleanUp() {} int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; } -int32_t s3PutObjectFromFile2(const char *file, const char *object) { return 0; } +int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; } void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index d8f1ad7c6c..43d3fdd383 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -112,7 +112,7 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile TSDB_CHECK_CODE(code, lino, _exit); char *object_name = taosDirEntryBaseName(fname); - code = s3PutObjectFromFile2(from->fname, object_name); + code = s3PutObjectFromFile2(from->fname, object_name, 1); TSDB_CHECK_CODE(code, lino, _exit); taosCloseFile(&fdFrom); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e2561de841..9458539a9d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -515,7 +515,7 @@ static int uploadCheckpointToS3(char* id, char* path) { char object[PATH_MAX] = {0}; snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); - if (s3PutObjectFromFile2(filename, object) != 0) { + if (s3PutObjectFromFile2(filename, object, 0) != 0) { taosCloseDir(&pDir); return -1; }