Merge pull request #27074 from taosdata/fix/TD-31288-2

check function res
This commit is contained in:
Haojun Liao 2024-08-08 13:27:22 +08:00 committed by GitHub
commit 27586ef650
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 49 additions and 35 deletions

View File

@ -343,6 +343,7 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) { static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
SSDataBlock* pBlock = NULL;
if (!tsCountAlwaysReturnValue) { if (!tsCountAlwaysReturnValue) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -366,7 +367,6 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* pBlock = NULL;
code = createDataBlock(&pBlock); code = createDataBlock(&pBlock);
if (code) { if (code) {
return code; return code;
@ -414,6 +414,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
_end: _end:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
blockDataDestroy(pBlock);
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
} }
return code; return code;

View File

@ -294,10 +294,11 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pInfo->binfo, pResBlock);
code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
initBasicInfo(&pInfo->binfo, pResBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->binfo.inputTsOrder = physiNode->inputTsOrder; pInfo->binfo.inputTsOrder = physiNode->inputTsOrder;
pInfo->binfo.outputTsOrder = physiNode->outputTsOrder; pInfo->binfo.outputTsOrder = physiNode->outputTsOrder;

View File

@ -110,11 +110,11 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
SSDataBlock* pResBlock = createDataBlockFromDescNode(pEventWindowNode->window.node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pEventWindowNode->window.node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pInfo->binfo, pResBlock);
code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
initBasicInfo(&pInfo->binfo, pResBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->binfo.inputTsOrder = physiNode->inputTsOrder; pInfo->binfo.inputTsOrder = physiNode->inputTsOrder;
pInfo->binfo.outputTsOrder = physiNode->outputTsOrder; pInfo->binfo.outputTsOrder = physiNode->outputTsOrder;

View File

@ -735,6 +735,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
pStart += sizeof(SSysTableSchema); pStart += sizeof(SSysTableSchema);
} }
pBlock = NULL;
code = createDataBlock(&pBlock); code = createDataBlock(&pBlock);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);

View File

@ -278,7 +278,7 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
pBlock = NULL; pBlock = NULL;
terrno = code; terrno = TSDB_CODE_INVALID_PARA;
break; break;
} }
SColumnInfoData idata = SColumnInfoData idata =
@ -1094,7 +1094,7 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
code = blockDataEnsureCapacity(pResBlock, numOfTables); code = blockDataEnsureCapacity(pResBlock, numOfTables);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
taosMemoryFree(pResBlock); blockDataDestroy(pResBlock);
return NULL; return NULL;
} }
@ -1166,7 +1166,7 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
_end: _end:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pResBlock); blockDataDestroy(pResBlock);
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
terrno = code; terrno = code;
return NULL; return NULL;

View File

@ -4319,11 +4319,12 @@ static int32_t tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray*
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
int32_t numOfTables = taosArrayGetSize(aUidTags); int32_t numOfTables = taosArrayGetSize(aUidTags);
SArray* pBlockList = NULL;
SSDataBlock* pResBlock = createTagValBlockForFilter(pInfo->filterCtx.cInfoList, numOfTables, aUidTags, pVnode, pAPI); SSDataBlock* pResBlock = createTagValBlockForFilter(pInfo->filterCtx.cInfoList, numOfTables, aUidTags, pVnode, pAPI);
QUERY_CHECK_NULL(pResBlock, code, lino, _end, terrno); QUERY_CHECK_NULL(pResBlock, code, lino, _end, terrno);
SArray* pBlockList = taosArrayInit(1, POINTER_BYTES); pBlockList = taosArrayInit(1, POINTER_BYTES);
QUERY_CHECK_NULL(pBlockList, code, lino, _end, terrno); QUERY_CHECK_NULL(pBlockList, code, lino, _end, terrno);
void* tmp = taosArrayPush(pBlockList, &pResBlock); void* tmp = taosArrayPush(pBlockList, &pResBlock);
@ -5836,7 +5837,8 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
SOperatorInfo** pOptrInfo) { SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo); QRY_OPTR_CHECK(pOptrInfo);
int32_t code = 0; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
@ -5849,16 +5851,10 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
int32_t numOfCols = 0; int32_t numOfCols = 0;
code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
&pInfo->base.matchInfo); &pInfo->base.matchInfo);
int32_t lino = 0; QUERY_CHECK_CODE(code, lino, _error);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode, readHandle); code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode, readHandle);
if (code != TSDB_CODE_SUCCESS) { QUERY_CHECK_CODE(code, lino, _error);
taosArrayDestroy(pInfo->base.matchInfo.pList);
goto _error;
}
if (pTableScanNode->scan.pScanPseudoCols != NULL) { if (pTableScanNode->scan.pScanPseudoCols != NULL) {
SExprSupp* pSup = &pInfo->base.pseudoSup; SExprSupp* pSup = &pInfo->base.pseudoSup;
@ -5873,10 +5869,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) { QUERY_CHECK_NULL(pInfo->base.metaCache.pTableMetaEntryCache, code, lino, _error, terrno);
code = terrno;
goto _error;
}
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader; pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD; pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
@ -5893,9 +5886,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
pInfo->sample.seed = taosGetTimestampSec(); pInfo->sample.seed = taosGetTimestampSec();
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) { QUERY_CHECK_CODE(code, lino, _error);
goto _error;
}
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
@ -5957,6 +5948,9 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
return code; return code;
_error: _error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
pTaskInfo->code = code; pTaskInfo->code = code;
pInfo->base.pTableListInfo = NULL; pInfo->base.pTableListInfo = NULL;
if (pInfo != NULL) destroyTableMergeScanOperatorInfo(pInfo); if (pInfo != NULL) destroyTableMergeScanOperatorInfo(pInfo);

View File

@ -5203,6 +5203,8 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pInfo->binfo, pResBlock);
pInfo->interval = (SInterval){ pInfo->interval = (SInterval){
.interval = pIntervalPhyNode->interval, .interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding, .sliding = pIntervalPhyNode->sliding,
@ -5230,7 +5232,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
pSup->hasWindowOrGroup = true; pSup->hasWindowOrGroup = true;
initBasicInfo(&pInfo->binfo, pResBlock);
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);

View File

@ -780,6 +780,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
int32_t ret = 0; int32_t ret = 0;
if (pInfo->pCur == NULL) { if (pInfo->pCur == NULL) {
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode); pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
QUERY_CHECK_NULL(pInfo->pCur, code, lino, _end, terrno);
} else { } else {
(void)pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0, 0); (void)pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0, 0);
} }
@ -1578,6 +1579,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
SSysTableScanInfo* pInfo = pOperator->info; SSysTableScanInfo* pInfo = pOperator->info;
if (pInfo->pCur == NULL) { if (pInfo->pCur == NULL) {
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode); pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
QUERY_CHECK_NULL(pInfo->pCur, code, lino, _end, terrno);
firstMetaCursor = 1; firstMetaCursor = 1;
} }
if (!firstMetaCursor) { if (!firstMetaCursor) {

View File

@ -152,8 +152,10 @@ static void destoryAllocatedTuple(void* t) { taosMemoryFree(t); }
* @param colIndex the columnIndex, for setting null bitmap * @param colIndex the columnIndex, for setting null bitmap
* @return the next offset to add field * @return the next offset to add field
* */ * */
static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, static inline int32_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data,
size_t length, bool isNull, uint32_t tupleLen) { size_t length, bool isNull, uint32_t tupleLen, uint32_t* pOffset) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
tupleSetOffset(*t, colIdx, offset); tupleSetOffset(*t, colIdx, offset);
if (isNull) { if (isNull) {
@ -161,16 +163,20 @@ static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, u
} else { } else {
if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) { if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) {
void* px = taosMemoryRealloc(*t, offset + length); void* px = taosMemoryRealloc(*t, offset + length);
if (px == NULL) { QUERY_CHECK_NULL(px, code, lino, _end, terrno);
return terrno;
}
*t = px; *t = px;
} }
tupleSetData(*t, offset, data, length); tupleSetData(*t, offset, data, length);
} }
return offset + length; (*pOffset) = offset + length;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
} }
static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) { static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
@ -200,6 +206,7 @@ typedef struct ReferencedTuple {
} ReferencedTuple; } ReferencedTuple;
static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx, TupleDesc** pDesc) { static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx, TupleDesc** pDesc) {
int32_t code = TSDB_CODE_SUCCESS;
TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc)); TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc));
if (t == NULL) { if (t == NULL) {
return terrno; return terrno;
@ -216,15 +223,20 @@ static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) { for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
if (pCol == NULL) { if (pCol == NULL) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno; return terrno;
} }
if (colDataIsNull_s(pCol, rowIdx)) { if (colDataIsNull_s(pCol, rowIdx)) {
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen); code = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen, &offset);
} else { } else {
colLen = colDataGetRowLength(pCol, rowIdx); colLen = colDataGetRowLength(pCol, rowIdx);
offset = code =
tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen); tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen, &offset);
}
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return code;
} }
} }
@ -232,7 +244,7 @@ static int32_t createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t
t->data = pTuple; t->data = pTuple;
*pDesc = t; *pDesc = t;
return 0; return code;
} }
int32_t tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum, void** pResult) { int32_t tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum, void** pResult) {
@ -259,7 +271,7 @@ int32_t tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNu
void destroyTuple(void* t) { void destroyTuple(void* t) {
TupleDesc* pDesc = t; TupleDesc* pDesc = t;
if (pDesc->type == AllocatedTupleType) { if (pDesc != NULL && pDesc->type == AllocatedTupleType) {
destoryAllocatedTuple(pDesc->data); destoryAllocatedTuple(pDesc->data);
taosMemoryFree(pDesc); taosMemoryFree(pDesc);
} }
@ -1686,6 +1698,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) {
biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order); biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order);
void* p = taosArrayPush(pOrderInfoList, &biTs); void* p = taosArrayPush(pOrderInfoList, &biTs);
if (p == NULL) { if (p == NULL) {
taosArrayDestroy(pOrderInfoList);
return terrno; return terrno;
} }
@ -1698,6 +1711,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) {
void* px = taosArrayPush(pOrderInfoList, &biPk); void* px = taosArrayPush(pOrderInfoList, &biPk);
if (px == NULL) { if (px == NULL) {
taosArrayDestroy(pOrderInfoList);
return terrno; return terrno;
} }
} }