diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 862bbee776..13ff3da53f 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -254,14 +254,23 @@ void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n); int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc); -SSDataBlock* createDataBlock(); -void blockDataDestroy(SSDataBlock* pBlock); -void blockDataFreeRes(SSDataBlock* pBlock); -SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); -SSDataBlock* createSpecialDataBlock(EStreamType type); +#define QRY_OPTR_CHECK(_o) \ + do { \ + if ((_o) == NULL) { \ + return TSDB_CODE_INVALID_PARA; \ + } else { \ + *(_o) = NULL; \ + } \ + } while(0) -SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx); -int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); +int32_t createDataBlock(SSDataBlock** pResBlock); +void blockDataDestroy(SSDataBlock* pBlock); +void blockDataFreeRes(SSDataBlock* pBlock); +int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock); +int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock); + +int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock); +int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 112731d39d..0c1924c8d8 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1017,15 +1017,13 @@ void returnToUser(SRequestObj* pRequest) { } static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock**pBlock) { - int64_t lastTs = 0; - - int32_t code = TSDB_CODE_SUCCESS; + int64_t lastTs = 0; TAOS_FIELD* pResFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); - *pBlock = createDataBlock(); - if (NULL == *pBlock) { - return terrno; + int32_t code = createDataBlock(pBlock); + if (code) { + return code; } for(int32_t i = 0; i < numOfFields; ++i) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 08efd9cc08..347b7f8632 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -21,8 +21,6 @@ #define MALLOC_ALIGN_BYTES 32 - - int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { if (pColumnInfoData->reassigned) { @@ -134,7 +132,7 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const uint32_t len = pColumnInfoData->varmeta.length; pColumnInfoData->varmeta.offset[rowIndex] = len; - memmove(pColumnInfoData->pData + len, pData, dataLen); + (void) memmove(pColumnInfoData->pData + len, pData, dataLen); pColumnInfoData->varmeta.length += dataLen; } else { memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes); @@ -290,7 +288,7 @@ int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t curren pColumnInfoData->reassigned = true; } - return TSDB_CODE_SUCCESS; + return code; } int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows, @@ -371,7 +369,7 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, int32_t numOfRow2) { if (pColumnInfoData->info.type != pSource->info.type) { - return TSDB_CODE_FAILED; + return TSDB_CODE_INVALID_PARA; } if (numOfRow2 == 0) { @@ -823,12 +821,15 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd } SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) { + int32_t code = 0; if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) { return NULL; } - SSDataBlock* pDst = createOneDataBlock(pBlock, false); - if (pDst == NULL) { + SSDataBlock* pDst = NULL; + code = createOneDataBlock(pBlock, false, &pDst); + if (code) { + terrno = code; return NULL; } @@ -859,7 +860,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 colDataSetNULL(pDstCol, j - startIndex); } else { char* p = colDataGetData(pColData, j); - colDataSetVal(pDstCol, j - startIndex, p, false); + code = colDataSetVal(pDstCol, j - startIndex, p, false); } } } @@ -1600,53 +1601,112 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { return TSDB_CODE_SUCCESS; } -SSDataBlock* createSpecialDataBlock(EStreamType type) { - SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - pBlock->info.hasVarCol = false; - pBlock->info.id.groupId = 0; - pBlock->info.rows = 0; - pBlock->info.type = type; - pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + - sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN; - pBlock->info.watermark = INT64_MIN; +int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) { + QRY_OPTR_CHECK(pBlock); + + int32_t code = 0; + SSDataBlock* p = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (p == NULL) { + return terrno; + } + + p->info.hasVarCol = false; + p->info.id.groupId = 0; + p->info.rows = 0; + p->info.type = type; + p->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + + sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN; + p->info.watermark = INT64_MIN; + + p->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); + if (p->pDataBlock == NULL) { + taosMemoryFree(p); + return terrno; + } - pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); SColumnInfoData infoData = {0}; infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP; infoData.info.bytes = sizeof(TSKEY); + // window start ts - taosArrayPush(pBlock->pDataBlock, &infoData); + void* px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } + // window end ts - taosArrayPush(pBlock->pDataBlock, &infoData); + px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } infoData.info.type = TSDB_DATA_TYPE_UBIGINT; infoData.info.bytes = sizeof(uint64_t); + // uid - taosArrayPush(pBlock->pDataBlock, &infoData); + px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } + // group id - taosArrayPush(pBlock->pDataBlock, &infoData); + px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP; infoData.info.bytes = sizeof(TSKEY); + // calculate start ts - taosArrayPush(pBlock->pDataBlock, &infoData); + px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } + // calculate end ts - taosArrayPush(pBlock->pDataBlock, &infoData); + px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } // table name infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.bytes = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN; - taosArrayPush(pBlock->pDataBlock, &infoData); - - return pBlock; -} - -SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) { - if (pDataBlock == NULL) { - return NULL; + px = taosArrayPush(p->pDataBlock, &infoData); + if (px == NULL) { + code = errno; + goto _err; + } + + *pBlock = p; + return code; + +_err: + taosArrayDestroy(p->pDataBlock); + taosMemoryFree(p); + return code; +} + +int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) { + QRY_OPTR_CHECK(pResBlock); + + if (pDataBlock == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; } - SSDataBlock* pBlock = createDataBlock(); pBlock->info = pDataBlock->info; pBlock->info.rows = 0; pBlock->info.capacity = 0; @@ -1658,11 +1718,10 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) { blockDataAppendColInfo(pBlock, &colInfo); } - int32_t code = blockDataEnsureCapacity(pBlock, 1); + code = blockDataEnsureCapacity(pBlock, 1); if (code != TSDB_CODE_SUCCESS) { - terrno = code; blockDataDestroy(pBlock); - return NULL; + return code; } for (int32_t i = 0; i < numOfCols; ++i) { @@ -1670,13 +1729,17 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) { SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL); void* pData = NULL; - if (!isNull) pData = colDataGetData(pSrc, rowIdx); - colDataSetVal(pDst, 0, pData, isNull); + if (!isNull) { + pData = colDataGetData(pSrc, rowIdx); + } + + code = colDataSetVal(pDst, 0, pData, isNull); } pBlock->info.rows = 1; - return pBlock; + *pResBlock = pBlock; + return code; } void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) { @@ -1699,12 +1762,18 @@ void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) { memcpy(p->pData, pDst->pks[1].pData, p->nData); } -SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { +int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) { + QRY_OPTR_CHECK(pResBlock); if (pDataBlock == NULL) { - return NULL; + return TSDB_CODE_INVALID_PARA; + } + + SSDataBlock* pDstBlock = NULL; + int32_t code = createDataBlock(&pDstBlock); + if (code) { + return code; } - SSDataBlock* pDstBlock = createDataBlock(); pDstBlock->info = pDataBlock->info; pDstBlock->info.rows = 0; @@ -1723,11 +1792,10 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { copyPkVal(&pDstBlock->info, &pDataBlock->info); if (copyData) { - int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows); + code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { - terrno = code; blockDataDestroy(pDstBlock); - return NULL; + return code; } for (int32_t i = 0; i < numOfCols; ++i) { @@ -1740,24 +1808,26 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pDstBlock->info.capacity = pDataBlock->info.rows; } - return pDstBlock; + *pResBlock = pDstBlock; + return code; } -SSDataBlock* createDataBlock() { +int32_t createDataBlock(SSDataBlock** pResBlock) { + QRY_OPTR_CHECK(pResBlock); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (pBlock == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return terrno; } pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); if (pBlock->pDataBlock == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t code = terrno; taosMemoryFree(pBlock); - return NULL; + return code; } - return pBlock; + *pResBlock = pBlock; + return 0; } int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData) { @@ -1771,7 +1841,6 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat void* p = taosArrayPush(pBlock->pDataBlock, pColInfoData); if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } @@ -1863,7 +1932,7 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) { int32_t newLen = BitmapLen(total - n); if (n % 8 == 0) { - memmove(nullBitmap, nullBitmap + n / 8, newLen); + (void) memmove(nullBitmap, nullBitmap + n / 8, newLen); } else { int32_t tail = n % 8; int32_t i = 0; @@ -1924,23 +1993,23 @@ static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, s } if (dataOffset > 0) { - memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen); + (void) memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen); } - memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t)); + (void) memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t)); return dataLen; } static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total); - memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t)); + (void) memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t)); // clear the offset value of the unused entries. memset(&pColInfoData->varmeta.offset[total - n], 0, n); } else { int32_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes); + (void) memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes); doShiftBitmap(pColInfoData->nullbitmap, n, total); } } @@ -2258,18 +2327,19 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat int64_t uid, int32_t vgId, tb_uid_t suid) { SSubmitReq2* pReq = *ppReq; SArray* pVals = NULL; - int32_t numOfBlks = 0; int32_t sz = 1; - - terrno = TSDB_CODE_SUCCESS; + int32_t code = 0; + *ppReq = NULL; + terrno = 0; if (NULL == pReq) { if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _end; } if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { + code = terrno; goto _end; } } @@ -2317,13 +2387,23 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat isStartKey = true; ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId); SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(TSKEY*)var})); - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + return terrno; + } + } else if (colDataIsNull_s(pColInfoData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + return terrno; + } } else { SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(int64_t*)var})); - taosArrayPush(pVals, &cv); + void* px = taosArrayPush(pVals, &cv); + if (px == NULL) { + return terrno; + } } break; case TSDB_DATA_TYPE_NCHAR: @@ -2394,29 +2474,38 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat } } SRow* pRow = NULL; - if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) { + if ((code = tRowBuild(pVals, pTSchema, &pRow)) < 0) { tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); goto _end; } + ASSERT(pRow); - taosArrayPush(tbData.aRowP, &pRow); + void* px = taosArrayPush(tbData.aRowP, &pRow); + if (px == NULL) { + code = terrno; + goto _end; + } } - taosArrayPush(pReq->aSubmitTbData, &tbData); + void* px = taosArrayPush(pReq->aSubmitTbData, &tbData); + if (px == NULL) { + code = terrno; + goto _end; + } } + _end: taosArrayDestroy(pVals); - if (terrno != 0) { - *ppReq = NULL; + if (code != 0) { if (pReq) { tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFreeClear(pReq); } - - return TSDB_CODE_FAILED; + } else { + *ppReq = pReq; } - *ppReq = pReq; - return TSDB_CODE_SUCCESS; + + return code; } void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId) { @@ -2579,14 +2668,14 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { } colSizes[col] += colSize; dataLen += colSize; - memmove(data, pColData, colSize); + (void) memmove(data, pColData, colSize); data += colSize; } } else { colSizes[col] = colDataGetLength(pColRes, numOfRows); dataLen += colSizes[col]; if (pColRes->pData != NULL) { - memmove(data, pColRes->pData, colSizes[col]); + (void) memmove(data, pColRes->pData, colSizes[col]); } data += colSizes[col]; } diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index b539d6731f..5095bf3bfc 100644 --- a/source/common/test/commonTests.cpp +++ b/source/common/test/commonTests.cpp @@ -236,7 +236,9 @@ TEST(testCase, toInteger_test) { } TEST(testCase, Datablock_test) { - SSDataBlock* b = createDataBlock(); + SSDataBlock* b = NULL; + int32_t code = createDataBlock(&b); + ASSERT(code == 0); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 1); taosArrayPush(b->pDataBlock, &infoData); @@ -361,7 +363,9 @@ TEST(testCase, non_var_dataBlock_split_test) { TEST(testCase, var_dataBlock_split_test) { int32_t numOfRows = 1000000; - SSDataBlock* b = createDataBlock(); + SSDataBlock* b = NULL; + int32_t code = createDataBlock(&b); + ASSERT(code == 0); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 1); blockDataAppendColInfo(b, &infoData); diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 894d888e2d..687f21845e 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -289,7 +289,12 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { int32_t numOfCols = pShow->pMeta->numOfColumns; - SSDataBlock *pBlock = createDataBlock(); + SSDataBlock *pBlock = NULL; + code = createDataBlock(&pBlock); + if (code) { + TAOS_RETURN(code); + } + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {0}; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 7c6194a3ea..c779a17301 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -289,7 +289,13 @@ STqReader* tqReaderOpen(SVnode* pVnode) { pReader->cachedSchemaSuid = 0; pReader->pSchemaWrapper = NULL; pReader->tbIdHash = NULL; - pReader->pResBlock = createDataBlock(); + pReader->pResBlock = NULL; + + int32_t code = createDataBlock(&pReader->pResBlock); + if (code) { + terrno = code; + } + return pReader; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index b72288425b..5df8c97962 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -117,8 +117,13 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* } STqOffsetVal offset = {0}; qStreamExtractOffset(task, &offset); - pHandle->block = createOneDataBlock(pDataBlock, true); - TSDB_CHECK_NULL(pDataBlock, code, line, END, terrno); + pHandle->block = NULL; + + code = createOneDataBlock(pDataBlock, true, &pHandle->block); + if (code) { + return code; + } + pHandle->blockTime = offset.ts; tOffsetDestroy(&offset); code = getDataBlock(task, pHandle, vgId, &pDataBlock); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 91074b32f6..9724973440 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -576,8 +576,10 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void* goto END; } - SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); - TSDB_CHECK_NULL(pDelBlock, code, line, END, terrno) + SSDataBlock* pDelBlock = NULL; + code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock); + TSDB_CHECK_CODE(code, line, END); + code = blockDataEnsureCapacity(pDelBlock, numOfTables); TSDB_CHECK_CODE(code, line, END); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index a0063cbf29..a3210dbfd9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -402,28 +402,30 @@ static void initReaderStatus(SReaderStatus* pStatus) { } static int32_t createResBlock(SQueryTableDataCond* pCond, int32_t capacity, SSDataBlock** pResBlock) { - *pResBlock = createDataBlock(); - if (*pResBlock == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + QRY_OPTR_CHECK(pResBlock); + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code != 0) { + return code; } for (int32_t i = 0; i < pCond->numOfCols; ++i) { SColumnInfoData colInfo = {0}; colInfo.info = pCond->colList[i]; - int32_t code = blockDataAppendColInfo(*pResBlock, &colInfo); + code = blockDataAppendColInfo(pBlock, &colInfo); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(*pResBlock); - *pResBlock = NULL; + taosMemoryFree(pBlock); return code; } } - int32_t code = blockDataEnsureCapacity(*pResBlock, capacity); + code = blockDataEnsureCapacity(pBlock, capacity); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(*pResBlock); - *pResBlock = NULL; + taosMemoryFree(pBlock); } + *pResBlock = pBlock; return code; } diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 07409b1411..bb5766feaa 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -74,13 +74,16 @@ static int32_t getSchemaBytes(const SSchema* pSchema) { } static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) { - SSDataBlock* pBlock = createDataBlock(); - if (NULL == pBlock) { - return TSDB_CODE_OUT_OF_MEMORY; + QRY_OPTR_CHECK(pOutput); + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; } SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_FIELD_LEN, 1); - int32_t code = blockDataAppendColInfo(pBlock, &infoData); + code = blockDataAppendColInfo(pBlock, &infoData); if (TSDB_CODE_SUCCESS == code) { infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_TYPE_LEN, 2); code = blockDataAppendColInfo(pBlock, &infoData); @@ -229,13 +232,16 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** static int32_t execResetQueryCache() { return catalogClearCache(); } static int32_t buildCreateDBResultDataBlock(SSDataBlock** pOutput) { - SSDataBlock* pBlock = createDataBlock(); - if (NULL == pBlock) { - return TSDB_CODE_OUT_OF_MEMORY; + QRY_OPTR_CHECK(pOutput); + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; } SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_COLS, 1); - int32_t code = blockDataAppendColInfo(pBlock, &infoData); + code = blockDataAppendColInfo(pBlock, &infoData); if (TSDB_CODE_SUCCESS == code) { infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_FIELD2_LEN, 2); code = blockDataAppendColInfo(pBlock, &infoData); @@ -418,13 +424,16 @@ static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveT } static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) { - SSDataBlock* pBlock = createDataBlock(); - if (NULL == pBlock) { - return TSDB_CODE_OUT_OF_MEMORY; + QRY_OPTR_CHECK(pOutput); + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; } SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_TB_RESULT_FIELD1_LEN, 1); - int32_t code = blockDataAppendColInfo(pBlock, &infoData); + code = blockDataAppendColInfo(pBlock, &infoData); if (TSDB_CODE_SUCCESS == code) { infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_TB_RESULT_FIELD2_LEN, 2); code = blockDataAppendColInfo(pBlock, &infoData); @@ -439,13 +448,16 @@ static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) { } static int32_t buildCreateViewResultDataBlock(SSDataBlock** pOutput) { - SSDataBlock* pBlock = createDataBlock(); - if (NULL == pBlock) { - return TSDB_CODE_OUT_OF_MEMORY; + QRY_OPTR_CHECK(pOutput); + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; } SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_VIEW_RESULT_FIELD1_LEN, 1); - int32_t code = blockDataAppendColInfo(pBlock, &infoData); + code = blockDataAppendColInfo(pBlock, &infoData); if (TSDB_CODE_SUCCESS == code) { infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_VIEW_RESULT_FIELD2_LEN, 2); code = blockDataAppendColInfo(pBlock, &infoData); @@ -894,9 +906,12 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) { } static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** pOutput) { - SSDataBlock* pBlock = createDataBlock(); - if (NULL == pBlock) { - return TSDB_CODE_OUT_OF_MEMORY; + QRY_OPTR_CHECK(pOutput); + + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; } SNode* pProj = NULL; @@ -912,8 +927,9 @@ static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** p } QRY_ERR_RET(blockDataAppendColInfo(pBlock, &infoData)); } + *pOutput = pBlock; - return TSDB_CODE_SUCCESS; + return code; } int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) { diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 1c95735c14..8d9f1fb9cc 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -1941,7 +1941,8 @@ _return: } int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { - int32_t code = 0; + int32_t code = 0; + SSDataBlock *pBlock = NULL; SExplainCtx *pCtx = (SExplainCtx *)ctx; int32_t rowNum = taosArrayGetSize(pCtx->rows); if (rowNum <= 0) { @@ -1949,7 +1950,9 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { QRY_ERR_RET(TSDB_CODE_APP_ERROR); } - SSDataBlock *pBlock = createDataBlock(); + code = createDataBlock(&pBlock); + QRY_ERR_JRET(code); + SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_EXPLAIN_RESULT_ROW_SIZE, 1); QRY_ERR_JRET(blockDataAppendColInfo(pBlock, &infoData)); QRY_ERR_JRET(blockDataEnsureCapacity(pBlock, rowNum)); diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index c047c496ca..af8532b5b3 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -20,15 +20,6 @@ extern "C" { #endif -#define QRY_OPTR_CHECK(_o) \ - do { \ - if ((_o) == NULL) { \ - return TSDB_CODE_INVALID_PARA; \ - } else { \ - *(_o) = NULL; \ - } \ - } while(0) - typedef struct SOperatorCostInfo { double openCost; double totalCost; diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 67c449d3ac..7e105d2260 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -364,7 +364,12 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc return TSDB_CODE_SUCCESS; } - SSDataBlock* pBlock = createDataBlock(); + SSDataBlock* pBlock = NULL; + code = createDataBlock(&pBlock); + if (code) { + return code; + } + pBlock->info.rows = 1; pBlock->info.capacity = 0; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index b6994e7036..9d49c8e9ca 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -181,7 +181,10 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl capacity = TMIN(totalTables, 4096); - pInfo->pBufferedRes = createOneDataBlock(pInfo->pRes, false); + pInfo->pBufferedRes = NULL; + code = createOneDataBlock(pInfo->pRes, false, &pInfo->pBufferedRes); + QUERY_CHECK_CODE(code, lino, _error); + setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode); code = blockDataEnsureCapacity(pInfo->pBufferedRes, capacity); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 7c74fa7a44..3329001cbc 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -672,7 +672,10 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo pStart += sizeof(SSysTableSchema); } - SSDataBlock* pBlock = createDataBlock(); + SSDataBlock* pBlock = NULL; + code = createDataBlock(&pBlock); + QUERY_CHECK_CODE(code, lino, _end); + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId); code = blockDataAppendColInfo(pBlock, &idata); @@ -791,8 +794,8 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks); blockDataCleanup(pb); } else { - pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); - QUERY_CHECK_NULL(pb, code, lino, _end, terrno); + code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb); + QUERY_CHECK_NULL(pb, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); } int32_t compLen = *(int32_t*)pStart; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index a8e7d0e03f..f263418e3e 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -251,9 +251,13 @@ SArray* createSortInfo(SNodeList* pNodeList) { } SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { - int32_t numOfCols = LIST_LENGTH(pNode->pSlots); - - SSDataBlock* pBlock = createDataBlock(); + int32_t numOfCols = LIST_LENGTH(pNode->pSlots); + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + terrno = code; + return NULL; + } pBlock->info.id.blockId = pNode->dataBlockId; pBlock->info.type = STREAM_INVALID; @@ -267,7 +271,7 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { idata.info.scale = pDescNode->dataType.scale; idata.info.precision = pDescNode->dataType.precision; - int32_t code = blockDataAppendColInfo(pBlock, &idata); + code = blockDataAppendColInfo(pBlock, &idata); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); blockDataDestroy(pBlock); @@ -1029,11 +1033,9 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S SStorageAPI* pStorageAPI) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - SSDataBlock* pResBlock = createDataBlock(); - if (pResBlock == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSDataBlock* pResBlock = NULL; + code = createDataBlock(&pResBlock); + QUERY_CHECK_CODE(code, lino, _end); for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) { SColumnInfoData colInfo = {0}; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d864a87f75..fe74037f0f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -695,9 +695,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo while (pRes != NULL) { SSDataBlock* p = NULL; if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) { - SSDataBlock* p1 = createOneDataBlock(pRes, true); - void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1); - QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + SSDataBlock* p1 = NULL; + code = createOneDataBlock(pRes, true, &p1); + void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1); + QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); p = p1; } else { p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 1a433ba025..4b71c5ee3f 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -513,7 +513,13 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi goto _error; } - pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false); + pInfo->pFinalRes = NULL; + + code = createOneDataBlock(pInfo->pRes, false, &pInfo->pFinalRes); + if (code) { + goto _error; + } + code = blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 173e67cc16..706cf50270 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -613,7 +613,10 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc return NULL; } - SSDataBlock* pDstBlock = createDataBlock(); + SSDataBlock* pDstBlock = NULL; + code = createDataBlock(&pDstBlock); + QUERY_CHECK_CODE(code, lino, _end); + pDstBlock->info = pDataBlock->info; pDstBlock->info.id.blockId = pOperator->resultDataBlockId; pDstBlock->info.capacity = 0; @@ -1319,7 +1322,10 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag QUERY_CHECK_CODE(code, lino, _end); if (winCode != TSDB_CODE_SUCCESS) { - SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); + SSDataBlock* pTmpBlock = NULL; + code = blockCopyOneRow(pSrcBlock, rowId, &pTmpBlock); + QUERY_CHECK_CODE(code, lino, _end); + memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); pTmpBlock->info.id.groupId = groupId; char* tbName = pSrcBlock->info.parTbName; @@ -1708,8 +1714,9 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); taosHashSetFreeFp(pInfo->pPartitions, freePartItem); pInfo->tsColIndex = 0; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); - QUERY_CHECK_NULL(pInfo->pDelRes, code, lino, _error, terrno); + + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 9f6318b58d..807d6b9785 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -1157,22 +1157,23 @@ int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinN int32_t code = blockDataEnsureCapacity(pJoin->finBlk, hJoinGetFinBlkCapacity(pJoin, pJoinNode)); if (TSDB_CODE_SUCCESS != code) { - QRY_ERR_RET(terrno); + QRY_ERR_RET(code); } if (NULL != pJoin->pPreFilter) { - pJoin->midBlk = createOneDataBlock(pJoin->finBlk, false); - if (NULL == pJoin->finBlk) { - QRY_ERR_RET(terrno); + pJoin->midBlk = NULL; + code = createOneDataBlock(pJoin->finBlk, false, &pJoin->midBlk); + if (code) { + QRY_ERR_RET(code); } + code = blockDataEnsureCapacity(pJoin->midBlk, pJoin->finBlk->info.capacity); if (TSDB_CODE_SUCCESS != code) { - QRY_ERR_RET(terrno); + QRY_ERR_RET(code); } } pJoin->blkThreshold = pJoin->finBlk->info.capacity * HJOIN_BLK_THRESHOLD_RATIO; - return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 5381abe28e..7bab656e1b 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -2248,10 +2248,12 @@ static int32_t mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInf } while (true); if (buildGot && NULL == pCtx->cache.outBlk) { - pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false); - if (NULL == pCtx->cache.outBlk) { - MJ_ERR_RET(terrno); + pCtx->cache.outBlk = NULL; + int32_t code = createOneDataBlock(pJoin->build->blk, false, &pCtx->cache.outBlk); + if (code) { + MJ_ERR_RET(code); } + MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit)); } @@ -2678,10 +2680,12 @@ static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo if (buildGot && pJoin->build->newBlk) { if (NULL == pCtx->cache.outBlk) { - pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false); - if (NULL == pCtx->cache.outBlk) { - MJ_ERR_RET(terrno); + pCtx->cache.outBlk = NULL; + int32_t code = createOneDataBlock(pJoin->build->blk, false, &pCtx->cache.outBlk); + if (code) { + MJ_ERR_RET(code); } + MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit)); } @@ -2833,7 +2837,11 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) { if (!pGrp->clonedBlk) { if (0 == pGrp->beginIdx) { - pGrp->blk = createOneDataBlock(pGrp->blk, true); + pGrp->blk = NULL; + int32_t code = createOneDataBlock(pGrp->blk, true, &pGrp->blk); + if (code) { + MJ_ERR_RET(code); + } } else { pGrp->blk = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx); pGrp->endIdx -= pGrp->beginIdx; @@ -3672,9 +3680,10 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode))); if (pJoin->pFPreFilter) { - pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false); - if (NULL == pCtx->midBlk) { - MJ_ERR_RET(terrno); + pCtx->midBlk = NULL; + int32_t code = createOneDataBlock(pCtx->finBlk, false, &pCtx->midBlk); + if (code) { + MJ_ERR_RET(code); } MJ_ERR_RET(blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity)); } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index d425c87019..04dca1c61f 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1331,10 +1331,12 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol } if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) { - pGrp->blk = createOneDataBlock(pTable->blk, true); - if (NULL == pGrp->blk) { - MJ_ERR_RET(terrno); + pGrp->blk = NULL; + code = createOneDataBlock(pTable->blk, true, &pGrp->blk); + if (code) { + MJ_ERR_RET(code); } + if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) { MJ_ERR_RET(terrno); } diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 31757ef70c..3b390c8719 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -207,7 +207,6 @@ int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { if (pSortMergeInfo->pIntermediateBlock == NULL) { pSortMergeInfo->pIntermediateBlock = NULL; - code = tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock); if (pSortMergeInfo->pIntermediateBlock == NULL || code != 0) { return code; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 9b85f53494..f9ef57ec5e 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -27,6 +27,8 @@ #include "querytask.h" #include "storageapi.h" +#include "tdatablock.h" + SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index cbc3d77faf..f81fff0806 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -114,7 +114,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = pResBlock; - pInfo->pFinalRes = createOneDataBlock(pResBlock, false); + pInfo->pFinalRes = NULL; + + code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes); + TSDB_CHECK_CODE(code, lino, _error); + pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder; pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 600700ab37..8d21ccf780 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1747,8 +1747,9 @@ static int32_t doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t ts } if (pInfo->partitionSup.needCalc) { - SSDataBlock* tmpBlock = createOneDataBlock(pResult, true); - QUERY_CHECK_NULL(tmpBlock, code, lino, _end, terrno); + SSDataBlock* tmpBlock = NULL; + code = createOneDataBlock(pResult, true, &tmpBlock); + QUERY_CHECK_CODE(code, lino, _end); blockDataCleanup(pResult); for (int32_t i = 0; i < tmpBlock->info.rows; i++) { @@ -3141,7 +3142,9 @@ FETCH_NEXT_BLOCK: printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo)); SSDataBlock* pDelBlock = NULL; if (pInfo->tqReader) { - pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); + code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock); + QUERY_CHECK_CODE(code, lino, _end); + code = filterDelBlockByUid(pDelBlock, pBlock, pInfo); QUERY_CHECK_CODE(code, lino, _end); } else { @@ -3782,7 +3785,8 @@ _end: } int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, - STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, + SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -3945,15 +3949,21 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* } pInfo->pRes = createDataBlockFromDescNode(pDescNode); - pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); + code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->groupId = 0; pInfo->pStreamScanOp = pOperator; pInfo->deleteDataIndex = 0; - pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA); + code = createSpecialDataBlock(STREAM_DELETE_DATA, &pInfo->pDeleteDataRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX}; - pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR); + createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateDataRes); + QUERY_CHECK_CODE(code, lino, _error); + if (hasPrimaryKeyCol(pInfo)) { code = addPrimaryKeyCol(pInfo->pUpdateDataRes, pkType.type, pkType.bytes); QUERY_CHECK_CODE(code, lino, _error); @@ -3961,6 +3971,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pInfo->pkColType = pkType.type; pInfo->pkColLen = pkType.bytes; } + pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->partitionSup.needCalc = false; pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; @@ -3969,7 +3980,9 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pInfo->pState = pTaskInfo->streamInfo.pState; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); // for stream if (pTaskInfo->streamInfo.pState) { @@ -4770,10 +4783,16 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) { STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i; pInput->type = SUB_TABLE_MEM_BLOCK; + code = dumpQueryTableCond(&pInfo->base.cond, &pInput->tblCond); QUERY_CHECK_CODE(code, lino, _end); - pInput->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); - pInput->pPageBlock = createOneDataBlock(pInfo->pResBlock, false); + + code = createOneDataBlock(pInfo->pResBlock, false, &pInput->pReaderBlock); + QUERY_CHECK_CODE(code, lino, _end); + + code = createOneDataBlock(pInfo->pResBlock, false, &pInput->pPageBlock); + QUERY_CHECK_CODE(code, lino, _end); + STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex); pInput->pKeyInfo = keyInfo; @@ -5183,7 +5202,12 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { if (pInfo->bNextDurationBlockEvent || pInfo->bNewFilesetEvent) { if (!bSkipped) { - pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); + int32_t code = createOneDataBlock(pBlock, true, &pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks]); + if (code) { + terrno = code; + return NULL; + } + ++pInfo->numNextDurationBlocks; if (pInfo->numNextDurationBlocks > 2) { qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), @@ -5685,8 +5709,9 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR code = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order, &pInfo->pSortInfo); QUERY_CHECK_CODE(code, lino, _error); - pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); - QUERY_CHECK_NULL(pInfo->pReaderBlock, code, lino, _error, terrno); + + code = createOneDataBlock(pInfo->pResBlock, false, &pInfo->pReaderBlock); + QUERY_CHECK_CODE(code, lino, _error); pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable; @@ -5696,7 +5721,8 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols); // start one reader variable - pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); + code = createOneDataBlock(pInfo->pResBlock, false, &pInfo->pSortInputBlock); + QUERY_CHECK_CODE(code, lino, _error); if (!tsExperimental) { pInfo->filesetDelimited = false; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index de731299ab..6adc60b79e 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -858,7 +858,10 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->ignoreExpiredData = pCountNode->window.igExpired; pInfo->ignoreExpiredDataSaved = false; pInfo->pUpdated = NULL; @@ -870,7 +873,9 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); } - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index aca71d10fc..76eaccb4ec 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -900,7 +900,9 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pChildren = NULL; pInfo->ignoreExpiredData = pEventNode->window.igExpired; pInfo->ignoreExpiredDataSaved = false; @@ -922,8 +924,8 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->pAllUpdated = NULL; } - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); - QUERY_CHECK_NULL(pInfo->pCheckpointRes, code, lino, _error, terrno); + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); pInfo->reCkBlock = false; pInfo->recvGetAll = false; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 1cecbc4a31..c6bf13dabd 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1394,11 +1394,8 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi } } - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); - if (!pInfo->pDelRes) { - code = TSDB_CODE_OUT_OF_MEMORY; - QUERY_CHECK_CODE(code, lino, _error); - } + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index f6edf050b1..aebc2d9c97 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1918,10 +1918,15 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pFinalPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK); - pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE); + + code = createSpecialDataBlock(STREAM_RETRIEVE, &pInfo->pPullDataRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredDataSaved = false; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->delIndex = 0; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->delKey.ts = INT64_MAX; @@ -1936,11 +1941,16 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->dataVersion = 0; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); - pInfo->recvRetrive = false; - pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->recvPullover = false; - pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); + pInfo->recvRetrive = false; + + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); + code = createSpecialDataBlock(STREAM_MID_RETRIEVE, &pInfo->pMidRetriveRes); + QUERY_CHECK_CODE(code, lino, _error); + code = createSpecialDataBlock(STREAM_MID_RETRIEVE, &pInfo->pMidPulloverRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->clearState = false; pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); @@ -2101,13 +2111,18 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, SStorageAPI* pApi, int32_t tsIndex) { pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput); - pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); + + int32_t code = createSpecialDataBlock(STREAM_CLEAR, &pSup->pScanBlock); + if (code) { + return code; + } + pSup->gap = gap; pSup->stateKeySize = keySize; pSup->stateKeyType = keyType; pSup->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx)); if (pSup->pDummyCtx == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pSup->stateStore = *pStore; @@ -2129,7 +2144,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in } pSup->pSessionAPI = pApi; - return TSDB_CODE_SUCCESS; } @@ -3718,7 +3732,9 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pChildren = NULL; pInfo->pPhyNode = pPhyNode; pInfo->ignoreExpiredData = pSessionNode->window.igExpired; @@ -3734,7 +3750,9 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->isHistoryOp = pHandle->fillHistory; } - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->clearState = false; pInfo->recvGetAll = false; pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey; @@ -3771,6 +3789,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pOptrInfo = pOperator; return code; + _error: if (pInfo != NULL) { destroyStreamSessionAggOperatorInfo(pInfo); @@ -4847,7 +4866,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pChildren = NULL; pInfo->ignoreExpiredData = pStateNode->window.igExpired; pInfo->ignoreExpiredDataSaved = false; @@ -4856,14 +4878,17 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->dataVersion = 0; pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey)); if (!pInfo->historyWins) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; QUERY_CHECK_CODE(code, lino, _error); } + if (pHandle) { pInfo->isHistoryOp = pHandle->fillHistory; } - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey; @@ -5157,7 +5182,10 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->invertible = false; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->delIndex = 0; - pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + + createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->pPhyNode = NULL; // create new child @@ -5187,7 +5215,9 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; - pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index dc3c8f8070..5dca0ebb73 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1176,11 +1176,17 @@ static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) { } } - SSDataBlock* pBlock = createDataBlock(); + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + if (code) { + terrno = code; + return NULL; + } + for (int32_t i = 0; i < pMeta[index].colNum; ++i) { SColumnInfoData colInfoData = createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1); - int32_t code = blockDataAppendColInfo(pBlock, &colInfoData); + code = blockDataAppendColInfo(pBlock, &colInfoData); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); blockDataDestroy(pBlock); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 36c3d49810..ab94493385 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -168,22 +168,11 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) { } int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) { - if (pBlock == NULL) { - *pBlock = NULL; - return TSDB_CODE_SUCCESS; - } - if (pSortHandle->pDataBlock == NULL) { *pBlock = NULL; return TSDB_CODE_SUCCESS; } - - *pBlock = createOneDataBlock(pSortHandle->pDataBlock, false); - if (*pBlock == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } else { - return TSDB_CODE_SUCCESS; - } + return createOneDataBlock(pSortHandle->pDataBlock, false, pBlock); } #define AllocatedTupleType 0 @@ -271,9 +260,8 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, pSortHandle->forceUsePQSort = false; if (pBlock != NULL) { - pSortHandle->pDataBlock = createOneDataBlock(pBlock, false); - if (pSortHandle->pDataBlock == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = createOneDataBlock(pBlock, false, &pSortHandle->pDataBlock); + if (code) { goto _err; } } @@ -500,7 +488,12 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { blockDataCleanup(pDataBlock); - SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false); + SSDataBlock* pBlock = NULL; + int32_t code = createOneDataBlock(pDataBlock, false, &pBlock); + if (code) { + return code; + } + return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList); } @@ -994,7 +987,14 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { tMergeTreeDestroy(&pHandle->pMergeTree); pHandle->numOfCompletedSources = 0; - SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); + SSDataBlock* pBlock = NULL; + + code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock); + if (code) { + taosArrayDestroy(pResList); + return code; + } + code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pResList); @@ -1481,12 +1481,19 @@ static int32_t appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSou static int32_t initRowIdSort(SSortHandle* pHandle) { SBlockOrderInfo* pkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->aExtRowsOrders, 1) : NULL; - SColumnInfoData* extPkCol = (pHandle->bSortPk) ? taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId) : NULL; + SColumnInfoData* extPkCol = + (pHandle->bSortPk) ? taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId) : NULL; SColumnInfoData pkCol = {0}; - SSDataBlock* pSortInput = createDataBlock(); + SSDataBlock* pSortInput = NULL; + int32_t code = createDataBlock(&pSortInput); + if (code) { + return code; + } + SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1); - int32_t code = blockDataAppendColInfo(pSortInput, &tsCol); + + code = blockDataAppendColInfo(pSortInput, &tsCol); if (code) { blockDataDestroy(pSortInput); return code; @@ -1499,14 +1506,14 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { return code; } - SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); + SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); code = blockDataAppendColInfo(pSortInput, &offsetCol); if (code) { blockDataDestroy(pSortInput); return code; } - SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4); + SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4); code = blockDataAppendColInfo(pSortInput, &lengthCol); if (code) { blockDataDestroy(pSortInput); @@ -1525,9 +1532,9 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { blockDataDestroy(pHandle->pDataBlock); pHandle->pDataBlock = pSortInput; -// int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock); -// size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock); - pHandle->pageSize = 256 * 1024; // 256k + // int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock); + // size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock); + pHandle->pageSize = 256 * 1024; // 256k pHandle->numOfPages = 256; SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo)); @@ -1928,7 +1935,14 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* blockDataCleanup(pHandle->pDataBlock); } - SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); + SSDataBlock* pMemSrcBlk = NULL; + code = createOneDataBlock(pHandle->pDataBlock, false, &pMemSrcBlk); + if (code) { + cleanupMergeSup(&sup); + tMergeTreeDestroy(&pTree); + return code; + } + code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); cleanupMergeSup(&sup); @@ -2054,7 +2068,16 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { blockDataDestroy(pBlk); } } else { - SSDataBlock* tBlk = (bExtractedBlock) ? pBlk : createOneDataBlock(pBlk, true); + SSDataBlock* tBlk = NULL; + if (bExtractedBlock) { + tBlk = pBlk; + } else { + code = createOneDataBlock(pBlk, true, &tBlk); + if (code) { + return code; + } + } + code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES); if (code) { return code; @@ -2175,7 +2198,11 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { // todo, number of pages are set according to the total available sort buffer pHandle->numOfPages = 1024; sortBufSize = pHandle->numOfPages * pHandle->pageSize; - pHandle->pDataBlock = createOneDataBlock(pBlock, false); + code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock); + if (code) { + freeSSortSource(source); + return code; + } } if (pHandle->beforeFp != NULL) { @@ -2453,11 +2480,10 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { } if (pHandle->pDataBlock == NULL) { - pHandle->pDataBlock = createOneDataBlock(pBlock, false); - } - - if (pHandle->pDataBlock == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + int32_t code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock); + if (code) { + return code; + } } size_t colNum = blockDataGetNumOfCols(pBlock); diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index cefe12990d..4815e17b5a 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -71,7 +71,9 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) { } if (pInfo->pBlock == NULL) { - pInfo->pBlock = createDataBlock(); + pInfo->pBlock = NULL; + int32_t code = createDataBlock(&pInfo->pBlock); + ASSERT(code == 0); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); blockDataAppendColInfo(pInfo->pBlock, &colInfo); @@ -129,7 +131,10 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { } if (pInfo->pBlock == NULL) { - pInfo->pBlock = createDataBlock(); + pInfo->pBlock = NULL; + + int32_t code = createDataBlock(&pInfo->pBlock); + ASSERT(code == 0); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1); blockDataAppendColInfo(pInfo->pBlock, &colInfo); diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index 11bb9ad2b1..6100b0722d 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -928,8 +928,9 @@ SExecTaskInfo* createDummyTaskInfo(char* taskId) { } SSDataBlock* createDummyBlock(int32_t blkId) { - SSDataBlock* p = createDataBlock(); - assert(p); + SSDataBlock* p = NULL; + int32_t code = createDataBlock(&p); + assert(code == 0); p->info.id.blockId = blkId; p->info.type = STREAM_INVALID; diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index b4d1884597..877e3a924c 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -60,7 +60,12 @@ SSDataBlock* getSingleColDummyBlock(void* param) { return NULL; } - SSDataBlock* pBlock = createDataBlock(); + SSDataBlock* pBlock = NULL; + + int32_t code = createDataBlock(&pBlock); + if (code) { + return NULL; + } SColumnInfoData colInfo = {0}; colInfo.info.type = pInfo->type; @@ -348,7 +353,10 @@ TEST(testCase, ordered_merge_sort_Test) { SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); taosArrayPush(orderInfo, &oi); - SSDataBlock* pBlock = createDataBlock(); + SSDataBlock* pBlock = NULL; + int32_t code = createDataBlock(&pBlock); + ASSERT(code == 0); + for (int32_t i = 0; i < 1; ++i) { SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); blockDataAppendColInfo(pBlock, &colInfo); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index c9b8a1e08b..9526576426 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1254,8 +1254,13 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { int32_t numOfCols = pInput->numOfInputCols; int32_t start = pInput->startRowIndex; int32_t numOfRows = pInput->numOfRows; + SSDataBlock *pTempBlock = NULL; + int32_t code = createDataBlock(&pTempBlock); + + if (code) { + return code; + } - SSDataBlock *pTempBlock = createDataBlock(); pTempBlock->info.rows = pInput->totalRows; pTempBlock->info.id.uid = pInput->uid; for (int32_t i = 0; i < numOfCols; ++i) { diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index aa8b88b738..7f94ecfb3e 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -88,7 +88,13 @@ int aggregateFuncTest() { return -1; } - SSDataBlock *pBlock = createDataBlock(); + SSDataBlock *pBlock = NULL; + + int32_t code = createDataBlock(&pBlock); + if (code) { + return code; + } + for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); blockDataAppendColInfo(pBlock, &colInfo); diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index b210e2c326..80e2c005a7 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -112,7 +112,10 @@ int32_t flttMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, } if (NULL == *block) { - SSDataBlock *res = createDataBlock(); + SSDataBlock *res = NULL; + int32_t code = createDataBlock(&res); + ASSERT(code == 0); + for (int32_t i = 0; i < 2; ++i) { SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, 10, 1 + i); FLT_ERR_RET(blockDataAppendColInfo(res, &idata)); diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index f247eb8432..2109d8d0d6 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -91,14 +91,16 @@ void scltInitLogFile() { int32_t scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *slotId, bool newBlock, int32_t rows, SColumnInfo *colInfo) { if (newBlock) { - SSDataBlock *res = createDataBlock(); - if (NULL == res || NULL == res->pDataBlock) { + SSDataBlock *res = NULL; + int32_t code = createDataBlock(&res); + if (code != 0 || NULL == res->pDataBlock) { SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } SColumnInfoData idata = {0}; idata.info = *colInfo; - int32_t code = colInfoDataEnsureCapacity(&idata, rows, true); + + code = colInfoDataEnsureCapacity(&idata, rows, true); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(&idata); SCL_ERR_RET(code); @@ -185,7 +187,11 @@ int32_t scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, } if (NULL == *block) { - SSDataBlock *res = createDataBlock(); + SSDataBlock *res = NULL; + + int32_t code = createDataBlock(&res); + ASSERT(code == 0); + for (int32_t i = 0; i < 2; ++i) { SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_INT, 10, i + 1); code = colInfoDataEnsureCapacity(&idata, rowNum, true); diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 884d7ea1b6..d8656c0f60 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -252,7 +252,7 @@ int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPreci } int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, const char type, int8_t lvl) { - size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl); + size_t len = 0;//FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl); if (len > inputSize) { output[0] = 0; memcpy(output + 1, input, inputSize); @@ -264,7 +264,7 @@ int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSize, char *const output, int32_t outputSize, const char type) { if (input[0] == 1) { - return FL2_decompress(output, outputSize, input + 1, compressedSize - 1); + return 0;//FL2_decompress(output, outputSize, input + 1, compressedSize - 1); } else if (input[0] == 0) { memcpy(output, input + 1, compressedSize - 1); return compressedSize - 1;