From 674acd0e9f6664f147b406f948998638a447a125 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 5 Jul 2024 18:53:35 +0800 Subject: [PATCH 1/2] refactor(stream): delay checkpointInterval to generate the checkpoint after stream started. --- source/dnode/mnode/impl/src/mndStream.c | 31 +++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a137c10ed5..20cd415a6f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1134,6 +1134,7 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); ready = false; + break; } if (pEntry->hTaskId != 0) { @@ -1153,6 +1154,27 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { return ready ? 0 : -1; } +int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) { + int64_t ts = -1; + int32_t taskId = -1; + + for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) { + STaskId *p = taosArrayGet(pTaskList, i); + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); + if (pEntry == NULL || pEntry->id.streamId != streamId) { + continue; + } + + if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) { + ts = pEntry->startTime; + taskId = pEntry->id.taskId; + } + } + + mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId); + return ts; +} + typedef struct { int64_t streamId; int64_t duration; @@ -1191,6 +1213,15 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { continue; } + taosThreadMutexLock(&execInfo.lock); + int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid); + if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) { + taosThreadMutexUnlock(&execInfo.lock); + sdbRelease(pSdb, pStream); + continue; + } + taosThreadMutexUnlock(&execInfo.lock); + SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration}; taosArrayPush(pList, &in); From 9f4f4f7f9fc19865bc5cfab307ebd5dd01eac58b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 9 Jul 2024 00:01:54 +0800 Subject: [PATCH 2/2] fix(stream): set the null column when extracting data from submit data. --- source/libs/executor/src/scanoperator.c | 42 +++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 490f6b86fa..ce4915ca4d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2187,6 +2187,17 @@ static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, co taosMemoryFree(p); } +static int32_t colIdComparFn(const void* param1, const void * param2) { + int32_t p1 = *(int32_t*) param1; + int32_t p2 = *(int32_t*) param2; + + if (p1 == p2) { + return 0; + } else { + return (p1 < p2)? -1:1; + } +} + static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SOperatorInfo* pOperator = pInfo->pStreamScanOp; @@ -2203,6 +2214,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); + SArray* pColList = taosArrayInit(4, sizeof(int32_t)); + // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i); @@ -2217,6 +2230,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId); colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info); colExists = true; + taosArrayPush(pColList, &pColMatchInfo->dstSlotId); break; } } @@ -2225,6 +2239,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock if (!colExists) { SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId); colDataSetNNULL(pDst, 0, pBlockInfo->rows); + taosArrayPush(pColList, &pColMatchInfo->dstSlotId); } } @@ -2240,8 +2255,35 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // reset the error code. terrno = 0; + + for(int32_t i = 0; i < pInfo->numOfPseudoExpr; ++i) { + taosArrayPush(pColList, &pInfo->pPseudoExpr[i].base.resSchema.slotId); + } } + taosArraySort(pColList, colIdComparFn); + + int32_t i = 0, j = 0; + while(i < taosArrayGetSize(pColList)) { + int32_t slot1 = *(int32_t*)taosArrayGet(pColList, i); + if (slot1 > j) { + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, j); + colDataSetNNULL(pDst, 0, pBlockInfo->rows); + j += 1; + } else { + i += 1; + j += 1; + } + } + + while(j < taosArrayGetSize(pInfo->pRes->pDataBlock)) { + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, j); + colDataSetNNULL(pDst, 0, pBlockInfo->rows); + j += 1; + } + + taosArrayDestroy(pColList); + if (filter) { doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); }