fix: return code validation

This commit is contained in:
dapan1121 2024-07-22 15:07:01 +08:00
parent 64ad240773
commit ef96d37c1f
2 changed files with 109 additions and 55 deletions

View File

@ -358,31 +358,36 @@ static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initPara
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) { static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
int32_t code = TSDB_CODE_SUCCESS;
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) { if (NULL == *ppRes) {
code = terrno;
freeOperatorParam(pChild0, OP_NOTIFY_PARAM); freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
freeOperatorParam(pChild1, OP_NOTIFY_PARAM); freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
return TSDB_CODE_OUT_OF_MEMORY; return code;
} }
(*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES); (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
if (NULL == *ppRes) { if (NULL == *ppRes) {
code = terrno;
taosMemoryFreeClear(*ppRes); taosMemoryFreeClear(*ppRes);
freeOperatorParam(pChild0, OP_NOTIFY_PARAM); freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
freeOperatorParam(pChild1, OP_NOTIFY_PARAM); freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
return TSDB_CODE_OUT_OF_MEMORY; return code;
} }
if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) { if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
code = terrno;
freeOperatorParam(*ppRes, OP_NOTIFY_PARAM); freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
freeOperatorParam(pChild0, OP_NOTIFY_PARAM); freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
freeOperatorParam(pChild1, OP_NOTIFY_PARAM); freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
*ppRes = NULL; *ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY; return code;
} }
if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) { if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
code = terrno;
freeOperatorParam(*ppRes, OP_NOTIFY_PARAM); freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
freeOperatorParam(pChild1, OP_NOTIFY_PARAM); freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
*ppRes = NULL; *ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY; return code;
} }
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
@ -422,9 +427,11 @@ static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) { static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
SArray* pUidList = taosArrayInit(1, sizeof(int64_t)); SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
if (NULL == pUidList) { if (NULL == pUidList) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
}
if (NULL == taosArrayPush(pUidList, pUid)) {
return terrno;
} }
taosArrayPush(pUidList, pUid);
int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true); int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
taosArrayDestroy(pUidList); taosArrayDestroy(pUidList);
@ -553,53 +560,56 @@ static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPost
return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam); return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
} }
static void handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) { static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
pPost->isStarted = false; pPost->isStarted = false;
if (pStbJoin->basic.batchFetch) { if (pStbJoin->basic.batchFetch) {
return; return TSDB_CODE_SUCCESS;
} }
if (pPost->leftNeedCache) { if (pPost->leftNeedCache) {
uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid)); uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
if (num && --(*num) <= 0) { if (num && --(*num) <= 0) {
tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid)); (void)tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
notifySeqJoinTableCacheEnd(pOperator, pPost, true); QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
} }
} }
if (!pPost->rightNeedCache) { if (!pPost->rightNeedCache) {
void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid)); void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
if (NULL != v) { if (NULL != v) {
tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid)); (void)tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
notifySeqJoinTableCacheEnd(pOperator, pPost, false); QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
} }
} }
return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE void seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinPostJoinCtx* pPost = &pInfo->stbJoin.ctx.post; SStbJoinPostJoinCtx* pPost = &pInfo->stbJoin.ctx.post;
SStbJoinPrevJoinCtx* pPrev = &pInfo->stbJoin.ctx.prev; SStbJoinPrevJoinCtx* pPrev = &pInfo->stbJoin.ctx.prev;
if (!pPost->isStarted) { if (!pPost->isStarted) {
return; return TSDB_CODE_SUCCESS;
} }
qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo)); qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
*ppRes = getNextBlockFromDownstream(pOperator, 1); *ppRes = getNextBlockFromDownstream(pOperator, 1);
if (NULL == *ppRes) { if (NULL == *ppRes) {
handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin); QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
pPrev->pListHead->readIdx++; pPrev->pListHead->readIdx++;
} else { } else {
pInfo->stbJoin.execInfo.postBlkNum++; pInfo->stbJoin.execInfo.postBlkNum++;
pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows; pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
return;
} }
return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) { static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
@ -607,35 +617,38 @@ static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, in
if (NULL == ppArray) { if (NULL == ppArray) {
SArray* pArray = taosArrayInit(10, valSize); SArray* pArray = taosArrayInit(10, valSize);
if (NULL == pArray) { if (NULL == pArray) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
if (NULL == taosArrayPush(pArray, pVal)) { if (NULL == taosArrayPush(pArray, pVal)) {
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) { if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (NULL == taosArrayPush(*ppArray, pVal)) { if (NULL == taosArrayPush(*ppArray, pVal)) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) { static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
int32_t code = TSDB_CODE_SUCCESS;
uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize); uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);
if (NULL == pNum) { if (NULL == pNum) {
uint32_t n = 1; uint32_t n = 1;
if (tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n))) { code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n));
return TSDB_CODE_OUT_OF_MEMORY; if (code) {
return code;
} }
if (tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0)) { code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
return TSDB_CODE_OUT_OF_MEMORY; if (code) {
return code;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -648,7 +661,7 @@ static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnc
break; break;
default: default:
if (1 == (*pNum)) { if (1 == (*pNum)) {
tSimpleHashRemove(pOnceHash, pKey, keySize); (void)tSimpleHashRemove(pOnceHash, pKey, keySize);
} }
(*pNum)++; (*pNum)++;
break; break;
@ -670,23 +683,40 @@ static void freeStbJoinTableList(SStbJoinTableList* pList) {
} }
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) { static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
int32_t code = TSDB_CODE_SUCCESS;
SStbJoinTableList* pNew = taosMemoryMalloc(sizeof(SStbJoinTableList)); SStbJoinTableList* pNew = taosMemoryMalloc(sizeof(SStbJoinTableList));
if (NULL == pNew) { if (NULL == pNew) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg)); pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid)); if (NULL == pNew->pLeftVg) {
pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg)); code = terrno;
pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
if (NULL == pNew->pLeftVg || NULL == pNew->pLeftUid || NULL == pNew->pRightVg || NULL == pNew->pRightUid) {
freeStbJoinTableList(pNew); freeStbJoinTableList(pNew);
return TSDB_CODE_OUT_OF_MEMORY; return code;
}
pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
if (NULL == pNew->pLeftUid) {
code = terrno;
freeStbJoinTableList(pNew);
return code;
}
pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
if (NULL == pNew->pRightVg) {
code = terrno;
freeStbJoinTableList(pNew);
return code;
}
pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
if (NULL == pNew->pRightUid) {
code = terrno;
freeStbJoinTableList(pNew);
return code;
} }
memcpy(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg)); TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
memcpy(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid)); TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
memcpy(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg)); TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
memcpy(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid)); TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
pNew->readIdx = 0; pNew->readIdx = 0;
pNew->uidNum = rows; pNew->uidNum = rows;
@ -708,9 +738,21 @@ static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBloc
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SColumnInfoData* pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]); SColumnInfoData* pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
if (NULL == pVg0) {
QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SColumnInfoData* pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]); SColumnInfoData* pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
if (NULL == pVg1) {
QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SColumnInfoData* pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]); SColumnInfoData* pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
if (NULL == pUid0) {
QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SColumnInfoData* pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]); SColumnInfoData* pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
if (NULL == pUid1) {
QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (pStbJoin->basic.batchFetch) { if (pStbJoin->basic.batchFetch) {
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
@ -746,6 +788,8 @@ static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBloc
} }
} }
_return:
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code; pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
@ -769,7 +813,7 @@ static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
uint64_t* pUid = NULL; uint64_t* pUid = NULL;
int32_t iter = 0; int32_t iter = 0;
while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) { while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid)); (void)tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
} }
pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache); pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
@ -804,7 +848,7 @@ static void buildStbJoinTableList(SOperatorInfo* pOperator) {
pStbJoin->ctx.prev.joinBuild = true; pStbJoin->ctx.prev.joinBuild = true;
} }
static void seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
@ -820,17 +864,17 @@ static void seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppR
seqJoinLaunchNewRetrieveImpl(pOperator, ppRes); seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
if (*ppRes) { if (*ppRes) {
return; return TSDB_CODE_SUCCESS;
} }
handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin); QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
pPrev->pListHead->readIdx++; pPrev->pListHead->readIdx++;
} }
*ppRes = NULL; *ppRes = NULL;
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE SSDataBlock* seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) { static FORCE_INLINE SSDataBlock* seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
@ -840,6 +884,7 @@ static FORCE_INLINE SSDataBlock* seqStableJoinComposeRes(SStbJoinDynCtrlInfo*
SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
@ -861,12 +906,12 @@ SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) {
} }
} }
seqJoinContinueCurrRetrieve(pOperator, &pRes); QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, &pRes));
if (pRes) { if (pRes) {
goto _return; goto _return;
} }
seqJoinLaunchNewRetrieve(pOperator, &pRes); QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, &pRes));
_return: _return:
@ -874,6 +919,11 @@ _return:
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
} }
if (code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
return pRes ? seqStableJoinComposeRes(pStbJoin, pRes) : NULL; return pRes ? seqStableJoinComposeRes(pStbJoin, pRes) : NULL;
} }
@ -881,24 +931,24 @@ int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
if (batchFetch) { if (batchFetch) {
pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pPrev->leftHash) { if (NULL == pPrev->leftHash) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pPrev->rightHash) { if (NULL == pPrev->rightHash) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
} else { } else {
pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (NULL == pPrev->leftCache) { if (NULL == pPrev->leftCache) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (NULL == pPrev->rightCache) { if (NULL == pPrev->rightCache) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (NULL == pPrev->onceTable) { if (NULL == pPrev->onceTable) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
} }
@ -908,13 +958,16 @@ int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) { SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
__optr_fn_t nextFp = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) { __optr_fn_t nextFp = NULL;
code = TSDB_CODE_OUT_OF_MEMORY; SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
if (pInfo == NULL) {
code = terrno;
goto _error;
}
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
code = terrno;
goto _error; goto _error;
} }
@ -928,7 +981,7 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32
pInfo->qType = pPhyciNode->qType; pInfo->qType = pPhyciNode->qType;
switch (pInfo->qType) { switch (pInfo->qType) {
case DYN_QTYPE_STB_HASH: case DYN_QTYPE_STB_HASH:
memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
pInfo->stbJoin.outputBlkId = pPhyciNode->node.pOutputDataBlockDesc->dataBlockId; pInfo->stbJoin.outputBlkId = pPhyciNode->node.pOutputDataBlockDesc->dataBlockId;
code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch); code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
@ -949,12 +1002,14 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32
return pOperator; return pOperator;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyDynQueryCtrlOperator(pInfo); destroyDynQueryCtrlOperator(pInfo);
} }
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }

View File

@ -1717,7 +1717,6 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
pBlock = (*pJoin->joinFp)(pOperator); pBlock = (*pJoin->joinFp)(pOperator);
if (NULL == pBlock) { if (NULL == pBlock) {
if (pJoin->errCode) { if (pJoin->errCode) {
ASSERT(0);
T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode); T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
} }
break; break;