diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 75dafcdbff..32ac38cee6 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -32,5 +32,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t startStreamTasks(SStreamMeta* pMeta); int32_t resetStreamTaskStatus(SStreamMeta* pMeta); +int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); #endif // TDENGINE_TQ_COMMON_H diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 6730d211df..812b57b01f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -460,14 +460,18 @@ struct SStreamTask { char reserve[256]; }; +typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*); + typedef struct STaskStartInfo { int64_t startTs; int64_t readyTs; int32_t tasksWillRestart; - int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. - SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing - SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed + int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. + SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing + SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed int64_t elapsedTime; + int32_t restartCount; // restart task counter + startComplete_fn_t completeFn; // complete callback function } STaskStartInfo; typedef struct STaskUpdateInfo { @@ -827,7 +831,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); // stream task meta void streamMetaInit(); void streamMetaCleanup(); -SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage); +SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, startComplete_fn_t fn); void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 380be1dd38..432f80824a 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -127,7 +127,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { } pSnode->msgCb = pOption->msgCb; - pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs()); + pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback); if (pSnode->pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 138c58b45f..76049d21b1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -96,7 +96,8 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, -1); + int32_t vgId = TD_VID(pTq->pVnode); + pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); if (pTq->pStreamMeta == NULL) { return -1; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 480ed7fd38..004eb240b3 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -699,8 +699,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) { } int32_t resetStreamTaskStatus(SStreamMeta* pMeta) { - int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + int32_t vgId = pMeta->vgId; + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); if (numOfTasks == 0) { @@ -723,16 +723,17 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { int32_t code = 0; int64_t st = taosGetTimestampMs(); - while(1) { - int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); - if (startVal == 0) { - break; - } - - tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); - taosMsleep(500); + streamMetaWLock(pMeta); + if (pMeta->startInfo.taskStarting == 1) { + pMeta->startInfo.restartCount += 1; + tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, %d", vgId, pMeta->startInfo.restartCount); + streamMetaWUnLock(pMeta); + return TSDB_CODE_SUCCESS; } + pMeta->startInfo.taskStarting = 1; + streamMetaWUnLock(pMeta); + terrno = 0; tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId, pMeta->updateInfo.transId); @@ -791,7 +792,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead char* p = NULL; if (streamTaskReadyToRun(pTask, &p)) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.nextProcessVer); + pTask->chkInfo.nextProcessVer); streamExecTask(pTask); } else { int8_t status = streamTaskSetSchedStatusInactive(pTask); @@ -808,4 +809,23 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } } +int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { + STaskStartInfo* pStartInfo = &pMeta->startInfo; + taosWLockLatch(&pMeta->lock); + if (pStartInfo->restartCount > 0) { + pStartInfo->restartCount -= 1; + + ASSERT(pStartInfo->taskStarting == 0); + tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", pMeta->vgId, pMeta->role, + pStartInfo->restartCount); + + taosWUnLockLatch(&pMeta->lock); + restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER)); + } else { + taosWUnLockLatch(&pMeta->lock); + tqDebug("vgId:%d start all tasks completed", pMeta->vgId); + } + + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 23cb6f5a35..8868a5719c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -277,7 +277,9 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend); return 0; } -SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { + +SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, + startComplete_fn_t fn) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { @@ -343,6 +345,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->stage = stage; pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT; + pMeta->startInfo.completeFn = fn; pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); // pMeta->chkpId = streamGetLatestCheckpointId(pMeta); @@ -1258,8 +1261,9 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { taosHashClear(pStartInfo->pFailedTaskSet); pStartInfo->tasksWillRestart = 0; pStartInfo->readyTs = 0; + // reset the sentinel flag value to be 0 - atomic_store_32(&pStartInfo->taskStarting, 0); + pStartInfo->taskStarting = 0; } void streamMetaRLock(SStreamMeta* pMeta) { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 53f87591e8..dd713290f3 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1083,6 +1083,7 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI int64_t endTs, bool ready) { STaskStartInfo* pStartInfo = &pMeta->startInfo; STaskId id = {.streamId = streamId, .taskId = taskId}; + bool restart = true; streamMetaWLock(pMeta); SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; @@ -1106,6 +1107,8 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); streamMetaResetStartInfo(pStartInfo); + + pStartInfo->completeFn(pMeta); } else { stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); }