From 75c121c18a9cf2fc7761e0106589f4ea10112ca5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Jul 2024 14:29:56 +0800 Subject: [PATCH 1/6] fix(query): check return value. --- include/common/tdatablock.h | 37 +- source/common/src/tdatablock.c | 361 ++++++++++++----- source/dnode/vnode/src/tq/tqSink.c | 45 ++- source/libs/executor/inc/executil.h | 2 +- source/libs/executor/inc/tsort.h | 11 +- source/libs/executor/src/exchangeoperator.c | 10 +- source/libs/executor/src/executil.c | 16 +- source/libs/executor/src/executor.c | 2 + source/libs/executor/src/mergejoin.c | 12 +- source/libs/executor/src/mergejoinoperator.c | 7 +- source/libs/executor/src/projectoperator.c | 4 +- source/libs/executor/src/scanoperator.c | 5 +- source/libs/executor/src/tsort.c | 391 +++++++++++++++---- source/libs/function/src/builtinsimpl.c | 17 +- source/libs/function/src/tudf.c | 18 +- source/libs/function/test/runUdf.c | 8 +- source/libs/stream/src/streamData.c | 48 ++- 17 files changed, 749 insertions(+), 245 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 13ff3da53f..e917bcd2a8 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -41,6 +41,15 @@ typedef struct SBlockOrderInfo { #define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT]) #define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (1u << (7u - BitPos(r_)))) == (1u << (7u - BitPos(r_)))) +#define QRY_OPTR_CHECK(_o) \ + do { \ + if ((_o) == NULL) { \ + return TSDB_CODE_INVALID_PARA; \ + } else { \ + *(_o) = NULL; \ + } \ + } while(0) + #define colDataSetNull_f(bm_, r_) \ do { \ BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ @@ -222,8 +231,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity); - -SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount); +int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock); size_t blockDataGetSize(const SSDataBlock* pBlock); size_t blockDataGetRowSize(SSDataBlock* pBlock); @@ -254,15 +262,6 @@ void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n); int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc); -#define QRY_OPTR_CHECK(_o) \ - do { \ - if ((_o) == NULL) { \ - return TSDB_CODE_INVALID_PARA; \ - } else { \ - *(_o) = NULL; \ - } \ - } while(0) - int32_t createDataBlock(SSDataBlock** pResBlock); void blockDataDestroy(SSDataBlock* pBlock); void blockDataFreeRes(SSDataBlock* pBlock); @@ -272,15 +271,15 @@ int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock); int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock); int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); -SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); -SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); +SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); +int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData); -int32_t blockGetEncodeSize(const SSDataBlock* pBlock); -int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); -const char* blockDecode(SSDataBlock* pBlock, const char* pData); +int32_t blockGetEncodeSize(const SSDataBlock* pBlock); +int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); +int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos); // for debug -char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, const char* taskIdStr); +int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, const char* taskIdStr); int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid); @@ -288,10 +287,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData bool alreadyAddGroupId(char* ctbName, int64_t groupId); bool isAutoTableName(char* ctbName); void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId); -char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); +int32_t buildCtbNameByGroupId(const char* stbName, uint64_t groupId, char** pName); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); -void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList); +int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList); void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0a27f4e3bd..b554d86313 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -48,7 +48,9 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo } int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) { - if (colDataIsNull_s(pColumnInfoData, rowIdx)) return 0; + if (colDataIsNull_s(pColumnInfoData, rowIdx)) { + return 0; + } if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) return pColumnInfoData->info.bytes; if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) @@ -493,7 +495,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, int32_t numOfRows) { if (pDst->info.type != pSrc->info.type || pDst->info.bytes != pSrc->info.bytes || pSrc->reassigned) { - return TSDB_CODE_FAILED; + return TSDB_CODE_INVALID_PARA; } if (numOfRows <= 0) { @@ -698,9 +700,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { } int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) { + int32_t code = 0; if (pDest->info.rows + numOfRows > pDest->info.capacity) { - ASSERT(0); - return TSDB_CODE_FAILED; + return TSDB_CODE_INVALID_PARA; } size_t numOfCols = taosArrayGetSize(pDest->pDataBlock); @@ -708,11 +710,14 @@ int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); - colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows); + code = colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows); + if (code) { + return code; + } } pDest->info.rows += numOfRows; - return TSDB_CODE_SUCCESS; + return code; } void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) { @@ -824,20 +829,24 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd return TSDB_CODE_SUCCESS; } -SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) { +int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock) { int32_t code = 0; + QRY_OPTR_CHECK(pResBlock); + if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) { - return NULL; + return TSDB_CODE_INVALID_PARA; } SSDataBlock* pDst = NULL; code = createOneDataBlock(pBlock, false, &pDst); if (code) { - terrno = code; - return NULL; + return code; } - blockDataEnsureCapacity(pDst, rowCount); + code = blockDataEnsureCapacity(pDst, rowCount); + if (code) { + return code; + } /* may have disorder varchar data, TODO for (int32_t i = 0; i < numOfCols; ++i) { @@ -865,12 +874,16 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 } else { char* p = colDataGetData(pColData, j); code = colDataSetVal(pDstCol, j - startIndex, p, false); + if (code) { + break; + } } } } pDst->info.rows = rowCount; - return pDst; + *pResBlock = pDst; + return code; } /** @@ -934,7 +947,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { int32_t numOfRows = *(int32_t*)buf; - blockDataEnsureCapacity(pBlock, numOfRows); + int32_t code = blockDataEnsureCapacity(pBlock, numOfRows); + if (code) { + return code; + } pBlock->info.rows = numOfRows; size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -1217,12 +1233,18 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { if (IS_VAR_DATA_TYPE(pCols[i].info.type)) { pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t)); pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length); + if (pCols[i].varmeta.offset == NULL) { + return NULL; + } pCols[i].varmeta.length = pColInfoData->varmeta.length; pCols[i].varmeta.allocLen = pCols[i].varmeta.length; } else { pCols[i].nullbitmap = taosMemoryCalloc(1, BitmapLen(rows)); pCols[i].pData = taosMemoryCalloc(rows, pCols[i].info.bytes); + if (pCols[i].nullbitmap == NULL || pCols[i].pData == NULL) { + return NULL; + } } } @@ -1343,10 +1365,12 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { int64_t p2 = taosGetTimestampUs(); - blockDataAssign(pCols, pDataBlock, index); + int32_t code = blockDataAssign(pCols, pDataBlock, index); + if (code) { + return code; + } int64_t p3 = taosGetTimestampUs(); - copyBackToBlock(pDataBlock, pCols); int64_t p4 = taosGetTimestampUs(); @@ -1354,7 +1378,6 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { ", rows:%d\n", p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows); destroyTupleIndex(index); - return TSDB_CODE_SUCCESS; } @@ -1416,7 +1439,7 @@ void blockDataReset(SSDataBlock* pDataBlock) { */ int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) { - if (numOfRows <= 0 || pBlockInfo && numOfRows <= pBlockInfo->capacity) { + if ((numOfRows <= 0)|| (pBlockInfo && numOfRows <= pBlockInfo->capacity)) { return TSDB_CODE_SUCCESS; } @@ -1441,7 +1464,7 @@ int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockI pColumn->nullbitmap = tmp; memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen); if (pColumn->info.bytes == 0) { - return TSDB_CODE_FAILED; + return TSDB_CODE_INVALID_PARA; } // here we employ the aligned malloc function, to make sure that the address of allocated memory is aligned @@ -1547,6 +1570,8 @@ void blockDataDestroy(SSDataBlock* pBlock) { // todo remove it int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { + int32_t code = 0; + dst->info = src->info; dst->info.rows = 0; dst->info.capacity = 0; @@ -1555,13 +1580,15 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(src->pDataBlock, i); SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; - blockDataAppendColInfo(dst, &colInfo); + code = blockDataAppendColInfo(dst, &colInfo); + if (code) { + return code; + } } - int32_t code = blockDataEnsureCapacity(dst, src->info.rows); + code = blockDataEnsureCapacity(dst, src->info.rows); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; + return code; } for (int32_t i = 0; i < numOfCols; ++i) { @@ -1571,13 +1598,16 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { continue; } - colDataAssign(pDst, pSrc, src->info.rows, &src->info); + int32_t ret = colDataAssign(pDst, pSrc, src->info.rows, &src->info); + if (ret < 0) { + return ret; + } } uint32_t cap = dst->info.capacity; dst->info = src->info; dst->info.capacity = cap; - return 0; + return code; } int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { @@ -1585,7 +1615,6 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { int32_t code = blockDataEnsureCapacity(pDst, pSrc->info.rows); if (code != TSDB_CODE_SUCCESS) { - terrno = code; return code; } @@ -1593,7 +1622,11 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, i); - colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info); + int32_t ret = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info); + if (ret < 0) { + code = ret; + return code; + } } uint32_t cap = pDst->info.capacity; @@ -1602,7 +1635,7 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { copyPkVal(&pDst->info, &pSrc->info); pDst->info.capacity = cap; - return TSDB_CODE_SUCCESS; + return code; } int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) { @@ -1718,8 +1751,17 @@ int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlo size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (p == NULL) { + blockDataDestroy(pBlock); + return terrno; + } + SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; - blockDataAppendColInfo(pBlock, &colInfo); + code = blockDataAppendColInfo(pBlock, &colInfo); + if (code) { + blockDataDestroy(pBlock); + return code; + } } code = blockDataEnsureCapacity(pBlock, 1); @@ -1731,13 +1773,22 @@ int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlo for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL); - void* pData = NULL; + if (pDst == NULL || pSrc == NULL) { + blockDataDestroy(pBlock); + return terrno; + } + + bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL); + void* pData = NULL; if (!isNull) { pData = colDataGetData(pSrc, rowIdx); } code = colDataSetVal(pDst, 0, pData, isNull); + if (code) { + blockDataDestroy(pBlock); + return code; + } } pBlock->info.rows = 1; @@ -1789,8 +1840,17 @@ int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataB size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (p == NULL) { + blockDataDestroy(pDstBlock); + return terrno; + } + SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; - blockDataAppendColInfo(pDstBlock, &colInfo); + code = blockDataAppendColInfo(pDstBlock, &colInfo); + if (code) { + blockDataDestroy(pDstBlock); + return code; + } } copyPkVal(&pDstBlock->info, &pDataBlock->info); @@ -1805,7 +1865,20 @@ int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataB for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); + if (pDst == NULL) { + return terrno; + } + + if (pSrc == NULL) { + return terrno; + } + + int32_t ret = colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); + if (ret < 0) { + code = ret; + blockDataDestroy(pDstBlock); + return code; + } } pDstBlock->info.rows = pDataBlock->info.rows; @@ -1838,7 +1911,6 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat if (pBlock->pDataBlock == NULL) { pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); if (pBlock->pDataBlock == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } } @@ -1867,12 +1939,20 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) return col; } -SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index) { +int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData) { + int32_t code = 0; + QRY_OPTR_CHECK(pColInfoData); + if (index >= taosArrayGetSize(pBlock->pDataBlock)) { - return NULL; + return TSDB_CODE_INVALID_PARA; } - return taosArrayGet(pBlock->pDataBlock, index); + *pColInfoData = taosArrayGet(pBlock->pDataBlock, index); + if (*pColInfoData == NULL) { + code = terrno; + } + + return code; } size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize) { @@ -1887,6 +1967,10 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int int32_t numFixCols = 0; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (pCol == NULL) { + return terrno; + } + if (IS_VAR_DATA_TYPE(pCol->info.type)) { ++numVarCols; } else { @@ -2029,6 +2113,10 @@ int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + return terrno; + } + colDataTrimFirstNRows(pColInfoData, n, pBlock->info.rows); } @@ -2078,12 +2166,15 @@ void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + continue; + } + colDataKeepFirstNRows(pColInfoData, n, pBlock->info.rows); } pBlock->info.rows = n; } - return ; } int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { @@ -2101,6 +2192,10 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + if (pColData == NULL) { + return terrno; + } + tlen += taosEncodeFixedI16(buf, pColData->info.colId); tlen += taosEncodeFixedI8(buf, pColData->info.type); tlen += taosEncodeFixedI32(buf, pColData->info.bytes); @@ -2134,8 +2229,7 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { } void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { - int32_t sz; - + int32_t sz = 0; int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid); @@ -2143,7 +2237,12 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol); buf = taosDecodeFixedI64(buf, &pBlock->info.rows); buf = taosDecodeFixedI32(buf, &sz); + pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData)); + if (pBlock->pDataBlock == NULL) { + return NULL; + } + for (int32_t i = 0; i < sz; i++) { SColumnInfoData data = {0}; buf = taosDecodeFixedI16(buf, &data.info.colId); @@ -2164,8 +2263,13 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { data.varmeta.length = len; data.varmeta.allocLen = len; } - taosArrayPush(pBlock->pDataBlock, &data); + + void* px = taosArrayPush(pBlock->pDataBlock, &data); + if (px == NULL) { + return NULL; + } } + return (void*)buf; } @@ -2205,8 +2309,8 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { if (taosLocalTime(&tt, &ptm, buf) == NULL) { return buf; } - size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); + size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); if (precision == TSDB_TIME_PRECISION_NANO) { sprintf(buf + pos, ".%09d", ms); } else if (precision == TSDB_TIME_PRECISION_MICRO) { @@ -2219,31 +2323,45 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { } // for debug -char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) { +int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) { int32_t size = 2048 * 1024; - *pDataBuf = taosMemoryCalloc(size, 1); + int32_t code = 0; char* dumpBuf = *pDataBuf; char pBuf[128] = {0}; - int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; int32_t len = 0; + + *pDataBuf = taosMemoryCalloc(size, 1); + if (*pDataBuf == NULL) { + return terrno; + } + + int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); len += snprintf(dumpBuf + len, size - len, "%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n", taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) { + return code; + } for (int32_t j = 0; j < rows; j++) { len += snprintf(dumpBuf + len, size - len, "%s|", flag); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) { + return code; + } for (int32_t k = 0; k < colNum; k++) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); + if (pColInfoData == NULL) { + return terrno; + } + if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) { len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL"); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; continue; } @@ -2251,53 +2369,53 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: memset(pBuf, 0, sizeof(pBuf)); - formatTimestamp(pBuf, *(uint64_t*)var, pColInfoData->info.precision); + (void) formatTimestamp(pBuf, *(uint64_t*)var, pColInfoData->info.precision); len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_TINYINT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(int8_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_UTINYINT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(uint8_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_SMALLINT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(int16_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_USMALLINT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(uint16_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_INT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_UINT: len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_BIGINT: len += snprintf(dumpBuf + len, size - len, " %15" PRId64 " |", *(int64_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_UBIGINT: len += snprintf(dumpBuf + len, size - len, " %15" PRIu64 " |", *(uint64_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_FLOAT: len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_DOUBLE: len += snprintf(dumpBuf + len, size - len, " %15f |", *(double*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_BOOL: len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: @@ -2308,7 +2426,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, dataSize = TMIN(dataSize, 50); memcpy(pBuf, varDataVal(pData), dataSize); len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; } break; case TSDB_DATA_TYPE_NCHAR: { char* pData = colDataGetVarData(pColInfoData, j); @@ -2316,15 +2434,15 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, memset(pBuf, 0, sizeof(pBuf)); (void)taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf); len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; } break; } } len += snprintf(dumpBuf + len, size - len, "%d\n", j); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return code; } len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag); - return dumpBuf; + return code; } int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema, @@ -2362,13 +2480,16 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat SSubmitTbData tbData = {0}; if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { + code = terrno; goto _end; } + tbData.suid = suid; tbData.uid = uid; tbData.sver = pTSchema->version; if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) { + code = terrno; taosArrayDestroy(tbData.aRowP); goto _end; } @@ -2381,6 +2502,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat int32_t offset = 0; for (int32_t k = 0; k < colNum; ++k) { // iterate by column SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); + if (pColInfoData == NULL) { + return terrno; + } + const STColumn* pCol = &pTSchema->columns[k]; void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); @@ -2416,13 +2541,20 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat ASSERT(pColInfoData->info.type == pCol->type); if (colDataIsNull_s(pColInfoData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + goto _end; + } } else { void* data = colDataGetVarData(pColInfoData, j); SValue sv = (SValue){ - .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value + .type = pCol->type, .nData = varDataLen(data), .pData = (uint8_t*) varDataVal(data)}; // address copy, no value SColVal cv = COL_VAL_VALUE(pCol->colId, sv); - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + code = terrno; + goto _end; + } } break; } @@ -2437,7 +2569,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { if (colDataIsNull_s(pColInfoData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + goto _end; + } } else { SValue sv = {.type = pCol->type}; if (pCol->type == pColInfoData->info.type) { @@ -2468,7 +2603,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat memcpy(&sv.val, tv, tDataTypes[pCol->type].bytes); } SColVal cv = COL_VAL_VALUE(pCol->colId, sv); - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + code = terrno; + goto _end; + } } } else { uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); @@ -2525,8 +2664,9 @@ void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId } snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%"PRIu64, stbName + i + 1, groupId); } + ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put stbname + groupId to the end - strcat(ctbName, tmp); + (void)strcat(ctbName, tmp); for(int i = 0; i < strlen(ctbName); i++){ if(ctbName[i] == '.'){ ctbName[i] = '_'; @@ -2547,18 +2687,19 @@ bool alreadyAddGroupId(char* ctbName, int64_t groupId) { return memcmp(ctbName + len1 - len2, tmp, len2) == 0; } -char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { +int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char** pName) { + QRY_OPTR_CHECK(pName); + char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); if (!pBuf) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return terrno; } + int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pBuf); - return NULL; } - return pBuf; + return code; } int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) { @@ -2568,20 +2709,23 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha SArray* tags = taosArrayInit(0, sizeof(SSmlKv)); if (tags == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } if (cname == NULL) { - terrno = TSDB_CODE_INVALID_PARA; taosArrayDestroy(tags); - return terrno; + return TSDB_CODE_INVALID_PARA; } int8_t type = TSDB_DATA_TYPE_UBIGINT; const char* name = "group_id"; int32_t len = strlen(name); - SSmlKv pTag = {.key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)}; - taosArrayPush(tags, &pTag); + + SSmlKv pTag = {.key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)}; + void* px = taosArrayPush(tags, &pTag); + if (px == NULL) { + return terrno; + } RandTableName rname = { .tags = tags, .stbFullName = stbFullName, .stbFullNameLen = strlen(stbFullName), .ctbShortName = cname}; @@ -2590,12 +2734,13 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha if (code != TSDB_CODE_SUCCESS) { return code; } - taosArrayDestroy(tags); + taosArrayDestroy(tags); if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) { - return TSDB_CODE_FAILED; + return TSDB_CODE_INVALID_PARA; } - return TSDB_CODE_SUCCESS; + + return code; } int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { @@ -2630,6 +2775,9 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + return terrno; + } *((int8_t*)data) = pColInfoData->info.type; data += sizeof(int8_t); @@ -2646,6 +2794,9 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { int32_t numOfRows = pBlock->info.rows; for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col); + if (pColRes == NULL) { + return terrno; + } // copy the null bitmap size_t metaSize = 0; @@ -2699,7 +2850,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { return dataLen; } -const char* blockDecode(SSDataBlock* pBlock, const char* pData) { +int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos) { const char* pStart = pData; int32_t version = *(int32_t*)pStart; @@ -2728,10 +2879,17 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { if (pBlock->pDataBlock == NULL) { pBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols); + if (pBlock->pDataBlock == NULL) { + return terrno; + } } for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + return terrno; + } + pColInfoData->info.type = *(int8_t*)pStart; pStart += sizeof(int8_t); @@ -2743,7 +2901,10 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { } } - blockDataEnsureCapacity(pBlock, numOfRows); + int32_t code = blockDataEnsureCapacity(pBlock, numOfRows); + if (code) { + return code; + } int32_t* colLen = (int32_t*)pStart; pStart += sizeof(int32_t) * numOfCols; @@ -2753,6 +2914,10 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { ASSERT(colLen[i] >= 0); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + return terrno; + } + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows); pStart += sizeof(int32_t) * numOfRows; @@ -2760,7 +2925,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { if (colLen[i] > 0 && pColInfoData->varmeta.allocLen < colLen[i]) { char* tmp = taosMemoryRealloc(pColInfoData->pData, colLen[i]); if (tmp == NULL) { - return NULL; + return terrno; } pColInfoData->pData = tmp; @@ -2791,11 +2956,14 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pBlock->info.rows = numOfRows; pBlock->info.blankFill = blankFill; ASSERT(pStart - pData == dataLen); - return pStart; + + *pEndPos = pStart; + return code; } -void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) { +int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) { // int32_t totalRows = pBlock->info.rows; + int32_t code = 0; int32_t bmLen = BitmapLen(totalRows); char* pBitmap = NULL; int32_t maxRows = 0; @@ -2816,7 +2984,7 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList memset(pDst->nullbitmap, 0, bmLen); } } - return; + return code; } for (int32_t i = 0; i < numOfCols; ++i) { @@ -2849,10 +3017,18 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList } else { len = varDataTLen(p1); } + char* p2 = taosMemoryMalloc(len); + if (p2 == NULL) { + return terrno; + } + memcpy(p2, p1, len); - colDataSetVal(pDst, numOfRows, p2, false); + code = colDataSetVal(pDst, numOfRows, p2, false); taosMemoryFree(p2); + if (code) { + return code; + } } numOfRows += 1; j += 1; @@ -2864,6 +3040,9 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList } else { if (pBitmap == NULL) { pBitmap = taosMemoryCalloc(1, bmLen); + if (pBitmap == NULL) { + return terrno; + } } memcpy(pBitmap, pDst->nullbitmap, bmLen); @@ -2953,6 +3132,8 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList if (pBitmap != NULL) { taosMemoryFree(pBitmap); } + + return code; } int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 45212df1dd..d9e39ad6f5 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -41,8 +41,8 @@ static int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, con static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid); static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags); -static int32_t createDefaultTagColName(SArray** pList); -static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, +static int32_t createDefaultTagColName(SArray** pColNameList); +static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, int64_t gid, bool newSubTableRule); static int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo); @@ -70,14 +70,25 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p if (varTbName != NULL && varTbName != (void*)-1) { name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + if (name == NULL) { + return terrno; + } + memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 && stbFullName) { buildCtbNameAddGroupId(stbFullName, name, groupId); } } else if (stbFullName) { - name = buildCtbNameByGroupId(stbFullName, groupId); + int32_t code = buildCtbNameByGroupId(stbFullName, groupId, &name); + if (code) { + return code; + } } else { originName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE); + if (originName == NULL) { + return terrno; + } + if (metaGetTableNameByUid(pTq->pVnode, groupId, originName) == 0) { name = varDataVal(originName); } @@ -205,23 +216,33 @@ int32_t createDefaultTagColName(SArray** pColNameList) { return TSDB_CODE_SUCCESS; } -void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, +int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, int64_t gid, bool newSubTableRule) { if (pDataBlock->info.parTbName[0]) { if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName, gid) && gid != 0 && stbFullName) { pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + if (pCreateTableReq->name == NULL) { + return terrno; + } + strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid); // tqDebug("gen name from:%s", pDataBlock->info.parTbName); } else { pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); + if (pCreateTableReq->name == NULL) { + return terrno; + } // tqDebug("copy name:%s", pDataBlock->info.parTbName); } } else { - pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid); + int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name); + return code; // tqDebug("gen name from stbFullName:%s gid:%"PRId64, stbFullName, gid); } + + return 0; } static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, @@ -310,8 +331,11 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S ASSERT(gid == *(int64_t*)pGpIdData); } - setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, + code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); + if (code) { + goto _end; + } void* p = taosArrayPush(reqs.pArray, pCreateTbReq); if (p == NULL) { @@ -534,9 +558,16 @@ int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t n } code = createDefaultTagColName(&pCreateTbReq->ctb.tagName); + if (code) { + return code; + } // set table name - setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule); + code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule); + if (code) { + return code; + } + *pReq = pCreateTbReq; return code; } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 36d81382f5..f3ceb33f64 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -26,7 +26,7 @@ #define T_LONG_JMP(_obj, _c) \ do { \ - ASSERT((_c) != -1); \ + ASSERT((_c) != 1); \ longjmp((_obj), (_c)); \ } while (0) diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 8cfd8f52ac..474f3eedbf 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -78,8 +78,8 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* * @return */ int32_t tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, - SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength, - uint32_t pqSortBufSize, SSortHandle** pHandle); + SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength, + uint32_t pqSortBufSize, SSortHandle** pHandle); void tsortSetForceUsePQSort(SSortHandle* pHandle); @@ -213,10 +213,11 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke /** * @brief set the merge limit reached callback. it calls mergeLimitReached param with tableUid and param */ -void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param); +void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), + void* param); -int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, int32_t leftRowIndex, int32_t rightRowIndex, - void* pOrder); +int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, int32_t leftRowIndex, + int32_t rightRowIndex, void* pOrder); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 3329001cbc..5af8df8f06 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -655,7 +655,10 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo int32_t lino = 0; if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); - *pNextStart = (char*)blockDecode(pRes, pData); + code = blockDecode(pRes, pData, (const char**) pNextStart); + if (code) { + return code; + } } else { // extract data according to pColList char* pStart = pData; @@ -682,7 +685,10 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo QUERY_CHECK_CODE(code, lino, _end); } - (void)blockDecode(pBlock, pStart); + const char* pDummy = NULL; + code = blockDecode(pBlock, pStart, &pDummy); + QUERY_CHECK_CODE(code, lino, _end); + code = blockDataEnsureCapacity(pRes, pBlock->info.rows); QUERY_CHECK_CODE(code, lino, _end); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index f263418e3e..596487f0de 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2628,9 +2628,12 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag); return; } - char* pBuf = NULL; - qDebug("%s", dumpBlockData(pBlock, flag, &pBuf, taskIdStr)); - taosMemoryFree(pBuf); + char* pBuf = NULL; + int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr); + if (code == 0) { + qDebug("%s", pBuf); + taosMemoryFree(pBuf); + } } void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) { @@ -2645,8 +2648,11 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr char* pBuf = NULL; char flagBuf[64]; snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr); - qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr)); - taosMemoryFree(pBuf); + int32_t code = dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr); + if (code == 0) { + qDebug("%s", pBuf); + taosMemoryFree(pBuf); + } } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fe74037f0f..dc910888ad 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -697,6 +697,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) { SSDataBlock* p1 = NULL; code = createOneDataBlock(pRes, true, &p1); + QUERY_CHECK_CODE(code, lino, _end); + void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1); QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); p = p1; diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index e59010f27f..d3abaaab6d 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -2836,24 +2836,24 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) { } if (!pGrp->clonedBlk) { + int32_t code = 0; if (0 == pGrp->beginIdx) { SSDataBlock* p = NULL; - int32_t code = createOneDataBlock(pGrp->blk, true, &p); + code = createOneDataBlock(pGrp->blk, true, &p); if (code) { MJ_ERR_RET(code); } pGrp->blk = p; } else { - pGrp->blk = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx); + code = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx, &pGrp->blk); pGrp->endIdx -= pGrp->beginIdx; pGrp->beginIdx = 0; pGrp->readIdx = 0; } - - if (NULL == pGrp->blk) { - MJ_ERR_RET(terrno); + if (code) { + MJ_ERR_RET(code); } - + pGrp->clonedBlk = true; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 04dca1c61f..542f161a80 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1360,10 +1360,9 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol if (keepGrp && rowNum > 0) { pTable->eqRowNum += rowNum; - - pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, rowNum); - if (NULL == pGrp->blk) { - MJ_ERR_RET(terrno); + code = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, rowNum, &pGrp->blk); + if (code) { + MJ_ERR_RET(code); } pGrp->endIdx -= pGrp->beginIdx; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index f81fff0806..295180652d 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -370,8 +370,8 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { pFinalRes->info.version = pRes->info.version; // continue merge data, ignore the group id - code = blockDataMerge(pFinalRes, pRes); - if (code) { + int32_t ret = blockDataMerge(pFinalRes, pRes); + if (ret < 0) { pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8d21ccf780..6755f131b6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2584,7 +2584,10 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock bool colExists = false; for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) { - SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j); + SColumnInfoData* pResCol = NULL; + code = bdGetColumnInfoData(pBlock, j, &pResCol); + QUERY_CHECK_CODE(code, lino, _end); + if (pResCol->info.colId == pColMatchInfo->colId) { SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId); code = colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index ab94493385..bac5120ff9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -27,6 +27,9 @@ #include "tsimplehash.h" #include "executil.h" +#define AllocatedTupleType 0 +#define ReferencedTupleType 1 // tuple references to one row in pDataBlock + struct STupleHandle { SSDataBlock* pBlock; int32_t rowIndex; @@ -70,15 +73,15 @@ struct SSortHandle { int64_t startTs; uint64_t totalElapsed; - uint64_t pqMaxRows; - uint32_t pqMaxTupleLength; - uint32_t pqSortBufSize; - bool forceUsePQSort; - BoundedQueue* pBoundedQueue; - uint32_t tmpRowIdx; + uint64_t pqMaxRows; + uint32_t pqMaxTupleLength; + uint32_t pqSortBufSize; + bool forceUsePQSort; + BoundedQueue* pBoundedQueue; + uint32_t tmpRowIdx; - int64_t mergeLimit; - int64_t currMergeLimitTs; + int64_t mergeLimit; + int64_t currMergeLimitTs; int32_t sourceId; SSDataBlock* pDataBlock; @@ -102,14 +105,14 @@ struct SSortHandle { bool (*abortCheckFn)(void* param); void* abortCheckParam; - bool bSortByRowId; + bool bSortByRowId; SSortMemFile* pExtRowsMemFile; - int32_t extRowBytes; - int32_t extRowsPageSize; - int32_t extRowsMemSize; - int32_t srcTsSlotId; - SArray* aExtRowsOrders; - bool bSortPk; + int32_t extRowBytes; + int32_t extRowsPageSize; + int32_t extRowsMemSize; + int32_t srcTsSlotId; + SArray* aExtRowsOrders; + bool bSortPk; void (*mergeLimitReachedFn)(uint64_t tableUid, void* param); void* mergeLimitReachedParam; }; @@ -133,6 +136,7 @@ static void* createTuple(uint32_t columnNum, uint32_t tupleLen) { uint32_t totalLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum) + tupleLen; return taosMemoryCalloc(1, totalLen); } + static void destoryAllocatedTuple(void* t) { taosMemoryFree(t); } #define tupleOffset(tuple, colIdx) ((uint32_t*)(tuple + sizeof(uint32_t) * colIdx)) @@ -148,22 +152,32 @@ static void destoryAllocatedTuple(void* t) { taosMemoryFree(t); } * @param colIndex the columnIndex, for setting null bitmap * @return the next offset to add field * */ -static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, size_t length, - bool isNull, uint32_t tupleLen) { +static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, + size_t length, bool isNull, uint32_t tupleLen) { tupleSetOffset(*t, colIdx, offset); + if (isNull) { tupleSetNull(*t, colIdx, colNum); } else { if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) { - *t = taosMemoryRealloc(*t, offset + length); + void* px = taosMemoryRealloc(*t, offset + length); + if (px == NULL) { + return terrno; + } + + *t = px; } tupleSetData(*t, offset, data, length); } + return offset + length; } static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) { - if (tupleColIsNull(t, colIdx, colNum)) return NULL; + if (tupleColIsNull(t, colIdx, colNum)) { + return NULL; + } + return t + *tupleOffset(t, colIdx); } @@ -175,8 +189,6 @@ int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pB return createOneDataBlock(pSortHandle->pDataBlock, false, pBlock); } -#define AllocatedTupleType 0 -#define ReferencedTupleType 1 // tuple references to one row in pDataBlock typedef struct TupleDesc { uint8_t type; char* data; // if type is AllocatedTuple, then points to the created tuple, otherwise points to the DataBlock @@ -187,17 +199,26 @@ typedef struct ReferencedTuple { size_t rowIndex; } ReferencedTuple; -static TupleDesc* createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx) { +static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx, TupleDesc** pDesc) { TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc)); - void* pTuple = createTuple(colNum, tupleLen); + if (t == NULL) { + return terrno; + } + + void* pTuple = createTuple(colNum, tupleLen); if (!pTuple) { taosMemoryFree(t); - return NULL; + return terrno; } + size_t colLen = 0; uint32_t offset = tupleGetDataStartOffset(colNum); for (size_t colIdx = 0; colIdx < colNum; ++colIdx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx); + if (pCol == NULL) { + return terrno; + } + if (colDataIsNull_s(pCol, rowIdx)) { offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen); } else { @@ -206,20 +227,34 @@ static TupleDesc* createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint3 tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen); } } + t->type = AllocatedTupleType; t->data = pTuple; - return t; + + *pDesc = t; + return 0; } -void* tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum) { +int32_t tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum, void** pResult) { + *pResult = NULL; + if (pDesc->type == ReferencedTupleType) { ReferencedTuple* pRefTuple = (ReferencedTuple*)pDesc; SColumnInfoData* pCol = taosArrayGet(((SSDataBlock*)pDesc->data)->pDataBlock, colIdx); - if (colDataIsNull_s(pCol, pRefTuple->rowIndex)) return NULL; - return colDataGetData(pCol, pRefTuple->rowIndex); + if (pCol == NULL) { + return terrno; + } + + if (colDataIsNull_s(pCol, pRefTuple->rowIndex)) { + return TSDB_CODE_SUCCESS; + } + + *pResult = colDataGetData(pCol, pRefTuple->rowIndex); } else { - return tupleGetField(pDesc->data, colIdx, colNum); + *pResult = tupleGetField(pDesc->data, colIdx, colNum); } + + return 0; } void destroyTuple(void* t) { @@ -270,7 +305,7 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); if (pSortHandle->pOrderedSource == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _err; } @@ -398,7 +433,7 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource)); if (pSource == NULL) { taosArrayDestroy(pPageIdList); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pSource->src.pBlock = pBlock; @@ -441,6 +476,10 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { } SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); + if (pPageIdList == NULL) { + return terrno; + } + while (start < pDataBlock->info.rows) { int32_t stop = 0; @@ -450,10 +489,11 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { return code; } - SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1); - if (p == NULL) { + SSDataBlock* p = NULL; + code = blockDataExtractBlock(pDataBlock, start, stop - start + 1, &p); + if (code) { taosArrayDestroy(pPageIdList); - return terrno; + return code; } int32_t pageId = -1; @@ -505,8 +545,11 @@ static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) { static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) { pParam->pSources = taosArrayGet(pSources, startIndex); - pParam->numOfSources = (endIndex - startIndex + 1); + if (pParam->pSources == NULL) { + return terrno; + } + pParam->numOfSources = (endIndex - startIndex + 1); int32_t code = 0; // multi-pass internal merge sort is required @@ -537,6 +580,9 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32 } int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); + if (pPgId == NULL) { + return terrno; + } void* pPage = getBufPage(pHandle->pBuf, *pPgId); if (NULL == pPage) { @@ -577,7 +623,14 @@ static int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* p for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfo == NULL) { + return terrno; + } + SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i); + if (pSrcColInfo == NULL) { + return terrno; + } bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL); if (isNull) { @@ -624,8 +677,11 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT } int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); + if (pPgId) { + return terrno; + } - void* pPage = getBufPage(pHandle->pBuf, *pPgId); + void* pPage = getBufPage(pHandle->pBuf, *pPgId); if (pPage == NULL) { qError("failed to get buffer, code:%s", tstrerror(terrno)); return terrno; @@ -899,8 +955,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource); for (int32_t t = 0; t < sortPass; ++t) { int64_t st = taosGetTimestampUs(); - SArray* pResList = taosArrayInit(4, POINTER_BYTES); + if (pResList == NULL) { + return terrno; + } int32_t numOfInputSources = pHandle->numOfPages; int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources; @@ -931,8 +989,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { } int32_t nMergedRows = 0; - SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); + if (pPageIdList == NULL) { + taosArrayDestroy(pResList); + return terrno; + } + while (1) { if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) { code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED; @@ -1083,6 +1145,9 @@ int32_t tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupl char* pStart = (char*)buf + sizeof(int8_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfo == NULL) { + return terrno; + } if (!isNull[i]) { code = colDataSetVal(pColInfo, pBlock->info.rows, pStart, false); @@ -1124,7 +1189,11 @@ int32_t tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupl } else { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - bool isNull = tsortIsNullVal(pTupleHandle, i); + if (pColInfo == NULL) { + return terrno; + } + + bool isNull = tsortIsNullVal(pTupleHandle, i); if (isNull) { colDataSetNULL(pColInfo, pBlock->info.rows); } else { @@ -1157,6 +1226,10 @@ static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) { char* pStart = (char*)buf + sizeof(int8_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (pCol == NULL) { + return terrno; + } + if (colDataIsNull_s(pCol, rowIdx)) { isNull[i] = 1; continue; @@ -1198,11 +1271,15 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i char** ppRow, bool* pFreeRow) { SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, regionId); + if (pRegion == NULL) { + return terrno; + } + if (pRegion->buf == NULL) { pRegion->bufRegOffset = 0; pRegion->buf = taosMemoryMalloc(pMemFile->blockSize); if (pRegion->buf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } // todo @@ -1223,7 +1300,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i } else { *ppRow = taosMemoryMalloc(rowLen); if (*ppRow == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset); memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock); @@ -1277,7 +1354,7 @@ static int32_t createSortMemFile(SSortHandle* pHandle) { pMemFile->writeBuf = taosMemoryMalloc(pMemFile->writeBufSize); if (pMemFile->writeBuf == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; } } @@ -1285,7 +1362,7 @@ static int32_t createSortMemFile(SSortHandle* pHandle) { pMemFile->cacheSize = pHandle->extRowsMemSize; pMemFile->aFileRegions = taosArrayInit(64, sizeof(SSortMemFileRegion)); if (pMemFile->aFileRegions == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; } } @@ -1313,8 +1390,13 @@ static void destroySortMemFile(SSortHandle* pHandle) { SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; for (int32_t i = 0; i < taosArrayGetSize(pMemFile->aFileRegions); ++i) { SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, i); + if (pRegion == NULL) { + continue; + } + taosMemoryFree(pRegion->buf); } + taosArrayDestroy(pMemFile->aFileRegions); pMemFile->aFileRegions = NULL; @@ -1338,7 +1420,7 @@ static int32_t tsortOpenRegion(SSortHandle* pHandle) { region.bufRegOffset = 0; void* px = taosArrayPush(pMemFile->aFileRegions, ®ion); if (px == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; } pMemFile->currRegionId = 0; @@ -1347,11 +1429,16 @@ static int32_t tsortOpenRegion(SSortHandle* pHandle) { } else { SSortMemFileRegion regionNew = {0}; SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId); + if (pRegion == NULL) { + return terrno; + } + regionNew.fileOffset = pRegion->fileOffset + pRegion->regionSize; regionNew.bufRegOffset = 0; + void* px = taosArrayPush(pMemFile->aFileRegions, ®ionNew); if (px == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; } ++pMemFile->currRegionId; pMemFile->currRegionOffset = 0; @@ -1363,6 +1450,10 @@ static int32_t tsortOpenRegion(SSortHandle* pHandle) { static int32_t tsortCloseRegion(SSortHandle* pHandle) { SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId); + if (pRegion == NULL) { + return terrno; + } + pRegion->regionSize = pMemFile->currRegionOffset; int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset); if (writeBytes > 0) { @@ -1390,7 +1481,7 @@ static int32_t tsortFinalizeRegions(SSortHandle* pHandle) { for (int32_t i = 0; i < numRegions; ++i) { SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, i); if (pRegion == NULL) { - return TSDB_CODE_INVALID_PARA; + return terrno; } pRegion->bufRegOffset = 0; @@ -1406,6 +1497,10 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId); + if (pRegion == NULL) { + return terrno; + } + { if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) { int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset); @@ -1442,8 +1537,20 @@ static int32_t appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSou SSDataBlock* pBlock = pHandle->pDataBlock; SBlockOrderInfo* extRowsTsOrder = taosArrayGet(pHandle->aExtRowsOrders, 0); + if (extRowsTsOrder == NULL) { + return terrno; + } + SColumnInfoData* pSrcTsCol = taosArrayGet(pSource->pDataBlock, extRowsTsOrder->slotId); + if (pSrcTsCol == NULL) { + return terrno; + } + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0); + if (pTsCol == NULL) { + return terrno; + } + char* pData = colDataGetData(pSrcTsCol, *rowIndex); code = colDataSetVal(pTsCol, pBlock->info.rows, pData, false); if (code) { @@ -1451,18 +1558,42 @@ static int32_t appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSou } SColumnInfoData* pRegionIdCol = taosArrayGet(pBlock->pDataBlock, 1); + if (pRegionIdCol == NULL) { + return terrno; + } + colDataSetInt32(pRegionIdCol, pBlock->info.rows, &pageId); SColumnInfoData* pOffsetCol = taosArrayGet(pBlock->pDataBlock, 2); + if (pOffsetCol == NULL) { + return terrno; + } + colDataSetInt32(pOffsetCol, pBlock->info.rows, &offset); SColumnInfoData* pLengthCol = taosArrayGet(pBlock->pDataBlock, 3); + if (pLengthCol == NULL) { + return terrno; + } + colDataSetInt32(pLengthCol, pBlock->info.rows, &length); if (pHandle->bSortPk) { SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1); + if (extRowsPkOrder == NULL) { + return terrno; + } + SColumnInfoData* pSrcPkCol = taosArrayGet(pSource->pDataBlock, extRowsPkOrder->slotId); + if (pSrcPkCol == NULL) { + return terrno; + } + SColumnInfoData* pPkCol = taosArrayGet(pBlock->pDataBlock, 4); + if (pPkCol == NULL) { + return terrno; + } + if (colDataIsNull_s(pSrcPkCol, *rowIndex)) { colDataSetNULL(pPkCol, pBlock->info.rows); } else { @@ -1483,16 +1614,15 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { SBlockOrderInfo* pkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->aExtRowsOrders, 1) : NULL; SColumnInfoData* extPkCol = (pHandle->bSortPk) ? taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId) : NULL; - SColumnInfoData pkCol = {0}; - SSDataBlock* pSortInput = NULL; - int32_t code = createDataBlock(&pSortInput); + SColumnInfoData pkCol = {0}; + SSDataBlock* pSortInput = NULL; + int32_t code = createDataBlock(&pSortInput); if (code) { return code; } SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1); - code = blockDataAppendColInfo(pSortInput, &tsCol); if (code) { blockDataDestroy(pSortInput); @@ -1539,7 +1669,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo)); if (pOrderInfoList == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } int32_t tsOrder = ((SBlockOrderInfo*)taosArrayGet(pHandle->pSortInfo, 0))->order; @@ -1551,7 +1681,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order); void* p = taosArrayPush(pOrderInfoList, &biTs); if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } if (pHandle->bSortPk) { @@ -1563,7 +1693,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { void* px = taosArrayPush(pOrderInfoList, &biPk); if (px == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } } @@ -1655,7 +1785,7 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, void* px = taosArrayPush(aPgId, &pageId); if (px == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t); @@ -1817,13 +1947,22 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* SBlockOrderInfo* pOrigBlockTsOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); - + if (pOrigBlockTsOrder == NULL) { + return terrno; + } + SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0); + if (pHandleBlockTsOrder == NULL) { + return terrno; + } SBlockOrderInfo* pOrigBlockPkOrder = NULL; if (pHandle->bSortPk) { pOrigBlockPkOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1); + if (pOrigBlockPkOrder) { + return terrno; + } } code = initMergeSup(&sup, aBlk, pOrigBlockTsOrder->order, pOrigBlockTsOrder->slotId, pOrigBlockPkOrder); @@ -1843,6 +1982,10 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* } SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); + if (aPgId == NULL) { + return terrno; + } + int32_t nRows = 0; int32_t nMergedRows = 0; bool mergeLimitReached = false; @@ -1989,7 +2132,11 @@ static int32_t getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashOb *pSkipBlock = false; SSDataBlock* pBlock = NULL; if (keepRows != pOrigBlk->info.rows) { - pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); + code = blockDataExtractBlock(pOrigBlk, 0, keepRows, &pBlock); + if (code) { + return code; + } + *pExtractedBlock = true; } else { *pExtractedBlock = false; @@ -2001,11 +2148,14 @@ static int32_t getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashOb } static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { + int32_t szSort = 0; size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); + if (aExtSrc == NULL) { + return terrno; + } size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize); - int32_t code = createPageBuf(pHandle); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(aExtSrc); @@ -2013,10 +2163,17 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); - int32_t szSort = 0; + if (pSrc == NULL) { + taosArrayDestroy(aExtSrc); + return TSDB_CODE_INVALID_PARA; + } SBlockOrderInfo* pOrigTsOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); + if (pOrigTsOrder == NULL) { + return terrno; + } + if (pOrigTsOrder->order == TSDB_ORDER_ASC) { pHandle->currMergeLimitTs = INT64_MAX; } else { @@ -2024,13 +2181,28 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); - SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); - SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); - while (1) { - SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + if (mTableNumRows == NULL) { + return terrno; + } + SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); + if (aBlkSort == NULL) { + tSimpleHashCleanup(mTableNumRows); + return terrno; + } + + SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); + if (mUidBlk == NULL) { + tSimpleHashCleanup(mTableNumRows); + taosArrayDestroy(aBlkSort); + return terrno; + } + + while (1) { bool bExtractedBlock = false; bool bSkipBlock = false; + + SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); if (pBlk != NULL && pHandle->mergeLimit > 0) { SSDataBlock* p = NULL; code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p); @@ -2043,7 +2215,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId); - int64_t firstRowTs = *(int64_t*)tsCol->pData; + if (tsCol == NULL) { + return terrno; + } + + int64_t firstRowTs = *(int64_t*)tsCol->pData; if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || (pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { if (bExtractedBlock) { @@ -2085,7 +2261,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { void* px = taosArrayPush(aBlkSort, &tBlk); if (px == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } } } @@ -2176,10 +2352,13 @@ static void freeSSortSource(SSortSource* source) { } static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { - int32_t code = 0; - size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; - + int32_t code = 0; + size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); + if (pSource == NULL) { + return terrno; + } + SSortSource* source = *pSource; *pSource = NULL; @@ -2277,6 +2456,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { } else if (pHandle->type == SORT_BLOCK_TS_MERGE) { code = createBlocksMergeSortInitialSources(pHandle); } + qDebug("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); return code; } @@ -2438,14 +2618,31 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock); for (int32_t i = 0; i < orderInfo->size; ++i) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i); - void *lData = tupleDescGetField(pLeftDesc, pOrder->slotId, colNum); - void *rData = tupleDescGetField(pRightDesc, pOrder->slotId, colNum); - if (!lData && !rData) continue; + void *lData = NULL, *rData = NULL; + + int32_t ret1 = tupleDescGetField(pLeftDesc, pOrder->slotId, colNum, &lData); + int32_t ret2 = tupleDescGetField(pRightDesc, pOrder->slotId, colNum, &rData); + if (ret1) { + return ret1; + } + + if (ret2) { + return ret2; + } + + if ((!lData) && (!rData)) { + continue; + } + if (!lData) return pOrder->nullFirst ? -1 : 1; if (!rData) return pOrder->nullFirst ? 1 : -1; - int32_t type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type; - __compar_fn_t fn = getKeyComparFunc(type, pOrder->order); + SColumnInfoData* p = (SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId); + if (p == NULL) { + return terrno; + } + + __compar_fn_t fn = getKeyComparFunc(p->info.type, pOrder->order); int32_t ret = fn(lData, rData); if (ret == 0) { @@ -2454,20 +2651,28 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) return ret; } } + return 0; } static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { pHandle->pBoundedQueue = createBoundedQueue(pHandle->pqMaxRows, tsortPQCompFn, destroyTuple, pHandle); - if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY; + if (NULL == pHandle->pBoundedQueue) { + return TSDB_CODE_OUT_OF_MEMORY; + } + tsortSetComparFp(pHandle, tupleComparFn); SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); - SSortSource* source = *pSource; + if (pSource == NULL) { + return terrno; + } - pHandle->pDataBlock = NULL; - uint32_t tupleLen = 0; + SSortSource* source = *pSource; + uint32_t tupleLen = 0; PriorityQueueNode pqNode; + pHandle->pDataBlock = NULL; + while (1) { // fetch data SSDataBlock* pBlock = pHandle->fetchfp(source->param); @@ -2491,12 +2696,17 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { if (tupleLen == 0) { for (size_t colIdx = 0; colIdx < colNum; ++colIdx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx); + if (pCol == NULL) { + return terrno; + } + tupleLen += pCol->info.bytes; if (IS_VAR_DATA_TYPE(pCol->info.type)) { tupleLen += sizeof(VarDataLenT); } } } + ReferencedTuple refTuple = {.desc.data = (char*)pBlock, .desc.type = ReferencedTupleType, .rowIndex = 0}; for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) { refTuple.rowIndex = rowIdx; @@ -2505,11 +2715,15 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { if (!pPushedNode) { // do nothing if push failed } else { - pPushedNode->data = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx); - if (pPushedNode->data == NULL) return TSDB_CODE_OUT_OF_MEMORY; + pPushedNode->data = NULL; + int32_t code = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx, (TupleDesc**)&pPushedNode->data); + if (code) { + return code; + } } } } + return TSDB_CODE_SUCCESS; } @@ -2543,10 +2757,17 @@ static int32_t tsortPQSortNextTuple(SSortHandle* pHandle, STupleHandle **pTupleH for (uint32_t i = 0; i < colNum; ++i) { void* pData = tupleGetField(pTuple, i, colNum); + + SColumnInfoData* p = NULL; + code = bdGetColumnInfoData(pHandle->pDataBlock, i, &p); + if (code) { + return code; + } + if (!pData) { - colDataSetNULL(bdGetColumnInfoData(pHandle->pDataBlock, i), 0); + colDataSetNULL(p, 0); } else { - code = colDataSetVal(bdGetColumnInfoData(pHandle->pDataBlock, i), 0, pData, false); + code = colDataSetVal(p, 0, pData, false); if (code) { return code; } @@ -2582,6 +2803,10 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle } SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); + if (pSource == NULL) { + return terrno; + } + SSortSource* source = *pSource; SSDataBlock* pBlock = pHandle->fetchfp(source->param); if (!pBlock || pBlock->info.rows == 0) { @@ -2603,7 +2828,7 @@ int32_t tsortOpen(SSortHandle* pHandle) { } if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { - return -1; + return TSDB_CODE_INVALID_PARA; } pHandle->opened = true; @@ -2629,6 +2854,10 @@ int32_t tsortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) { bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) { SColumnInfoData* pColInfoSrc = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex); + if (pColInfoSrc == NULL) { + return true; + } + return colDataIsNull_s(pColInfoSrc, pVHandle->rowIndex); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1197891cab..59cfd85bc3 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -469,8 +469,12 @@ int32_t funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow, } if (pIter->hasPrev) { if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) { - (void)blockDataDestroy(pIter->pPrevRowBlock); - pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); + blockDataDestroy(pIter->pPrevRowBlock); + int32_t code = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1, &pIter->pPrevRowBlock); + if (code) { + return code; + } + pIter->prevIsDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->inputEndIndex); pIter->pPrevData = taosMemoryMalloc(pIter->pDataCol->info.bytes); @@ -490,11 +494,10 @@ int32_t funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow, char* pkData = colDataGetData(pIter->pPkCol, pIter->inputEndIndex); (void)memcpy(pIter->pPrevPk, pkData, pIter->pPkCol->info.bytes); - pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); - + code = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1, &pIter->pPrevRowBlock); pIter->hasPrev = true; *res = false; - return TSDB_CODE_SUCCESS; + return code; } else { int32_t idx = pIter->rowIndex; while (pIter->tsList[idx] == pIter->prevBlockTsEnd) { @@ -553,9 +556,9 @@ int32_t funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow, } (void)memcpy(pIter->pPrevPk, colDataGetData(pIter->pPkCol, pIter->inputEndIndex), pIter->pPkCol->info.bytes); - pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); + int32_t code = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1, &pIter->pPrevRowBlock); *res = false; - return TSDB_CODE_SUCCESS; + return code; } } } diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 9526576426..654a8a92de 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -862,7 +862,12 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { blockDataAppendColInfo(block, &colInfoData); blockDataEnsureCapacity(block, udfCol->colData.numOfRows); - SColumnInfoData *col = bdGetColumnInfoData(block, 0); + SColumnInfoData *col = NULL; + int32_t code = bdGetColumnInfoData(block, 0, &col); + if (code) { + return code; + } + for (int i = 0; i < udfCol->colData.numOfRows; ++i) { if (udfColDataIsNull(udfCol, i)) { colDataSetNULL(col, i); @@ -1264,10 +1269,17 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { pTempBlock->info.rows = pInput->totalRows; pTempBlock->info.id.uid = pInput->uid; for (int32_t i = 0; i < numOfCols; ++i) { - blockDataAppendColInfo(pTempBlock, pInput->pData[i]); + code = blockDataAppendColInfo(pTempBlock, pInput->pData[i]); + if (code) { + return code; + } } - SSDataBlock *inputBlock = blockDataExtractBlock(pTempBlock, start, numOfRows); + SSDataBlock *inputBlock = NULL; + code = blockDataExtractBlock(pTempBlock, start, numOfRows, &inputBlock); + if (code) { + return code; + } SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum}; SUdfInterBuf newState = {0}; diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index 7f94ecfb3e..52ab34c121 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -89,7 +89,6 @@ int aggregateFuncTest() { } SSDataBlock *pBlock = NULL; - int32_t code = createDataBlock(&pBlock); if (code) { return code; @@ -103,7 +102,12 @@ int aggregateFuncTest() { blockDataEnsureCapacity(pBlock, 1024); pBlock->info.rows = 1024; - SColumnInfoData *pColInfo = bdGetColumnInfoData(pBlock, 0); + SColumnInfoData *pColInfo = NULL; + code = bdGetColumnInfoData(pBlock, 0, &pColInfo); + if (code) { + return code; + } + for (int32_t j = 0; j < pBlock->info.rows; ++j) { colDataSetInt32(pColInfo, j, &j); } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index c0ee503f77..00a62d4773 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -37,19 +37,30 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t for (int32_t i = 0; i < blockNum; i++) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pReq->data, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i); + if (pDataBlock == NULL) { + return terrno; + } int32_t compLen = *(int32_t*)pRetrieve->data; int32_t fullLen = *(int32_t*)(pRetrieve->data + sizeof(int32_t)); char* pInput = pRetrieve->data + PAYLOAD_PREFIX_LEN; if (pRetrieve->compressed && compLen < fullLen) { - char* p = taosMemoryMalloc(fullLen); + char* p = taosMemoryMalloc(fullLen); + if (p == NULL) { + return terrno; + } + int32_t len = tsDecompressString(pInput, compLen, 1, p, fullLen, ONE_STAGE_COMP, NULL, 0); ASSERT(len == fullLen); pInput = p; } - (void) blockDecode(pDataBlock, pInput); + const char* pDummy = NULL; + code = blockDecode(pDataBlock, pInput, &pDummy); + if (code) { + return code; + } if (pRetrieve->compressed && compLen < fullLen) { taosMemoryFree(pInput); @@ -109,18 +120,31 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock) { } int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* id) { - SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock)); + const char* pDummy = NULL; + SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; + SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock)); if (pArray == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; stError("failed to prepare retrieve block, %s", id); return terrno; } - (void) taosArrayPush(pArray, &(SSDataBlock){0}); - SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; - SSDataBlock* pDataBlock = taosArrayGet(pArray, 0); + void* px = taosArrayPush(pArray, &(SSDataBlock){0}); + if (px == NULL) { + taosArrayDestroy(pArray); + return terrno; + } - (void) blockDecode(pDataBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN); + SSDataBlock* pDataBlock = taosArrayGet(pArray, 0); + if (pDataBlock == NULL) { + taosArrayDestroy(pArray); + return terrno; + } + + int32_t code = blockDecode(pDataBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, &pDummy); + if (code) { + taosArrayDestroy(pArray); + return code; + } // TODO: refactor pDataBlock->info.window.skey = be64toh(pRetrieve->skey); @@ -132,7 +156,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock pData->reqId = pReq->reqId; pData->blocks = pArray; - return TSDB_CODE_SUCCESS; + return code; } int32_t streamDataSubmitNew(SPackedData* pData, int32_t type, SStreamDataSubmit** pSubmit) { @@ -178,7 +202,7 @@ int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) { int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { void* p = taosArrayPush(pMerged->submits, &pSubmit->submit); if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } if (pSubmit->ver > pMerged->ver) { @@ -260,8 +284,12 @@ void streamFreeQitem(SStreamQueueItem* data) { int32_t sz = taosArrayGetSize(pMerge->submits); for (int32_t i = 0; i < sz; i++) { SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i); + if (pSubmit == NULL) { + continue; + } taosMemoryFree(pSubmit->msgStr); } + taosArrayDestroy(pMerge->submits); taosFreeQitem(pMerge); } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { From 848b4aab4fea85a38586d4599bd2a6ba6b266256 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Jul 2024 15:46:19 +0800 Subject: [PATCH 2/6] fix(util): fix error --- source/common/src/tdatablock.c | 11 ++- source/common/src/tname.c | 3 + .../dnode/mnode/impl/test/stream/stream.cpp | 5 +- source/libs/executor/test/executorTests.cpp | 27 ++++--- source/libs/executor/test/lhashTests.cpp | 15 ++-- source/libs/executor/test/sortTests.cpp | 36 ++++++--- .../libs/executor/test/tSimpleHashTests.cpp | 19 +++-- source/libs/stream/test/backendTest.cpp | 81 +++++++++++++------ 8 files changed, 135 insertions(+), 62 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b554d86313..bda2de3a1c 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2326,13 +2326,13 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) { int32_t size = 2048 * 1024; int32_t code = 0; - char* dumpBuf = *pDataBuf; + char* dumpBuf = NULL; char pBuf[128] = {0}; int32_t rows = pDataBlock->info.rows; int32_t len = 0; - *pDataBuf = taosMemoryCalloc(size, 1); - if (*pDataBuf == NULL) { + dumpBuf = taosMemoryCalloc(size, 1); + if (dumpBuf == NULL) { return terrno; } @@ -2442,6 +2442,8 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf if (len >= size - 1) return code; } len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag); + + *pDataBuf = dumpBuf; return code; } @@ -2698,7 +2700,10 @@ int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char** int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pBuf); + } else { + *pName = pBuf; } + return code; } diff --git a/source/common/src/tname.c b/source/common/src/tname.c index d38105d6ce..e495547cc5 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -298,6 +298,7 @@ int32_t buildChildTableName(RandTableName* rName) { if (sb.buf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + taosArraySort(rName->tags, compareKv); for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) { taosStringBuilderAppendChar(&sb, ','); @@ -305,6 +306,7 @@ int32_t buildChildTableName(RandTableName* rName) { if (tagKv == NULL) { return TSDB_CODE_SML_INVALID_DATA; } + taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen); taosStringBuilderAppendChar(&sb, '='); if (IS_VAR_DATA_TYPE(tagKv->type)) { @@ -313,6 +315,7 @@ int32_t buildChildTableName(RandTableName* rName) { taosStringBuilderAppendStringLen(&sb, (char*)(&(tagKv->value)), tagKv->length); } } + size_t len = 0; char* keyJoined = taosStringBuilderGetResult(&sb, &len); T_MD5_CTX context; diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index c9365b4318..5cf9b52de7 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -192,7 +192,8 @@ TEST_F(StreamTest, kill_checkpoint_trans) { opt.pWal = pMnode->pWal; pMnode->pSdb = sdbInit(&opt); - taosThreadMutexInit(&pMnode->syncMgmt.lock, NULL); + int32_t code = taosThreadMutexInit(&pMnode->syncMgmt.lock, NULL); + ASSERT(code == 0); } SVgroupChangeInfo info; @@ -248,7 +249,7 @@ TEST_F(StreamTest, plan_Test) { if (taosCreateLog("taoslog", 10, "/etc/taos", NULL, NULL, NULL, NULL, 1) != 0) { // ignore create log failed, only print - printf(" WARING: Create failed:%s. configDir\n", strerror(errno)); + (void) printf(" WARING: Create failed:%s. configDir\n", strerror(errno)); } if (nodesStringToNode(ast, &pAst) < 0) { diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 4815e17b5a..5b34075ecf 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -76,8 +76,11 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) { ASSERT(code == 0); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); - blockDataAppendColInfo(pInfo->pBlock, &colInfo); - blockDataEnsureCapacity(pInfo->pBlock, pInfo->numOfRowsPerPage); + code = blockDataAppendColInfo(pInfo->pBlock, &colInfo); + ASSERT(code == 0); + + code = blockDataEnsureCapacity(pInfo->pBlock, pInfo->numOfRowsPerPage); + ASSERT(code == 0); // SColumnInfoData colInfo1 = {0}; // colInfo1.info.type = TSDB_DATA_TYPE_BINARY; @@ -109,7 +112,8 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) { v = taosRand(); } - colDataSetVal(pColInfo, i, reinterpret_cast(&v), false); + int32_t code = colDataSetVal(pColInfo, i, reinterpret_cast(&v), false); + ASSERT(code == 0); // sprintf(buf, "this is %d row", i); // STR_TO_VARSTR(b1, buf); @@ -137,12 +141,15 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { ASSERT(code == 0); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1); - blockDataAppendColInfo(pInfo->pBlock, &colInfo); + int32_t code = blockDataAppendColInfo(pInfo->pBlock, &colInfo); + ASSERT(code == 0); SColumnInfoData colInfo1 = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2); - blockDataAppendColInfo(pInfo->pBlock, &colInfo1); + code = blockDataAppendColInfo(pInfo->pBlock, &colInfo1); + ASSERT(code == 0); - blockDataEnsureCapacity(pInfo->pBlock, pInfo->numOfRowsPerPage); + code = blockDataEnsureCapacity(pInfo->pBlock, pInfo->numOfRowsPerPage); + ASSERT(code == 0); } else { blockDataCleanup(pInfo->pBlock); } @@ -157,7 +164,8 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { SColumnInfoData* pColInfo = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); ts = (++pInfo->tsStart); - colDataSetVal(pColInfo, i, reinterpret_cast(&ts), false); + int32_t code = colDataSetVal(pColInfo, i, reinterpret_cast(&ts), false); + ASSERT(code == 0); SColumnInfoData* pColInfo1 = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 1)); if (pInfo->type == data_desc) { @@ -168,7 +176,8 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { v = taosRand(); } - colDataSetVal(pColInfo1, i, reinterpret_cast(&v), false); + code = colDataSetVal(pColInfo1, i, reinterpret_cast(&v), false); + ASSERT(code == 0); // sprintf(buf, "this is %d row", i); // STR_TO_VARSTR(b1, buf); @@ -182,7 +191,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { pInfo->current += 1; pBlock->info.dataLoad = 1; - blockDataUpdateTsWindow(pBlock, 0); + int32_t code = blockDataUpdateTsWindow(pBlock, 0); return pBlock; } diff --git a/source/libs/executor/test/lhashTests.cpp b/source/libs/executor/test/lhashTests.cpp index 92f7652d8d..daf59c6058 100644 --- a/source/libs/executor/test/lhashTests.cpp +++ b/source/libs/executor/test/lhashTests.cpp @@ -42,9 +42,6 @@ TEST(testCase, linear_hash_Tests) { int64_t et = taosGetTimestampUs(); for (int32_t i = 0; i < 1000000; ++i) { - if (i == 950000) { - printf("kf\n"); - } char* v = tHashGet(pHashObj, &i, sizeof(i)); if (v != NULL) { // printf("find value: %d, key:%d\n", *(int32_t*) v, i); @@ -54,12 +51,16 @@ TEST(testCase, linear_hash_Tests) { } // tHashPrint(pHashObj, LINEAR_HASH_STATIS); - tHashCleanup(pHashObj); + int32_t code = tHashCleanup(pHashObj); +ASSERT(code == 0); + int64_t et1 = taosGetTimestampUs(); SHashObj* pHashObj1 = taosHashInit(1000, fn, false, HASH_NO_LOCK); + ASSERT(pHashObj1 != NULL); for (int32_t i = 0; i < 1000000; ++i) { - taosHashPut(pHashObj1, &i, sizeof(i), &i, sizeof(i)); + int32_t code = taosHashPut(pHashObj1, &i, sizeof(i), &i, sizeof(i)); + ASSERT(code == 0); } for (int32_t i = 0; i < 1000000; ++i) { @@ -68,6 +69,6 @@ TEST(testCase, linear_hash_Tests) { taosHashCleanup(pHashObj1); int64_t et2 = taosGetTimestampUs(); - printf("linear hash time:%.2f ms, buildHash:%.2f ms, hash:%.2f\n", (et1 - st) / 1000.0, (et - st) / 1000.0, - (et2 - et1) / 1000.0); + (void)printf("linear hash time:%.2f ms, buildHash:%.2f ms, hash:%.2f\n", (et1 - st) / 1000.0, (et - st) / 1000.0, + (et2 - et1) / 1000.0); } \ No newline at end of file diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index 877e3a924c..b2313f35a1 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -78,8 +78,11 @@ SSDataBlock* getSingleColDummyBlock(void* param) { } colInfo.info.colId = 1; - blockDataAppendColInfo(pBlock, &colInfo); - blockDataEnsureCapacity(pBlock, pInfo->pageRows); + int32_t code = blockDataAppendColInfo(pBlock, &colInfo); + ASSERT(code == 0); + + code = blockDataEnsureCapacity(pBlock, pInfo->pageRows); + ASSERT(code == 0); for (int32_t i = 0; i < pInfo->pageRows; ++i) { SColumnInfoData* pColInfo = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); @@ -92,25 +95,31 @@ SSDataBlock* getSingleColDummyBlock(void* param) { int32_t len = 0; bool ret = taosMbsToUcs4(strOri, size, (TdUcs4*)varDataVal(str), size * TSDB_NCHAR_SIZE, &len); if (!ret) { - printf("error\n"); + (void) printf("error\n"); return NULL; } varDataSetLen(str, len); - colDataSetVal(pColInfo, i, reinterpret_cast(str), false); + int32_t code = colDataSetVal(pColInfo, i, reinterpret_cast(str), false); + ASSERT(code == 0); + pBlock->info.hasVarCol = true; - printf("nchar: %s\n", strOri); + (void) printf("nchar: %s\n", strOri); } else if (pInfo->type == TSDB_DATA_TYPE_BINARY || pInfo->type == TSDB_DATA_TYPE_GEOMETRY) { int32_t size = taosRand() % VARCOUNT; char str[64] = {0}; taosRandStr(varDataVal(str), size); varDataSetLen(str, size); - colDataSetVal(pColInfo, i, reinterpret_cast(str), false); + code = colDataSetVal(pColInfo, i, reinterpret_cast(str), false); + ASSERT(code == 0); + pBlock->info.hasVarCol = true; - printf("binary: %s\n", varDataVal(str)); + (void) printf("binary: %s\n", varDataVal(str)); } else if (pInfo->type == TSDB_DATA_TYPE_DOUBLE || pInfo->type == TSDB_DATA_TYPE_FLOAT) { double v = rand_f2(); - colDataSetVal(pColInfo, i, reinterpret_cast(&v), false); - printf("float: %f\n", v); + code = colDataSetVal(pColInfo, i, reinterpret_cast(&v), false); + ASSERT(code == 0); + + (void) printf("float: %f\n", v); } else { int64_t v = ++pInfo->startVal; char* result = static_cast(taosMemoryCalloc(tDataTypes[pInfo->type].bytes, 1)); @@ -120,8 +129,10 @@ SSDataBlock* getSingleColDummyBlock(void* param) { memcpy(result, (char*)(&v) + sizeof(int64_t) - tDataTypes[pInfo->type].bytes, tDataTypes[pInfo->type].bytes); } - colDataSetVal(pColInfo, i, result, false); - printf("int: %" PRId64 "\n", v); + code = colDataSetVal(pColInfo, i, result, false); + ASSERT(code == 0); + + (void) printf("int: %" PRId64 "\n", v); taosMemoryFree(result); } } @@ -359,7 +370,8 @@ TEST(testCase, ordered_merge_sort_Test) { for (int32_t i = 0; i < 1; ++i) { SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); - blockDataAppendColInfo(pBlock, &colInfo); + code = blockDataAppendColInfo(pBlock, &colInfo); + ASSERT(code == 0); } SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc"); diff --git a/source/libs/executor/test/tSimpleHashTests.cpp b/source/libs/executor/test/tSimpleHashTests.cpp index 3fbf2e7be8..86fb763210 100644 --- a/source/libs/executor/test/tSimpleHashTests.cpp +++ b/source/libs/executor/test/tSimpleHashTests.cpp @@ -43,7 +43,9 @@ TEST(testCase, tSimpleHashTest_intKey) { int64_t originKeySum = 0; for (int64_t i = 1; i <= 100; ++i) { originKeySum += i; - tSimpleHashPut(pHashObj, (const void *)&i, keyLen, (const void *)&i, dataLen); + code = tSimpleHashPut(pHashObj, (const void *)&i, keyLen, (const void *)&i, dataLen); +ASSERT(code == 0); + ASSERT_EQ(i, tSimpleHashGetSize(pHashObj)); } @@ -68,7 +70,9 @@ TEST(testCase, tSimpleHashTest_intKey) { ASSERT_EQ(keySum, originKeySum); for (int64_t i = 1; i <= 100; ++i) { - tSimpleHashRemove(pHashObj, (const void *)&i, keyLen); + code = tSimpleHashRemove(pHashObj, (const void *)&i, keyLen); + ASSERT(code == 0); + ASSERT_EQ(100 - i, tSimpleHashGetSize(pHashObj)); } @@ -95,7 +99,9 @@ TEST(testCase, tSimpleHashTest_binaryKey) { for (int64_t i = 1; i <= 100; ++i) { combineKey.suid = i; combineKey.uid = i + 1; - tSimpleHashPut(pHashObj, (const void *)&combineKey, keyLen, (const void *)&i, dataLen); + code = tSimpleHashPut(pHashObj, (const void *)&combineKey, keyLen, (const void *)&i, dataLen); + ASSERT(code == 0); + originDataSum += i; ASSERT_EQ(i, tSimpleHashGetSize(pHashObj)); } @@ -120,7 +126,8 @@ TEST(testCase, tSimpleHashTest_binaryKey) { ASSERT_EQ(originDataSum, dataSum); - tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen); + code = tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen); + ASSERT(code == 0); while ((data = tSimpleHashIterate(pHashObj, data, &iter))) { void *key = tSimpleHashGetKey(data, &kLen); @@ -130,7 +137,9 @@ TEST(testCase, tSimpleHashTest_binaryKey) { for (int64_t i = 1; i <= 99; ++i) { combineKey.suid = i; combineKey.uid = i + 1; - tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen); + code = tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen); + ASSERT(code == 0); + ASSERT_EQ(99 - i, tSimpleHashGetSize(pHashObj)); } diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 104b1c27d8..1b2f961726 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -69,7 +69,8 @@ void *backendOpen() { key.ts = ts; const char *val = "value data"; int32_t vlen = strlen(val); - streamStatePut_rocksdb(p, &key, (char *)val, vlen); + int32_t code = streamStatePut_rocksdb(p, &key, (char *)val, vlen); + ASSERT(code == 0); tsArray.push_back(ts); } @@ -82,7 +83,9 @@ void *backendOpen() { const char *val = "value data"; int32_t len = 0; char *newVal = NULL; - streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); + int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); + ASSERT(code == 0); + ASSERT(len == strlen(val)); } int64_t ts = tsArray[0]; @@ -90,9 +93,11 @@ void *backendOpen() { key.groupId = (uint64_t)(0); key.ts = ts; - streamStateDel_rocksdb(p, &key); + int32_t code = streamStateDel_rocksdb(p, &key); + ASSERT(code == 0); - streamStateClear_rocksdb(p); + code = streamStateClear_rocksdb(p); + ASSERT(code == 0); for (int i = 0; i < size; i++) { int64_t ts = tsArray[i]; @@ -118,11 +123,12 @@ void *backendOpen() { const char *val = "value data"; int32_t vlen = strlen(val); - streamStatePut_rocksdb(p, &key, (char *)val, vlen); + code = streamStatePut_rocksdb(p, &key, (char *)val, vlen); + ASSERT(code == 0); } SWinKey winkey; - int32_t code = streamStateGetFirst_rocksdb(p, &key); + code = streamStateGetFirst_rocksdb(p, &key); ASSERT(code == 0); ASSERT(key.ts == tsArray[0]); @@ -151,7 +157,8 @@ void *backendOpen() { const char *val = "Value"; int32_t len = strlen(val); - streamStateFuncPut_rocksdb(p, &key, val, len); + code = streamStateFuncPut_rocksdb(p, &key, val, len); + ASSERT(code == 0); } for (int i = 0; i < size; i++) { STupleKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; @@ -161,7 +168,9 @@ void *backendOpen() { char *val = NULL; int32_t len = 0; - streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len); + int32_t code = streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len); + ASSERT(code == 0); + ASSERT(len == strlen("Value")); } for (int i = 0; i < size; i++) { @@ -172,7 +181,8 @@ void *backendOpen() { char *val = NULL; int32_t len = 0; - streamStateFuncDel_rocksdb(p, &key); + int32_t code = streamStateFuncDel_rocksdb(p, &key); + ASSERT(code == 0); } // session put @@ -187,7 +197,8 @@ void *backendOpen() { const char *val = "Value"; int32_t len = strlen(val); - streamStateSessionPut_rocksdb(p, &key, val, len); + code = streamStateSessionPut_rocksdb(p, &key, val, len); + ASSERT(code == 0); char *pval = NULL; ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len)); @@ -346,7 +357,9 @@ void *backendOpen() { ASSERT(code == 0); } SArray *result = taosArrayInit(8, sizeof(void *)); - streamDefaultIterGet_rocksdb(p, "tbname", "tbname_99", result); + code = streamDefaultIterGet_rocksdb(p, "tbname", "tbname_99", result); + ASSERT(code == 0); + ASSERT(taosArrayGetSize(result) >= 0); return p; @@ -363,10 +376,14 @@ TEST_F(BackendEnv, checkOpen) { sprintf(key, "key_%d", i); char val[128] = {0}; sprintf(val, "val_%d", i); - streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, + int32_t code = streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, (int32_t)(strlen(val)), tsStart + 100000); + ASSERT(code == 0); } - streamStatePutBatch_rocksdb(p, pBatch); + + int32_t code = streamStatePutBatch_rocksdb(p, pBatch); + ASSERT(code == 0); + streamStateDestroyBatch(pBatch); } { @@ -378,14 +395,18 @@ TEST_F(BackendEnv, checkOpen) { sprintf(key, "key_%d", i); char val[128] = {0}; sprintf(val, "val_%d", i); - streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, + int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); + ASSERT(code == 0); } - streamStatePutBatch_rocksdb(p, pBatch); + int32_t code = streamStatePutBatch_rocksdb(p, pBatch); + ASSERT(code == 0); streamStateDestroyBatch(pBatch); } // do checkpoint 2 - taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2, 0); + int32_t code = taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2, 0); + ASSERT(code == 0); + { void *pBatch = streamStateCreateBatch(); int32_t size = 0; @@ -395,27 +416,37 @@ TEST_F(BackendEnv, checkOpen) { sprintf(key, "key_%d", i); char val[128] = {0}; sprintf(val, "val_%d", i); - streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, + int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); + ASSERT(code == 0); } - streamStatePutBatch_rocksdb(p, pBatch); + code = streamStatePutBatch_rocksdb(p, pBatch); + ASSERT(code == 0); + streamStateDestroyBatch(pBatch); } - taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3, 0); + code = taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3, 0); + ASSERT(code == 0); const char *path = "/tmp/backend/stream"; const char *dump = "/tmp/backend/stream/dump"; // taosMkDir(dump); - taosMulMkDir(dump); + code = taosMulMkDir(dump); + ASSERT(code == 0); + SBkdMgt *mgt = bkdMgtCreate((char *)path); SArray *result = taosArrayInit(4, sizeof(void *)); - bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); + code = bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); + ASSERT(code == 0); - taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0); + code = taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0); + ASSERT(code == 0); taosArrayClear(result); - bkdMgtGetDelta(mgt, p->pTdbState->idstr, 4, result, (char *)dump); + code = bkdMgtGetDelta(mgt, p->pTdbState->idstr, 4, result, (char *)dump); + ASSERT(code == 0); + bkdMgtDestroy(mgt); streamStateClose((SStreamState *)p, true); // { @@ -444,7 +475,9 @@ TEST_F(BackendEnv, backendUtil) { } TEST_F(BackendEnv, oldBackendInit) { const char *path = "/tmp/backend1"; - taosMulMkDir(path); + int32_t code = taosMulMkDir(path); + ASSERT(code == 0); + { SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(path, 10, 10); streamBackendCleanup((void *)p); From 7fbb76b44154d4a65afed5223937733ca83dc8d9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Jul 2024 15:52:30 +0800 Subject: [PATCH 3/6] fix(query): check return value. --- .../dnode/mnode/impl/test/stream/stream.cpp | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index 5cf9b52de7..0a1e9a0059 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -49,7 +49,9 @@ SRpcMsg buildHbReq() { entry.stage = 4; } - taosArrayPush(msg.pTaskStatus, &entry); + void* px = taosArrayPush(msg.pTaskStatus, &entry); + ASSERT(px != NULL); + } // (p->checkpointId != 0) && p->checkpointFailed @@ -65,7 +67,8 @@ SRpcMsg buildHbReq() { entry.checkpointInfo.activeId = 1; entry.checkpointInfo.failed = true; - taosArrayPush(msg.pTaskStatus, &entry); + void* px = taosArrayPush(msg.pTaskStatus, &entry); + ASSERT(px != NULL); } int32_t tlen = 0; @@ -122,8 +125,11 @@ void setTask(SStreamTask* pTask, int32_t nodeId, int64_t streamId, int32_t taskI entry.stage = 1; entry.status = TASK_STATUS__READY; - taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); - taosArrayPush(pExecNode->pTaskList, &id); + int32_t code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); + ASSERT(code == 0); + + void* px = taosArrayPush(pExecNode->pTaskList, &id); + ASSERT(px != NULL); } void initStreamExecInfo() { @@ -141,7 +147,8 @@ void initNodeInfo() { SNodeEntry entry = {0}; entry.nodeId = 2; entry.stageUpdated = true; - taosArrayPush(execInfo.pNodeList, &entry); + void* px = taosArrayPush(execInfo.pNodeList, &entry); + ASSERT(px != NULL); } } // namespace @@ -149,15 +156,17 @@ class StreamTest : public testing::Test { // 继承了 testing::Test protected: static void SetUpTestSuite() { - mndInitExecInfo(); + int32_t code = mndInitExecInfo(); + ASSERT(code == 0); + initStreamExecInfo(); initNodeInfo(); - std::cout<<"setup env for streamTest suite"<(taosMemoryCalloc(1, sizeof(SMnode))); {// init sdb @@ -203,7 +213,8 @@ TEST_F(StreamTest, kill_checkpoint_trans) { const char* pDbName = "test_db_name"; int32_t len = strlen(pDbName); - taosHashPut(info.pDBMap, pDbName, len, NULL, 0); + int32_t code = taosHashPut(info.pDBMap, pDbName, len, NULL, 0); + ASSERT(code == 0); killAllCheckpointTrans(pMnode, &info); @@ -225,12 +236,17 @@ TEST_F(StreamTest, kill_checkpoint_trans) { pTask->id.streamId = defStreamId; pTask->id.taskId = 1; pTask->exec.qmsg = (char*)taosMemoryCalloc(1,1); - taosThreadMutexInit(&pTask->lock, NULL); + int32_t code = taosThreadMutexInit(&pTask->lock, NULL); + ASSERT(code == 0); - taosArrayPush(pLevel, &pTask); + void* px = taosArrayPush(pLevel, &pTask); + ASSERT(px != NULL); - taosArrayPush(pStream->tasks, &pLevel); - mndCreateStreamResetStatusTrans(pMnode, pStream); + px = taosArrayPush(pStream->tasks, &pLevel); + ASSERT(px != NULL); + + int32_t code = mndCreateStreamResetStatusTrans(pMnode, pStream); + ASSERT(code == 0); tFreeStreamObj(pStream); sdbCleanup(pMnode->pSdb); From 51b90819cae6abc29b9449bd6b35b471a30558f4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Jul 2024 16:07:06 +0800 Subject: [PATCH 4/6] fix(test): fix syntax error. --- source/dnode/mnode/impl/test/stream/stream.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index 0a1e9a0059..407163ae91 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -213,7 +213,7 @@ TEST_F(StreamTest, kill_checkpoint_trans) { const char* pDbName = "test_db_name"; int32_t len = strlen(pDbName); - int32_t code = taosHashPut(info.pDBMap, pDbName, len, NULL, 0); + code = taosHashPut(info.pDBMap, pDbName, len, NULL, 0); ASSERT(code == 0); killAllCheckpointTrans(pMnode, &info); @@ -236,7 +236,7 @@ TEST_F(StreamTest, kill_checkpoint_trans) { pTask->id.streamId = defStreamId; pTask->id.taskId = 1; pTask->exec.qmsg = (char*)taosMemoryCalloc(1,1); - int32_t code = taosThreadMutexInit(&pTask->lock, NULL); + code = taosThreadMutexInit(&pTask->lock, NULL); ASSERT(code == 0); void* px = taosArrayPush(pLevel, &pTask); @@ -245,7 +245,7 @@ TEST_F(StreamTest, kill_checkpoint_trans) { px = taosArrayPush(pStream->tasks, &pLevel); ASSERT(px != NULL); - int32_t code = mndCreateStreamResetStatusTrans(pMnode, pStream); + code = mndCreateStreamResetStatusTrans(pMnode, pStream); ASSERT(code == 0); tFreeStreamObj(pStream); From dc8e2e9e0df794fb64629f3d6b8467a4dce95e5a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Jul 2024 18:03:13 +0800 Subject: [PATCH 5/6] fix(query):fix error. --- include/util/tutil.h | 12 ++++---- .../dnode/mnode/impl/test/stream/stream.cpp | 2 +- source/libs/executor/src/tsort.c | 28 ++++--------------- 3 files changed, 13 insertions(+), 29 deletions(-) diff --git a/include/util/tutil.h b/include/util/tutil.h index 1eaff53cac..8c6c10c6be 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -160,12 +160,12 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, return (terrno = (CODE)); \ } while (0) -#define TAOS_CHECK_RETURN(CMD) \ - do { \ - int32_t code = (CMD); \ - if (code != TSDB_CODE_SUCCESS) { \ - TAOS_RETURN(code); \ - } \ +#define TAOS_CHECK_RETURN(CMD) \ + do { \ + int32_t __c = (CMD); \ + if (__c != TSDB_CODE_SUCCESS) { \ + TAOS_RETURN(__c); \ + } \ } while (0) #define TAOS_CHECK_GOTO(CMD, LINO, LABEL) \ diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index 407163ae91..1ec319381d 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -246,7 +246,7 @@ TEST_F(StreamTest, kill_checkpoint_trans) { ASSERT(px != NULL); code = mndCreateStreamResetStatusTrans(pMnode, pStream); - ASSERT(code == 0); + ASSERT(code != 0); tFreeStreamObj(pStream); sdbCleanup(pMnode->pSdb); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index bac5120ff9..1e345e9700 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1960,7 +1960,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* if (pHandle->bSortPk) { pOrigBlockPkOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1); - if (pOrigBlockPkOrder) { + if (pOrigBlockPkOrder == NULL) { return terrno; } } @@ -2234,11 +2234,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid)); if (ppBlk != NULL) { SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk); - - code = blockDataMerge(tBlk, pBlk); - if (code) { - return code; - } + TAOS_CHECK_RETURN(blockDataMerge(tBlk, pBlk)); if (bExtractedBlock) { blockDataDestroy(pBlk); @@ -2248,10 +2244,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (bExtractedBlock) { tBlk = pBlk; } else { - code = createOneDataBlock(pBlk, true, &tBlk); - if (code) { - return code; - } + TAOS_CHECK_RETURN(createOneDataBlock(pBlk, true, &tBlk)); } code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES); @@ -2271,10 +2264,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { int64_t p = taosGetTimestampUs(); if (pHandle->bSortByRowId) { - code = tsortOpenRegion(pHandle); - if (code) { - return code; - } + TAOS_CHECK_RETURN(tsortOpenRegion(pHandle)); } code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc); @@ -2759,18 +2749,12 @@ static int32_t tsortPQSortNextTuple(SSortHandle* pHandle, STupleHandle **pTupleH void* pData = tupleGetField(pTuple, i, colNum); SColumnInfoData* p = NULL; - code = bdGetColumnInfoData(pHandle->pDataBlock, i, &p); - if (code) { - return code; - } + TAOS_CHECK_RETURN(bdGetColumnInfoData(pHandle->pDataBlock, i, &p)); if (!pData) { colDataSetNULL(p, 0); } else { - code = colDataSetVal(p, 0, pData, false); - if (code) { - return code; - } + TAOS_CHECK_RETURN(colDataSetVal(p, 0, pData, false)); } } pHandle->pDataBlock->info.rows++; From a60c30dbbac68d05ef9d9fae2301f379b8e72f69 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Jul 2024 23:19:03 +0800 Subject: [PATCH 6/6] fix(query): fix error. --- include/common/ttime.h | 2 +- include/util/tsimplehash.h | 5 +---- source/libs/executor/src/tsort.c | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/include/common/ttime.h b/include/common/ttime.h index cec5b15761..d430f7bd2a 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -64,7 +64,7 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) { : 1000000000; time_t t = taosTime(NULL); struct tm tm; - taosLocalTime(&t, &tm, NULL); + (void) taosLocalTime(&t, &tm, NULL); tm.tm_hour = 0; tm.tm_min = 0; tm.tm_sec = 0; diff --git a/include/util/tsimplehash.h b/include/util/tsimplehash.h index 987a9fe2a8..4a8a9ced58 100644 --- a/include/util/tsimplehash.h +++ b/include/util/tsimplehash.h @@ -17,15 +17,12 @@ #define TDENGINE_TSIMPLEHASH_H #include "tarray.h" +#include "thash.h" #ifdef __cplusplus extern "C" { #endif -typedef uint32_t (*_hash_fn_t)(const char *, uint32_t); -typedef int32_t (*_equal_fn_t)(const void *, const void *, size_t len); -typedef void (*_hash_free_fn_t)(void *); - /** * @brief single thread hash * diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 1e345e9700..fa7d59e137 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -677,7 +677,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT } int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); - if (pPgId) { + if (pPgId == NULL) { return terrno; }