From 9f4f4f7f9fc19865bc5cfab307ebd5dd01eac58b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 9 Jul 2024 00:01:54 +0800 Subject: [PATCH] fix(stream): set the null column when extracting data from submit data. --- source/libs/executor/src/scanoperator.c | 42 +++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 490f6b86fa..ce4915ca4d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2187,6 +2187,17 @@ static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, co taosMemoryFree(p); } +static int32_t colIdComparFn(const void* param1, const void * param2) { + int32_t p1 = *(int32_t*) param1; + int32_t p2 = *(int32_t*) param2; + + if (p1 == p2) { + return 0; + } else { + return (p1 < p2)? -1:1; + } +} + static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SOperatorInfo* pOperator = pInfo->pStreamScanOp; @@ -2203,6 +2214,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); + SArray* pColList = taosArrayInit(4, sizeof(int32_t)); + // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i); @@ -2217,6 +2230,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId); colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info); colExists = true; + taosArrayPush(pColList, &pColMatchInfo->dstSlotId); break; } } @@ -2225,6 +2239,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock if (!colExists) { SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId); colDataSetNNULL(pDst, 0, pBlockInfo->rows); + taosArrayPush(pColList, &pColMatchInfo->dstSlotId); } } @@ -2240,8 +2255,35 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // reset the error code. terrno = 0; + + for(int32_t i = 0; i < pInfo->numOfPseudoExpr; ++i) { + taosArrayPush(pColList, &pInfo->pPseudoExpr[i].base.resSchema.slotId); + } } + taosArraySort(pColList, colIdComparFn); + + int32_t i = 0, j = 0; + while(i < taosArrayGetSize(pColList)) { + int32_t slot1 = *(int32_t*)taosArrayGet(pColList, i); + if (slot1 > j) { + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, j); + colDataSetNNULL(pDst, 0, pBlockInfo->rows); + j += 1; + } else { + i += 1; + j += 1; + } + } + + while(j < taosArrayGetSize(pInfo->pRes->pDataBlock)) { + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, j); + colDataSetNNULL(pDst, 0, pBlockInfo->rows); + j += 1; + } + + taosArrayDestroy(pColList); + if (filter) { doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); }