diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 862bbee776..13ff3da53f 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -254,14 +254,23 @@ void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n); int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc); -SSDataBlock* createDataBlock(); -void blockDataDestroy(SSDataBlock* pBlock); -void blockDataFreeRes(SSDataBlock* pBlock); -SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); -SSDataBlock* createSpecialDataBlock(EStreamType type); +#define QRY_OPTR_CHECK(_o) \ + do { \ + if ((_o) == NULL) { \ + return TSDB_CODE_INVALID_PARA; \ + } else { \ + *(_o) = NULL; \ + } \ + } while(0) -SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx); -int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); +int32_t createDataBlock(SSDataBlock** pResBlock); +void blockDataDestroy(SSDataBlock* pBlock); +void blockDataFreeRes(SSDataBlock* pBlock); +int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock); +int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock); + +int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock); +int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4830593616..75a67ea484 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2899,7 +2899,6 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) { } pRebInfo->newConsumers = taosArrayInit(0, sizeof(int64_t)); if (pRebInfo->newConsumers == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } return pRebInfo; @@ -3455,7 +3454,9 @@ static FORCE_INLINE void* taosDecodeSMqTopicInfoMsg(void* buf, SMqTopicInfo* pTo buf = taosDecodeStringTo(buf, pTopicInfo->name); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); - pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqReportVgInfo)); + if ((pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqReportVgInfo))) == NULL) { + return NULL; + } for (int32_t i = 0; i < sz; i++) { SMqReportVgInfo vgInfo; buf = taosDecodeSMqVgInfo(buf, &vgInfo); @@ -3493,7 +3494,9 @@ static FORCE_INLINE void* taosDecodeSMqReportMsg(void* buf, SMqReportReq* pMsg) buf = taosDecodeFixedI64(buf, &pMsg->consumerId); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); - pMsg->pTopics = taosArrayInit(sz, sizeof(SMqTopicInfo)); + if ((pMsg->pTopics = taosArrayInit(sz, sizeof(SMqTopicInfo))) == NULL) { + return NULL; + } for (int32_t i = 0; i < sz; i++) { SMqTopicInfo topicInfo; buf = taosDecodeSMqTopicInfoMsg(buf, &topicInfo); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 112731d39d..0c1924c8d8 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1017,15 +1017,13 @@ void returnToUser(SRequestObj* pRequest) { } static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock**pBlock) { - int64_t lastTs = 0; - - int32_t code = TSDB_CODE_SUCCESS; + int64_t lastTs = 0; TAOS_FIELD* pResFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); - *pBlock = createDataBlock(); - if (NULL == *pBlock) { - return terrno; + int32_t code = createDataBlock(pBlock); + if (code) { + return code; } for(int32_t i = 0; i < numOfFields; ++i) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 08efd9cc08..0a27f4e3bd 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -21,8 +21,6 @@ #define MALLOC_ALIGN_BYTES 32 - - int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { if (pColumnInfoData->reassigned) { @@ -134,7 +132,7 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const uint32_t len = pColumnInfoData->varmeta.length; pColumnInfoData->varmeta.offset[rowIndex] = len; - memmove(pColumnInfoData->pData + len, pData, dataLen); + (void) memmove(pColumnInfoData->pData + len, pData, dataLen); pColumnInfoData->varmeta.length += dataLen; } else { memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes); @@ -290,7 +288,7 @@ int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t curren pColumnInfoData->reassigned = true; } - return TSDB_CODE_SUCCESS; + return code; } int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows, @@ -371,7 +369,7 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, int32_t numOfRow2) { if (pColumnInfoData->info.type != pSource->info.type) { - return TSDB_CODE_FAILED; + return TSDB_CODE_INVALID_PARA; } if (numOfRow2 == 0) { @@ -679,20 +677,24 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b } int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { + int32_t code = 0; int32_t capacity = pDest->info.capacity; - - size_t numOfCols = taosArrayGetSize(pDest->pDataBlock); + size_t numOfCols = taosArrayGetSize(pDest->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); capacity = pDest->info.capacity; - colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows); + int32_t ret = colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows); + if (ret < 0) { // error occurs + code = ret; + return code; + } } pDest->info.capacity = capacity; pDest->info.rows += pSrc->info.rows; - return TSDB_CODE_SUCCESS; + return code; } int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) { @@ -823,12 +825,15 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd } SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) { + int32_t code = 0; if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) { return NULL; } - SSDataBlock* pDst = createOneDataBlock(pBlock, false); - if (pDst == NULL) { + SSDataBlock* pDst = NULL; + code = createOneDataBlock(pBlock, false, &pDst); + if (code) { + terrno = code; return NULL; } @@ -859,7 +864,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 colDataSetNULL(pDstCol, j - startIndex); } else { char* p = colDataGetData(pColData, j); - colDataSetVal(pDstCol, j - startIndex, p, false); + code = colDataSetVal(pDstCol, j - startIndex, p, false); } } } @@ -1600,53 +1605,112 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { return TSDB_CODE_SUCCESS; } -SSDataBlock* createSpecialDataBlock(EStreamType type) { - SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - pBlock->info.hasVarCol = false; - pBlock->info.id.groupId = 0; - pBlock->info.rows = 0; - pBlock->info.type = type; - pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + - sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN; - pBlock->info.watermark = INT64_MIN; +int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) { + QRY_OPTR_CHECK(pBlock); + + int32_t code = 0; + SSDataBlock* p = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (p == NULL) { + return terrno; + } + + p->info.hasVarCol = false; + p->info.id.groupId = 0; + p->info.rows = 0; + p->info.type = type; + p->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + + sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN; + p->info.watermark = INT64_MIN; + + p->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); + if (p->pDataBlock == NULL) { + taosMemoryFree(p); + return terrno; + } - pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); SColumnInfoData infoData = {0}; infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP; infoData.info.bytes = sizeof(TSKEY); + // window start ts - taosArrayPush(pBlock->pDataBlock, &infoData); + void* px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } + // window end ts - taosArrayPush(pBlock->pDataBlock, &infoData); + px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } infoData.info.type = TSDB_DATA_TYPE_UBIGINT; infoData.info.bytes = sizeof(uint64_t); + // uid - taosArrayPush(pBlock->pDataBlock, &infoData); + px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } + // group id - taosArrayPush(pBlock->pDataBlock, &infoData); + px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP; infoData.info.bytes = sizeof(TSKEY); + // calculate start ts - taosArrayPush(pBlock->pDataBlock, &infoData); + px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } + // calculate end ts - taosArrayPush(pBlock->pDataBlock, &infoData); + px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } // table name infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.bytes = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN; - taosArrayPush(pBlock->pDataBlock, &infoData); - - return pBlock; -} - -SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) { - if (pDataBlock == NULL) { - return NULL; + px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } + + *pBlock = p; + return code; + +_err: + taosArrayDestroy(p->pDataBlock); + taosMemoryFree(p); + return code; +} + +int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) { + QRY_OPTR_CHECK(pResBlock); + + if (pDataBlock == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; } - SSDataBlock* pBlock = createDataBlock(); pBlock->info = pDataBlock->info; pBlock->info.rows = 0; pBlock->info.capacity = 0; @@ -1658,11 +1722,10 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) { blockDataAppendColInfo(pBlock, &colInfo); } - int32_t code = blockDataEnsureCapacity(pBlock, 1); + code = blockDataEnsureCapacity(pBlock, 1); if (code != TSDB_CODE_SUCCESS) { - terrno = code; blockDataDestroy(pBlock); - return NULL; + return code; } for (int32_t i = 0; i < numOfCols; ++i) { @@ -1670,13 +1733,17 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) { SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL); void* pData = NULL; - if (!isNull) pData = colDataGetData(pSrc, rowIdx); - colDataSetVal(pDst, 0, pData, isNull); + if (!isNull) { + pData = colDataGetData(pSrc, rowIdx); + } + + code = colDataSetVal(pDst, 0, pData, isNull); } pBlock->info.rows = 1; - return pBlock; + *pResBlock = pBlock; + return code; } void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) { @@ -1699,12 +1766,18 @@ void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) { memcpy(p->pData, pDst->pks[1].pData, p->nData); } -SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { +int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) { + QRY_OPTR_CHECK(pResBlock); if (pDataBlock == NULL) { - return NULL; + return TSDB_CODE_INVALID_PARA; + } + + SSDataBlock* pDstBlock = NULL; + int32_t code = createDataBlock(&pDstBlock); + if (code) { + return code; } - SSDataBlock* pDstBlock = createDataBlock(); pDstBlock->info = pDataBlock->info; pDstBlock->info.rows = 0; @@ -1723,11 +1796,10 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { copyPkVal(&pDstBlock->info, &pDataBlock->info); if (copyData) { - int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows); + code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { - terrno = code; blockDataDestroy(pDstBlock); - return NULL; + return code; } for (int32_t i = 0; i < numOfCols; ++i) { @@ -1740,24 +1812,26 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pDstBlock->info.capacity = pDataBlock->info.rows; } - return pDstBlock; + *pResBlock = pDstBlock; + return code; } -SSDataBlock* createDataBlock() { +int32_t createDataBlock(SSDataBlock** pResBlock) { + QRY_OPTR_CHECK(pResBlock); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (pBlock == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return terrno; } pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); if (pBlock->pDataBlock == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t code = terrno; taosMemoryFree(pBlock); - return NULL; + return code; } - return pBlock; + *pResBlock = pBlock; + return 0; } int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData) { @@ -1771,7 +1845,6 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat void* p = taosArrayPush(pBlock->pDataBlock, pColInfoData); if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } @@ -1863,7 +1936,7 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) { int32_t newLen = BitmapLen(total - n); if (n % 8 == 0) { - memmove(nullBitmap, nullBitmap + n / 8, newLen); + (void) memmove(nullBitmap, nullBitmap + n / 8, newLen); } else { int32_t tail = n % 8; int32_t i = 0; @@ -1924,23 +1997,23 @@ static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, s } if (dataOffset > 0) { - memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen); + (void) memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen); } - memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t)); + (void) memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t)); return dataLen; } static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total); - memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t)); + (void) memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t)); // clear the offset value of the unused entries. memset(&pColInfoData->varmeta.offset[total - n], 0, n); } else { int32_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes); + (void) memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes); doShiftBitmap(pColInfoData->nullbitmap, n, total); } } @@ -2258,18 +2331,19 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat int64_t uid, int32_t vgId, tb_uid_t suid) { SSubmitReq2* pReq = *ppReq; SArray* pVals = NULL; - int32_t numOfBlks = 0; int32_t sz = 1; - - terrno = TSDB_CODE_SUCCESS; + int32_t code = 0; + *ppReq = NULL; + terrno = 0; if (NULL == pReq) { if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _end; } if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { + code = terrno; goto _end; } } @@ -2317,13 +2391,23 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat isStartKey = true; ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId); SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(TSKEY*)var})); - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + return terrno; + } + } else if (colDataIsNull_s(pColInfoData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + return terrno; + } } else { SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(int64_t*)var})); - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + return terrno; + } } break; case TSDB_DATA_TYPE_NCHAR: @@ -2394,29 +2478,38 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat } } SRow* pRow = NULL; - if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) { + if ((code = tRowBuild(pVals, pTSchema, &pRow)) < 0) { tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); goto _end; } + ASSERT(pRow); - taosArrayPush(tbData.aRowP, &pRow); + void* px = taosArrayPush(tbData.aRowP, &pRow); + if (px == NULL) { + code = terrno; + goto _end; + } } - taosArrayPush(pReq->aSubmitTbData, &tbData); + void* px = taosArrayPush(pReq->aSubmitTbData, &tbData); + if (px == NULL) { + code = terrno; + goto _end; + } } + _end: taosArrayDestroy(pVals); - if (terrno != 0) { - *ppReq = NULL; + if (code != 0) { if (pReq) { tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFreeClear(pReq); } - - return TSDB_CODE_FAILED; + } else { + *ppReq = pReq; } - *ppReq = pReq; - return TSDB_CODE_SUCCESS; + + return code; } void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId) { @@ -2579,14 +2672,14 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { } colSizes[col] += colSize; dataLen += colSize; - memmove(data, pColData, colSize); + (void) memmove(data, pColData, colSize); data += colSize; } } else { colSizes[col] = colDataGetLength(pColRes, numOfRows); dataLen += colSizes[col]; if (pColRes->pData != NULL) { - memmove(data, pColRes->pData, colSizes[col]); + (void) memmove(data, pColRes->pData, colSizes[col]); } data += colSizes[col]; } diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index dabdc630f5..371fc130f0 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -459,7 +459,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, SColVal colVal; if ((colValArray = taosArrayInit(numOfInfos, sizeof(SColVal))) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } for (int32_t iRow = 0; iRow < numOfRows; iRow++) { @@ -670,7 +670,7 @@ static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart, // merge aColVal = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); if (aColVal == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _exit; } @@ -1748,7 +1748,7 @@ int32_t tTagToValArray(const STag *pTag, SArray **ppArray) { (*ppArray) = taosArrayInit(pTag->nTag + 1, sizeof(STagVal)); if (*ppArray == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _err; } diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index 1f396feb81..b85af42d1c 100644 --- a/source/common/test/commonTests.cpp +++ b/source/common/test/commonTests.cpp @@ -236,7 +236,9 @@ TEST(testCase, toInteger_test) { } TEST(testCase, Datablock_test) { - SSDataBlock* b = createDataBlock(); + SSDataBlock* b = NULL; + int32_t code = createDataBlock(&b); + ASSERT(code == 0); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 1); taosArrayPush(b->pDataBlock, &infoData); @@ -361,7 +363,9 @@ TEST(testCase, non_var_dataBlock_split_test) { TEST(testCase, var_dataBlock_split_test) { int32_t numOfRows = 1000000; - SSDataBlock* b = createDataBlock(); + SSDataBlock* b = NULL; + int32_t code = createDataBlock(&b); + ASSERT(code == 0); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 1); blockDataAppendColInfo(b, &infoData); diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 894d888e2d..687f21845e 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -289,7 +289,12 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { int32_t numOfCols = pShow->pMeta->numOfColumns; - SSDataBlock *pBlock = createDataBlock(); + SSDataBlock *pBlock = NULL; + code = createDataBlock(&pBlock); + if (code) { + TAOS_RETURN(code); + } + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {0}; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 19ad9e3540..381b1aba22 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -806,14 +806,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } // schedule stream task for stream obj - if ((code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList)) < 0) { + code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("stream:%s, failed to schedule since %s", createReq.name, terrstr()); mndTransDrop(pTrans); goto _OVER; } // add stream to trans - if ((code = mndPersistStream(pTrans, &streamObj)) < 0) { + code = mndPersistStream(pTrans, &streamObj); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("stream:%s, failed to persist since %s", createReq.name, terrstr()); mndTransDrop(pTrans); goto _OVER; @@ -837,7 +839,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { streamMutexUnlock(&execInfo.lock); // execute creation - if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { + code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); goto _OVER; @@ -848,12 +851,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SName dbname = {0}; code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); if (code) { + mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code)); goto _OVER; } SName name = {0}; - code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE); if (code) { + mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code)); goto _OVER; } @@ -868,7 +873,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { _OVER: if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("stream:%s, failed to create since %s", createReq.name, terrstr()); + mError("stream:%s, failed to create since %s", createReq.name, terrstr(code)); + } else { + mDebug("stream:%s create stream completed", createReq.name); } mndReleaseStream(pMnode, pStream); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 6417e6564e..a596233364 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1001,7 +1001,9 @@ static int32_t mndTransCheckCommitActions(SMnode *pMnode, STrans *pTrans) { int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { int32_t code = 0; - if (pTrans == NULL) return -1; + if (pTrans == NULL) { + return TSDB_CODE_INVALID_PARA; + } TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans)); @@ -1583,6 +1585,7 @@ _OVER: static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; int32_t code = 0; + terrno = 0; if (pTrans->exec == TRN_EXEC_SERIAL) { code = mndTransExecuteRedoActionsSerial(pMnode, pTrans, topHalf); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index e4a04047f1..ee616d7a0d 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1069,9 +1069,12 @@ int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) { if (count > TRY_ERROR_LIMIT) break; int32_t cmp = (*param->filterFunc)((void *)&p->btime, (void *)&pBtimeKey->btime, param->type); - if (cmp == 0) - taosArrayPush(pUids, &p->uid); - else { + if (cmp == 0) { + if (taosArrayPush(pUids, &p->uid) == NULL) { + ret = terrno; + break; + } + } else { if (param->equal == true) { if (count > TRY_ERROR_LIMIT) break; count++; @@ -1132,7 +1135,10 @@ int32_t metaFilterTableName(void *pVnode, SMetaFltParam *arg, SArray *pUids) { cmp = (*param->filterFunc)(pTableKey, pName, pCursor->type); if (cmp == 0) { tb_uid_t tuid = *(tb_uid_t *)pEntryVal; - taosArrayPush(pUids, &tuid); + if (taosArrayPush(pUids, &tuid) == NULL) { + ret = terrno; + goto END; + } } else { if (param->equal == true) { if (count > TRY_ERROR_LIMIT) break; @@ -1328,7 +1334,10 @@ int32_t metaFilterTableIds(void *pVnode, SMetaFltParam *arg, SArray *pUids) { } else { tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes); } - taosArrayPush(pUids, &tuid); + if (taosArrayPush(pUids, &tuid) == NULL) { + ret = terrno; + break; + } found = true; } else { if (param->equal == true) { @@ -1432,7 +1441,11 @@ int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) { STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal}; info.pTagVal = taosMemoryMalloc(pCur->vLen); memcpy(info.pTagVal, pCur->pVal, pCur->vLen); - taosArrayPush(pUidTagInfo, &info); + if (taosArrayPush(pUidTagInfo, &info) == NULL) { + metaCloseCtbCursor(pCur); + taosHashCleanup(pSepecifiedUidMap); + return TSDB_CODE_OUT_OF_MEMORY; + } } } else { // only the specified tables need to be added while (1) { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 52151e76d6..04447fd3ce 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -341,7 +341,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb break; } - taosArrayPush(tbUidList, &(((SCtbIdxKey *)pKey)->uid)); + (void)taosArrayPush(tbUidList, &(((SCtbIdxKey *)pKey)->uid)); } tdbTbcClose(pCtbIdxc); @@ -405,7 +405,7 @@ static void metaGetSubtables(SMeta *pMeta, int64_t suid, SArray *uids) { break; } - taosArrayPush(uids, &(((SCtbIdxKey *)pKey)->uid)); + (void)taosArrayPush(uids, &(((SCtbIdxKey *)pKey)->uid)); } tdbFree(pKey); @@ -1033,7 +1033,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi } if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) { - taosArrayPush(tbUids, &uid); + (void)taosArrayPush(tbUids, &uid); if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { tsdbCacheDropTable(pMeta->pVnode->pTsdb, uid, suid, NULL); @@ -1135,7 +1135,7 @@ static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) { tbFName[TSDB_TABLE_FNAME_LEN] = '\0'; int32_t ret = vnodeValidateTableHash(pMeta->pVnode, tbFName); if (ret < 0 && terrno == TSDB_CODE_VND_HASH_MISMATCH) { - taosArrayPush(uidList, &me.uid); + (void)taosArrayPush(uidList, &me.uid); } } tDecoderClear(&dc); @@ -1783,11 +1783,11 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA } else { memcpy(&val.i64, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal); } - taosArrayPush(pTagArray, &val); + (void)taosArrayPush(pTagArray, &val); } else { STagVal val = {.cid = pCol->colId}; if (tTagGet(pOldTag, &val)) { - taosArrayPush(pTagArray, &val); + (void)taosArrayPush(pTagArray, &val); } } } @@ -2171,7 +2171,7 @@ static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterT } SMetaPair pair = {.key = pKey, nKey = nKey}; - taosArrayPush(tagIdxList, &pair); + (void)taosArrayPush(tagIdxList, &pair); } tdbTbcClose(pTagIdxc); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 7c6194a3ea..c779a17301 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -289,7 +289,13 @@ STqReader* tqReaderOpen(SVnode* pVnode) { pReader->cachedSchemaSuid = 0; pReader->pSchemaWrapper = NULL; pReader->tbIdHash = NULL; - pReader->pResBlock = createDataBlock(); + pReader->pResBlock = NULL; + + int32_t code = createDataBlock(&pReader->pResBlock); + if (code) { + terrno = code; + } + return pReader; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index b72288425b..5df8c97962 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -117,8 +117,13 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* } STqOffsetVal offset = {0}; qStreamExtractOffset(task, &offset); - pHandle->block = createOneDataBlock(pDataBlock, true); - TSDB_CHECK_NULL(pDataBlock, code, line, END, terrno); + pHandle->block = NULL; + + code = createOneDataBlock(pDataBlock, true, &pHandle->block); + if (code) { + return code; + } + pHandle->blockTime = offset.ts; tOffsetDestroy(&offset); code = getDataBlock(task, pHandle, vgId, &pDataBlock); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 91074b32f6..9724973440 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -576,8 +576,10 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void* goto END; } - SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); - TSDB_CHECK_NULL(pDelBlock, code, line, END, terrno) + SSDataBlock* pDelBlock = NULL; + code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock); + TSDB_CHECK_CODE(code, line, END); + code = blockDataEnsureCapacity(pDelBlock, numOfTables); TSDB_CHECK_CODE(code, line, END); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 68954e5156..3b0a40c475 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -770,7 +770,11 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) { STFileSet *fset; TARRAY2_FOREACH(fs->fSetArr, fset) { if (fset->channelOpened) { - taosArrayPush(channelArray, &fset->channel); + if (taosArrayPush(channelArray, &fset->channel) == NULL) { + taosArrayDestroy(channelArray); + taosThreadMutexUnlock(&pTsdb->mutex); + return terrno; + } fset->channel = (SVAChannelID){0}; fset->mergeScheduled = false; tsdbFSSetBlockCommit(fset, false); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index a0063cbf29..a3210dbfd9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -402,28 +402,30 @@ static void initReaderStatus(SReaderStatus* pStatus) { } static int32_t createResBlock(SQueryTableDataCond* pCond, int32_t capacity, SSDataBlock** pResBlock) { - *pResBlock = createDataBlock(); - if (*pResBlock == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + QRY_OPTR_CHECK(pResBlock); + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code != 0) { + return code; } for (int32_t i = 0; i < pCond->numOfCols; ++i) { SColumnInfoData colInfo = {0}; colInfo.info = pCond->colList[i]; - int32_t code = blockDataAppendColInfo(*pResBlock, &colInfo); + code = blockDataAppendColInfo(pBlock, &colInfo); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(*pResBlock); - *pResBlock = NULL; + taosMemoryFree(pBlock); return code; } } - int32_t code = blockDataEnsureCapacity(*pResBlock, capacity); + code = blockDataEnsureCapacity(pBlock, capacity); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(*pResBlock); - *pResBlock = NULL; + taosMemoryFree(pBlock); } + *pResBlock = pBlock; return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 64569e63ac..0cb6a152b0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -88,6 +88,9 @@ int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket; if (pBuf->pData == NULL) { pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES); + if (pBuf->pData == NULL) { + return terrno; + } } for (int32_t i = 0; i < num; ++i) { @@ -163,22 +166,22 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in if (IS_NUMERIC_TYPE(type)) { if (asc) { - switch(type) { + switch (type) { case TSDB_DATA_TYPE_BIGINT: { pKey->pks[0].val = INT64_MIN; break; } - case TSDB_DATA_TYPE_INT:{ + case TSDB_DATA_TYPE_INT: { int32_t min = INT32_MIN; (void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); break; } - case TSDB_DATA_TYPE_SMALLINT:{ + case TSDB_DATA_TYPE_SMALLINT: { int16_t min = INT16_MIN; (void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); break; } - case TSDB_DATA_TYPE_TINYINT:{ + case TSDB_DATA_TYPE_TINYINT: { int8_t min = INT8_MIN; (void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); break; @@ -194,15 +197,31 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in ASSERT(0); } } else { - switch(type) { - case TSDB_DATA_TYPE_BIGINT:pKey->pks[0].val = INT64_MAX;break; - case TSDB_DATA_TYPE_INT:pKey->pks[0].val = INT32_MAX;break; - case TSDB_DATA_TYPE_SMALLINT:pKey->pks[0].val = INT16_MAX;break; - case TSDB_DATA_TYPE_TINYINT:pKey->pks[0].val = INT8_MAX;break; - case TSDB_DATA_TYPE_UBIGINT:pKey->pks[0].val = UINT64_MAX;break; - case TSDB_DATA_TYPE_UINT:pKey->pks[0].val = UINT32_MAX;break; - case TSDB_DATA_TYPE_USMALLINT:pKey->pks[0].val = UINT16_MAX;break; - case TSDB_DATA_TYPE_UTINYINT:pKey->pks[0].val = UINT8_MAX;break; + switch (type) { + case TSDB_DATA_TYPE_BIGINT: + pKey->pks[0].val = INT64_MAX; + break; + case TSDB_DATA_TYPE_INT: + pKey->pks[0].val = INT32_MAX; + break; + case TSDB_DATA_TYPE_SMALLINT: + pKey->pks[0].val = INT16_MAX; + break; + case TSDB_DATA_TYPE_TINYINT: + pKey->pks[0].val = INT8_MAX; + break; + case TSDB_DATA_TYPE_UBIGINT: + pKey->pks[0].val = UINT64_MAX; + break; + case TSDB_DATA_TYPE_UINT: + pKey->pks[0].val = UINT32_MAX; + break; + case TSDB_DATA_TYPE_USMALLINT: + pKey->pks[0].val = UINT16_MAX; + break; + case TSDB_DATA_TYPE_UTINYINT: + pKey->pks[0].val = UINT8_MAX; + break; default: ASSERT(0); } @@ -232,7 +251,7 @@ void clearRowKey(SRowKey* pKey) { taosMemoryFreeClear(pKey->pks[0].pData); } -static int32_t initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { +static int32_t initLastProcKey(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { int32_t code = 0; int32_t numOfPks = pReader->suppInfo.numOfPks; bool asc = ASCENDING_TRAVERSE(pReader->info.order); @@ -448,8 +467,8 @@ void cleanupInfoForNextFileset(SSHashObj* pTableMap) { // brin records iterator void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList) { - (void) memset(&pIter->block, 0, sizeof(SBrinBlock)); - (void) memset(&pIter->record, 0, sizeof(SBrinRecord)); + (void)memset(&pIter->block, 0, sizeof(SBrinBlock)); + (void)memset(&pIter->record, 0, sizeof(SBrinRecord)); pIter->blockIndex = -1; pIter->recordIndex = -1; @@ -471,7 +490,7 @@ int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) { return TSDB_CODE_INVALID_PARA; } - (void) tBrinBlockClear(&pIter->block); + (void)tBrinBlockClear(&pIter->block); int32_t code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block); if (code != TSDB_CODE_SUCCESS) { tsdbError("failed to read brinBlock from file, code:%s", tstrerror(code)); @@ -488,7 +507,7 @@ int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) { return code; } -void clearBrinBlockIter(SBrinRecordIter* pIter) { (void) tBrinBlockDestroy(&pIter->block); } +void clearBrinBlockIter(SBrinRecordIter* pIter) { (void)tBrinBlockDestroy(&pIter->block); } // initialize the file block access order // sort the file blocks according to the offset of each data block in the files @@ -658,7 +677,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0); for (int32_t i = 0; i < numOfBlocks; ++i) { STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i}; - void* px = taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); + void* px = taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); if (px == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -774,6 +793,9 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_ if (pScanInfo->pFileDelData == NULL) { pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData)); + if (pScanInfo->pFileDelData == NULL) { + return terrno; + } } for (int32_t k = 0; k < pBlock->numOfRecords; ++k) { @@ -810,6 +832,9 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_ if (pScanInfo->pFileDelData == NULL) { pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData)); + if (pScanInfo->pFileDelData == NULL) { + return terrno; + } } } @@ -821,7 +846,7 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_ if (record.version <= pReader->info.verRange.maxVer) { SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; - void* px = taosArrayPush(pScanInfo->pFileDelData, &delData); + void* px = taosArrayPush(pScanInfo->pFileDelData, &delData); if (px == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -878,7 +903,7 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs ETombBlkCheckEnum ret = 0; code = doCheckTombBlock(&block, pReader, numOfTables, &j, &ret); - (void) tTombBlockDestroy(&block); + (void)tTombBlockDestroy(&block); if (code != TSDB_CODE_SUCCESS || ret == BLK_CHECK_QUIT) { return code; } @@ -977,7 +1002,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo SStatisBlk* p = &pStatisBlkArray->data[i]; STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock)); - (void) tStatisBlockInit(pStatisBlock); + (void)tStatisBlockInit(pStatisBlock); int64_t st = taosGetTimestampMs(); int32_t code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock); @@ -995,7 +1020,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo } if (index >= pStatisBlock->numOfRecords) { - (void) tStatisBlockDestroy(pStatisBlock); + (void)tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); return num; } @@ -1005,7 +1030,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) { p = &pStatisBlkArray->data[i]; if (p->minTbid.suid > suid) { - (void) tStatisBlockDestroy(pStatisBlock); + (void)tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); return num; } @@ -1025,7 +1050,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo } } - (void) tStatisBlockDestroy(pStatisBlock); + (void)tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); return num; } @@ -1037,7 +1062,7 @@ static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlo (*i) += 1; (*j) = 0; if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) { - (void) tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock); + (void)tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock); } } } @@ -1049,7 +1074,7 @@ int32_t doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) { int32_t inc = numOfFileObj - size; for (int32_t k = 0; k < inc; ++k) { SLDataIter* pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); - void* px = taosArrayPush(pLDIterList, &pIter); + void* px = taosArrayPush(pLDIterList, &pIter); if (px == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -1073,6 +1098,9 @@ int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) // add the list/iter placeholder while (taosArrayGetSize(pSttFileBlockIterArray) < numOfLevels) { SArray* pList = taosArrayInit(4, POINTER_BYTES); + if (pList == NULL) { + return terrno; + } void* px = taosArrayPush(pSttFileBlockIterArray, &pList); if (px == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -1210,8 +1238,7 @@ static int32_t sortUidComparFn(const void* p1, const void* p2) { return ret; } -bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, - int32_t order) { +bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order) { // check if it overlap with del skyline taosArraySort(pKeyRangeList, sortUidComparFn); @@ -1242,7 +1269,7 @@ bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlo } STimeWindow w2 = {.skey = p2->skey.ts, .ekey = p2->ekey.ts}; - bool overlap = overlapWithTimeWindow(&w2, pQueryWindow, pScanInfo, order); + bool overlap = overlapWithTimeWindow(&w2, pQueryWindow, pScanInfo, order); if (overlap) { return false; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 45b2abeb24..1b656442a5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1124,7 +1124,11 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name); if (vnodeValidateTableHash(pVnode, tbName) < 0) { cRsp.code = TSDB_CODE_VND_HASH_MISMATCH; - taosArrayPush(rsp.pArray, &cRsp); + if (taosArrayPush(rsp.pArray, &cRsp) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + rcode = -1; + goto _exit; + } vError("vgId:%d create-table:%s failed due to hash value mismatch", TD_VID(pVnode), tbName); continue; } @@ -1139,11 +1143,19 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, } else { cRsp.code = TSDB_CODE_SUCCESS; tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid); - taosArrayPush(tbUids, &pCreateReq->uid); + if (taosArrayPush(tbUids, &pCreateReq->uid) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + rcode = -1; + goto _exit; + } vnodeUpdateMetaRsp(pVnode, cRsp.pMeta); } - taosArrayPush(rsp.pArray, &cRsp); + if (taosArrayPush(rsp.pArray, &cRsp) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + rcode = -1; + goto _exit; + } } vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids)); @@ -1375,12 +1387,20 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid); } - taosArrayPush(rsp.pArray, &dropTbRsp); + if (taosArrayPush(rsp.pArray, &dropTbRsp) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + pRsp->code = terrno; + goto _exit; + } if (tsEnableAuditCreateTable) { char *str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); strcpy(str, pDropTbReq->name); - taosArrayPush(tbNames, &str); + if (taosArrayPush(tbNames, &str) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + pRsp->code = terrno; + goto _exit; + } } } @@ -1499,11 +1519,13 @@ static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) { taosArrayDestroy(pCxt->pColValues); pCxt->pColValues = taosArrayInit(pCxt->pTbSchema->numOfCols, sizeof(SColVal)); if (NULL == pCxt->pColValues) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } for (int32_t i = 0; i < pCxt->pTbSchema->numOfCols; ++i) { SColVal val = COL_VAL_NONE(pCxt->pTbSchema->columns[i].colId, pCxt->pTbSchema->columns[i].type); - taosArrayPush(pCxt->pColValues, &val); + if (taosArrayPush(pCxt->pColValues, &val) == NULL) { + return terrno; + } } return TSDB_CODE_SUCCESS; @@ -1819,7 +1841,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in goto _exit; } - taosArrayPush(newTbUids, &pSubmitTbData->uid); + if (taosArrayPush(newTbUids, &pSubmitTbData->uid) == NULL) { + code = terrno; + goto _exit; + } if (pCreateTbRsp->pMeta) { vnodeUpdateMetaRsp(pVnode, pCreateTbRsp->pMeta); diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index d258fa49e4..2811402ea1 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -74,13 +74,16 @@ static int32_t getSchemaBytes(const SSchema* pSchema) { } static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) { - SSDataBlock* pBlock = createDataBlock(); - if (NULL == pBlock) { - return TSDB_CODE_OUT_OF_MEMORY; + QRY_OPTR_CHECK(pOutput); + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; } SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_FIELD_LEN, 1); - int32_t code = blockDataAppendColInfo(pBlock, &infoData); + code = blockDataAppendColInfo(pBlock, &infoData); if (TSDB_CODE_SUCCESS == code) { infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_TYPE_LEN, 2); code = blockDataAppendColInfo(pBlock, &infoData); @@ -229,13 +232,16 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** static int32_t execResetQueryCache() { return catalogClearCache(); } static int32_t buildCreateDBResultDataBlock(SSDataBlock** pOutput) { - SSDataBlock* pBlock = createDataBlock(); - if (NULL == pBlock) { - return TSDB_CODE_OUT_OF_MEMORY; + QRY_OPTR_CHECK(pOutput); + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; } SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_COLS, 1); - int32_t code = blockDataAppendColInfo(pBlock, &infoData); + code = blockDataAppendColInfo(pBlock, &infoData); if (TSDB_CODE_SUCCESS == code) { infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_FIELD2_LEN, 2); code = blockDataAppendColInfo(pBlock, &infoData); @@ -418,13 +424,16 @@ static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveT } static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) { - SSDataBlock* pBlock = createDataBlock(); - if (NULL == pBlock) { - return TSDB_CODE_OUT_OF_MEMORY; + QRY_OPTR_CHECK(pOutput); + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; } SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_TB_RESULT_FIELD1_LEN, 1); - int32_t code = blockDataAppendColInfo(pBlock, &infoData); + code = blockDataAppendColInfo(pBlock, &infoData); if (TSDB_CODE_SUCCESS == code) { infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_TB_RESULT_FIELD2_LEN, 2); code = blockDataAppendColInfo(pBlock, &infoData); @@ -439,13 +448,16 @@ static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) { } static int32_t buildCreateViewResultDataBlock(SSDataBlock** pOutput) { - SSDataBlock* pBlock = createDataBlock(); - if (NULL == pBlock) { - return TSDB_CODE_OUT_OF_MEMORY; + QRY_OPTR_CHECK(pOutput); + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; } SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_VIEW_RESULT_FIELD1_LEN, 1); - int32_t code = blockDataAppendColInfo(pBlock, &infoData); + code = blockDataAppendColInfo(pBlock, &infoData); if (TSDB_CODE_SUCCESS == code) { infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_VIEW_RESULT_FIELD2_LEN, 2); code = blockDataAppendColInfo(pBlock, &infoData); @@ -892,9 +904,12 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) { } static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** pOutput) { - SSDataBlock* pBlock = createDataBlock(); - if (NULL == pBlock) { - return TSDB_CODE_OUT_OF_MEMORY; + QRY_OPTR_CHECK(pOutput); + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; } SNode* pProj = NULL; @@ -910,8 +925,9 @@ static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** p } QRY_ERR_RET(blockDataAppendColInfo(pBlock, &infoData)); } + *pOutput = pBlock; - return TSDB_CODE_SUCCESS; + return code; } int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) { diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 1c95735c14..8d9f1fb9cc 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -1941,7 +1941,8 @@ _return: } int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { - int32_t code = 0; + int32_t code = 0; + SSDataBlock *pBlock = NULL; SExplainCtx *pCtx = (SExplainCtx *)ctx; int32_t rowNum = taosArrayGetSize(pCtx->rows); if (rowNum <= 0) { @@ -1949,7 +1950,9 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { QRY_ERR_RET(TSDB_CODE_APP_ERROR); } - SSDataBlock *pBlock = createDataBlock(); + code = createDataBlock(&pBlock); + QRY_ERR_JRET(code); + SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_EXPLAIN_RESULT_ROW_SIZE, 1); QRY_ERR_JRET(blockDataAppendColInfo(pBlock, &infoData)); QRY_ERR_JRET(blockDataEnsureCapacity(pBlock, rowNum)); diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index c047c496ca..af8532b5b3 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -20,15 +20,6 @@ extern "C" { #endif -#define QRY_OPTR_CHECK(_o) \ - do { \ - if ((_o) == NULL) { \ - return TSDB_CODE_INVALID_PARA; \ - } else { \ - *(_o) = NULL; \ - } \ - } while(0) - typedef struct SOperatorCostInfo { double openCost; double totalCost; diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 67c449d3ac..7e105d2260 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -364,7 +364,12 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc return TSDB_CODE_SUCCESS; } - SSDataBlock* pBlock = createDataBlock(); + SSDataBlock* pBlock = NULL; + code = createDataBlock(&pBlock); + if (code) { + return code; + } + pBlock->info.rows = 1; pBlock->info.capacity = 0; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index b6994e7036..9d49c8e9ca 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -181,7 +181,10 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl capacity = TMIN(totalTables, 4096); - pInfo->pBufferedRes = createOneDataBlock(pInfo->pRes, false); + pInfo->pBufferedRes = NULL; + code = createOneDataBlock(pInfo->pRes, false, &pInfo->pBufferedRes); + QUERY_CHECK_CODE(code, lino, _error); + setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode); code = blockDataEnsureCapacity(pInfo->pBufferedRes, capacity); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 7c74fa7a44..3329001cbc 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -672,7 +672,10 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo pStart += sizeof(SSysTableSchema); } - SSDataBlock* pBlock = createDataBlock(); + SSDataBlock* pBlock = NULL; + code = createDataBlock(&pBlock); + QUERY_CHECK_CODE(code, lino, _end); + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId); code = blockDataAppendColInfo(pBlock, &idata); @@ -791,8 +794,8 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks); blockDataCleanup(pb); } else { - pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); - QUERY_CHECK_NULL(pb, code, lino, _end, terrno); + code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb); + QUERY_CHECK_NULL(pb, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); } int32_t compLen = *(int32_t*)pStart; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index a8e7d0e03f..f263418e3e 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -251,9 +251,13 @@ SArray* createSortInfo(SNodeList* pNodeList) { } SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { - int32_t numOfCols = LIST_LENGTH(pNode->pSlots); - - SSDataBlock* pBlock = createDataBlock(); + int32_t numOfCols = LIST_LENGTH(pNode->pSlots); + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + terrno = code; + return NULL; + } pBlock->info.id.blockId = pNode->dataBlockId; pBlock->info.type = STREAM_INVALID; @@ -267,7 +271,7 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { idata.info.scale = pDescNode->dataType.scale; idata.info.precision = pDescNode->dataType.precision; - int32_t code = blockDataAppendColInfo(pBlock, &idata); + code = blockDataAppendColInfo(pBlock, &idata); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); blockDataDestroy(pBlock); @@ -1029,11 +1033,9 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S SStorageAPI* pStorageAPI) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - SSDataBlock* pResBlock = createDataBlock(); - if (pResBlock == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSDataBlock* pResBlock = NULL; + code = createDataBlock(&pResBlock); + QUERY_CHECK_CODE(code, lino, _end); for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) { SColumnInfoData colInfo = {0}; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d864a87f75..fe74037f0f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -695,9 +695,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo while (pRes != NULL) { SSDataBlock* p = NULL; if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) { - SSDataBlock* p1 = createOneDataBlock(pRes, true); - void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1); - QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + SSDataBlock* p1 = NULL; + code = createOneDataBlock(pRes, true, &p1); + void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1); + QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); p = p1; } else { p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 1a433ba025..4b71c5ee3f 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -513,7 +513,13 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi goto _error; } - pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false); + pInfo->pFinalRes = NULL; + + code = createOneDataBlock(pInfo->pRes, false, &pInfo->pFinalRes); + if (code) { + goto _error; + } + code = blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 173e67cc16..706cf50270 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -613,7 +613,10 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc return NULL; } - SSDataBlock* pDstBlock = createDataBlock(); + SSDataBlock* pDstBlock = NULL; + code = createDataBlock(&pDstBlock); + QUERY_CHECK_CODE(code, lino, _end); + pDstBlock->info = pDataBlock->info; pDstBlock->info.id.blockId = pOperator->resultDataBlockId; pDstBlock->info.capacity = 0; @@ -1319,7 +1322,10 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag QUERY_CHECK_CODE(code, lino, _end); if (winCode != TSDB_CODE_SUCCESS) { - SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); + SSDataBlock* pTmpBlock = NULL; + code = blockCopyOneRow(pSrcBlock, rowId, &pTmpBlock); + QUERY_CHECK_CODE(code, lino, _end); + memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); pTmpBlock->info.id.groupId = groupId; char* tbName = pSrcBlock->info.parTbName; @@ -1708,8 +1714,9 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); taosHashSetFreeFp(pInfo->pPartitions, freePartItem); pInfo->tsColIndex = 0; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); - QUERY_CHECK_NULL(pInfo->pDelRes, code, lino, _error, terrno); + + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 9f6318b58d..807d6b9785 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -1157,22 +1157,23 @@ int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinN int32_t code = blockDataEnsureCapacity(pJoin->finBlk, hJoinGetFinBlkCapacity(pJoin, pJoinNode)); if (TSDB_CODE_SUCCESS != code) { - QRY_ERR_RET(terrno); + QRY_ERR_RET(code); } if (NULL != pJoin->pPreFilter) { - pJoin->midBlk = createOneDataBlock(pJoin->finBlk, false); - if (NULL == pJoin->finBlk) { - QRY_ERR_RET(terrno); + pJoin->midBlk = NULL; + code = createOneDataBlock(pJoin->finBlk, false, &pJoin->midBlk); + if (code) { + QRY_ERR_RET(code); } + code = blockDataEnsureCapacity(pJoin->midBlk, pJoin->finBlk->info.capacity); if (TSDB_CODE_SUCCESS != code) { - QRY_ERR_RET(terrno); + QRY_ERR_RET(code); } } pJoin->blkThreshold = pJoin->finBlk->info.capacity * HJOIN_BLK_THRESHOLD_RATIO; - return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 5381abe28e..e59010f27f 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -2248,10 +2248,12 @@ static int32_t mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInf } while (true); if (buildGot && NULL == pCtx->cache.outBlk) { - pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false); - if (NULL == pCtx->cache.outBlk) { - MJ_ERR_RET(terrno); + pCtx->cache.outBlk = NULL; + int32_t code = createOneDataBlock(pJoin->build->blk, false, &pCtx->cache.outBlk); + if (code) { + MJ_ERR_RET(code); } + MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit)); } @@ -2678,10 +2680,12 @@ static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo if (buildGot && pJoin->build->newBlk) { if (NULL == pCtx->cache.outBlk) { - pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false); - if (NULL == pCtx->cache.outBlk) { - MJ_ERR_RET(terrno); + pCtx->cache.outBlk = NULL; + int32_t code = createOneDataBlock(pJoin->build->blk, false, &pCtx->cache.outBlk); + if (code) { + MJ_ERR_RET(code); } + MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit)); } @@ -2833,7 +2837,12 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) { if (!pGrp->clonedBlk) { if (0 == pGrp->beginIdx) { - pGrp->blk = createOneDataBlock(pGrp->blk, true); + SSDataBlock* p = NULL; + int32_t code = createOneDataBlock(pGrp->blk, true, &p); + if (code) { + MJ_ERR_RET(code); + } + pGrp->blk = p; } else { pGrp->blk = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx); pGrp->endIdx -= pGrp->beginIdx; @@ -3672,9 +3681,10 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode))); if (pJoin->pFPreFilter) { - pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false); - if (NULL == pCtx->midBlk) { - MJ_ERR_RET(terrno); + pCtx->midBlk = NULL; + int32_t code = createOneDataBlock(pCtx->finBlk, false, &pCtx->midBlk); + if (code) { + MJ_ERR_RET(code); } MJ_ERR_RET(blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity)); } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index d425c87019..04dca1c61f 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1331,10 +1331,12 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol } if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) { - pGrp->blk = createOneDataBlock(pTable->blk, true); - if (NULL == pGrp->blk) { - MJ_ERR_RET(terrno); + pGrp->blk = NULL; + code = createOneDataBlock(pTable->blk, true, &pGrp->blk); + if (code) { + MJ_ERR_RET(code); } + if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) { MJ_ERR_RET(terrno); } diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 31757ef70c..3b390c8719 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -207,7 +207,6 @@ int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { if (pSortMergeInfo->pIntermediateBlock == NULL) { pSortMergeInfo->pIntermediateBlock = NULL; - code = tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock); if (pSortMergeInfo->pIntermediateBlock == NULL || code != 0) { return code; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 9b85f53494..f9ef57ec5e 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -27,6 +27,8 @@ #include "querytask.h" #include "storageapi.h" +#include "tdatablock.h" + SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index cbc3d77faf..f81fff0806 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -114,7 +114,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = pResBlock; - pInfo->pFinalRes = createOneDataBlock(pResBlock, false); + pInfo->pFinalRes = NULL; + + code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes); + TSDB_CHECK_CODE(code, lino, _error); + pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder; pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 600700ab37..8d21ccf780 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1747,8 +1747,9 @@ static int32_t doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t ts } if (pInfo->partitionSup.needCalc) { - SSDataBlock* tmpBlock = createOneDataBlock(pResult, true); - QUERY_CHECK_NULL(tmpBlock, code, lino, _end, terrno); + SSDataBlock* tmpBlock = NULL; + code = createOneDataBlock(pResult, true, &tmpBlock); + QUERY_CHECK_CODE(code, lino, _end); blockDataCleanup(pResult); for (int32_t i = 0; i < tmpBlock->info.rows; i++) { @@ -3141,7 +3142,9 @@ FETCH_NEXT_BLOCK: printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo)); SSDataBlock* pDelBlock = NULL; if (pInfo->tqReader) { - pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); + code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock); + QUERY_CHECK_CODE(code, lino, _end); + code = filterDelBlockByUid(pDelBlock, pBlock, pInfo); QUERY_CHECK_CODE(code, lino, _end); } else { @@ -3782,7 +3785,8 @@ _end: } int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, - STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, + SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -3945,15 +3949,21 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* } pInfo->pRes = createDataBlockFromDescNode(pDescNode); - pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); + code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->groupId = 0; pInfo->pStreamScanOp = pOperator; pInfo->deleteDataIndex = 0; - pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA); + code = createSpecialDataBlock(STREAM_DELETE_DATA, &pInfo->pDeleteDataRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX}; - pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR); + createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateDataRes); + QUERY_CHECK_CODE(code, lino, _error); + if (hasPrimaryKeyCol(pInfo)) { code = addPrimaryKeyCol(pInfo->pUpdateDataRes, pkType.type, pkType.bytes); QUERY_CHECK_CODE(code, lino, _error); @@ -3961,6 +3971,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pInfo->pkColType = pkType.type; pInfo->pkColLen = pkType.bytes; } + pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->partitionSup.needCalc = false; pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; @@ -3969,7 +3980,9 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pInfo->pState = pTaskInfo->streamInfo.pState; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); // for stream if (pTaskInfo->streamInfo.pState) { @@ -4770,10 +4783,16 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) { STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i; pInput->type = SUB_TABLE_MEM_BLOCK; + code = dumpQueryTableCond(&pInfo->base.cond, &pInput->tblCond); QUERY_CHECK_CODE(code, lino, _end); - pInput->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); - pInput->pPageBlock = createOneDataBlock(pInfo->pResBlock, false); + + code = createOneDataBlock(pInfo->pResBlock, false, &pInput->pReaderBlock); + QUERY_CHECK_CODE(code, lino, _end); + + code = createOneDataBlock(pInfo->pResBlock, false, &pInput->pPageBlock); + QUERY_CHECK_CODE(code, lino, _end); + STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex); pInput->pKeyInfo = keyInfo; @@ -5183,7 +5202,12 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { if (pInfo->bNextDurationBlockEvent || pInfo->bNewFilesetEvent) { if (!bSkipped) { - pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); + int32_t code = createOneDataBlock(pBlock, true, &pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks]); + if (code) { + terrno = code; + return NULL; + } + ++pInfo->numNextDurationBlocks; if (pInfo->numNextDurationBlocks > 2) { qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), @@ -5685,8 +5709,9 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR code = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order, &pInfo->pSortInfo); QUERY_CHECK_CODE(code, lino, _error); - pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); - QUERY_CHECK_NULL(pInfo->pReaderBlock, code, lino, _error, terrno); + + code = createOneDataBlock(pInfo->pResBlock, false, &pInfo->pReaderBlock); + QUERY_CHECK_CODE(code, lino, _error); pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable; @@ -5696,7 +5721,8 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols); // start one reader variable - pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); + code = createOneDataBlock(pInfo->pResBlock, false, &pInfo->pSortInputBlock); + QUERY_CHECK_CODE(code, lino, _error); if (!tsExperimental) { pInfo->filesetDelimited = false; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index de731299ab..6adc60b79e 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -858,7 +858,10 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->ignoreExpiredData = pCountNode->window.igExpired; pInfo->ignoreExpiredDataSaved = false; pInfo->pUpdated = NULL; @@ -870,7 +873,9 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); } - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index aca71d10fc..76eaccb4ec 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -900,7 +900,9 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pChildren = NULL; pInfo->ignoreExpiredData = pEventNode->window.igExpired; pInfo->ignoreExpiredDataSaved = false; @@ -922,8 +924,8 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->pAllUpdated = NULL; } - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); - QUERY_CHECK_NULL(pInfo->pCheckpointRes, code, lino, _error, terrno); + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); pInfo->reCkBlock = false; pInfo->recvGetAll = false; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 1cecbc4a31..c6bf13dabd 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1394,11 +1394,8 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi } } - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); - if (!pInfo->pDelRes) { - code = TSDB_CODE_OUT_OF_MEMORY; - QUERY_CHECK_CODE(code, lino, _error); - } + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index f6edf050b1..aebc2d9c97 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1918,10 +1918,15 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pFinalPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK); - pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE); + + code = createSpecialDataBlock(STREAM_RETRIEVE, &pInfo->pPullDataRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredDataSaved = false; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->delIndex = 0; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->delKey.ts = INT64_MAX; @@ -1936,11 +1941,16 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->dataVersion = 0; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); - pInfo->recvRetrive = false; - pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->recvPullover = false; - pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); + pInfo->recvRetrive = false; + + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); + code = createSpecialDataBlock(STREAM_MID_RETRIEVE, &pInfo->pMidRetriveRes); + QUERY_CHECK_CODE(code, lino, _error); + code = createSpecialDataBlock(STREAM_MID_RETRIEVE, &pInfo->pMidPulloverRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->clearState = false; pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); @@ -2101,13 +2111,18 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, SStorageAPI* pApi, int32_t tsIndex) { pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput); - pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); + + int32_t code = createSpecialDataBlock(STREAM_CLEAR, &pSup->pScanBlock); + if (code) { + return code; + } + pSup->gap = gap; pSup->stateKeySize = keySize; pSup->stateKeyType = keyType; pSup->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx)); if (pSup->pDummyCtx == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pSup->stateStore = *pStore; @@ -2129,7 +2144,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in } pSup->pSessionAPI = pApi; - return TSDB_CODE_SUCCESS; } @@ -3718,7 +3732,9 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pChildren = NULL; pInfo->pPhyNode = pPhyNode; pInfo->ignoreExpiredData = pSessionNode->window.igExpired; @@ -3734,7 +3750,9 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->isHistoryOp = pHandle->fillHistory; } - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->clearState = false; pInfo->recvGetAll = false; pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey; @@ -3771,6 +3789,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pOptrInfo = pOperator; return code; + _error: if (pInfo != NULL) { destroyStreamSessionAggOperatorInfo(pInfo); @@ -4847,7 +4866,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pChildren = NULL; pInfo->ignoreExpiredData = pStateNode->window.igExpired; pInfo->ignoreExpiredDataSaved = false; @@ -4856,14 +4878,17 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->dataVersion = 0; pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey)); if (!pInfo->historyWins) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; QUERY_CHECK_CODE(code, lino, _error); } + if (pHandle) { pInfo->isHistoryOp = pHandle->fillHistory; } - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey; @@ -5157,7 +5182,10 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->invertible = false; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->delIndex = 0; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + + createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->pPhyNode = NULL; // create new child @@ -5187,7 +5215,9 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index dc3c8f8070..5dca0ebb73 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1176,11 +1176,17 @@ static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) { } } - SSDataBlock* pBlock = createDataBlock(); + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + terrno = code; + return NULL; + } + for (int32_t i = 0; i < pMeta[index].colNum; ++i) { SColumnInfoData colInfoData = createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1); - int32_t code = blockDataAppendColInfo(pBlock, &colInfoData); + code = blockDataAppendColInfo(pBlock, &colInfoData); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); blockDataDestroy(pBlock); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 36c3d49810..ab94493385 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -168,22 +168,11 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) { } int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) { - if (pBlock == NULL) { - *pBlock = NULL; - return TSDB_CODE_SUCCESS; - } - if (pSortHandle->pDataBlock == NULL) { *pBlock = NULL; return TSDB_CODE_SUCCESS; } - - *pBlock = createOneDataBlock(pSortHandle->pDataBlock, false); - if (*pBlock == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } else { - return TSDB_CODE_SUCCESS; - } + return createOneDataBlock(pSortHandle->pDataBlock, false, pBlock); } #define AllocatedTupleType 0 @@ -271,9 +260,8 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, pSortHandle->forceUsePQSort = false; if (pBlock != NULL) { - pSortHandle->pDataBlock = createOneDataBlock(pBlock, false); - if (pSortHandle->pDataBlock == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = createOneDataBlock(pBlock, false, &pSortHandle->pDataBlock); + if (code) { goto _err; } } @@ -500,7 +488,12 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { blockDataCleanup(pDataBlock); - SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false); + SSDataBlock* pBlock = NULL; + int32_t code = createOneDataBlock(pDataBlock, false, &pBlock); + if (code) { + return code; + } + return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList); } @@ -994,7 +987,14 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { tMergeTreeDestroy(&pHandle->pMergeTree); pHandle->numOfCompletedSources = 0; - SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); + SSDataBlock* pBlock = NULL; + + code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock); + if (code) { + taosArrayDestroy(pResList); + return code; + } + code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pResList); @@ -1481,12 +1481,19 @@ static int32_t appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSou static int32_t initRowIdSort(SSortHandle* pHandle) { SBlockOrderInfo* pkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->aExtRowsOrders, 1) : NULL; - SColumnInfoData* extPkCol = (pHandle->bSortPk) ? taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId) : NULL; + SColumnInfoData* extPkCol = + (pHandle->bSortPk) ? taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId) : NULL; SColumnInfoData pkCol = {0}; - SSDataBlock* pSortInput = createDataBlock(); + SSDataBlock* pSortInput = NULL; + int32_t code = createDataBlock(&pSortInput); + if (code) { + return code; + } + SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1); - int32_t code = blockDataAppendColInfo(pSortInput, &tsCol); + + code = blockDataAppendColInfo(pSortInput, &tsCol); if (code) { blockDataDestroy(pSortInput); return code; @@ -1499,14 +1506,14 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { return code; } - SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); + SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); code = blockDataAppendColInfo(pSortInput, &offsetCol); if (code) { blockDataDestroy(pSortInput); return code; } - SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4); + SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4); code = blockDataAppendColInfo(pSortInput, &lengthCol); if (code) { blockDataDestroy(pSortInput); @@ -1525,9 +1532,9 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { blockDataDestroy(pHandle->pDataBlock); pHandle->pDataBlock = pSortInput; -// int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock); -// size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock); - pHandle->pageSize = 256 * 1024; // 256k + // int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock); + // size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock); + pHandle->pageSize = 256 * 1024; // 256k pHandle->numOfPages = 256; SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo)); @@ -1928,7 +1935,14 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* blockDataCleanup(pHandle->pDataBlock); } - SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); + SSDataBlock* pMemSrcBlk = NULL; + code = createOneDataBlock(pHandle->pDataBlock, false, &pMemSrcBlk); + if (code) { + cleanupMergeSup(&sup); + tMergeTreeDestroy(&pTree); + return code; + } + code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); cleanupMergeSup(&sup); @@ -2054,7 +2068,16 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { blockDataDestroy(pBlk); } } else { - SSDataBlock* tBlk = (bExtractedBlock) ? pBlk : createOneDataBlock(pBlk, true); + SSDataBlock* tBlk = NULL; + if (bExtractedBlock) { + tBlk = pBlk; + } else { + code = createOneDataBlock(pBlk, true, &tBlk); + if (code) { + return code; + } + } + code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES); if (code) { return code; @@ -2175,7 +2198,11 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { // todo, number of pages are set according to the total available sort buffer pHandle->numOfPages = 1024; sortBufSize = pHandle->numOfPages * pHandle->pageSize; - pHandle->pDataBlock = createOneDataBlock(pBlock, false); + code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock); + if (code) { + freeSSortSource(source); + return code; + } } if (pHandle->beforeFp != NULL) { @@ -2453,11 +2480,10 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { } if (pHandle->pDataBlock == NULL) { - pHandle->pDataBlock = createOneDataBlock(pBlock, false); - } - - if (pHandle->pDataBlock == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + int32_t code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock); + if (code) { + return code; + } } size_t colNum = blockDataGetNumOfCols(pBlock); diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index cefe12990d..4815e17b5a 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -71,7 +71,9 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) { } if (pInfo->pBlock == NULL) { - pInfo->pBlock = createDataBlock(); + pInfo->pBlock = NULL; + int32_t code = createDataBlock(&pInfo->pBlock); + ASSERT(code == 0); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); blockDataAppendColInfo(pInfo->pBlock, &colInfo); @@ -129,7 +131,10 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { } if (pInfo->pBlock == NULL) { - pInfo->pBlock = createDataBlock(); + pInfo->pBlock = NULL; + + int32_t code = createDataBlock(&pInfo->pBlock); + ASSERT(code == 0); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1); blockDataAppendColInfo(pInfo->pBlock, &colInfo); diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index 11bb9ad2b1..6100b0722d 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -928,8 +928,9 @@ SExecTaskInfo* createDummyTaskInfo(char* taskId) { } SSDataBlock* createDummyBlock(int32_t blkId) { - SSDataBlock* p = createDataBlock(); - assert(p); + SSDataBlock* p = NULL; + int32_t code = createDataBlock(&p); + assert(code == 0); p->info.id.blockId = blkId; p->info.type = STREAM_INVALID; diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index b4d1884597..877e3a924c 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -60,7 +60,12 @@ SSDataBlock* getSingleColDummyBlock(void* param) { return NULL; } - SSDataBlock* pBlock = createDataBlock(); + SSDataBlock* pBlock = NULL; + + int32_t code = createDataBlock(&pBlock); + if (code) { + return NULL; + } SColumnInfoData colInfo = {0}; colInfo.info.type = pInfo->type; @@ -348,7 +353,10 @@ TEST(testCase, ordered_merge_sort_Test) { SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); taosArrayPush(orderInfo, &oi); - SSDataBlock* pBlock = createDataBlock(); + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + ASSERT(code == 0); + for (int32_t i = 0; i < 1; ++i) { SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); blockDataAppendColInfo(pBlock, &colInfo); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index c9b8a1e08b..9526576426 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1254,8 +1254,13 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { int32_t numOfCols = pInput->numOfInputCols; int32_t start = pInput->startRowIndex; int32_t numOfRows = pInput->numOfRows; + SSDataBlock *pTempBlock = NULL; + int32_t code = createDataBlock(&pTempBlock); + + if (code) { + return code; + } - SSDataBlock *pTempBlock = createDataBlock(); pTempBlock->info.rows = pInput->totalRows; pTempBlock->info.id.uid = pInput->uid; for (int32_t i = 0; i < numOfCols; ++i) { diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index aa8b88b738..7f94ecfb3e 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -88,7 +88,13 @@ int aggregateFuncTest() { return -1; } - SSDataBlock *pBlock = createDataBlock(); + SSDataBlock *pBlock = NULL; + + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; + } + for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); blockDataAppendColInfo(pBlock, &colInfo); diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index b210e2c326..80e2c005a7 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -112,7 +112,10 @@ int32_t flttMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, } if (NULL == *block) { - SSDataBlock *res = createDataBlock(); + SSDataBlock *res = NULL; + int32_t code = createDataBlock(&res); + ASSERT(code == 0); + for (int32_t i = 0; i < 2; ++i) { SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, 10, 1 + i); FLT_ERR_RET(blockDataAppendColInfo(res, &idata)); diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index f247eb8432..2109d8d0d6 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -91,14 +91,16 @@ void scltInitLogFile() { int32_t scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *slotId, bool newBlock, int32_t rows, SColumnInfo *colInfo) { if (newBlock) { - SSDataBlock *res = createDataBlock(); - if (NULL == res || NULL == res->pDataBlock) { + SSDataBlock *res = NULL; + int32_t code = createDataBlock(&res); + if (code != 0 || NULL == res->pDataBlock) { SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } SColumnInfoData idata = {0}; idata.info = *colInfo; - int32_t code = colInfoDataEnsureCapacity(&idata, rows, true); + + code = colInfoDataEnsureCapacity(&idata, rows, true); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(&idata); SCL_ERR_RET(code); @@ -185,7 +187,11 @@ int32_t scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, } if (NULL == *block) { - SSDataBlock *res = createDataBlock(); + SSDataBlock *res = NULL; + + int32_t code = createDataBlock(&res); + ASSERT(code == 0); + for (int32_t i = 0; i < 2; ++i) { SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_INT, 10, i + 1); code = colInfoDataEnsureCapacity(&idata, rowNum, true); diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 4663a1f6e9..aa7a6da0a2 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -142,7 +142,9 @@ static int32_t syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { int64_t nowMS = taosGetTimestampMs(); if (nowMS - pStub->createTime > ttl || -1 == ttl) { - taosArrayPush(delIndexArray, pSeqNum); + if (taosArrayPush(delIndexArray, pSeqNum) == NULL) { + return terrno; + } cnt++; SFsmCbMeta cbMeta = { diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 187d65d975..f86ed69fc3 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -1366,11 +1366,6 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, if (ret < 0) { return ret; } - /* - if (pDecoder->ofps) { - taosArrayPush(pDecoder->ofps, &ofp); - } - */ ofpCell = tdbPageGetCell(ofp, 0); if (nLeft <= ofp->maxLocal - sizeof(SPgno)) { @@ -1411,11 +1406,6 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, if (ret < 0) { return ret; } - /* - if (pDecoder->ofps) { - taosArrayPush(pDecoder->ofps, &ofp); - } - */ ofpCell = tdbPageGetCell(ofp, 0); int lastKeyPage = 0; @@ -1642,7 +1632,10 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN * SArray *ofps = pPage->pPager->ofps; if (ofps) { - taosArrayPush(ofps, &ofp); + if (taosArrayPush(ofps, &ofp) == NULL) { + ASSERT(0); + return terrno; + } } tdbPagerReturnPage(pPage->pPager, ofp, pTxn); diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 49132037d4..660b757bb2 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -34,14 +34,12 @@ SArray* taosArrayInit(size_t size, size_t elemSize) { SArray* pArray = taosMemoryMalloc(sizeof(SArray)); if (pArray == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } pArray->size = 0; pArray->pData = taosMemoryCalloc(size, elemSize); if (pArray->pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pArray); return NULL; } diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index c0ed0b98d0..17b985b5e0 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -169,7 +169,7 @@ TdThread doRegisterCacheObj(SCacheObj *pCacheObj) { taosThreadOnce(&cacheThreadInit, doInitRefreshThread); taosThreadMutexLock(&guard); - taosArrayPush(pCacheArrayList, &pCacheObj); + (void)taosArrayPush(pCacheArrayList, &pCacheObj); taosThreadMutexUnlock(&guard); return cacheRefreshWorker; diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index e2e4d90849..e68d86e232 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -1275,18 +1275,23 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) { char *itemName = NULL, *itemValueString = NULL; TAOS_CHECK_GOTO(tjsonGetObjectName(item, &itemName), NULL, _err_json); TAOS_CHECK_GOTO(tjsonGetObjectValueString(item, &itemValueString), NULL, _err_json); + if (itemValueString != NULL && itemName != NULL) { size_t itemNameLen = strlen(itemName); size_t itemValueStringLen = strlen(itemValueString); - cfgLineBuf = taosMemoryRealloc(cfgLineBuf, itemNameLen + itemValueStringLen + 3); - if (NULL == cfgLineBuf) { + void* px = taosMemoryRealloc(cfgLineBuf, itemNameLen + itemValueStringLen + 3); + if (NULL == px) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err_json; } + cfgLineBuf = px; + (void)memset(cfgLineBuf, 0, itemNameLen + itemValueStringLen + 3); + (void)memcpy(cfgLineBuf, itemName, itemNameLen); cfgLineBuf[itemNameLen] = ' '; (void)memcpy(&cfgLineBuf[itemNameLen + 1], itemValueString, itemValueStringLen); + (void)paGetToken(cfgLineBuf, &name, &olen); if (olen == 0) continue; name[olen] = 0; diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 7e165a12d5..e08dc009fc 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -326,7 +326,7 @@ static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArr ASSERT(shard->usage >= old->totalCharge); shard->usage -= old->totalCharge; - taosArrayPush(deleted, &old); + (void)taosArrayPush(deleted, &old); } } @@ -392,7 +392,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * if (shard->usage + e->totalCharge > shard->capacity && (shard->strictCapacity || handle == NULL)) { TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); if (handle == NULL) { - taosArrayPush(lastReferenceList, &e); + (void)taosArrayPush(lastReferenceList, &e); } else { if (freeOnFail) { taosMemoryFree(e); @@ -415,7 +415,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * ASSERT(shard->usage >= old->totalCharge); shard->usage -= old->totalCharge; - taosArrayPush(lastReferenceList, &old); + (void)taosArrayPush(lastReferenceList, &old); } } if (handle == NULL) { @@ -536,7 +536,7 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) { ASSERT(shard->usage >= old->totalCharge); shard->usage -= old->totalCharge; - taosArrayPush(lastReferenceList, &old); + (void)taosArrayPush(lastReferenceList, &old); } (void)taosThreadMutexUnlock(&shard->mutex);