From 6898eba6edf9cb493413e1542c06239739ca96f8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 14 May 2024 14:34:31 +0800 Subject: [PATCH] fix(stream): expand stream tasks are divided into two phase, the first is to build stream task and then expand stream task before start stream tasks. --- include/dnode/vnode/tqCommon.h | 2 +- include/libs/executor/storageapi.h | 2 +- include/libs/function/function.h | 1 + include/libs/stream/streamState.h | 2 +- include/libs/stream/tstream.h | 13 ++-- source/common/src/rsync.c | 2 +- source/dnode/snode/src/snode.c | 10 +-- source/dnode/vnode/src/tq/tq.c | 37 +++++----- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 26 ++++--- source/dnode/vnode/src/vnd/vnodeSync.c | 23 +----- source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 73 ++++++++++--------- source/libs/stream/src/streamCheckpoint.c | 11 ++- source/libs/stream/src/streamMeta.c | 25 +++++-- source/libs/stream/src/streamState.c | 4 +- 17 files changed, 122 insertions(+), 115 deletions(-) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 451f9a00eb..0cde499a6b 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -40,7 +40,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); -int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode); +int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta); void tqSetRestoreVersionInfo(SStreamTask* pTask); #endif // TDENGINE_TQ_COMMON_H diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index ec92bd56dd..7042ec2d15 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -410,7 +410,7 @@ typedef struct SStateStore { void (*streamFileStateClear)(struct SStreamFileState* pFileState); bool (*needClearDiskBuff)(struct SStreamFileState* pFileState); - SStreamState* (*streamStateOpen)(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); + SStreamState* (*streamStateOpen)(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); void (*streamStateClose)(SStreamState* pState, bool remove); int32_t (*streamStateBegin)(SStreamState* pState); int32_t (*streamStateCommit)(SStreamState* pState); diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 0afda2e160..87bbe21133 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -150,6 +150,7 @@ typedef struct SBackendCfWrapper { int64_t backendId; char idstr[64]; } SBackendCfWrapper; + typedef struct STdbState { SBackendCfWrapper *pBackendCfWrapper; int64_t backendCfWrapperId; diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 7813b2cc9a..ae5a733ae9 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -29,7 +29,7 @@ extern "C" { #include "storageapi.h" -SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); +SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); void streamStateClose(SStreamState* pState, bool remove); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9e376b9792..1f5aa46f49 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -61,7 +61,6 @@ extern "C" { // the load and start stream task should be executed after snode has started successfully, since the load of stream // tasks may incur the download of checkpoint data from remote, which may consume significant network and CPU resources. #define STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS (-8) -#define STREAM_EXEC_T_LOAD_ALL_TASKS (-9) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; @@ -156,8 +155,6 @@ typedef enum EStreamTaskEvent { TASK_EVENT_DROPPING = 0xA, } EStreamTaskEvent; -typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); - typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data); typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); @@ -491,7 +488,6 @@ typedef struct SStreamMeta { int32_t vgId; int64_t stage; int32_t role; - bool taskLoadFlag; bool closeFlag; bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower. STaskStartInfo startInfo; @@ -522,6 +518,9 @@ typedef struct STaskUpdateEntry { int32_t transId; } STaskUpdateEntry; +typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); +typedef int32_t (*__stream_task_expand_fn)(struct SStreamTask* pTask); + SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); @@ -675,7 +674,7 @@ int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); -int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); +int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); @@ -723,9 +722,9 @@ void streamMetaResetStartInfo(STaskStartInfo* pMeta); SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta); void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); void streamMetaLoadAllTasks(SStreamMeta* pMeta); -int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); +int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn fn); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); -int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn fn); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); // timer diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index 149c36cec7..c4d14a6c2c 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -217,7 +217,7 @@ int32_t downloadRsync(const char* id, const char* path) { #endif ); - uDebug("[rsync] %s start to sync data from remote to local:%s, %s", id, path, command); + uDebug("[rsync] %s start to sync data from remote to:%s, %s", id, path, command); while(times++ < MAX_RETRY) { diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index e1b51e3c1a..ac10aa83a4 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -45,15 +45,10 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer if (code != TSDB_CODE_SUCCESS) { return code; } + pTask->pBackend = NULL; - streamTaskOpenAllUpstreamInput(pTask); - code = tqExpandStreamTask(pTask, pSnode->pMeta, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); @@ -96,6 +91,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { goto FAIL; } + streamMetaLoadAllTasks(pSnode->pMeta); return pSnode; FAIL: @@ -104,7 +100,7 @@ FAIL: } int32_t sndInit(SSnode *pSnode) { - streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS); + streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 19c0f60063..a59a235c50 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -92,6 +92,8 @@ int32_t tqInitialize(STQ* pTq) { return -1; } + streamMetaLoadAllTasks(pTq->pStreamMeta); + if (tqMetaTransform(pTq) < 0) { return -1; } @@ -715,17 +717,18 @@ static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { int32_t vgId = TD_VID(pTq->pVnode); - tqDebug("s-task:0x%x start to expand task", pTask->id.taskId); + tqDebug("s-task:0x%x start to build task", pTask->id.taskId); int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, nextProcessVer); if (code != TSDB_CODE_SUCCESS) { return code; } - code = tqExpandStreamTask(pTask, pTq->pStreamMeta, pTq->pVnode); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + pTask->pBackend = NULL; +// code = tqExpandStreamTask(pTask, pTq->pStreamMeta); +// if (code != TSDB_CODE_SUCCESS) { +// return code; +// } // sink STaskOutputInfo* pOutputInfo = &pTask->outputInfo; @@ -768,7 +771,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus); if (pTask->info.fillHistory) { - tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, cur-status:%s, next-status:%s fill-history:%d, related stream task:0x%x " "trigger:%" PRId64 " ms, inputVer:%" PRId64, @@ -777,7 +780,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam, nextProcessVer); } else { tqInfo( - "vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + "vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms, inputVer:%" PRId64, @@ -798,10 +801,10 @@ int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - if (!pTq->pVnode->restored) { - tqDebug("vgId:%d not restored, ignore the stream task deploy msg", TD_VID(pTq->pVnode)); - return TSDB_CODE_SUCCESS; - } +// if (!pTq->pVnode->restored) { +// tqDebug("vgId:%d not restored, ignore the stream task deploy msg", TD_VID(pTq->pVnode)); +// return TSDB_CODE_SUCCESS; +// } return tqStreamTaskProcessDeployReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, sversion, msg, msgLen, vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored); @@ -1016,12 +1019,12 @@ int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t vgId = TD_VID(pTq->pVnode); SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; - if (!pTq->pVnode->restored) { - tqDebug("vgId:%d update-checkpoint-info msg received during restoring, checkpointId:%" PRId64 - ", transId:%d s-task:0x%x ignore it", - vgId, pReq->checkpointId, pReq->transId, pReq->taskId); - return TSDB_CODE_SUCCESS; - } +// if (!pTq->pVnode->restored) { +// tqDebug("vgId:%d update-checkpoint-info msg received during restoring, checkpointId:%" PRId64 +// ", transId:%d s-task:0x%x ignore it", +// vgId, pReq->checkpointId, pReq->transId, pReq->taskId); +// return TSDB_CODE_SUCCESS; +// } return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 516a47606b..7224657b73 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1089,7 +1089,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) { int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd); if (code != 0) { tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index cbd047cf88..dbda3a4541 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -39,9 +39,13 @@ static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { pTask->id.streamId = pId->streamId; } -int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) { - int32_t vgId = pMeta->vgId; - STaskId taskId = {0}; +int32_t tqExpandStreamTask(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + int32_t vgId = pMeta->vgId; + STaskId taskId = {0}; + int64_t st = taosGetTimestampMs(); + + tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId); if (pTask->info.fillHistory) { taskId = replaceStreamTaskId(pTask); @@ -67,7 +71,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) }; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - handle.vnode = pVnode; + handle.vnode = ((STQ*)pMeta->ahandle)->pVnode; handle.initTqReader = 1; } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); @@ -84,6 +88,9 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } + double el = (taosGetTimestampMs() - st) / 1000.0; + tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el); + return TSDB_CODE_SUCCESS; } @@ -706,7 +713,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { streamMetaResetTaskStatus(pMeta); streamMetaWUnLock(pMeta); - streamMetaStartAllTasks(pMeta); + streamMetaStartAllTasks(pMeta, tqExpandStreamTask); } else { streamMetaResetStartInfo(&pMeta->startInfo); streamMetaWUnLock(pMeta); @@ -724,10 +731,10 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t vgId = pMeta->vgId; if (type == STREAM_EXEC_T_START_ONE_TASK) { - streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId); + streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId, tqExpandStreamTask); return 0; } else if (type == STREAM_EXEC_T_START_ALL_TASKS) { - streamMetaStartAllTasks(pMeta); + streamMetaStartAllTasks(pMeta, tqExpandStreamTask); return 0; } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) { restartStreamTasks(pMeta, isLeader); @@ -740,11 +747,8 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead return code; } else if (type == STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS) { streamMetaLoadAllTasks(pMeta); - int32_t code = streamMetaStartAllTasks(pMeta); + int32_t code = streamMetaStartAllTasks(pMeta, tqExpandStreamTask); return code; - } else if (type == STREAM_EXEC_T_LOAD_ALL_TASKS) { - streamMetaLoadAllTasks(pMeta); - return 0; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 00203c7bb1..8f28871e3b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -576,37 +576,22 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) if (tsDisableStream) { vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); } else { - vInfo("vgId:%d sync restore finished, start to load and launch stream task(s)", vgId); + vInfo("vgId:%d sync restore finished, start to launch stream task(s)", vgId); if (pMeta->startInfo.startAllTasks == 1) { pMeta->startInfo.restartCount += 1; vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, pMeta->startInfo.restartCount); } else { pMeta->startInfo.startAllTasks = 1; - - bool loadTaskInfo = pMeta->taskLoadFlag; - pMeta->taskLoadFlag = true; streamMetaWUnLock(pMeta); - if (loadTaskInfo) { - tqInfo("vgId:%d stream task already loaded, start them", vgId); - streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS); - } else { - tqInfo("vgId:%d start load and launch stream task(s)", vgId); - streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS); - } - + tqInfo("vgId:%d stream task already loaded, start them", vgId); + streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS); return; } } } else { - if (!pMeta->taskLoadFlag) { - pMeta->taskLoadFlag = true; - vInfo("vgId:%d, sync restore finished, load stream tasks, not start tasks since not leader", vgId); - streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_ALL_TASKS); - } else { - vInfo("vgId:%d, sync restore finished, not load stream tasks since already loaded for follower"); - } + vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); } streamMetaWUnLock(pMeta); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index fbf902a237..48a9d07a5d 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -141,7 +141,7 @@ SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); -STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkptId); +STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId); void taskDbDestroy(void* pBackend, bool flush); void taskDbDestroy2(void* pBackend); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index da8a24e6da..0ac10fe9fe 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -164,7 +164,7 @@ typedef enum ECHECKPOINT_BACKUP_TYPE { ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); -int32_t streamTaskDownloadCheckpointData(char* id, char* path); +int32_t streamTaskDownloadCheckpointData(const char* id, char* path); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index dd897dc431..ad5d759b0b 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -40,7 +40,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t void destroyRocksdbCfInst(RocksdbCfInst* inst); int32_t getCfIdx(const char* cfName); -STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath); +STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath); static int32_t backendCopyFiles(const char* src, const char* dst); @@ -325,7 +325,7 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) { return complete == 1 ? 0 : -1; } -int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) { +int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) { int32_t code = 0; if (taosIsDir(chkptPath)) { taosRemoveDir(chkptPath); @@ -335,7 +335,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkptPath, int64_t checkpoi if (taosIsDir(defaultPath)) { taosRemoveDir(defaultPath); taosMulMkDir(defaultPath); - stDebug("clear local backend dir:%s succ", defaultPath); + stDebug("clear local default dir before download checkpoint data:%s succ", defaultPath); } code = streamTaskDownloadCheckpointData(key, chkptPath); @@ -348,7 +348,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkptPath, int64_t checkpoi return backendCopyFiles(chkptPath, defaultPath); } -int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { +int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { int32_t code = streamTaskDownloadCheckpointData(key, chkpPath); if (code != 0) { return code; @@ -383,14 +383,14 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char return code; } -int32_t rebuildFromRemoteCheckpoint(char* key, char* chkpPath, int64_t checkpointId, char* defaultPath) { +int32_t rebuildFromRemoteCheckpoint(const char* key, char* chkpPath, int64_t checkpointId, char* defaultPath) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_S3) { return rebuildFromRemoteChkp_s3(key, chkpPath, checkpointId, defaultPath); } else if (type == DATA_UPLOAD_RSYNC) { return rebuildFromRemoteChkp_rsync(key, chkpPath, checkpointId, defaultPath); } else { - stError("%s not remote backup checkpoint data for:%"PRId64, key, checkpointId); + stError("%s not remote backup checkpoint data for:%" PRId64" restore ", key, checkpointId); } return -1; @@ -484,24 +484,26 @@ int32_t backendCopyFiles(const char* src, const char* dst) { return backendFileCopyFilesImpl(src, dst); } -static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpointPath, int64_t chkptId, const char* defaultPath) { +static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId, + const char* defaultPath) { int32_t code = 0; if (taosIsDir(defaultPath)) { taosRemoveDir(defaultPath); taosMkDir(defaultPath); - stInfo("clear local backend dir:%s, done", defaultPath); + stInfo("%s clear local backend dir:%s, succ", pTaskIdStr, defaultPath); } if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) { - code = backendCopyFiles(checkpointPath, defaultPath); + stDebug("%s local checkpoint data existed, checkpointId:%d copy to backend dir", pTaskIdStr, checkpointId); + code = backendCopyFiles(checkpointPath, defaultPath); if (code != TSDB_CODE_SUCCESS) { taosRemoveDir(defaultPath); taosMkDir(defaultPath); - stError("%s failed to restart stream backend from %s, reason: %s, start to restart from empty path: %s", - pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); + stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote", + pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); code = TSDB_CODE_SUCCESS; } else { stInfo("%s copy checkpoint data from:%s to:%s succ, try to start stream backend", pTaskIdStr, checkpointPath, @@ -509,18 +511,13 @@ static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpoi } } else { code = TSDB_CODE_FAILED; - stError("%s no valid checkpoint data for checkpointId:%" PRId64 " in %s", pTaskIdStr, chkptId, checkpointPath); + stError("%s no valid data for checkpointId:%" PRId64 " in %s", pTaskIdStr, checkpointId, checkpointPath); } return code; } -int32_t rebuildFromlocalDefault(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { - int32_t code = 0; - return code; -} - -int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath) { +int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath) { int32_t code = 0; char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128); @@ -533,13 +530,23 @@ int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, c char* defaultPath = taosMemoryCalloc(1, strlen(path) + 256); sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "state"); + if (!taosIsDir(defaultPath)) { taosMulMkDir(defaultPath); } - stDebug("local default dir:%s, checkpointId:%" PRId64 ", key:%s succ", defaultPath, chkptId, key); + + char* checkpointRoot = taosMemoryCalloc(1, strlen(path) + 256); + sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints"); + + if (!taosIsDir(checkpointRoot)) { + taosMulMkDir(checkpointRoot); + } + taosMemoryFree(checkpointRoot); + + stDebug("%s check local default:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId); char* chkptPath = taosMemoryCalloc(1, strlen(path) + 256); - if (chkptId != 0) { + if (chkptId > 0) { sprintf(chkptPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId); code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath); @@ -550,16 +557,9 @@ int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, c if (code != 0) { stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath, tstrerror(code), defaultPath); - code = taosMkDir(defaultPath); - } - } else { - sprintf(chkptPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", - (int64_t)-1); - - code = rebuildFromLocalCheckpoint(key, chkptPath, -1, defaultPath); - if (code != 0) { - code = taosMkDir(defaultPath); } + } else { // no valid checkpoint id + stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data", key); } taosMemoryFree(chkptPath); @@ -646,7 +646,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { if (cfs != NULL) { rocksdb_list_column_families_destroy(cfs, nCf); } - stDebug("succ to init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId); + stDebug("%s init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId); taosMemoryFreeClear(backendPath); return (void*)pHandle; @@ -1933,6 +1933,7 @@ int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** sta *stateFullPath = statePath; return 0; } + void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) { STaskDbWrapper* p = pTaskDb; taosThreadMutexLock(&p->mutex); @@ -1940,7 +1941,7 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) { taosThreadMutexUnlock(&p->mutex); } -STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) { +STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { char* err = NULL; char** cfNames = NULL; size_t nCf = 0; @@ -1955,7 +1956,7 @@ STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) { cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err); if (nCf == 0) { - stInfo("newly create db, need to restart"); + stInfo("%s newly create db, need to restart", key); // pre create db pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err); if (pTaskDb->db == NULL) goto _EXIT; @@ -1980,21 +1981,21 @@ STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) { cfNames = NULL; } - stDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb); + stDebug("init s-task backend in:%s, backend:%p, %s", dbPath, pTaskDb, key); return pTaskDb; -_EXIT: +_EXIT: taskDbDestroy(pTaskDb, false); if (err) taosMemoryFree(err); if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf); return NULL; } -STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkptId) { +STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId) { char* statePath = NULL; char* dbPath = NULL; - if (rebuildDirFormCheckpoint(path, key, chkptId, &statePath, &dbPath) != 0) { + if (restoreCheckpointData(path, key, chkptId, &statePath, &dbPath) != 0) { return NULL; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 389eccd66d..d09e5bf477 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -239,6 +239,14 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin taosThreadMutexLock(&pTask->lock); + if (pReq->checkpointId <= pInfo->checkpointId) { + stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 " ignored", + id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer); + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_SUCCESS; + } + SStreamTaskState* pStatus = streamTaskGetStatus(pTask); stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint info, checkpointId:%" PRId64 "->%" PRId64 @@ -246,7 +254,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); - // in the if (pStatus->state != TASK_STATUS__DROPPING) { ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer); @@ -593,7 +600,7 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch return 0; } -int32_t streamTaskDownloadCheckpointData(char* id, char* path) { +int32_t streamTaskDownloadCheckpointData(const char* id, char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("down checkpoint data parameters invalid"); return -1; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 95fd057929..288d2eeaba 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -239,9 +239,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { return 0; } -int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { - SStreamTask* pTask = arg; - +int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) { int64_t chkpId = pTask->chkInfo.checkpointId; taosThreadMutexLock(&pMeta->backendMutex); @@ -1358,7 +1356,7 @@ static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64 return TSDB_CODE_SUCCESS; } -int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { +int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expandFn) { int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = pMeta->vgId; int64_t now = taosGetTimestampMs(); @@ -1392,8 +1390,17 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { continue; } - // fill-history task can only be launched by related stream tasks. STaskExecStatisInfo* pInfo = &pTask->execInfo; + + code = expandFn(pTask); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:0x%x vgId:%d failed to build stream backend", pTaskId->taskId, vgId); + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + // fill-history task can only be launched by related stream tasks. if (pTask->info.fillHistory == 1) { stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr); streamMetaReleaseTask(pMeta, pTask); @@ -1481,9 +1488,9 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { return true; } -int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { +int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn expandFn) { int32_t vgId = pMeta->vgId; - stInfo("vgId:%d start to task:0x%x by checking downstream status", vgId, taskId); + stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId); if (pTask == NULL) { @@ -1501,6 +1508,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas ASSERT(pTask->status.downstreamReady == 0); + if (pTask->pBackend == NULL) { // todo handle the error code + int32_t code = expandFn(pTask); + } + int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); if (ret != TSDB_CODE_SUCCESS) { stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 52002b7ea8..47324bd8c9 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -98,7 +98,7 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { return winKeyCmprImpl(&pWin1->key, &pWin2->key); } -SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) { +SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) { SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); stDebug("open stream state %p, %s", pState, path); if (pState == NULL) { @@ -127,7 +127,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); pState->parNameMap = tSimpleHashInit(1024, hashFn); - stInfo("succ to open state %p on backend %p 0x%" PRIx64 "-%d", pState, pMeta->streamBackend, pState->streamId, + stInfo("open state %p on backend %p 0x%" PRIx64 "-%d succ", pState, pMeta->streamBackend, pState->streamId, pState->taskId); return pState;