fix(query): check null before free resource, and some internal refactor.

This commit is contained in:
Haojun Liao 2022-10-31 10:41:25 +08:00
parent 864e2c60a7
commit f790d91b45
5 changed files with 70 additions and 64 deletions

View File

@ -1867,7 +1867,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) { if (minKey == key) {
init = true; init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -1881,7 +1881,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge(&merge, &fRow1); tRowMerge(&merge, &fRow1);
} else { } else {
init = true; init = true;
int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }

View File

@ -555,7 +555,7 @@ typedef struct SSysTableScanInfo {
typedef struct SBlockDistInfo { typedef struct SBlockDistInfo {
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
void* pHandle; STsdbReader* pHandle;
SReadHandle readHandle; SReadHandle readHandle;
uint64_t uid; // table uid uint64_t uid; // table uid
} SBlockDistInfo; } SBlockDistInfo;
@ -970,8 +970,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);

View File

@ -1772,6 +1772,10 @@ _error:
} }
void* tableListDestroy(STableListInfo* pTableListInfo) { void* tableListDestroy(STableListInfo* pTableListInfo) {
if (pTableListInfo == NULL) {
return NULL;
}
pTableListInfo->pTableList = taosArrayDestroy(pTableListInfo->pTableList); pTableListInfo->pTableList = taosArrayDestroy(pTableListInfo->pTableList);
taosMemoryFreeClear(pTableListInfo->groupOffset); taosMemoryFreeClear(pTableListInfo->groupOffset);

View File

@ -3378,34 +3378,11 @@ bool groupbyTbname(SNodeList* pGroupList) {
return bytbname; return bytbname;
} }
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
memset(pCond, 0, sizeof(SQueryTableDataCond)); SNode* pTagIndexCond, const char* pUser) {
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = 1;
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
}
pCond->colList->colId = 1;
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
pCond->colList->bytes = sizeof(TSKEY);
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->suid = uid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = -1;
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* pUser) {
int32_t type = nodeType(pPhyNode); int32_t type = nodeType(pPhyNode);
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
const char* idstr = GET_TASKID(pTaskInfo);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
SOperatorInfo* pOperator = NULL; SOperatorInfo* pOperator = NULL;
@ -3420,10 +3397,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t code = int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); pTableListInfo, pTagCond, pTagIndexCond, idstr);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo)); qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
return NULL; return NULL;
} }
@ -3440,7 +3417,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
int32_t code = int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/true, pHandle, createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/true, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); pTableListInfo, pTagCond, pTagIndexCond, idstr);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
@ -3465,7 +3442,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (pHandle->vnode) { if (pHandle->vnode) {
int32_t code = int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); pHandle, pTableListInfo, pTagCond, pTagIndexCond, idstr);
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
@ -3492,7 +3469,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
pTagIndexCond, GET_TASKID(pTaskInfo)); pTagIndexCond, idstr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
qError("failed to getTableList, code: %s", tstrerror(code)); qError("failed to getTableList, code: %s", tstrerror(code));
@ -3516,30 +3493,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
tableListAddTableInfo(pTableListInfo, p->uid, 0); tableListAddTableInfo(pTableListInfo, p->uid, 0);
} }
taosArrayDestroy(pList); taosArrayDestroy(pList);
} else { // Create one table group. } else { // Create group with only one table
tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0); tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
} }
SQueryTableDataCond cond = {0}; pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo);
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
STsdbReader* pReader = NULL;
tsdbReaderOpen(pHandle->vnode, &cond, pList, num, &pReader, "");
cleanupQueryTableDataCond(&cond);
pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); pTagCond, pTagIndexCond, idstr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
@ -3566,17 +3529,18 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} }
size_t size = LIST_LENGTH(pPhyNode->pChildren); size_t size = LIST_LENGTH(pPhyNode->pChildren);
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
if (ops == NULL) {
return NULL;
}
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser); ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser);
if (ops[i] == NULL) { if (ops[i] == NULL) {
taosMemoryFree(ops); taosMemoryFree(ops);
return NULL; return NULL;
} }
ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
} }
SOperatorInfo* pOptr = NULL; SOperatorInfo* pOptr = NULL;
@ -3888,8 +3852,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
sql = NULL; sql = NULL;
(*pTaskInfo)->pSubplan = pPlan; (*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, (*pTaskInfo)->pTableInfoList, (*pTaskInfo)->pRoot =
pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
if (NULL == (*pTaskInfo)->pRoot) { if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code; code = (*pTaskInfo)->code;

View File

@ -1000,8 +1000,31 @@ static void destroyBlockDistScanOperatorInfo(void* param) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) { memset(pCond, 0, sizeof(SQueryTableDataCond));
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = 1;
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
}
pCond->colList->colId = 1;
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
pCond->colList->bytes = sizeof(TSKEY);
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->suid = uid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = -1;
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) {
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
@ -1009,9 +1032,24 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
goto _error; goto _error;
} }
pInfo->pHandle = dataReader; {
SQueryTableDataCond cond = {0};
int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str);
cleanupQueryTableDataCond(&cond);
}
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->uid = uid; pInfo->uid = pBlockScanNode->suid;
pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);
int32_t numOfCols = 0; int32_t numOfCols = 0;