fix issue

This commit is contained in:
54liuyao 2024-08-05 17:08:21 +08:00
parent 5c50970cca
commit a9f1cff14f
8 changed files with 51 additions and 2 deletions

View File

@ -408,6 +408,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc); pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES); pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno); QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES); pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);

View File

@ -1473,19 +1473,28 @@ _error:
} }
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) { int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSubplan* pSubplan = (SSubplan*)node; SSubplan* pSubplan = (SSubplan*)node;
SScanPhysiNode pNode = {0}; SScanPhysiNode pNode = {0};
pNode.suid = suid; pNode.suid = suid;
pNode.uid = suid; pNode.uid = suid;
pNode.tableType = TSDB_SUPER_TABLE; pNode.tableType = TSDB_SUPER_TABLE;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
QUERY_CHECK_NULL(pTableListInfo, code, lino, _end, terrno);
uint8_t digest[17] = {0}; uint8_t digest[17] = {0};
int code = code =
getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL, getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI); pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI);
QUERY_CHECK_CODE(code, lino, _end);
*tableList = pTableListInfo->pTableList; *tableList = pTableListInfo->pTableList;
pTableListInfo->pTableList = NULL; pTableListInfo->pTableList = NULL;
tableListDestroy(pTableListInfo); tableListDestroy(pTableListInfo);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code; return code;
} }

View File

@ -594,9 +594,11 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS
SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo; SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc); SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc);
TSDB_CHECK_NULL(pInputBlock, code, lino, _error, terrno);
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
@ -620,6 +622,7 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS
case MERGE_TYPE_NON_SORT: { case MERGE_TYPE_NON_SORT: {
SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo; SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error); TSDB_CHECK_CODE(code, lino, _error);
@ -629,6 +632,7 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS
case MERGE_TYPE_COLUMNS: { case MERGE_TYPE_COLUMNS: {
SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo; SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo;
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
initResultSizeInfo(&pOperator->resultInfo, 1); initResultSizeInfo(&pOperator->resultInfo, 1);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error); TSDB_CHECK_CODE(code, lino, _error);

View File

@ -295,6 +295,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} }
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo); code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
if (code) { if (code) {
@ -330,6 +335,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
@ -362,6 +372,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
if (pHandle->vnode) { if (pHandle->vnode) {
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
@ -385,6 +400,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode; STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
if (!pTagScanPhyNode->onlyMetaCtbIdx) { if (!pTagScanPhyNode->onlyMetaCtbIdx) {
code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
pTagIndexCond, pTaskInfo); pTagIndexCond, pTaskInfo);
@ -398,6 +418,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
if (pBlockNode->tableType == TSDB_SUPER_TABLE) { if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
SArray* pList = taosArrayInit(4, sizeof(uint64_t)); SArray* pList = taosArrayInit(4, sizeof(uint64_t));
@ -436,6 +461,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
pTagCond, pTagIndexCond, pTaskInfo); pTagCond, pTagIndexCond, pTaskInfo);

View File

@ -485,6 +485,7 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
} }
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
// Make sure the size of SSDataBlock will never exceed the size of 2MB. // Make sure the size of SSDataBlock will never exceed the size of 2MB.
int32_t TWOMB = 2 * 1024 * 1024; int32_t TWOMB = 2 * 1024 * 1024;

View File

@ -3694,6 +3694,7 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo
QRY_OPTR_CHECK(pOptrInfo); QRY_OPTR_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo)); SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -3703,6 +3704,7 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo
} }
pInfo->pTableListInfo = tableListCreate(); pInfo->pTableListInfo = tableListCreate();
QUERY_CHECK_NULL(pInfo->pTableListInfo, code, lino, _end, terrno);
pInfo->vnode = pHandle->vnode; pInfo->vnode = pHandle->vnode;
pInfo->pAPI = &pTaskInfo->storageAPI; pInfo->pAPI = &pTaskInfo->storageAPI;

View File

@ -101,6 +101,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
} }
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
QUERY_CHECK_NULL(pInfo->binfo.pRes , code, lino, _error, terrno);
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
if (pSortNode->calcGroupId) { if (pSortNode->calcGroupId) {
@ -789,6 +790,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo
QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno); QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
QUERY_CHECK_NULL(pInfo->binfo.pRes , code, lino, _error, terrno);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error); TSDB_CHECK_CODE(code, lino, _error);

View File

@ -899,7 +899,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);