fix: check return value and fix bugs.

This commit is contained in:
Haojun Liao 2024-08-06 10:19:22 +08:00
parent 6645192ff3
commit 5524d42c2d
10 changed files with 90 additions and 58 deletions

View File

@ -5762,6 +5762,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb; STsdb* pTsdb = pReader->pTsdb;
SVersionRange* pRange = &pReader->info.verRange; SVersionRange* pRange = &pReader->info.verRange;
*ppSnap = NULL;
// lock // lock
code = taosThreadMutexLock(&pTsdb->mutex); code = taosThreadMutexLock(&pTsdb->mutex);
@ -5774,7 +5775,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
if (pSnap == NULL) { if (pSnap == NULL) {
(void) taosThreadMutexUnlock(&pTsdb->mutex); (void) taosThreadMutexUnlock(&pTsdb->mutex);
code = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
goto _exit; goto _exit;
} }
@ -5784,7 +5785,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode)); pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
if (pSnap->pNode == NULL) { if (pSnap->pNode == NULL) {
(void) taosThreadMutexUnlock(&pTsdb->mutex); (void) taosThreadMutexUnlock(&pTsdb->mutex);
code = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
goto _exit; goto _exit;
} }
@ -5798,8 +5799,10 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
pSnap->pIMem = pTsdb->imem; pSnap->pIMem = pTsdb->imem;
pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode)); pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
if (pSnap->pINode == NULL) { if (pSnap->pINode == NULL) {
tsdbUnrefMemTable(pTsdb->mem, pSnap->pNode, true); // unref the previous refed mem
code = terrno;
(void) taosThreadMutexUnlock(&pTsdb->mutex); (void) taosThreadMutexUnlock(&pTsdb->mutex);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
@ -5811,28 +5814,33 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
// fs // fs
code = tsdbFSCreateRefSnapshotWithoutLock(pTsdb->pFS, &pSnap->pfSetArray); code = tsdbFSCreateRefSnapshotWithoutLock(pTsdb->pFS, &pSnap->pfSetArray);
if (code) {
if (pSnap->pNode) {
tsdbUnrefMemTable(pTsdb->mem, pSnap->pNode, true); // unref the previous refed mem
}
if (pSnap->pINode) {
tsdbUnrefMemTable(pTsdb->imem, pSnap->pINode, true);
}
(void) taosThreadMutexUnlock(&pTsdb->mutex);
goto _exit;
}
// unlock // unlock
(void) taosThreadMutexUnlock(&pTsdb->mutex); (void) taosThreadMutexUnlock(&pTsdb->mutex);
*ppSnap = pSnap;
if (code == TSDB_CODE_SUCCESS) { tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); return code;
}
_exit: _exit:
if (code != TSDB_CODE_SUCCESS) { tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code)); if (pSnap) {
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
*ppSnap = NULL; if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
if (pSnap) { taosMemoryFree(pSnap);
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
taosMemoryFree(pSnap);
}
} else {
*ppSnap = pSnap;
} }
return code; return code;
} }

View File

@ -98,7 +98,6 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error; goto _error;
} }
@ -227,6 +226,7 @@ _error:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
} }
pInfo->pTableList = NULL;
destroyCacheScanOperator(pInfo); destroyCacheScanOperator(pInfo);
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
return code; return code;

View File

@ -333,10 +333,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder; pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, STableListInfo* pTableListInfo = tableListCreate();
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, pTableListInfo,
pTagCond, pTagIndexCond, pTaskInfo);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
tableListDestroy(pTableListInfo); tableListDestroy(pTableListInfo);
@ -366,7 +366,7 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL){ if (pTableListInfo == NULL) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return terrno; return terrno;
} }
@ -384,6 +384,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
//pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); //pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
code = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator); code = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator);
if (code) {
pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
return code;
}
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo, &pOperator); code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo, &pOperator);
@ -404,10 +409,17 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
qError("failed to getTableList, code: %s", tstrerror(code)); qError("failed to getTableList, code: %s", tstrerror(code));
tableListDestroy(pTableListInfo);
return code; return code;
} }
} }
code = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo, &pOperator); code = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo,
&pOperator);
if (code) {
pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
return code;
}
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
@ -422,6 +434,7 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
taosArrayDestroy(pList); taosArrayDestroy(pList);
tableListDestroy(pTableListInfo);
return code; return code;
} }
@ -429,13 +442,14 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
uint64_t* id = taosArrayGet(pList, i); uint64_t* id = taosArrayGet(pList, i);
if (id == NULL) { if (id == NULL) {
pTaskInfo->code = terrno; continue;
return terrno;
} }
code = tableListAddTableInfo(pTableListInfo, *id, 0); code = tableListAddTableInfo(pTableListInfo, *id, 0);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
taosArrayDestroy(pList);
return code; return code;
} }
} }
@ -445,11 +459,17 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
code = tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0); code = tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
return code; return code;
} }
} }
code = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo, &pOperator); code = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo, &pOperator);
if (code) {
pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
return code;
}
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
@ -458,20 +478,27 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
return terrno; return terrno;
} }
code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond,
pTagCond, pTagIndexCond, pTaskInfo); pTagIndexCond, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
return code; return code;
} }
code = initQueriedTableSchemaInfo(pHandle, &pScanNode->scan, dbname, pTaskInfo); code = initQueriedTableSchemaInfo(pHandle, &pScanNode->scan, dbname, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
return code; return code;
} }
code = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator); code = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
if (code) {
tableListDestroy(pTableListInfo);
pTaskInfo->code = code;
return code;
}
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator); code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
} else { } else {

View File

@ -485,6 +485,7 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
} }
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
// Make sure the size of SSDataBlock will never exceed the size of 2MB. // Make sure the size of SSDataBlock will never exceed the size of 2MB.
int32_t TWOMB = 2 * 1024 * 1024; int32_t TWOMB = 2 * 1024 * 1024;

View File

@ -136,8 +136,7 @@ void cleanupQueriedTableScanInfo(void* p) {
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo) { int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0}; SMetaReader mr = {0};
if (pHandle == NULL) { if (pHandle == NULL) {
terrno = TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
return terrno;
} }
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
@ -149,7 +148,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
pAPI->metaReaderFn.clearReader(&mr); pAPI->metaReaderFn.clearReader(&mr);
return terrno; return code;
} }
SSchemaInfo schemaInfo = {0}; SSchemaInfo schemaInfo = {0};
@ -169,7 +168,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
pAPI->metaReaderFn.clearReader(&mr); pAPI->metaReaderFn.clearReader(&mr);
taosMemoryFree(schemaInfo.tablename); taosMemoryFree(schemaInfo.tablename);
taosMemoryFree(schemaInfo.dbname); taosMemoryFree(schemaInfo.dbname);
return terrno; return code;
} }
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);

View File

@ -1418,6 +1418,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
pInfo->base.pTableListInfo = NULL; // this attribute will be destroy outside of this function
destroyTableScanOperatorInfo(pInfo); destroyTableScanOperatorInfo(pInfo);
} }
@ -3876,7 +3877,6 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error; goto _error;
} }
@ -3889,7 +3889,6 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
int32_t numOfCols = 0; int32_t numOfCols = 0;
code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tableListDestroy(pTableListInfo);
goto _error; goto _error;
} }
@ -3919,8 +3918,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
if (pTableScanNode->pSubtable != NULL) { if (pTableScanNode->pSubtable != NULL) {
SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo)); SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
if (pSubTableExpr == NULL) { if (pSubTableExpr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
tableListDestroy(pTableListInfo);
goto _error; goto _error;
} }
@ -3929,7 +3927,6 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1, &pTaskInfo->storageAPI.functionStore) != 0) { if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1, &pTaskInfo->storageAPI.functionStore) != 0) {
tableListDestroy(pTableListInfo);
goto _error; goto _error;
} }
} }
@ -3939,20 +3936,17 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags); SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
if (pTagExpr == NULL) { if (pTagExpr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error; goto _error;
} }
if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore) != 0) { if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
goto _error; goto _error;
} }
} }
pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData)); pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
if (pInfo->pBlockLists == NULL) { if (pInfo->pBlockLists == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
tableListDestroy(pTableListInfo);
goto _error; goto _error;
} }
@ -4636,6 +4630,10 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p
return code; return code;
_error: _error:
if (pInfo) {
pInfo->pTableListInfo = NULL;
}
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
return code; return code;
@ -5839,7 +5837,8 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
return code; return code;
_error: _error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = code;
pInfo->base.pTableListInfo = NULL;
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
return code; return code;

View File

@ -101,7 +101,10 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
} }
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
TSDB_CHECK_NULL(pInfo->pSortInfo, code, lino, _error, terrno);
if (pSortNode->calcGroupId) { if (pSortNode->calcGroupId) {
int32_t keyLen; int32_t keyLen;
@ -788,6 +791,8 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo
QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno); QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error); TSDB_CHECK_CODE(code, lino, _error);

View File

@ -3738,9 +3738,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { QUERY_CHECK_CODE(code, lino, _error);
goto _error;
}
pInfo->twAggSup = (STimeWindowAggSupp){ pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pSessionNode->window.watermark, .waterMark = pSessionNode->window.watermark,

View File

@ -2748,6 +2748,9 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
return code; return code;
_error: _error:
if (pInfo) {
pInfo->pTableListInfo = NULL;
}
taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
return code; return code;

View File

@ -1706,9 +1706,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { QUERY_CHECK_CODE(code, lino, _error);
goto _error;
}
pInfo->twAggSup.waterMark = pSessionNode->window.watermark; pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType; pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
@ -1732,15 +1730,11 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { QUERY_CHECK_CODE(code, lino, _error);
goto _error;
}
} }
code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) { QUERY_CHECK_CODE(code, lino, _error);
goto _error;
}
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
@ -1748,9 +1742,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { QUERY_CHECK_CODE(code, lino, _error);
goto _error;
}
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return code;