diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index f0ab2a0e95..4a8fc6260a 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -41,7 +41,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); -int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); +int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen); void tqSetRestoreVersionInfo(SStreamTask* pTask); int32_t tqExpandStreamTask(SStreamTask* pTask); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 03c42e5c7e..ba366a2e02 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -770,7 +770,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t setCode); int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask); -int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq); +int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq); SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo(); // stream task state machine, and event handling diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 481033508b..0eb0db002f 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -154,7 +154,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { case TDMT_STREAM_TASK_RESUME: return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false); case TDMT_STREAM_TASK_UPDATE_CHKPT: - return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); + return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen); default: ASSERT(0); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 08fdda0e29..07d77905cb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1013,7 +1013,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { } int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { - return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen); + return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg, msgLen); } int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index def556534c..f3ae3b773d 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -640,7 +640,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen return 0; } -int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { +int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen) { SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; int32_t vgId = pMeta->vgId; @@ -652,13 +652,14 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, in SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL && (*ppTask) != NULL) { - streamTaskUpdateTaskCheckpointInfo(*ppTask, pReq); + streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq); } else { // failed to get the task. tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already", vgId, pReq->taskId); } streamMetaWUnLock(pMeta); + // always return success when handling the requirement issued by mnode during transaction. return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 35114a1be6..6b81ac87ee 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -257,7 +257,8 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); void bkdMgtDestroy(SBkdMgt* bm); -int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list); +int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list, + const char* id); void* taskAcquireDb(int64_t refId); void taskReleaseDb(int64_t refId); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index e4a87ba353..65ffb17d87 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2181,7 +2181,6 @@ void taskDbDestroy(void* pDb, bool flush) { void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); } int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) { - int64_t st = taosGetTimestampMs(); int32_t code = -1; int64_t refId = pDb->refId; @@ -2202,7 +2201,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char return code; } -int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list) { +int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list, const char* idStr) { int32_t code = 0; SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; @@ -2210,7 +2209,7 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 sprintf(temp, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId); if (taosDirExist(temp)) { - cleanDir(temp, ""); + cleanDir(temp, idStr); } else { taosMkDir(temp); } @@ -2220,7 +2219,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 return code; } -int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { + +int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* idStr) { int32_t code = -1; STaskDbWrapper* pDb = arg; ECHECKPOINT_BACKUP_TYPE utype = type; @@ -2229,7 +2229,7 @@ int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t typ if (utype == DATA_UPLOAD_RSYNC) { code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path); } else if (utype == DATA_UPLOAD_S3) { - code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); + code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list, idStr); } taskDbUnRefChkp(pDb, chkpId); return code; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6676a05548..c7301f09b2 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -18,16 +18,6 @@ #include "streamBackendRocksdb.h" #include "streamInt.h" -typedef struct { - ECHECKPOINT_BACKUP_TYPE type; - - char* taskId; - int64_t chkpId; - SStreamTask* pTask; - int64_t dbRefId; - void* pMeta; -} SAsyncUploadArg; - static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t streamTaskUploadCheckpoint(const char* id, const char* path); @@ -412,7 +402,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { } } -int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq) { +int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) { SStreamMeta* pMeta = pTask->pMeta; int32_t vgId = pMeta->vgId; int32_t code = 0; @@ -429,7 +419,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin pReq->transId); taosThreadMutexUnlock(&pTask->lock); - { // destroy the related fill-history tasks + { // destroy the related fill-history tasks // drop task should not in the meta-lock, and drop the related fill-history task now streamMetaWUnLock(pMeta); if (pReq->dropRelHTask) { @@ -446,34 +436,42 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint info, checkpointId:%" PRId64 "->%" PRId64 - " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, - id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, - pInfo->checkpointTime, pReq->checkpointTs); - - if (pStatus->state != TASK_STATUS__DROPPING) { - ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer); - - pInfo->checkpointId = pReq->checkpointId; - pInfo->checkpointVer = pReq->checkpointVer; - pInfo->checkpointTime = pReq->checkpointTs; - - streamTaskClearCheckInfo(pTask, false); - - // todo handle error - if (pStatus->state == TASK_STATUS__CK) { - code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); - } else { - stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name); - } - } else { - stDebug("s-task:0x%x vgId:%d status:%s not update checkpoint info, checkpointId:%" PRId64 "->%" PRId64 " failed", - pReq->taskId, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId); + if ((!restored) && (pStatus->state != TASK_STATUS__CK)) { + stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + " failed", + pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId); taosThreadMutexUnlock(&pTask->lock); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } + if (!restored) { // during restore procedure, do update checkpoint-info + stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64 + " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, + id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, + pInfo->checkpointTime, pReq->checkpointTs); + } else { // not in restore status, must be in checkpoint status + stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, + id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, + pInfo->checkpointTime, pReq->checkpointTs); + } + + ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && + pInfo->processedVer >= pReq->checkpointVer); + + pInfo->checkpointId = pReq->checkpointId; + pInfo->checkpointVer = pReq->checkpointVer; + pInfo->checkpointTime = pReq->checkpointTs; + + streamTaskClearCheckInfo(pTask, false); + + if (pStatus->state == TASK_STATUS__CK) { + // todo handle error + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + } else { + stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name); + } + if (pReq->dropRelHTask) { stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint", pReq->taskId, vgId, pReq->hTaskId); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b17d0206f0..3652de3eba 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -906,9 +906,9 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { tmsgSendRsp(&pInfo->msg); taosArrayClear(pList); - stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); + stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); } else { - stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel); + stDebug("s-task:%s level:%d already send checkpoint-source rsp success to mnode", pTask->id.idStr, pTask->info.taskLevel); } taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e8800c3370..03c7b93f91 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -231,7 +231,6 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { return 0; } else if (compatible == STREAM_STATA_NEED_CONVERT) { stInfo("vgId:%d stream state need covert backend format", pMeta->vgId); - return streamMetaCvtDbFormat(pMeta); } else if (compatible == STREAM_STATA_NO_COMPATIBLE) { stError(