fix: crash issue

This commit is contained in:
dapan1121 2024-07-22 14:35:12 +08:00
parent 45348b15be
commit 64ad240773
3 changed files with 258 additions and 154 deletions

View File

@ -105,7 +105,7 @@ static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev,
return (NULL == num) ? false : true;
}
static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) {
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) {
SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
SStbJoinTableList* pNode = pPrev->pListHead;
@ -141,32 +141,38 @@ static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin)
pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
if (pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0);
if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
pStbJoin->execInfo.rightCacheNum++;
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
int32_t code = TSDB_CODE_SUCCESS;
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
code = terrno;
freeOperatorParam(pChild, OP_GET_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
if (pChild) {
(*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
if (NULL == (*ppRes)->pChildren) {
code = terrno;
freeOperatorParam(pChild, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
code = terrno;
freeOperatorParam(pChild, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
} else {
(*ppRes)->pChildren = NULL;
@ -174,9 +180,10 @@ static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t down
SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
if (NULL == pGc) {
code = terrno;
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
@ -194,16 +201,18 @@ static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t down
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
int32_t code = TSDB_CODE_SUCCESS;
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
(*ppRes)->pChildren = NULL;
SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
if (NULL == pGc) {
code = terrno;
freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
pGc->downstreamIdx = downstreamIdx;
@ -221,13 +230,13 @@ static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
(*ppRes)->pChildren = NULL;
SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
if (NULL == pExc) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pExc->multiParams = false;
@ -237,7 +246,7 @@ static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downst
pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
if (NULL == pExc->basic.uidList) {
taosMemoryFree(pExc);
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
taosArrayPush(pExc->basic.uidList, pUid);
@ -252,14 +261,14 @@ static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downst
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
(*ppRes)->pChildren = NULL;
SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
if (NULL == pExc) {
taosMemoryFreeClear(*ppRes);
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pExc->multiParams = true;
@ -267,7 +276,7 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d
if (NULL == pExc->pBatchs) {
taosMemoryFree(pExc);
taosMemoryFreeClear(*ppRes);
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
@ -283,7 +292,7 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d
basic.uidList = pUidList;
basic.tableSeq = false;
tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
QRY_ERR_RET(tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)));
qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
*(SArray**)p = NULL;
@ -298,39 +307,45 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam* pChild0, SOperatorParam* pChild1) {
int32_t code = TSDB_CODE_SUCCESS;
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
code = terrno;
freeOperatorParam(pChild0, OP_GET_PARAM);
freeOperatorParam(pChild1, OP_GET_PARAM);
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
(*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
if (NULL == *ppRes) {
code = terrno;
freeOperatorParam(pChild0, OP_GET_PARAM);
freeOperatorParam(pChild1, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
code = terrno;
freeOperatorParam(pChild0, OP_GET_PARAM);
freeOperatorParam(pChild1, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
code = terrno;
freeOperatorParam(pChild1, OP_GET_PARAM);
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
if (NULL == pJoin) {
code = terrno;
freeOperatorParam(*ppRes, OP_GET_PARAM);
*ppRes = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
pJoin->initDownstream = initParam;
@ -435,7 +450,7 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64,
rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
updatePostJoinCurrTableInfo(&pInfo->stbJoin);
QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));
if (pInfo->stbJoin.basic.batchFetch) {
if (pPrev->leftHash) {

View File

@ -2655,7 +2655,7 @@ static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo
mJoinSetDone(pOperator);
}
return false;
return TSDB_CODE_SUCCESS;
}
if (buildGot) {
@ -2690,8 +2690,9 @@ static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo
}
pCtx->probeGrp.blk = pJoin->probe->blk;
*newBlock = true;
return true;
return TSDB_CODE_SUCCESS;
}

File diff suppressed because it is too large Load Diff