From 73be7ddbc68ef247ff293af7506113d027fe402c Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Wed, 30 Oct 2024 19:38:08 +0800 Subject: [PATCH] fix: set tablefield of stream block to NULL --- source/dnode/vnode/src/tq/tqUtil.c | 3 +++ source/libs/executor/src/scanoperator.c | 13 ++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index b4866b8c65..1edc3d42ab 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -626,6 +626,9 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void* tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); TSDB_CHECK_NULL(tmp, code, line, END, terrno) colDataSetNULL(tmp, i); + tmp = taosArrayGet(pDelBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + TSDB_CHECK_NULL(tmp, code, line, END, terrno) + colDataSetNULL(tmp, i); } if (type == 0) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f936e95005..918be0465f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2115,6 +2115,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + SColumnInfoData* pDestTableNameInxCol = taosArrayGet(pDestBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { uint64_t groupId = pSrcGp[i]; if (groupId == 0) { @@ -2152,7 +2153,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr QUERY_CHECK_CODE(code, lino, _end); colDataSetNULL(pDestCalStartTsCol, i); - colDataSetNULL(pDestCalEndTsCol, i); + colDataSetNULL(pDestTableNameInxCol, i); pDestBlock->info.rows++; } @@ -2205,6 +2206,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + SColumnInfoData* pDestTableNameInxCol = taosArrayGet(pDestBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { uint64_t groupId = pSrcGp[i]; if (groupId == 0) { @@ -2233,6 +2235,8 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB code = colDataSetVal(pDestCalEndTsCol, i, (const char*)&range.win.ekey, false); QUERY_CHECK_CODE(code, lino, _end); + colDataSetNULL(pDestTableNameInxCol, i); + pDestBlock->info.rows++; } @@ -2287,6 +2291,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + SColumnInfoData* pDestTableNameInxCol = taosArrayGet(pDestBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); for (int32_t i = 0; i < pSrcBlock->info.rows;) { uint64_t srcUid = srcUidData[i]; uint64_t groupId = srcGp[i]; @@ -2319,6 +2324,8 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS code = colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false); QUERY_CHECK_CODE(code, lino, _end); + colDataSetNULL(pDestTableNameInxCol, pDestBlock->info.rows); + pDestBlock->info.rows++; } @@ -3074,6 +3081,7 @@ static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, S colDataSetNULL(taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX), j); colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), j); colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), j); + colDataSetNULL(taosArrayGet(pDst->pDataBlock, TABLE_NAME_COLUMN_INDEX), j); j++; } } @@ -3670,6 +3678,9 @@ FETCH_NEXT_BLOCK: // printDataBlock(pInfo->pCheckpointRes, "stream scan ck", GET_TASKID(pTaskInfo)); (*ppRes) = pInfo->pCheckpointRes; return code; + } else { + qError("stream scan error, invalid block type %d, %s", pInfo->blockType, id); + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } _end: