From afe7fdf672fac4de810b78958e0e244f1f5d47b9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Oct 2023 14:12:06 +0800 Subject: [PATCH 01/23] fix(stream): avoid clear the restart flag . --- source/libs/stream/src/streamMeta.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b13e49beb4..41ad5d4840 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -211,7 +211,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF stage); return pMeta; -_err: + _err: taosMemoryFree(pMeta->path); if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); @@ -228,8 +228,12 @@ _err: } int32_t streamMetaReopen(SStreamMeta* pMeta) { + // backup the restart flag + int32_t restartFlag = pMeta->startInfo.startedAfterNodeUpdate; streamMetaClear(pMeta); + pMeta->startInfo.startedAfterNodeUpdate = restartFlag; + // NOTE: role should not be changed during reopen meta pMeta->streamBackendRid = -1; pMeta->streamBackend = NULL; @@ -1059,4 +1063,5 @@ void streamMetaInitForSnode(SStreamMeta* pMeta) { void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { taosHashClear(pStartInfo->pReadyTaskSet); pStartInfo->startedAfterNodeUpdate = 0; + pStartInfo->readyTs = 0; } \ No newline at end of file From 62cc33715c3029411a9cb5fd96b9b6a6d078c843 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Oct 2023 15:03:21 +0800 Subject: [PATCH 02/23] fix(stream): fix deadlock. --- source/libs/stream/src/streamTaskSm.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index bc832c178c..c9ce76dea8 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -190,6 +190,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event); if (pTrans == NULL) { stWarn("s-task:%s status:%s not allowed handle event:%s", pTask->id.idStr, pSM->current.name, StreamTaskEventList[event].name); + taosThreadMutexUnlock(&pTask->lock); return -1; } else { stDebug("s-task:%s start to handle event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[event].name, From 4c267b538fa0d07ef2aeac0f73bdb55991a92f41 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Oct 2023 23:05:41 +0800 Subject: [PATCH 03/23] fix(stream): add expired epset node list for stream tasks in hb to mnode. --- include/libs/stream/tstream.h | 22 ++--- source/dnode/mnode/impl/src/mndStream.c | 100 +++++++++++++++-------- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqPush.c | 2 +- source/dnode/vnode/src/tq/tqStreamTask.c | 6 +- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamDispatch.c | 6 +- source/libs/stream/src/streamExec.c | 8 +- source/libs/stream/src/streamMeta.c | 60 ++++++++++++-- source/libs/stream/src/streamQueue.c | 16 ++-- source/libs/stream/src/streamStart.c | 23 +++++- source/libs/stream/src/streamTask.c | 22 +++-- tests/script/tsim/stream/sliding.sim | 1 + 13 files changed, 189 insertions(+), 81 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d3717bc1e7..7859c60944 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -236,6 +236,11 @@ typedef struct { SUseDbRsp dbInfo; } STaskDispatcherShuffle; +typedef struct { + int32_t nodeId; + SEpSet epset; +} SDownstreamTaskEpset; + typedef struct { int64_t stbUid; char stbFullName[TSDB_TABLE_FNAME_LEN]; @@ -327,15 +332,10 @@ typedef struct SDispatchMsgInfo { void* pTimer; // used to dispatch data after a given time duration } SDispatchMsgInfo; -typedef struct STaskOutputQueue { +typedef struct STaskQueue { int8_t status; SStreamQueue* queue; -} STaskOutputQueue; - -typedef struct STaskInputInfo { - int8_t status; - SStreamQueue* queue; -} STaskInputInfo; +} STaskQueue; typedef struct STaskSchedInfo { int8_t status; @@ -384,6 +384,7 @@ typedef struct STaskOutputInfo { }; int8_t type; STokenBucket* pTokenBucket; + SArray* pDownstreamUpdateList; } STaskOutputInfo; typedef struct SUpstreamInfo { @@ -395,8 +396,8 @@ struct SStreamTask { int64_t ver; SStreamTaskId id; SSTaskBasicInfo info; - STaskOutputQueue outputq; - STaskInputInfo inputInfo; + STaskQueue outputq; + STaskQueue inputq; STaskSchedInfo schedInfo; STaskOutputInfo outputInfo; SDispatchMsgInfo msgInfo; @@ -645,7 +646,8 @@ typedef struct STaskStatusEntry { typedef struct SStreamHbMsg { int32_t vgId; int32_t numOfTasks; - SArray* pTaskStatus; // SArray + SArray* pTaskStatus; // SArray + SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. } SStreamHbMsg; int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 14bdb73b4f..13b6e1c30c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -91,6 +91,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); static int32_t doKillActiveCheckpointTrans(SMnode *pMnode); +static int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -2119,7 +2120,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange continue; } - mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans", pStream->uid, pStream->name); + mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, + pStream->name, pTrans->id); + int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans); // todo: not continue, drop all and retry again @@ -2216,23 +2219,26 @@ static void doExtractTasksFromStream(SMnode *pMnode) { } } -static int32_t doRemoveTasks(SStreamExecInfo* pExecNode, STaskId* pRemovedId) { +static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); + if (p == NULL) { + return TSDB_CODE_SUCCESS; + } - if (p != NULL) { - taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); + taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); - for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { - STaskId* pId = taosArrayGet(pExecNode->pTaskList, k); - if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) { - taosArrayRemove(pExecNode->pTaskList, k); - mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t) pRemovedId->taskId, - (int32_t)taosArrayGetSize(pExecNode->pTaskList)); - break; - } + for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { + STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); + if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) { + taosArrayRemove(pExecNode->pTaskList, k); + + int32_t num = taosArrayGetSize(pExecNode->pTaskList); + mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num); + break; } } - return 0; + + return TSDB_CODE_SUCCESS; } static bool taskNodeExists(SArray* pList, int32_t nodeId) { @@ -2322,6 +2328,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); taosThreadMutexLock(&execInfo.lock); + removeExpirednodeEntryAndTask(pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); @@ -2359,10 +2366,6 @@ typedef struct SMStreamNodeCheckMsg { int8_t placeHolder; // // to fix windows compile error, define place holder } SMStreamNodeCheckMsg; -typedef struct SMStreamTaskResetMsg { - int8_t placeHolder; -} SMStreamTaskResetMsg; - static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -2577,6 +2580,43 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) { return 0; } +int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { + int32_t num = taosArrayGetSize(pNodeList); + + for (int k = 0; k < num; ++k) { + int32_t* pVgId = taosArrayGet(pNodeList, k); + + int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); + for (int i = 0; i < numOfNodes; ++i) { + SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); + + if (pNodeEntry->nodeId == *pVgId) { + mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId); + pNodeEntry->stageUpdated = true; + break; + } + } + } + + return TSDB_CODE_SUCCESS; +} + +static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) { + int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); + for(int32_t j = 0; j < numOfNodes; ++j) { + SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j); + if (pNodeEntry->nodeId == pTaskEntry->nodeId) { + + mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId, + pTaskEntry->stage, stage, pTaskEntry->id.taskId); + + pNodeEntry->stageUpdated = true; + pTaskEntry->stage = stage; + break; + } + } +} + int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; @@ -2602,29 +2642,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { doExtractTasksFromStream(pMnode); } + setNodeEpsetExpiredFlag(req.pUpdateNodes); + for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); - if (pEntry == NULL) { + STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); + if (pTaskEntry == NULL) { mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); continue; } - if (p->stage != pEntry->stage && pEntry->stage != -1) { - int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); - for(int32_t j = 0; j < numOfNodes; ++j) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j); - if (pNodeEntry->nodeId == pEntry->nodeId) { - mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, - pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId); - - pNodeEntry->stageUpdated = true; - pEntry->stage = p->stage; - break; - } - } + if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { + updateStageInfo(pTaskEntry, p->stage); } else { - streamTaskStatusCopy(pEntry, p); + streamTaskStatusCopy(pTaskEntry, p); if (p->activeCheckpointId != 0) { if (activeCheckpointId != 0) { ASSERT(activeCheckpointId == p->activeCheckpointId); @@ -2638,7 +2669,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } - pEntry->status = p->status; + pTaskEntry->status = p->status; if (p->status != TASK_STATUS__READY) { mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); } @@ -2655,5 +2686,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); taosArrayDestroy(req.pTaskStatus); + taosArrayDestroy(req.pUpdateNodes); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 907dc8d88a..5446363698 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1550,7 +1550,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) { streamStartScanHistoryAsync(pTask, igUntreated); - } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0)) { + } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) { tqScanWalAsync(pTq, false); } else { streamSchedExec(pTask); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 62952078bc..50ee52f45b 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -39,7 +39,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) { int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); taosRUnLockLatch(&pTq->pStreamMeta->lock); - tqDebug("handle submit, restore:%d, numOfTasks:%d", pTq->pVnode->restored, numOfTasks); +// tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks); // push data for stream processing: // 1. the vnode has already been restored. diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 7df8bdf891..202c765944 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -344,13 +344,13 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) { } // check if input queue is full or not - if (streamQueueIsFull(pTask->inputInfo.queue)) { + if (streamQueueIsFull(pTask->inputq.queue)) { tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); return false; } // the input queue of downstream task is full, so the output is blocked, stopped for a while - if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr); return false; } @@ -444,7 +444,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputInfo.queue); + int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; taosThreadMutexLock(&pTask->lock); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index fcb61dc892..34b4677235 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -296,7 +296,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S return 0; } -void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputInfo.status, TASK_INPUT_STATUS__FAILED); } +void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); } void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e34ec07eac..cd8bbacb98 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1043,8 +1043,8 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; // put data into inputQ of current task is also allowed - if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { - pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { + pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", pTask->id.idStr, downstreamId, el); } else { @@ -1096,7 +1096,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } else { // code == 0 if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { - pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; + pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream taosThreadMutexLock(&pTask->lock); taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 2d5788fc4d..28d5336a5e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -106,7 +106,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return 0; } - if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry exec task", pTask->id.idStr); taosMsleep(1000); continue; @@ -217,7 +217,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { return 0; } - if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { stDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr); taosMsleep(10000); continue; @@ -374,7 +374,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 7. pause allowed. streamTaskEnablePause(pStreamTask); - if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) { + if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) { SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); @@ -630,7 +630,7 @@ int32_t streamExecTask(SStreamTask* pTask) { } taosThreadMutexLock(&pTask->lock); - if ((streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0) || streamTaskShouldStop(pTask) || + if ((streamQueueGetNumOfItems(pTask->inputq.queue) == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); taosThreadMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 41ad5d4840..eb4fe3a498 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -799,6 +799,15 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1; if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1; } + + int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes); + if (tEncodeI32(pEncoder, numOfVgs) < 0) return -1; + + for (int j = 0; j < numOfVgs; ++j) { + int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j); + if (tEncodeI32(pEncoder, *pVgId) < 0) return -1; + } + tEndEncode(pEncoder); return pEncoder->pos; } @@ -832,6 +841,17 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { taosArrayPush(pReq->pTaskStatus, &entry); } + int32_t numOfVgs = 0; + if (tDecodeI32(pDecoder, &numOfVgs) < 0) return -1; + + pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t)); + + for (int j = 0; j < numOfVgs; ++j) { + int32_t vgId = 0; + if (tDecodeI32(pDecoder, &vgId) < 0) return -1; + taosArrayPush(pReq->pUpdateNodes, &vgId); + } + tEndDecode(pDecoder); return 0; } @@ -886,13 +906,14 @@ void metaHbToMnode(void* param, void* tmrId) { int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); SEpSet epset = {0}; - bool hasValEpset = false; + bool hasMnodeEpset = false; + hbMsg.vgId = pMeta->vgId; hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); + hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); for (int32_t i = 0; i < numOfTasks; ++i) { - STaskId* pId = taosArrayGet(pMeta->pTaskList, i); - + STaskId* pId = taosArrayGet(pMeta->pTaskList, i); SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); // not report the status of fill-history task @@ -905,7 +926,7 @@ void metaHbToMnode(void* param, void* tmrId) { .status = streamTaskGetStatus(*pTask, NULL), .nodeId = pMeta->vgId, .stage = pMeta->stage, - .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)), + .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), }; entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; @@ -924,18 +945,39 @@ void metaHbToMnode(void* param, void* tmrId) { walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd); } - taosArrayPush(hbMsg.pTaskStatus, &entry); + taosThreadMutexLock(&(*pTask)->lock); + int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList); + for (int j = 0; j < num; ++j) { + int32_t *pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j); - if (!hasValEpset) { + bool exist = false; + int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes); + for (int k = 0; k < numOfExisted; ++k) { + if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) { + exist = true; + break; + } + } + + if (!exist) { + taosArrayPush(hbMsg.pUpdateNodes, pNodeId); + } + } + + taosArrayClear((*pTask)->outputInfo.pDownstreamUpdateList); + taosThreadMutexUnlock(&(*pTask)->lock); + + taosArrayPush(hbMsg.pTaskStatus, &entry); + if (!hasMnodeEpset) { epsetAssign(&epset, &(*pTask)->info.mnodeEpset); - hasValEpset = true; + hasMnodeEpset = true; } } hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); taosRUnLockLatch(&pMeta->lock); - if (hasValEpset) { + if (hasMnodeEpset) { int32_t code = 0; int32_t tlen = 0; @@ -980,6 +1022,8 @@ void metaHbToMnode(void* param, void* tmrId) { } taosArrayDestroy(hbMsg.pTaskStatus); + taosArrayDestroy(hbMsg.pUpdateNodes); + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index e4c13699c9..eae4605dbc 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -169,7 +169,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu return TSDB_CODE_SUCCESS; } - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue); if (qItem == NULL) { if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) { taosMsleep(WAIT_FOR_DURATION); @@ -211,7 +211,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); } - streamQueueProcessFail(pTask->inputInfo.queue); + streamQueueProcessFail(pTask->inputq.queue); return TSDB_CODE_SUCCESS; } } else { @@ -232,7 +232,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); } - streamQueueProcessFail(pTask->inputInfo.queue); + streamQueueProcessFail(pTask->inputq.queue); return TSDB_CODE_SUCCESS; } @@ -240,7 +240,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } *numOfBlocks += 1; - streamQueueProcessSuccess(pTask->inputInfo.queue); + streamQueueProcessSuccess(pTask->inputq.queue); if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); @@ -258,12 +258,12 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) { int8_t type = pItem->type; - STaosQueue* pQueue = pTask->inputInfo.queue->pQueue; - int32_t total = streamQueueGetNumOfItems(pTask->inputInfo.queue) + 1; + STaosQueue* pQueue = pTask->inputq.queue->pQueue; + int32_t total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1; if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; - if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputInfo.queue)) { + if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputq.queue)) { double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stTrace( "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", @@ -290,7 +290,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) msgLen, ver, total, size + SIZE_IN_MiB(msgLen)); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if (streamQueueIsFull(pTask->inputInfo.queue)) { + if (streamQueueIsFull(pTask->inputq.queue)) { double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 8b2fb4b9b2..4c71841062 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -346,6 +346,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask) { int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; if (streamTaskShouldStop(pTask)) { stDebug("s-task:%s should stop, do not do check downstream again", id); @@ -354,8 +355,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs if (pRsp->status == TASK_DOWNSTREAM_READY) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - bool found = false; + bool found = false; int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds); for (int32_t i = 0; i < numOfReqs; i++) { int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i); @@ -402,6 +403,26 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " "downstream again, nodeUpdate needed", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + + taosThreadMutexLock(&pTask->lock); + int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList); + bool existed = false; + for (int i = 0; i < num; ++i) { + SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i); + if (p->nodeId == pRsp->downstreamNodeId) { + existed = true; + break; + } + } + + if (!existed) { + SDownstreamTaskEpset t = {.nodeId = pRsp->downstreamNodeId}; + taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t); + stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", id, vgId, + t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList)); + } + + taosThreadMutexUnlock(&pTask->lock); return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ec9715c068..f949d46315 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -59,7 +59,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory pTask->id.idStr = taosStrdup(buf); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY; - pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; + pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; if (fillHistory) { @@ -337,8 +337,8 @@ void tFreeStreamTask(SStreamTask* pTask) { } int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); - if (pTask->inputInfo.queue) { - streamQueueClose(pTask->inputInfo.queue, pTask->id.taskId); + if (pTask->inputq.queue) { + streamQueueClose(pTask->inputq.queue, pTask->id.taskId); } if (pTask->outputq.queue) { @@ -399,8 +399,11 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList); taosMemoryFree(pTask->outputInfo.pTokenBucket); taosThreadMutexDestroy(&pTask->lock); - taosMemoryFree(pTask); + taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList); + pTask->outputInfo.pDownstreamUpdateList = NULL; + + taosMemoryFree(pTask); stDebug("s-task:0x%x free task completed", taskId); } @@ -409,10 +412,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.timerActive = 0; - pTask->inputInfo.queue = streamQueueOpen(512 << 10); + pTask->inputq.queue = streamQueueOpen(512 << 10); pTask->outputq.queue = streamQueueOpen(512 << 10); - if (pTask->inputInfo.queue == NULL || pTask->outputq.queue == NULL) { + if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) { stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr); return TSDB_CODE_OUT_OF_MEMORY; } @@ -425,7 +428,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } pTask->execInfo.created = taosGetTimestampMs(); - pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; + pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta; @@ -462,6 +465,11 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i taosThreadMutexInit(&pTask->lock, &attr); streamTaskOpenAllUpstreamInput(pTask); + pTask->outputInfo.pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); + if (pTask->outputInfo.pDownstreamUpdateList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return TSDB_CODE_SUCCESS; } diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index 18893245fa..97b2464bc8 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -582,6 +582,7 @@ sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s); +sleep 1000 sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791223001,2,2,2,1.1); From 3bf3d00d902501dd5ec7cde27f6d715cdfaaa3b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Oct 2023 23:45:50 +0800 Subject: [PATCH 04/23] fix(stream): set the trans exec sequentially. --- source/dnode/mnode/impl/src/mndStream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 13b6e1c30c..78826a2311 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1205,6 +1205,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId); + mndTransSetSerial(pTrans); const char *pDb = mndGetStreamDB(pMnode); mndTransSetDbName(pTrans, pDb, "checkpoint"); From 3c07c3ec2f931f8561ccd910eb0da4d5ac68ea7d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 29 Oct 2023 02:27:27 +0800 Subject: [PATCH 05/23] fix(stream): fix race condition when state transferring. --- source/libs/stream/src/streamTaskSm.c | 70 +++++++++++++++------------ 1 file changed, 40 insertions(+), 30 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index c9ce76dea8..79c3d47e69 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -107,16 +107,16 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { } // todo optimize the perf of find the trans objs by using hash table -static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent event) { +static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStreamTaskEvent event) { int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans); for (int32_t i = 0; i < numOfTrans; ++i) { STaskStateTrans* pTrans = taosArrayGet(streamTaskSMTrans, i); - if (pTrans->state.state == pState->current.state && pTrans->event == event) { + if (pTrans->state.state == state && pTrans->event == event) { return pTrans; } } - if (event == TASK_EVENT_CHECKPOINT_DONE && pState->current.state == TASK_STATUS__STOP) { + if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) { } else { ASSERT(0); @@ -182,21 +182,9 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) { return NULL; } -int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { +static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans) { SStreamTask* pTask = pSM->pTask; - taosThreadMutexLock(&pTask->lock); - - STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event); - if (pTrans == NULL) { - stWarn("s-task:%s status:%s not allowed handle event:%s", pTask->id.idStr, pSM->current.name, StreamTaskEventList[event].name); - taosThreadMutexUnlock(&pTask->lock); - return -1; - } else { - stDebug("s-task:%s start to handle event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[event].name, - pSM->current.name); - } - if (pTrans->attachEvent.event != 0) { attachEvent(pTask, &pTrans->attachEvent); taosThreadMutexUnlock(&pTask->lock); @@ -210,20 +198,14 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) { stDebug("s-task:%s attached event:%s handled", pTask->id.idStr, StreamTaskEventList[pTrans->event].name); return TSDB_CODE_SUCCESS; - } else {// this event has been handled already + } else { // this event has been handled already stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pTask->id.idStr, StreamTaskEventList[event].name); taosMsleep(100); } } - } else { - if (pSM->pActiveTrans != NULL) { - ASSERT(!pSM->pActiveTrans->autoInvokeEndFn); - stWarn("s-task:%s status:%s handle event:%s is interrupted, handle the new event:%s", pTask->id.idStr, - pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name, StreamTaskEventList[event].name); - } - + } else { // override current active trans pSM->pActiveTrans = pTrans; pSM->startTs = taosGetTimestampMs(); taosThreadMutexUnlock(&pTask->lock); @@ -239,6 +221,34 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { return TSDB_CODE_SUCCESS; } +int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { + SStreamTask* pTask = pSM->pTask; + STaskStateTrans* pTrans = NULL; + + while (1) { + taosThreadMutexLock(&pTask->lock); + if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) { + taosThreadMutexUnlock(&pTask->lock); + taosMsleep(100); + stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed", + pTask->id.idStr, pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name); + } else { + pTrans = streamTaskFindTransform(pSM->current.state, event); + if (pSM->pActiveTrans != NULL) { + // currently in some state transfer procedure, not auto invoke transfer, abort it + stDebug("s-task:%s handle event %s quit, status %s -> %s failed, handle event %s now", pTask->id.idStr, + StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name, pSM->pActiveTrans->next.name, + StreamTaskEventList[event].name); + } + + doHandleEvent(pSM, event, pTrans); + break; + } + } + + return TSDB_CODE_SUCCESS; +} + static void keepPrevInfo(SStreamTaskSM* pSM) { STaskStateTrans* pTrans = pSM->pActiveTrans; @@ -258,7 +268,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP); // the pSM->prev.evt may be 0, so print string is not appropriate. stDebug("status not handled success, current status:%s, trigger event:%d, %s", pSM->current.name, pSM->prev.evt, - pTask->id.idStr); + pTask->id.idStr); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_INVALID_PARA; @@ -284,7 +294,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { stDebug("s-task:%s handle the attached event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[pEvtInfo->event].name, pSM->current.name); - STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM, pEvtInfo->event); + STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event); ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL); pSM->pActiveTrans = pNextTrans; @@ -396,12 +406,12 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); // scan-history related event - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, - streamTaskSetReadyForWal, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, + NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, - streamTaskSetReadyForWal, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, + NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); // halt stream task, from other task status From 1e696287d868fedc5f4f2324d9d62f870ae29f52 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 29 Oct 2023 02:45:47 +0800 Subject: [PATCH 06/23] fix(stream): set the trans exec sequentially. --- source/dnode/mnode/impl/src/mndStream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 78826a2311..d48accb2f9 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2111,6 +2111,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange sdbCancelFetch(pSdb, pIter); return terrno; } + mndTransSetSerial(pTrans); } void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); From da6e3cb293a82647dcf5c5809eef0b9c2ac238de Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 29 Oct 2023 17:37:10 +0800 Subject: [PATCH 07/23] fix(stream): do nodeUpdate trans only after check if vnodes are all ready. --- source/dnode/mnode/impl/src/mndStream.c | 42 ++++++++++++++++++++----- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d48accb2f9..690b78cd1a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -78,7 +78,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static SArray *extractNodeListFromStream(SMnode *pMnode); -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); @@ -1157,7 +1157,13 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } } - SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); + bool allReady = true; + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); + if (!allReady) { + mWarn("not all vnodes are ready, ignore the checkpoint") + taosArrayDestroy(pNodeSnapshot); + return 0; + } SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); @@ -1205,7 +1211,6 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId); - mndTransSetSerial(pTrans); const char *pDb = mndGetStreamDB(pMnode); mndTransSetDbName(pTrans, pDb, "checkpoint"); @@ -2061,11 +2066,12 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP return info; } -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) { +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SVgObj *pVgroup = NULL; + *allReady = true; SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry)); while (1) { @@ -2077,7 +2083,22 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) { SNodeEntry entry = {0}; entry.epset = mndGetVgroupEpset(pMnode, pVgroup); entry.nodeId = pVgroup->vgId; - entry.hbTimestamp = -1; + entry.hbTimestamp = pVgroup->updateTime; + + if (*allReady) { + for (int32_t i = 0; i < pVgroup->replica; ++i) { + if (!pVgroup->vnodeGid[i].syncRestore) { + *allReady = false; + break; + } + + ESyncState state = pVgroup->vnodeGid[i].syncState; + if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) { + *allReady = false; + break; + } + } + } char buf[256] = {0}; EPSET_TO_STR(&entry.epset, buf); @@ -2111,7 +2132,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange sdbCancelFetch(pSdb, pIter); return terrno; } - mndTransSetSerial(pTrans); } void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); @@ -2327,10 +2347,16 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } - SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); + bool allVnodeReady = true; + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady); + if (!allVnodeReady) { + taosArrayDestroy(pNodeSnapshot); + atomic_store_32(&mndNodeCheckSentinel, 0); + mWarn("not all vnodes are ready, ignore the exec nodeUpdate check"); + return 0; + } taosThreadMutexLock(&execInfo.lock); - removeExpirednodeEntryAndTask(pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); From 790d8694949f894bb5111ab3926450493c415510 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 29 Oct 2023 22:57:33 +0800 Subject: [PATCH 08/23] fix(stream): add more check. --- source/dnode/mnode/impl/src/mndStream.c | 15 ++++++++++++--- source/libs/stream/src/streamTaskSm.c | 12 +++++++++--- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 690b78cd1a..9f8d037268 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2706,9 +2706,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal if (checkpointFailed && activeCheckpointId != 0) { - // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal - mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", execInfo.activeCheckpoint); - mndResetFromCheckpoint(pMnode); + bool allReady = true; + SArray* p = mndTakeVgroupSnapshot(pMnode, &allReady); + taosArrayDestroy(p); + + if (allReady) { + // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal + mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", + execInfo.activeCheckpoint); + mndResetFromCheckpoint(pMnode); + } else { + mInfo("not all vgroups are ready, wait for next HB from stream tasks"); + } } taosThreadMutexUnlock(&execInfo.lock); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 79c3d47e69..17329766b6 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -234,11 +234,17 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { pTask->id.idStr, pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name); } else { pTrans = streamTaskFindTransform(pSM->current.state, event); + if (pTrans == NULL) { + stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name); + taosThreadMutexUnlock(&pTask->lock); + return -1; + } + if (pSM->pActiveTrans != NULL) { // currently in some state transfer procedure, not auto invoke transfer, abort it - stDebug("s-task:%s handle event %s quit, status %s -> %s failed, handle event %s now", pTask->id.idStr, - StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name, pSM->pActiveTrans->next.name, - StreamTaskEventList[event].name); + stDebug("s-task:%s handle event procedure %s quit, status %s -> %s failed, handle event %s now", + pTask->id.idStr, StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name, + pSM->pActiveTrans->next.name, StreamTaskEventList[event].name); } doHandleEvent(pSM, event, pTrans); From f708ddb792927dc7189dec68dc7c3515896c40cc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Oct 2023 09:09:21 +0800 Subject: [PATCH 09/23] fix(stream): pause the task when they are still in uninit status. --- source/libs/stream/src/streamTaskSm.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 17329766b6..23dd06d640 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -460,9 +460,9 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); - taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); From 19c5882cbe2f2bebeafd3cff13fdc84f1acd9597 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Oct 2023 10:22:24 +0800 Subject: [PATCH 10/23] fix(stream): add event check when handling the end of event. --- include/libs/stream/tstream.h | 2 +- source/libs/stream/src/streamStart.c | 4 ++-- source/libs/stream/src/streamTaskSm.c | 13 ++++++++++--- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7859c60944..971604ab1c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -746,7 +746,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); int32_t streamTaskClearHTaskAttr(SStreamTask* pTask); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); -int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM); +int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event); void streamTaskRestoreStatus(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 4c71841062..751b809fb7 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -163,7 +163,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { } } else { stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskOnHandleEventSuccess(pTask->status.pSM); + streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT); } return 0; @@ -313,7 +313,7 @@ int32_t onScanhistoryTaskReady(SStreamTask* pTask) { // todo: refactor this function. static void doProcessDownstreamReadyRsp(SStreamTask* pTask) { - streamTaskOnHandleEventSuccess(pTask->status.pSM); + streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT); #if 0 const char* id = pTask->id.idStr; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 23dd06d640..5dfe3ec187 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -214,7 +214,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt // todo handle error code; if (pTrans->autoInvokeEndFn) { - streamTaskOnHandleEventSuccess(pSM); + streamTaskOnHandleEventSuccess(pSM, event); } } @@ -262,7 +262,7 @@ static void keepPrevInfo(SStreamTaskSM* pSM) { pSM->prev.evt = pTrans->event; } -int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { +int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event) { SStreamTask* pTask = pSM->pTask; // do update the task status @@ -280,6 +280,13 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { return TSDB_CODE_INVALID_PARA; } + if (pTrans->event != event) { + stWarn("s-task:%s handle event:%s failed, current status:%s", pTask->id.idStr, StreamTaskEventList[event].name, + pSM->current.name); + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_INVALID_PARA; + } + keepPrevInfo(pSM); pSM->current = pTrans->next; @@ -309,7 +316,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { int32_t code = pNextTrans->pAction(pSM->pTask); if (pNextTrans->autoInvokeEndFn) { - return streamTaskOnHandleEventSuccess(pSM); + return streamTaskOnHandleEventSuccess(pSM, event); } else { return code; } From 93232a0e140c7e5df2d5d5df03dd853f553436b7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Oct 2023 11:48:51 +0800 Subject: [PATCH 11/23] fix(stream): enable the set of fill-history task to be un-init. --- source/dnode/vnode/src/tq/tqStreamTask.c | 5 +---- source/libs/stream/src/streamTaskSm.c | 3 +++ 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 202c765944..b8aaf6bf60 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -242,10 +242,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - - if ((*pTask)->info.fillHistory != 1) { - streamTaskResetStatus(*pTask); - } + streamTaskResetStatus(*pTask); } return 0; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 5dfe3ec187..11a033a9a3 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -348,6 +348,9 @@ void streamTaskResetStatus(SStreamTask* pTask) { SStreamTaskSM* pSM = pTask->status.pSM; taosThreadMutexLock(&pTask->lock); + stDebug("s-task:%s level:%d fill-history:%d vgId:%d set uninit, prev status:%s", pTask->id.idStr, + pTask->info.taskLevel, pTask->info.fillHistory, pTask->pMeta->vgId, pSM->current.name); + pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT]; pSM->pActiveTrans = NULL; taosArrayClear(pSM->pWaitingEventList); From 3275452bcec0700591dffbb2700abf1b7221f4e5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Oct 2023 14:38:51 +0800 Subject: [PATCH 12/23] fix(stream): fix the deadlock. --- source/dnode/vnode/src/tq/tq.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5446363698..0acdd30807 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1896,9 +1896,15 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { return rsp.code; } + taosWUnLockLatch(&pMeta->lock); + + // the following two functions should not be executed within the scope of meta lock to avoid deadlock streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskResetStatus(pTask); + // continue after lock the meta again + taosWLockLatch(&pMeta->lock); + SStreamTask** ppHTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); From fdf3f210acfe21ab317c0ba1c05a666698fbdad1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Oct 2023 14:39:24 +0800 Subject: [PATCH 13/23] refactor: do some internal refactor. --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- source/libs/stream/src/streamStart.c | 7 ------- 5 files changed, 4 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index eac6603e8b..cd285fe6a1 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -158,7 +158,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); int32_t tqStartStreamTask(STQ* pTq); -int32_t tqStartStreamTasks(STQ* pTq); +int32_t tqResetStreamTaskStatus(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); // tq util diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0acdd30807..0ce7106670 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1987,7 +1987,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); - tqStartStreamTasks(pTq); + tqResetStreamTaskStatus(pTq); tqCheckAndRunStreamTaskAsync(pTq); } else { vInfo("vgId:%d, follower node not start stream tasks", vgId); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b8aaf6bf60..efdd865d6c 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -227,7 +227,7 @@ int32_t tqStopStreamTasks(STQ* pTq) { return 0; } -int32_t tqStartStreamTasks(STQ* pTq) { +int32_t tqResetStreamTaskStatus(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = TD_VID(pTq->pVnode); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index a6c743c87d..2b4a2fbfdd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -564,7 +564,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); } else { vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); - tqStartStreamTasks(pVnode->pTq); + tqResetStreamTaskStatus(pVnode->pTq); tqCheckAndRunStreamTaskAsync(pVnode->pTq); } } else { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 751b809fb7..e47f7fa6e7 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -271,13 +271,6 @@ int32_t onNormalTaskReady(SStreamTask* pTask) { ETaskStatus status = streamTaskGetStatus(pTask, &p); ASSERT(status == TASK_STATUS__READY); - // todo refactor: remove this later -// if (pTask->info.fillHistory == 1) { -// stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); -// pTask->status.taskStatus = TASK_STATUS__DROPPING; -// ASSERT(pTask->hTaskInfo.id.taskId == 0); -// } - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader)); From bc1c2a879ae70e5850337e0c2fb9d2249edc027a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Oct 2023 16:12:08 +0800 Subject: [PATCH 14/23] fix(tsdb): avoid to read duplicated rows in buf. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 55bd4eeeb8..1898553640 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1196,11 +1196,12 @@ static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* p } } -static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SFileDataBlockInfo* pBlock) { - bool ascScan = ASCENDING_TRAVERSE(order); +static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock, int64_t keyInStt) { + bool ascScan = ASCENDING_TRAVERSE(order); + int64_t key = ascScan? MIN(pBlock->record.firstKey, keyInStt):MAX(pBlock->record.lastKey, keyInStt); - return (ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts <= pBlock->record.firstKey)) || - (!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->record.lastKey)); + return (ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts <= key)) || + (!ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts >= key)); } static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersionRange* pVerRange) { @@ -2664,6 +2665,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { initLastBlockReader(pLastBlockReader, pScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); + bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); + ASSERT(bHasDataInLastBlock); + + int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader)) { code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid); if (code != TSDB_CODE_SUCCESS) { @@ -2672,15 +2677,12 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // build composed data block code = buildComposedDataBlock(pReader); - } else if (bufferDataInFileBlockGap(pReader->info.order, keyInBuf, pBlockInfo)) { + } else if (bufferDataInFileBlockGap(pReader->info.order, keyInBuf, pBlockInfo, tsLast)) { // data in memory that are earlier than current file block // rows in buffer should be less than the file block in asc, greater than file block in desc - int64_t endKey = - (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; + int64_t endKey = asc? MIN(pBlockInfo->record.firstKey, tsLast):MAX(pBlockInfo->record.lastKey, tsLast); code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { - bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); - int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN; if (!bHasDataInLastBlock || ((asc && pBlockInfo->record.lastKey < tsLast) || (!asc && pBlockInfo->record.firstKey > tsLast))) { // whole block is required, return it directly From ed074864054a695283c2d5602344ee599690b583 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Oct 2023 16:40:09 +0800 Subject: [PATCH 15/23] fix(tsdb): fix error in decide if data buffer overlaps with stt files/block-files. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 1898553640..aac0f3edcc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1200,8 +1200,8 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY keyInBuf, SFileDataB bool ascScan = ASCENDING_TRAVERSE(order); int64_t key = ascScan? MIN(pBlock->record.firstKey, keyInStt):MAX(pBlock->record.lastKey, keyInStt); - return (ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts <= key)) || - (!ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts >= key)); + return (ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts < key)) || + (!ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts > key)); } static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersionRange* pVerRange) { From eaf76854f65f6b69109cf0ecbf2437fd851c3f6c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 31 Oct 2023 09:29:24 +0800 Subject: [PATCH 16/23] fix(tsdb): handle the case where stt not exists, possibly exists while the stt-trigger is 1. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 53 +++++++++++++++++-------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index aac0f3edcc..c17b0df571 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1196,9 +1196,25 @@ static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* p } } -static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock, int64_t keyInStt) { +static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader* pLastBlockReader, int32_t order) { + bool ascScan = ASCENDING_TRAVERSE(order); + bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); + + int64_t key = 0; + if (bHasDataInLastBlock) { + int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader); + key = ascScan ? MIN(pBlock->record.firstKey, keyInStt) : MAX(pBlock->record.lastKey, keyInStt); + } else { + key = ascScan ? pBlock->record.firstKey : pBlock->record.lastKey; + } + + return key; +} + +static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock, + SLastBlockReader* pLastBlockReader, int32_t order) { bool ascScan = ASCENDING_TRAVERSE(order); - int64_t key = ascScan? MIN(pBlock->record.firstKey, keyInStt):MAX(pBlock->record.lastKey, keyInStt); + int64_t key = getBoarderKeyInFiles(pBlock, pLastBlockReader, order); return (ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts < key)) || (!ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts > key)); @@ -2638,6 +2654,15 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { } } +static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, SLastBlockReader* pLastBlockReader, bool asc) { + if(!hasDataInLastBlock(pLastBlockReader)) { + return true; + } else { + int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader); + return (asc && pBlockInfo->record.lastKey < keyInStt) || (!asc && pBlockInfo->record.firstKey > keyInStt); + } +} + static int32_t doBuildDataBlock(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; @@ -2665,10 +2690,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { initLastBlockReader(pLastBlockReader, pScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); - bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); - ASSERT(bHasDataInLastBlock); - - int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader)) { code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid); if (code != TSDB_CODE_SUCCESS) { @@ -2677,14 +2698,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // build composed data block code = buildComposedDataBlock(pReader); - } else if (bufferDataInFileBlockGap(pReader->info.order, keyInBuf, pBlockInfo, tsLast)) { - // data in memory that are earlier than current file block + } else if (bufferDataInFileBlockGap(keyInBuf, pBlockInfo, pLastBlockReader, pReader->info.order)) { + // data in memory that are earlier than current file block and stt blocks // rows in buffer should be less than the file block in asc, greater than file block in desc - int64_t endKey = asc? MIN(pBlockInfo->record.firstKey, tsLast):MAX(pBlockInfo->record.lastKey, tsLast); + int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pLastBlockReader, pReader->info.order); code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { - if (!bHasDataInLastBlock || - ((asc && pBlockInfo->record.lastKey < tsLast) || (!asc && pBlockInfo->record.firstKey > tsLast))) { + if (notOverlapWithSttFiles(pBlockInfo, pLastBlockReader, asc)) { // whole block is required, return it directly SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; pInfo->rows = pBlockInfo->record.numRow; @@ -2695,7 +2715,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); // update the last key for the corresponding table - pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey; + pScanInfo->lastKey = asc ? pInfo->window.ekey : pInfo->window.skey; tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", @@ -2722,10 +2742,11 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } // data in stt now overlaps with current active file data block, need to composed with file data block. - int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader); - if ((keyInStt >= pBlockInfo->record.firstKey && asc) || (keyInStt <= pBlockInfo->record.lastKey && (!asc))) { - tsdbDebug("%p keyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader, - keyInStt, pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); + int64_t lastKeyInStt = getCurrentKeyInLastBlock(pLastBlockReader); + if ((lastKeyInStt >= pBlockInfo->record.firstKey && asc) || + (lastKeyInStt <= pBlockInfo->record.lastKey && (!asc))) { + tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader, + lastKeyInStt, pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); break; } } From 72f3e4446d984cd90b1025f6ea911d1435aed57d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 31 Oct 2023 10:38:17 +0800 Subject: [PATCH 17/23] fix(stream): handle failure during checkpoint --- source/libs/stream/src/streamCheckpoint.c | 6 +++++- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamTaskSm.c | 5 ++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 28b67029ce..9eaa9fcb92 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -175,7 +175,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // set task status if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) { pTask->checkpointingId = checkpointId; - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code)); + return code; + } } { // todo: remove this when the pipeline checkpoint generating is used. diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 28d5336a5e..e4365fe625 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,7 +24,7 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamTask* pTask) { ETaskStatus s = streamTaskGetStatus(pTask, NULL); - return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING); + return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING) || (s == TASK_STATUS__UNINIT); } bool streamTaskShouldPause(const SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 11a033a9a3..2dcf440729 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -118,9 +118,12 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) { + } else if (event == TASK_EVENT_GEN_CHECKPOINT && state == TASK_STATUS__UNINIT) { + // the task is set to uninit due to nodeEpset update, during processing checkpoint-trigger block. } else { ASSERT(0); } + return NULL; } @@ -237,7 +240,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if (pTrans == NULL) { stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, StreamTaskEventList[event].name); taosThreadMutexUnlock(&pTask->lock); - return -1; + return TSDB_CODE_INVALID_PARA; // todo: set new error code// failed to handle the event. } if (pSM->pActiveTrans != NULL) { From 8b9bc988f17c16f7c8be8290bdbce9d851ab4864 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 31 Oct 2023 10:42:34 +0800 Subject: [PATCH 18/23] fix(tsdb): fix syntax error. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index c17b0df571..24fd33b9ec 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1203,7 +1203,7 @@ static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader int64_t key = 0; if (bHasDataInLastBlock) { int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader); - key = ascScan ? MIN(pBlock->record.firstKey, keyInStt) : MAX(pBlock->record.lastKey, keyInStt); + key = ascScan ? TMIN(pBlock->record.firstKey, keyInStt) : TMAX(pBlock->record.lastKey, keyInStt); } else { key = ascScan ? pBlock->record.firstKey : pBlock->record.lastKey; } From e01c7f3cfdca8ddac2c9d85097fd42fc791d3a63 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 31 Oct 2023 11:51:47 +0800 Subject: [PATCH 19/23] fix(stream): adjust the stop condition for stream tasks. --- source/libs/stream/src/streamExec.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e4365fe625..15b809d6bd 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,7 +24,7 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamTask* pTask) { ETaskStatus s = streamTaskGetStatus(pTask, NULL); - return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING) || (s == TASK_STATUS__UNINIT); + return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING); } bool streamTaskShouldPause(const SStreamTask* pTask) { @@ -525,7 +525,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { int32_t blockSize = 0; int32_t numOfBlocks = 0; SStreamQueueItem* pInput = NULL; - if (streamTaskShouldStop(pTask)) { + if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__UNINIT)) { stDebug("s-task:%s stream task is stopped", id); break; } From 4c0242a58d24deec89451c254079433afd043576 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 31 Oct 2023 16:39:14 +0800 Subject: [PATCH 20/23] fix(stream): fix error in finishing event handling. --- include/libs/stream/tstream.h | 6 +- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 22 +++--- source/dnode/vnode/src/tq/tqStreamTask.c | 3 +- source/dnode/vnode/src/vnd/vnodeSync.c | 4 +- source/libs/stream/src/streamMeta.c | 24 +----- source/libs/stream/src/streamStart.c | 73 +++++-------------- source/libs/stream/src/streamTaskSm.c | 22 +++--- tests/script/tsim/stream/pauseAndResume.sim | 2 + .../system-test/8-stream/max_delay_session.py | 2 + 10 files changed, 57 insertions(+), 103 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 971604ab1c..5e145d8fbb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -432,7 +432,7 @@ struct SStreamTask { typedef struct STaskStartInfo { int64_t startTs; int64_t readyTs; - int32_t startedAfterNodeUpdate; + int32_t startAllTasksFlag; SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing int32_t elapsedTime; } STaskStartInfo; @@ -735,7 +735,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask); int32_t onNormalTaskReady(SStreamTask* pTask); int32_t onScanhistoryTaskReady(SStreamTask* pTask); -//int32_t streamTaskStartScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); @@ -798,7 +797,6 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); -int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaReopen(SStreamMeta* pMeta); @@ -808,12 +806,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); +int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); void streamTaskClearCheckInfo(SStreamTask* pTask); - int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 823e9d57f6..f184702eda 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -231,7 +231,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq); +int32_t tqLaunchStreamTaskAsync(STQ* pTq); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0ce7106670..fab18bb3bc 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1264,12 +1264,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); -#if 0 - // the fill-history task starts to process data in wal, let's set it status to be normal now - if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) { - streamSetStatusNormal(pTask); - } -#endif // now the fill-history task starts to scan data from wal files. streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); @@ -1527,14 +1521,17 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, } streamTaskResume(pTask); + ETaskStatus status = streamTaskGetStatus(pTask, NULL); int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__SINK) { + if (status == TASK_STATUS__UNINIT) { + + } streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } - ETaskStatus status = streamTaskGetStatus(pTask, NULL); if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) { // no lock needs to secure the access of the version if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { @@ -1555,6 +1552,11 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, } else { streamSchedExec(pTask); } + } else if (status == TASK_STATUS__UNINIT) { + if (pTask->info.fillHistory == 0) { + EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; + streamTaskHandleEvent(pTask->status.pSM, event); + } } streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1948,7 +1950,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); - pMeta->startInfo.startedAfterNodeUpdate = 1; + pMeta->startInfo.startAllTasksFlag = 1; if (updateTasks < numOfTasks) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, @@ -1957,7 +1959,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } else { if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); - pMeta->startInfo.startedAfterNodeUpdate = 0; + pMeta->startInfo.startAllTasksFlag = 0; taosWUnLockLatch(&pMeta->lock); } else { tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); @@ -1988,7 +1990,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); tqResetStreamTaskStatus(pTq); - tqCheckAndRunStreamTaskAsync(pTq); + tqLaunchStreamTaskAsync(pTq); } else { vInfo("vgId:%d, follower node not start stream tasks", vgId); } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index efdd865d6c..1f1dd61c3c 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -98,6 +98,7 @@ int32_t tqStartStreamTask(STQ* pTq) { streamLaunchFillHistoryTask(pTask); } + streamMetaUpdateTaskReadyInfo(pTask); streamMetaReleaseTask(pMeta, pTask); continue; } @@ -111,7 +112,7 @@ int32_t tqStartStreamTask(STQ* pTq) { return 0; } -int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { +int32_t tqLaunchStreamTaskAsync(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = pMeta->vgId; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 2b4a2fbfdd..7d19f23e23 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -552,7 +552,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) pVnode->restored = true; taosWLockLatch(&pVnode->pTq->pStreamMeta->lock); - if (pVnode->pTq->pStreamMeta->startInfo.startedAfterNodeUpdate) { + if (pVnode->pTq->pStreamMeta->startInfo.startAllTasksFlag) { vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock); return; @@ -565,7 +565,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) } else { vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); tqResetStreamTaskStatus(pVnode->pTq); - tqCheckAndRunStreamTaskAsync(pVnode->pTq); + tqLaunchStreamTaskAsync(pVnode->pTq); } } else { vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index eb4fe3a498..f788e244cd 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -229,10 +229,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF int32_t streamMetaReopen(SStreamMeta* pMeta) { // backup the restart flag - int32_t restartFlag = pMeta->startInfo.startedAfterNodeUpdate; + int32_t restartFlag = pMeta->startInfo.startAllTasksFlag; streamMetaClear(pMeta); - pMeta->startInfo.startedAfterNodeUpdate = restartFlag; + pMeta->startInfo.startAllTasksFlag = restartFlag; // NOTE: role should not be changed during reopen meta pMeta->streamBackendRid = -1; @@ -446,24 +446,6 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { return (int32_t)size; } -int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { - int32_t num = 0; - size_t size = taosArrayGetSize(pMeta->pTaskList); - for (int32_t i = 0; i < size; ++i) { - STaskId* pId = taosArrayGet(pMeta->pTaskList, i); - SStreamTask** p = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); - if (p == NULL) { - continue; - } - - if ((*p)->info.fillHistory == 0) { - num += 1; - } - } - - return num; -} - SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { taosRLockLatch(&pMeta->lock); @@ -1106,6 +1088,6 @@ void streamMetaInitForSnode(SStreamMeta* pMeta) { void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { taosHashClear(pStartInfo->pReadyTaskSet); - pStartInfo->startedAfterNodeUpdate = 0; + pStartInfo->startAllTasksFlag = 0; pStartInfo->readyTs = 0; } \ No newline at end of file diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index e47f7fa6e7..2d951147d0 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -35,7 +35,7 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); static void tryLaunchHistoryTask(void* param, void* tmrId); -static int32_t updateTaskReadyInMeta(SStreamTask* pTask); +static void doProcessDownstreamReadyRsp(SStreamTask* pTask); int32_t streamTaskSetReady(SStreamTask* pTask) { char* p = NULL; @@ -57,7 +57,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p); - updateTaskReadyInMeta(pTask); + streamMetaUpdateTaskReadyInfo(pTask); return TSDB_CODE_SUCCESS; } @@ -114,7 +114,7 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) { } // check status -static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { +void streamTaskCheckDownstream(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; @@ -163,10 +163,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { } } else { stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT); + doProcessDownstreamReadyRsp(pTask); } - - return 0; } static STaskRecheckInfo* createRecheckInfo(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { @@ -272,8 +270,13 @@ int32_t onNormalTaskReady(SStreamTask* pTask) { ASSERT(status == TASK_STATUS__READY); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int64_t startVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (startVer == -1) { + startVer = pTask->chkInfo.nextProcessVer; + } + stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, - id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader)); + id, p, pTask->status.schedStatus, startVer); } else { stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus); } @@ -304,36 +307,15 @@ int32_t onScanhistoryTaskReady(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -// todo: refactor this function. -static void doProcessDownstreamReadyRsp(SStreamTask* pTask) { - streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT); - -#if 0 - const char* id = pTask->id.idStr; - - int8_t status = pTask->status.taskStatus; - const char* str = streamGetTaskStatusStr(status); - - ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__READY); - streamTaskSetRangeStreamCalc(pTask); - - if (status == TASK_STATUS__SCAN_HISTORY) { - stDebug("s-task:%s enter into scan-history data stage, status:%s", id, str); - streamTaskStartScanHistory(pTask); - // start the related fill-history task, when current task is ready - streamLaunchFillHistoryTask(pTask); +void doProcessDownstreamReadyRsp(SStreamTask* pTask) { + EStreamTaskEvent event; + if (pTask->info.fillHistory == 0) { + event = HAS_RELATED_FILLHISTORY_TASK(pTask)? TASK_EVENT_INIT_STREAM_SCANHIST:TASK_EVENT_INIT; } else { - // fill-history tasks are not allowed to reach here. - if (pTask->info.fillHistory == 1) { - stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); - pTask->status.taskStatus = TASK_STATUS__DROPPING; - ASSERT(pTask->hTaskInfo.id.taskId == 0); - } else { - stDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); - streamTaskEnablePause(pTask); - } + event = TASK_EVENT_INIT_SCANHIST; } -#endif + + streamTaskOnHandleEventSuccess(pTask->status.pSM, event); } int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { @@ -951,17 +933,6 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { } } -// only the downstream tasks are ready, set the task to be ready to work. -void streamTaskCheckDownstream(SStreamTask* pTask) { -// if (pTask->info.fillHistory) { -// ASSERT(0); -// stDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); -// return; -// } - - doCheckDownstreamStatus(pTask); -} - void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { #if 0 int8_t status = pTask->status.taskStatus; @@ -1076,7 +1047,7 @@ void streamTaskEnablePause(SStreamTask* pTask) { pTask->status.pauseAllowed = 1; } -int32_t updateTaskReadyInMeta(SStreamTask* pTask) { +int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; taosWLockLatch(&pMeta->lock); @@ -1088,13 +1059,9 @@ int32_t updateTaskReadyInMeta(SStreamTask* pTask) { if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) { STaskStartInfo* pStartInfo = &pMeta->startInfo; - pStartInfo->readyTs = pTask->execInfo.start; - if (pStartInfo->startTs != 0) { - pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs; - } else { - pStartInfo->elapsedTime = 0; - } + pStartInfo->readyTs = pTask->execInfo.start; + pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; streamMetaResetStartInfo(pStartInfo); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 2dcf440729..8055139971 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -141,9 +141,9 @@ void streamTaskRestoreStatus(SStreamTask* pTask) { pSM->prev.evt = 0; pSM->startTs = taosGetTimestampMs(); + stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); taosThreadMutexUnlock(&pTask->lock); - stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name); } SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { @@ -187,6 +187,7 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) { static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans) { SStreamTask* pTask = pSM->pTask; + const char* id = pTask->id.idStr; if (pTrans->attachEvent.event != 0) { attachEvent(pTask, &pTrans->attachEvent); @@ -199,16 +200,15 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt taosThreadMutexUnlock(&pTask->lock); if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) { - stDebug("s-task:%s attached event:%s handled", pTask->id.idStr, StreamTaskEventList[pTrans->event].name); + stDebug("s-task:%s attached event:%s handled", id, StreamTaskEventList[pTrans->event].name); return TSDB_CODE_SUCCESS; } else { // this event has been handled already - stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pTask->id.idStr, - StreamTaskEventList[event].name); + stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, StreamTaskEventList[event].name); taosMsleep(100); } } - } else { // override current active trans + } else { // override current active trans pSM->pActiveTrans = pTrans; pSM->startTs = taosGetTimestampMs(); taosThreadMutexUnlock(&pTask->lock); @@ -245,7 +245,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if (pSM->pActiveTrans != NULL) { // currently in some state transfer procedure, not auto invoke transfer, abort it - stDebug("s-task:%s handle event procedure %s quit, status %s -> %s failed, handle event %s now", + stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now", pTask->id.idStr, StreamTaskEventList[pSM->pActiveTrans->event].name, pSM->current.name, pSM->pActiveTrans->next.name, StreamTaskEventList[event].name); } @@ -276,16 +276,16 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even ETaskStatus s = pSM->current.state; ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP); // the pSM->prev.evt may be 0, so print string is not appropriate. - stDebug("status not handled success, current status:%s, trigger event:%d, %s", pSM->current.name, pSM->prev.evt, - pTask->id.idStr); + stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr, + StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pSM->prev.evt].name); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_INVALID_PARA; } if (pTrans->event != event) { - stWarn("s-task:%s handle event:%s failed, current status:%s", pTask->id.idStr, StreamTaskEventList[event].name, - pSM->current.name); + stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr, + StreamTaskEventList[event].name, pSM->current.name, StreamTaskEventList[pTrans->event].name); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_INVALID_PARA; } @@ -319,7 +319,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even int32_t code = pNextTrans->pAction(pSM->pTask); if (pNextTrans->autoInvokeEndFn) { - return streamTaskOnHandleEventSuccess(pSM, event); + return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event); } else { return code; } diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index 402e0086f7..aa1cb4dbb9 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -17,6 +17,7 @@ sql create table ts3 using st tags(3,2,2); sql create table ts4 using st tags(4,2,2); sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s); +sleep 1000 sql pause stream streams1; sql insert into ts1 values(1648791213001,1,12,3,1.0); @@ -246,6 +247,7 @@ sql create table ts4 using st tags(4,2,2); sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt3 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s); sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt4 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s); sql create stream streams5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt5 as select _wstart, count(*) c1, sum(a) c3 from ts1 interval(10s); +sleep 1000 sql pause stream streams3; diff --git a/tests/system-test/8-stream/max_delay_session.py b/tests/system-test/8-stream/max_delay_session.py index 1a734e0e61..46c4c5801d 100644 --- a/tests/system-test/8-stream/max_delay_session.py +++ b/tests/system-test/8-stream/max_delay_session.py @@ -35,6 +35,8 @@ class TDTestCase: self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value) self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value) init_num = 0 + time.sleep(1) + for i in range(self.tdCom.range_count): if i == 0: window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session']) From 4694859325bfa23b35c6a0d4ecdaadf836454b5c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 31 Oct 2023 19:31:00 +0800 Subject: [PATCH 21/23] fix(stream): add jump out loop by checking task status. --- source/libs/stream/src/streamTaskSm.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 8055139971..b623172381 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -202,9 +202,12 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) { stDebug("s-task:%s attached event:%s handled", id, StreamTaskEventList[pTrans->event].name); return TSDB_CODE_SUCCESS; - } else { // this event has been handled already + } else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP) { // this event has been handled already stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, StreamTaskEventList[event].name); taosMsleep(100); + } else { + stDebug("s-task:%s is dropped or stopped already, not wait.", id); + return TSDB_CODE_INVALID_PARA; } } From 639d5cde4701b0ab2aa784e76398a21c6d605868 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 31 Oct 2023 23:14:02 +0800 Subject: [PATCH 22/23] fix(stream): add unsupport trans. --- source/dnode/vnode/src/tq/tq.c | 7 +++++-- source/libs/stream/src/streamStart.c | 2 +- source/libs/stream/src/streamTaskSm.c | 17 +++++++++++++---- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fab18bb3bc..a5a3175652 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1266,9 +1266,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); // now the fill-history task starts to scan data from wal files. - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - tqScanWalAsync(pTq, false); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); + if (code == TSDB_CODE_SUCCESS) { + tqScanWalAsync(pTq, false); + } } + streamMetaReleaseTask(pMeta, pStreamTask); } else { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 2d951147d0..7756d7a2e0 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -580,7 +580,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; // execute in the scan history complete call back msg, ready to process data from inputQ - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); + int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); streamTaskSetSchedStatusInactive(pTask); taosWLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index b623172381..205994b7cc 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -106,6 +106,17 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } +// todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE +static bool isUnsupportedTransform(ETaskStatus state, const EStreamTaskEvent event) { + if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT) { + if (event == TASK_EVENT_SCANHIST_DONE || event == TASK_EVENT_CHECKPOINT_DONE || event == TASK_EVENT_GEN_CHECKPOINT) { + return true; + } + } + + return false; +} + // todo optimize the perf of find the trans objs by using hash table static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStreamTaskEvent event) { int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans); @@ -116,10 +127,8 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream } } - if (event == TASK_EVENT_CHECKPOINT_DONE && state == TASK_STATUS__STOP) { - - } else if (event == TASK_EVENT_GEN_CHECKPOINT && state == TASK_STATUS__UNINIT) { - // the task is set to uninit due to nodeEpset update, during processing checkpoint-trigger block. + if (isUnsupportedTransform(state, event)) { + return NULL; } else { ASSERT(0); } From 0fb4cfd94f346fb9b2d9b0d9ecec1087273017b7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 1 Nov 2023 00:33:17 +0800 Subject: [PATCH 23/23] fix(stream): remove invalid assert. --- source/libs/stream/src/streamExec.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 15b809d6bd..c0589f6ab1 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -309,9 +309,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } ETaskStatus status = streamTaskGetStatus(pStreamTask, NULL); - ASSERT(((status == TASK_STATUS__DROPPING) || (pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) && - pTask->status.appendTranstateBlock == true); - STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; // It must be halted for a source stream task, since when the related scan-history-data task start scan the history