From 8b53f766911a336715c72e03272085a5cf5ebeb6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 15:56:34 +0800 Subject: [PATCH] fix(stream): discard the repeatedly received checkpoint-source msg. --- source/common/src/tglobal.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/vnode/src/tq/tq.c | 14 ++++++- source/libs/stream/inc/streamInt.h | 24 ++++++----- source/libs/stream/src/streamBackendRocksdb.c | 12 +++--- source/libs/stream/src/streamCheckpoint.c | 40 +++++++++---------- source/libs/stream/src/streamQueue.c | 1 - 7 files changed, 53 insertions(+), 42 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d34e23c0ba..ba96dc0adf 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -271,7 +271,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch int32_t tsTransPullupInterval = 2; int32_t tsCompactPullupInterval = 10; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointInterval = 300; +int32_t tsStreamCheckpointInterval = 60; float tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 16; int32_t tsTtlUnit = 86400; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 521f359f73..844aae0f57 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -847,7 +847,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { if (pIter == NULL) break; maxChkptId = TMAX(maxChkptId, pStream->checkpointId); - mDebug("stream:%p, %s id:%" PRIx64 "checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid, + mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid, pStream->checkpointId); sdbRelease(pSdb, pStream); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 567d61e27a..cf985cd164 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1156,14 
+1156,24 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // check if the checkpoint msg already sent or not. if (status == TASK_STATUS__CK) { - tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 - " transId:%d already received, ignore this msg and continue process checkpoint", + tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 + " transId:%d already handled, ignore msg and continue process checkpoint", pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; + } else { // checkpoint already finished, and not in checkpoint status + if (req.checkpointId == pTask->chkInfo.checkpointId) { + tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 + " transId:%d already handled, ignore and discard", pTask->id.idStr, req.checkpointId, req.transId); + + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + + return TSDB_CODE_SUCCESS; + } } streamProcessCheckpointSourceReq(pTask, &req); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index b3ed86cff8..44fb0706b8 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -69,6 +69,7 @@ typedef struct { int64_t chkpId; char* dbPrefixPath; } SStreamTaskSnap; + struct STokenBucket { int32_t numCapacity; // total capacity, available token per second int32_t numOfToken; // total available tokens @@ -148,18 +149,19 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); void streamMetaRemoveDB(void* arg, char* key); -typedef enum UPLOAD_TYPE { - UPLOAD_DISABLE = -1, - UPLOAD_S3 = 0, - UPLOAD_RSYNC = 1, -} UPLOAD_TYPE; +typedef enum ECHECKPOINT_BACKUP_TYPE { + DATA_UPLOAD_DISABLE = -1, + DATA_UPLOAD_S3 = 0, + DATA_UPLOAD_RSYNC = 1, +} ECHECKPOINT_BACKUP_TYPE; -UPLOAD_TYPE getUploadType(); -int uploadCheckpoint(char* id, 
char* path); -int downloadCheckpoint(char* id, char* path); -int deleteCheckpoint(char* id); -int deleteCheckpointFile(char* id, char* name); -int downloadCheckpointByName(char* id, char* fname, char* dstName); +ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); + +int32_t streamTaskBackupCheckpoint(char* id, char* path); +int32_t downloadCheckpoint(char* id, char* path); +int32_t deleteCheckpoint(char* id); +int32_t deleteCheckpointFile(char* id, char* name); +int32_t downloadCheckpointByName(char* id, char* fname, char* dstName); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 06093cbaf8..123458f372 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -376,10 +376,10 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char return code; } int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { - UPLOAD_TYPE type = getUploadType(); - if (type == UPLOAD_S3) { + ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); + if (type == DATA_UPLOAD_S3) { return rebuildFromRemoteChkp_s3(key, chkpPath, chkpId, defaultPath); - } else if (type == UPLOAD_RSYNC) { + } else if (type == DATA_UPLOAD_RSYNC) { return rebuildFromRemoteChkp_rsync(key, chkpPath, chkpId, defaultPath); } return -1; @@ -2111,11 +2111,11 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 } int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { STaskDbWrapper* pDb = arg; - UPLOAD_TYPE utype = type; + ECHECKPOINT_BACKUP_TYPE utype = type; - if (utype == UPLOAD_RSYNC) { + if (utype == DATA_UPLOAD_RSYNC) { return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); - } else if (utype == UPLOAD_S3) { + } 
else if (utype == DATA_UPLOAD_S3) { return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); } return -1; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 8efd661d12..e6d7c2fde8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -19,7 +19,7 @@ #include "streamInt.h" typedef struct { - UPLOAD_TYPE type; + ECHECKPOINT_BACKUP_TYPE type; char* taskId; int64_t chkpId; @@ -416,7 +416,7 @@ int32_t getChkpMeta(char* id, char* path, SArray* list) { return code; } -int32_t doUploadChkp(void* param) { +int32_t uploadCheckpointData(void* param) { SAsyncUploadArg* arg = param; char* path = NULL; int32_t code = 0; @@ -426,13 +426,13 @@ int32_t doUploadChkp(void* param) { (int8_t)(arg->type), &path, toDelFiles)) != 0) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); } - if (arg->type == UPLOAD_S3) { + if (arg->type == DATA_UPLOAD_S3) { if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); } } - if (code == 0 && (code = uploadCheckpoint(arg->taskId, path)) != 0) { + if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) { stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId); } @@ -459,8 +459,8 @@ int32_t doUploadChkp(void* param) { int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { // async upload - UPLOAD_TYPE type = getUploadType(); - if (type == UPLOAD_DISABLE) { + ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); + if (type == DATA_UPLOAD_DISABLE) { return 0; } @@ -474,7 +474,7 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { arg->chkpId = chkpId; arg->pTask = pTask; - return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL); + return 
streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); } int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { @@ -558,7 +558,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } -static int uploadCheckpointToS3(char* id, char* path) { +static int32_t uploadCheckpointToS3(char* id, char* path) { TdDirPtr pDir = taosOpenDir(path); if (pDir == NULL) return -1; @@ -590,8 +590,8 @@ static int uploadCheckpointToS3(char* id, char* path) { return 0; } -static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { - int code = 0; +static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { + int32_t code = 0; char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); sprintf(buf, "%s/%s", id, fname); if (s3GetObjectToFile(buf, dstName) != 0) { @@ -601,19 +601,19 @@ static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { return code; } -UPLOAD_TYPE getUploadType() { +ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() { if (strlen(tsSnodeAddress) != 0) { - return UPLOAD_RSYNC; + return DATA_UPLOAD_RSYNC; } else if (tsS3StreamEnabled) { - return UPLOAD_S3; + return DATA_UPLOAD_S3; } else { - return UPLOAD_DISABLE; + return DATA_UPLOAD_DISABLE; } } -int uploadCheckpoint(char* id, char* path) { +int32_t streamTaskBackupCheckpoint(char* id, char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { - stError("uploadCheckpoint parameters invalid"); + stError("streamTaskBackupCheckpoint parameters invalid"); return -1; } if (strlen(tsSnodeAddress) != 0) { @@ -625,7 +625,7 @@ int uploadCheckpoint(char* id, char* path) { } // fileName: CURRENT -int downloadCheckpointByName(char* id, char* fname, char* dstName) { +int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { 
stError("uploadCheckpointByName parameters invalid"); return -1; @@ -638,7 +638,7 @@ int downloadCheckpointByName(char* id, char* fname, char* dstName) { return 0; } -int downloadCheckpoint(char* id, char* path) { +int32_t downloadCheckpoint(char* id, char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("downloadCheckpoint parameters invalid"); return -1; @@ -651,7 +651,7 @@ int downloadCheckpoint(char* id, char* path) { return 0; } -int deleteCheckpoint(char* id) { +int32_t deleteCheckpoint(char* id) { if (id == NULL || strlen(id) == 0) { stError("deleteCheckpoint parameters invalid"); return -1; @@ -664,7 +664,7 @@ int deleteCheckpoint(char* id) { return 0; } -int deleteCheckpointFile(char* id, char* name) { +int32_t deleteCheckpointFile(char* id, char* name) { char object[128] = {0}; snprintf(object, sizeof(object), "%s/%s", id, name); char* tmp = object; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 9f79501471..9e872a1aff 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -17,7 +17,6 @@ #define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_SMOOTH_BURST_RATIO 5 // 5 sec -#define WAIT_FOR_DURATION 10 // todo refactor: // read data from input queue