From 2c9d54af375a40748d1bb7335d98a3a97eb5163f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Sep 2023 19:58:46 +0800 Subject: [PATCH 01/12] fix(stream): fix memory leak. --- source/common/src/tglobal.c | 2 +- source/libs/stream/src/streamQueue.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 1578a9ea4c..1928b3950d 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -240,7 +240,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointTickInterval = 300; +int32_t tsStreamCheckpointTickInterval = 10; int32_t tsStreamNodeCheckInterval = 10; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 6aaea2ce24..d3d114d4aa 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -387,7 +387,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost", pTask->id.idStr, total + 1, size, tstrerror(code)); } else { - qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size); + qDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size); } return TSDB_CODE_SUCCESS; From 1ca06e960f3c2ca7e4a43a5a86cfbe88d8033361 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Sep 2023 20:25:24 +0800 Subject: [PATCH 02/12] fix(stream): remove stream in buf. --- source/dnode/mnode/impl/src/mndStream.c | 30 ++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f4110562a6..d2f4657aa8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -83,6 +83,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); +static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode); +static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode); + int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_STREAM, @@ -1280,7 +1283,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); return -1; } - // mndTransSetSerial(pTrans); // drop all tasks if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { @@ -1304,13 +1306,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } + removeStreamTasksInBuf(pStream, &execNodeList); + char detail[100] = {0}; sprintf(detail, "igNotExists:%d", dropReq.igNotExists); SName name = {0}; tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB); - //reuse this function for stream - auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", detail); sdbRelease(pMnode->pSdb, pStream); @@ -2238,7 +2240,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } -static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) { +void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -2261,6 +2263,25 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p } } +void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) { + int32_t level = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < level; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < numOfTasks; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); + if (p != NULL) { + taosArrayRemove(pExecNode->pTaskList, *(int32_t*)p); + taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); + } + } + } +} + // todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; @@ -2277,7 +2298,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } tDecoderClear(&decoder); - // int64_t now = taosGetTimestampSec(); mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); taosThreadMutexLock(&execNodeList.lock); From 377fdfacf6f8f4ab27b24febddfdabd6acc94074 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Sep 2023 23:51:08 +0800 Subject: [PATCH 03/12] fix(stream): add some logs. --- source/libs/stream/src/streamMeta.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 31bf5a482b..def1253502 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -644,10 +644,12 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; - qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId); + int32_t vgId = pMeta->vgId; + + qInfo("vgId:%d load stream tasks from meta files", vgId); if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { - qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno)); + qError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno)); return -1; } @@ -662,6 +664,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + qError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno)); doClear(pKey, pVal, pCur, pRecycleList); return -1; } @@ -672,9 +676,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); qError( - "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " - "manually", - tsDataDir); + "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " + "manually", vgId, tsDataDir); return -1; } tDecoderClear(&decoder); @@ -731,6 +734,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { tdbFree(pKey); tdbFree(pVal); if (tdbTbcClose(pCur) < 0) { + qError("vgId:%d failed to close meta-file cursor", vgId); taosArrayDestroy(pRecycleList); return -1; } From 9a9fffa577afd32d97069abef82104495b78d7d9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 00:52:35 +0800 Subject: [PATCH 04/12] fix(stream): reset task counter. --- source/libs/stream/src/streamMeta.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index def1253502..47146f6c53 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -294,6 +294,8 @@ void streamMetaClear(SStreamMeta* pMeta) { taosArrayClear(pMeta->pTaskList); taosArrayClear(pMeta->chkpSaved); taosArrayClear(pMeta->chkpInUse); + pMeta->numOfStreamTasks = 0; + pMeta->numOfPausedTasks = 0; } void streamMetaClose(SStreamMeta* pMeta) { @@ -747,6 +749,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { } int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + ASSERT(pMeta->numOfStreamTasks <= numOfTasks); qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); From e76bab7122ed7d329ca6ce395a5d58106b6e0f73 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 09:43:14 +0800 Subject: [PATCH 05/12] fix(stream): not handle the check msg for follower tasks. --- include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/tq/tq.c | 14 +++++++++++--- source/dnode/vnode/src/tq/tqStreamTask.c | 8 +------- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamRecover.c | 19 ++++++++++++++++--- 5 files changed, 30 insertions(+), 14 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a19ebd67b0..a55d188978 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -661,6 +661,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); +int32_t streamSetStatusUnint(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f5909eb0fe..f797feee34 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -864,6 +864,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SStreamMeta* pMeta = pTq->pStreamMeta; SStreamTaskCheckReq req; SDecoder decoder; @@ -884,10 +885,17 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { .upstreamTaskId = req.upstreamTaskId, }; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); + // only the leader node handle the check request + if (!pMeta->leader) { + tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check msg", + taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); + return -1; + } + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); + streamMetaReleaseTask(pMeta, pTask); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", @@ -899,7 +907,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } - return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId); + return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); } int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 3cba4567fe..3a5eeae561 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -95,7 +95,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) { } pTask->taskExecInfo.init = taosGetTimestampMs(); - tqDebug("s-task:%s set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init); + tqDebug("s-task:%s start check downstream tasks, set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init); streamSetStatusNormal(pTask); streamTaskCheckDownstream(pTask); @@ -111,12 +111,9 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; -// taosWLockLatch(&pMeta->lock); - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqDebug("vgId:%d no stream tasks existed to run", vgId); -// taosWUnLockLatch(&pMeta->lock); return 0; } @@ -124,7 +121,6 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); -// taosWUnLockLatch(&pMeta->lock); return -1; } @@ -135,8 +131,6 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); -// taosWUnLockLatch(&pMeta->lock); - return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 47146f6c53..93f4b7c4dd 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -749,7 +749,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { } int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - ASSERT(pMeta->numOfStreamTasks <= numOfTasks); + ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks); qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index d28ec85dd5..2689e9ee70 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -205,21 +205,22 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); ASSERT(pInfo != NULL); + const char* id = pTask->id.idStr; if (stage == -1) { - qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", pTask->id.idStr, + qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", id, upstreamTaskId, stage); return 0; } if (pInfo->stage == -1) { pInfo->stage = stage; - qDebug("s-task:%s receive check msg from upstream task:0x%x, init stage value:%" PRId64, pTask->id.idStr, + qDebug("s-task:%s receive check msg from upstream task:0x%x for the time, init stage value:%" PRId64, id, upstreamTaskId, stage); } if (pInfo->stage < stage) { qError("s-task:%s receive msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64, - pTask->id.idStr, upstreamTaskId, vgId, stage, pInfo->stage); + id, upstreamTaskId, vgId, stage, pInfo->stage); } return ((pTask->status.downstreamReady == 1) && (pInfo->stage == stage))? 1:0; @@ -355,6 +356,18 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) { } } +int32_t streamSetStatusUnint(SStreamTask* pTask) { + int32_t status = atomic_load_8(&pTask->status.taskStatus); + if (status == TASK_STATUS__DROPPING) { + qError("s-task:%s cannot be set uninit, since in dropping state", pTask->id.idStr); + return -1; + } else { + qDebug("s-task:%s set task status to be uninit, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__UNINIT); + return 0; + } +} + // source int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow); From b7efedf4aae44605d67a986d93ac0ee40c6f406c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 09:53:33 +0800 Subject: [PATCH 06/12] enh(stream): log the checkpoint time. --- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamCheckpoint.c | 13 +++++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a55d188978..5329da2f17 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -247,6 +247,7 @@ typedef struct SStreamTaskId { } SStreamTaskId; typedef struct SCheckpointInfo { + int64_t startTs; int64_t checkpointId; int64_t checkpointVer; // latest checkpointId version int64_t nextProcessVer; // current offset in WAL, not serialize it diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a48f74ce86..3f8b69785d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -141,6 +141,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo pTask->status.taskStatus = TASK_STATUS__CK; pTask->checkpointingId = pReq->checkpointId; pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); + pTask->chkInfo.startTs = taosGetTimestampMs(); // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into // inputQ, to make sure all blocks with less version have been handled by this task already. @@ -312,15 +313,19 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); ASSERT(remain >= 0); + double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0; + if (remain == 0) { // all tasks are ready qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); - qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, - pTask->checkpointingId); + qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, elapsed time:%.2f Sec checkpointId:%" PRId64, pMeta->vgId, + el, pTask->checkpointingId); } else { - qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId, - pTask->id.idStr, remain, pMeta->numOfStreamTasks); + qDebug( + "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, elapsed time:%.2f Sec not " + "ready:%d/%d", + pMeta->vgId, pTask->id.idStr, el, remain, pMeta->numOfStreamTasks); } // send check point response to upstream task From 5a6e50d523e83811d32154a3cbe7237fe7e0acec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 10:33:58 +0800 Subject: [PATCH 07/12] fix(stream): keep the status entry in hash table, instead entry index. --- source/dnode/mnode/impl/src/mndStream.c | 46 ++++++++++++++++--------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d2f4657aa8..3018326c74 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -125,7 +125,7 @@ int32_t mndInitStream(SMnode *pMnode) { taosThreadMutexInit(&execNodeList.lock, NULL); execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); - execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskStatusEntry)); + execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskId)); return sdbSetTable(pMnode->pSdb, table); } @@ -1183,10 +1183,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { taosThreadMutexLock(&execNodeList.lock); for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { - STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i); - if (p->status != TASK_STATUS__NORMAL) { + STaskId *p = taosArrayGet(execNodeList.pTaskList, i); + STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p)); + if (pEntry == NULL) { + continue; + } + + if (pEntry->status != TASK_STATUS__NORMAL) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", - p->id.streamId, (int32_t)p->id.taskId, 0, streamGetTaskStatusStr(p->status)); + pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status)); ready = false; break; } @@ -1557,13 +1562,12 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock char status[20 + VARSTR_HEADER_SIZE] = {0}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - int32_t *index = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id)); - if (index == NULL) { + STaskStatusEntry* pe = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id)); + if (pe == NULL) { continue; } - STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); - const char* pStatus = streamGetTaskStatusStr(pStatusEntry->status); + const char* pStatus = streamGetTaskStatusStr(pe->status); STR_TO_VARSTR(status, pStatus); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -2254,10 +2258,8 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNod if (p == NULL) { STaskStatusEntry entry = { .id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; - taosArrayPush(pExecNode->pTaskList, &entry); - - int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1; - taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &ordinal, sizeof(ordinal)); + taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); + taosArrayPush(pExecNode->pTaskList, &id); } } } @@ -2275,11 +2277,21 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecN STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p != NULL) { - taosArrayRemove(pExecNode->pTaskList, *(int32_t*)p); taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); + + for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { + STaskId* pId = taosArrayGet(pExecNode->pTaskList, k); + if (pId->taskId == id.taskId && pId->streamId == id.streamId) { + taosArrayRemove(pExecNode->pTaskList, k); + break; + } + } + } } } + + ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } // todo: this process should be executed by the write queue worker of the mnode @@ -2308,13 +2320,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - int32_t *index = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id)); - if (index == NULL) { + STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id)); + if (pEntry == NULL) { + mError("s-task:0x%"PRIx64" not found in mnode task list", p->id.taskId); continue; } - STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); - pStatusEntry->status = p->status; + pEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status)); } From 38b3a7c1bd451c52cc20bf70bf6f004ea35edbfc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 11:05:49 +0800 Subject: [PATCH 08/12] fix(stream): update logs. --- source/libs/stream/src/streamExec.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 969b547d71..74b24fb4c3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -525,6 +525,9 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock int32_t streamExecForAll(SStreamTask* pTask) { const char* id = pTask->id.idStr; + // merge multiple input data if possible in the input queue. + qDebug("s-task:%s start to extract data block from inputQ", id); + while (1) { int32_t numOfBlocks = 0; SStreamQueueItem* pInput = NULL; @@ -533,9 +536,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { break; } - // merge multiple input data if possible in the input queue. - qDebug("s-task:%s start to extract data block from inputQ", id); - /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks); if (pInput == NULL) { ASSERT(numOfBlocks == 0); From 309630eb11d38b367b8c7b7644a545ca4e203cca Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 14:22:05 +0800 Subject: [PATCH 09/12] fix(stream): add timestamp. --- source/libs/stream/src/streamCheckpoint.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 3f8b69785d..f367ba932f 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -202,6 +202,8 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); + pTask->chkInfo.startTs = taosGetTimestampMs(); + // update the child Id for downstream tasks streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); From 1decaaee1e8bd3ff0ff584f68844b3c1b1ef3d81 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 16:49:04 +0800 Subject: [PATCH 10/12] fix(stream): set correct size for results generated by scan history stream tasks. --- source/libs/stream/src/streamData.c | 4 +++ source/libs/stream/src/streamExec.c | 38 +++++++++++++---------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 00bf631d74..a108667f5d 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -65,6 +65,10 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; pStreamBlocks->blocks = pRes; + if (pItem == NULL) { + return pStreamBlocks; + } + if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; pStreamBlocks->sourceVer = pSubmit->ver; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 74b24fb4c3..d89817d236 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -16,9 +16,10 @@ #include "streamInt.h" // maximum allowed processed block batches. One block may include several submit blocks -#define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MIN_STREAM_EXEC_BATCH_NUM 4 -#define STREAM_RESULT_DUMP_THRESHOLD 100 +#define MAX_STREAM_EXEC_BATCH_NUM 32 +#define MIN_STREAM_EXEC_BATCH_NUM 4 +#define STREAM_RESULT_DUMP_THRESHOLD 100 +#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); @@ -75,7 +76,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks); if (code != TSDB_CODE_SUCCESS) { // back pressure and record position - //code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY destroyStreamDataBlock(pStreamBlocks); return code; } @@ -166,7 +166,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size)); // current output should be dispatched to down stream nodes - if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD) { + if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { ASSERT(numOfBlocks == taosArrayGetSize(pRes)); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); if (code != TSDB_CODE_SUCCESS) { @@ -192,6 +192,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i int32_t streamScanHistoryData(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + int32_t size = 0; int32_t code = TSDB_CODE_SUCCESS; void* exec = pTask->exec.pExecutor; bool finished = false; @@ -244,29 +245,24 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { block.info.childId = pTask->info.selfChildId; taosArrayPush(pRes, &block); - if ((++numOfBlocks) >= outputBatchSize) { - qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, outputBatchSize); + size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); + + if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { + qDebug("s-task:%s scan exec numOfBlocks:%d, output num-limit:%d, size-limit:%d reached", pTask->id.idStr, numOfBlocks, + outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD); break; } } if (taosArrayGetSize(pRes) > 0) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - if (qRes == NULL) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - - code = doOutputResultBlockImpl(pTask, qRes); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); + SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); + code = doOutputResultBlockImpl(pTask, pStreamBlocks); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamDataBlock(pStreamBlocks); return code; } + + size = 0; } else { taosArrayDestroy(pRes); } From 271e492188008a682a4c1cf546a99558ad14c4c2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 17:40:26 +0800 Subject: [PATCH 11/12] refactor: update the log level. --- source/dnode/vnode/src/tq/tqSink.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index e0bae18545..da7ac20600 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -537,7 +537,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock if (k == 0) { SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); void* colData = colDataGetData(pColData, j); - tqDebug("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData); + tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData); } if (IS_SET_NULL(pCol)) { From b99232fd7a9f904c4012f8ee7792ca2c4828fd9d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 17:45:28 +0800 Subject: [PATCH 12/12] fix(stream): limit the max scan times. --- source/dnode/vnode/src/tq/tqStreamTask.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 3a5eeae561..854478f41e 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -16,6 +16,8 @@ #include "tq.h" #include "vnd.h" +#define MAX_REPEAT_SCAN_THRESHOLD 3 + static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); @@ -153,6 +155,9 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { } pMeta->walScanCounter += 1; + if (pMeta->walScanCounter > MAX_REPEAT_SCAN_THRESHOLD) { + pMeta->walScanCounter = MAX_REPEAT_SCAN_THRESHOLD; + } if (pMeta->walScanCounter > 1) { tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);