From 1bf581db49cea7cfb1c67dfcb577ecdd81936e63 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 2 Sep 2024 16:28:43 +0800 Subject: [PATCH 01/25] fix: add block decode check --- source/common/src/tdatablock.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 8e50c943b9..718763e063 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3036,6 +3036,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos // total rows sizeof(int32_t) int32_t numOfRows = *(int32_t*)pStart; pStart += sizeof(int32_t); + if (numOfRows <= 0) { + uError("block decode numOfRows:%d error", numOfRows); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; + } // total columns sizeof(int32_t) int32_t numOfCols = *(int32_t*)pStart; @@ -3084,8 +3089,9 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos for (int32_t i = 0; i < numOfCols; ++i) { colLen[i] = htonl(colLen[i]); - if (colLen[i] < 0) { + if (colLen[i] <= 0) { uError("block decode colLen:%d error, colIdx:%d", colLen[i], i); + ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return terrno; } From 4e6781d7531ba5ac9648a2cb9e3f66aff0c2ba04 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 2 Sep 2024 16:50:30 +0800 Subject: [PATCH 02/25] fix: add encode debug info --- source/common/src/tdatablock.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 718763e063..8cf7094fc3 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3009,6 +3009,13 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; } + if (colSizes[col] <= 0) { + uError("Invalid colSize:%d while encoding block", colSizes[col]); + ASSERT(0); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return -1; + } + colSizes[col] = htonl(colSizes[col]); // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, // htonl(colSizes[col]), colSizes[col]); From 24eb6c439bc33a8d2fe556cc60f0c050d809031b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 2 Sep 2024 17:18:40 +0800 Subject: [PATCH 03/25] fix: add validation --- source/common/src/tdatablock.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 8cf7094fc3..d970b7ab88 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3009,7 +3009,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; } - if (colSizes[col] <= 0) { + if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0)) { uError("Invalid colSize:%d while encoding block", colSizes[col]); ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; From ebcb1e0c3617cdd5a40485add68daa69c66ce809 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 2 Sep 2024 17:27:43 +0800 Subject: [PATCH 04/25] fix: correct validation --- source/common/src/tdatablock.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index d970b7ab88..0bfff81d46 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3096,7 +3096,7 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos for (int32_t i = 0; i < numOfCols; ++i) { colLen[i] = htonl(colLen[i]); - if (colLen[i] <= 0) { + if (colLen[i] < 0) { uError("block decode colLen:%d error, colIdx:%d", colLen[i], i); ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; @@ -3130,6 +3130,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos if (colLen[i] > 0) { memcpy(pColInfoData->pData, pStart, colLen[i]); + } else if (!colDataIsNull_s(pColInfoData, 0)) { + uError("block decode colLen:%d error, colIdx:%d", colLen[i], i); + ASSERT(0); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; } // TODO From cd04e9e39dbd2a6aa84e2232e813c1b398504b70 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 2 Sep 2024 18:52:11 +0800 Subject: [PATCH 05/25] fix: downstream free issue --- source/libs/executor/src/mergejoinoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 14f3a08e17..30cc596a44 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1919,10 +1919,10 @@ _return: if (pInfo != NULL) { destroyMergeJoinOperator(pInfo); } + destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum); if (newDownstreams) { taosMemoryFree(pDownstream); } - destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum); pTaskInfo->code = code; return code; From b939a9daab7b2cd05e12a29ccd0c4560116094ce Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 3 Sep 2024 08:35:24 +0800 Subject: [PATCH 06/25] fix: return code issue --- source/libs/executor/src/scanoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7c016d8c2a..45b4fd544a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4541,6 +4541,7 @@ static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRe SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); QUERY_CHECK_NULL(pDst, code, lino, _end, terrno); code = tagScanFillOneCellWithTag(pOperator, pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode); + QUERY_CHECK_CODE(code, lino, _end); } } } From a72c0c2bd2c4a8eeb02598391f04e8eee25ff2bb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 2 Sep 2024 16:49:47 +0800 Subject: [PATCH 07/25] fix(tsdb): init merge if get the initial pschema failed. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 101 ++++++++++-------------- 1 file changed, 40 insertions(+), 61 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index b7bfd045d1..cecc52c51c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1734,14 +1734,35 @@ static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBloc return isCleanFileBlock; } +static int32_t initRowMergeIfNeeded(STsdbReader* pReader, int64_t uid) { + SRowMerger* pMerger = &pReader->status.merger; + int32_t code = 0; + + if (pMerger->pArray == NULL) { + STSchema* ps = getTableSchemaImpl(pReader, uid); + if (ps == NULL) { + return terrno; + } + + code = tsdbRowMergerInit(pMerger, ps); + } + + return code; +} + static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) { if (!(pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal)) { return TSDB_CODE_SUCCESS; } + int32_t code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid); + if (code != 0) { + return code; + } + int64_t st = taosGetTimestampUs(); SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; - int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); + code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); double el = (taosGetTimestampUs() - st) / 1000.0; updateComposedBlockInfo(pReader, el, pBlockScanInfo); @@ -1943,19 +1964,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized - if (pMerger->pArray == NULL) { - if (pReader->info.pSchema != NULL) { - tsdbError("tsdb failed at %s:%d", __func__, __LINE__); - return TSDB_CODE_INTERNAL_ERROR; - } - STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); - if (ps == NULL) { - return terrno; - } - int32_t code = tsdbRowMergerInit(pMerger, ps); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + int32_t code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid); + if (code != 0) { + return code; } SRowKey minKey = k; @@ -1983,7 +1994,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* // file block ---> stt block -----> mem if (pkCompEx(&minKey, pfKey) == 0) { - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); + code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1996,7 +2007,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (pkCompEx(&minKey, pSttKey) == 0) { TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); + code = tsdbRowMergerAdd(pMerger, fRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2007,7 +2018,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } if (pkCompEx(&minKey, &k) == 0) { - int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); + code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2018,7 +2029,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } - int32_t code = tsdbRowMergerGetRow(pMerger, &pTSRow); + code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2039,19 +2050,9 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized - if (pMerger->pArray == NULL) { - if (pReader->info.pSchema) { - tsdbError("tsdb failed at %s %d", __func__, __LINE__); - return TSDB_CODE_INTERNAL_ERROR; - } - STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); - if (ps == NULL) { - return terrno; - } - code = tsdbRowMergerInit(pMerger, ps); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid); + if (code != 0) { + return code; } bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo); @@ -2175,20 +2176,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized - if (pMerger->pArray == NULL) { - if (pReader->info.pSchema != NULL) { - tsdbError("tsdb read failed at: %s:%d", __func__, __LINE__); - return TSDB_CODE_INTERNAL_ERROR; - } - STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); - if (ps == NULL) { - return terrno; - } - - code = tsdbRowMergerInit(pMerger, ps); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid); + if (code != 0) { + return code; } SRowKey minKey = k; @@ -2579,20 +2569,9 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized - if (pMerger->pArray == NULL) { - if (pReader->info.pSchema != NULL) { - tsdbError("tsdb reader failed at: %s:%d", __func__, __LINE__); - return TSDB_CODE_INTERNAL_ERROR; - } - STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); - if (ps == NULL) { - return terrno; - } - - code = tsdbRowMergerInit(pMerger, ps); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = initRowMergeIfNeeded(pReader, pBlockScanInfo->uid); + if (code != 0) { + return code; } tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey); @@ -4770,13 +4749,13 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi if (pCond->suid != 0) { pReader->info.pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, -1, 1); if (pReader->info.pSchema == NULL) { - tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->info.suid, pReader->idStr); + tsdbWarn("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->info.suid, pReader->idStr); } } else if (numOfTables > 0) { STableKeyInfo* pKey = pTableList; pReader->info.pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1); if (pReader->info.pSchema == NULL) { - tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr); + tsdbWarn("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr); } } From e9c7c33a1a8eb78355f4b012339b467b31117bbe Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 3 Sep 2024 09:55:45 +0800 Subject: [PATCH 08/25] fix: remove debug assert --- source/common/src/tdatablock.c | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0bfff81d46..5dbe01d8ba 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3009,9 +3009,8 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; } - if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0)) { - uError("Invalid colSize:%d while encoding block", colSizes[col]); - ASSERT(0); + if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) { + uError("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return -1; } @@ -3098,7 +3097,6 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos colLen[i] = htonl(colLen[i]); if (colLen[i] < 0) { uError("block decode colLen:%d error, colIdx:%d", colLen[i], i); - ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return terrno; } @@ -3130,9 +3128,8 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos if (colLen[i] > 0) { memcpy(pColInfoData->pData, pStart, colLen[i]); - } else if (!colDataIsNull_s(pColInfoData, 0)) { - uError("block decode colLen:%d error, colIdx:%d", colLen[i], i); - ASSERT(0); + } else if (!colDataIsNull_s(pColInfoData, 0) && pColInfoData->info.type != TSDB_DATA_TYPE_NULL) { + uError("block decode colLen:%d error, colIdx:%d, type:%d", colLen[i], i, pColInfoData->info.type); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return terrno; } From e2b2085052f94555de6d785f95f172b65ccddefd Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 3 Sep 2024 13:44:35 +0800 Subject: [PATCH 09/25] fix(query):check scan operator error code --- source/libs/executor/src/scanoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7c016d8c2a..675c3aa067 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4530,6 +4530,7 @@ static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRe SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); QUERY_CHECK_NULL(pDst, code, lino, _end, terrno); code = tagScanFillOneCellWithTag(pOperator, pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode); + QUERY_CHECK_CODE(code, lino, _end); } } } else { From d6955fd2bd6179aafb6549dfbc10a8ad63a25585 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 3 Sep 2024 17:15:32 +0800 Subject: [PATCH 10/25] fix: column decode has null issue --- source/common/src/tdatablock.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5dbe01d8ba..816bf3a757 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3126,6 +3126,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos pStart += BitmapLen(numOfRows); } + // TODO + // setting this flag to true temporarily so aggregate function on stable will + // examine NULL value for non-primary key column + pColInfoData->hasNull = true; + if (colLen[i] > 0) { memcpy(pColInfoData->pData, pStart, colLen[i]); } else if (!colDataIsNull_s(pColInfoData, 0) && pColInfoData->info.type != TSDB_DATA_TYPE_NULL) { @@ -3134,10 +3139,6 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos return terrno; } - // TODO - // setting this flag to true temporarily so aggregate function on stable will - // examine NULL value for non-primary key column - pColInfoData->hasNull = true; pStart += colLen[i]; } From 87130db0e47d6a924c207eae7be0cf4e85512e7c Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Wed, 4 Sep 2024 17:42:57 +0800 Subject: [PATCH 11/25] fix:[TD-31889] extend result buf size for percentile function to handle large double value. --- source/libs/function/src/builtins.c | 2 +- source/libs/function/src/builtinsimpl.c | 3 ++- tests/system-test/2-query/percentile.py | 6 ++++++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 17ba430150..f7a6585800 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -678,7 +678,7 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t // set result type if (numOfParams > 2) { - pFunc->node.resType = (SDataType){.bytes = 512, .type = TSDB_DATA_TYPE_VARCHAR}; + pFunc->node.resType = (SDataType){.bytes = 3200, .type = TSDB_DATA_TYPE_VARCHAR}; } else { pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 84ab103456..2f75774149 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2105,7 +2105,8 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { tMemBucket* pMemBucket = ppInfo->pMemBucket; if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null if (pCtx->numOfParams > 2) { - char buf[512] = {0}; + char buf[3200] = {0}; + // max length of double num is 317, e.g. use %.6lf to print -1.0e+308, consider the comma and bracket, 3200 is enough. size_t len = 1; varDataVal(buf)[0] = '['; diff --git a/tests/system-test/2-query/percentile.py b/tests/system-test/2-query/percentile.py index e01aae97c0..46fff0201e 100644 --- a/tests/system-test/2-query/percentile.py +++ b/tests/system-test/2-query/percentile.py @@ -124,6 +124,9 @@ class TDTestCase: tdSql.query(f'select percentile(col1, 9.9, 19.9, 29.9, 39.9, 49.9, 59.9, 69.9, 79.9, 89.9, 99.9) from {self.ntbname}') tdSql.checkData(0, 0, '[0.891000, 1.791000, 2.691000, 3.591000, 4.491000, 5.391000, 6.291000, 7.191000, 8.091000, 8.991000]') + tdSql.query(f'select percentile(col1 * 1e+200, 9.9, 19.9, 29.9, 39.9, 49.9, 59.9, 69.9, 79.9, 89.9, 99.9) from {self.ntbname}') + tdSql.checkRows(1); + tdSql.error(f'select percentile(col1) from {self.ntbname}') tdSql.error(f'select percentile(col1, -1) from {self.ntbname}') tdSql.error(f'select percentile(col1, 101) from {self.ntbname}') @@ -166,6 +169,9 @@ class TDTestCase: tdSql.query(f'select percentile(col1, 9.9, 19.9, 29.9, 39.9, 49.9, 59.9, 69.9, 79.9, 89.9, 99.9) from {self.stbname}_0') tdSql.checkData(0, 0, '[0.891000, 1.791000, 2.691000, 3.591000, 4.491000, 5.391000, 6.291000, 7.191000, 8.091000, 8.991000]') + tdSql.query(f'select percentile(col1 * 1e+200, 9.9, 19.9, 29.9, 39.9, 49.9, 59.9, 69.9, 79.9, 89.9, 99.9) from {self.stbname}_0') + tdSql.checkRows(1); + tdSql.error(f'select percentile(col1) from {self.stbname}_0') tdSql.error(f'select percentile(col1, -1) from {self.stbname}_0') tdSql.error(f'select percentile(col1, 101) from {self.stbname}_0') From d3e4203dcb5027e44c994301203288f4796e0935 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 4 Sep 2024 18:37:41 +0800 Subject: [PATCH 12/25] enh(query):remove void for operator --- source/libs/executor/src/exchangeoperator.c | 25 ++++++++++++++++--- source/libs/executor/src/executor.c | 10 ++++++-- source/libs/executor/src/scanoperator.c | 8 ++++-- source/libs/executor/src/sysscanoperator.c | 17 ++++++++++--- source/libs/executor/src/timewindowoperator.c | 7 +++--- 5 files changed, 53 insertions(+), 14 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index f6f3570804..d43f37ca9e 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -464,7 +464,10 @@ _error: void destroyExchangeOperatorInfo(void* param) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; - (void)taosRemoveRef(exchangeObjRefPool, pExInfo->self); + int32_t code = taosRemoveRef(exchangeObjRefPool, pExInfo->self); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } } void freeBlock(void* pParam) { @@ -505,7 +508,10 @@ void doDestroyExchangeOperatorInfo(void* param) { blockDataDestroy(pExInfo->pDummyBlock); tSimpleHashCleanup(pExInfo->pHashSources); - (void)tsem_destroy(&pExInfo->ready); + int32_t code = tsem_destroy(&pExInfo->ready); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } taosMemoryFreeClear(pExInfo->pTaskId); taosMemoryFreeClear(param); @@ -561,9 +567,13 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { code = TAOS_SYSTEM_ERROR(code); qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo); + return code; } - (void)taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); + code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } return code; } @@ -1190,7 +1200,14 @@ static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeIn return pTask->code; } } - (void)tsem_wait(&pExchangeInfo->ready); + + code = tsem_wait(&pExchangeInfo->ready); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + pTask->code = code; + return pTask->code; + } + if (pTask->pWorkerCb) { code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 117eb8d80a..1653116da5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -904,8 +904,14 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { } SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId); if (pExchangeInfo) { - (void)tsem_post(&pExchangeInfo->ready); - (void)taosReleaseRef(exchangeObjRefPool, pStop->refId); + int32_t code = tsem_post(&pExchangeInfo->ready); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } + code = taosReleaseRef(exchangeObjRefPool, pStop->refId); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f6c8efbaf5..5f137e46f1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -671,7 +671,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h); val = *pVal; - (void)taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false); + bool bRes = taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false); + qTrace("release LRU cache, res %d", bRes); } qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch, @@ -893,7 +894,10 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) { if (pInfo->base.pTableListInfo->groupOffset) { pInfo->countState = TABLE_COUNT_STATE_PROCESSED; } else { - (void)taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId)); + int32_t code = taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId)); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } } } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 7e22e38c95..d7ce123dbd 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2178,7 +2178,12 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca T_LONG_JMP(pTaskInfo->env, code); } - (void)tsem_wait(&pInfo->ready); + code = tsem_wait(&pInfo->ready); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } if (pTaskInfo->code) { qError("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo), @@ -2328,7 +2333,10 @@ void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNo void destroySysScanOperator(void* param) { SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param; - (void)tsem_destroy(&pInfo->ready); + int32_t code = tsem_destroy(&pInfo->ready); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } blockDataDestroy(pInfo->pRes); if (pInfo->name.type == TSDB_TABLE_NAME_T) { @@ -2384,7 +2392,10 @@ int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) { } } - (void)tsem_post(&pScanResInfo->ready); + int32_t res = tsem_post(&pScanResInfo->ready); + if (res != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(res)); + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fc91877b66..1999694057 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1231,7 +1231,7 @@ void destroyIntervalOperatorInfo(void* param) { cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarSupp); - (void)tdListFree(pInfo->binfo.resultRowInfo.openWindow); + pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow); taosArrayDestroy(pInfo->pInterpCols); pInfo->pInterpCols = NULL; @@ -2132,7 +2132,7 @@ typedef struct SGroupTimeWindow { void destroyMergeIntervalOperatorInfo(void* param) { SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; - (void)tdListFree(miaInfo->groupIntervals); + miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals); destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo); taosMemoryFreeClear(param); @@ -2162,7 +2162,8 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t STimeWindow* prevWin = &prevGrpWin->window; if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) { - (void)tdListPopNode(miaInfo->groupIntervals, listNode); + SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode); + taosMemoryFreeClear(tmp); } } From 27446f8df1aca0cbacfd8868fa851b8b446b8328 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 4 Sep 2024 19:01:14 +0800 Subject: [PATCH 13/25] fix(query):free data block --- source/libs/executor/src/aggregateoperator.c | 52 +++++++++++++------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 61f1339c82..d9af279813 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -51,7 +51,7 @@ typedef struct SAggOperatorInfo { } SAggOperatorInfo; static void destroyAggOperatorInfo(void* param); -static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); +static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock); static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock); @@ -63,7 +63,7 @@ static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, in static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size); -static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); +static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); @@ -184,7 +184,8 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { if (pBlock) { pAggInfo->pNewGroupBlock = NULL; tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable); - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + QUERY_CHECK_CODE(code, lino, _end); code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); QUERY_CHECK_CODE(code, lino, _end); @@ -225,12 +226,19 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { break; } // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + if (code != TSDB_CODE_SUCCESS) { + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); + T_LONG_JMP(pTaskInfo->env, code); + } code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); - QUERY_CHECK_CODE(code, lino, _end); + if (code != TSDB_CODE_SUCCESS) { + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); + T_LONG_JMP(pTaskInfo->env, code); + } code = doAggregateImpl(pOperator, pSup->pCtx); - if (code != 0) { + if (code != TSDB_CODE_SUCCESS) { destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); } @@ -427,20 +435,24 @@ void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) { *ppBlock = NULL; } -void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { +int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { + int32_t code = TSDB_CODE_SUCCESS; SAggOperatorInfo* pAggInfo = pOperator->info; if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) { - return; + return code; } - doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); + code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); // record the current active group id pAggInfo->groupId = groupId; + return code; } -void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { +int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { // for simple group by query without interval, all the tables belong to one group result. + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; @@ -452,23 +464,27 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup, true); if (pResultRow == NULL || pTaskInfo->code != 0) { - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + code = pTaskInfo->code; + lino = __LINE__; + goto _end; } /* * not assign result buffer yet, add new result buffer * all group belong to one result set, and each group result has different group id so set the id to be one */ if (pResultRow->pageId == -1) { - int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); - if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, terrno); - } + code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); + QUERY_CHECK_CODE(code, lino, _end); } - int32_t ret = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); - if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, ret); + code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); + QUERY_CHECK_CODE(code, lino, _end); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } + return code; } // a new buffer page for each table. Needs to opt this design From 277528996eb5ebf0e4c217d021d4998196bfce5a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 Sep 2024 16:04:36 +0800 Subject: [PATCH 14/25] fix(stream): handle continuous retrieve during checkpoint procedure. --- include/libs/stream/tstream.h | 3 +- source/dnode/mnode/impl/src/mndStreamUtil.c | 2 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 22 +++++-- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/querytask.c | 2 +- source/libs/stream/src/streamCheckStatus.c | 7 +- source/libs/stream/src/streamCheckpoint.c | 31 +++++---- source/libs/stream/src/streamDispatch.c | 71 +++++++++++++-------- source/libs/stream/src/streamExec.c | 47 ++++++++------ source/libs/stream/src/streamMeta.c | 11 +--- source/libs/stream/src/streamQueue.c | 4 +- source/libs/stream/src/streamTask.c | 5 +- 13 files changed, 117 insertions(+), 92 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 20f91106a5..31b9f62346 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -705,7 +705,7 @@ int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeChec void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId); bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId); void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal); -void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); +int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId); int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pInfo, int32_t code); @@ -810,6 +810,7 @@ int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRp int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask); int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq); int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes); +void streamTaskSetCheckpointFailed(SStreamTask* pTask); // stream task state machine, and event handling int32_t streamCreateStateMachine(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index ef91ccef34..d75713fd28 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -251,7 +251,7 @@ void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) { int32_t code = mndKillTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans); if (code) { - mError("failed to kill trans:%d", pTrans->id); + mError("failed to kill transId:%d, code:%s", pTrans->id, tstrerror(code)); } } else { mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 07daab4459..de295f2611 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1129,7 +1129,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); - if (pTask == NULL) { + if (pTask == NULL || code != 0) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64 " transId:%d it may have been destroyed", vgId, req.taskId, req.checkpointId, req.transId); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 44b3f75289..68f43d637b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -410,7 +410,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tDecoderClear(&decoder); if (code) { - tqError("vgId:%d failed to decode retrieve msg, quit handling it", pMeta->vgId); + tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId); return code; } @@ -420,9 +420,16 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, req.dstTaskId); tCleanupStreamRetrieveReq(&req); - return -1; + return code; } + // enqueue + tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d),QID:0x%" PRIx64, pTask->id.idStr, + pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId); + + // if task is in ck status, set current ck failed + streamTaskSetCheckpointFailed(pTask); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamProcessRetrieveReq(pTask, &req); } else { @@ -431,14 +438,19 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { code = streamTaskBroadcastRetrieveReq(pTask, &req); } - SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamTaskSendRetrieveRsp(&req, &rsp); + if (code != TSDB_CODE_SUCCESS) { // return error not send rsp manually + tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId, + req.srcTaskId, tstrerror(code)); + } else { // send rsp manually only on success. + SRpcMsg rsp = {.info = pMsg->info, .code = 0}; + streamTaskSendRetrieveRsp(&req, &rsp); + } streamMetaReleaseTask(pMeta, pTask); tCleanupStreamRetrieveReq(&req); // always return success, to disable the auto rsp - return TSDB_CODE_SUCCESS; + return code; } int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c13104fc07..6620e2b934 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -601,7 +601,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; (void)taosThreadOnce(&initPoolOnce, initRefPool); - qDebug("start to create task, TID:0x%" PRIx64 "QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId); + qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId); int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model); if (code != TSDB_CODE_SUCCESS || NULL == *pTask) { diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index b050143ac0..fc6c9f2861 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -287,7 +287,7 @@ void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst) { memcpy(p, "TID:0x", offset); offset += tintToHex(taskId, &p[offset]); - memcpy(&p[offset], "QID:0x", 7); + memcpy(&p[offset], " QID:0x", 7); offset += 7; offset += tintToHex(queryId, &p[offset]); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 91196f31e0..41124d8543 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -65,12 +65,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ ", prev:%" PRId64, id, upstreamTaskId, vgId, stage, pInfo->stage); // record the checkpoint failure id and sent to mnode - streamMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask).state; - if (status == TASK_STATUS__CK) { - streamTaskSetFailedCheckpointId(pTask); - } - streamMutexUnlock(&pTask->lock); + streamTaskSetCheckpointFailed(pTask); } if (pInfo->stage != stage) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d0bf24bd03..916aee4e6e 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -673,6 +673,15 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { } } +void streamTaskSetCheckpointFailed(SStreamTask* pTask) { + streamMutexLock(&pTask->lock); + ETaskStatus status = streamTaskGetStatus(pTask).state; + if (status == TASK_STATUS__CK) { + streamTaskSetFailedCheckpointId(pTask); + } + streamMutexUnlock(&pTask->lock); +} + static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { int32_t code = 0; int32_t cap = strlen(path) + 64; @@ -1111,26 +1120,20 @@ void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_ // record the dispatch checkpoint trigger info in the list // memory insufficient may cause the stream computing stopped -void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { +int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + int64_t now = taosGetTimestampMs(); - int64_t now = taosGetTimestampMs(); streamMutexLock(&pInfo->lock); - - // outputQ should be empty here - if (streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) > 0) { - stFatal("s-task:%s items are still in outputQ, failed to init trigger dispatch info", pTask->id.idStr); - return; - } - pInfo->dispatchTrigger = true; if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId}; void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); - if (px == NULL) { - // pause the stream task, if memory not enough + if (px == NULL) { // pause the stream task, if memory not enough + streamMutexUnlock(&pInfo->lock); + return terrno; } } else { for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) { @@ -1141,13 +1144,15 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId}; void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); - if (px == NULL) { - // pause the stream task, if memory not enough + if (px == NULL) { // pause the stream task, if memory not enough + streamMutexUnlock(&pInfo->lock); + return terrno; } } } streamMutexUnlock(&pInfo->lock); + return 0; } int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 0bc090cdfe..36b35bd3f6 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -726,8 +726,11 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } int32_t streamDispatchStreamBlock(SStreamTask* pTask) { - const char* id = pTask->id.idStr; - int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue); + const char* id = pTask->id.idStr; + int32_t code = 0; + SStreamDataBlock* pBlock = NULL; + + int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue); if (numOfElems > 0) { double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue)); int32_t numOfUnAccessed = streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue); @@ -749,41 +752,57 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return 0; } + ASSERT(pTask->msgInfo.pData == NULL); + if (pTask->msgInfo.pData != NULL) { stFatal("s-task:%s not rsp data:%p exist, should not dispatch msg now", id, pTask->msgInfo.pData); } else { stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status); } - SStreamDataBlock* pBlock = NULL; - streamQueueNextItem(pTask->outputq.queue, (SStreamQueueItem**)&pBlock); - if (pBlock == NULL) { - atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); - stDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", id, pTask->outputq.status); - return 0; - } + while (1) { + streamQueueNextItem(pTask->outputq.queue, (SStreamQueueItem**)&pBlock); + if (pBlock == NULL) { + atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); + stDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", id, pTask->outputq.status); + return 0; + } - int32_t type = pBlock->type; - if (!(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT_TRIGGER || - type == STREAM_INPUT__TRANS_STATE)) { - stError("s-task:%s invalid dispatch block type:%d", id, type); - return TSDB_CODE_INTERNAL_ERROR; - } + int32_t type = pBlock->type; + if (!(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT_TRIGGER || + type == STREAM_INPUT__TRANS_STATE)) { + stError("s-task:%s invalid dispatch block type:%d", id, type); + return TSDB_CODE_INTERNAL_ERROR; + } - pTask->execInfo.dispatch += 1; + pTask->execInfo.dispatch += 1; - streamMutexLock(&pTask->msgInfo.lock); - initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch); - streamMutexUnlock(&pTask->msgInfo.lock); + streamMutexLock(&pTask->msgInfo.lock); + initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch); + streamMutexUnlock(&pTask->msgInfo.lock); - int32_t code = doBuildDispatchMsg(pTask, pBlock); - if (code == 0) { - destroyStreamDataBlock(pBlock); - } else { // todo handle build dispatch msg failed - } + code = doBuildDispatchMsg(pTask, pBlock); + if (code == 0) { + destroyStreamDataBlock(pBlock); + } else { // todo handle build dispatch msg failed + } - if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - streamTaskInitTriggerDispatchInfo(pTask); + if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + // outputQ should be empty here, otherwise, set the checkpoint failed due to the retrieve req happens + if (streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) > 0) { + stError("s-task:%s items are still in outputQ due to downstream retrieve, failed to init trigger dispatch", + pTask->id.idStr); + streamTaskSetCheckpointFailed(pTask); + clearBufferedDispatchMsg(pTask); + continue; + } + + code = streamTaskInitTriggerDispatchInfo(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle error + } + } + + break; } code = sendDispatchMsg(pTask, pTask->msgInfo.pData); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index fca0bf403f..0ac37fd2b9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -98,14 +98,13 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; - - *totalBlocks = 0; - *totalSize = 0; - int32_t size = 0; int32_t numOfBlocks = 0; SArray* pRes = NULL; + *totalBlocks = 0; + *totalSize = 0; + while (1) { if (pRes == NULL) { pRes = taosArrayInit(4, sizeof(SSDataBlock)); @@ -131,7 +130,8 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SSDataBlock block = {0}; const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; - int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); + + int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); if (num != 1) { stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); continue; @@ -596,12 +596,32 @@ void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) // static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; } +static void doRecordThroughput(STaskExecStatisInfo* pInfo, int64_t totalBlocks, int64_t totalSize, int64_t blockSize, + double st, const char* id) { + double el = (taosGetTimestampMs() - st) / 1000.0; + + stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, + SIZE_IN_MiB(totalSize), totalBlocks); + + pInfo->outputDataBlocks += totalBlocks; + pInfo->outputDataSize += totalSize; + if (fabs(el - 0.0) <= DBL_EPSILON) { + pInfo->procsThroughput = 0; + pInfo->outputThroughput = 0; + } else { + pInfo->outputThroughput = (totalSize / el); + pInfo->procsThroughput = (blockSize / el); + } +} + static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) { const char* id = pTask->id.idStr; int32_t blockSize = 0; int64_t st = taosGetTimestampMs(); SCheckpointInfo* pInfo = &pTask->chkInfo; int64_t ver = pInfo->processedVer; + int64_t totalSize = 0; + int32_t totalBlocks = 0; stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type)); @@ -611,23 +631,8 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i return; } - int64_t totalSize = 0; - int32_t totalBlocks = 0; streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks); - - double el = (taosGetTimestampMs() - st) / 1000.0; - stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, - SIZE_IN_MiB(totalSize), totalBlocks); - - pTask->execInfo.outputDataBlocks += totalBlocks; - pTask->execInfo.outputDataSize += totalSize; - if (fabs(el - 0.0) <= DBL_EPSILON) { - pTask->execInfo.procsThroughput = 0; - pTask->execInfo.outputThroughput = 0; - } else { - pTask->execInfo.outputThroughput = (totalSize / el); - pTask->execInfo.procsThroughput = (blockSize / el); - } + doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr); // update the currentVer if processing the submit blocks. if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 084eb7b827..81061cd06d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1254,16 +1254,7 @@ int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { continue; } - streamMutexLock(&pTask->lock); - - SStreamTaskState pState = streamTaskGetStatus(pTask); - if (pState.state == TASK_STATUS__CK) { - streamTaskSetFailedCheckpointId(pTask); - } else { - stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState.name); - } - - streamMutexUnlock(&pTask->lock); + streamTaskSetCheckpointFailed(pTask); streamMetaReleaseTask(pMeta, pTask); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index eca728b2d5..3765d0f9d3 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -287,7 +287,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } int32_t msgLen = px->submit.msgLen; @@ -312,7 +312,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); streamFreeQitem(pItem); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } int32_t code = taosWriteQitem(pQueue, pItem); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 0791784656..2acfd64b2a 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1098,15 +1098,12 @@ static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* return terrno = code; } - // enqueue - stDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d),QID:0x%" PRIx64, pTask->id.idStr, - pTask->pMeta->vgId, pTask->info.taskLevel, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId); - pData->type = STREAM_INPUT__DATA_RETRIEVE; pData->srcVgId = 0; code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to convert retrieve-data to block, code:%s", pTask->id.idStr, tstrerror(code)); taosFreeQitem(pData); return code; } From df0f71a31c8db9aca3fc4cc991c8f7313af38a03 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 Sep 2024 16:12:39 +0800 Subject: [PATCH 15/25] refactor: remove assert. --- source/libs/stream/src/streamDispatch.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 36b35bd3f6..ad1866807a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -752,8 +752,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return 0; } - ASSERT(pTask->msgInfo.pData == NULL); - if (pTask->msgInfo.pData != NULL) { stFatal("s-task:%s not rsp data:%p exist, should not dispatch msg now", id, pTask->msgInfo.pData); } else { From ad5aa4b032a633c4efd2e9f71ae175a6588785e4 Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Thu, 5 Sep 2024 16:58:20 +0800 Subject: [PATCH 16/25] fix(query)[TD-31929]. Fix mem-leak caused by redundant allocations --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 34280326a0..0c61366513 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1743,8 +1743,6 @@ static int32_t initRowMergeIfNeeded(STsdbReader* pReader, int64_t uid) { if (ps == NULL) { return terrno; } - - code = tsdbRowMergerInit(pMerger, ps); } return code; From 6fdc3d63f7a062d68e97f9ac81e7fcf4ffe6a88f Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 5 Sep 2024 17:11:38 +0800 Subject: [PATCH 17/25] fix: add debug assert --- source/common/src/tdatablock.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 816bf3a757..34beb38879 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3011,6 +3011,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) { uError("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type); + ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return -1; } From 6b945732319bc88128b2214e4bbfb80b3088907e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 Sep 2024 17:17:26 +0800 Subject: [PATCH 18/25] fix(stream): fix syntax error. --- source/libs/stream/src/streamExec.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0ac37fd2b9..5bb9c993de 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -600,8 +600,8 @@ static void doRecordThroughput(STaskExecStatisInfo* pInfo, int64_t totalBlocks, double st, const char* id) { double el = (taosGetTimestampMs() - st) / 1000.0; - stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, - SIZE_IN_MiB(totalSize), totalBlocks); + stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%" PRId64, id, + el, SIZE_IN_MiB(totalSize), totalBlocks); pInfo->outputDataBlocks += totalBlocks; pInfo->outputDataSize += totalSize; From ede112d39dcd605d70bb471fddc5fef30d5ad87f Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Thu, 5 Sep 2024 17:25:31 +0800 Subject: [PATCH 19/25] fix:[TD-31909] Avoid double free when error occurs in mode function. --- source/libs/function/src/builtinsimpl.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 84ab103456..196fd767bb 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -6008,6 +6008,7 @@ int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { pInfo->buf = taosMemoryMalloc(pInfo->colBytes); if (NULL == pInfo->buf) { taosHashCleanup(pInfo->pHash); + pInfo->pHash = NULL; return TSDB_CODE_OUT_OF_MEMORY; } @@ -6016,6 +6017,7 @@ int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { static void modeFunctionCleanup(SModeInfo * pInfo) { taosHashCleanup(pInfo->pHash); + pInfo->pHash = NULL; taosMemoryFreeClear(pInfo->buf); } From ff3db4cd9e537f3d1ab38b6f1e81e456f34f1a14 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 5 Sep 2024 17:32:27 +0800 Subject: [PATCH 20/25] fix(stream):set null for invalid column --- source/libs/executor/src/streamtimewindowoperator.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 8d5aa7104f..bedbdfa299 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -718,6 +718,11 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pCalStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + SColumnInfoData* pTbName = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + SColumnInfoData* pPrimaryKey = NULL; + if (taosArrayGetSize(pBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) { + pPrimaryKey = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); + } for (; (*pIndex) < size; (*pIndex)++) { SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex)); code = colDataSetVal(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false); @@ -735,6 +740,11 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB code = colDataSetVal(pCalEndTs, pBlock->info.rows, (const char*)&pWin->calWin.ekey, false); QUERY_CHECK_CODE(code, lino, _end); + colDataSetNULL(pTbName, pBlock->info.rows); + if (pPrimaryKey != NULL) { + colDataSetNULL(pPrimaryKey, pBlock->info.rows); + } + pBlock->info.rows++; } if ((*pIndex) == size) { From c3149f52252e2b4a30e599f9766484e8d608da3f Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Thu, 5 Sep 2024 18:18:51 +0800 Subject: [PATCH 21/25] fix:[TD-31914] Fix mem leak when error occurs. --- source/libs/scalar/src/filter.c | 49 +++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 382b83012d..9a9b5b44f1 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -1274,6 +1274,16 @@ int32_t filterAddUnitToGroup(SFilterGroup *group, uint32_t unitIdx) { return TSDB_CODE_SUCCESS; } +static void filterFreeGroup(void *pItem) { + if (pItem == NULL) { + return; + } + + SFilterGroup *p = (SFilterGroup *)pItem; + taosMemoryFreeClear(p->unitIdxs); + taosMemoryFreeClear(p->unitFlags); +} + int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) { SOperatorNode *node = (SOperatorNode *)tree; int32_t ret = TSDB_CODE_SUCCESS; @@ -1336,9 +1346,11 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) { SFilterGroup fgroup = {0}; code = filterAddUnitToGroup(&fgroup, uidx); if (TSDB_CODE_SUCCESS != code) { + filterFreeGroup((void*)&fgroup); break; } if (NULL == taosArrayPush(group, &fgroup)) { + filterFreeGroup((void*)&fgroup); code = TSDB_CODE_OUT_OF_MEMORY; break; } @@ -1658,16 +1670,6 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan return TSDB_CODE_SUCCESS; } -static void filterFreeGroup(void *pItem) { - if (pItem == NULL) { - return; - } - - SFilterGroup *p = (SFilterGroup *)pItem; - taosMemoryFreeClear(p->unitIdxs); - taosMemoryFreeClear(p->unitFlags); -} - EDealRes fltTreeToGroup(SNode *pNode, void *pContext) { int32_t code = TSDB_CODE_SUCCESS; SArray *preGroup = NULL; @@ -2944,25 +2946,44 @@ int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t gResNum for (int32_t n = 0; n < usize; ++n) { SFilterUnit *u = (SFilterUnit *)taosArrayGetP((SArray *)colInfo->info, n); if (NULL == u) { - FLT_ERR_JRET(TSDB_CODE_OUT_OF_RANGE); + code = TSDB_CODE_OUT_OF_RANGE; + break; } - FLT_ERR_JRET(filterAddUnitFromUnit(info, &oinfo, u, &uidx)); - FLT_ERR_JRET(filterAddUnitToGroup(&ng, uidx)); + code = filterAddUnitFromUnit(info, &oinfo, u, &uidx); + if (TSDB_CODE_SUCCESS != code) { + break; + } + code = filterAddUnitToGroup(&ng, uidx); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + if (TSDB_CODE_SUCCESS != code) { + break; } continue; } + if (TSDB_CODE_SUCCESS != code) { + filterFreeGroup((void*)&ng); + FLT_ERR_JRET(code); + } if (colInfo->type != RANGE_TYPE_MR_CTX) { fltError("filterRewrite get invalid col type : %d", colInfo->type); FLT_ERR_JRET(TSDB_CODE_QRY_FILTER_INVALID_TYPE); } - FLT_ERR_JRET(filterAddGroupUnitFromCtx(info, &oinfo, colInfo->info, res->colIdx[m], &ng, optr, group)); + code = filterAddGroupUnitFromCtx(info, &oinfo, colInfo->info, res->colIdx[m], &ng, optr, group); + if (TSDB_CODE_SUCCESS != code) { + filterFreeGroup((void*)&ng); + FLT_ERR_JRET(code); + } } if (ng.unitNum > 0) { if (NULL == taosArrayPush(group, &ng)) { + filterFreeGroup((void*)&ng); FLT_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } } From fc74db1c858f81695fb49a2620652c423bc30bf7 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 5 Sep 2024 18:39:54 +0800 Subject: [PATCH 22/25] more code --- source/common/src/tmsg.c | 889 +++++++++++++++++++++++++-------------- 1 file changed, 564 insertions(+), 325 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 349a928b41..1b10ba3226 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10975,15 +10975,12 @@ _exit: static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData) { int32_t code = 0; + int32_t lino; int32_t flags; uint8_t version; - if (tStartDecode(pCoder) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } - - if (tDecodeI32v(pCoder, &flags) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(pCoder)); + TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &flags)); pSubmitTbData->flags = flags & 0xff; version = (flags >> 8) & 0xff; @@ -10991,41 +10988,25 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa if (pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { pSubmitTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); if (pSubmitTbData->pCreateTbReq == NULL) { - goto _exit; + TAOS_CHECK_EXIT(terrno); } - if (tDecodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } + TAOS_CHECK_EXIT(tDecodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq)); } // submit data - if (tDecodeI64(pCoder, &pSubmitTbData->suid) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } - if (tDecodeI64(pCoder, &pSubmitTbData->uid) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } - if (tDecodeI32v(pCoder, &pSubmitTbData->sver) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } + TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pSubmitTbData->suid)); + TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pSubmitTbData->uid)); + TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &pSubmitTbData->sver)); if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { uint64_t nColData; - if (tDecodeU64v(pCoder, &nColData) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } + TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nColData)); pSubmitTbData->aCol = taosArrayInit(nColData, sizeof(SColData)); if (pSubmitTbData->aCol == NULL) { - code = terrno; - goto _exit; + TAOS_CHECK_EXIT(terrno); } for (int32_t i = 0; i < nColData; ++i) { @@ -11033,19 +11014,18 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa } } else { uint64_t nRow; - if (tDecodeU64v(pCoder, &nRow) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } + TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nRow)); pSubmitTbData->aRowP = taosArrayInit(nRow, sizeof(SRow *)); if (pSubmitTbData->aRowP == NULL) { - code = terrno; - goto _exit; + TAOS_CHECK_EXIT(terrno); } for (int32_t iRow = 0; iRow < nRow; ++iRow) { SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1); + if (ppRow == NULL) { + TAOS_CHECK_EXIT(terrno); + } *ppRow = (SRow *)(pCoder->data + pCoder->pos); pCoder->pos += (*ppRow)->len; @@ -11054,31 +11034,27 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa pSubmitTbData->ctimeMs = 0; if (!tDecodeIsEnd(pCoder)) { - if (tDecodeI64(pCoder, &pSubmitTbData->ctimeMs) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } + TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pSubmitTbData->ctimeMs)); } tEndDecode(pCoder); _exit: - if (code) { - // TODO: clear - } - return 0; + return code; } int32_t tEncodeSubmitReq(SEncoder *pCoder, const SSubmitReq2 *pReq) { - if (tStartEncode(pCoder) < 0) return -1; + int32_t code = 0; + int32_t lino; - if (tEncodeU64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(pCoder)); + TAOS_CHECK_EXIT(tEncodeU64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData))); for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) { - if (tEncodeSSubmitTbData(pCoder, taosArrayGet(pReq->aSubmitTbData, i)) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeSSubmitTbData(pCoder, taosArrayGet(pReq->aSubmitTbData, i))); } - tEndEncode(pCoder); - return 0; +_exit: + return code; } int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq) { @@ -11187,17 +11163,21 @@ void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) { } int32_t tEncodeSSubmitRsp2(SEncoder *pCoder, const SSubmitRsp2 *pRsp) { - if (tStartEncode(pCoder) < 0) return -1; + int32_t code = 0; + int32_t lino; - if (tEncodeI32v(pCoder, pRsp->affectedRows) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(pCoder)); - if (tEncodeU64v(pCoder, taosArrayGetSize(pRsp->aCreateTbRsp)) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32v(pCoder, pRsp->affectedRows)); + + TAOS_CHECK_EXIT(tEncodeU64v(pCoder, taosArrayGetSize(pRsp->aCreateTbRsp))); for (int32_t i = 0; i < taosArrayGetSize(pRsp->aCreateTbRsp); ++i) { - if (tEncodeSVCreateTbRsp(pCoder, taosArrayGet(pRsp->aCreateTbRsp, i)) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeSVCreateTbRsp(pCoder, taosArrayGet(pRsp->aCreateTbRsp, i))); } tEndEncode(pCoder); - return 0; +_exit: + return code; } int32_t tDecodeSSubmitRsp2(SDecoder *pCoder, SSubmitRsp2 *pRsp) { @@ -11280,55 +11260,79 @@ void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) { } int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) { + int32_t code = 0; + int32_t lino; + int32_t tlen; SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; - if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->name)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; - if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->name)); + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists)); tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; - if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; - if (tEncodeI8(&encoder, pReq->igUntreated) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->name)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igUntreated)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; - if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; - if (tDecodeI8(&decoder, &pReq->igUntreated) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->name)); + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists)); + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igUntreated)); tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } int32_t tEncodeMqSubTopicEp(void **buf, const SMqSubTopicEp *pTopicEp) { @@ -11375,59 +11379,71 @@ void tDeleteMqSubTopicEp(SMqSubTopicEp *pSubTopicEp) { int32_t tSerializeSCMCreateViewReq(void *buf, int32_t bufLen, const SCMCreateViewReq *pReq) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->fullname) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->querySql) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; - if (tEncodeI8(&encoder, pReq->orReplace) < 0) return -1; - if (tEncodeI8(&encoder, pReq->precision) < 0) return -1; - if (tEncodeI32(&encoder, pReq->numOfCols) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->fullname)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->name)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->dbFName)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->querySql)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->sql)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->orReplace)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->precision)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->numOfCols)); for (int32_t i = 0; i < pReq->numOfCols; ++i) { SSchema *pSchema = &pReq->pSchema[i]; - if (tEncodeSSchema(&encoder, pSchema) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeSSchema(&encoder, pSchema)); } tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeSCMCreateViewReq(void *buf, int32_t bufLen, SCMCreateViewReq *pReq) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->fullname) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1; - if (tDecodeCStrAlloc(&decoder, &pReq->querySql) < 0) return -1; - if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1; - if (tDecodeI8(&decoder, &pReq->orReplace) < 0) return -1; - if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->numOfCols) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->fullname)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->name)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->dbFName)); + TAOS_CHECK_EXIT(tDecodeCStrAlloc(&decoder, &pReq->querySql)); + TAOS_CHECK_EXIT(tDecodeCStrAlloc(&decoder, &pReq->sql)); + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->orReplace)); + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->precision)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->numOfCols)); if (pReq->numOfCols > 0) { pReq->pSchema = taosMemoryCalloc(pReq->numOfCols, sizeof(SSchema)); if (pReq->pSchema == NULL) { - return -1; + TAOS_CHECK_EXIT(terrno); } for (int32_t i = 0; i < pReq->numOfCols; ++i) { SSchema *pSchema = pReq->pSchema + i; - if (tDecodeSSchema(&decoder, pSchema) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeSSchema(&decoder, pSchema)); } } tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } void tFreeSCMCreateViewReq(SCMCreateViewReq *pReq) { @@ -11442,37 +11458,49 @@ void tFreeSCMCreateViewReq(SCMCreateViewReq *pReq) { int32_t tSerializeSCMDropViewReq(void *buf, int32_t bufLen, const SCMDropViewReq *pReq) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->fullname) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; - if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->fullname)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->name)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->dbFName)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->sql)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeSCMDropViewReq(void *buf, int32_t bufLen, SCMDropViewReq *pReq) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->fullname) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1; - if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1; - if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->fullname)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->name)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->dbFName)); + TAOS_CHECK_EXIT(tDecodeCStrAlloc(&decoder, &pReq->sql)); + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists)); tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } void tFreeSCMDropViewReq(SCMDropViewReq *pReq) { if (NULL == pReq) { @@ -11484,101 +11512,133 @@ void tFreeSCMDropViewReq(SCMDropViewReq *pReq) { int32_t tSerializeSViewMetaReq(void *buf, int32_t bufLen, const SViewMetaReq *pReq) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->fullname) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->fullname)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeSViewMetaReq(void *buf, int32_t bufLen, SViewMetaReq *pReq) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->fullname) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->fullname)); tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } static int32_t tEncodeSViewMetaRsp(SEncoder *pEncoder, const SViewMetaRsp *pRsp) { - if (tEncodeCStr(pEncoder, pRsp->name) < 0) return -1; - if (tEncodeCStr(pEncoder, pRsp->dbFName) < 0) return -1; - if (tEncodeCStr(pEncoder, pRsp->user) < 0) return -1; - if (tEncodeU64(pEncoder, pRsp->dbId) < 0) return -1; - if (tEncodeU64(pEncoder, pRsp->viewId) < 0) return -1; - if (tEncodeCStr(pEncoder, pRsp->querySql) < 0) return -1; - if (tEncodeI8(pEncoder, pRsp->precision) < 0) return -1; - if (tEncodeI8(pEncoder, pRsp->type) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->version) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->numOfCols) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pRsp->name)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pRsp->dbFName)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pRsp->user)); + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pRsp->dbId)); + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pRsp->viewId)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pRsp->querySql)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->precision)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->type)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->version)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->numOfCols)); for (int32_t i = 0; i < pRsp->numOfCols; ++i) { SSchema *pSchema = &pRsp->pSchema[i]; - if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeSSchema(pEncoder, pSchema)); } - return 0; +_exit: + return code; } int32_t tSerializeSViewMetaRsp(void *buf, int32_t bufLen, const SViewMetaRsp *pRsp) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeSViewMetaRsp(&encoder, pRsp) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeSViewMetaRsp(&encoder, pRsp)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } static int32_t tDecodeSViewMetaRsp(SDecoder *pDecoder, SViewMetaRsp *pRsp) { - if (tDecodeCStrTo(pDecoder, pRsp->name) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pRsp->dbFName) < 0) return -1; - if (tDecodeCStrAlloc(pDecoder, &pRsp->user) < 0) return -1; - if (tDecodeU64(pDecoder, &pRsp->dbId) < 0) return -1; - if (tDecodeU64(pDecoder, &pRsp->viewId) < 0) return -1; - if (tDecodeCStrAlloc(pDecoder, &pRsp->querySql) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->precision) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->type) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->version) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->numOfCols) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pRsp->name)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pRsp->dbFName)); + TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pRsp->user)); + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pRsp->dbId)); + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pRsp->viewId)); + TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pRsp->querySql)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->precision)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->type)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->version)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->numOfCols)); if (pRsp->numOfCols > 0) { pRsp->pSchema = taosMemoryCalloc(pRsp->numOfCols, sizeof(SSchema)); if (pRsp->pSchema == NULL) { - return -1; + TAOS_CHECK_EXIT(terrno); } for (int32_t i = 0; i < pRsp->numOfCols; ++i) { SSchema *pSchema = pRsp->pSchema + i; - if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeSSchema(pDecoder, pSchema)); } } - return 0; +_exit: + return code; } int32_t tDeserializeSViewMetaRsp(void *buf, int32_t bufLen, SViewMetaRsp *pRsp) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeSViewMetaRsp(&decoder, pRsp) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeSViewMetaRsp(&decoder, pRsp)); tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } void tFreeSViewMetaRsp(SViewMetaRsp *pRsp) { @@ -11593,48 +11653,62 @@ void tFreeSViewMetaRsp(SViewMetaRsp *pRsp) { int32_t tSerializeSViewHbRsp(void *buf, int32_t bufLen, SViewHbRsp *pRsp) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); int32_t numOfMeta = taosArrayGetSize(pRsp->pViewRsp); - if (tEncodeI32(&encoder, numOfMeta) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(&encoder, numOfMeta)); for (int32_t i = 0; i < numOfMeta; ++i) { SViewMetaRsp *pMetaRsp = taosArrayGetP(pRsp->pViewRsp, i); - if (tEncodeSViewMetaRsp(&encoder, pMetaRsp) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeSViewMetaRsp(&encoder, pMetaRsp)); } tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeSViewHbRsp(void *buf, int32_t bufLen, SViewHbRsp *pRsp) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); int32_t numOfMeta = 0; - if (tDecodeI32(&decoder, &numOfMeta) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfMeta)); pRsp->pViewRsp = taosArrayInit(numOfMeta, POINTER_BYTES); if (pRsp->pViewRsp == NULL) { - return -1; + TAOS_CHECK_EXIT(terrno); } for (int32_t i = 0; i < numOfMeta; ++i) { SViewMetaRsp *metaRsp = taosMemoryCalloc(1, sizeof(SViewMetaRsp)); if (NULL == metaRsp) return -1; - if (tDecodeSViewMetaRsp(&decoder, metaRsp) < 0) return -1; - if (taosArrayPush(pRsp->pViewRsp, &metaRsp) == NULL) return -1; + TAOS_CHECK_EXIT(tDecodeSViewMetaRsp(&decoder, metaRsp)); + if (taosArrayPush(pRsp->pViewRsp, &metaRsp) == NULL) { + TAOS_CHECK_EXIT(terrno); + } } tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } void tFreeSViewHbRsp(SViewHbRsp *pRsp) { @@ -11662,183 +11736,243 @@ void setFieldWithOptions(SFieldWithOptions *fieldWithOptions, SField *field) { } int32_t tSerializeTableTSMAInfoReq(void *buf, int32_t bufLen, const STableTSMAInfoReq *pReq) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; - if (tEncodeI8(&encoder, pReq->fetchingWithTsmaName) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->name)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->fetchingWithTsmaName)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeTableTSMAInfoReq(void *buf, int32_t bufLen, STableTSMAInfoReq *pReq) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; - if (tDecodeI8(&decoder, (uint8_t *)&pReq->fetchingWithTsmaName) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->name)); + TAOS_CHECK_EXIT(tDecodeI8(&decoder, (uint8_t *)&pReq->fetchingWithTsmaName)); tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } static int32_t tEncodeTableTSMAInfo(SEncoder *pEncoder, const STableTSMAInfo *pTsmaInfo) { - if (tEncodeCStr(pEncoder, pTsmaInfo->name) < 0) return -1; - if (tEncodeU64(pEncoder, pTsmaInfo->tsmaId) < 0) return -1; - if (tEncodeCStr(pEncoder, pTsmaInfo->tb) < 0) return -1; - if (tEncodeCStr(pEncoder, pTsmaInfo->dbFName) < 0) return -1; - if (tEncodeU64(pEncoder, pTsmaInfo->suid) < 0) return -1; - if (tEncodeU64(pEncoder, pTsmaInfo->destTbUid) < 0) return -1; - if (tEncodeU64(pEncoder, pTsmaInfo->dbId) < 0) return -1; - if (tEncodeI32(pEncoder, pTsmaInfo->version) < 0) return -1; - if (tEncodeCStr(pEncoder, pTsmaInfo->targetTb) < 0) return -1; - if (tEncodeCStr(pEncoder, pTsmaInfo->targetDbFName) < 0) return -1; - if (tEncodeI64(pEncoder, pTsmaInfo->interval) < 0) return -1; - if (tEncodeI8(pEncoder, pTsmaInfo->unit) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTsmaInfo->name)); + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTsmaInfo->tsmaId)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTsmaInfo->tb)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTsmaInfo->dbFName)); + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTsmaInfo->suid)); + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTsmaInfo->destTbUid)); + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTsmaInfo->dbId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTsmaInfo->version)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTsmaInfo->targetTb)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTsmaInfo->targetDbFName)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTsmaInfo->interval)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTsmaInfo->unit)); int32_t size = pTsmaInfo->pFuncs ? pTsmaInfo->pFuncs->size : 0; - if (tEncodeI32(pEncoder, size) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size)); for (int32_t i = 0; i < size; ++i) { STableTSMAFuncInfo *pFuncInfo = taosArrayGet(pTsmaInfo->pFuncs, i); - if (tEncodeI32(pEncoder, pFuncInfo->funcId) < 0) return -1; - if (tEncodeI16(pEncoder, pFuncInfo->colId) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pFuncInfo->funcId)); + TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pFuncInfo->colId)); } size = pTsmaInfo->pTags ? pTsmaInfo->pTags->size : 0; - if (tEncodeI32(pEncoder, size) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size)); for (int32_t i = 0; i < size; ++i) { const SSchema *pSchema = taosArrayGet(pTsmaInfo->pTags, i); - if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeSSchema(pEncoder, pSchema)); } size = pTsmaInfo->pUsedCols ? pTsmaInfo->pUsedCols->size : 0; - if (tEncodeI32(pEncoder, size) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size)); for (int32_t i = 0; i < size; ++i) { const SSchema *pSchema = taosArrayGet(pTsmaInfo->pUsedCols, i); - if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeSSchema(pEncoder, pSchema)); } - if (tEncodeCStr(pEncoder, pTsmaInfo->ast) < 0) return -1; - if (tEncodeI64(pEncoder, pTsmaInfo->streamUid) < 0) return -1; - if (tEncodeI64(pEncoder, pTsmaInfo->reqTs) < 0) return -1; - if (tEncodeI64(pEncoder, pTsmaInfo->rspTs) < 0) return -1; - if (tEncodeI64(pEncoder, pTsmaInfo->delayDuration) < 0) return -1; - if (tEncodeI8(pEncoder, pTsmaInfo->fillHistoryFinished) < 0) return -1; - return 0; + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTsmaInfo->ast)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTsmaInfo->streamUid)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTsmaInfo->reqTs)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTsmaInfo->rspTs)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTsmaInfo->delayDuration)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTsmaInfo->fillHistoryFinished)); + +_exit: + return code; } static int32_t tDecodeTableTSMAInfo(SDecoder *pDecoder, STableTSMAInfo *pTsmaInfo) { - if (tDecodeCStrTo(pDecoder, pTsmaInfo->name) < 0) return -1; - if (tDecodeU64(pDecoder, &pTsmaInfo->tsmaId) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTsmaInfo->tb) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTsmaInfo->dbFName) < 0) return -1; - if (tDecodeU64(pDecoder, &pTsmaInfo->suid) < 0) return -1; - if (tDecodeU64(pDecoder, &pTsmaInfo->destTbUid) < 0) return -1; - if (tDecodeU64(pDecoder, &pTsmaInfo->dbId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTsmaInfo->version) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTsmaInfo->targetTb) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTsmaInfo->targetDbFName) < 0) return -1; - if (tDecodeI64(pDecoder, &pTsmaInfo->interval) < 0) return -1; - if (tDecodeI8(pDecoder, &pTsmaInfo->unit) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTsmaInfo->name)); + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pTsmaInfo->tsmaId)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTsmaInfo->tb)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTsmaInfo->dbFName)); + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pTsmaInfo->suid)); + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pTsmaInfo->destTbUid)); + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pTsmaInfo->dbId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTsmaInfo->version)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTsmaInfo->targetTb)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTsmaInfo->targetDbFName)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTsmaInfo->interval)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTsmaInfo->unit)); int32_t size = 0; - if (tDecodeI32(pDecoder, &size) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size)); if (size > 0) { pTsmaInfo->pFuncs = taosArrayInit(size, sizeof(STableTSMAFuncInfo)); - if (!pTsmaInfo->pFuncs) return -1; + if (!pTsmaInfo->pFuncs) { + TAOS_CHECK_EXIT(terrno); + } for (int32_t i = 0; i < size; ++i) { STableTSMAFuncInfo funcInfo = {0}; - if (tDecodeI32(pDecoder, &funcInfo.funcId) < 0) return -1; - if (tDecodeI16(pDecoder, &funcInfo.colId) < 0) return -1; - if (!taosArrayPush(pTsmaInfo->pFuncs, &funcInfo)) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &funcInfo.funcId)); + TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &funcInfo.colId)); + if (!taosArrayPush(pTsmaInfo->pFuncs, &funcInfo)) { + TAOS_CHECK_EXIT(terrno); + } } } - if (tDecodeI32(pDecoder, &size) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size)); if (size > 0) { pTsmaInfo->pTags = taosArrayInit(size, sizeof(SSchema)); - if (!pTsmaInfo->pTags) return -1; + if (!pTsmaInfo->pTags) { + TAOS_CHECK_EXIT(terrno); + } for (int32_t i = 0; i < size; ++i) { SSchema schema = {0}; - if (tDecodeSSchema(pDecoder, &schema) < 0) return -1; - if (taosArrayPush(pTsmaInfo->pTags, &schema) == NULL) return -1; + TAOS_CHECK_EXIT(tDecodeSSchema(pDecoder, &schema)); + if (taosArrayPush(pTsmaInfo->pTags, &schema) == NULL) { + TAOS_CHECK_EXIT(terrno); + } } } - if (tDecodeI32(pDecoder, &size) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size)); if (size > 0) { pTsmaInfo->pUsedCols = taosArrayInit(size, sizeof(SSchema)); - if (!pTsmaInfo->pUsedCols) return -1; + if (!pTsmaInfo->pUsedCols) { + TAOS_CHECK_EXIT(terrno); + } for (int32_t i = 0; i < size; ++i) { SSchema schema = {0}; - if (tDecodeSSchema(pDecoder, &schema) < 0) return -1; - if (taosArrayPush(pTsmaInfo->pUsedCols, &schema) == NULL) return -1; + TAOS_CHECK_EXIT(tDecodeSSchema(pDecoder, &schema)); + if (taosArrayPush(pTsmaInfo->pUsedCols, &schema) == NULL) { + TAOS_CHECK_EXIT(terrno); + } } } - if (tDecodeCStrAlloc(pDecoder, &pTsmaInfo->ast) < 0) return -1; - if (tDecodeI64(pDecoder, &pTsmaInfo->streamUid) < 0) return -1; - if (tDecodeI64(pDecoder, &pTsmaInfo->reqTs) < 0) return -1; - if (tDecodeI64(pDecoder, &pTsmaInfo->rspTs) < 0) return -1; - if (tDecodeI64(pDecoder, &pTsmaInfo->delayDuration) < 0) return -1; - if (tDecodeI8(pDecoder, (int8_t *)&pTsmaInfo->fillHistoryFinished) < 0) return -1; - return 0; + TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTsmaInfo->ast)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTsmaInfo->streamUid)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTsmaInfo->reqTs)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTsmaInfo->rspTs)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTsmaInfo->delayDuration)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pTsmaInfo->fillHistoryFinished)); + +_exit: + return code; } static int32_t tEncodeTableTSMAInfoRsp(SEncoder *pEncoder, const STableTSMAInfoRsp *pRsp) { + int32_t code = 0; + int32_t lino; + int32_t size = pRsp->pTsmas ? pRsp->pTsmas->size : 0; - if (tEncodeI32(pEncoder, size) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size)); for (int32_t i = 0; i < size; ++i) { STableTSMAInfo *pInfo = taosArrayGetP(pRsp->pTsmas, i); - if (tEncodeTableTSMAInfo(pEncoder, pInfo) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeTableTSMAInfo(pEncoder, pInfo)); } - return 0; +_exit: + return code; } static int32_t tDecodeTableTSMAInfoRsp(SDecoder *pDecoder, STableTSMAInfoRsp *pRsp) { int32_t size = 0; - if (tDecodeI32(pDecoder, &size) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size)); if (size <= 0) return 0; pRsp->pTsmas = taosArrayInit(size, POINTER_BYTES); - if (!pRsp->pTsmas) return -1; + if (!pRsp->pTsmas) { + TAOS_CHECK_EXIT(terrno); + } for (int32_t i = 0; i < size; ++i) { STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); - if (!pTsma) return -1; - if (taosArrayPush(pRsp->pTsmas, &pTsma) == NULL) return -1; - if (tDecodeTableTSMAInfo(pDecoder, pTsma) < 0) return -1; + if (!pTsma) { + TAOS_CHECK_EXIT(terrno); + } + if (taosArrayPush(pRsp->pTsmas, &pTsma) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + TAOS_CHECK_EXIT(tDecodeTableTSMAInfo(pDecoder, pTsma)); } - return 0; +_exit: + return code; } int32_t tSerializeTableTSMAInfoRsp(void *buf, int32_t bufLen, const STableTSMAInfoRsp *pRsp) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeTableTSMAInfoRsp(&encoder, pRsp) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeTableTSMAInfoRsp(&encoder, pRsp)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeTableTSMAInfoRsp(void *buf, int32_t bufLen, STableTSMAInfoRsp *pRsp) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeTableTSMAInfoRsp(&decoder, pRsp) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeTableTSMAInfoRsp(&decoder, pRsp)); tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } void tFreeTableTSMAInfo(void *p) { @@ -11899,129 +12033,184 @@ void tFreeTableTSMAInfoRsp(STableTSMAInfoRsp *pRsp) { } static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) { - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->fetchIdx) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->subFetchIdx) < 0) return -1; - return 0; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->vgId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->fetchIdx)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->subFetchIdx)); + +_exit: + return code; } int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeStreamProgressReq(&encoder, pReq) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeStreamProgressReq(&encoder, pReq)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) { - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->fetchIdx) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->subFetchIdx) < 0) return -1; - return 0; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->vgId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->fetchIdx)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->subFetchIdx)); + +_exit: + return code; } int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, (char *)buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeStreamProgressReq(&decoder, pReq) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeStreamProgressReq(&decoder, pReq)); tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) { - if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->vgId) < 0) return -1; - if (tEncodeI8(pEncoder, pRsp->fillHisFinished) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->progressDelay) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->fetchIdx) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->subFetchIdx) < 0) return -1; - return 0; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->vgId)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->fillHisFinished)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->progressDelay)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->fetchIdx)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->subFetchIdx)); + +_exit: + return code; } int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeStreamProgressRsp(&encoder, pRsp) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeStreamProgressRsp(&encoder, pRsp)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) { - if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1; - if (tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->progressDelay) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->fetchIdx) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->subFetchIdx) < 0) return -1; - return 0; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->vgId)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->progressDelay)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->fetchIdx)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->subFetchIdx)); + +_exit: + return code; } int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeStreamProgressRsp(&decoder, pRsp) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeStreamProgressRsp(&decoder, pRsp)); tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } int32_t tEncodeSMDropTbReqOnSingleVg(SEncoder *pEncoder, const SMDropTbReqsOnSingleVg *pReq) { const SVgroupInfo *pVgInfo = &pReq->vgInfo; - if (tEncodeI32(pEncoder, pVgInfo->vgId) < 0) return -1; - if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1; - if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pVgInfo->epSet) < 0) return -1; - if (tEncodeI32(pEncoder, pVgInfo->numOfTable) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pVgInfo->vgId)); + TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pVgInfo->hashBegin)); + TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pVgInfo->hashEnd)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pVgInfo->epSet)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pVgInfo->numOfTable)); int32_t size = pReq->pTbs ? pReq->pTbs->size : 0; - if (tEncodeI32(pEncoder, size) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size)); for (int32_t i = 0; i < size; ++i) { const SVDropTbReq *pInfo = taosArrayGet(pReq->pTbs, i); - if (tEncodeSVDropTbReq(pEncoder, pInfo) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeSVDropTbReq(pEncoder, pInfo)); } - return 0; +_exit: + return code; } int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder *pDecoder, SMDropTbReqsOnSingleVg *pReq) { - if (tDecodeI32(pDecoder, &pReq->vgInfo.vgId) < 0) return -1; - if (tDecodeU32(pDecoder, &pReq->vgInfo.hashBegin) < 0) return -1; - if (tDecodeU32(pDecoder, &pReq->vgInfo.hashEnd) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pReq->vgInfo.epSet) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->vgInfo.numOfTable) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->vgInfo.vgId)); + TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pReq->vgInfo.hashBegin)); + TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pReq->vgInfo.hashEnd)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pReq->vgInfo.epSet)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->vgInfo.numOfTable)); int32_t size = 0; - if (tDecodeI32(pDecoder, &size) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size)); pReq->pTbs = taosArrayInit(size, sizeof(SVDropTbReq)); if (!pReq->pTbs) { - return -1; + TAOS_CHECK_EXIT(terrno); } SVDropTbReq pTbReq = {0}; for (int32_t i = 0; i < size; ++i) { - if (tDecodeSVDropTbReq(pDecoder, &pTbReq) < 0) return -1; - if (taosArrayPush(pReq->pTbs, &pTbReq) == NULL) return -1; + TAOS_CHECK_EXIT(tDecodeSVDropTbReq(pDecoder, &pTbReq)); + if (taosArrayPush(pReq->pTbs, &pTbReq) == NULL) { + TAOS_CHECK_EXIT(terrno); + } } - return 0; + +_exit: + return code; } void tFreeSMDropTbReqOnSingleVg(void *p) { @@ -12031,38 +12220,54 @@ void tFreeSMDropTbReqOnSingleVg(void *p) { int32_t tSerializeSMDropTbsReq(void *buf, int32_t bufLen, const SMDropTbsReq *pReq) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); int32_t size = pReq->pVgReqs ? pReq->pVgReqs->size : 0; - if (tEncodeI32(&encoder, size) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(&encoder, size)); for (int32_t i = 0; i < size; ++i) { SMDropTbReqsOnSingleVg *pVgReq = taosArrayGet(pReq->pVgReqs, i); - if (tEncodeSMDropTbReqOnSingleVg(&encoder, pVgReq) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeSMDropTbReqOnSingleVg(&encoder, pVgReq)); } tEndEncode(&encoder); - int32_t tlen = encoder.pos; + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeSMDropTbsReq(void *buf, int32_t bufLen, SMDropTbsReq *pReq) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); int32_t size = 0; - if (tDecodeI32(&decoder, &size) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size)); pReq->pVgReqs = taosArrayInit(size, sizeof(SMDropTbReqsOnSingleVg)); if (!pReq->pVgReqs) { - return -1; + TAOS_CHECK_EXIT(terrno); } for (int32_t i = 0; i < size; ++i) { SMDropTbReqsOnSingleVg vgReq = {0}; - if (tDecodeSMDropTbReqOnSingleVg(&decoder, &vgReq) < 0) return -1; - if (taosArrayPush(pReq->pVgReqs, &vgReq) == NULL) return -1; + TAOS_CHECK_EXIT(tDecodeSMDropTbReqOnSingleVg(&decoder, &vgReq)); + if (taosArrayPush(pReq->pVgReqs, &vgReq) == NULL) { + TAOS_CHECK_EXIT(terrno); + } } tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } void tFreeSMDropTbsReq(void *p) { @@ -12071,29 +12276,42 @@ void tFreeSMDropTbsReq(void *p) { } int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder *pCoder, const SVFetchTtlExpiredTbsRsp *pRsp) { - if (tEncodeI32(pCoder, pRsp->vgId) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tEncodeI32(pCoder, pRsp->vgId)); int32_t size = pRsp->pExpiredTbs ? pRsp->pExpiredTbs->size : 0; - if (tEncodeI32(pCoder, size) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pCoder, size)); for (int32_t i = 0; i < size; ++i) { - if (tEncodeSVDropTbReq(pCoder, taosArrayGet(pRsp->pExpiredTbs, i)) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeSVDropTbReq(pCoder, taosArrayGet(pRsp->pExpiredTbs, i))); } - return 0; + +_exit: + return code; } int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder *pCoder, SVFetchTtlExpiredTbsRsp *pRsp) { - if (tDecodeI32(pCoder, &pRsp->vgId) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tDecodeI32(pCoder, &pRsp->vgId)); int32_t size = 0; - if (tDecodeI32(pCoder, &size) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pCoder, &size)); if (size > 0) { pRsp->pExpiredTbs = taosArrayInit(size, sizeof(SVDropTbReq)); - if (!pRsp->pExpiredTbs) return terrno; + if (!pRsp->pExpiredTbs) { + TAOS_CHECK_EXIT(terrno); + } SVDropTbReq tb = {0}; for (int32_t i = 0; i < size; ++i) { - if (tDecodeSVDropTbReq(pCoder, &tb) < 0) return -1; - if (taosArrayPush(pRsp->pExpiredTbs, &tb) == NULL) return -1; + TAOS_CHECK_EXIT(tDecodeSVDropTbReq(pCoder, &tb)); + if (taosArrayPush(pRsp->pExpiredTbs, &tb) == NULL) { + TAOS_CHECK_EXIT(terrno); + } } } - return 0; +_exit: + return code; } void tFreeFetchTtlExpiredTbsRsp(void *p) { @@ -12102,52 +12320,73 @@ void tFreeFetchTtlExpiredTbsRsp(void *p) { } int32_t tEncodeMqBatchMetaRsp(SEncoder *pEncoder, const SMqBatchMetaRsp *pRsp) { - if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset)); int32_t size = taosArrayGetSize(pRsp->batchMetaReq); - if (tEncodeI32(pEncoder, size) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size)); if (size > 0) { for (int32_t i = 0; i < size; i++) { void *pMetaReq = taosArrayGetP(pRsp->batchMetaReq, i); int32_t metaLen = *(int32_t *)taosArrayGet(pRsp->batchMetaLen, i); - if (tEncodeBinary(pEncoder, pMetaReq, metaLen) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMetaReq, metaLen)); } } - return 0; +_exit: + return code; } int32_t tDecodeMqBatchMetaRsp(SDecoder *pDecoder, SMqBatchMetaRsp *pRsp) { int32_t size = 0; - if (tDecodeI32(pDecoder, &size) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size)); if (size > 0) { pRsp->batchMetaReq = taosArrayInit(size, POINTER_BYTES); - if (!pRsp->batchMetaReq) return -1; + if (!pRsp->batchMetaReq) { + TAOS_CHECK_EXIT(terrno); + } pRsp->batchMetaLen = taosArrayInit(size, sizeof(int32_t)); - if (!pRsp->batchMetaLen) return -1; + if (!pRsp->batchMetaLen) { + TAOS_CHECK_EXIT(terrno); + } for (int32_t i = 0; i < size; i++) { void *pCreate = NULL; uint64_t len = 0; - if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &pCreate, &len)); int32_t l = (int32_t)len; - if (taosArrayPush(pRsp->batchMetaReq, &pCreate) == NULL) return -1; - if (taosArrayPush(pRsp->batchMetaLen, &l) == NULL) return -1; + if (taosArrayPush(pRsp->batchMetaReq, &pCreate) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + if (taosArrayPush(pRsp->batchMetaLen, &l) == NULL) { + TAOS_CHECK_EXIT(terrno); + } } } - return 0; +_exit: + return code; } int32_t tSemiDecodeMqBatchMetaRsp(SDecoder *pDecoder, SMqBatchMetaRsp *pRsp) { - if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset)); if (pDecoder->size < pDecoder->pos) { - return -1; + return TSDB_CODE_INVALID_PARA; } pRsp->metaBuffLen = TD_CODER_REMAIN_CAPACITY(pDecoder); pRsp->pMetaBuff = taosMemoryCalloc(1, pRsp->metaBuffLen); if (pRsp->pMetaBuff == NULL) { - return -1; + TAOS_CHECK_EXIT(terrno); } memcpy(pRsp->pMetaBuff, TD_CODER_CURRENT(pDecoder), pRsp->metaBuffLen); - return 0; + +_exit: + return code; } void tDeleteMqBatchMetaRsp(SMqBatchMetaRsp *pRsp) { From 54a2c139376c871c796592a8fe62f3988671499a Mon Sep 17 00:00:00 2001 From: Pan Wei <72057773+dapan1121@users.noreply.github.com> Date: Thu, 5 Sep 2024 18:42:57 +0800 Subject: [PATCH 23/25] Update tdatablock.c --- source/common/src/tdatablock.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 34beb38879..816bf3a757 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3011,7 +3011,6 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) { uError("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type); - ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return -1; } From 3189d0f0bdbdaf2b2fee0e3768baef6aba91ddf4 Mon Sep 17 00:00:00 2001 From: WANG Xu Date: Thu, 5 Sep 2024 23:00:01 +0800 Subject: [PATCH 24/25] docs: recommend show cluster alive --- docs/en/14-reference/03-taos-sql/10-function.md | 2 +- docs/zh/14-reference/03-taos-sql/10-function.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/14-reference/03-taos-sql/10-function.md b/docs/en/14-reference/03-taos-sql/10-function.md index 72ca878ce8..2ba3c416fd 100644 --- a/docs/en/14-reference/03-taos-sql/10-function.md +++ b/docs/en/14-reference/03-taos-sql/10-function.md @@ -1384,7 +1384,7 @@ SELECT SERVER_VERSION(); SELECT SERVER_STATUS(); ``` -**Description**: The server status. +**Description**: The server status. When checking the status of a cluster, the recommended way is to use `SHOW CLUSTER ALIVE;`. Unlike `SELECT SERVER_STATUS();`, it does not return an error when some nodes in the cluster are unavailable; instead, it returns different status codes. Plese check [SHOW CLUSTER ALIVE](https://docs.tdengine.com/reference/taos-sql/show/#show-cluster-alive) for details. ### CURRENT_USER diff --git a/docs/zh/14-reference/03-taos-sql/10-function.md b/docs/zh/14-reference/03-taos-sql/10-function.md index ac9311184e..ee71abbdec 100644 --- a/docs/zh/14-reference/03-taos-sql/10-function.md +++ b/docs/zh/14-reference/03-taos-sql/10-function.md @@ -1374,7 +1374,7 @@ SELECT SERVER_VERSION(); SELECT SERVER_STATUS(); ``` -**说明**:检测服务端是否所有 dnode 都在线,如果是则返回成功,否则返回无法建立连接的错误。 +**说明**:检测服务端是否所有 dnode 都在线,如果是则返回成功,否则返回无法建立连接的错误。如果想要查询集群的状态,推荐使用 `SHOW CLUSTER ALIVE;`, 与 `SELECT SERVER_STATUS();` 不同,当集群中的部分节点不可用时,它不会返回错误,而是返回不同的状态码,详见:[SHOW CLUSTER ALIVE](https://docs.taosdata.com/reference/taos-sql/show/#show-cluster-alive) ### CURRENT_USER From 109f03d3d384b8554b28c96c68e0340de722743c Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Fri, 6 Sep 2024 09:44:57 +0800 Subject: [PATCH 25/25] jdbc upgrade to 3.3.2 --- docs/en/08-develop/01-connect/index.md | 2 +- docs/en/14-reference/05-connectors/14-java.mdx | 3 ++- docs/examples/JDBC/JDBCDemo/pom.xml | 2 +- docs/examples/JDBC/connectionPools/pom.xml | 2 +- docs/examples/JDBC/consumer-demo/pom.xml | 4 ++-- docs/examples/JDBC/taosdemo/pom.xml | 2 +- docs/examples/java/pom.xml | 2 +- docs/zh/08-develop/01-connect/index.md | 2 +- docs/zh/14-reference/05-connector/14-java.mdx | 1 + 9 files changed, 11 insertions(+), 9 deletions(-) diff --git a/docs/en/08-develop/01-connect/index.md b/docs/en/08-develop/01-connect/index.md index 916d5e1e09..ab35f6ad63 100644 --- a/docs/en/08-develop/01-connect/index.md +++ b/docs/en/08-develop/01-connect/index.md @@ -90,7 +90,7 @@ If `maven` is used to manage the projects, what needs to be done is only adding com.taosdata.jdbc taos-jdbcdriver - 3.3.0 + 3.3.2 ``` diff --git a/docs/en/14-reference/05-connectors/14-java.mdx b/docs/en/14-reference/05-connectors/14-java.mdx index aa7e91b7fa..1f4cf9895f 100644 --- a/docs/en/14-reference/05-connectors/14-java.mdx +++ b/docs/en/14-reference/05-connectors/14-java.mdx @@ -42,6 +42,7 @@ REST connection supports all platforms that can run Java. | taos-jdbcdriver version | major changes | TDengine version | | :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: | +| 3.3.2 | 1. Optimized websocket prepareStatement performance; 2. Improved mybatis support| - | | 3.3.0 | 1. Optimized data transmission performance under Websocket connection; 2. SSL validation skipping is supported but disabled by default| 3.3.2.0 or later | | 3.2.11 | Fixed the result set closing bug when using a native connection.| - | | 3.2.10 | 1. Automatic compression/decompression for data transmission, disabled by default; 2.Automatic reconnection for websocket with configurable parameter, disabled by default; 3. A new method for schemaless writing is added in the connection class; 4. Optimized performance for data fetching on native connection; 5. Fixing for some known issues; 6. The list of supported functions can be returned by the API for retrieving metadata| - | @@ -179,7 +180,7 @@ Add following dependency in the `pom.xml` file of your Maven project: com.taosdata.jdbc taos-jdbcdriver - 3.3.0 + 3.3.2 ``` diff --git a/docs/examples/JDBC/JDBCDemo/pom.xml b/docs/examples/JDBC/JDBCDemo/pom.xml index 763be51aff..a9b981a026 100644 --- a/docs/examples/JDBC/JDBCDemo/pom.xml +++ b/docs/examples/JDBC/JDBCDemo/pom.xml @@ -19,7 +19,7 @@ com.taosdata.jdbc taos-jdbcdriver - 3.3.0 + 3.3.2 org.locationtech.jts diff --git a/docs/examples/JDBC/connectionPools/pom.xml b/docs/examples/JDBC/connectionPools/pom.xml index 855d531f4c..a3705e6834 100644 --- a/docs/examples/JDBC/connectionPools/pom.xml +++ b/docs/examples/JDBC/connectionPools/pom.xml @@ -18,7 +18,7 @@ com.taosdata.jdbc taos-jdbcdriver - 3.3.0 + 3.3.2 diff --git a/docs/examples/JDBC/consumer-demo/pom.xml b/docs/examples/JDBC/consumer-demo/pom.xml index ad0f6cd6a1..0db41bc33f 100644 --- a/docs/examples/JDBC/consumer-demo/pom.xml +++ b/docs/examples/JDBC/consumer-demo/pom.xml @@ -17,7 +17,7 @@ com.taosdata.jdbc taos-jdbcdriver - 3.3.0 + 3.3.2 com.google.guava @@ -67,4 +67,4 @@ - \ No newline at end of file + diff --git a/docs/examples/JDBC/taosdemo/pom.xml b/docs/examples/JDBC/taosdemo/pom.xml index c73614948b..8e61cbecdf 100644 --- a/docs/examples/JDBC/taosdemo/pom.xml +++ b/docs/examples/JDBC/taosdemo/pom.xml @@ -67,7 +67,7 @@ com.taosdata.jdbc taos-jdbcdriver - 3.3.0 + 3.3.2 diff --git a/docs/examples/java/pom.xml b/docs/examples/java/pom.xml index 35fe5f280c..c44be4704d 100644 --- a/docs/examples/java/pom.xml +++ b/docs/examples/java/pom.xml @@ -22,7 +22,7 @@ com.taosdata.jdbc taos-jdbcdriver - 3.3.0 + 3.3.2 diff --git a/docs/zh/08-develop/01-connect/index.md b/docs/zh/08-develop/01-connect/index.md index d1aeb0ed8b..5cecd245e5 100644 --- a/docs/zh/08-develop/01-connect/index.md +++ b/docs/zh/08-develop/01-connect/index.md @@ -89,7 +89,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速 com.taosdata.jdbc taos-jdbcdriver - 3.3.0 + 3.3.2 ``` diff --git a/docs/zh/14-reference/05-connector/14-java.mdx b/docs/zh/14-reference/05-connector/14-java.mdx index 5f95233163..ec24f1329d 100644 --- a/docs/zh/14-reference/05-connector/14-java.mdx +++ b/docs/zh/14-reference/05-connector/14-java.mdx @@ -33,6 +33,7 @@ REST 连接支持所有能运行 Java 的平台。 | taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 | | :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: | +| 3.3.2 | 1. 优化 Websocket 连接下的参数绑定性能;2. 优化了对 mybatis 的支持 | - | | 3.3.0 | 1. 优化 Websocket 连接下的数据传输性能;2. 支持跳过 SSL 验证,默认关闭 | 3.3.2.0 及更高版本 | | 3.2.11 | 解决了 Native 连接关闭结果集 bug | - | | 3.2.10 | 1. REST/WebSocket 连接支持传输中的数据压缩;2. Websocket 自动重连机制,默认关闭;3. Connection 类提供无模式写入的方法;4. 优化了原生连接的数据拉取性能;5. 修复了一些已知问题;6.元数据获取函数可以返回支持的函数列表。 | - |