diff --git a/include/common/cos.h b/include/common/cos.h index c6b159c1da..afeca3ca03 100644 --- a/include/common/cos.h +++ b/include/common/cos.h @@ -34,7 +34,7 @@ extern int32_t tsS3UploadDelaySec; int32_t s3Init(); void s3CleanUp(); int32_t s3PutObjectFromFile(const char *file, const char *object); -int32_t s3PutObjectFromFile2(const char *file, const char *object); +int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp); void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); diff --git a/include/common/cos_cp.h b/include/common/cos_cp.h new file mode 100644 index 0000000000..fd778b1a1d --- /dev/null +++ b/include/common/cos_cp.h @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef _TD_COMMON_COS_CP_H_ +#define _TD_COMMON_COS_CP_H_ + +#include "os.h" +#include "tdef.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum { + COS_CP_TYPE_UPLOAD, // upload + COS_CP_TYPE_DOWNLOAD // download +} ECpType; + +typedef struct { + int32_t index; // the index of part, start from 0 + int64_t offset; // the offset point of part + int64_t size; // the size of part + int completed; // COS_TRUE completed, COS_FALSE uncompleted + char etag[128]; // the etag of part, for upload + uint64_t crc64; +} SCheckpointPart; + +typedef struct { + ECpType cp_type; // 0 upload, 1 download + char md5[64]; // the md5 of checkpoint content + TdFilePtr thefile; // the handle of checkpoint file + + char file_path[TSDB_FILENAME_LEN]; // local file path + int64_t file_size; // local file size, for upload + int32_t file_last_modified; // local file last modified time, for upload + char file_md5[64]; // md5 of the local file content, for upload, reserved + + char object_name[128]; // object name + int64_t object_size; // object size, for download + char object_last_modified[64]; // object last modified time, for download + char object_etag[128]; // object etag, for download + + char upload_id[128]; // upload id + + int part_num; // the total number of parts + int64_t part_size; // the part size, byte + SCheckpointPart* parts; // the parts of local or object, from 0 +} SCheckpoint; + +int32_t cos_cp_open(char const* cp_path, SCheckpoint* checkpoint); +void cos_cp_close(TdFilePtr fd); +void cos_cp_remove(char const* filepath); + +int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint); +int32_t cos_cp_dump(SCheckpoint* checkpoint); +void cos_cp_get_undo_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t* consume_bytes); +void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag, uint64_t crc64); +void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime, + char const* upload_id, int64_t part_size); +bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int32_t mtime); + +void cos_cp_build_download(SCheckpoint* checkpoint, char
const* filepath, char const* object_name, int64_t object_size, + char const* object_lmtime, char const* object_etag, int64_t part_size); +bool cos_cp_is_valid_download(SCheckpoint* checkpoint, char const* object_name, int64_t object_size, + char const* object_lmtime, char const* object_etag); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_COMMON_COS_CP_H_*/ diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 7c8676e9f5..bcbd02b2cd 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -1,6 +1,8 @@ #define ALLOW_FORBID_FUNC #include "cos.h" +#include "cos_cp.h" +#include "tdef.h" extern char tsS3Endpoint[]; extern char tsS3AccessKeyId[]; @@ -86,7 +88,7 @@ typedef struct { char err_msg[128]; S3Status status; uint64_t content_length; - char * buf; + char *buf; int64_t buf_pos; } TS3SizeCBD; @@ -270,7 +272,7 @@ typedef struct list_parts_callback_data { typedef struct MultipartPartData { put_object_callback_data put_object_data; int seq; - UploadManager * manager; + UploadManager *manager; } MultipartPartData; static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) { @@ -317,12 +319,23 @@ S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properti MultipartPartData *data = (MultipartPartData *)callbackData; int seq = data->seq; - const char * etag = properties->eTag; + const char *etag = properties->eTag; data->manager->etags[seq - 1] = strdup(etag); data->manager->next_etags_pos = seq; return S3StatusOK; } +S3Status MultipartResponseProperiesCallbackWithCp(const S3ResponseProperties *properties, void *callbackData) { + responsePropertiesCallbackNull(properties, callbackData); + + MultipartPartData *data = (MultipartPartData *)callbackData; + int seq = data->seq; + const char *etag = properties->eTag; + data->manager->etags[seq - 1] = strdup(etag); + // data->manager->next_etags_pos = seq; + return S3StatusOK; +} + static int multipartPutXmlCallback(int bufferSize, char *buffer, void *callbackData) { UploadManager *manager = (UploadManager *)callbackData; int ret = 0; @@ -446,37 +459,335 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan return 0; } */ -int32_t s3PutObjectFromFile2(const char *file, const char *object) { - int32_t code = 0; - const char *key = object; - // const char *uploadId = 0; - const char * filename = 0; + +static int32_t s3PutObjectFromFileSimple(S3BucketContext *bucket_context, char const *object_name, int64_t size, + S3PutProperties *put_prop, put_object_callback_data *data) { + int32_t code = 0; + S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, + &putObjectDataCallback}; + + do { + S3_put_object(bucket_context, object_name, size, put_prop, 0, 0, &putObjectHandler, data); + } while (S3_status_is_retryable(data->status) && should_retry()); + + if (data->status != S3StatusOK) { + s3PrintError(__FILE__, __LINE__, __func__, data->status, data->err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + } else if (data->contentLength) { + uError("%s Failed to read remaining %llu bytes from input", __func__, (unsigned long long)data->contentLength); + code = TAOS_SYSTEM_ERROR(EIO); + } + + return code; +} + +static int32_t s3PutObjectFromFileWithoutCp(S3BucketContext *bucket_context, char const *object_name, + int64_t contentLength, S3PutProperties *put_prop, + put_object_callback_data *data) { + int32_t code = 0; + uint64_t totalContentLength = contentLength; + uint64_t todoContentLength = contentLength; + UploadManager 
manager = {0}; + + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + + MultipartPartData partData; + memset(&partData, 0, sizeof(MultipartPartData)); + int partContentLength = 0; + + S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, + &initial_multipart_callback}; + + S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback}, + &putObjectDataCallback}; + + S3MultipartCommitHandler commit_handler = { + {&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0}; + + manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *)); + manager.next_etags_pos = 0; + do { + S3_initiate_multipart(bucket_context, object_name, 0, &handler, 0, timeoutMsG, &manager); + } while (S3_status_is_retryable(manager.status) && should_retry()); + + if (manager.upload_id == 0 || manager.status != S3StatusOK) { + s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + +upload: + todoContentLength -= chunk_size * manager.next_etags_pos; + for (int seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) { + partData.manager = &manager; + partData.seq = seq; + if (partData.put_object_data.gb == NULL) { + partData.put_object_data = *data; + } + partContentLength = ((contentLength > chunk_size) ? chunk_size : contentLength); + // printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength); + partData.put_object_data.contentLength = partContentLength; + partData.put_object_data.originalContentLength = partContentLength; + partData.put_object_data.totalContentLength = todoContentLength; + partData.put_object_data.totalOriginalContentLength = totalContentLength; + put_prop->md5 = 0; + do { + S3_upload_part(bucket_context, object_name, put_prop, &putObjectHandler, seq, manager.upload_id, + partContentLength, 0, timeoutMsG, &partData); + } while (S3_status_is_retryable(partData.put_object_data.status) && should_retry()); + if (partData.put_object_data.status != S3StatusOK) { + s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + contentLength -= chunk_size; + todoContentLength -= chunk_size; + } + + int i; + int size = 0; + size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>", strlen("<CompleteMultipartUpload>")); + char buf[256]; + int n; + for (i = 0; i < totalSeq; i++) { + if (!manager.etags[i]) { + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + n = snprintf(buf, sizeof(buf), + "<Part><PartNumber>%d</PartNumber>" + "<ETag>%s</ETag></Part>", + i + 1, manager.etags[i]); + size += growbuffer_append(&(manager.gb), buf, n); + } + size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>", strlen("</CompleteMultipartUpload>")); + manager.remaining = size; + + do { + S3_complete_multipart_upload(bucket_context, object_name, &commit_handler, manager.upload_id, manager.remaining, 0, + timeoutMsG, &manager); + } while (S3_status_is_retryable(manager.status) && should_retry()); + if (manager.status != S3StatusOK) { + s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + +clean: + if (manager.upload_id) { + taosMemoryFree(manager.upload_id); + } + 
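+ // free the etags strdup'ed by the part-upload response callbacks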
for (i = 0; i < manager.next_etags_pos; i++) { + taosMemoryFree(manager.etags[i]); + } + growbuffer_destroy(manager.gb); + taosMemoryFree(manager.etags); + + return code; +} + +static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, const char *file, int32_t lmtime, + char const *object_name, int64_t contentLength, S3PutProperties *put_prop, + put_object_callback_data *data) { + int32_t code = 0; + + uint64_t totalContentLength = contentLength; + // uint64_t todoContentLength = contentLength; + UploadManager manager = {0}; + + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + + bool need_init_upload = true; + char file_cp_path[TSDB_FILENAME_LEN]; + snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + + SCheckpoint cp = {0}; + cp.parts = taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + + if (taosCheckExistFile(file_cp_path)) { + if (!cos_cp_load(file_cp_path, &cp) && cos_cp_is_valid_upload(&cp, contentLength, lmtime)) { + manager.upload_id = strdup(cp.upload_id); + need_init_upload = false; + } else { + cos_cp_remove(file_cp_path); + } + } + + if (need_init_upload) { + S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, + &initial_multipart_callback}; + do { + S3_initiate_multipart(bucket_context, object_name, 0, &handler, 0, timeoutMsG, &manager); + } while (S3_status_is_retryable(manager.status) && should_retry()); + + if (manager.upload_id == 0 || manager.status != S3StatusOK) { + s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + + cos_cp_build_upload(&cp, file, contentLength, lmtime, manager.upload_id, chunk_size); + } + + if (cos_cp_open(file_cp_path, &cp)) { + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + + int part_num = 0; + int64_t consume_bytes = 0; + // SCheckpointPart *parts = taosMemoryCalloc(cp.part_num, sizeof(SCheckpointPart)); + // cos_cp_get_undo_parts(&cp, &part_num, parts, &consume_bytes); + + MultipartPartData partData; + memset(&partData, 0, sizeof(MultipartPartData)); + int partContentLength = 0; + + S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallbackWithCp, &responseCompleteCallback}, + &putObjectDataCallback}; + + S3MultipartCommitHandler commit_handler = { + {&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0}; + + manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *)); + manager.next_etags_pos = 0; + +upload: + // todoContentLength -= chunk_size * manager.next_etags_pos; + for (int i = 0; i < cp.part_num; ++i) { + if (cp.parts[i].completed) { + continue; + } + + int seq = cp.parts[i].index + 1; + + partData.manager = &manager; + partData.seq = seq; + if (partData.put_object_data.gb == NULL) { + partData.put_object_data = *data; + } + + partContentLength = cp.parts[i].size; + partData.put_object_data.contentLength = partContentLength; + partData.put_object_data.originalContentLength = partContentLength; + // partData.put_object_data.totalContentLength = todoContentLength; + partData.put_object_data.totalOriginalContentLength = totalContentLength; + put_prop->md5 = 0; + do { + S3_upload_part(bucket_context, object_name, put_prop, 
&putObjectHandler, seq, manager.upload_id, + partContentLength, 0, timeoutMsG, &partData); + } while (S3_status_is_retryable(partData.put_object_data.status) && should_retry()); + if (partData.put_object_data.status != S3StatusOK) { + s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + + //(void)cos_cp_dump(&cp); + goto clean; + } + + if (!manager.etags[seq - 1]) { + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + + cos_cp_update(&cp, cp.parts[seq - 1].index, manager.etags[seq - 1], 0); + (void)cos_cp_dump(&cp); + + contentLength -= chunk_size; + // todoContentLength -= chunk_size; + } + + cos_cp_close(cp.thefile); + + int size = 0; + size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>", strlen("<CompleteMultipartUpload>")); + char buf[256]; + int n; + for (int i = 0; i < cp.part_num; ++i) { + n = snprintf(buf, sizeof(buf), + "<Part><PartNumber>%d</PartNumber>" + "<ETag>%s</ETag></Part>", + // i + 1, manager.etags[i]); + cp.parts[i].index + 1, cp.parts[i].etag); + size += growbuffer_append(&(manager.gb), buf, n); + } + size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>", strlen("</CompleteMultipartUpload>")); + manager.remaining = size; + + do { + S3_complete_multipart_upload(bucket_context, object_name, &commit_handler, manager.upload_id, manager.remaining, 0, + timeoutMsG, &manager); + } while (S3_status_is_retryable(manager.status) && should_retry()); + if (manager.status != S3StatusOK) { + s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + + cos_cp_remove(file_cp_path); + +clean: + /* + if (parts) { + taosMemoryFree(parts); + } + */ + if (cp.thefile) { + cos_cp_close(cp.thefile); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + if (manager.upload_id) { + taosMemoryFree(manager.upload_id); + } + for (int i = 0; i < cp.part_num; ++i) { + if (manager.etags[i]) { + taosMemoryFree(manager.etags[i]); + } + } + taosMemoryFree(manager.etags); + growbuffer_destroy(manager.gb); + + return code; +} + +int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) { + int32_t code = 0; + int32_t lmtime = 0; + const char *filename = 0; uint64_t contentLength = 0; - const char * cacheControl = 0, *contentType = 0, *md5 = 0; - const char * contentDispositionFilename = 0, *contentEncoding = 0; + const char *cacheControl = 0, *contentType = 0, *md5 = 0; + const char *contentDispositionFilename = 0, *contentEncoding = 0; int64_t expires = -1; S3CannedAcl cannedAcl = S3CannedAclPrivate; int metaPropertiesCount = 0; S3NameValue metaProperties[S3_MAX_METADATA_COUNT]; char useServerSideEncryption = 0; put_object_callback_data data = {0}; - // int noStatus = 0; - // data.infile = 0; - // data.gb = 0; - // data.infileFD = NULL; - // data.noStatus = noStatus; - - // uError("ERROR: %s stat file %s: ", __func__, file); - if (taosStatFile(file, &contentLength, NULL, NULL) < 0) { - uError("ERROR: %s Failed to stat file %s: ", __func__, file); + if (taosStatFile(file, &contentLength, &lmtime, NULL) < 0) { code = TAOS_SYSTEM_ERROR(errno); + uError("ERROR: %s Failed to stat file %s: ", __func__, file); return code; } if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) { - uError("ERROR: %s Failed to open file %s: ", __func__, file); code = TAOS_SYSTEM_ERROR(errno); + uError("ERROR: %s Failed to open file %s: ", __func__, file); return code; } @@ -493,143 +804,13 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { metaProperties, useServerSideEncryption}; if (contentLength <= MULTIPART_CHUNK_SIZE) { - 
S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, - &putObjectDataCallback}; - - do { - S3_put_object(&bucketContext, key, contentLength, &putProperties, 0, 0, &putObjectHandler, &data); - } while (S3_status_is_retryable(data.status) && should_retry()); - - if (data.status != S3StatusOK) { - s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg); - code = TAOS_SYSTEM_ERROR(EIO); - } else if (data.contentLength) { - uError("ERROR: %s Failed to read remaining %llu bytes from input", __func__, - (unsigned long long)data.contentLength); - code = TAOS_SYSTEM_ERROR(EIO); - } + code = s3PutObjectFromFileSimple(&bucketContext, object_name, contentLength, &putProperties, &data); } else { - uint64_t totalContentLength = contentLength; - uint64_t todoContentLength = contentLength; - UploadManager manager = {0}; - // manager.upload_id = 0; - // manager.gb = 0; - - // div round up - int seq; - uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; - int totalSeq = (contentLength + chunk_size - 1) / chunk_size; - const int max_part_num = 10000; - if (totalSeq > max_part_num) { - chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; - totalSeq = (contentLength + chunk_size - 1) / chunk_size; + if (withcp) { + code = s3PutObjectFromFileWithCp(&bucketContext, file, lmtime, object_name, contentLength, &putProperties, &data); + } else { + code = s3PutObjectFromFileWithoutCp(&bucketContext, object_name, contentLength, &putProperties, &data); } - - MultipartPartData partData; - memset(&partData, 0, sizeof(MultipartPartData)); - int partContentLength = 0; - - S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, - &initial_multipart_callback}; - - S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback}, - &putObjectDataCallback}; - - S3MultipartCommitHandler commit_handler = { - {&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0}; - - manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *)); - manager.next_etags_pos = 0; - /* - if (uploadId) { - manager.upload_id = strdup(uploadId); - manager.remaining = contentLength; - if (!try_get_parts_info(tsS3BucketName, key, &manager)) { - fseek(data.infile, -(manager.remaining), 2); - taosLSeekFile(data.infileFD, -(manager.remaining), SEEK_END); - contentLength = manager.remaining; - goto upload; - } else { - goto clean; - } - } - */ - do { - S3_initiate_multipart(&bucketContext, key, 0, &handler, 0, timeoutMsG, &manager); - } while (S3_status_is_retryable(manager.status) && should_retry()); - - if (manager.upload_id == 0 || manager.status != S3StatusOK) { - s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg); - code = TAOS_SYSTEM_ERROR(EIO); - goto clean; - } - - upload: - todoContentLength -= chunk_size * manager.next_etags_pos; - for (seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) { - partData.manager = &manager; - partData.seq = seq; - if (partData.put_object_data.gb == NULL) { - partData.put_object_data = data; - } - partContentLength = ((contentLength > chunk_size) ? chunk_size : contentLength); - // printf("%s Part Seq %d, length=%d\n", srcSize ? 
"Copying" : "Sending", seq, partContentLength); - partData.put_object_data.contentLength = partContentLength; - partData.put_object_data.originalContentLength = partContentLength; - partData.put_object_data.totalContentLength = todoContentLength; - partData.put_object_data.totalOriginalContentLength = totalContentLength; - putProperties.md5 = 0; - do { - S3_upload_part(&bucketContext, key, &putProperties, &putObjectHandler, seq, manager.upload_id, - partContentLength, 0, timeoutMsG, &partData); - } while (S3_status_is_retryable(partData.put_object_data.status) && should_retry()); - if (partData.put_object_data.status != S3StatusOK) { - s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg); - code = TAOS_SYSTEM_ERROR(EIO); - goto clean; - } - contentLength -= chunk_size; - todoContentLength -= chunk_size; - } - - int i; - int size = 0; - size += growbuffer_append(&(manager.gb), "", strlen("")); - char buf[256]; - int n; - for (i = 0; i < totalSeq; i++) { - if (!manager.etags[i]) { - code = TAOS_SYSTEM_ERROR(EIO); - goto clean; - } - n = snprintf(buf, sizeof(buf), - "%d" - "%s", - i + 1, manager.etags[i]); - size += growbuffer_append(&(manager.gb), buf, n); - } - size += growbuffer_append(&(manager.gb), "", strlen("")); - manager.remaining = size; - - do { - S3_complete_multipart_upload(&bucketContext, key, &commit_handler, manager.upload_id, manager.remaining, 0, - timeoutMsG, &manager); - } while (S3_status_is_retryable(manager.status) && should_retry()); - if (manager.status != S3StatusOK) { - s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg); - code = TAOS_SYSTEM_ERROR(EIO); - goto clean; - } - - clean: - if (manager.upload_id) { - taosMemoryFree(manager.upload_id); - } - for (i = 0; i < manager.next_etags_pos; i++) { - taosMemoryFree(manager.etags[i]); - } - growbuffer_destroy(manager.gb); - taosMemoryFree(manager.etags); } if (data.infileFD) { @@ -648,7 +829,7 @@ typedef struct list_bucket_callback_data { char nextMarker[1024]; int keyCount; int allDetails; - SArray * objectArray; + SArray *objectArray; } list_bucket_callback_data; static S3Status listBucketCallback(int isTruncated, const char *nextMarker, int contentsCount, @@ -693,11 +874,11 @@ static void s3FreeObjectKey(void *pItem) { static SArray *getListByPrefix(const char *prefix) { S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + 0, awsRegionG}; S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &listBucketCallback}; - const char * marker = 0, *delimiter = 0; + const char *marker = 0, *delimiter = 0; int maxkeys = 0, allDetails = 0; list_bucket_callback_data data; data.objectArray = taosArrayInit(32, sizeof(void *)); @@ -738,7 +919,7 @@ static SArray *getListByPrefix(const char *prefix) { void s3DeleteObjects(const char *object_name[], int nobject) { S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + 0, awsRegionG}; S3ResponseHandler responseHandler = {0, &responseCompleteCallback}; for (int i = 0; i < nobject; ++i) { @@ -789,7 +970,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, const char *ifMatch = 0, *ifNotMatch = 0; S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + 0, awsRegionG}; S3GetConditions getConditions = 
{ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, &getObjectDataCallback}; @@ -827,7 +1008,7 @@ int32_t s3GetObjectToFile(const char *object_name, char *fileName) { const char *ifMatch = 0, *ifNotMatch = 0; S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + 0, awsRegionG}; S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &getObjectCallback}; @@ -858,7 +1039,7 @@ int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { if (objectArray == NULL) return -1; for (size_t i = 0; i < taosArrayGetSize(objectArray); i++) { - char * object = taosArrayGetP(objectArray, i); + char *object = taosArrayGetP(objectArray, i); const char *tmp = strchr(object, '/'); tmp = (tmp == NULL) ? object : tmp + 1; char fileName[PATH_MAX] = {0}; @@ -949,12 +1130,12 @@ static void s3InitRequestOptions(cos_request_options_t *options, int is_cname) { int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) { int32_t code = 0; - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; - cos_status_t * s = NULL; + cos_status_t *s = NULL; cos_request_options_t *options = NULL; cos_string_t bucket, object, file; - cos_table_t * resp_headers; + cos_table_t *resp_headers; // int traffic_limit = 0; cos_pool_create(&p, NULL); @@ -983,18 +1164,19 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) { return code; } -int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) { +int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str, int8_t withcp) { int32_t code = 0; - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; - cos_status_t * s = NULL; - cos_request_options_t * options = NULL; + cos_status_t *s = NULL; + cos_request_options_t *options = NULL; cos_string_t bucket, object, file; - cos_table_t * resp_headers; + cos_table_t *resp_headers; int traffic_limit = 0; - cos_table_t * headers = NULL; + cos_table_t *headers = NULL; cos_resumable_clt_params_t *clt_params = NULL; + (void)withcp; cos_pool_create(&p, NULL); options = cos_request_options_create(p); s3InitRequestOptions(options, is_cname); @@ -1025,11 +1207,11 @@ int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) { } void s3DeleteObjectsByPrefix(const char *prefix_str) { - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; cos_request_options_t *options = NULL; int is_cname = 0; cos_string_t bucket; - cos_status_t * s = NULL; + cos_status_t *s = NULL; cos_string_t prefix; cos_pool_create(&p, NULL); @@ -1044,10 +1226,10 @@ void s3DeleteObjectsByPrefix(const char *prefix_str) { } void s3DeleteObjects(const char *object_name[], int nobject) { - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; cos_string_t bucket; - cos_table_t * resp_headers = NULL; + cos_table_t *resp_headers = NULL; cos_request_options_t *options = NULL; cos_list_t object_list; cos_list_t deleted_object_list; @@ -1081,14 +1263,14 @@ void s3DeleteObjects(const char *object_name[], int nobject) { bool s3Exists(const char *object_name) { bool ret = false; - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; - cos_status_t * s = NULL; - cos_request_options_t * options = NULL; + cos_status_t *s = NULL; + cos_request_options_t 
*options = NULL; cos_string_t bucket; cos_string_t object; - cos_table_t * resp_headers; - cos_table_t * headers = NULL; + cos_table_t *resp_headers; + cos_table_t *headers = NULL; cos_object_exist_status_e object_exist; cos_pool_create(&p, NULL); @@ -1115,15 +1297,15 @@ bool s3Exists(const char *object_name) { bool s3Get(const char *object_name, const char *path) { bool ret = false; - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; - cos_status_t * s = NULL; + cos_status_t *s = NULL; cos_request_options_t *options = NULL; cos_string_t bucket; cos_string_t object; cos_string_t file; - cos_table_t * resp_headers = NULL; - cos_table_t * headers = NULL; + cos_table_t *resp_headers = NULL; + cos_table_t *headers = NULL; int traffic_limit = 0; // create the memory pool @@ -1159,15 +1341,15 @@ bool s3Get(const char *object_name, const char *path) { int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, bool check, uint8_t **ppBlock) { (void)check; int32_t code = 0; - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; - cos_status_t * s = NULL; + cos_status_t *s = NULL; cos_request_options_t *options = NULL; cos_string_t bucket; cos_string_t object; - cos_table_t * resp_headers; - cos_table_t * headers = NULL; - cos_buf_t * content = NULL; + cos_table_t *resp_headers; + cos_table_t *headers = NULL; + cos_buf_t *content = NULL; // cos_string_t file; // int traffic_limit = 0; char range_buf[64]; @@ -1261,7 +1443,7 @@ void s3EvictCache(const char *path, long object_size) { terrno = TAOS_SYSTEM_ERROR(errno); vError("failed to open %s since %s", dir_name, terrstr()); } - SArray * evict_files = taosArrayInit(16, sizeof(SEvictFile)); + SArray *evict_files = taosArrayInit(16, sizeof(SEvictFile)); tdbDirEntryPtr pDirEntry; while ((pDirEntry = taosReadDir(pDir)) != NULL) { char *name = taosGetDirEntryName(pDirEntry); @@ -1303,13 +1485,13 @@ void s3EvictCache(const char *path, long object_size) { long s3Size(const char *object_name) { long size = 0; - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; - cos_status_t * s = NULL; + cos_status_t *s = NULL; cos_request_options_t *options = NULL; cos_string_t bucket; cos_string_t object; - cos_table_t * resp_headers = NULL; + cos_table_t *resp_headers = NULL; // create the memory pool cos_pool_create(&p, NULL); @@ -1344,7 +1526,7 @@ long s3Size(const char *object_name) { int32_t s3Init() { return 0; } void s3CleanUp() {} int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; } -int32_t s3PutObjectFromFile2(const char *file, const char *object) { return 0; } +int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; } void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } diff --git a/source/common/src/cos_cp.c b/source/common/src/cos_cp.c new file mode 100644 index 0000000000..6d37b4d4dc --- /dev/null +++ b/source/common/src/cos_cp.c @@ -0,0 +1,417 @@ +#define ALLOW_FORBID_FUNC + +#include "cos_cp.h" +#include "cJSON.h" +#include "tutil.h" + +int32_t cos_cp_open(char const* cp_path, SCheckpoint* checkpoint) { + int32_t code = 0; + + TdFilePtr fd = taosOpenFile(cp_path, TD_FILE_WRITE | TD_FILE_CREATE /* | TD_FILE_TRUNC*/ | TD_FILE_WRITE_THROUGH); + if (!fd) { + code = TAOS_SYSTEM_ERROR(errno); + uError("ERROR: %s Failed to open %s", __func__, cp_path); + return code; + } + + checkpoint->thefile = fd; + + return code; +} + +void cos_cp_close(TdFilePtr
fd) { taosCloseFile(&fd); } +void cos_cp_remove(char const* filepath) { taosRemoveFile(filepath); } + +static int32_t cos_cp_parse_body(char* cp_body, SCheckpoint* cp) { + int32_t code = 0; + cJSON const* item2 = NULL; + + cJSON* json = cJSON_Parse(cp_body); + if (NULL == json) { + code = TSDB_CODE_FILE_CORRUPTED; + uError("ERROR: %s Failed to parse json", __func__); + goto _exit; + } + + cJSON const* item = cJSON_GetObjectItem(json, "ver"); + if (!cJSON_IsNumber(item) || item->valuedouble != 1) { + code = TSDB_CODE_FILE_CORRUPTED; + uError("ERROR: %s Failed to parse json ver: %f", __func__, item->valuedouble); + goto _exit; + } + + item = cJSON_GetObjectItem(json, "type"); + if (!cJSON_IsNumber(item)) { + code = TSDB_CODE_FILE_CORRUPTED; + uError("ERROR: %s Failed to parse json", __func__); + goto _exit; + } + cp->cp_type = item->valuedouble; + + item = cJSON_GetObjectItem(json, "md5"); + if (cJSON_IsString(item)) { + memcpy(cp->md5, item->valuestring, strlen(item->valuestring)); + } + + item = cJSON_GetObjectItem(json, "upload_id"); + if (cJSON_IsString(item)) { + strncpy(cp->upload_id, item->valuestring, 128); + } + + item2 = cJSON_GetObjectItem(json, "file"); + if (cJSON_IsObject(item2)) { + item = cJSON_GetObjectItem(item2, "size"); + if (cJSON_IsNumber(item)) { + cp->file_size = item->valuedouble; + } + + item = cJSON_GetObjectItem(item2, "lastmodified"); + if (cJSON_IsNumber(item)) { + cp->file_last_modified = item->valuedouble; + } + + item = cJSON_GetObjectItem(item2, "path"); + if (cJSON_IsString(item)) { + strncpy(cp->file_path, item->valuestring, TSDB_FILENAME_LEN); + } + + item = cJSON_GetObjectItem(item2, "file_md5"); + if (cJSON_IsString(item)) { + strncpy(cp->file_md5, item->valuestring, 64); + } + } + + item2 = cJSON_GetObjectItem(json, "object"); + if (cJSON_IsObject(item2)) { + item = cJSON_GetObjectItem(item2, "object_size"); + if (cJSON_IsNumber(item)) { + cp->object_size = item->valuedouble; + } + + item = cJSON_GetObjectItem(item2, "object_name"); + if (cJSON_IsString(item)) { + strncpy(cp->object_name, item->valuestring, 128); + } + + item = cJSON_GetObjectItem(item2, "object_last_modified"); + if (cJSON_IsString(item)) { + strncpy(cp->object_last_modified, item->valuestring, 64); + } + + item = cJSON_GetObjectItem(item2, "object_etag"); + if (cJSON_IsString(item)) { + strncpy(cp->object_etag, item->valuestring, 128); + } + } + + item2 = cJSON_GetObjectItem(json, "cpparts"); + if (cJSON_IsObject(item2)) { + item = cJSON_GetObjectItem(item2, "number"); + if (cJSON_IsNumber(item)) { + cp->part_num = item->valuedouble; + } + + item = cJSON_GetObjectItem(item2, "size"); + if (cJSON_IsNumber(item)) { + cp->part_size = item->valuedouble; + } + + item2 = cJSON_GetObjectItem(item2, "parts"); + if (cJSON_IsArray(item2) && cp->part_num > 0) { + cJSON_ArrayForEach(item, item2) { + cJSON const* item3 = cJSON_GetObjectItem(item, "index"); + int32_t index = 0; + if (cJSON_IsNumber(item3)) { + index = item3->valuedouble; + cp->parts[index].index = index; + } + + item3 = cJSON_GetObjectItem(item, "offset"); + if (cJSON_IsNumber(item3)) { + cp->parts[index].offset = item3->valuedouble; + } + + item3 = cJSON_GetObjectItem(item, "size"); + if (cJSON_IsNumber(item3)) { + cp->parts[index].size = item3->valuedouble; + } + + item3 = cJSON_GetObjectItem(item, "completed"); + if (cJSON_IsNumber(item3)) { + cp->parts[index].completed = item3->valuedouble; + } + + item3 = cJSON_GetObjectItem(item, "crc64"); + if (cJSON_IsNumber(item3)) { + cp->parts[index].crc64 = item3->valuedouble; + } + + 
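+ // etag recorded by a previous run via cos_cp_update(); used later to rebuild the CompleteMultipartUpload request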
item3 = cJSON_GetObjectItem(item, "etag"); + if (cJSON_IsString(item3)) { + strncpy(cp->parts[index].etag, item3->valuestring, 128); + } + } + } + } + +_exit: + if (json) cJSON_Delete(json); + if (cp_body) taosMemoryFree(cp_body); + + return code; +} + +int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint) { + int32_t code = 0; + + TdFilePtr fd = taosOpenFile(filepath, TD_FILE_READ); + if (!fd) { + code = TAOS_SYSTEM_ERROR(errno); + uError("ERROR: %s Failed to open %s", __func__, filepath); + goto _exit; + } + + int64_t size = -1; + code = taosStatFile(filepath, &size, NULL, NULL); + if (code) { + uError("ERROR: %s Failed to stat %s", __func__, filepath); + goto _exit; + } + + char* cp_body = taosMemoryMalloc(size + 1); + + int64_t n = taosReadFile(fd, cp_body, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + uError("ERROR: %s Failed to read %s", __func__, filepath); + goto _exit; + } else if (n != size) { + code = TSDB_CODE_FILE_CORRUPTED; + uError("ERROR: %s Failed to read %s %" PRId64 "/%" PRId64, __func__, filepath, n, size); + goto _exit; + } + taosCloseFile(&fd); + cp_body[size] = '\0'; + + return cos_cp_parse_body(cp_body, checkpoint); + +_exit: + if (fd) { + taosCloseFile(&fd); + } + if (cp_body) { + taosMemoryFree(cp_body); + } + + return code; +} + +static int32_t cos_cp_save_json(cJSON const* json, SCheckpoint* checkpoint) { + int32_t code = 0; + + char* data = cJSON_PrintUnformatted(json); + if (NULL == data) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + TdFilePtr fp = checkpoint->thefile; + if (taosFtruncateFile(fp, 0) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + if (taosLSeekFile(fp, 0, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + if (taosWriteFile(fp, data, strlen(data)) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + + if (taosFsyncFile(fp) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + +_exit: + taosMemoryFree(data); + return code; +} + +int32_t cos_cp_dump(SCheckpoint* cp) { + int32_t code = 0; + int32_t lino = 0; + + cJSON* ojson = NULL; + cJSON* json = cJSON_CreateObject(); + if (!json) return TSDB_CODE_OUT_OF_MEMORY; + + if (NULL == cJSON_AddNumberToObject(json, "ver", 1)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (NULL == cJSON_AddNumberToObject(json, "type", cp->cp_type)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (NULL == cJSON_AddStringToObject(json, "md5", cp->md5)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (NULL == cJSON_AddStringToObject(json, "upload_id", cp->upload_id)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (COS_CP_TYPE_UPLOAD == cp->cp_type) { + ojson = cJSON_AddObjectToObject(json, "file"); + if (!ojson) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddNumberToObject(ojson, "size", cp->file_size)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddNumberToObject(ojson, "lastmodified", cp->file_last_modified)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddStringToObject(ojson, "path", cp->file_path)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddStringToObject(ojson, "file_md5", cp->file_md5)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + } else if 
(COS_CP_TYPE_DOWNLOAD == cp->cp_type) { + ojson = cJSON_AddObjectToObject(json, "object"); + if (!ojson) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddNumberToObject(ojson, "object_size", cp->object_size)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddStringToObject(ojson, "object_name", cp->object_name)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddStringToObject(ojson, "object_last_modified", cp->object_last_modified)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddStringToObject(ojson, "object_etag", cp->object_etag)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + ojson = cJSON_AddObjectToObject(json, "cpparts"); + if (!ojson) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddNumberToObject(ojson, "number", cp->part_num)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddNumberToObject(ojson, "size", cp->part_size)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + cJSON* ajson = cJSON_AddArrayToObject(ojson, "parts"); + if (!ajson) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + for (int i = 0; i < cp->part_num; ++i) { + cJSON* item = cJSON_CreateObject(); + if (!item) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + cJSON_AddItemToArray(ajson, item); + + if (NULL == cJSON_AddNumberToObject(item, "index", cp->parts[i].index)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddNumberToObject(item, "offset", cp->parts[i].offset)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddNumberToObject(item, "size", cp->parts[i].size)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddNumberToObject(item, "completed", cp->parts[i].completed)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddNumberToObject(item, "crc64", cp->parts[i].crc64)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (NULL == cJSON_AddStringToObject(item, "etag", cp->parts[i].etag)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + code = cos_cp_save_json(json, cp); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + cJSON_Delete(json); + return code; +} + +void cos_cp_get_undo_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t* consume_bytes) {} + +void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag, uint64_t crc64) { + checkpoint->parts[part_index].completed = 1; + strncpy(checkpoint->parts[part_index].etag, etag, 128); + checkpoint->parts[part_index].crc64 = crc64; +} + +void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime, + char const* upload_id, int64_t part_size) { + int i = 0; + + checkpoint->cp_type = COS_CP_TYPE_UPLOAD; + strncpy(checkpoint->file_path, filepath, TSDB_FILENAME_LEN); + + checkpoint->file_size = size; + checkpoint->file_last_modified = mtime; + strncpy(checkpoint->upload_id, upload_id, 128); + 
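+ // build the part table: each entry records its index, byte offset and size, so a resumed upload can skip parts already marked completed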
+ checkpoint->part_size = part_size; + for (; i * part_size < size; i++) { + checkpoint->parts[i].index = i; + checkpoint->parts[i].offset = i * part_size; + checkpoint->parts[i].size = TMIN(part_size, (size - i * part_size)); + checkpoint->parts[i].completed = 0; + checkpoint->parts[i].etag[0] = '\0'; + } + checkpoint->part_num = i; +} + +static bool cos_cp_verify_md5(SCheckpoint* cp) { return true; } + +bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int32_t mtime) { + if (cos_cp_verify_md5(checkpoint) && checkpoint->file_size == size && checkpoint->file_last_modified == mtime) { + return true; + } + + return false; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index c7bf9399a0..3842400535 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -53,11 +53,10 @@ static void tsdbCloseBICache(STsdb *pTsdb) { } static int32_t tsdbOpenBCache(STsdb *pTsdb) { - int32_t code = 0; - // SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5); - int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; - - SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * tsS3BlockSize * szPage, 0, .5); + int32_t code = 0; + int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; + int64_t szBlock = tsS3BlockSize <= 1024 ? 1024 : tsS3BlockSize; + SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * szBlock * szPage, 0, .5); if (pCache == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -67,8 +66,9 @@ static int32_t tsdbOpenBCache(STsdb *pTsdb) { taosThreadMutexInit(&pTsdb->bMutex, NULL); -_err: pTsdb->bCache = pCache; + +_err: return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index e933b3a7dc..2511fa443b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -529,7 +529,8 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) { for (const STfsFile *file = NULL; (file = tfsReaddir(dir)) != NULL;) { if (taosIsDir(file->aname)) continue; - if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) { + if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL && + strncmp(file->aname + strlen(file->aname) - 3, ".cp", 3)) { int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs); remove_file(file->aname, nlevel > 1 && file->did.level == nlevel - 1); } @@ -1113,7 +1114,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRa TARRAY2_FOREACH(fs->fSetArr, fset) { int64_t ever = VERSION_MAX; if (pHash) { - int32_t fid = fset->fid; + int32_t fid = fset->fid; STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid)); if (u) { ever = u->sver - 1; @@ -1145,10 +1146,10 @@ int32_t tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { return tsdbFS int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges, TFileSetRangeArray **fsrArr) { - int32_t code = 0; - STFileSet *fset; + int32_t code = 0; + STFileSet *fset; STFileSetRange *fsr1 = NULL; - SHashObj *pHash = NULL; + SHashObj *pHash = NULL; fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0])); if (fsrArr[0] == NULL) { @@ -1171,7 +1172,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev int64_t ever1 = ever; if (pHash) { - int32_t fid = fset->fid; + int32_t fid = fset->fid; STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid)); if (u) { sver1 = u->sver; diff --git 
a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index dc98f46ac5..9ecb2fd5ba 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -112,7 +112,7 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile TSDB_CHECK_CODE(code, lino, _exit); char *object_name = taosDirEntryBaseName(fname); - code = s3PutObjectFromFile2(from->fname, object_name); + code = s3PutObjectFromFile2(from->fname, object_name, 1); TSDB_CHECK_CODE(code, lino, _exit); taosCloseFile(&fdFrom); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e2561de841..9458539a9d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -515,7 +515,7 @@ static int uploadCheckpointToS3(char* id, char* path) { char object[PATH_MAX] = {0}; snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); - if (s3PutObjectFromFile2(filename, object) != 0) { + if (s3PutObjectFromFile2(filename, object, 0) != 0) { taosCloseDir(&pDir); return -1; }
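
For reference, a minimal sketch (not part of the patch) of how the cos_cp checkpoint API is meant to be driven, mirroring the flow of s3PutObjectFromFileWithCp above. The function name upload_with_cp, the placeholder etag value, and the elided S3_upload_part round trip are illustrative assumptions, not code from this change.

#include "cos_cp.h"  // checkpoint API introduced by this patch

// Sketch of the resume flow; assumes the caller already holds a fresh
// multipart upload_id from S3_initiate_multipart, and elides error handling.
static int32_t upload_with_cp(const char *file, const char *upload_id, int64_t size, int32_t mtime,
                              int64_t part_size) {
  char cp_path[TSDB_FILENAME_LEN];
  snprintf(cp_path, TSDB_FILENAME_LEN, "%s.cp", file);

  SCheckpoint cp = {0};
  cp.parts = taosMemoryCalloc(10000, sizeof(SCheckpointPart));  // 10000 = S3 part-count limit

  bool resume = false;
  if (taosCheckExistFile(cp_path)) {
    // Reuse the checkpoint only if the local file is unchanged since it was written.
    if (!cos_cp_load(cp_path, &cp) && cos_cp_is_valid_upload(&cp, size, mtime)) {
      resume = true;  // cp.upload_id identifies the multipart upload to continue
    } else {
      cos_cp_remove(cp_path);  // stale: the file was modified, start over
    }
  }
  if (!resume) {
    cos_cp_build_upload(&cp, file, size, mtime, upload_id, part_size);
  }

  if (cos_cp_open(cp_path, &cp)) {
    taosMemoryFree(cp.parts);
    return -1;
  }

  for (int i = 0; i < cp.part_num; ++i) {
    if (cp.parts[i].completed) continue;  // uploaded by a previous run
    char etag[128] = "etag-from-upload-response";  // placeholder for S3_upload_part's result
    // ... upload bytes [offset, offset + size) of the file as part index + 1 here ...
    cos_cp_update(&cp, cp.parts[i].index, etag, 0);
    (void)cos_cp_dump(&cp);  // persist progress after every part
  }

  cos_cp_close(cp.thefile);
  cos_cp_remove(cp_path);  // upload complete; the checkpoint is no longer needed
  taosMemoryFree(cp.parts);
  return 0;
}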