diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 13ff3da53f..e917bcd2a8 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -41,6 +41,15 @@ typedef struct SBlockOrderInfo { #define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT]) #define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (1u << (7u - BitPos(r_)))) == (1u << (7u - BitPos(r_)))) +#define QRY_OPTR_CHECK(_o) \ + do { \ + if ((_o) == NULL) { \ + return TSDB_CODE_INVALID_PARA; \ + } else { \ + *(_o) = NULL; \ + } \ + } while(0) + #define colDataSetNull_f(bm_, r_) \ do { \ BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ @@ -222,8 +231,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity); - -SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount); +int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock); size_t blockDataGetSize(const SSDataBlock* pBlock); size_t blockDataGetRowSize(SSDataBlock* pBlock); @@ -254,15 +262,6 @@ void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n); int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc); -#define QRY_OPTR_CHECK(_o) \ - do { \ - if ((_o) == NULL) { \ - return TSDB_CODE_INVALID_PARA; \ - } else { \ - *(_o) = NULL; \ - } \ - } while(0) - int32_t createDataBlock(SSDataBlock** pResBlock); void blockDataDestroy(SSDataBlock* pBlock); void blockDataFreeRes(SSDataBlock* pBlock); @@ -272,15 +271,15 @@ int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock); int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock); int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); -SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); -SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); +SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); +int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData); -int32_t blockGetEncodeSize(const SSDataBlock* pBlock); -int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); -const char* blockDecode(SSDataBlock* pBlock, const char* pData); +int32_t blockGetEncodeSize(const SSDataBlock* pBlock); +int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); +int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos); // for debug -char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, const char* taskIdStr); +int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, const char* taskIdStr); int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid); @@ -288,10 +287,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData bool alreadyAddGroupId(char* ctbName, int64_t groupId); bool isAutoTableName(char* ctbName); void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId); -char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); +int32_t buildCtbNameByGroupId(const char* stbName, uint64_t groupId, char** pName); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); -void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList); +int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList); void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0a27f4e3bd..b554d86313 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -48,7 +48,9 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo } int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) { - if (colDataIsNull_s(pColumnInfoData, rowIdx)) return 0; + if (colDataIsNull_s(pColumnInfoData, rowIdx)) { + return 0; + } if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) return pColumnInfoData->info.bytes; if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) @@ -493,7 +495,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, int32_t numOfRows) { if (pDst->info.type != pSrc->info.type || pDst->info.bytes != pSrc->info.bytes || pSrc->reassigned) { - return TSDB_CODE_FAILED; + return TSDB_CODE_INVALID_PARA; } if (numOfRows <= 0) { @@ -698,9 +700,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { } int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) { + int32_t code = 0; if (pDest->info.rows + numOfRows > pDest->info.capacity) { - ASSERT(0); - return TSDB_CODE_FAILED; + return TSDB_CODE_INVALID_PARA; } size_t numOfCols = taosArrayGetSize(pDest->pDataBlock); @@ -708,11 +710,14 @@ int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); - colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows); + code = colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows); + if (code) { + return code; + } } pDest->info.rows += numOfRows; - return TSDB_CODE_SUCCESS; + return code; } void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) { @@ -824,20 +829,24 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd return TSDB_CODE_SUCCESS; } -SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) { +int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock) { int32_t code = 0; + QRY_OPTR_CHECK(pResBlock); + if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) { - return NULL; + return TSDB_CODE_INVALID_PARA; } SSDataBlock* pDst = NULL; code = createOneDataBlock(pBlock, false, &pDst); if (code) { - terrno = code; - return NULL; + return code; } - blockDataEnsureCapacity(pDst, rowCount); + code = blockDataEnsureCapacity(pDst, rowCount); + if (code) { + return code; + } /* may have disorder varchar data, TODO for (int32_t i = 0; i < numOfCols; ++i) { @@ -865,12 +874,16 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 } else { char* p = colDataGetData(pColData, j); code = colDataSetVal(pDstCol, j - startIndex, p, false); + if (code) { + break; + } } } } pDst->info.rows = rowCount; - return pDst; + *pResBlock = pDst; + return code; } /** @@ -934,7 +947,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { int32_t numOfRows = *(int32_t*)buf; - blockDataEnsureCapacity(pBlock, numOfRows); + int32_t code = blockDataEnsureCapacity(pBlock, numOfRows); + if (code) { + return code; + } pBlock->info.rows = numOfRows; size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -1217,12 +1233,18 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { if (IS_VAR_DATA_TYPE(pCols[i].info.type)) { pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t)); pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length); + if (pCols[i].varmeta.offset == NULL) { + return NULL; + } pCols[i].varmeta.length = pColInfoData->varmeta.length; pCols[i].varmeta.allocLen = pCols[i].varmeta.length; } else { pCols[i].nullbitmap = taosMemoryCalloc(1, BitmapLen(rows)); pCols[i].pData = taosMemoryCalloc(rows, pCols[i].info.bytes); + if (pCols[i].nullbitmap == NULL || pCols[i].pData == NULL) { + return NULL; + } } } @@ -1343,10 +1365,12 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { int64_t p2 = taosGetTimestampUs(); - blockDataAssign(pCols, pDataBlock, index); + int32_t code = blockDataAssign(pCols, pDataBlock, index); + if (code) { + return code; + } int64_t p3 = taosGetTimestampUs(); - copyBackToBlock(pDataBlock, pCols); int64_t p4 = taosGetTimestampUs(); @@ -1354,7 +1378,6 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { ", rows:%d\n", p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows); destroyTupleIndex(index); - return TSDB_CODE_SUCCESS; } @@ -1416,7 +1439,7 @@ void blockDataReset(SSDataBlock* pDataBlock) { */ int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) { - if (numOfRows <= 0 || pBlockInfo && numOfRows <= pBlockInfo->capacity) { + if ((numOfRows <= 0)|| (pBlockInfo && numOfRows <= pBlockInfo->capacity)) { return TSDB_CODE_SUCCESS; } @@ -1441,7 +1464,7 @@ int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockI pColumn->nullbitmap = tmp; memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen); if (pColumn->info.bytes == 0) { - return TSDB_CODE_FAILED; + return TSDB_CODE_INVALID_PARA; } // here we employ the aligned malloc function, to make sure that the address of allocated memory is aligned @@ -1547,6 +1570,8 @@ void blockDataDestroy(SSDataBlock* pBlock) { // todo remove it int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { + int32_t code = 0; + dst->info = src->info; dst->info.rows = 0; dst->info.capacity = 0; @@ -1555,13 +1580,15 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(src->pDataBlock, i); SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; - blockDataAppendColInfo(dst, &colInfo); + code = blockDataAppendColInfo(dst, &colInfo); + if (code) { + return code; + } } - int32_t code = blockDataEnsureCapacity(dst, src->info.rows); + code = blockDataEnsureCapacity(dst, src->info.rows); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; + return code; } for (int32_t i = 0; i < numOfCols; ++i) { @@ -1571,13 +1598,16 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { continue; } - colDataAssign(pDst, pSrc, src->info.rows, &src->info); + int32_t ret = colDataAssign(pDst, pSrc, src->info.rows, &src->info); + if (ret < 0) { + return ret; + } } uint32_t cap = dst->info.capacity; dst->info = src->info; dst->info.capacity = cap; - return 0; + return code; } int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { @@ -1585,7 +1615,6 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { int32_t code = blockDataEnsureCapacity(pDst, pSrc->info.rows); if (code != TSDB_CODE_SUCCESS) { - terrno = code; return code; } @@ -1593,7 +1622,11 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, i); - colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info); + int32_t ret = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info); + if (ret < 0) { + code = ret; + return code; + } } uint32_t cap = pDst->info.capacity; @@ -1602,7 +1635,7 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { copyPkVal(&pDst->info, &pSrc->info); pDst->info.capacity = cap; - return TSDB_CODE_SUCCESS; + return code; } int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) { @@ -1718,8 +1751,17 @@ int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlo size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (p == NULL) { + blockDataDestroy(pBlock); + return terrno; + } + SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; - blockDataAppendColInfo(pBlock, &colInfo); + code = blockDataAppendColInfo(pBlock, &colInfo); + if (code) { + blockDataDestroy(pBlock); + return code; + } } code = blockDataEnsureCapacity(pBlock, 1); @@ -1731,13 +1773,22 @@ int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlo for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL); - void* pData = NULL; + if (pDst == NULL || pSrc == NULL) { + blockDataDestroy(pBlock); + return terrno; + } + + bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL); + void* pData = NULL; if (!isNull) { pData = colDataGetData(pSrc, rowIdx); } code = colDataSetVal(pDst, 0, pData, isNull); + if (code) { + blockDataDestroy(pBlock); + return code; + } } pBlock->info.rows = 1; @@ -1789,8 +1840,17 @@ int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataB size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (p == NULL) { + blockDataDestroy(pDstBlock); + return terrno; + } + SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; - blockDataAppendColInfo(pDstBlock, &colInfo); + code = blockDataAppendColInfo(pDstBlock, &colInfo); + if (code) { + blockDataDestroy(pDstBlock); + return code; + } } copyPkVal(&pDstBlock->info, &pDataBlock->info); @@ -1805,7 +1865,20 @@ int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataB for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); + if (pDst == NULL) { + return terrno; + } + + if (pSrc == NULL) { + return terrno; + } + + int32_t ret = colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); + if (ret < 0) { + code = ret; + blockDataDestroy(pDstBlock); + return code; + } } pDstBlock->info.rows = pDataBlock->info.rows; @@ -1838,7 +1911,6 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat if (pBlock->pDataBlock == NULL) { pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); if (pBlock->pDataBlock == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } } @@ -1867,12 +1939,20 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) return col; } -SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index) { +int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData) { + int32_t code = 0; + QRY_OPTR_CHECK(pColInfoData); + if (index >= taosArrayGetSize(pBlock->pDataBlock)) { - return NULL; + return TSDB_CODE_INVALID_PARA; } - return taosArrayGet(pBlock->pDataBlock, index); + *pColInfoData = taosArrayGet(pBlock->pDataBlock, index); + if (*pColInfoData == NULL) { + code = terrno; + } + + return code; } size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize) { @@ -1887,6 +1967,10 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int int32_t numFixCols = 0; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (pCol == NULL) { + return terrno; + } + if (IS_VAR_DATA_TYPE(pCol->info.type)) { ++numVarCols; } else { @@ -2029,6 +2113,10 @@ int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + return terrno; + } + colDataTrimFirstNRows(pColInfoData, n, pBlock->info.rows); } @@ -2078,12 +2166,15 @@ void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + continue; + } + colDataKeepFirstNRows(pColInfoData, n, pBlock->info.rows); } pBlock->info.rows = n; } - return ; } int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { @@ -2101,6 +2192,10 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + if (pColData == NULL) { + return terrno; + } + tlen += taosEncodeFixedI16(buf, pColData->info.colId); tlen += taosEncodeFixedI8(buf, pColData->info.type); tlen += taosEncodeFixedI32(buf, pColData->info.bytes); @@ -2134,8 +2229,7 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { } void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { - int32_t sz; - + int32_t sz = 0; int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid); @@ -2143,7 +2237,12 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol); buf = taosDecodeFixedI64(buf, &pBlock->info.rows); buf = taosDecodeFixedI32(buf, &sz); + pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData)); + if (pBlock->pDataBlock == NULL) { + return NULL; + } + for (int32_t i = 0; i < sz; i++) { SColumnInfoData data = {0}; buf = taosDecodeFixedI16(buf, &data.info.colId); @@ -2164,8 +2263,13 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { data.varmeta.length = len; data.varmeta.allocLen = len; } - taosArrayPush(pBlock->pDataBlock, &data); + + void* px = taosArrayPush(pBlock->pDataBlock, &data); + if (px == NULL) { + return NULL; + } } + return (void*)buf; } @@ -2205,8 +2309,8 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { if (taosLocalTime(&tt, &ptm, buf) == NULL) { return buf; } - size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); + size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); if (precision == TSDB_TIME_PRECISION_NANO) { sprintf(buf + pos, ".%09d", ms); } else if (precision == TSDB_TIME_PRECISION_MICRO) { @@ -2219,31 +2323,45 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { } // for debug -char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) { +int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) { int32_t size = 2048 * 1024; - *pDataBuf = taosMemoryCalloc(size, 1); + int32_t code = 0; char* dumpBuf = *pDataBuf; char pBuf[128] = {0}; - int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; int32_t len = 0; + + *pDataBuf = taosMemoryCalloc(size, 1); + if (*pDataBuf == NULL) { + return terrno; + } + + int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); len += snprintf(dumpBuf + len, size - len, "%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n", taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) { + return code; + } for (int32_t j = 0; j < rows; j++) { len += snprintf(dumpBuf + len, size - len, "%s|", flag); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) { + return code; + } for (int32_t k = 0; k < colNum; k++) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); + if (pColInfoData == NULL) { + return terrno; + } + if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) { len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL"); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; continue; } @@ -2251,53 +2369,53 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: memset(pBuf, 0, sizeof(pBuf)); - formatTimestamp(pBuf, *(uint64_t*)var, pColInfoData->info.precision); + (void) formatTimestamp(pBuf, *(uint64_t*)var, pColInfoData->info.precision); len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_TINYINT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(int8_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_UTINYINT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(uint8_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_SMALLINT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(int16_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_USMALLINT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(uint16_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_INT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_UINT: len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_BIGINT: len += snprintf(dumpBuf + len, size - len, " %15" PRId64 " |", *(int64_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_UBIGINT: len += snprintf(dumpBuf + len, size - len, " %15" PRIu64 " |", *(uint64_t*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_FLOAT: len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_DOUBLE: len += snprintf(dumpBuf + len, size - len, " %15f |", *(double*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_BOOL: len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; break; case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: @@ -2308,7 +2426,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, dataSize = TMIN(dataSize, 50); memcpy(pBuf, varDataVal(pData), dataSize); len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; } break; case TSDB_DATA_TYPE_NCHAR: { char* pData = colDataGetVarData(pColInfoData, j); @@ -2316,15 +2434,15 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, memset(pBuf, 0, sizeof(pBuf)); (void)taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf); len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return 0; } break; } } len += snprintf(dumpBuf + len, size - len, "%d\n", j); - if (len >= size - 1) return dumpBuf; + if (len >= size - 1) return code; } len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag); - return dumpBuf; + return code; } int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema, @@ -2362,13 +2480,16 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat SSubmitTbData tbData = {0}; if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { + code = terrno; goto _end; } + tbData.suid = suid; tbData.uid = uid; tbData.sver = pTSchema->version; if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) { + code = terrno; taosArrayDestroy(tbData.aRowP); goto _end; } @@ -2381,6 +2502,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat int32_t offset = 0; for (int32_t k = 0; k < colNum; ++k) { // iterate by column SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); + if (pColInfoData == NULL) { + return terrno; + } + const STColumn* pCol = &pTSchema->columns[k]; void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); @@ -2416,13 +2541,20 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat ASSERT(pColInfoData->info.type == pCol->type); if (colDataIsNull_s(pColInfoData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + goto _end; + } } else { void* data = colDataGetVarData(pColInfoData, j); SValue sv = (SValue){ - .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value + .type = pCol->type, .nData = varDataLen(data), .pData = (uint8_t*) varDataVal(data)}; // address copy, no value SColVal cv = COL_VAL_VALUE(pCol->colId, sv); - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + code = terrno; + goto _end; + } } break; } @@ -2437,7 +2569,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { if (colDataIsNull_s(pColInfoData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + goto _end; + } } else { SValue sv = {.type = pCol->type}; if (pCol->type == pColInfoData->info.type) { @@ -2468,7 +2603,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat memcpy(&sv.val, tv, tDataTypes[pCol->type].bytes); } SColVal cv = COL_VAL_VALUE(pCol->colId, sv); - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + code = terrno; + goto _end; + } } } else { uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); @@ -2525,8 +2664,9 @@ void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId } snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%"PRIu64, stbName + i + 1, groupId); } + ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put stbname + groupId to the end - strcat(ctbName, tmp); + (void)strcat(ctbName, tmp); for(int i = 0; i < strlen(ctbName); i++){ if(ctbName[i] == '.'){ ctbName[i] = '_'; @@ -2547,18 +2687,19 @@ bool alreadyAddGroupId(char* ctbName, int64_t groupId) { return memcmp(ctbName + len1 - len2, tmp, len2) == 0; } -char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { +int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char** pName) { + QRY_OPTR_CHECK(pName); + char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); if (!pBuf) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return terrno; } + int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pBuf); - return NULL; } - return pBuf; + return code; } int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) { @@ -2568,20 +2709,23 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha SArray* tags = taosArrayInit(0, sizeof(SSmlKv)); if (tags == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } if (cname == NULL) { - terrno = TSDB_CODE_INVALID_PARA; taosArrayDestroy(tags); - return terrno; + return TSDB_CODE_INVALID_PARA; } int8_t type = TSDB_DATA_TYPE_UBIGINT; const char* name = "group_id"; int32_t len = strlen(name); - SSmlKv pTag = {.key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)}; - taosArrayPush(tags, &pTag); + + SSmlKv pTag = {.key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)}; + void* px = taosArrayPush(tags, &pTag); + if (px == NULL) { + return terrno; + } RandTableName rname = { .tags = tags, .stbFullName = stbFullName, .stbFullNameLen = strlen(stbFullName), .ctbShortName = cname}; @@ -2590,12 +2734,13 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha if (code != TSDB_CODE_SUCCESS) { return code; } - taosArrayDestroy(tags); + taosArrayDestroy(tags); if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) { - return TSDB_CODE_FAILED; + return TSDB_CODE_INVALID_PARA; } - return TSDB_CODE_SUCCESS; + + return code; } int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { @@ -2630,6 +2775,9 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + return terrno; + } *((int8_t*)data) = pColInfoData->info.type; data += sizeof(int8_t); @@ -2646,6 +2794,9 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { int32_t numOfRows = pBlock->info.rows; for (int32_t col = 0; col < numOfCols; ++col) { SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col); + if (pColRes == NULL) { + return terrno; + } // copy the null bitmap size_t metaSize = 0; @@ -2699,7 +2850,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { return dataLen; } -const char* blockDecode(SSDataBlock* pBlock, const char* pData) { +int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos) { const char* pStart = pData; int32_t version = *(int32_t*)pStart; @@ -2728,10 +2879,17 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { if (pBlock->pDataBlock == NULL) { pBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols); + if (pBlock->pDataBlock == NULL) { + return terrno; + } } for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + return terrno; + } + pColInfoData->info.type = *(int8_t*)pStart; pStart += sizeof(int8_t); @@ -2743,7 +2901,10 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { } } - blockDataEnsureCapacity(pBlock, numOfRows); + int32_t code = blockDataEnsureCapacity(pBlock, numOfRows); + if (code) { + return code; + } int32_t* colLen = (int32_t*)pStart; pStart += sizeof(int32_t) * numOfCols; @@ -2753,6 +2914,10 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { ASSERT(colLen[i] >= 0); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfoData == NULL) { + return terrno; + } + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows); pStart += sizeof(int32_t) * numOfRows; @@ -2760,7 +2925,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { if (colLen[i] > 0 && pColInfoData->varmeta.allocLen < colLen[i]) { char* tmp = taosMemoryRealloc(pColInfoData->pData, colLen[i]); if (tmp == NULL) { - return NULL; + return terrno; } pColInfoData->pData = tmp; @@ -2791,11 +2956,14 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pBlock->info.rows = numOfRows; pBlock->info.blankFill = blankFill; ASSERT(pStart - pData == dataLen); - return pStart; + + *pEndPos = pStart; + return code; } -void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) { +int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) { // int32_t totalRows = pBlock->info.rows; + int32_t code = 0; int32_t bmLen = BitmapLen(totalRows); char* pBitmap = NULL; int32_t maxRows = 0; @@ -2816,7 +2984,7 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList memset(pDst->nullbitmap, 0, bmLen); } } - return; + return code; } for (int32_t i = 0; i < numOfCols; ++i) { @@ -2849,10 +3017,18 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList } else { len = varDataTLen(p1); } + char* p2 = taosMemoryMalloc(len); + if (p2 == NULL) { + return terrno; + } + memcpy(p2, p1, len); - colDataSetVal(pDst, numOfRows, p2, false); + code = colDataSetVal(pDst, numOfRows, p2, false); taosMemoryFree(p2); + if (code) { + return code; + } } numOfRows += 1; j += 1; @@ -2864,6 +3040,9 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList } else { if (pBitmap == NULL) { pBitmap = taosMemoryCalloc(1, bmLen); + if (pBitmap == NULL) { + return terrno; + } } memcpy(pBitmap, pDst->nullbitmap, bmLen); @@ -2953,6 +3132,8 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList if (pBitmap != NULL) { taosMemoryFree(pBitmap); } + + return code; } int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 45212df1dd..d9e39ad6f5 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -41,8 +41,8 @@ static int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, con static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid); static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags); -static int32_t createDefaultTagColName(SArray** pList); -static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, +static int32_t createDefaultTagColName(SArray** pColNameList); +static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, int64_t gid, bool newSubTableRule); static int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo); @@ -70,14 +70,25 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p if (varTbName != NULL && varTbName != (void*)-1) { name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + if (name == NULL) { + return terrno; + } + memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 && stbFullName) { buildCtbNameAddGroupId(stbFullName, name, groupId); } } else if (stbFullName) { - name = buildCtbNameByGroupId(stbFullName, groupId); + int32_t code = buildCtbNameByGroupId(stbFullName, groupId, &name); + if (code) { + return code; + } } else { originName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE); + if (originName == NULL) { + return terrno; + } + if (metaGetTableNameByUid(pTq->pVnode, groupId, originName) == 0) { name = varDataVal(originName); } @@ -205,23 +216,33 @@ int32_t createDefaultTagColName(SArray** pColNameList) { return TSDB_CODE_SUCCESS; } -void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, +int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, int64_t gid, bool newSubTableRule) { if (pDataBlock->info.parTbName[0]) { if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName, gid) && gid != 0 && stbFullName) { pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + if (pCreateTableReq->name == NULL) { + return terrno; + } + strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid); // tqDebug("gen name from:%s", pDataBlock->info.parTbName); } else { pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); + if (pCreateTableReq->name == NULL) { + return terrno; + } // tqDebug("copy name:%s", pDataBlock->info.parTbName); } } else { - pCreateTableReq->name = buildCtbNameByGroupId(stbFullName, gid); + int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name); + return code; // tqDebug("gen name from stbFullName:%s gid:%"PRId64, stbFullName, gid); } + + return 0; } static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, @@ -310,8 +331,11 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S ASSERT(gid == *(int64_t*)pGpIdData); } - setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, + code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); + if (code) { + goto _end; + } void* p = taosArrayPush(reqs.pArray, pCreateTbReq); if (p == NULL) { @@ -534,9 +558,16 @@ int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t n } code = createDefaultTagColName(&pCreateTbReq->ctb.tagName); + if (code) { + return code; + } // set table name - setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule); + code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule); + if (code) { + return code; + } + *pReq = pCreateTbReq; return code; } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 36d81382f5..f3ceb33f64 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -26,7 +26,7 @@ #define T_LONG_JMP(_obj, _c) \ do { \ - ASSERT((_c) != -1); \ + ASSERT((_c) != 1); \ longjmp((_obj), (_c)); \ } while (0) diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 8cfd8f52ac..474f3eedbf 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -78,8 +78,8 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* * @return */ int32_t tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, - SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength, - uint32_t pqSortBufSize, SSortHandle** pHandle); + SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength, + uint32_t pqSortBufSize, SSortHandle** pHandle); void tsortSetForceUsePQSort(SSortHandle* pHandle); @@ -213,10 +213,11 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke /** * @brief set the merge limit reached callback. it calls mergeLimitReached param with tableUid and param */ -void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param); +void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), + void* param); -int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, int32_t leftRowIndex, int32_t rightRowIndex, - void* pOrder); +int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, int32_t leftRowIndex, + int32_t rightRowIndex, void* pOrder); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 3329001cbc..5af8df8f06 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -655,7 +655,10 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo int32_t lino = 0; if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); - *pNextStart = (char*)blockDecode(pRes, pData); + code = blockDecode(pRes, pData, (const char**) pNextStart); + if (code) { + return code; + } } else { // extract data according to pColList char* pStart = pData; @@ -682,7 +685,10 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo QUERY_CHECK_CODE(code, lino, _end); } - (void)blockDecode(pBlock, pStart); + const char* pDummy = NULL; + code = blockDecode(pBlock, pStart, &pDummy); + QUERY_CHECK_CODE(code, lino, _end); + code = blockDataEnsureCapacity(pRes, pBlock->info.rows); QUERY_CHECK_CODE(code, lino, _end); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index f263418e3e..596487f0de 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2628,9 +2628,12 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag); return; } - char* pBuf = NULL; - qDebug("%s", dumpBlockData(pBlock, flag, &pBuf, taskIdStr)); - taosMemoryFree(pBuf); + char* pBuf = NULL; + int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr); + if (code == 0) { + qDebug("%s", pBuf); + taosMemoryFree(pBuf); + } } void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) { @@ -2645,8 +2648,11 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr char* pBuf = NULL; char flagBuf[64]; snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr); - qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr)); - taosMemoryFree(pBuf); + int32_t code = dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr); + if (code == 0) { + qDebug("%s", pBuf); + taosMemoryFree(pBuf); + } } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fe74037f0f..dc910888ad 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -697,6 +697,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) { SSDataBlock* p1 = NULL; code = createOneDataBlock(pRes, true, &p1); + QUERY_CHECK_CODE(code, lino, _end); + void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1); QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); p = p1; diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index e59010f27f..d3abaaab6d 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -2836,24 +2836,24 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) { } if (!pGrp->clonedBlk) { + int32_t code = 0; if (0 == pGrp->beginIdx) { SSDataBlock* p = NULL; - int32_t code = createOneDataBlock(pGrp->blk, true, &p); + code = createOneDataBlock(pGrp->blk, true, &p); if (code) { MJ_ERR_RET(code); } pGrp->blk = p; } else { - pGrp->blk = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx); + code = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx, &pGrp->blk); pGrp->endIdx -= pGrp->beginIdx; pGrp->beginIdx = 0; pGrp->readIdx = 0; } - - if (NULL == pGrp->blk) { - MJ_ERR_RET(terrno); + if (code) { + MJ_ERR_RET(code); } - + pGrp->clonedBlk = true; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 04dca1c61f..542f161a80 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1360,10 +1360,9 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol if (keepGrp && rowNum > 0) { pTable->eqRowNum += rowNum; - - pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, rowNum); - if (NULL == pGrp->blk) { - MJ_ERR_RET(terrno); + code = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, rowNum, &pGrp->blk); + if (code) { + MJ_ERR_RET(code); } pGrp->endIdx -= pGrp->beginIdx; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index f81fff0806..295180652d 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -370,8 +370,8 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { pFinalRes->info.version = pRes->info.version; // continue merge data, ignore the group id - code = blockDataMerge(pFinalRes, pRes); - if (code) { + int32_t ret = blockDataMerge(pFinalRes, pRes); + if (ret < 0) { pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8d21ccf780..6755f131b6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2584,7 +2584,10 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock bool colExists = false; for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) { - SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j); + SColumnInfoData* pResCol = NULL; + code = bdGetColumnInfoData(pBlock, j, &pResCol); + QUERY_CHECK_CODE(code, lino, _end); + if (pResCol->info.colId == pColMatchInfo->colId) { SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId); code = colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index ab94493385..bac5120ff9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -27,6 +27,9 @@ #include "tsimplehash.h" #include "executil.h" +#define AllocatedTupleType 0 +#define ReferencedTupleType 1 // tuple references to one row in pDataBlock + struct STupleHandle { SSDataBlock* pBlock; int32_t rowIndex; @@ -70,15 +73,15 @@ struct SSortHandle { int64_t startTs; uint64_t totalElapsed; - uint64_t pqMaxRows; - uint32_t pqMaxTupleLength; - uint32_t pqSortBufSize; - bool forceUsePQSort; - BoundedQueue* pBoundedQueue; - uint32_t tmpRowIdx; + uint64_t pqMaxRows; + uint32_t pqMaxTupleLength; + uint32_t pqSortBufSize; + bool forceUsePQSort; + BoundedQueue* pBoundedQueue; + uint32_t tmpRowIdx; - int64_t mergeLimit; - int64_t currMergeLimitTs; + int64_t mergeLimit; + int64_t currMergeLimitTs; int32_t sourceId; SSDataBlock* pDataBlock; @@ -102,14 +105,14 @@ struct SSortHandle { bool (*abortCheckFn)(void* param); void* abortCheckParam; - bool bSortByRowId; + bool bSortByRowId; SSortMemFile* pExtRowsMemFile; - int32_t extRowBytes; - int32_t extRowsPageSize; - int32_t extRowsMemSize; - int32_t srcTsSlotId; - SArray* aExtRowsOrders; - bool bSortPk; + int32_t extRowBytes; + int32_t extRowsPageSize; + int32_t extRowsMemSize; + int32_t srcTsSlotId; + SArray* aExtRowsOrders; + bool bSortPk; void (*mergeLimitReachedFn)(uint64_t tableUid, void* param); void* mergeLimitReachedParam; }; @@ -133,6 +136,7 @@ static void* createTuple(uint32_t columnNum, uint32_t tupleLen) { uint32_t totalLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum) + tupleLen; return taosMemoryCalloc(1, totalLen); } + static void destoryAllocatedTuple(void* t) { taosMemoryFree(t); } #define tupleOffset(tuple, colIdx) ((uint32_t*)(tuple + sizeof(uint32_t) * colIdx)) @@ -148,22 +152,32 @@ static void destoryAllocatedTuple(void* t) { taosMemoryFree(t); } * @param colIndex the columnIndex, for setting null bitmap * @return the next offset to add field * */ -static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, size_t length, - bool isNull, uint32_t tupleLen) { +static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, + size_t length, bool isNull, uint32_t tupleLen) { tupleSetOffset(*t, colIdx, offset); + if (isNull) { tupleSetNull(*t, colIdx, colNum); } else { if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) { - *t = taosMemoryRealloc(*t, offset + length); + void* px = taosMemoryRealloc(*t, offset + length); + if (px == NULL) { + return terrno; + } + + *t = px; } tupleSetData(*t, offset, data, length); } + return offset + length; } static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) { - if (tupleColIsNull(t, colIdx, colNum)) return NULL; + if (tupleColIsNull(t, colIdx, colNum)) { + return NULL; + } + return t + *tupleOffset(t, colIdx); } @@ -175,8 +189,6 @@ int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pB return createOneDataBlock(pSortHandle->pDataBlock, false, pBlock); } -#define AllocatedTupleType 0 -#define ReferencedTupleType 1 // tuple references to one row in pDataBlock typedef struct TupleDesc { uint8_t type; char* data; // if type is AllocatedTuple, then points to the created tuple, otherwise points to the DataBlock @@ -187,17 +199,26 @@ typedef struct ReferencedTuple { size_t rowIndex; } ReferencedTuple; -static TupleDesc* createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx) { +static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx, TupleDesc** pDesc) { TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc)); - void* pTuple = createTuple(colNum, tupleLen); + if (t == NULL) { + return terrno; + } + + void* pTuple = createTuple(colNum, tupleLen); if (!pTuple) { taosMemoryFree(t); - return NULL; + return terrno; } + size_t colLen = 0; uint32_t offset = tupleGetDataStartOffset(colNum); for (size_t colIdx = 0; colIdx < colNum; ++colIdx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx); + if (pCol == NULL) { + return terrno; + } + if (colDataIsNull_s(pCol, rowIdx)) { offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen); } else { @@ -206,20 +227,34 @@ static TupleDesc* createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint3 tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen); } } + t->type = AllocatedTupleType; t->data = pTuple; - return t; + + *pDesc = t; + return 0; } -void* tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum) { +int32_t tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum, void** pResult) { + *pResult = NULL; + if (pDesc->type == ReferencedTupleType) { ReferencedTuple* pRefTuple = (ReferencedTuple*)pDesc; SColumnInfoData* pCol = taosArrayGet(((SSDataBlock*)pDesc->data)->pDataBlock, colIdx); - if (colDataIsNull_s(pCol, pRefTuple->rowIndex)) return NULL; - return colDataGetData(pCol, pRefTuple->rowIndex); + if (pCol == NULL) { + return terrno; + } + + if (colDataIsNull_s(pCol, pRefTuple->rowIndex)) { + return TSDB_CODE_SUCCESS; + } + + *pResult = colDataGetData(pCol, pRefTuple->rowIndex); } else { - return tupleGetField(pDesc->data, colIdx, colNum); + *pResult = tupleGetField(pDesc->data, colIdx, colNum); } + + return 0; } void destroyTuple(void* t) { @@ -270,7 +305,7 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); if (pSortHandle->pOrderedSource == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _err; } @@ -398,7 +433,7 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource)); if (pSource == NULL) { taosArrayDestroy(pPageIdList); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pSource->src.pBlock = pBlock; @@ -441,6 +476,10 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { } SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); + if (pPageIdList == NULL) { + return terrno; + } + while (start < pDataBlock->info.rows) { int32_t stop = 0; @@ -450,10 +489,11 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { return code; } - SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1); - if (p == NULL) { + SSDataBlock* p = NULL; + code = blockDataExtractBlock(pDataBlock, start, stop - start + 1, &p); + if (code) { taosArrayDestroy(pPageIdList); - return terrno; + return code; } int32_t pageId = -1; @@ -505,8 +545,11 @@ static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) { static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) { pParam->pSources = taosArrayGet(pSources, startIndex); - pParam->numOfSources = (endIndex - startIndex + 1); + if (pParam->pSources == NULL) { + return terrno; + } + pParam->numOfSources = (endIndex - startIndex + 1); int32_t code = 0; // multi-pass internal merge sort is required @@ -537,6 +580,9 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32 } int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); + if (pPgId == NULL) { + return terrno; + } void* pPage = getBufPage(pHandle->pBuf, *pPgId); if (NULL == pPage) { @@ -577,7 +623,14 @@ static int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* p for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfo == NULL) { + return terrno; + } + SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i); + if (pSrcColInfo == NULL) { + return terrno; + } bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL); if (isNull) { @@ -624,8 +677,11 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT } int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); + if (pPgId) { + return terrno; + } - void* pPage = getBufPage(pHandle->pBuf, *pPgId); + void* pPage = getBufPage(pHandle->pBuf, *pPgId); if (pPage == NULL) { qError("failed to get buffer, code:%s", tstrerror(terrno)); return terrno; @@ -899,8 +955,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource); for (int32_t t = 0; t < sortPass; ++t) { int64_t st = taosGetTimestampUs(); - SArray* pResList = taosArrayInit(4, POINTER_BYTES); + if (pResList == NULL) { + return terrno; + } int32_t numOfInputSources = pHandle->numOfPages; int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources; @@ -931,8 +989,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { } int32_t nMergedRows = 0; - SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); + if (pPageIdList == NULL) { + taosArrayDestroy(pResList); + return terrno; + } + while (1) { if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) { code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED; @@ -1083,6 +1145,9 @@ int32_t tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupl char* pStart = (char*)buf + sizeof(int8_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + if (pColInfo == NULL) { + return terrno; + } if (!isNull[i]) { code = colDataSetVal(pColInfo, pBlock->info.rows, pStart, false); @@ -1124,7 +1189,11 @@ int32_t tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupl } else { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - bool isNull = tsortIsNullVal(pTupleHandle, i); + if (pColInfo == NULL) { + return terrno; + } + + bool isNull = tsortIsNullVal(pTupleHandle, i); if (isNull) { colDataSetNULL(pColInfo, pBlock->info.rows); } else { @@ -1157,6 +1226,10 @@ static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) { char* pStart = (char*)buf + sizeof(int8_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (pCol == NULL) { + return terrno; + } + if (colDataIsNull_s(pCol, rowIdx)) { isNull[i] = 1; continue; @@ -1198,11 +1271,15 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i char** ppRow, bool* pFreeRow) { SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, regionId); + if (pRegion == NULL) { + return terrno; + } + if (pRegion->buf == NULL) { pRegion->bufRegOffset = 0; pRegion->buf = taosMemoryMalloc(pMemFile->blockSize); if (pRegion->buf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } // todo @@ -1223,7 +1300,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i } else { *ppRow = taosMemoryMalloc(rowLen); if (*ppRow == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset); memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock); @@ -1277,7 +1354,7 @@ static int32_t createSortMemFile(SSortHandle* pHandle) { pMemFile->writeBuf = taosMemoryMalloc(pMemFile->writeBufSize); if (pMemFile->writeBuf == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; } } @@ -1285,7 +1362,7 @@ static int32_t createSortMemFile(SSortHandle* pHandle) { pMemFile->cacheSize = pHandle->extRowsMemSize; pMemFile->aFileRegions = taosArrayInit(64, sizeof(SSortMemFileRegion)); if (pMemFile->aFileRegions == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; } } @@ -1313,8 +1390,13 @@ static void destroySortMemFile(SSortHandle* pHandle) { SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; for (int32_t i = 0; i < taosArrayGetSize(pMemFile->aFileRegions); ++i) { SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, i); + if (pRegion == NULL) { + continue; + } + taosMemoryFree(pRegion->buf); } + taosArrayDestroy(pMemFile->aFileRegions); pMemFile->aFileRegions = NULL; @@ -1338,7 +1420,7 @@ static int32_t tsortOpenRegion(SSortHandle* pHandle) { region.bufRegOffset = 0; void* px = taosArrayPush(pMemFile->aFileRegions, ®ion); if (px == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; } pMemFile->currRegionId = 0; @@ -1347,11 +1429,16 @@ static int32_t tsortOpenRegion(SSortHandle* pHandle) { } else { SSortMemFileRegion regionNew = {0}; SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId); + if (pRegion == NULL) { + return terrno; + } + regionNew.fileOffset = pRegion->fileOffset + pRegion->regionSize; regionNew.bufRegOffset = 0; + void* px = taosArrayPush(pMemFile->aFileRegions, ®ionNew); if (px == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; } ++pMemFile->currRegionId; pMemFile->currRegionOffset = 0; @@ -1363,6 +1450,10 @@ static int32_t tsortOpenRegion(SSortHandle* pHandle) { static int32_t tsortCloseRegion(SSortHandle* pHandle) { SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId); + if (pRegion == NULL) { + return terrno; + } + pRegion->regionSize = pMemFile->currRegionOffset; int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset); if (writeBytes > 0) { @@ -1390,7 +1481,7 @@ static int32_t tsortFinalizeRegions(SSortHandle* pHandle) { for (int32_t i = 0; i < numRegions; ++i) { SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, i); if (pRegion == NULL) { - return TSDB_CODE_INVALID_PARA; + return terrno; } pRegion->bufRegOffset = 0; @@ -1406,6 +1497,10 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId); + if (pRegion == NULL) { + return terrno; + } + { if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) { int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset); @@ -1442,8 +1537,20 @@ static int32_t appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSou SSDataBlock* pBlock = pHandle->pDataBlock; SBlockOrderInfo* extRowsTsOrder = taosArrayGet(pHandle->aExtRowsOrders, 0); + if (extRowsTsOrder == NULL) { + return terrno; + } + SColumnInfoData* pSrcTsCol = taosArrayGet(pSource->pDataBlock, extRowsTsOrder->slotId); + if (pSrcTsCol == NULL) { + return terrno; + } + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0); + if (pTsCol == NULL) { + return terrno; + } + char* pData = colDataGetData(pSrcTsCol, *rowIndex); code = colDataSetVal(pTsCol, pBlock->info.rows, pData, false); if (code) { @@ -1451,18 +1558,42 @@ static int32_t appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSou } SColumnInfoData* pRegionIdCol = taosArrayGet(pBlock->pDataBlock, 1); + if (pRegionIdCol == NULL) { + return terrno; + } + colDataSetInt32(pRegionIdCol, pBlock->info.rows, &pageId); SColumnInfoData* pOffsetCol = taosArrayGet(pBlock->pDataBlock, 2); + if (pOffsetCol == NULL) { + return terrno; + } + colDataSetInt32(pOffsetCol, pBlock->info.rows, &offset); SColumnInfoData* pLengthCol = taosArrayGet(pBlock->pDataBlock, 3); + if (pLengthCol == NULL) { + return terrno; + } + colDataSetInt32(pLengthCol, pBlock->info.rows, &length); if (pHandle->bSortPk) { SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1); + if (extRowsPkOrder == NULL) { + return terrno; + } + SColumnInfoData* pSrcPkCol = taosArrayGet(pSource->pDataBlock, extRowsPkOrder->slotId); + if (pSrcPkCol == NULL) { + return terrno; + } + SColumnInfoData* pPkCol = taosArrayGet(pBlock->pDataBlock, 4); + if (pPkCol == NULL) { + return terrno; + } + if (colDataIsNull_s(pSrcPkCol, *rowIndex)) { colDataSetNULL(pPkCol, pBlock->info.rows); } else { @@ -1483,16 +1614,15 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { SBlockOrderInfo* pkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->aExtRowsOrders, 1) : NULL; SColumnInfoData* extPkCol = (pHandle->bSortPk) ? taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId) : NULL; - SColumnInfoData pkCol = {0}; - SSDataBlock* pSortInput = NULL; - int32_t code = createDataBlock(&pSortInput); + SColumnInfoData pkCol = {0}; + SSDataBlock* pSortInput = NULL; + int32_t code = createDataBlock(&pSortInput); if (code) { return code; } SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1); - code = blockDataAppendColInfo(pSortInput, &tsCol); if (code) { blockDataDestroy(pSortInput); @@ -1539,7 +1669,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo)); if (pOrderInfoList == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } int32_t tsOrder = ((SBlockOrderInfo*)taosArrayGet(pHandle->pSortInfo, 0))->order; @@ -1551,7 +1681,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order); void* p = taosArrayPush(pOrderInfoList, &biTs); if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } if (pHandle->bSortPk) { @@ -1563,7 +1693,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { void* px = taosArrayPush(pOrderInfoList, &biPk); if (px == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } } @@ -1655,7 +1785,7 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, void* px = taosArrayPush(aPgId, &pageId); if (px == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t); @@ -1817,13 +1947,22 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* SBlockOrderInfo* pOrigBlockTsOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); - + if (pOrigBlockTsOrder == NULL) { + return terrno; + } + SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0); + if (pHandleBlockTsOrder == NULL) { + return terrno; + } SBlockOrderInfo* pOrigBlockPkOrder = NULL; if (pHandle->bSortPk) { pOrigBlockPkOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1); + if (pOrigBlockPkOrder) { + return terrno; + } } code = initMergeSup(&sup, aBlk, pOrigBlockTsOrder->order, pOrigBlockTsOrder->slotId, pOrigBlockPkOrder); @@ -1843,6 +1982,10 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* } SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); + if (aPgId == NULL) { + return terrno; + } + int32_t nRows = 0; int32_t nMergedRows = 0; bool mergeLimitReached = false; @@ -1989,7 +2132,11 @@ static int32_t getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashOb *pSkipBlock = false; SSDataBlock* pBlock = NULL; if (keepRows != pOrigBlk->info.rows) { - pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); + code = blockDataExtractBlock(pOrigBlk, 0, keepRows, &pBlock); + if (code) { + return code; + } + *pExtractedBlock = true; } else { *pExtractedBlock = false; @@ -2001,11 +2148,14 @@ static int32_t getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashOb } static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { + int32_t szSort = 0; size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); + if (aExtSrc == NULL) { + return terrno; + } size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize); - int32_t code = createPageBuf(pHandle); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(aExtSrc); @@ -2013,10 +2163,17 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); - int32_t szSort = 0; + if (pSrc == NULL) { + taosArrayDestroy(aExtSrc); + return TSDB_CODE_INVALID_PARA; + } SBlockOrderInfo* pOrigTsOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); + if (pOrigTsOrder == NULL) { + return terrno; + } + if (pOrigTsOrder->order == TSDB_ORDER_ASC) { pHandle->currMergeLimitTs = INT64_MAX; } else { @@ -2024,13 +2181,28 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); - SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); - SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); - while (1) { - SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + if (mTableNumRows == NULL) { + return terrno; + } + SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); + if (aBlkSort == NULL) { + tSimpleHashCleanup(mTableNumRows); + return terrno; + } + + SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); + if (mUidBlk == NULL) { + tSimpleHashCleanup(mTableNumRows); + taosArrayDestroy(aBlkSort); + return terrno; + } + + while (1) { bool bExtractedBlock = false; bool bSkipBlock = false; + + SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); if (pBlk != NULL && pHandle->mergeLimit > 0) { SSDataBlock* p = NULL; code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p); @@ -2043,7 +2215,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId); - int64_t firstRowTs = *(int64_t*)tsCol->pData; + if (tsCol == NULL) { + return terrno; + } + + int64_t firstRowTs = *(int64_t*)tsCol->pData; if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || (pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { if (bExtractedBlock) { @@ -2085,7 +2261,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { void* px = taosArrayPush(aBlkSort, &tBlk); if (px == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } } } @@ -2176,10 +2352,13 @@ static void freeSSortSource(SSortSource* source) { } static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { - int32_t code = 0; - size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; - + int32_t code = 0; + size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); + if (pSource == NULL) { + return terrno; + } + SSortSource* source = *pSource; *pSource = NULL; @@ -2277,6 +2456,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { } else if (pHandle->type == SORT_BLOCK_TS_MERGE) { code = createBlocksMergeSortInitialSources(pHandle); } + qDebug("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); return code; } @@ -2438,14 +2618,31 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock); for (int32_t i = 0; i < orderInfo->size; ++i) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i); - void *lData = tupleDescGetField(pLeftDesc, pOrder->slotId, colNum); - void *rData = tupleDescGetField(pRightDesc, pOrder->slotId, colNum); - if (!lData && !rData) continue; + void *lData = NULL, *rData = NULL; + + int32_t ret1 = tupleDescGetField(pLeftDesc, pOrder->slotId, colNum, &lData); + int32_t ret2 = tupleDescGetField(pRightDesc, pOrder->slotId, colNum, &rData); + if (ret1) { + return ret1; + } + + if (ret2) { + return ret2; + } + + if ((!lData) && (!rData)) { + continue; + } + if (!lData) return pOrder->nullFirst ? -1 : 1; if (!rData) return pOrder->nullFirst ? 1 : -1; - int32_t type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type; - __compar_fn_t fn = getKeyComparFunc(type, pOrder->order); + SColumnInfoData* p = (SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId); + if (p == NULL) { + return terrno; + } + + __compar_fn_t fn = getKeyComparFunc(p->info.type, pOrder->order); int32_t ret = fn(lData, rData); if (ret == 0) { @@ -2454,20 +2651,28 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) return ret; } } + return 0; } static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { pHandle->pBoundedQueue = createBoundedQueue(pHandle->pqMaxRows, tsortPQCompFn, destroyTuple, pHandle); - if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY; + if (NULL == pHandle->pBoundedQueue) { + return TSDB_CODE_OUT_OF_MEMORY; + } + tsortSetComparFp(pHandle, tupleComparFn); SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); - SSortSource* source = *pSource; + if (pSource == NULL) { + return terrno; + } - pHandle->pDataBlock = NULL; - uint32_t tupleLen = 0; + SSortSource* source = *pSource; + uint32_t tupleLen = 0; PriorityQueueNode pqNode; + pHandle->pDataBlock = NULL; + while (1) { // fetch data SSDataBlock* pBlock = pHandle->fetchfp(source->param); @@ -2491,12 +2696,17 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { if (tupleLen == 0) { for (size_t colIdx = 0; colIdx < colNum; ++colIdx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx); + if (pCol == NULL) { + return terrno; + } + tupleLen += pCol->info.bytes; if (IS_VAR_DATA_TYPE(pCol->info.type)) { tupleLen += sizeof(VarDataLenT); } } } + ReferencedTuple refTuple = {.desc.data = (char*)pBlock, .desc.type = ReferencedTupleType, .rowIndex = 0}; for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) { refTuple.rowIndex = rowIdx; @@ -2505,11 +2715,15 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { if (!pPushedNode) { // do nothing if push failed } else { - pPushedNode->data = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx); - if (pPushedNode->data == NULL) return TSDB_CODE_OUT_OF_MEMORY; + pPushedNode->data = NULL; + int32_t code = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx, (TupleDesc**)&pPushedNode->data); + if (code) { + return code; + } } } } + return TSDB_CODE_SUCCESS; } @@ -2543,10 +2757,17 @@ static int32_t tsortPQSortNextTuple(SSortHandle* pHandle, STupleHandle **pTupleH for (uint32_t i = 0; i < colNum; ++i) { void* pData = tupleGetField(pTuple, i, colNum); + + SColumnInfoData* p = NULL; + code = bdGetColumnInfoData(pHandle->pDataBlock, i, &p); + if (code) { + return code; + } + if (!pData) { - colDataSetNULL(bdGetColumnInfoData(pHandle->pDataBlock, i), 0); + colDataSetNULL(p, 0); } else { - code = colDataSetVal(bdGetColumnInfoData(pHandle->pDataBlock, i), 0, pData, false); + code = colDataSetVal(p, 0, pData, false); if (code) { return code; } @@ -2582,6 +2803,10 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle } SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); + if (pSource == NULL) { + return terrno; + } + SSortSource* source = *pSource; SSDataBlock* pBlock = pHandle->fetchfp(source->param); if (!pBlock || pBlock->info.rows == 0) { @@ -2603,7 +2828,7 @@ int32_t tsortOpen(SSortHandle* pHandle) { } if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { - return -1; + return TSDB_CODE_INVALID_PARA; } pHandle->opened = true; @@ -2629,6 +2854,10 @@ int32_t tsortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) { bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) { SColumnInfoData* pColInfoSrc = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex); + if (pColInfoSrc == NULL) { + return true; + } + return colDataIsNull_s(pColInfoSrc, pVHandle->rowIndex); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1197891cab..59cfd85bc3 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -469,8 +469,12 @@ int32_t funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow, } if (pIter->hasPrev) { if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) { - (void)blockDataDestroy(pIter->pPrevRowBlock); - pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); + blockDataDestroy(pIter->pPrevRowBlock); + int32_t code = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1, &pIter->pPrevRowBlock); + if (code) { + return code; + } + pIter->prevIsDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->inputEndIndex); pIter->pPrevData = taosMemoryMalloc(pIter->pDataCol->info.bytes); @@ -490,11 +494,10 @@ int32_t funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow, char* pkData = colDataGetData(pIter->pPkCol, pIter->inputEndIndex); (void)memcpy(pIter->pPrevPk, pkData, pIter->pPkCol->info.bytes); - pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); - + code = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1, &pIter->pPrevRowBlock); pIter->hasPrev = true; *res = false; - return TSDB_CODE_SUCCESS; + return code; } else { int32_t idx = pIter->rowIndex; while (pIter->tsList[idx] == pIter->prevBlockTsEnd) { @@ -553,9 +556,9 @@ int32_t funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow, } (void)memcpy(pIter->pPrevPk, colDataGetData(pIter->pPkCol, pIter->inputEndIndex), pIter->pPkCol->info.bytes); - pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); + int32_t code = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1, &pIter->pPrevRowBlock); *res = false; - return TSDB_CODE_SUCCESS; + return code; } } } diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 9526576426..654a8a92de 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -862,7 +862,12 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { blockDataAppendColInfo(block, &colInfoData); blockDataEnsureCapacity(block, udfCol->colData.numOfRows); - SColumnInfoData *col = bdGetColumnInfoData(block, 0); + SColumnInfoData *col = NULL; + int32_t code = bdGetColumnInfoData(block, 0, &col); + if (code) { + return code; + } + for (int i = 0; i < udfCol->colData.numOfRows; ++i) { if (udfColDataIsNull(udfCol, i)) { colDataSetNULL(col, i); @@ -1264,10 +1269,17 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { pTempBlock->info.rows = pInput->totalRows; pTempBlock->info.id.uid = pInput->uid; for (int32_t i = 0; i < numOfCols; ++i) { - blockDataAppendColInfo(pTempBlock, pInput->pData[i]); + code = blockDataAppendColInfo(pTempBlock, pInput->pData[i]); + if (code) { + return code; + } } - SSDataBlock *inputBlock = blockDataExtractBlock(pTempBlock, start, numOfRows); + SSDataBlock *inputBlock = NULL; + code = blockDataExtractBlock(pTempBlock, start, numOfRows, &inputBlock); + if (code) { + return code; + } SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum}; SUdfInterBuf newState = {0}; diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index 7f94ecfb3e..52ab34c121 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -89,7 +89,6 @@ int aggregateFuncTest() { } SSDataBlock *pBlock = NULL; - int32_t code = createDataBlock(&pBlock); if (code) { return code; @@ -103,7 +102,12 @@ int aggregateFuncTest() { blockDataEnsureCapacity(pBlock, 1024); pBlock->info.rows = 1024; - SColumnInfoData *pColInfo = bdGetColumnInfoData(pBlock, 0); + SColumnInfoData *pColInfo = NULL; + code = bdGetColumnInfoData(pBlock, 0, &pColInfo); + if (code) { + return code; + } + for (int32_t j = 0; j < pBlock->info.rows; ++j) { colDataSetInt32(pColInfo, j, &j); } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index c0ee503f77..00a62d4773 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -37,19 +37,30 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t for (int32_t i = 0; i < blockNum; i++) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pReq->data, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i); + if (pDataBlock == NULL) { + return terrno; + } int32_t compLen = *(int32_t*)pRetrieve->data; int32_t fullLen = *(int32_t*)(pRetrieve->data + sizeof(int32_t)); char* pInput = pRetrieve->data + PAYLOAD_PREFIX_LEN; if (pRetrieve->compressed && compLen < fullLen) { - char* p = taosMemoryMalloc(fullLen); + char* p = taosMemoryMalloc(fullLen); + if (p == NULL) { + return terrno; + } + int32_t len = tsDecompressString(pInput, compLen, 1, p, fullLen, ONE_STAGE_COMP, NULL, 0); ASSERT(len == fullLen); pInput = p; } - (void) blockDecode(pDataBlock, pInput); + const char* pDummy = NULL; + code = blockDecode(pDataBlock, pInput, &pDummy); + if (code) { + return code; + } if (pRetrieve->compressed && compLen < fullLen) { taosMemoryFree(pInput); @@ -109,18 +120,31 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock) { } int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* id) { - SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock)); + const char* pDummy = NULL; + SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; + SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock)); if (pArray == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; stError("failed to prepare retrieve block, %s", id); return terrno; } - (void) taosArrayPush(pArray, &(SSDataBlock){0}); - SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; - SSDataBlock* pDataBlock = taosArrayGet(pArray, 0); + void* px = taosArrayPush(pArray, &(SSDataBlock){0}); + if (px == NULL) { + taosArrayDestroy(pArray); + return terrno; + } - (void) blockDecode(pDataBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN); + SSDataBlock* pDataBlock = taosArrayGet(pArray, 0); + if (pDataBlock == NULL) { + taosArrayDestroy(pArray); + return terrno; + } + + int32_t code = blockDecode(pDataBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, &pDummy); + if (code) { + taosArrayDestroy(pArray); + return code; + } // TODO: refactor pDataBlock->info.window.skey = be64toh(pRetrieve->skey); @@ -132,7 +156,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock pData->reqId = pReq->reqId; pData->blocks = pArray; - return TSDB_CODE_SUCCESS; + return code; } int32_t streamDataSubmitNew(SPackedData* pData, int32_t type, SStreamDataSubmit** pSubmit) { @@ -178,7 +202,7 @@ int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) { int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { void* p = taosArrayPush(pMerged->submits, &pSubmit->submit); if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } if (pSubmit->ver > pMerged->ver) { @@ -260,8 +284,12 @@ void streamFreeQitem(SStreamQueueItem* data) { int32_t sz = taosArrayGetSize(pMerge->submits); for (int32_t i = 0; i < sz; i++) { SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i); + if (pSubmit == NULL) { + continue; + } taosMemoryFree(pSubmit->msgStr); } + taosArrayDestroy(pMerge->submits); taosFreeQitem(pMerge); } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {