diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ed23290be4..790b8f883b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3440,23 +3440,24 @@ int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamR int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq); void tFreeMDropStreamReq(SMDropStreamReq* pReq); -typedef struct { - char name[TSDB_STREAM_FNAME_LEN]; - int8_t igNotExists; -} SMRecoverStreamReq; - -typedef struct { - int8_t reserved; -} SMRecoverStreamRsp; - typedef struct { int64_t recoverObjUid; int32_t taskId; int32_t hasCheckPoint; } SMVStreamGatherInfoReq; -// int32_t tSerializeSMRecoverStreamReq(void* buf, int32_t bufLen, const SMRecoverStreamReq* pReq); -// int32_t tDeserializeSMRecoverStreamReq(void* buf, int32_t bufLen, SMRecoverStreamReq* pReq); +typedef struct SVUpdateCheckpointInfoReq { + SMsgHead head; + int64_t streamId; + int32_t taskId; + int64_t checkpointId; + int64_t checkpointVer; + int64_t checkpointTs; + int32_t transId; + int8_t dropRelHTask; + int64_t hStreamId; + int64_t hTaskId; +} SVUpdateCheckpointInfoReq; typedef struct { int64_t leftForVer; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a5a3bd5ee0..12f92b1242 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -310,7 +310,7 @@ TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_UPDATE_CHKPT, "stream-update-chkptinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) //1035 1036 TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT_READY, "stream-checkpoint-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL) @@ -321,6 +321,7 @@ TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL) + TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8 diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index ce04ec6953..451f9a00eb 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -38,6 +38,7 @@ int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta); int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); +int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode); void tqSetRestoreVersionInfo(SStreamTask* pTask); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3c5d6d6e4c..9d5b7bc6f1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -736,6 +736,9 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t setCode); +int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId, + SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask); +int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq); // stream task state machine, and event handling SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index e448aec5e0..867b407296 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -208,11 +208,11 @@ int32_t downloadRsync(const char* id, const char* path) { int32_t code = execCommand(command); if (code != 0) { - uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + uError("[rsync] download checkpoint data failed, code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); return -1; } - uDebug("[rsync] down data:%s successful", id); + uDebug("[rsync] download checkpoint data:%s successfully", id); return 0; } diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 880e96adfb..b3c8ef4017 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -78,6 +78,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 73a73d19f5..826af71ace 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -967,6 +967,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 8c5d056893..2ac66fa1cd 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -35,23 +35,6 @@ struct SSnode { SMsgCb msgCb; }; -#if 0 -typedef struct { - SHashObj* pHash; // taskId -> SStreamTask -} SStreamMeta; - -SStreamMeta* sndMetaNew(); -void sndMetaDelete(SStreamMeta* pMeta); - -int32_t sndMetaDeployTask(SStreamMeta* pMeta, SStreamTask* pTask); -SStreamTask* sndMetaGetTask(SStreamMeta* pMeta, int32_t taskId); -int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); - -int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId); -int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId); -int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId); -#endif - void initStreamStateAPI(SStorageAPI* pAPI); #ifdef __cplusplus diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 87f0681780..602264be73 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -168,6 +168,8 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont); case TDMT_STREAM_TASK_RESUME: return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false); + case TDMT_STREAM_TASK_UPDATE_CHKPT: + return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); default: ASSERT(0); } diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index 3b60ef3427..196fa56c99 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -75,8 +75,10 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateCountGetKeyByRange = streamStateCountGetKeyByRange; pStore->streamStateSessionAllocWinBuffByNextPosition = streamStateSessionAllocWinBuffByNextPosition; - pStore->streamStateCountWinAddIfNotExist = streamStateCountWinAddIfNotExist; - pStore->streamStateCountWinAdd = streamStateCountWinAdd; +//void initStreamStateAPI(SStorageAPI* pAPI) { +// initStateStoreAPI(&pAPI->stateStore); +// initFunctionStateStore(&pAPI->functionStore); +//} pStore->updateInfoInit = updateInfoInit; pStore->updateInfoFillBlockData = updateInfoFillBlockData; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 419ebd1a6c..9439f7f179 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -295,6 +295,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen); // sma int32_t smaInit(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 79f53e6dec..b75d517997 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -828,7 +828,7 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow); - int64_t dstVer =pStep2Range->minVer; + int64_t dstVer = pStep2Range->minVer; pTask->chkInfo.nextProcessVer = dstVer; walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); @@ -1009,6 +1009,20 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { return tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen); } +int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { + int32_t vgId = TD_VID(pTq->pVnode); + SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; + + if (!pTq->pVnode->restored) { + tqDebug("vgId:%d update-checkpoint-info msg received during restoring, checkpointId:%" PRId64 + ", transId:%d s-task:0x%x ignore it", + vgId, pReq->checkpointId, pReq->transId, pReq->taskId); + return TSDB_CODE_SUCCESS; + } + + return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen); +} + int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { return tqStreamTaskProcessTaskPauseReq(pTq->pStreamMeta, msg); } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 62c3b06b65..91de290e6a 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -65,6 +65,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) .fillHistory = pTask->info.fillHistory, .winRange = pTask->dataRange.window, }; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { handle.vnode = pVnode; handle.initTqReader = 1; @@ -577,22 +578,23 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve return code; } -void tqStreamRmTaskBackend(SStreamMeta* pMeta, STaskId* id) { +static void tqStreamRemoveTaskBackend(SStreamMeta* pMeta, const STaskId* pId) { char taskKey[128] = {0}; - sprintf(taskKey, "0x%" PRIx64 "-0x%x", id->streamId, (int32_t)id->taskId); + sprintf(taskKey, "0x%" PRIx64 "-0x%x", pId->streamId, (int32_t)pId->taskId); char* path = taosMemoryCalloc(1, strlen(pMeta->path) + 128); sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, taskKey); taosRemoveDir(path); + + tqInfo("vgId:%d drop stream task:0x%x file:%s", pMeta->vgId, (int32_t)pId->taskId, path); taosMemoryFree(path); - // do nothing } int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - int32_t vgId = pMeta->vgId; - STaskId hTaskId = {0}; + int32_t vgId = pMeta->vgId; + STaskId hTaskId = {0}; tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId); streamMetaWLock(pMeta); @@ -634,10 +636,32 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen streamMetaWUnLock(pMeta); - tqStreamRmTaskBackend(pMeta, &id); + tqStreamRemoveTaskBackend(pMeta, &id); return 0; } +int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { + SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; + + int32_t vgId = pMeta->vgId; + tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId); + + streamMetaWLock(pMeta); + + STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + + if (ppTask != NULL && (*ppTask) != NULL) { + streamTaskUpdateTaskCheckpointInfo(*ppTask, pReq); + } else { // failed to get the task. + tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already", + vgId, pReq->taskId); + } + + streamMetaWUnLock(pMeta); + return TSDB_CODE_SUCCESS; +} + static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { int32_t vgId = pMeta->vgId; int32_t code = 0; @@ -927,9 +951,10 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) { SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; - SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle; - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); - int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode); + + SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle; + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); + int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode); if (code != 0) { return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6d97c1cd79..3fec3d9b8e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -625,6 +625,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg goto _err; } } break; + case TDMT_STREAM_TASK_UPDATE_CHKPT: { + if (tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { + goto _err; + } + } break; case TDMT_STREAM_TASK_PAUSE: { if (pVnode->restored && vnodeIsLeader(pVnode) && tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 704bc9a2f2..fbf902a237 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -141,7 +141,7 @@ SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); -STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId); +STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkptId); void taskDbDestroy(void* pBackend, bool flush); void taskDbDestroy2(void* pBackend); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 3ccb25a62a..da8a24e6da 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -113,7 +113,7 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); int32_t getNumOfDispatchBranch(SStreamTask* pTask); void clearBufferedDispatchMsg(SStreamTask* pTask); -int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); +int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); @@ -122,7 +122,6 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock); -int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId); int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index b5294a3fb7..d9eea23d21 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -42,7 +42,7 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst); int32_t getCfIdx(const char* cfName); STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath); -int32_t backendCopyFiles(char* src, char* dst); +static int32_t backendCopyFiles(const char* src, const char* dst); void destroyCompactFilteFactory(void* arg); void destroyCompactFilte(void* arg); @@ -234,12 +234,14 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { tstrerror(TAOS_SYSTEM_ERROR(errno)), state); taosMkDir(state); } + taosMemoryFree(chkp); } - *dst = state; + *dst = state; return 0; } + int32_t remoteChkp_readMetaData(char* path, SArray* list) { char* metaPath = taosMemoryCalloc(1, strlen(path)); sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META"); @@ -323,7 +325,7 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) { return complete == 1 ? 0 : -1; } -int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { +int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t checkpointId, char* defaultPath) { // impl later int32_t code = 0; if (taosIsDir(chkpPath)) { @@ -336,10 +338,14 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c code = streamTaskDownloadCheckpointData(key, chkpPath); if (code != 0) { + stError("failed to download checkpoint data:%s", key); return code; } + stDebug("download backup checkpoint data into:%s, checkpointId:%" PRId64 ", %s", chkpPath, checkpointId, key); + code = backendCopyFiles(chkpPath, defaultPath); + return code; } @@ -378,13 +384,16 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char return code; } -int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { +int32_t rebuildFromRemoteCheckpoint(char* key, char* chkpPath, int64_t checkpointId, char* defaultPath) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_S3) { - return rebuildFromRemoteChkp_s3(key, chkpPath, chkpId, defaultPath); + return rebuildFromRemoteChkp_s3(key, chkpPath, checkpointId, defaultPath); } else if (type == DATA_UPLOAD_RSYNC) { - return rebuildFromRemoteChkp_rsync(key, chkpPath, chkpId, defaultPath); + return rebuildFromRemoteChkp_rsync(key, chkpPath, checkpointId, defaultPath); + } else { + stError("%s not remote backup checkpoint data for:%"PRId64, key, checkpointId); } + return -1; } @@ -403,7 +412,7 @@ int32_t copyFiles_hardlink(char* src, char* dst, int8_t type) { return taosLinkFile(src, dst); } -int32_t backendFileCopyFilesImpl(char* src, char* dst) { +int32_t backendFileCopyFilesImpl(const char* src, const char* dst) { const char* current = "CURRENT"; size_t currLen = strlen(current); @@ -412,20 +421,26 @@ int32_t backendFileCopyFilesImpl(char* src, char* dst) { int32_t dLen = strlen(dst); char* srcName = taosMemoryCalloc(1, sLen + 64); char* dstName = taosMemoryCalloc(1, dLen + 64); - // copy file to dst + // copy file to dst TdDirPtr pDir = taosOpenDir(src); if (pDir == NULL) { taosMemoryFree(srcName); taosMemoryFree(dstName); + code = TAOS_SYSTEM_ERROR(errno); + errno = 0; - return -1; + return code; } + errno = 0; TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); - if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) { + continue; + } sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); @@ -433,18 +448,21 @@ int32_t backendFileCopyFilesImpl(char* src, char* dst) { if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) { code = copyFiles_create(srcName, dstName, 0); if (code != 0) { - stError("failed to copy file, detail: %s to %s reason: %s", srcName, dstName, - tstrerror(TAOS_SYSTEM_ERROR(code))); + code = TAOS_SYSTEM_ERROR(code); + stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code)); goto _ERROR; } } else { code = copyFiles_hardlink(srcName, dstName, 0); if (code != 0) { - stError("failed to hard line file, detail: %s to %s, reason: %s", srcName, dstName, - tstrerror(TAOS_SYSTEM_ERROR(code))); + code = TAOS_SYSTEM_ERROR(code); + stError("failed to hardlink file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code)); goto _ERROR; + } else { + stDebug("succ hard link file:%s to %s", srcName, dstName); } } + memset(srcName, 0, sLen + 64); memset(dstName, 0, dLen + 64); } @@ -453,76 +471,46 @@ int32_t backendFileCopyFilesImpl(char* src, char* dst) { taosMemoryFreeClear(dstName); taosCloseDir(&pDir); errno = 0; - return 0; + return code; + _ERROR: taosMemoryFreeClear(srcName); taosMemoryFreeClear(dstName); taosCloseDir(&pDir); errno = 0; - return -1; + return code; } -int32_t backendCopyFiles(char* src, char* dst) { + +int32_t backendCopyFiles(const char* src, const char* dst) { return backendFileCopyFilesImpl(src, dst); - // // opt later, just hard link - // int32_t sLen = strlen(src); - // int32_t dLen = strlen(dst); - // char* srcName = taosMemoryCalloc(1, sLen + 64); - // char* dstName = taosMemoryCalloc(1, dLen + 64); - - // TdDirPtr pDir = taosOpenDir(src); - // if (pDir == NULL) { - // taosMemoryFree(srcName); - // taosMemoryFree(dstName); - // return -1; - // } - - // TdDirEntryPtr de = NULL; - // while ((de = taosReadDir(pDir)) != NULL) { - // char* name = taosGetDirEntryName(de); - // if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; - - // sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); - // sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); - // // if (!taosDirEntryIsDir(de)) { - // // // code = taosCopyFile(srcName, dstName); - // // if (code == -1) { - // // goto _err; - // // } - // // } - // return backendFileCopyFilesImpl(src, dst); - - // memset(srcName, 0, sLen + 64); - // memset(dstName, 0, dLen + 64); - // } - - // _err: - // taosMemoryFreeClear(srcName); - // taosMemoryFreeClear(dstName); - // taosCloseDir(&pDir); - // return code >= 0 ? 0 : -1; - - // return 0; } -int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { + +static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpointPath, int64_t chkpId, const char* defaultPath) { int32_t code = 0; + if (taosIsDir(defaultPath)) { taosRemoveDir(defaultPath); taosMkDir(defaultPath); - stInfo("succ to clear stream backend %s", defaultPath); + stInfo("clear task backend path:%s, done", defaultPath); } - if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) { - code = backendCopyFiles(chkpPath, defaultPath); - if (code != 0) { + + if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) { + code = backendCopyFiles(checkpointPath, defaultPath); + + if (code != TSDB_CODE_SUCCESS) { taosRemoveDir(defaultPath); taosMkDir(defaultPath); - stError("failed to restart stream backend from %s, reason: %s, start to restart from empty path: %s", chkpPath, - tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); - code = 0; + stError("%s failed to restart stream backend from %s, reason: %s, start to restart from empty path: %s", + pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); + code = TSDB_CODE_SUCCESS; } else { - stInfo("start to restart stream backend at checkpoint path: %s", chkpPath); + stInfo("%s start to restart stream backend at checkpoint path: %s", pTaskIdStr, checkpointPath); } + } else { + code = TSDB_CODE_FAILED; + stError("%s not valid checkpoint path/data in:%s", pTaskIdStr, checkpointPath); } return code; @@ -533,7 +521,7 @@ int32_t rebuildFromlocalDefault(char* key, char* chkpPath, int64_t chkpId, char* return code; } -int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** dbPrefixPath, char** dbPath) { +int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath) { // impl later int32_t code = 0; @@ -551,29 +539,33 @@ int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** taosMulMkDir(defaultPath); } - char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256); - if (chkpId != 0) { - sprintf(chkpPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); - code = rebuildFromLocalChkp(key, chkpPath, chkpId, defaultPath); + stDebug("prepare local dir:%s, checkpointId:%d, key:%s succ", defaultPath, chkptId, key); + + char* chkptPath = taosMemoryCalloc(1, strlen(path) + 256); + if (chkptId != 0) { + sprintf(chkptPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId); + + code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath); if (code != 0) { - code = rebuildFromRemoteChkp(key, chkpPath, chkpId, defaultPath); + code = rebuildFromRemoteCheckpoint(key, chkptPath, chkptId, defaultPath); } if (code != 0) { - stInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath, - tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); + stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath, + tstrerror(code), defaultPath); code = taosMkDir(defaultPath); } } else { - sprintf(chkpPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", + sprintf(chkptPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", (int64_t)-1); - code = rebuildFromLocalChkp(key, chkpPath, -1, defaultPath); + code = rebuildFromLocalCheckpoint(key, chkptPath, -1, defaultPath); if (code != 0) { code = taosMkDir(defaultPath); } } - taosMemoryFree(chkpPath); + + taosMemoryFree(chkptPath); *dbPath = defaultPath; *dbPrefixPath = prefixPath; @@ -1055,6 +1047,7 @@ _ERROR: rocksdb_checkpoint_object_destroy(cp); return code; } + int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) { if (nCf == 0) return 0; int code = 0; @@ -1098,6 +1091,7 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI return 0; } + int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { SStreamMeta* pMeta = arg; @@ -1997,11 +1991,11 @@ _EXIT: return NULL; } -STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) { +STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkptId) { char* statePath = NULL; char* dbPath = NULL; - if (rebuildDirFromChkp2(path, key, chkpId, &statePath, &dbPath) != 0) { + if (rebuildDirFormCheckpoint(path, key, chkptId, &statePath, &dbPath) != 0) { return NULL; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 5a4e3a5439..853c6881ba 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -33,8 +33,11 @@ static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t streamTaskBackupCheckpoint(const char* id, const char* path); static int32_t deleteCheckpoint(const char* id); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); +static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); +static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType); +static int32_t streamAlignCheckpoint(SStreamTask* pTask); -static int32_t streamAlignCheckpoint(SStreamTask* pTask) { +int32_t streamAlignCheckpoint(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num); if (old == 0) { @@ -44,7 +47,7 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask) { return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1); } -static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { +int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -94,7 +97,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); } -static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { +int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { pBlock->srcTaskId = pTask->id.taskId; pBlock->srcVgId = pTask->pMeta->vgId; @@ -110,7 +113,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream return code; } -int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { +int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); int64_t checkpointId = pDataBlock->info.version; int32_t transId = pDataBlock->info.window.skey; @@ -118,7 +121,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = pTask->pMeta->vgId; - stDebug("s-task:%s vgId:%d start to handle the checkpoint block, checkpointId:%" PRId64 " ver:%" PRId64 + stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64 ", transId:%d current checkpointingId:%" PRId64, id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId); @@ -141,7 +144,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc int8_t type = pTask->outputInfo.type; if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); - continueDispatchCheckpointBlock(pBlock, pTask); + continueDispatchCheckpointTriggerBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info atomic_add_fetch_32(&pTask->chkInfo.numOfNotReady, 1); streamProcessCheckpointReadyMsg(pTask); @@ -182,7 +185,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // already. And then, dispatch check point msg to all downstream tasks - code = continueDispatchCheckpointBlock(pBlock, pTask); + code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); } } @@ -227,75 +230,83 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { } } -int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { - SStreamMeta* pMeta = p->pMeta; +int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq) { + SStreamMeta* pMeta = pTask->pMeta; int32_t vgId = pMeta->vgId; - const char* id = p->id.idStr; int32_t code = 0; - SCheckpointInfo* pCKInfo = &p->chkInfo; + const char* id = pTask->id.idStr; + SCheckpointInfo* pInfo = &pTask->chkInfo; - // fill-history task, rsma task, and sink task will not generate the checkpoint - if ((p->info.fillHistory == 1) || (p->info.taskLevel > TASK_LEVEL__SINK)) { - return code; - } + taosThreadMutexLock(&pTask->lock); - taosThreadMutexLock(&p->lock); + SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - SStreamTaskState* pStatus = streamTaskGetStatus(p); - ETaskStatus prevStatus = pStatus->state; + stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint info, checkpointId:%" PRId64 "->%" PRId64 + " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, + id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, + pInfo->checkpointTime, pReq->checkpointTs); - if (pStatus->state == TASK_STATUS__CK) { - ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId && - pCKInfo->checkpointVer <= pCKInfo->processedVer); + // in the + if (pStatus->state != TASK_STATUS__DROPPING) { + ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer); - pCKInfo->checkpointId = pCKInfo->checkpointingId; - pCKInfo->checkpointVer = pCKInfo->processedVer; - pCKInfo->checkpointTime = pCKInfo->startTs; + pInfo->checkpointId = pReq->checkpointId; + pInfo->checkpointVer = pReq->checkpointVer; + pInfo->checkpointTime = pReq->checkpointTs; - streamTaskClearCheckInfo(p, false); - taosThreadMutexUnlock(&p->lock); + streamTaskClearCheckInfo(pTask, false); - code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + // todo handle error + if (pStatus->state == TASK_STATUS__CK) { + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + } else { + stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name); + } } else { - stDebug("s-task:%s vgId:%d status:%s not keep the checkpoint metaInfo, checkpoint:%" PRId64 " failed", id, vgId, - pStatus->name, pCKInfo->checkpointingId); - taosThreadMutexUnlock(&p->lock); + stDebug("s-task:0x%x vgId:%d status:%s not update checkpoint info, checkpointId:%" PRId64 "->%" PRId64 " failed", + pReq->taskId, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId); + taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } + if (pReq->dropRelHTask) { + stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint", + pReq->taskId, vgId, pReq->hTaskId); + CLEAR_RELATED_FILLHISTORY_TASK(pTask); + } + + stDebug("s-task:0x%x set the persistent status attr to be ready, prev:%s, status in sm:%s", pReq->taskId, + streamTaskGetStatusStr(pTask->status.taskStatus), streamTaskGetStatus(pTask)->name); + + pTask->status.taskStatus = TASK_STATUS__READY; + + code = streamMetaSaveTask(pMeta, pTask); if (code != TSDB_CODE_SUCCESS) { - stDebug("s-task:%s vgId:%d handle event:checkpoint-done failed", id, vgId); + stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id, + vgId, pReq->checkpointId, terrstr()); return code; } - stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64 - ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: ready, prev:%s", - vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, - streamTaskGetStatusStr(prevStatus)); + taosThreadMutexUnlock(&pTask->lock); + streamMetaWUnLock(pMeta); - // save the task if not sink task - if (p->info.taskLevel <= TASK_LEVEL__SINK) { - streamMetaWLock(pMeta); + // drop task should not in the meta-lock, and drop the related fill-history task now + if (pReq->dropRelHTask) { + streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); - code = streamMetaSaveTask(pMeta, p); - if (code != TSDB_CODE_SUCCESS) { - streamMetaWUnLock(pMeta); - stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id, - vgId, checkpointId, terrstr()); - return code; - } - - code = streamMetaCommit(pMeta); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", - id, vgId, checkpointId, terrstr()); - } - - streamMetaWUnLock(pMeta); + // commit the update + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); } - return code; + streamMetaWLock(pMeta); + + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } + + return TSDB_CODE_SUCCESS; } void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { @@ -334,6 +345,7 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l } } } + taosCloseFile(&pFile); taosRemoveFile(file); taosMemoryFree(file); @@ -448,7 +460,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { // clear the checkpoint info, and commit the newest checkpoint info if all works are done successfully if (code == TSDB_CODE_SUCCESS) { - code = streamSaveTaskCheckpointInfo(pTask, ckId); + STaskId* pHTaskId = &pTask->hTaskInfo.id; + code = streamBuildAndSendCheckpointUpdateMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, pHTaskId, &pTask->chkInfo, + dropRelHTask); if (code == TSDB_CODE_SUCCESS) { code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id); if (code != TSDB_CODE_SUCCESS) { @@ -459,22 +473,6 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { } } - if ((code == TSDB_CODE_SUCCESS) && dropRelHTask) { - // transferred from the halt status, it is done the fill-history procedure and finish with the checkpoint - // free it and remove fill-history task from disk meta-store - taosThreadMutexLock(&pTask->lock); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; - - stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId); - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &hTaskId, 1); - } else { - stWarn("s-task:%s related fill-history task:0x%x is erased", id, (int32_t)pTask->hTaskInfo.id.taskId); - } - - taosThreadMutexUnlock(&pTask->lock); - } - // clear the checkpoint info if failed if (code != TSDB_CODE_SUCCESS) { taosThreadMutexLock(&pTask->lock); @@ -488,7 +486,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { double el = (taosGetTimestampMs() - startTs) / 1000.0; stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", id, - pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, + pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, (code == TSDB_CODE_SUCCESS) ? "succ" : "failed"); return code; @@ -586,7 +584,7 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch int32_t streamTaskDownloadCheckpointData(char* id, char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { - stError("streamTaskDownloadCheckpointData parameters invalid"); + stError("down checkpoint data parameters invalid"); return -1; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 58c6e19581..f3569d8973 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -341,7 +341,7 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id, + stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d", id, pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId); code = doSendDispatchMsg(pTask, pDispatchMsg, vgId, pEpSet); @@ -432,7 +432,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id, + stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d", id, pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId); code = doSendDispatchMsg(pTask, pReq, vgId, pEpSet); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 047b169ec9..9747ebd2ff 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -589,7 +589,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { // dispatch checkpoint msg to all downstream tasks int32_t type = pInput->type; if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - streamProcessCheckpointBlock(pTask, (SStreamDataBlock*)pInput); + streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput); continue; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4fa9b2c66f..95fd057929 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -556,19 +556,22 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){ pTask->ver = SSTREAM_TASK_VER; } + SEncoder encoder = {0}; tEncoderInit(&encoder, buf, len); tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); int64_t id[2] = {pTask->id.streamId, pTask->id.taskId}; - if (tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn) < 0) { + code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn); + if (code < 0) { stError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno)); - return -1; + } else { + stDebug("s-task:%s vgId:%d stream task write to meta file", pTask->id.idStr, pTask->pMeta->vgId); } taosMemoryFree(buf); - return 0; + return code; } int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 1e622f615d..72302f981d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -686,16 +686,47 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI pReq->head.vgId = vgId; pReq->taskId = pTaskId->taskId; pReq->streamId = pTaskId->streamId; - pReq->resetRelHalt = resetRelHalt; + pReq->resetRelHalt = resetRelHalt; // todo: remove this attribute SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)}; int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg); if (code != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); - return code; + } else { + stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId); } - stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId); + return code; +} + +int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId, + SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) { + SVUpdateCheckpointInfoReq* pReq = rpcMallocCont(sizeof(SVUpdateCheckpointInfoReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pReq->head.vgId = vgId; + pReq->taskId = pTaskId->taskId; + pReq->streamId = pTaskId->streamId; + pReq->dropRelHTask = dropRelHTask; + pReq->hStreamId = pHTaskId->streamId; + pReq->hTaskId = pHTaskId->taskId; + pReq->transId = pCheckpointInfo->transId; + + pReq->checkpointId = pCheckpointInfo->checkpointingId; + pReq->checkpointVer = pCheckpointInfo->processedVer; + pReq->checkpointTs = pCheckpointInfo->startTs; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_UPDATE_CHKPT, .pCont = pReq, .contLen = sizeof(SVUpdateCheckpointInfoReq)}; + int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg); + + if (code != TSDB_CODE_SUCCESS) { + stError("vgId:%d task:0x%x failed to send update checkpoint info msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); + } else { + stDebug("vgId:%d task:0x%x build and send update checkpoint info msg msg", vgId, pTaskId->taskId); + } return code; }