From bef0d4bff32acde73dfd07f7d431cb570a7e7b4a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Aug 2023 20:01:59 +0800 Subject: [PATCH 01/14] fix(stream): add assert. --- source/libs/stream/src/streamRecover.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 4b86b9713c..edeabc8c0d 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -610,7 +610,8 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { // todo failed to create timer taosMemoryFree(pInfo); } else { - atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active + int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active + ASSERT(ref == 1); qDebug("s-task:%s set timer active flag", pTask->id.idStr); } } else { // timer exists From 95da66e3e8e38071156ab8da4030d1a158ee411c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Aug 2023 20:50:12 +0800 Subject: [PATCH 02/14] fix(stream): fix error. --- source/dnode/mnode/impl/src/mndStream.c | 22 ++++---- source/libs/stream/src/streamMeta.c | 70 +++++++++++++------------ source/libs/stream/src/streamRecover.c | 12 ++--- 3 files changed, 52 insertions(+), 52 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 46eb0d9957..e5b91f2815 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -868,6 +868,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); taosThreadMutexLock(&execNodeList.lock); + mDebug("register to stream task node list"); keepStreamTasksInBuf(&streamObj, &execNodeList); taosThreadMutexUnlock(&execNodeList.lock); @@ -876,13 +877,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { char detail[2000] = {0}; sprintf(detail, "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 - ", " - "fillHistory:%d, igExists:%d, " - "igExpired:%d, igUpdate:%d, lastTs:%" PRId64 - ", " - "maxDelay:%" PRId64 - ", numOfTags:%d, sourceDB:%s, " - "targetStbFullName:%s, triggerType:%d, watermark:%" PRId64, + ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", maxDelay:%" PRId64 + ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64, createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark, createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate, createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, @@ -2281,7 +2277,6 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p // todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - SStreamHbMsg req = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -2306,10 +2301,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - int64_t k[2] = {p->streamId, p->taskId}; - int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); - STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); + int64_t k[2] = {p->streamId, p->taskId}; + int32_t **index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); + if (index == NULL) { + + continue; + } + + STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, **index); pStatusEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status)); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ff5e9adaee..147e506680 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -757,7 +757,6 @@ void metaHbToMnode(void* param, void* tmrId) { SStreamHbMsg hbMsg = {0}; SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); if (pMeta == NULL) { - // taosMemoryFree(param); return; } @@ -779,6 +778,7 @@ void metaHbToMnode(void* param, void* tmrId) { int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); SEpSet epset = {0}; + bool hasValEpset = false; hbMsg.vgId = pMeta->vgId; hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); @@ -797,51 +797,53 @@ void metaHbToMnode(void* param, void* tmrId) { if (i == 0) { epsetAssign(&epset, &(*pTask)->info.mnodeEpset); + hasValEpset = true; } } hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); taosRUnLockLatch(&pMeta->lock); - int32_t code = 0; - int32_t tlen = 0; + if (hasValEpset) { + int32_t code = 0; + int32_t tlen = 0; - tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code); - if (code < 0) { - qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); - taosArrayDestroy(hbMsg.pTaskStatus); - taosReleaseRef(streamMetaId, rid); - return; - } + tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code); + if (code < 0) { + qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); + taosArrayDestroy(hbMsg.pTaskStatus); + taosReleaseRef(streamMetaId, rid); + return; + } - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - taosArrayDestroy(hbMsg.pTaskStatus); - taosReleaseRef(streamMetaId, rid); - return; - } + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + taosArrayDestroy(hbMsg.pTaskStatus); + taosReleaseRef(streamMetaId, rid); + return; + } - SEncoder encoder; - tEncoderInit(&encoder, buf, tlen); - if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { - rpcFreeCont(buf); - qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); - taosArrayDestroy(hbMsg.pTaskStatus); - taosReleaseRef(streamMetaId, rid); - return; + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { + rpcFreeCont(buf); + qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); + taosArrayDestroy(hbMsg.pTaskStatus); + taosReleaseRef(streamMetaId, rid); + return; + } + tEncoderClear(&encoder); + + SRpcMsg msg = {0}; + initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); + msg.info.noResp = 1; + + qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); + tmsgSendReq(&epset, &msg); } - tEncoderClear(&encoder); taosArrayDestroy(hbMsg.pTaskStatus); - - SRpcMsg msg = {0}; - initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); - msg.info.noResp = 1; - - qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); - - tmsgSendReq(&epset, &msg); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr); taosReleaseRef(streamMetaId, rid); } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index edeabc8c0d..250e676d6c 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -579,19 +579,17 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { // todo fix the bug: 2. race condition // an fill history task needs to be started. int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { - int32_t tId = pTask->historyTaskId.taskId; - if (tId == 0) { + SStreamMeta* pMeta = pTask->pMeta; + int32_t hTaskId = pTask->historyTaskId.taskId; + if (hTaskId == 0) { return TSDB_CODE_SUCCESS; } ASSERT(pTask->status.downstreamReady == 1); qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, - pTask->historyTaskId.streamId, tId); + pTask->historyTaskId.streamId, hTaskId); - SStreamMeta* pMeta = pTask->pMeta; - int32_t hTaskId = pTask->historyTaskId.taskId; - - int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId}; + int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId}; // Set the execute conditions, including the query time window and the version range SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); From fecba5b097521063321b2891c44474b6a1d3c066 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Aug 2023 20:54:02 +0800 Subject: [PATCH 03/14] fix(stream): check for meta --- source/dnode/mnode/impl/src/mndStream.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e5b91f2815..48f0d0656d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2303,13 +2303,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); int64_t k[2] = {p->streamId, p->taskId}; - int32_t **index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); + int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); if (index == NULL) { - continue; } - STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, **index); + STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); pStatusEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status)); From 9612704fa6d8f2c8abf66e165f1cadd48db52981 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Aug 2023 21:43:44 +0800 Subject: [PATCH 04/14] fix(stream): add assert --- source/libs/stream/src/streamRecover.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 250e676d6c..50ebb155c6 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -613,7 +613,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { qDebug("s-task:%s set timer active flag", pTask->id.idStr); } } else { // timer exists - ASSERT(pTask->status.timerActive > 0); + ASSERT(pTask->status.timerActive == 1); qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); } From e8294ed8dc75bec4d1a90a082e5f6396076193b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Sep 2023 03:10:12 +0800 Subject: [PATCH 05/14] fix(stream): fix bug in multi-replica vnode redistribute. --- source/dnode/mnode/impl/src/mndStream.c | 27 +++++++++++---------- source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/tq/tq.c | 30 +++++++++++++++++++++++- source/dnode/vnode/src/tq/tqStreamTask.c | 29 +++++++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeSync.c | 1 + 5 files changed, 74 insertions(+), 14 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 48f0d0656d..ffce9e73db 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2009,14 +2009,11 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) { const SEp *pEp = GET_ACTIVE_EP(pPrevEpset); + const SEp* p = GET_ACTIVE_EP(pCurrent); - for (int32_t i = 0; i < pCurrent->numOfEps; ++i) { - const SEp *p = &(pCurrent->eps[i]); - if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) { - return false; - } + if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) { + return false; } - return true; } @@ -2113,6 +2110,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid); int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo); if (code != TSDB_CODE_SUCCESS) { + sdbCancelFetch(pSdb, pIter); return code; } } @@ -2216,18 +2214,21 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { code = mndProcessVgroupChange(pMnode, &changeInfo); + + // keep the new vnode snapshot + if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { + mDebug("create trans successfully, update cached node list"); + taosArrayDestroy(execNodeList.pNodeEntryList); + execNodeList.pNodeEntryList = pNodeSnapshot; + execNodeList.ts = ts; + } + } else { + mDebug("no update found in nodeList"); } taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); - // keep the new vnode snapshot - if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { - taosArrayDestroy(execNodeList.pNodeEntryList); - execNodeList.pNodeEntryList = pNodeSnapshot; - execNodeList.ts = ts; - } - mDebug("end to do stream task node change checking"); atomic_store_32(&mndNodeCheckSentinel, 0); return 0; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 1146cfdc46..14ca7f3b02 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -165,6 +165,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); int32_t tqCheckAndRunStreamTask(STQ* pTq); +int32_t tqStartStreamTasks(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); // tq util diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5b848b51bd..1726452148 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1772,20 +1772,47 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); streamTaskUpdateEpsetInfo(pTask, req.pNodeList); + streamSetStatusNormal(pTask); + + SStreamTask** ppHTask = NULL; + if (pTask->historyTaskId.taskId != 0) { + keys[0] = pTask->historyTaskId.streamId; + keys[1] = pTask->historyTaskId.taskId; + + ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + if (ppHTask == NULL || *ppHTask == NULL) { + tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", + pMeta->vgId, req.taskId); + } else { + tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); + streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); + } + } { - streamSetStatusNormal(pTask); streamMetaSaveTask(pMeta, pTask); + if (ppHTask != NULL) { + streamMetaSaveTask(pMeta, *ppHTask); + } + if (streamMetaCommit(pMeta) < 0) { // persist to disk } } streamTaskStop(pTask); + if (ppHTask != NULL) { + streamTaskStop(*ppHTask); + } + tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr); pMeta->closedTask += 1; + if (ppHTask != NULL) { + pMeta->closedTask += 1; + } + // possibly only handle the stream task. int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); bool allStopped = (pMeta->closedTask == numOfTasks); if (allStopped) { @@ -1824,6 +1851,7 @@ _end: taosWUnLockLatch(&pMeta->lock); if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { vInfo("vgId:%d, restart all stream tasks", vgId); + tqStartStreamTasks(pTq); tqCheckAndRunStreamTaskAsync(pTq); } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 3c0321f300..831ddb9e35 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -224,6 +224,35 @@ int32_t tqStopStreamTasks(STQ* pTq) { return 0; } +int32_t tqStartStreamTasks(STQ* pTq) { + SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t vgId = TD_VID(pTq->pVnode); + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + + tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks); + + if (numOfTasks == 0) { + return TSDB_CODE_SUCCESS; + } + + taosWLockLatch(&pMeta->lock); + + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + + int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key)); + + int8_t status = (*pTask)->status.taskStatus; + if (status == TASK_STATUS__STOP) { + streamSetStatusNormal(*pTask); + } + } + + taosWUnLockLatch(&pMeta->lock); + return 0; +} + int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // seek the stored version and extract data from WAL int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d580b41093..43850ebfee 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -560,6 +560,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId); } else { vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); + tqStartStreamTasks(pVnode->pTq); tqCheckAndRunStreamTaskAsync(pVnode->pTq); } } else { From bce46f1be84ba89195790bd7931066c3b26478a9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Sep 2023 09:50:45 +0800 Subject: [PATCH 06/14] fix(stream): transfer state should wait for the checkpoint generating completed. --- source/libs/stream/src/streamRecover.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 50ebb155c6..6a64a7d2ea 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -917,6 +917,13 @@ void streamTaskHalt(SStreamTask* pTask) { return; } + // wait for checkpoint completed + while(pTask->status.taskStatus == TASK_STATUS__CK) { + qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr, + streamGetTaskStatusStr(TASK_STATUS__CK)); + taosMsleep(1000); + } + // upgrade to halt status if (status == TASK_STATUS__PAUSE) { qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), From 3765fbbba26116a255211e8b815bc28fb88c3fd2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Sep 2023 09:59:56 +0800 Subject: [PATCH 07/14] fix(stream): fix memory leak. --- source/dnode/mnode/impl/src/mndStream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ffce9e73db..f901188570 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2224,6 +2224,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } } else { mDebug("no update found in nodeList"); + taosArrayDestroy(pNodeSnapshot); } taosArrayDestroy(changeInfo.pUpdateNodeList); From 4fee5ba59a88453d7b2fd91218badcf10314d421 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Sep 2023 14:40:30 +0800 Subject: [PATCH 08/14] fix(stream): even in ck, start stream tasks. --- source/dnode/vnode/src/tq/tq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1726452148..d4eb99ac3f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1488,7 +1488,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, } int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) { + if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) { // no lock needs to secure the access of the version if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { // discard all the data when the stream task is suspended. From 037a232bfa909d7be8b2db3f3e86dba7a5024af3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Sep 2023 17:05:36 +0800 Subject: [PATCH 09/14] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 6 +++--- source/dnode/mnode/impl/src/mndStream.c | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 908b250e61..e63bd7e59d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -41,16 +41,16 @@ enum { STREAM_STATUS__PAUSE, }; -enum { +typedef enum ETaskStatus { TASK_STATUS__NORMAL = 0, TASK_STATUS__DROPPING, - TASK_STATUS__FAIL, + TASK_STATUS__UNINIT, // not used, an placeholder TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore -}; +} ETaskStatus; enum { TASK_SCHED_STATUS__INACTIVE = 1, diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f901188570..f2a9eed3ef 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1567,8 +1567,8 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } else if (taskStatus == TASK_STATUS__DROPPING) { memcpy(varDataVal(status), "dropping", 8); varDataSetLen(status, 8); - } else if (taskStatus == TASK_STATUS__FAIL) { - memcpy(varDataVal(status), "fail", 4); + } else if (taskStatus == TASK_STATUS__UNINIT) { + memcpy(varDataVal(status), "uninit", 6); varDataSetLen(status, 4); } else if (taskStatus == TASK_STATUS__STOP) { memcpy(varDataVal(status), "stop", 4); From d8518de5e0067fd65d4ed91df779e662358710d1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Sep 2023 20:20:39 +0800 Subject: [PATCH 10/14] fix(stream):fix error in timerActive. --- source/libs/stream/src/streamDispatch.c | 30 ++++++++++++++++--------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index cf04bcc1b8..ee3026806b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -391,8 +391,8 @@ static void doRetryDispatchData(void* param, void* tmrId) { SStreamTask* pTask = param; if (streamTaskShouldStop(&pTask->status)) { - atomic_sub_fetch_8(&pTask->status.timerActive, 1); - qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); + int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); return; } @@ -409,17 +409,22 @@ static void doRetryDispatchData(void* param, void* tmrId) { streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } } else { - atomic_sub_fetch_8(&pTask->status.timerActive, 1); - qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); + int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); } } else { - atomic_sub_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref); } } void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { - qError("s-task:%s dispatch data in %" PRId64 "ms", pTask->id.idStr, waitDuration); - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); + qWarn("s-task:%s dispatch data in %" PRId64 "ms, in timer", pTask->id.idStr, waitDuration); + if (pTask->launchTaskTimer != NULL) { + taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); + } else { + pTask->launchTaskTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); + } } int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, @@ -540,8 +545,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry - qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", - pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS); + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + + qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", + pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); break; } @@ -997,8 +1004,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time - qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data", - id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d", + id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // pipeline send data in output queue // this message has been sent successfully, let's try next one. From 325a177170d07b0ce2d7243e6a64647fb3bf995d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 4 Sep 2023 17:18:04 +0800 Subject: [PATCH 11/14] fix(stream): disable the fill-history task when restart stream tasks. --- source/libs/stream/src/streamMeta.c | 2 ++ source/libs/stream/src/streamRecover.c | 8 +++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 147e506680..93972c0b3a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -659,6 +659,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { + // pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader + // In this case, we try not to start fill-history task anymore. if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) { doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 6a64a7d2ea..743d87e938 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -235,7 +235,13 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str); streamTaskLaunchScanHistory(pTask); } else { - qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); + if (pTask->info.fillHistory == 1) { + qDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); + pTask->status.taskStatus = TASK_STATUS__DROPPING; + ASSERT(pTask->historyTaskId.taskId == 0); + } else { + qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); + } } // when current stream task is ready, check the related fill history task. From 5869750782486c2ba7d0376bb3ad1b5fb34d1503 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 4 Sep 2023 18:33:33 +0800 Subject: [PATCH 12/14] fix(stream): release stream task with max_delay option. --- source/libs/stream/src/streamMeta.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 93972c0b3a..88b8320867 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -521,6 +521,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ASSERT(pTask->status.timerActive == 0); doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); + if (pTask->info.triggerParam != 0) { + qDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); + taosTmrStop(pTask->schedInfo.pTimer); + pTask->info.triggerParam = 0; + streamMetaReleaseTask(pMeta, pTask); + } + streamMetaRemoveTask(pMeta, keys); streamMetaReleaseTask(pMeta, pTask); } else { From 4f85a2620a1dfde88e44add2a139b5015229d985 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 4 Sep 2023 19:36:33 +0800 Subject: [PATCH 13/14] fix(stream): remove invalid free. --- source/libs/stream/src/streamDispatch.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ee3026806b..916cc6e9ee 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -989,8 +989,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens } - streamFreeQitem(pTask->msgInfo.pData); - pTask->msgInfo.pData = NULL; return TSDB_CODE_SUCCESS; } From 5b3cb8ec421499b7bd43ffa39524e03bbe2e5b9e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 4 Sep 2023 23:13:57 +0800 Subject: [PATCH 14/14] fix(stream): fix memory leak. --- source/dnode/vnode/src/tq/tqStreamTask.c | 3 +++ source/libs/stream/src/streamMeta.c | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 831ddb9e35..eb587b8be2 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -412,6 +412,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pStreamMeta, pTask); + if (pItem != NULL) { + streamFreeQitem(pItem); + } continue; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 88b8320867..4fa39d010f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -521,7 +521,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ASSERT(pTask->status.timerActive == 0); doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); - if (pTask->info.triggerParam != 0) { + if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { qDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); taosTmrStop(pTask->schedInfo.pTimer); pTask->info.triggerParam = 0;