From 1bf581db49cea7cfb1c67dfcb577ecdd81936e63 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 2 Sep 2024 16:28:43 +0800 Subject: [PATCH 01/11] fix: add block decode check --- source/common/src/tdatablock.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 8e50c943b9..718763e063 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3036,6 +3036,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos // total rows sizeof(int32_t) int32_t numOfRows = *(int32_t*)pStart; pStart += sizeof(int32_t); + if (numOfRows <= 0) { + uError("block decode numOfRows:%d error", numOfRows); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; + } // total columns sizeof(int32_t) int32_t numOfCols = *(int32_t*)pStart; @@ -3084,8 +3089,9 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos for (int32_t i = 0; i < numOfCols; ++i) { colLen[i] = htonl(colLen[i]); - if (colLen[i] < 0) { + if (colLen[i] <= 0) { uError("block decode colLen:%d error, colIdx:%d", colLen[i], i); + ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return terrno; } From 4e6781d7531ba5ac9648a2cb9e3f66aff0c2ba04 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 2 Sep 2024 16:50:30 +0800 Subject: [PATCH 02/11] fix: add encode debug info --- source/common/src/tdatablock.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 718763e063..8cf7094fc3 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3009,6 +3009,13 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; } + if (colSizes[col] <= 0) { + uError("Invalid colSize:%d while encoding block", colSizes[col]); + ASSERT(0); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return -1; + } + colSizes[col] = htonl(colSizes[col]); // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, // htonl(colSizes[col]), colSizes[col]); From 24eb6c439bc33a8d2fe556cc60f0c050d809031b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 2 Sep 2024 17:18:40 +0800 Subject: [PATCH 03/11] fix: add validation --- source/common/src/tdatablock.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 8cf7094fc3..d970b7ab88 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3009,7 +3009,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; } - if (colSizes[col] <= 0) { + if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0)) { uError("Invalid colSize:%d while encoding block", colSizes[col]); ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; From ebcb1e0c3617cdd5a40485add68daa69c66ce809 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 2 Sep 2024 17:27:43 +0800 Subject: [PATCH 04/11] fix: correct validation --- source/common/src/tdatablock.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index d970b7ab88..0bfff81d46 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3096,7 +3096,7 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos for (int32_t i = 0; i < numOfCols; ++i) { colLen[i] = htonl(colLen[i]); - if (colLen[i] <= 0) { + if (colLen[i] < 0) { uError("block decode colLen:%d error, colIdx:%d", colLen[i], i); ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; @@ -3130,6 +3130,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos if (colLen[i] > 0) { memcpy(pColInfoData->pData, pStart, colLen[i]); + } else if (!colDataIsNull_s(pColInfoData, 0)) { + uError("block decode colLen:%d error, colIdx:%d", colLen[i], i); + ASSERT(0); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; } // TODO From cd04e9e39dbd2a6aa84e2232e813c1b398504b70 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 2 Sep 2024 18:52:11 +0800 Subject: [PATCH 05/11] fix: downstream free issue --- source/libs/executor/src/mergejoinoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 14f3a08e17..30cc596a44 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1919,10 +1919,10 @@ _return: if (pInfo != NULL) { destroyMergeJoinOperator(pInfo); } + destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum); if (newDownstreams) { taosMemoryFree(pDownstream); } - destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum); pTaskInfo->code = code; return code; From b939a9daab7b2cd05e12a29ccd0c4560116094ce Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 3 Sep 2024 08:35:24 +0800 Subject: [PATCH 06/11] fix: return code issue --- source/libs/executor/src/scanoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7c016d8c2a..45b4fd544a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4541,6 +4541,7 @@ static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRe SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); QUERY_CHECK_NULL(pDst, code, lino, _end, terrno); code = tagScanFillOneCellWithTag(pOperator, pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode); + QUERY_CHECK_CODE(code, lino, _end); } } } From e9c7c33a1a8eb78355f4b012339b467b31117bbe Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 3 Sep 2024 09:55:45 +0800 Subject: [PATCH 07/11] fix: remove debug assert --- source/common/src/tdatablock.c | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0bfff81d46..5dbe01d8ba 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3009,9 +3009,8 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; } - if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0)) { - uError("Invalid colSize:%d while encoding block", colSizes[col]); - ASSERT(0); + if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) { + uError("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return -1; } @@ -3098,7 +3097,6 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos colLen[i] = htonl(colLen[i]); if (colLen[i] < 0) { uError("block decode colLen:%d error, colIdx:%d", colLen[i], i); - ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return terrno; } @@ -3130,9 +3128,8 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos if (colLen[i] > 0) { memcpy(pColInfoData->pData, pStart, colLen[i]); - } else if (!colDataIsNull_s(pColInfoData, 0)) { - uError("block decode colLen:%d error, colIdx:%d", colLen[i], i); - ASSERT(0); + } else if (!colDataIsNull_s(pColInfoData, 0) && pColInfoData->info.type != TSDB_DATA_TYPE_NULL) { + uError("block decode colLen:%d error, colIdx:%d, type:%d", colLen[i], i, pColInfoData->info.type); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return terrno; } From d6955fd2bd6179aafb6549dfbc10a8ad63a25585 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 3 Sep 2024 17:15:32 +0800 Subject: [PATCH 08/11] fix: column decode has null issue --- source/common/src/tdatablock.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5dbe01d8ba..816bf3a757 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3126,6 +3126,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos pStart += BitmapLen(numOfRows); } + // TODO + // setting this flag to true temporarily so aggregate function on stable will + // examine NULL value for non-primary key column + pColInfoData->hasNull = true; + if (colLen[i] > 0) { memcpy(pColInfoData->pData, pStart, colLen[i]); } else if (!colDataIsNull_s(pColInfoData, 0) && pColInfoData->info.type != TSDB_DATA_TYPE_NULL) { @@ -3134,10 +3139,6 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos return terrno; } - // TODO - // setting this flag to true temporarily so aggregate function on stable will - // examine NULL value for non-primary key column - pColInfoData->hasNull = true; pStart += colLen[i]; } From 6fdc3d63f7a062d68e97f9ac81e7fcf4ffe6a88f Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 5 Sep 2024 17:11:38 +0800 Subject: [PATCH 09/11] fix: add debug assert --- source/common/src/tdatablock.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 816bf3a757..34beb38879 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3011,6 +3011,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) { uError("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type); + ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return -1; } From ff3db4cd9e537f3d1ab38b6f1e81e456f34f1a14 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 5 Sep 2024 17:32:27 +0800 Subject: [PATCH 10/11] fix(stream):set null for invalid column --- source/libs/executor/src/streamtimewindowoperator.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 8d5aa7104f..bedbdfa299 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -718,6 +718,11 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pCalStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + SColumnInfoData* pTbName = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + SColumnInfoData* pPrimaryKey = NULL; + if (taosArrayGetSize(pBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) { + pPrimaryKey = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); + } for (; (*pIndex) < size; (*pIndex)++) { SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex)); code = colDataSetVal(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false); @@ -735,6 +740,11 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB code = colDataSetVal(pCalEndTs, pBlock->info.rows, (const char*)&pWin->calWin.ekey, false); QUERY_CHECK_CODE(code, lino, _end); + colDataSetNULL(pTbName, pBlock->info.rows); + if (pPrimaryKey != NULL) { + colDataSetNULL(pPrimaryKey, pBlock->info.rows); + } + pBlock->info.rows++; } if ((*pIndex) == size) { From 54a2c139376c871c796592a8fe62f3988671499a Mon Sep 17 00:00:00 2001 From: Pan Wei <72057773+dapan1121@users.noreply.github.com> Date: Thu, 5 Sep 2024 18:42:57 +0800 Subject: [PATCH 11/11] Update tdatablock.c --- source/common/src/tdatablock.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 34beb38879..816bf3a757 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3011,7 +3011,6 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) { uError("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type); - ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return -1; }