From bf16c596a6a280a35e34e1aa9da962d126392e7b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 08:46:09 +0800 Subject: [PATCH 01/17] fix(stream): fix syntax error. --- source/libs/stream/src/streamBackendRocksdb.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 15c5272f3c..e4a87ba353 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2207,10 +2207,10 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; char* temp = taosMemoryCalloc(1, strlen(pDb->path) + 32); - sprintf(temp, "%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "tmp", chkpId); + sprintf(temp, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId); if (taosDirExist(temp)) { - cleanDir(temp, NULL); + cleanDir(temp, ""); } else { taosMkDir(temp); } From 8fd9baf6f576600747aae11dd574ef726c68321e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 09:11:16 +0800 Subject: [PATCH 02/17] fix(stream):synchronized upload checkpoint data to snode. --- source/libs/stream/src/streamCheckpoint.c | 103 ++++++++++------------ 1 file changed, 48 insertions(+), 55 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index eedd8f20d6..6676a05548 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -562,76 +562,67 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l return code; } -int32_t uploadCheckpointData(void* param) { - SAsyncUploadArg* pParam = param; +int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) { char* path = NULL; int32_t code = 0; SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); - char* taskStr = pParam->taskId ? pParam->taskId : "NULL"; + int64_t now = taosGetTimestampMs(); + SStreamMeta* pMeta = pTask->pMeta; + const char* idStr = pTask->id.idStr; - void* pBackend = taskAcquireDb(pParam->dbRefId); - if (pBackend == NULL) { - stError("s-task:%s failed to acquire db", taskStr); - taosMemoryFree(pParam->taskId); - taosMemoryFree(pParam); - return -1; + if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles, + pTask->id.idStr)) != 0) { + stError("s-task:%s failed to gen upload checkpoint:%" PRId64, idStr, checkpointId); } - if ((code = taskDbGenChkpUploadData(pParam->pTask->pBackend, ((SStreamMeta*)pParam->pMeta)->bkdChkptMgt, - pParam->chkpId, (int8_t)(pParam->type), &path, toDelFiles)) != 0) { - stError("s-task:%s failed to gen upload checkpoint:%" PRId64, taskStr, pParam->chkpId); - } - - if (pParam->type == DATA_UPLOAD_S3) { - if (code == 0 && (code = getCheckpointDataMeta(pParam->taskId, path, toDelFiles)) != 0) { - stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", taskStr, pParam->chkpId); + if (type == DATA_UPLOAD_S3) { + if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) { + stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", idStr, checkpointId); } } if (code == TSDB_CODE_SUCCESS) { - code = streamTaskUploadCheckpoint(pParam->taskId, path); + code = streamTaskUploadCheckpoint(idStr, path); if (code == TSDB_CODE_SUCCESS) { - stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", taskStr, pParam->chkpId); + stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId); } else { - stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", taskStr, pParam->chkpId, path); + stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", idStr, checkpointId, path); } } - taskReleaseDb(pParam->dbRefId); - - if (code == 0) { + if (code == TSDB_CODE_SUCCESS) { int32_t size = taosArrayGetSize(toDelFiles); - stDebug("s-task:%s remove redundant %d files", taskStr, size); + stDebug("s-task:%s remove redundant %d files", idStr, size); for (int i = 0; i < size; i++) { char* pName = taosArrayGetP(toDelFiles, i); - code = deleteCheckpointFile(pParam->taskId, pName); + code = deleteCheckpointFile(idStr, pName); if (code != 0) { - stDebug("s-task:%s failed to del file: %s", taskStr, pName); + stDebug("s-task:%s failed to remove file: %s", idStr, pName); break; } } - stDebug("s-task:%s remove redundant files done", taskStr); + stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId); } taosArrayDestroyP(toDelFiles, taosMemoryFree); + double el = (taosGetTimestampMs() - now) / 1000.0; if (code == TSDB_CODE_SUCCESS) { - stDebug("s-task:%s remove local checkpointId:%" PRId64 " data %s", taskStr, pParam->chkpId, path); + stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s", + idStr, checkpointId, el, path); taosRemoveDir(path); } else { - stDebug("s-task:%s update checkpointId:%" PRId64 " keep local checkpoint data", taskStr, pParam->chkpId); + stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", + idStr, checkpointId, el); } taosMemoryFree(path); - taosMemoryFree(pParam->taskId); - taosMemoryFree(pParam); - return code; } -int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId, char* taskId) { +int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_DISABLE) { return 0; @@ -641,15 +632,17 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointI return 0; } - SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg)); - arg->type = type; - arg->taskId = taosStrdup(taskId); - arg->chkpId = checkpointId; - arg->pTask = pTask; - arg->dbRefId = taskGetDBRef(pTask->pBackend); - arg->pMeta = pTask->pMeta; + int64_t dbRefId = taskGetDBRef(pTask->pBackend); + void* pBackend = taskAcquireDb(dbRefId); + if (pBackend == NULL) { + stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData", pTask->id.idStr); + return -1; + } - return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); + int32_t code = uploadCheckpointData(pTask, checkpointId, taskGetDBRef(pTask->pBackend), type); + taskReleaseDb(dbRefId); + + return code; } int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { @@ -670,6 +663,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { } } + // TODO: monitoring the checkpoint-source msg // send check point response to upstream task if (code == TSDB_CODE_SUCCESS) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -679,27 +673,26 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { } if (code != TSDB_CODE_SUCCESS) { - // todo: let's retry send rsp to upstream/mnode + // todo: let's retry send rsp to mnode, checkpoint-ready has monitor now stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId, tstrerror(code)); } } + if (code == TSDB_CODE_SUCCESS) { + code = streamTaskRemoteBackupCheckpoint(pTask, ckId); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code)); + } + } else { + stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId); + } + + // TODO: monitoring the checkpoint-report msg // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null. if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) { code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask); - if (code == TSDB_CODE_SUCCESS) { - code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId); - } - } else { - stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code)); - } - } - - // clear the checkpoint info if failed - if (code != TSDB_CODE_SUCCESS) { + } else { // clear the checkpoint info if failed taosThreadMutexLock(&pTask->lock); streamTaskClearCheckInfo(pTask, false); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); @@ -710,7 +703,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { } double el = (taosGetTimestampMs() - startTs) / 1000.0; - stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", id, + stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id, pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, (code == TSDB_CODE_SUCCESS) ? "succ" : "failed"); From ece139d921d037d97091972690169497cb009cda Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 09:22:09 +0800 Subject: [PATCH 03/17] fix(stream): initialize the sink stream task. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 50a52e58c5..def556534c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -997,8 +997,10 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t ETaskStatus status = streamTaskGetStatus(pTask)->state; int32_t level = pTask->info.taskLevel; - if (level == TASK_LEVEL__SINK) { + if (level == TASK_LEVEL__SINK && pTask->info.fillHistory == 0) { if (status == TASK_STATUS__UNINIT) { + tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr); + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); } streamMetaReleaseTask(pMeta, pTask); return 0; @@ -1025,9 +1027,9 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else { streamTrySchedExec(pTask); } - } else if (status == TASK_STATUS__UNINIT) { - // todo: fill-history task init ? + } else if (status == TASK_STATUS__UNINIT) { // todo: fill-history task init ? if (pTask->info.fillHistory == 0) { + tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr); streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); } } From 2cb9644471b7ae68ee0420ff720480b04e599f16 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 09:23:29 +0800 Subject: [PATCH 04/17] fix(stream): update the checkpoint info only when the status is normal. --- include/dnode/vnode/tqCommon.h | 2 +- include/libs/stream/tstream.h | 2 +- source/dnode/snode/src/snode.c | 2 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 5 +- source/libs/stream/inc/streamBackendRocksdb.h | 3 +- source/libs/stream/src/streamBackendRocksdb.c | 10 +-- source/libs/stream/src/streamCheckpoint.c | 70 +++++++++---------- source/libs/stream/src/streamDispatch.c | 4 +- source/libs/stream/src/streamMeta.c | 1 - 10 files changed, 50 insertions(+), 51 deletions(-) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index f0ab2a0e95..4a8fc6260a 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -41,7 +41,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); -int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); +int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen); void tqSetRestoreVersionInfo(SStreamTask* pTask); int32_t tqExpandStreamTask(SStreamTask* pTask); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 03c42e5c7e..ba366a2e02 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -770,7 +770,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t setCode); int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask); -int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq); +int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq); SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo(); // stream task state machine, and event handling diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 481033508b..0eb0db002f 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -154,7 +154,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { case TDMT_STREAM_TASK_RESUME: return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false); case TDMT_STREAM_TASK_UPDATE_CHKPT: - return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); + return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen); default: ASSERT(0); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 08fdda0e29..07d77905cb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1013,7 +1013,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { } int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { - return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen); + return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg, msgLen); } int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index def556534c..f3ae3b773d 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -640,7 +640,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen return 0; } -int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { +int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen) { SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; int32_t vgId = pMeta->vgId; @@ -652,13 +652,14 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, in SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL && (*ppTask) != NULL) { - streamTaskUpdateTaskCheckpointInfo(*ppTask, pReq); + streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq); } else { // failed to get the task. tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already", vgId, pReq->taskId); } streamMetaWUnLock(pMeta); + // always return success when handling the requirement issued by mnode during transaction. return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 35114a1be6..6b81ac87ee 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -257,7 +257,8 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); void bkdMgtDestroy(SBkdMgt* bm); -int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list); +int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list, + const char* id); void* taskAcquireDb(int64_t refId); void taskReleaseDb(int64_t refId); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index e4a87ba353..65ffb17d87 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2181,7 +2181,6 @@ void taskDbDestroy(void* pDb, bool flush) { void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); } int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) { - int64_t st = taosGetTimestampMs(); int32_t code = -1; int64_t refId = pDb->refId; @@ -2202,7 +2201,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char return code; } -int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list) { +int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list, const char* idStr) { int32_t code = 0; SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; @@ -2210,7 +2209,7 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 sprintf(temp, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId); if (taosDirExist(temp)) { - cleanDir(temp, ""); + cleanDir(temp, idStr); } else { taosMkDir(temp); } @@ -2220,7 +2219,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 return code; } -int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { + +int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* idStr) { int32_t code = -1; STaskDbWrapper* pDb = arg; ECHECKPOINT_BACKUP_TYPE utype = type; @@ -2229,7 +2229,7 @@ int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t typ if (utype == DATA_UPLOAD_RSYNC) { code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path); } else if (utype == DATA_UPLOAD_S3) { - code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); + code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list, idStr); } taskDbUnRefChkp(pDb, chkpId); return code; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6676a05548..c7301f09b2 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -18,16 +18,6 @@ #include "streamBackendRocksdb.h" #include "streamInt.h" -typedef struct { - ECHECKPOINT_BACKUP_TYPE type; - - char* taskId; - int64_t chkpId; - SStreamTask* pTask; - int64_t dbRefId; - void* pMeta; -} SAsyncUploadArg; - static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t streamTaskUploadCheckpoint(const char* id, const char* path); @@ -412,7 +402,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { } } -int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq) { +int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) { SStreamMeta* pMeta = pTask->pMeta; int32_t vgId = pMeta->vgId; int32_t code = 0; @@ -429,7 +419,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin pReq->transId); taosThreadMutexUnlock(&pTask->lock); - { // destroy the related fill-history tasks + { // destroy the related fill-history tasks // drop task should not in the meta-lock, and drop the related fill-history task now streamMetaWUnLock(pMeta); if (pReq->dropRelHTask) { @@ -446,34 +436,42 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint info, checkpointId:%" PRId64 "->%" PRId64 - " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, - id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, - pInfo->checkpointTime, pReq->checkpointTs); - - if (pStatus->state != TASK_STATUS__DROPPING) { - ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer); - - pInfo->checkpointId = pReq->checkpointId; - pInfo->checkpointVer = pReq->checkpointVer; - pInfo->checkpointTime = pReq->checkpointTs; - - streamTaskClearCheckInfo(pTask, false); - - // todo handle error - if (pStatus->state == TASK_STATUS__CK) { - code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); - } else { - stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name); - } - } else { - stDebug("s-task:0x%x vgId:%d status:%s not update checkpoint info, checkpointId:%" PRId64 "->%" PRId64 " failed", - pReq->taskId, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId); + if ((!restored) && (pStatus->state != TASK_STATUS__CK)) { + stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + " failed", + pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId); taosThreadMutexUnlock(&pTask->lock); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } + if (!restored) { // during restore procedure, do update checkpoint-info + stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64 + " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, + id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, + pInfo->checkpointTime, pReq->checkpointTs); + } else { // not in restore status, must be in checkpoint status + stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, + id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, + pInfo->checkpointTime, pReq->checkpointTs); + } + + ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && + pInfo->processedVer >= pReq->checkpointVer); + + pInfo->checkpointId = pReq->checkpointId; + pInfo->checkpointVer = pReq->checkpointVer; + pInfo->checkpointTime = pReq->checkpointTs; + + streamTaskClearCheckInfo(pTask, false); + + if (pStatus->state == TASK_STATUS__CK) { + // todo handle error + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + } else { + stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name); + } + if (pReq->dropRelHTask) { stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint", pReq->taskId, vgId, pReq->hTaskId); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b17d0206f0..3652de3eba 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -906,9 +906,9 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { tmsgSendRsp(&pInfo->msg); taosArrayClear(pList); - stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); + stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); } else { - stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel); + stDebug("s-task:%s level:%d already send checkpoint-source rsp success to mnode", pTask->id.idStr, pTask->info.taskLevel); } taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e8800c3370..03c7b93f91 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -231,7 +231,6 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { return 0; } else if (compatible == STREAM_STATA_NEED_CONVERT) { stInfo("vgId:%d stream state need covert backend format", pMeta->vgId); - return streamMetaCvtDbFormat(pMeta); } else if (compatible == STREAM_STATA_NO_COMPATIBLE) { stError( From f8578b4a698356eb857eb95fd3cc4e5634c24c46 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 09:34:18 +0800 Subject: [PATCH 05/17] fix(stream): fix syntax error. --- source/libs/stream/src/streamCheckpoint.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index c7301f09b2..75d2fdb2c7 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -683,7 +683,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code)); } } else { - stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId); + stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code)); } // TODO: monitoring the checkpoint-report msg From aea4254d40d78cebe21ff3afc2fda3f0690110bf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 10:37:06 +0800 Subject: [PATCH 06/17] fix(stream): async start task. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 5 ++--- source/libs/stream/src/streamCheckpoint.c | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index f3ae3b773d..5e28568763 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -854,7 +854,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { } else if (pState->state == TASK_STATUS__UNINIT) { tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); ASSERT(pTask->status.downstreamReady == 0); -// /*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); } else { tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); @@ -1001,7 +1000,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t if (level == TASK_LEVEL__SINK && pTask->info.fillHistory == 0) { if (status == TASK_STATUS__UNINIT) { tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr); - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); } streamMetaReleaseTask(pMeta, pTask); return 0; @@ -1031,7 +1030,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else if (status == TASK_STATUS__UNINIT) { // todo: fill-history task init ? if (pTask->info.fillHistory == 0) { tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr); - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); } } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 75d2fdb2c7..d986a36343 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -683,7 +683,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code)); } } else { - stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code)); + stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code)); } // TODO: monitoring the checkpoint-report msg From e7105edaa44d08cac986ea3dc7427dbcb2ab6aea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 14:14:44 +0800 Subject: [PATCH 07/17] fix(stream): disable pause if task is un-init. --- source/dnode/mnode/impl/src/mndStream.c | 44 ++++++++++++++++++++- source/dnode/vnode/src/tqCommon/tqCommon.c | 38 +++++++++++++----- source/libs/stream/src/streamCheckStatus.c | 5 +-- source/libs/stream/src/streamStartHistory.c | 1 - source/libs/stream/src/streamTask.c | 2 +- source/libs/stream/src/streamTaskSm.c | 4 +- 6 files changed, 76 insertions(+), 18 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c9598c4b38..a0fb8ae40a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1075,7 +1075,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { mWarn("not all vnodes ready, quit from vnodes status check"); taosArrayDestroy(pNodeSnapshot); taosThreadMutexUnlock(&execInfo.lock); - return 0; + return true; } SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); @@ -1911,9 +1911,51 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { bool updated = taskNodeIsUpdated(pMnode); if (updated) { mError("tasks are not ready for pause, node update detected"); + sdbRelease(pMnode->pSdb, pStream); return -1; } + + { // check for tasks, if tasks are not ready, not allowed to pause + bool found = false; + bool readyToPause = true; + taosThreadMutexLock(&execInfo.lock); + + for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { + STaskId *p = taosArrayGet(execInfo.pTaskList, i); + + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); + if (pEntry == NULL) { + continue; + } + + if (pEntry->id.streamId != pStream->uid) { + continue; + } + + if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) { + mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%x in checkpoint/uninit status, not ready for pause", + pStream->name, pStream->uid, pEntry->nodeId, pEntry->id.taskId); + readyToPause = false; + } + + found = true; + } + + taosThreadMutexUnlock(&execInfo.lock); + if (!found) { + mError("stream:%s task not report status yet, not ready for pause", pauseReq.name); + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + if (!readyToPause) { + mError("stream:%s task not ready for pause yet", pauseReq.name); + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + } + STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream"); if (pTrans == NULL) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 5e28568763..688be61015 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -997,11 +997,19 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t ETaskStatus status = streamTaskGetStatus(pTask)->state; int32_t level = pTask->info.taskLevel; - if (level == TASK_LEVEL__SINK && pTask->info.fillHistory == 0) { - if (status == TASK_STATUS__UNINIT) { - tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr); - tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); - } + if (level == TASK_LEVEL__SINK) { + ASSERT (status != TASK_STATUS__UNINIT); /*{ +// tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr); +// +// if (pTask->pBackend == NULL) { // TODO: add test cases for this +// int32_t code = pMeta->expandTaskFn(pTask); +// if (code != TSDB_CODE_SUCCESS) { +// tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId); +// streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); +// } +// } +// int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + }*/ streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1027,11 +1035,21 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else { streamTrySchedExec(pTask); } - } else if (status == TASK_STATUS__UNINIT) { // todo: fill-history task init ? - if (pTask->info.fillHistory == 0) { - tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr); - tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); - } + } else { + ASSERT (status != TASK_STATUS__UNINIT);// { // todo: fill-history task init ? +// if (pTask->info.fillHistory == 0) { + // tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr); + // + // if (pTask->pBackend == NULL) { // TODO: add test cases for this + // int32_t code = pMeta->expandTaskFn(pTask); + // if (code != TSDB_CODE_SUCCESS) { + // tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId); + // streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); + // } + // } + // int32_t ret = */streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); +// } +// } } streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index b64e0bb6d2..1728147c11 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -58,8 +58,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ } if (pInfo->stage < stage) { - stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 - ", prev:%" PRId64, + stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64, id, upstreamTaskId, vgId, stage, pInfo->stage); // record the checkpoint failure id and sent to mnode taosThreadMutexLock(&pTask->lock); @@ -170,13 +169,13 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId); if (pTask != NULL) { pRsp->status = streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage); - streamMetaReleaseTask(pMeta, pTask); SStreamTaskState* pState = streamTaskGetStatus(pTask); stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", pTask->id.idStr, pState->name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status); + streamMetaReleaseTask(pMeta, pTask); } else { pRsp->status = TASK_DOWNSTREAM_NOT_READY; stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 050d88aaf1..adf4c3bef9 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -240,7 +240,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { if (code == TSDB_CODE_SUCCESS) { checkFillhistoryTaskStatus(pTask, pHisTask); } - } streamMetaReleaseTask(pMeta, pHisTask); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7d869ce538..4a6e98f4a0 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -836,7 +836,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { SStreamMeta* pMeta = pTask->pMeta; int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); + stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num); // in case of fill-history task, stop the tsdb file scan operation. if (pTask->info.fillHistory == 1) { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 82ea2f88ef..75d62ff324 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -623,9 +623,9 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info); taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL); From 119001e30ba4d52ab9c17a19c6209ca6c2c29c94 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 15:11:48 +0800 Subject: [PATCH 08/17] fix(stream): update the retrieve checkpoint-trigger msg. --- include/libs/stream/tstream.h | 3 ++- source/dnode/vnode/src/tqCommon/tqCommon.c | 8 ++++---- source/libs/stream/src/streamCheckpoint.c | 13 ++++++++++--- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ba366a2e02..bf223e8c28 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -681,7 +681,8 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeI void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal); void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId); -int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pInfo, int32_t code); +int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, + SRpcHandleInfo* pInfo, int32_t code); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 688be61015..0f52483149 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -883,7 +883,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready", pTask->id.idStr, (int32_t)pReq->downstreamTaskId); - streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS); + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; @@ -901,7 +901,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) // re-send the lost checkpoint-trigger msg to downstream task tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr, (int32_t)pReq->downstreamTaskId, checkpointId, transId); - streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_SUCCESS); + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_SUCCESS); } else { // not send checkpoint-trigger yet, wait int32_t recv = 0, total = 0; streamTaskGetTriggerRecvStatus(pTask, &recv, &total); @@ -914,7 +914,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "sending checkpoint-source/trigger", pTask->id.idStr, recv, total); } - streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); } } else { // upstream not recv the checkpoint-source/trigger till now ASSERT(pState->state == TASK_STATUS__READY || pState->state == TASK_STATUS__HALT); @@ -922,7 +922,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all " "upstream sending checkpoint-source/trigger", pTask->id.idStr); - streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); } streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d986a36343..fc281e9c79 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -104,8 +104,15 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri return TSDB_CODE_SUCCESS; } -int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo, int32_t code) { - SCheckpointTriggerRsp* pRsp = rpcMallocCont(sizeof(SCheckpointTriggerRsp)); +int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, + SRpcHandleInfo* pRpcInfo, int32_t code) { + int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); + + void* pBuf = rpcMallocCont(size); + SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); + + ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId); + pRsp->streamId = pTask->id.streamId; pRsp->upstreamTaskId = pTask->id.taskId; pRsp->taskId = dstTaskId; @@ -120,7 +127,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId pRsp->rspCode = code; - SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = sizeof(SCheckpointTriggerRsp), .info = *pRpcInfo}; + SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = size, .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); return 0; } From 53b51b9b71dc9dd6a753ba2bb62d23ec27d170ab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 15:23:15 +0800 Subject: [PATCH 09/17] fix(stream): fix syntax error. --- source/dnode/mnode/impl/src/mndStream.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a0fb8ae40a..7aa95202dd 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1934,7 +1934,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) { - mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%x in checkpoint/uninit status, not ready for pause", + mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 + " in checkpoint/uninit status, not ready for pause", pStream->name, pStream->uid, pEntry->nodeId, pEntry->id.taskId); readyToPause = false; } From 53f9af06ffd788c2782c00f58947997a22d8a375 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 15:44:38 +0800 Subject: [PATCH 10/17] fix(stream): fix invalid write. --- source/dnode/mnode/impl/src/mndStream.c | 5 ++--- source/libs/stream/src/streamCheckpoint.c | 6 +++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7aa95202dd..af20617457 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1934,9 +1934,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) { - mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 - " in checkpoint/uninit status, not ready for pause", - pStream->name, pStream->uid, pEntry->nodeId, pEntry->id.taskId); + mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name, + pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status)); readyToPause = false; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index fc281e9c79..6dee0d363b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -813,6 +813,7 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { const char* pId = pTask->id.idStr; int32_t size = taosArrayGetSize(pNotSendList); int32_t numOfUpstream = streamTaskGetNumOfUpstream(pTask); + int64_t checkpointId = pTask->chkInfo.pActiveInfo->activeId; if (size <= 0) { stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId); @@ -838,15 +839,14 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { pReq->downstreamNodeId = vgId; pReq->upstreamTaskId = pUpstreamTask->taskId; pReq->upstreamNodeId = pUpstreamTask->nodeId; - pReq->checkpointId = pTask->chkInfo.pActiveInfo->activeId; - + pReq->checkpointId = checkpointId; SRpcMsg rpcMsg = {0}; initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq)); code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg); stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId, vgId, - pUpstreamTask->taskId, pUpstreamTask->nodeId, pReq->checkpointId); + pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId); } return TSDB_CODE_SUCCESS; From 5966b4f83fb4e82b447823071465fffc7e8974e6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 17:17:39 +0800 Subject: [PATCH 11/17] fix(stream): fix error in assert. --- source/libs/stream/src/streamCheckpoint.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6dee0d363b..d8b86edf53 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -464,7 +464,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV } ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && - pInfo->processedVer >= pReq->checkpointVer); + pInfo->processedVer <= pReq->checkpointVer); pInfo->checkpointId = pReq->checkpointId; pInfo->checkpointVer = pReq->checkpointVer; From 5a9500055daf8f28d21f4bbd2ce21ed5ce150130 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 18:44:54 +0800 Subject: [PATCH 12/17] fix(stream): check the task status checkpoint-ready msg send timer. --- source/libs/stream/src/streamBackendRocksdb.c | 6 +++++- source/libs/stream/src/streamCheckpoint.c | 9 ++++----- source/libs/stream/src/streamDispatch.c | 8 ++++++++ 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index cd2f304991..c151193284 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2053,7 +2053,11 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { stInfo("%s newly create db in state-backend", key); // pre create db pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err); - if (pTaskDb->db == NULL) goto _EXIT; + if (pTaskDb->db == NULL) { + stError("%s open state-backend failed, reason:%s", key, err); + goto _EXIT; + } + rocksdb_close(pTaskDb->db); if (cfNames != NULL) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d8b86edf53..4c4f5e98ab 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -737,14 +737,13 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { } pActiveInfo->checkCounter = 0; - stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, pTask->id.idStr, vgId, now); + stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now); taosThreadMutexLock(&pTask->lock); SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state != TASK_STATUS__CK) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", pTask->id.idStr, - vgId, ref); + stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pTask->pMeta, pTask); @@ -754,8 +753,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // checkpoint-trigger recv flag is set, quit if (pActiveInfo->allUpstreamTriggerRecv) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", - pTask->id.idStr, vgId, ref); + stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId, + ref); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pTask->pMeta, pTask); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 3652de3eba..dce30cc028 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -796,6 +796,14 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id); taosThreadMutexLock(&pActiveInfo->lock); + SStreamTaskState* pState = streamTaskGetStatus(pTask); + if (pState->state != TASK_STATUS__CK) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s vgId:%d not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId, ref); + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } SArray* pList = pActiveInfo->pReadyMsgList; SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t)); From 82febd30a0456603f42c12e1314fa3c16a70cf68 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 18:48:34 +0800 Subject: [PATCH 13/17] fix(stream): check the task status checkpoint-ready msg send timer. --- source/libs/stream/src/streamDispatch.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index dce30cc028..c0f559b881 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -781,7 +781,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { // check the status every 100ms if (streamTaskShouldStop(pTask)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); + stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); streamMetaReleaseTask(pTask->pMeta, pTask); return; } @@ -799,7 +799,8 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state != TASK_STATUS__CK) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s vgId:%d not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId, ref); + stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId, + pState->name, ref); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pTask->pMeta, pTask); return; From 94f3c6ec2d213ae39d3cd1c5ebe8ac8c7702c6fa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 23:40:03 +0800 Subject: [PATCH 14/17] fix(stream): fix deadlock --- source/libs/stream/src/streamCheckpoint.c | 1 + source/libs/stream/src/streamDispatch.c | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4c4f5e98ab..8e97d05f7f 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -630,6 +630,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_DISABLE) { + stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr); return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c0f559b881..8811bec889 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -795,7 +795,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { pActiveInfo->sendReadyCheckCounter = 0; stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id); - taosThreadMutexLock(&pActiveInfo->lock); + taosThreadMutexLock(&pTask->lock); SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state != TASK_STATUS__CK) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); @@ -806,6 +806,8 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { return; } + taosThreadMutexLock(&pActiveInfo->lock); + SArray* pList = pActiveInfo->pReadyMsgList; SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t)); From 095510ba41e0775963052a2682eaf14ad70129a1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 23:43:25 +0800 Subject: [PATCH 15/17] fix(stream): fix deadlock --- source/libs/stream/src/streamDispatch.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 8811bec889..afce6e47c4 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -805,6 +805,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { streamMetaReleaseTask(pTask->pMeta, pTask); return; } + taosThreadMutexUnlock(&pTask->lock); taosThreadMutexLock(&pActiveInfo->lock); From 21e1763ff460c2b6c2819271bd084751909d88bf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Jun 2024 11:16:20 +0800 Subject: [PATCH 16/17] fix(stream): set correct flag for checkpoint. --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamCheckpoint.c | 12 +++++++----- source/libs/stream/src/streamDispatch.c | 5 ++--- source/libs/stream/src/streamTask.c | 2 +- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index be3da64c6a..08d0a5e486 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -194,7 +194,7 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask); void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo); -void streamClearChkptReadyMsg(SStreamTask* pTask); +void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo); EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize); int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 8e97d05f7f..ad33d44b15 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -405,7 +405,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo); streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks if (clearChkpReadyMsg) { - streamClearChkptReadyMsg(pTask); + streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo); } } @@ -696,14 +696,16 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { // TODO: monitoring the checkpoint-report msg // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null. - if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) { - code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask); - } else { // clear the checkpoint info if failed + if (code == TSDB_CODE_SUCCESS) { + if (pTask->pMsgCb != NULL) { + code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask); + } + } else { // clear the checkpoint info if failed taosThreadMutexLock(&pTask->lock); streamTaskClearCheckInfo(pTask, false); - code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); taosThreadMutexUnlock(&pTask->lock); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); streamTaskSetFailedCheckpointId(pTask); stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index afce6e47c4..0f5559df89 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -856,7 +856,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { "and quit from timer, ref:%d", id, vgId, ref); - streamClearChkptReadyMsg(pTask); + streamClearChkptReadyMsg(pActiveInfo); taosThreadMutexUnlock(&pActiveInfo->lock); streamMetaReleaseTask(pTask->pMeta, pTask); } @@ -1128,8 +1128,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, return 0; } -void streamClearChkptReadyMsg(SStreamTask* pTask) { - SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; +void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) { if (pActiveInfo == NULL) { return; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4a6e98f4a0..70e3790209 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -254,7 +254,7 @@ void tFreeStreamTask(SStreamTask* pTask) { walCloseReader(pTask->exec.pWalReader); } - streamClearChkptReadyMsg(pTask); + streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo); if (pTask->msgInfo.pData != NULL) { clearBufferedDispatchMsg(pTask); From 393369fcd8ee0fe0e271337ae70af23a2780e784 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 23 Jun 2024 00:40:12 +0800 Subject: [PATCH 17/17] fix(stream):clear checkpoint-ready msg. --- source/libs/stream/src/streamCheckpoint.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ad33d44b15..1fddb5a97d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -470,7 +470,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointTime = pReq->checkpointTs; - streamTaskClearCheckInfo(pTask, false); + streamTaskClearCheckInfo(pTask, true); if (pStatus->state == TASK_STATUS__CK) { // todo handle error