From e9cc54d87b70afdc0f85d2a307894ab1cbd0bf82 Mon Sep 17 00:00:00 2001
From: Minglei Jin
Date: Tue, 5 Dec 2023 16:37:28 +0800
Subject: [PATCH] cos_cp/load: load checkpoint from json

---
 include/common/cos_cp.h                 |  37 ++--
 source/common/src/cos.c                 | 182 ++++++++++++++++-
 source/common/src/cos_cp.c              | 254 ++++++++++++++++++++++--
 source/dnode/vnode/src/tsdb/tsdbCache.c |  12 +-
 4 files changed, 442 insertions(+), 43 deletions(-)

diff --git a/include/common/cos_cp.h b/include/common/cos_cp.h
index 5e80ddde11..fd778b1a1d 100644
--- a/include/common/cos_cp.h
+++ b/include/common/cos_cp.h
@@ -17,6 +17,7 @@
 #define _TD_COMMON_COS_CP_H_
 
 #include "os.h"
+#include "tdef.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -32,41 +33,39 @@ typedef struct {
   int64_t offset;     // the offset point of part
   int64_t size;       // the size of part
   int     completed;  // COS_TRUE completed, COS_FALSE uncompleted
-  char*   etag;       // the etag of part, for upload
+  char    etag[128];  // the etag of part, for upload
   uint64_t crc64;
 } SCheckpointPart;
 
 typedef struct {
-  char*      md5;      // the md5 of checkout content
-  ECpType    cp_type;  // 1 upload, 2 download
-  TdFilePtr* thefile;  // the handle of checkpoint file
+  ECpType   cp_type;  // 0 upload, 1 download
+  char      md5[64];  // the md5 of checkout content
+  TdFilePtr thefile;  // the handle of checkpoint file
 
-  char*   file_path;           // local file path
-  int64_t file_size;           // local file size, for upload
-  int32_t file_last_modified;  // local file last modified time, for upload
-  char*   file_md5;            // the md5 of the local file content, for upload, reserved
+  char    file_path[TSDB_FILENAME_LEN];  // local file path
+  int64_t file_size;                     // local file size, for upload
+  int32_t file_last_modified;            // local file last modified time, for upload
+  char    file_md5[64];                  // md5 of the local file content, for upload, reserved
 
-  char*   object_name;           // object name
-  int64_t object_size;           // object size, for download
-  char*   object_last_modified;  // object last modified time, for download
-  char*   object_etag;           // object etag, for download
+  char    object_name[128];          // object name
+  int64_t object_size;               // object size, for download
+  char    object_last_modified[64];  // object last modified time, for download
+  char    object_etag[128];          // object etag, for download
 
-  char* upload_id;  // upload id
+  char upload_id[128];  // upload id
 
   int              part_num;   // the total number of parts
   int64_t          part_size;  // the part size, byte
   SCheckpointPart* parts;      // the parts of local or object, from 0
 } SCheckpoint;
 
-void      cos_cp_get_path(char const* filepath, char* cp_path);
-TdFilePtr cos_cp_open(char const* cp_path, SCheckpoint* checkpoint);
-void      cos_cp_close(TdFilePtr fd);
-void      cos_cp_remove(char const* filepath);
-bool      cos_cp_exist(char const* filepath);
+int32_t cos_cp_open(char const* cp_path, SCheckpoint* checkpoint);
+void    cos_cp_close(TdFilePtr fd);
+void    cos_cp_remove(char const* filepath);
 
 int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint);
 int32_t cos_cp_dump(SCheckpoint* checkpoint);
-void    cos_cp_get_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t consume_bytes);
+void    cos_cp_get_undo_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t* consume_bytes);
 void    cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag, uint64_t crc64);
 void    cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime,
                             char const* upload_id, int64_t part_size);
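Note: taken together, the declarations above define the checkpoint life cycle
that the upload path in cos.c (next diff) drives. Below is a minimal
calling-order sketch, not part of the patch: "file.bin", the 8 MiB part size
and the S3 strings (upload id, etag) are placeholder values that the real code
obtains from libs3 callbacks, and error handling is elided.

  #include "cos_cp.h"

  // Sketch: how s3PutObjectFromFileWithCp sequences the checkpoint API.
  static void cp_flow_sketch(int64_t file_size, int32_t file_mtime) {
    SCheckpoint cp = {0};
    cp.parts = taosMemoryCalloc(10000, sizeof(SCheckpointPart));  // part-count cap

    if (taosCheckExistFile("file.bin.cp") && !cos_cp_load("file.bin.cp", &cp) &&
        cos_cp_is_valid_upload(&cp, file_size, file_mtime)) {
      // resume: reuse cp.upload_id; parts with completed == 1 are skipped
    } else {
      cos_cp_remove("file.bin.cp");  // stale checkpoint, start over
      cos_cp_build_upload(&cp, "file.bin", file_size, file_mtime, "upload-id-from-s3", 8 << 20);
    }

    if (cos_cp_open("file.bin.cp", &cp)) return;  // fd kept in cp.thefile

    for (int i = 0; i < cp.part_num; ++i) {
      if (cp.parts[i].completed) continue;
      // ... S3_upload_part() for cp.parts[i] ...
      cos_cp_update(&cp, cp.parts[i].index, "etag-from-s3", 0);
      (void)cos_cp_dump(&cp);  // persist progress after every part
    }

    cos_cp_close(cp.thefile);
    // ... S3_complete_multipart_upload() ...
    cos_cp_remove("file.bin.cp");  // upload finished, checkpoint obsolete
    taosMemoryFree(cp.parts);
  }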
diff --git a/source/common/src/cos.c b/source/common/src/cos.c
index 53a59ea3b0..ed562a87cc 100644
--- a/source/common/src/cos.c
+++ b/source/common/src/cos.c
@@ -2,6 +2,7 @@
 
 #include "cos.h"
 #include "cos_cp.h"
+#include "tdef.h"
 
 extern char tsS3Endpoint[];
 extern char tsS3AccessKeyId[];
@@ -324,6 +325,17 @@ S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properti
   return S3StatusOK;
 }
 
+S3Status MultipartResponseProperiesCallbackWithCp(const S3ResponseProperties *properties, void *callbackData) {
+  responsePropertiesCallbackNull(properties, callbackData);
+
+  MultipartPartData *data = (MultipartPartData *)callbackData;
+  int                seq = data->seq;
+  const char        *etag = properties->eTag;
+  data->manager->etags[seq - 1] = strdup(etag);
+  // data->manager->next_etags_pos = seq;
+  return S3StatusOK;
+}
+
 static int multipartPutXmlCallback(int bufferSize, char *buffer, void *callbackData) {
   UploadManager *manager = (UploadManager *)callbackData;
   int            ret = 0;
@@ -580,14 +592,172 @@ clean:
   return code;
 }
 
-static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, char const *object_name,
-                                         int64_t contentLength, S3PutProperties *put_prop,
+static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, const char *file, int32_t lmtime,
+                                         char const *object_name, int64_t contentLength, S3PutProperties *put_prop,
                                          put_object_callback_data *data) {
-  /*
-  static int32_t s3PutObjectFromFileWithCp(const char *file, int64_t size, int32_t lmtime, const char *object) {
-  */
   int32_t code = 0;
+  uint64_t totalContentLength = contentLength;
+  // uint64_t todoContentLength = contentLength;
+  UploadManager manager = {0};
+
+  uint64_t  chunk_size = MULTIPART_CHUNK_SIZE >> 3;
+  int       totalSeq = (contentLength + chunk_size - 1) / chunk_size;
+  const int max_part_num = 10000;
+  if (totalSeq > max_part_num) {
+    chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
+    totalSeq = (contentLength + chunk_size - 1) / chunk_size;
+  }
+
+  bool need_init_upload = true;
+  char file_cp_path[TSDB_FILENAME_LEN];
+  snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
+
+  SCheckpoint cp = {0};
+  cp.parts = taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
+
+  if (taosCheckExistFile(file_cp_path)) {
+    if (!cos_cp_load(file_cp_path, &cp) && cos_cp_is_valid_upload(&cp, contentLength, lmtime)) {
+      manager.upload_id = strdup(cp.upload_id);
+      need_init_upload = false;
+    } else {
+      cos_cp_remove(file_cp_path);
+    }
+  }
+
+  if (need_init_upload) {
+    S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
+                                         &initial_multipart_callback};
+    do {
+      S3_initiate_multipart(bucket_context, object_name, 0, &handler, 0, timeoutMsG, &manager);
+    } while (S3_status_is_retryable(manager.status) && should_retry());
+
+    if (manager.upload_id == 0 || manager.status != S3StatusOK) {
+      s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
+      code = TAOS_SYSTEM_ERROR(EIO);
+      goto clean;
+    }
+
+    cos_cp_build_upload(&cp, file, contentLength, lmtime, manager.upload_id, chunk_size);
+  }
+
+  if (cos_cp_open(file_cp_path, &cp)) {
+    code = TAOS_SYSTEM_ERROR(EIO);
+    goto clean;
+  }
+
+  int     part_num = 0;
+  int64_t consume_bytes = 0;
+  // SCheckpointPart *parts = taosMemoryCalloc(cp.part_num, sizeof(SCheckpointPart));
+  // cos_cp_get_undo_parts(&cp, &part_num, parts, &consume_bytes);
+
+  MultipartPartData partData;
+  memset(&partData, 0, sizeof(MultipartPartData));
+  int partContentLength = 0;
+
+  S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallbackWithCp, &responseCompleteCallback},
+                                         &putObjectDataCallback};
+
+  S3MultipartCommitHandler commit_handler = {
+      {&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
+
+  manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *));
+  manager.next_etags_pos = 0;
+
+upload:
+  // todoContentLength -= chunk_size * manager.next_etags_pos;
+  for (int i = 0; i < cp.part_num; ++i) {
+    if (cp.parts[i].completed) {
+      continue;
+    }
+
+    int seq = cp.parts[i].index + 1;
+
+    partData.manager = &manager;
+    partData.seq = seq;
+    if (partData.put_object_data.gb == NULL) {
+      partData.put_object_data = *data;
+    }
+
+    partContentLength = cp.parts[i].size;
+    partData.put_object_data.contentLength = partContentLength;
+    partData.put_object_data.originalContentLength = partContentLength;
+    // partData.put_object_data.totalContentLength = todoContentLength;
+    partData.put_object_data.totalOriginalContentLength = totalContentLength;
+    put_prop->md5 = 0;
+    do {
+      S3_upload_part(bucket_context, object_name, put_prop, &putObjectHandler, seq, manager.upload_id,
+                     partContentLength, 0, timeoutMsG, &partData);
+    } while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
+    if (partData.put_object_data.status != S3StatusOK) {
+      s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
+      code = TAOS_SYSTEM_ERROR(EIO);
+
+      //(void)cos_cp_dump(&cp);
+      goto clean;
+    }
+
+    if (!manager.etags[seq - 1]) {
+      code = TAOS_SYSTEM_ERROR(EIO);
+      goto clean;
+    }
+
+    cos_cp_update(&cp, cp.parts[seq - 1].index, manager.etags[seq - 1], 0);
+    (void)cos_cp_dump(&cp);
+
+    contentLength -= chunk_size;
+    // todoContentLength -= chunk_size;
+  }
+
+  cos_cp_close(cp.thefile);
+
+  int size = 0;
+  size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>", strlen("<CompleteMultipartUpload>"));
+  char buf[256];
+  int  n;
+  for (int i = 0; i < cp.part_num; ++i) {
+    n = snprintf(buf, sizeof(buf),
+                 "<Part><PartNumber>%d</PartNumber>"
+                 "<ETag>%s</ETag></Part>",
+                 // i + 1, manager.etags[i]);
+                 cp.parts[i].index + 1, cp.parts[i].etag);
+    size += growbuffer_append(&(manager.gb), buf, n);
+  }
+  size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>", strlen("</CompleteMultipartUpload>"));
+  manager.remaining = size;
+
+  do {
+    S3_complete_multipart_upload(bucket_context, object_name, &commit_handler, manager.upload_id, manager.remaining, 0,
+                                 timeoutMsG, &manager);
+  } while (S3_status_is_retryable(manager.status) && should_retry());
+  if (manager.status != S3StatusOK) {
+    s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
+    code = TAOS_SYSTEM_ERROR(EIO);
+    goto clean;
+  }
+
+  cos_cp_remove(file_cp_path);
+
+clean:
+  /*
+  if (parts) {
+    taosMemoryFree(parts);
+  }
+  */
+  if (cp.parts) {
+    taosMemoryFree(cp.parts);
+  }
+  if (manager.upload_id) {
+    taosMemoryFree(manager.upload_id);
+  }
+  for (int i = 0; manager.etags && i < cp.part_num; ++i) {
+    if (manager.etags[i]) {
+      taosMemoryFree(manager.etags[i]);
+    }
+  }
+  taosMemoryFree(manager.etags);
+  growbuffer_destroy(manager.gb);
 
   return code;
 }
@@ -633,7 +803,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w
     code = s3PutObjectFromFileSimple(&bucketContext, object_name, contentLength, &putProperties, &data);
   } else {
     if (withcp) {
-      code = s3PutObjectFromFileWithCp(&bucketContext, object_name, contentLength, &putProperties, &data);
+      code = s3PutObjectFromFileWithCp(&bucketContext, file, lmtime, object_name, contentLength, &putProperties, &data);
     } else {
       code = s3PutObjectFromFileWithoutCp(&bucketContext, object_name, contentLength, &putProperties, &data);
    }
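Note on the part sizing above: the first guess is MULTIPART_CHUNK_SIZE >> 3,
and when that would exceed S3's 10000-part limit, contentLength is rounded up
to the next multiple of max_part_num before dividing, so the adjusted
chunk_size always lands on at most 10000 parts. A standalone sketch with a
made-up 100 GiB file and an assumed 8 MiB first-guess chunk (the real constant
lives in cos.c):

  #include <stdint.h>
  #include <stdio.h>

  int main(void) {
    int64_t   contentLength = 107374182400LL;  // 100 GiB sample file
    uint64_t  chunk_size = 8 << 20;            // assumed 8 MiB first guess
    const int max_part_num = 10000;

    int64_t totalSeq = (contentLength + chunk_size - 1) / chunk_size;
    printf("parts before cap: %lld\n", (long long)totalSeq);  // 12800

    if (totalSeq > max_part_num) {
      // round contentLength up to a multiple of max_part_num, then divide
      chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
      totalSeq = (contentLength + chunk_size - 1) / chunk_size;
    }
    // prints: chunk 10737419 bytes -> 10000 parts
    printf("chunk %llu bytes -> %lld parts\n", (unsigned long long)chunk_size, (long long)totalSeq);
    return 0;
  }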
diff --git a/source/common/src/cos_cp.c b/source/common/src/cos_cp.c
index 8899c2d9f2..6d742c09ce 100644
--- a/source/common/src/cos_cp.c
+++ b/source/common/src/cos_cp.c
@@ -1,17 +1,247 @@
 #define ALLOW_FORBID_FUNC
 
 #include "cos_cp.h"
+#include "cJSON.h"
 
-void cos_cp_get_path(char const* filepath, char* cp_path) {}
-TdFilePtr cos_cp_open(char const* cp_path, SCheckpoint* checkpoint) { return NULL; }
-void cos_cp_close(TdFilePtr fd) {}
-void cos_cp_remove(char const* filepath) {}
-bool cos_cp_exist(char const* filepath) { return true; }
+int32_t cos_cp_open(char const* cp_path, SCheckpoint* checkpoint) {
+  int32_t code = 0;
 
-int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint) { return 0; }
-int32_t cos_cp_dump(SCheckpoint* checkpoint) { return 0; }
-void cos_cp_get_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t consume_bytes) {}
-void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag, uint64_t crc64) {}
-void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime,
-                         char const* upload_id, int64_t part_size) {}
-bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int32_t mtime) { return true; }
+  // the checkpoint file may not exist yet for a fresh upload, so open with
+  // create+write rather than read-only
+  TdFilePtr fd = taosOpenFile(cp_path, TD_FILE_CREATE | TD_FILE_WRITE);
+  if (!fd) {
+    code = TAOS_SYSTEM_ERROR(errno);
+    uError("ERROR: %s Failed to open %s", __func__, cp_path);
+    return code;
+  }
+
+  checkpoint->thefile = fd;
+
+  return code;
+}
+
+void cos_cp_close(TdFilePtr fd) { taosCloseFile(&fd); }
+void cos_cp_remove(char const* filepath) { taosRemoveFile(filepath); }
+
+static int32_t cos_cp_parse_body(char* cp_body, SCheckpoint* cp) {
+  int32_t      code = 0;
+  cJSON const* item2 = NULL;
+
+  cJSON* json = cJSON_Parse(cp_body);
+  if (NULL == json) {
+    code = TSDB_CODE_FILE_CORRUPTED;
+    uError("ERROR: %s Failed to parse json", __func__);
+    goto _exit;
+  }
+
+  cJSON const* item = cJSON_GetObjectItem(json, "ver");
+  if (!cJSON_IsNumber(item) || item->valuedouble != 1) {
+    code = TSDB_CODE_FILE_CORRUPTED;
+    uError("ERROR: %s Failed to parse json ver: %f", __func__, item ? item->valuedouble : -1.0);
+    goto _exit;
+  }
+
+  item = cJSON_GetObjectItem(json, "type");
+  if (!cJSON_IsNumber(item)) {
+    code = TSDB_CODE_FILE_CORRUPTED;
+    uError("ERROR: %s Failed to parse json", __func__);
+    goto _exit;
+  }
+  cp->cp_type = item->valuedouble;
+
+  item = cJSON_GetObjectItem(json, "md5");
+  if (cJSON_IsString(item)) {
+    memcpy(cp->md5, item->valuestring, strlen(item->valuestring));
+  }
+
+  item2 = cJSON_GetObjectItem(json, "file");
+  if (cJSON_IsObject(item2)) {
+    item = cJSON_GetObjectItem(item2, "size");
+    if (cJSON_IsNumber(item)) {
+      cp->file_size = item->valuedouble;
+    }
+
+    item = cJSON_GetObjectItem(item2, "lastmodified");
+    if (cJSON_IsNumber(item)) {
+      cp->file_last_modified = item->valuedouble;
+    }
+
+    item = cJSON_GetObjectItem(item2, "path");
+    if (cJSON_IsString(item)) {
+      strncpy(cp->file_path, item->valuestring, TSDB_FILENAME_LEN);
+    }
+
+    item = cJSON_GetObjectItem(item2, "file_md5");
+    if (cJSON_IsString(item)) {
+      strncpy(cp->file_md5, item->valuestring, 64);
+    }
+  }
+
+  item2 = cJSON_GetObjectItem(json, "object");
+  if (cJSON_IsObject(item2)) {
+    item = cJSON_GetObjectItem(item2, "object_size");
+    if (cJSON_IsNumber(item)) {
+      cp->object_size = item->valuedouble;
+    }
+
+    item = cJSON_GetObjectItem(item2, "object_name");
+    if (cJSON_IsString(item)) {
+      strncpy(cp->object_name, item->valuestring, 128);
+    }
+
+    item = cJSON_GetObjectItem(item2, "object_last_modified");
+    if (cJSON_IsString(item)) {
+      strncpy(cp->object_last_modified, item->valuestring, 64);
+    }
+
+    item = cJSON_GetObjectItem(item2, "object_etag");
+    if (cJSON_IsString(item)) {
+      strncpy(cp->object_etag, item->valuestring, 128);
+    }
+  }
+
+  item2 = cJSON_GetObjectItem(json, "cpparts");
+  if (cJSON_IsObject(item2)) {
+    item = cJSON_GetObjectItem(item2, "number");
+    if (cJSON_IsNumber(item)) {
+      cp->part_num = item->valuedouble;
+    }
+
+    item = cJSON_GetObjectItem(item2, "size");
+    if (cJSON_IsNumber(item)) {
+      cp->part_size = item->valuedouble;
+    }
+
+    item2 = cJSON_GetObjectItem(json, "parts");
+    if (cJSON_IsArray(item2) && cp->part_num > 0) {
+      cJSON_ArrayForEach(item, item2) {
+        cJSON const* item3 = cJSON_GetObjectItem(item, "index");
+        int32_t      index = 0;
+        if (cJSON_IsNumber(item3)) {
+          index = item3->valuedouble;
+          cp->parts[index].index = index;
+        }
+
+        item3 = cJSON_GetObjectItem(item, "offset");
+        if (cJSON_IsNumber(item3)) {
+          cp->parts[index].offset = item3->valuedouble;
+        }
+
+        item3 = cJSON_GetObjectItem(item, "size");
+        if (cJSON_IsNumber(item3)) {
+          cp->parts[index].size = item3->valuedouble;
+        }
+
+        item3 = cJSON_GetObjectItem(item, "completed");
+        if (cJSON_IsNumber(item3)) {
+          cp->parts[index].completed = item3->valuedouble;
+        }
+
+        item3 = cJSON_GetObjectItem(item, "crc64");
+        if (cJSON_IsNumber(item3)) {
+          cp->parts[index].crc64 = item3->valuedouble;
+        }
+
+        item3 = cJSON_GetObjectItem(item, "etag");
+        if (cJSON_IsString(item3)) {
+          strncpy(cp->parts[index].etag, item3->valuestring, 128);
+        }
+      }
+    }
+  }
+
+_exit:
+  if (json) cJSON_Delete(json);
+  if (cp_body) taosMemoryFree(cp_body);
+
+  return code;
+}
+
+int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint) {
+  int32_t code = 0;
+  char*   cp_body = NULL;
+
+  TdFilePtr fd = taosOpenFile(filepath, TD_FILE_READ);
+  if (!fd) {
+    code = TAOS_SYSTEM_ERROR(errno);
+    uError("ERROR: %s Failed to open %s", __func__, filepath);
+    goto _exit;
+  }
+
+  int64_t size = -1;
+  code = taosStatFile(filepath, &size, NULL, NULL);
+  if (code) {
+    uError("ERROR: %s Failed to stat %s", __func__, filepath);
+    goto _exit;
+  }
+
+  cp_body = taosMemoryMalloc(size + 1);
+
+  int64_t n = taosReadFile(fd, cp_body, size);
+  if (n < 0) {
+    code = TAOS_SYSTEM_ERROR(errno);
+    uError("ERROR: %s Failed to read %s", __func__, filepath);
+    goto _exit;
+  } else if (n != size) {
+    code = TSDB_CODE_FILE_CORRUPTED;
+    uError("ERROR: %s Failed to read %s %" PRId64 "/%" PRId64, __func__, filepath, n, size);
+    goto _exit;
+  }
+  taosCloseFile(&fd);
+  cp_body[size] = '\0';
+
+  return cos_cp_parse_body(cp_body, checkpoint);
+
+_exit:
+  if (fd) {
+    taosCloseFile(&fd);
+  }
+  if (cp_body) {
+    taosMemoryFree(cp_body);
+  }
+
+  return code;
+}
+
+int32_t cos_cp_dump(SCheckpoint* checkpoint) {
+  int32_t code = 0;
+
+  return code;
+}
+
+void cos_cp_get_undo_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t* consume_bytes) {}
+
+void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag, uint64_t crc64) {
+  checkpoint->parts[part_index].completed = 1;
+  strncpy(checkpoint->parts[part_index].etag, etag, 128);
+  checkpoint->parts[part_index].crc64 = crc64;
+}
+
+void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime,
+                         char const* upload_id, int64_t part_size) {
+  int i = 0;
+
+  checkpoint->cp_type = COS_CP_TYPE_UPLOAD;
+  strncpy(checkpoint->file_path, filepath, TSDB_FILENAME_LEN);
+
+  checkpoint->file_size = size;
+  checkpoint->file_last_modified = mtime;
+  strncpy(checkpoint->upload_id, upload_id, 128);
+
+  checkpoint->part_size = part_size;
+  for (; i * part_size < size; i++) {
+    checkpoint->parts[i].index = i;
+    checkpoint->parts[i].offset = i * part_size;
+    checkpoint->parts[i].size = TMIN(part_size, (size - i * part_size));
+    checkpoint->parts[i].completed = 0;
+    checkpoint->parts[i].etag[0] = '\0';
+  }
+  checkpoint->part_num = i;
+}
+
+static bool cos_cp_verify_md5(SCheckpoint* cp) { return true; }
+
+bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int32_t mtime) {
+  if (cos_cp_verify_md5(checkpoint) && checkpoint->file_size == size && checkpoint->file_last_modified == mtime) {
+    return true;
+  }
+
+  return false;
+}
diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c
index 5076599753..f55d331a66 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCache.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCache.c
@@ -53,11 +53,10 @@ static void tsdbCloseBICache(STsdb *pTsdb) {
 }
 
 static int32_t tsdbOpenBCache(STsdb *pTsdb) {
-  int32_t code = 0;
-  // SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5);
-  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
-
-  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * tsS3BlockSize * szPage, 0, .5);
+  int32_t    code = 0;
+  int32_t    szPage = pTsdb->pVnode->config.tsdbPageSize;
+  int64_t    szBlock = tsS3BlockSize <= 1024 ? 1024 : tsS3BlockSize;
+  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * szBlock * szPage, 0, .5);
   if (pCache == NULL) {
     code = TSDB_CODE_OUT_OF_MEMORY;
     goto _err;
@@ -67,8 +66,9 @@ static int32_t tsdbOpenBCache(STsdb *pTsdb) {
 
   taosThreadMutexInit(&pTsdb->bMutex, NULL);
 
-_err:
   pTsdb->bCache = pCache;
+
+_err:
   return code;
 }
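For reference, the checkpoint file that cos_cp_parse_body() accepts (and that
a later cos_cp_dump() implementation would be expected to emit) looks like the
following. The key names and the required "ver" of 1 come straight from the
parser above; every value below is illustrative only, and "type": 0 assumes
COS_CP_TYPE_UPLOAD == 0, per the header comment ("0 upload, 1 download"):

  {
    "ver": 1,
    "type": 0,
    "md5": "",
    "file": {
      "path": "/path/to/file.data",
      "size": 21474836,
      "lastmodified": 1701765448,
      "file_md5": ""
    },
    "object": {
      "object_name": "example-object",
      "object_size": 0,
      "object_last_modified": "",
      "object_etag": ""
    },
    "cpparts": { "number": 3, "size": 8388608 },
    "parts": [
      { "index": 0, "offset": 0,        "size": 8388608, "completed": 1, "crc64": 0, "etag": "etag-0" },
      { "index": 1, "offset": 8388608,  "size": 8388608, "completed": 1, "crc64": 0, "etag": "etag-1" },
      { "index": 2, "offset": 16777216, "size": 4697620, "completed": 0, "crc64": 0, "etag": "" }
    ]
  }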