Merge pull request #27001 from taosdata/fix/syntax
fix: check return value and fix bugs.
This commit is contained in:
commit
13264b5605
|
@ -4745,9 +4745,12 @@ void tsdbReaderClose2(STsdbReader* pReader) {
|
|||
|
||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||
TARRAY2_DESTROY(&pSupInfo->colAggArray, NULL);
|
||||
for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
|
||||
if (pSupInfo->buildBuf[i] != NULL) {
|
||||
taosMemoryFreeClear(pSupInfo->buildBuf[i]);
|
||||
|
||||
if (pSupInfo->buildBuf) {
|
||||
for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
|
||||
if (pSupInfo->buildBuf[i] != NULL) {
|
||||
taosMemoryFreeClear(pSupInfo->buildBuf[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5762,6 +5765,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
int32_t code = 0;
|
||||
STsdb* pTsdb = pReader->pTsdb;
|
||||
SVersionRange* pRange = &pReader->info.verRange;
|
||||
*ppSnap = NULL;
|
||||
|
||||
// lock
|
||||
code = taosThreadMutexLock(&pTsdb->mutex);
|
||||
|
@ -5774,7 +5778,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
|
||||
if (pSnap == NULL) {
|
||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
|
@ -5784,7 +5788,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
|
||||
if (pSnap->pNode == NULL) {
|
||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
|
@ -5798,8 +5802,10 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
pSnap->pIMem = pTsdb->imem;
|
||||
pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
|
||||
if (pSnap->pINode == NULL) {
|
||||
tsdbUnrefMemTable(pTsdb->mem, pSnap->pNode, true); // unref the previous refed mem
|
||||
code = terrno;
|
||||
|
||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
|
@ -5811,28 +5817,33 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
|
||||
// fs
|
||||
code = tsdbFSCreateRefSnapshotWithoutLock(pTsdb->pFS, &pSnap->pfSetArray);
|
||||
if (code) {
|
||||
if (pSnap->pNode) {
|
||||
tsdbUnrefMemTable(pTsdb->mem, pSnap->pNode, true); // unref the previous refed mem
|
||||
}
|
||||
|
||||
if (pSnap->pINode) {
|
||||
tsdbUnrefMemTable(pTsdb->imem, pSnap->pINode, true);
|
||||
}
|
||||
|
||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// unlock
|
||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
*ppSnap = pSnap;
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
|
||||
}
|
||||
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
|
||||
return code;
|
||||
|
||||
_exit:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
|
||||
*ppSnap = NULL;
|
||||
if (pSnap) {
|
||||
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
|
||||
if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
|
||||
taosMemoryFree(pSnap);
|
||||
}
|
||||
} else {
|
||||
*ppSnap = pSnap;
|
||||
tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
if (pSnap) {
|
||||
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
|
||||
if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
|
||||
taosMemoryFree(pSnap);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -108,7 +108,6 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tableListDestroy(pTableListInfo);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -245,6 +244,7 @@ _error:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
pInfo->pTableList = NULL;
|
||||
destroyCacheScanOperator(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
return code;
|
||||
|
|
|
@ -409,6 +409,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
|
|||
|
||||
pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
|
||||
|
||||
pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
|
||||
pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
|
||||
|
|
|
@ -391,6 +391,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
|
|||
|
||||
//pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
|
||||
code = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
tableListDestroy(pTableListInfo);
|
||||
return code;
|
||||
}
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
||||
code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo, &pOperator);
|
||||
|
@ -411,10 +416,17 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("failed to getTableList, code: %s", tstrerror(code));
|
||||
tableListDestroy(pTableListInfo);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
code = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo, &pOperator);
|
||||
code = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo,
|
||||
&pOperator);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
tableListDestroy(pTableListInfo);
|
||||
return code;
|
||||
}
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
|
||||
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
|
||||
STableListInfo* pTableListInfo = tableListCreate();
|
||||
|
@ -430,6 +442,7 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
taosArrayDestroy(pList);
|
||||
tableListDestroy(pTableListInfo);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -437,13 +450,14 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
|
|||
for (int32_t i = 0; i < num; ++i) {
|
||||
uint64_t* id = taosArrayGet(pList, i);
|
||||
if (id == NULL) {
|
||||
pTaskInfo->code = terrno;
|
||||
return terrno;
|
||||
continue;
|
||||
}
|
||||
|
||||
code = tableListAddTableInfo(pTableListInfo, *id, 0);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
tableListDestroy(pTableListInfo);
|
||||
taosArrayDestroy(pList);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
@ -453,11 +467,17 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
|
|||
code = tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
tableListDestroy(pTableListInfo);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
code = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo, &pOperator);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
tableListDestroy(pTableListInfo);
|
||||
return code;
|
||||
}
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
|
||||
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
|
||||
STableListInfo* pTableListInfo = tableListCreate();
|
||||
|
@ -467,20 +487,27 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
|
|||
return terrno;
|
||||
}
|
||||
|
||||
code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
|
||||
pTagCond, pTagIndexCond, pTaskInfo);
|
||||
code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond,
|
||||
pTagIndexCond, pTaskInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
tableListDestroy(pTableListInfo);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = initQueriedTableSchemaInfo(pHandle, &pScanNode->scan, dbname, pTaskInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
tableListDestroy(pTableListInfo);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
|
||||
if (code) {
|
||||
tableListDestroy(pTableListInfo);
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
|
||||
code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
|
||||
} else {
|
||||
|
|
|
@ -136,8 +136,7 @@ void cleanupQueriedTableScanInfo(void* p) {
|
|||
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo) {
|
||||
SMetaReader mr = {0};
|
||||
if (pHandle == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
@ -149,7 +148,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
|||
GET_TASKID(pTaskInfo));
|
||||
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
SSchemaInfo schemaInfo = {0};
|
||||
|
@ -169,7 +168,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
|||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
taosMemoryFree(schemaInfo.tablename);
|
||||
taosMemoryFree(schemaInfo.dbname);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
||||
|
|
|
@ -1422,6 +1422,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
|
|||
|
||||
_error:
|
||||
if (pInfo != NULL) {
|
||||
pInfo->base.pTableListInfo = NULL; // this attribute will be destroy outside of this function
|
||||
destroyTableScanOperatorInfo(pInfo);
|
||||
}
|
||||
|
||||
|
@ -3891,7 +3892,6 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tableListDestroy(pTableListInfo);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -3904,7 +3904,6 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
int32_t numOfCols = 0;
|
||||
code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tableListDestroy(pTableListInfo);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -3935,8 +3934,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
if (pTableScanNode->pSubtable != NULL) {
|
||||
SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
|
||||
if (pSubTableExpr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tableListDestroy(pTableListInfo);
|
||||
code = terrno;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -3945,7 +3943,6 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1, &pTaskInfo->storageAPI.functionStore) != 0) {
|
||||
tableListDestroy(pTableListInfo);
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
|
@ -3955,20 +3952,17 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
|
||||
if (pTagExpr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tableListDestroy(pTableListInfo);
|
||||
goto _error;
|
||||
}
|
||||
if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tableListDestroy(pTableListInfo);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
|
||||
pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
|
||||
if (pInfo->pBlockLists == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tableListDestroy(pTableListInfo);
|
||||
code = terrno;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -4656,6 +4650,10 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p
|
|||
return code;
|
||||
|
||||
_error:
|
||||
if (pInfo) {
|
||||
pInfo->pTableListInfo = NULL;
|
||||
}
|
||||
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
return code;
|
||||
|
@ -5878,7 +5876,8 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
|
|||
return code;
|
||||
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
pTaskInfo->code = code;
|
||||
pInfo->base.pTableListInfo = NULL;
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
return code;
|
||||
|
|
|
@ -101,8 +101,10 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
|
|||
}
|
||||
|
||||
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
|
||||
QUERY_CHECK_NULL(pInfo->binfo.pRes , code, lino, _error, terrno);
|
||||
QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
|
||||
|
||||
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
|
||||
TSDB_CHECK_NULL(pInfo->pSortInfo, code, lino, _error, terrno);
|
||||
|
||||
if (pSortNode->calcGroupId) {
|
||||
int32_t keyLen;
|
||||
|
@ -790,7 +792,8 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo
|
|||
QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
|
||||
|
||||
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
|
||||
QUERY_CHECK_NULL(pInfo->binfo.pRes , code, lino, _error, terrno);
|
||||
QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
|
||||
|
||||
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
|
|
@ -3738,9 +3738,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->twAggSup = (STimeWindowAggSupp){
|
||||
.waterMark = pSessionNode->window.watermark,
|
||||
|
|
|
@ -2811,6 +2811,9 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
|
|||
return code;
|
||||
|
||||
_error:
|
||||
if (pInfo) {
|
||||
pInfo->pTableListInfo = NULL;
|
||||
}
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
return code;
|
||||
|
|
|
@ -1728,9 +1728,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
|
|||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
|
||||
pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
|
||||
|
@ -1754,15 +1752,11 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
|
@ -1770,9 +1764,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
|
|||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
*pOptrInfo = pOperator;
|
||||
return code;
|
||||
|
|
|
@ -469,10 +469,10 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
|||
|
||||
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
|
||||
"sortExternalBuf", tsTempDir);
|
||||
dBufSetPrintInfo(pHandle->pBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
dBufSetPrintInfo(pHandle->pBuf);
|
||||
}
|
||||
|
||||
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
|
||||
|
@ -562,10 +562,10 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
|
|||
|
||||
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
|
||||
"sortComparInit", tsTempDir);
|
||||
dBufSetPrintInfo(pHandle->pBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return code;
|
||||
} else {
|
||||
dBufSetPrintInfo(pHandle->pBuf);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1111,9 +1111,10 @@ static int32_t createPageBuf(SSortHandle* pHandle) {
|
|||
|
||||
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
|
||||
"tableBlocksBuf", tsTempDir);
|
||||
dBufSetPrintInfo(pHandle->pBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
} else {
|
||||
dBufSetPrintInfo(pHandle->pBuf);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
|
Loading…
Reference in New Issue