From ef96d37c1f2efd4a7fff5dac96c6b3d40b5e2f95 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 22 Jul 2024 15:07:01 +0800 Subject: [PATCH] fix: return code validation --- .../libs/executor/src/dynqueryctrloperator.c | 163 ++++++++++++------ source/libs/executor/src/mergejoinoperator.c | 1 - 2 files changed, 109 insertions(+), 55 deletions(-) diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index c3b3126f68..e4ca946f4f 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -358,31 +358,36 @@ static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initPara static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) { + int32_t code = TSDB_CODE_SUCCESS; *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { + code = terrno; freeOperatorParam(pChild0, OP_NOTIFY_PARAM); freeOperatorParam(pChild1, OP_NOTIFY_PARAM); - return TSDB_CODE_OUT_OF_MEMORY; + return code; } (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES); if (NULL == *ppRes) { + code = terrno; taosMemoryFreeClear(*ppRes); freeOperatorParam(pChild0, OP_NOTIFY_PARAM); freeOperatorParam(pChild1, OP_NOTIFY_PARAM); - return TSDB_CODE_OUT_OF_MEMORY; + return code; } if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) { + code = terrno; freeOperatorParam(*ppRes, OP_NOTIFY_PARAM); freeOperatorParam(pChild0, OP_NOTIFY_PARAM); freeOperatorParam(pChild1, OP_NOTIFY_PARAM); *ppRes = NULL; - return TSDB_CODE_OUT_OF_MEMORY; + return code; } if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) { + code = terrno; freeOperatorParam(*ppRes, OP_NOTIFY_PARAM); freeOperatorParam(pChild1, OP_NOTIFY_PARAM); *ppRes = NULL; - return TSDB_CODE_OUT_OF_MEMORY; + return code; } (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; @@ -422,9 +427,11 @@ static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) { SArray* pUidList = taosArrayInit(1, sizeof(int64_t)); if (NULL == pUidList) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + if (NULL == taosArrayPush(pUidList, pUid)) { + return terrno; } - taosArrayPush(pUidList, pUid); int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true); taosArrayDestroy(pUidList); @@ -553,53 +560,56 @@ static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPost return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam); } -static void handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) { +static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) { SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; pPost->isStarted = false; if (pStbJoin->basic.batchFetch) { - return; + return TSDB_CODE_SUCCESS; } if (pPost->leftNeedCache) { uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid)); if (num && --(*num) <= 0) { - tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid)); - notifySeqJoinTableCacheEnd(pOperator, pPost, true); + (void)tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid)); + QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true)); } } if (!pPost->rightNeedCache) { void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid)); if (NULL != v) { - tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid)); - notifySeqJoinTableCacheEnd(pOperator, pPost, false); + (void)tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid)); + QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false)); } } + + return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { +static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinPostJoinCtx* pPost = &pInfo->stbJoin.ctx.post; SStbJoinPrevJoinCtx* pPrev = &pInfo->stbJoin.ctx.prev; if (!pPost->isStarted) { - return; + return TSDB_CODE_SUCCESS; } qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo)); *ppRes = getNextBlockFromDownstream(pOperator, 1); if (NULL == *ppRes) { - handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin); + QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin)); pPrev->pListHead->readIdx++; } else { pInfo->stbJoin.execInfo.postBlkNum++; pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows; - return; } + + return TSDB_CODE_SUCCESS; } static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) { @@ -607,35 +617,38 @@ static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, in if (NULL == ppArray) { SArray* pArray = taosArrayInit(10, valSize); if (NULL == pArray) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } if (NULL == taosArrayPush(pArray, pVal)) { taosArrayDestroy(pArray); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) { taosArrayDestroy(pArray); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } return TSDB_CODE_SUCCESS; } if (NULL == taosArrayPush(*ppArray, pVal)) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } return TSDB_CODE_SUCCESS; } static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) { + int32_t code = TSDB_CODE_SUCCESS; uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize); if (NULL == pNum) { uint32_t n = 1; - if (tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n))) { - return TSDB_CODE_OUT_OF_MEMORY; + code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n)); + if (code) { + return code; } - if (tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0)) { - return TSDB_CODE_OUT_OF_MEMORY; + code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0); + if (code) { + return code; } return TSDB_CODE_SUCCESS; } @@ -648,7 +661,7 @@ static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnc break; default: if (1 == (*pNum)) { - tSimpleHashRemove(pOnceHash, pKey, keySize); + (void)tSimpleHashRemove(pOnceHash, pKey, keySize); } (*pNum)++; break; @@ -670,23 +683,40 @@ static void freeStbJoinTableList(SStbJoinTableList* pList) { } static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) { + int32_t code = TSDB_CODE_SUCCESS; SStbJoinTableList* pNew = taosMemoryMalloc(sizeof(SStbJoinTableList)); if (NULL == pNew) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg)); - pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid)); - pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg)); - pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid)); - if (NULL == pNew->pLeftVg || NULL == pNew->pLeftUid || NULL == pNew->pRightVg || NULL == pNew->pRightUid) { + if (NULL == pNew->pLeftVg) { + code = terrno; freeStbJoinTableList(pNew); - return TSDB_CODE_OUT_OF_MEMORY; + return code; + } + pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid)); + if (NULL == pNew->pLeftUid) { + code = terrno; + freeStbJoinTableList(pNew); + return code; + } + pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg)); + if (NULL == pNew->pRightVg) { + code = terrno; + freeStbJoinTableList(pNew); + return code; + } + pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid)); + if (NULL == pNew->pRightUid) { + code = terrno; + freeStbJoinTableList(pNew); + return code; } - memcpy(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg)); - memcpy(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid)); - memcpy(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg)); - memcpy(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid)); + TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg)); + TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid)); + TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg)); + TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid)); pNew->readIdx = 0; pNew->uidNum = rows; @@ -708,9 +738,21 @@ static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBloc SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SColumnInfoData* pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]); + if (NULL == pVg0) { + QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } SColumnInfoData* pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]); + if (NULL == pVg1) { + QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } SColumnInfoData* pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]); + if (NULL == pUid0) { + QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } SColumnInfoData* pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]); + if (NULL == pUid1) { + QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); + } if (pStbJoin->basic.batchFetch) { for (int32_t i = 0; i < pBlock->info.rows; ++i) { @@ -746,6 +788,8 @@ static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBloc } } +_return: + if (TSDB_CODE_SUCCESS != code) { pOperator->pTaskInfo->code = code; T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); @@ -769,7 +813,7 @@ static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) { uint64_t* pUid = NULL; int32_t iter = 0; while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) { - tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid)); + (void)tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid)); } pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache); @@ -804,7 +848,7 @@ static void buildStbJoinTableList(SOperatorInfo* pOperator) { pStbJoin->ctx.prev.joinBuild = true; } -static void seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { +static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; @@ -820,17 +864,17 @@ static void seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppR seqJoinLaunchNewRetrieveImpl(pOperator, ppRes); if (*ppRes) { - return; + return TSDB_CODE_SUCCESS; } - handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin); + QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin)); pPrev->pListHead->readIdx++; } *ppRes = NULL; setOperatorCompleted(pOperator); - return; + return TSDB_CODE_SUCCESS; } static FORCE_INLINE SSDataBlock* seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) { @@ -840,6 +884,7 @@ static FORCE_INLINE SSDataBlock* seqStableJoinComposeRes(SStbJoinDynCtrlInfo* SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { + int32_t code = TSDB_CODE_SUCCESS; SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SSDataBlock* pRes = NULL; @@ -861,18 +906,23 @@ SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { } } - seqJoinContinueCurrRetrieve(pOperator, &pRes); + QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, &pRes)); if (pRes) { goto _return; } - seqJoinLaunchNewRetrieve(pOperator, &pRes); + QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, &pRes)); _return: if (pOperator->cost.openCost == 0) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } + + if (code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + } return pRes ? seqStableJoinComposeRes(pStbJoin, pRes) : NULL; } @@ -881,24 +931,24 @@ int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) { if (batchFetch) { pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); if (NULL == pPrev->leftHash) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); if (NULL == pPrev->rightHash) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } } else { pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); if (NULL == pPrev->leftCache) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); if (NULL == pPrev->rightCache) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); if (NULL == pPrev->onceTable) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } } @@ -908,13 +958,16 @@ int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) { SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) { - SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - __optr_fn_t nextFp = NULL; - int32_t code = TSDB_CODE_SUCCESS; - if (pOperator == NULL || pInfo == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + __optr_fn_t nextFp = NULL; + SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo)); + if (pInfo == NULL) { + code = terrno; + goto _error; + } + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + code = terrno; goto _error; } @@ -928,7 +981,7 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32 pInfo->qType = pPhyciNode->qType; switch (pInfo->qType) { case DYN_QTYPE_STB_HASH: - memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); + TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); pInfo->stbJoin.outputBlkId = pPhyciNode->node.pOutputDataBlockDesc->dataBlockId; code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch); if (TSDB_CODE_SUCCESS != code) { @@ -949,12 +1002,14 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32 return pOperator; _error: + if (pInfo != NULL) { destroyDynQueryCtrlOperator(pInfo); } taosMemoryFree(pOperator); pTaskInfo->code = code; + return NULL; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 8d94841847..395b04f45b 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1717,7 +1717,6 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { pBlock = (*pJoin->joinFp)(pOperator); if (NULL == pBlock) { if (pJoin->errCode) { - ASSERT(0); T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode); } break;