From a3fdb6dd59a98d63b3761e09b01c3d615302a6a9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 10 Nov 2023 16:06:08 +0800 Subject: [PATCH] Merge branch '3.0' into enh/refactorBackend --- source/common/src/cos.c | 89 ++++++++++--------- source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 55 +++++++++--- source/libs/stream/src/streamCheckpoint.c | 4 +- 4 files changed, 90 insertions(+), 60 deletions(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 0b6b0db885..c0b44a5df0 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -70,7 +70,7 @@ static void s3PrintError(const char *func, S3Status status, char error_details[] if (status < S3StatusErrorAccessDenied) { uError("%s: %s", __func__, S3_get_status_name(status)); } else { - uError("%s: %s, %s", __func__, S3_get_status_name(status), error_details); + uError("%s: %s, %s, %d", __func__, S3_get_status_name(status), error_details, status); } } @@ -85,7 +85,7 @@ typedef struct { char err_msg[128]; S3Status status; uint64_t content_length; - char *buf; + char * buf; int64_t buf_pos; } TS3SizeCBD; @@ -271,7 +271,7 @@ typedef struct MultipartPartData { S3Status status; put_object_callback_data put_object_data; int seq; - UploadManager *manager; + UploadManager * manager; } MultipartPartData; static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) { @@ -316,7 +316,7 @@ S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properti MultipartPartData *data = (MultipartPartData *)callbackData; int seq = data->seq; - const char *etag = properties->eTag; + const char * etag = properties->eTag; data->manager->etags[seq - 1] = strdup(etag); data->manager->next_etags_pos = seq; return S3StatusOK; @@ -449,10 +449,10 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { int32_t code = 0; const char *key = object; // const char *uploadId = 0; - const char *filename = 0; + const char * filename = 0; uint64_t contentLength = 0; - const char *cacheControl = 0, *contentType = 0, *md5 = 0; - const char *contentDispositionFilename = 0, *contentEncoding = 0; + const char * cacheControl = 0, *contentType = 0, *md5 = 0; + const char * contentDispositionFilename = 0, *contentEncoding = 0; int64_t expires = -1; S3CannedAcl cannedAcl = S3CannedAclPrivate; int metaPropertiesCount = 0; @@ -466,6 +466,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { data.gb = 0; data.noStatus = noStatus; + uError("ERROR: %s stat file %s: ", __func__, file); if (taosStatFile(file, &contentLength, NULL, NULL) < 0) { uError("ERROR: %s Failed to stat file %s: ", __func__, file); code = TAOS_SYSTEM_ERROR(errno); @@ -637,7 +638,7 @@ typedef struct list_bucket_callback_data { char nextMarker[1024]; int keyCount; int allDetails; - SArray *objectArray; + SArray * objectArray; } list_bucket_callback_data; static S3Status listBucketCallback(int isTruncated, const char *nextMarker, int contentsCount, @@ -682,11 +683,11 @@ static void s3FreeObjectKey(void *pItem) { static SArray *getListByPrefix(const char *prefix) { S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + 0, awsRegionG}; S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &listBucketCallback}; - const char *marker = 0, *delimiter = 0; + const char * marker = 0, *delimiter = 0; int maxkeys = 0, allDetails = 0; list_bucket_callback_data data; data.objectArray = taosArrayInit(32, sizeof(void *)); @@ -725,7 +726,7 @@ static SArray *getListByPrefix(const char *prefix) { void s3DeleteObjects(const char *object_name[], int nobject) { S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + 0, awsRegionG}; S3ResponseHandler responseHandler = {0, &responseCompleteCallback}; for (int i = 0; i < nobject; ++i) { @@ -776,7 +777,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, const char *ifMatch = 0, *ifNotMatch = 0; S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + 0, awsRegionG}; S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, &getObjectDataCallback}; @@ -814,7 +815,7 @@ int32_t s3GetObjectToFile(const char *object_name, char *fileName) { const char *ifMatch = 0, *ifNotMatch = 0; S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + 0, awsRegionG}; S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &getObjectCallback}; @@ -845,7 +846,7 @@ int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { if (objectArray == NULL) return -1; for (size_t i = 0; i < taosArrayGetSize(objectArray); i++) { - char *object = taosArrayGetP(objectArray, i); + char * object = taosArrayGetP(objectArray, i); const char *tmp = strchr(object, '/'); tmp = (tmp == NULL) ? object : tmp + 1; char fileName[PATH_MAX] = {0}; @@ -934,12 +935,12 @@ static void s3InitRequestOptions(cos_request_options_t *options, int is_cname) { int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) { int32_t code = 0; - cos_pool_t *p = NULL; + cos_pool_t * p = NULL; int is_cname = 0; - cos_status_t *s = NULL; + cos_status_t * s = NULL; cos_request_options_t *options = NULL; cos_string_t bucket, object, file; - cos_table_t *resp_headers; + cos_table_t * resp_headers; // int traffic_limit = 0; cos_pool_create(&p, NULL); @@ -970,14 +971,14 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) { int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) { int32_t code = 0; - cos_pool_t *p = NULL; + cos_pool_t * p = NULL; int is_cname = 0; - cos_status_t *s = NULL; - cos_request_options_t *options = NULL; + cos_status_t * s = NULL; + cos_request_options_t * options = NULL; cos_string_t bucket, object, file; - cos_table_t *resp_headers; + cos_table_t * resp_headers; int traffic_limit = 0; - cos_table_t *headers = NULL; + cos_table_t * headers = NULL; cos_resumable_clt_params_t *clt_params = NULL; cos_pool_create(&p, NULL); @@ -1010,11 +1011,11 @@ int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) { } void s3DeleteObjectsByPrefix(const char *prefix_str) { - cos_pool_t *p = NULL; + cos_pool_t * p = NULL; cos_request_options_t *options = NULL; int is_cname = 0; cos_string_t bucket; - cos_status_t *s = NULL; + cos_status_t * s = NULL; cos_string_t prefix; cos_pool_create(&p, NULL); @@ -1029,10 +1030,10 @@ void s3DeleteObjectsByPrefix(const char *prefix_str) { } void s3DeleteObjects(const char *object_name[], int nobject) { - cos_pool_t *p = NULL; + cos_pool_t * p = NULL; int is_cname = 0; cos_string_t bucket; - cos_table_t *resp_headers = NULL; + cos_table_t * resp_headers = NULL; cos_request_options_t *options = NULL; cos_list_t object_list; cos_list_t deleted_object_list; @@ -1066,14 +1067,14 @@ void s3DeleteObjects(const char *object_name[], int nobject) { bool s3Exists(const char *object_name) { bool ret = false; - cos_pool_t *p = NULL; + cos_pool_t * p = NULL; int is_cname = 0; - cos_status_t *s = NULL; - cos_request_options_t *options = NULL; + cos_status_t * s = NULL; + cos_request_options_t * options = NULL; cos_string_t bucket; cos_string_t object; - cos_table_t *resp_headers; - cos_table_t *headers = NULL; + cos_table_t * resp_headers; + cos_table_t * headers = NULL; cos_object_exist_status_e object_exist; cos_pool_create(&p, NULL); @@ -1100,15 +1101,15 @@ bool s3Exists(const char *object_name) { bool s3Get(const char *object_name, const char *path) { bool ret = false; - cos_pool_t *p = NULL; + cos_pool_t * p = NULL; int is_cname = 0; - cos_status_t *s = NULL; + cos_status_t * s = NULL; cos_request_options_t *options = NULL; cos_string_t bucket; cos_string_t object; cos_string_t file; - cos_table_t *resp_headers = NULL; - cos_table_t *headers = NULL; + cos_table_t * resp_headers = NULL; + cos_table_t * headers = NULL; int traffic_limit = 0; //创建内存池 @@ -1144,15 +1145,15 @@ bool s3Get(const char *object_name, const char *path) { int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, bool check, uint8_t **ppBlock) { (void)check; int32_t code = 0; - cos_pool_t *p = NULL; + cos_pool_t * p = NULL; int is_cname = 0; - cos_status_t *s = NULL; + cos_status_t * s = NULL; cos_request_options_t *options = NULL; cos_string_t bucket; cos_string_t object; - cos_table_t *resp_headers; - cos_table_t *headers = NULL; - cos_buf_t *content = NULL; + cos_table_t * resp_headers; + cos_table_t * headers = NULL; + cos_buf_t * content = NULL; // cos_string_t file; // int traffic_limit = 0; char range_buf[64]; @@ -1246,7 +1247,7 @@ void s3EvictCache(const char *path, long object_size) { terrno = TAOS_SYSTEM_ERROR(errno); vError("failed to open %s since %s", dir_name, terrstr()); } - SArray *evict_files = taosArrayInit(16, sizeof(SEvictFile)); + SArray * evict_files = taosArrayInit(16, sizeof(SEvictFile)); tdbDirEntryPtr pDirEntry; while ((pDirEntry = taosReadDir(pDir)) != NULL) { char *name = taosGetDirEntryName(pDirEntry); @@ -1288,13 +1289,13 @@ void s3EvictCache(const char *path, long object_size) { long s3Size(const char *object_name) { long size = 0; - cos_pool_t *p = NULL; + cos_pool_t * p = NULL; int is_cname = 0; - cos_status_t *s = NULL; + cos_status_t * s = NULL; cos_request_options_t *options = NULL; cos_string_t bucket; cos_string_t object; - cos_table_t *resp_headers = NULL; + cos_table_t * resp_headers = NULL; //创建内存池 cos_pool_create(&p, NULL); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index cd620ed5b7..bce51a9d2f 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -254,5 +254,5 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); void bkdMgtDestroy(SBkdMgt* bm); -int32_t taskDbGenChkpUploadPath(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** pathkj); +int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** pathkj); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 519019892f..604f5f7c2a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1736,7 +1736,7 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 return 0; } -int32_t taskDbGenChkpUploadPath(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path) { +int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path) { STaskDbWrapper* pDb = arg; UPLOAD_TYPE utype = type; @@ -3363,13 +3363,27 @@ _err: return code >= 0 ? 0 : -1; } +int32_t isBkdDataMeta(char* name) { + const char* pCurrent = "CURRENT"; + int32_t currLen = strlen(pCurrent); + + const char* pManifest = "MANIFEST-"; + int32_t maniLen = strlen(pManifest); + + if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { + return 1; + } else if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { + return 1; + } + return 0; +} int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { int32_t code = 0; size_t len = 0; void* pIter = taosHashIterate(p2, NULL); while (pIter) { char* name = taosHashGetKey(pIter, &len); - if (!taosHashGet(p1, name, len)) { + if (!isBkdDataMeta(name) && !taosHashGet(p1, name, len)) { char* p = taosStrdup(name); taosArrayPush(diff, &p); } @@ -3431,13 +3445,22 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { continue; } } + + void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL); + while (pIter) { + char *name = taosHashGetKey(pIter, NULL); + stError("curr file list: %s", name); + pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter); + } + if (p->init == 0) { void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL); while (pIter) { size_t len; char* name = taosHashGetKey(pIter, &len); - if (name != NULL && len != 0) { - taosArrayPush(p->pAdd, &name); + if (name != NULL && !isBkdDataMeta(name)) { + char* fname = taosStrdup(name); + taosArrayPush(p->pAdd, &fname); } pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter); } @@ -3538,12 +3561,12 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { goto _ERROR; } - code = taosMkDir(dstDir); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("failed to mkdir srcDir %s, reason: %s", dstDir, terrstr()); - goto _ERROR; - } + // code = taosMkDir(dstDir); + // if (code != 0) { + // terrno = TAOS_SYSTEM_ERROR(errno); + // stError("failed to mkdir srcDir %s, reason: %s", dstDir, terrstr()); + // goto _ERROR; + // } // clear current file memset(dstBuf, 0, len); @@ -3563,7 +3586,9 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename); sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); - taosCopyFile(srcBuf, dstBuf); + if (taosCopyFile(srcBuf, dstBuf) < 0) { + stError("failed to copy file from %s to %s", srcBuf, dstBuf); + } } // del file in $name for (int i = 0; i < taosArrayGetSize(p->pDel); i++) { @@ -3580,14 +3605,18 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { memset(dstBuf, 0, len); sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent); sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent); - taosCopyFile(srcBuf, dstBuf); + if (taosCopyFile(srcBuf, dstBuf) < 0) { + stError("failed to copy file from %s to %s", srcBuf, dstBuf); + } // copy manifest file to dst dir memset(srcBuf, 0, len); memset(dstBuf, 0, len); sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest); sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest); - taosCopyFile(srcBuf, dstBuf); + if (taosCopyFile(srcBuf, dstBuf) < 0) { + stError("failed to copy file from %s to %s", srcBuf, dstBuf); + } // clear delta data buf taosArrayClearP(p->pAdd, taosMemoryFree); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 03696e1122..854cdc12b1 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -341,12 +341,12 @@ int32_t doUploadChkp(void* param) { char* path = NULL; int32_t code = 0; - if ((code = taskDbGenChkpUploadPath(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, + if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, (int8_t)(arg->type), &path)) != 0) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); } if (code == 0 && uploadCheckpoint(arg->taskId, path) != 0) { - stError("s-task:%s faile to upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); + stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId); } taosMemoryFree(path);