diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9d5b7bc6f1..bddc76cfb5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -58,6 +58,10 @@ extern "C" { #define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_RESUME_TASK (-6) #define STREAM_EXEC_T_ADD_FAILED_TASK (-7) +// the load and start stream task should be executed after snode has started successfully, since the load of stream +// tasks may incur the download of checkpoint data from remote, which may consume significant network and CPU resources. +#define STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS (-8) +#define STREAM_EXEC_T_LOAD_ALL_TASKS (-9) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index 302f17942f..149c36cec7 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -130,6 +130,7 @@ void startRsync() { uError("[rsync] build checkpoint backup dir failed, dir:%s,"ERRNO_ERR_FORMAT, tsCheckpointBackupDir, ERRNO_ERR_DATA); return; } + removeEmptyDir(); char confDir[PATH_MAX] = {0}; @@ -186,18 +187,20 @@ int32_t uploadRsync(const char* id, const char* path) { if (code != 0) { uError("[rsync] s-task:%s upload checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path, tsSnodeAddress, code, ERRNO_ERR_DATA); - return -1; + } else { + int64_t el = (taosGetTimestampMs() - st); + uDebug("[rsync] s-task:%s upload checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path, + tsSnodeAddress, el); } - int64_t el = (taosGetTimestampMs() - st); - uDebug("[rsync] s-task:%s upload checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path, - tsSnodeAddress, el); - return 0; + return code; } int32_t downloadRsync(const char* id, const char* path) { int64_t st = taosGetTimestampMs(); - uDebug("[rsync] %s start to sync data from remote to local:%s", id, path); + int32_t MAX_RETRY = 60; + int32_t times = 0; + int32_t code = 0; #ifdef WINDOWS char pathTransform[PATH_MAX] = {0}; @@ -205,7 +208,7 @@ int32_t downloadRsync(const char* id, const char* path) { #endif char command[PATH_MAX] = {0}; - snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", + snprintf(command, PATH_MAX, "rsync -av --debug=all --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", tsSnodeAddress, id, #ifdef WINDOWS pathTransform @@ -214,13 +217,20 @@ int32_t downloadRsync(const char* id, const char* path) { #endif ); - int32_t code = execCommand(command); + uDebug("[rsync] %s start to sync data from remote to local:%s, %s", id, path, command); - int32_t el = taosGetTimestampMs() - st; - if (code != 0) { - uError("[rsync] %s download checkpoint data:%s failed, code:%d," ERRNO_ERR_FORMAT, id, path, code, ERRNO_ERR_DATA); - } else { - uDebug("[rsync] %s download checkpoint data:%s successfully, elapsed time:%dms", id, path, el); + while(times++ < MAX_RETRY) { + + code = execCommand(command); + if (code != TSDB_CODE_SUCCESS) { + uError("[rsync] %s download checkpoint data:%s failed, retry after 1sec, code:%d," ERRNO_ERR_FORMAT, id, path, code, + ERRNO_ERR_DATA); + taosSsleep(1); + } else { + int32_t el = taosGetTimestampMs() - st; + uDebug("[rsync] %s download checkpoint data:%s successfully, elapsed time:%dms", id, path, el); + break; + } } return code; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 602264be73..e1b51e3c1a 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -86,6 +86,9 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { return NULL; } + stopRsync(); + startRsync(); + pSnode->msgCb = pOption->msgCb; pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback); if (pSnode->pMeta == NULL) { @@ -93,11 +96,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { goto FAIL; } - streamMetaLoadAllTasks(pSnode->pMeta); - - stopRsync(); - startRsync(); - return pSnode; FAIL: @@ -106,8 +104,7 @@ FAIL: } int32_t sndInit(SSnode *pSnode) { - streamMetaResetTaskStatus(pSnode->pMeta); - streamMetaStartAllTasks(pSnode->pMeta); + streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b75d517997..19c0f60063 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -92,8 +92,6 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - streamMetaLoadAllTasks(pTq->pStreamMeta); - if (tqMetaTransform(pTq) < 0) { return -1; } @@ -800,6 +798,11 @@ int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { + if (!pTq->pVnode->restored) { + tqDebug("vgId:%d not restored, ignore the stream task deploy msg", TD_VID(pTq->pVnode)); + return TSDB_CODE_SUCCESS; + } + return tqStreamTaskProcessDeployReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, sversion, msg, msgLen, vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored); } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 76322c527f..404cbf26dd 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -178,6 +178,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) { goto END; } } + END: tdbFree(pKey); tdbFree(pVal); @@ -514,35 +515,6 @@ int32_t tqMetaTransform(STQ* pTq) { return code; } -//int32_t tqMetaRestoreHandle(STQ* pTq) { -// int code = 0; -// TBC* pCur = NULL; -// if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) { -// return -1; -// } -// -// void* pKey = NULL; -// int kLen = 0; -// void* pVal = NULL; -// int vLen = 0; -// -// tdbTbcMoveToFirst(pCur); -// -// while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { -// STqHandle handle = {0}; -// code = restoreHandle(pTq, pVal, vLen, &handle); -// if (code < 0) { -// tqDestroyTqHandle(&handle); -// break; -// } -// } -// -// tdbFree(pKey); -// tdbFree(pVal); -// tdbTbcClose(pCur); -// return code; -//} - int32_t tqMetaGetHandle(STQ* pTq, const char* key) { void* pVal = NULL; int vLen = 0; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 91de290e6a..cbd047cf88 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -158,10 +158,9 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL || *ppTask == NULL) { - tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped", vgId, req.taskId); + tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId, req.taskId); rsp.code = TSDB_CODE_SUCCESS; streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); return rsp.code; } @@ -739,6 +738,13 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); return code; + } else if (type == STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS) { + streamMetaLoadAllTasks(pMeta); + int32_t code = streamMetaStartAllTasks(pMeta); + return code; + } else if (type == STREAM_EXEC_T_LOAD_ALL_TASKS) { + streamMetaLoadAllTasks(pMeta); + return 0; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 166a230c76..d2c20500be 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -576,24 +576,22 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) if (tsDisableStream) { vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); } else { - vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId); - int32_t numOfTasks = tqStreamTasksGetTotalNum(pMeta); - if (numOfTasks > 0) { - if (pMeta->startInfo.startAllTasks == 1) { - pMeta->startInfo.restartCount += 1; - tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, - pMeta->startInfo.restartCount); - } else { - pMeta->startInfo.startAllTasks = 1; + vInfo("vgId:%d sync restore finished, start to load and launch stream task(s)", pVnode->config.vgId); + if (pMeta->startInfo.startAllTasks == 1) { + pMeta->startInfo.restartCount += 1; + tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, + pMeta->startInfo.restartCount); + } else { + pMeta->startInfo.startAllTasks = 1; + streamMetaWUnLock(pMeta); - streamMetaWUnLock(pMeta); - tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); - return; - } + streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS); + return; } } } else { - vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); + vInfo("vgId:%d, sync restore finished, load stream tasks, not start tasks since not leader", vgId); + streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_ALL_TASKS); } streamMetaWUnLock(pMeta); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 1a05acf3eb..dd897dc431 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -490,7 +490,7 @@ static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpoi if (taosIsDir(defaultPath)) { taosRemoveDir(defaultPath); taosMkDir(defaultPath); - stInfo("clear task backend dir:%s, done", defaultPath); + stInfo("clear local backend dir:%s, done", defaultPath); } if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 2f1771279f..1b6c9cc791 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -30,7 +30,7 @@ typedef struct { static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t deleteCheckpointFile(const char* id, const char* name); -static int32_t streamTaskBackupCheckpoint(const char* id, const char* path); +static int32_t streamTaskUploadCheckpoint(const char* id, const char* path); static int32_t deleteCheckpoint(const char* id); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); @@ -321,7 +321,7 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l int32_t code = downloadCheckpointDataByName(id, "META", file); if (code != 0) { - stDebug("chkp failed to download meta file:%s", file); + stDebug("%s chkp failed to download meta file:%s", id, file); taosMemoryFree(file); return code; } @@ -379,7 +379,7 @@ int32_t uploadCheckpointData(void* param) { } if (code == TSDB_CODE_SUCCESS) { - code = streamTaskBackupCheckpoint(pParam->taskId, path); + code = streamTaskUploadCheckpoint(pParam->taskId, path); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to upload checkpoint data:%s, checkpointId:%" PRId64, taskStr, path, pParam->chkpId); } else { @@ -562,9 +562,9 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() { } } -int32_t streamTaskBackupCheckpoint(const char* id, const char* path) { +int32_t streamTaskUploadCheckpoint(const char* id, const char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { - stError("streamTaskBackupCheckpoint parameters invalid"); + stError("invalid parameters in upload checkpoint, %s", id); return -1; } @@ -580,7 +580,7 @@ int32_t streamTaskBackupCheckpoint(const char* id, const char* path) { // fileName: CURRENT int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { - stError("uploadCheckpointByName parameters invalid"); + stError("down load checkpoint data parameters invalid"); return -1; } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 52e7431e70..9bd12a4fd8 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -52,7 +52,11 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 return -1; } - stDebug("vgId:%d create msg to start stream task:0x%x, exec type:%d", vgId, taskId, execType); + if (streamId != 0) { + stDebug("vgId:%d create msg to start stream task:0x%x, exec type:%d", vgId, taskId, execType); + } else { + stDebug("vgId:%d create msg to exec, type:%d", vgId, execType); + } pRunReq->head.vgId = vgId; pRunReq->streamId = streamId;