fix: function return code validation

This commit is contained in:
dapan1121 2024-07-19 18:27:49 +08:00
parent e5fde30635
commit 558d22dff4
4 changed files with 124 additions and 46 deletions

View File

@ -90,7 +90,7 @@ int32_t hLeftJoinHandleSeqRowRemains(struct SOperatorInfo* pOperator, SHJoinOper
while (!allFetched) {
hJoinAppendResToBlock(pOperator, pJoin->midBlk, &allFetched);
if (pJoin->midBlk->info.rows > 0) {
doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL);
HJ_ERR_RET(doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL));
if (pJoin->midBlk->info.rows > 0) {
pCtx->readMatch = true;
HJ_ERR_RET(hJoinCopyMergeMidBlk(pCtx, &pJoin->midBlk, &pJoin->finBlk));
@ -170,7 +170,7 @@ int32_t hLeftJoinHandleSeqProbeRows(struct SOperatorInfo* pOperator, SHJoinOpera
while (!allFetched) {
hJoinAppendResToBlock(pOperator, pJoin->midBlk, &allFetched);
if (pJoin->midBlk->info.rows > 0) {
doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL);
HJ_ERR_RET(doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL));
if (pJoin->midBlk->info.rows > 0) {
pCtx->readMatch = true;
HJ_ERR_RET(hJoinCopyMergeMidBlk(pCtx, &pJoin->midBlk, &pJoin->finBlk));

View File

@ -238,10 +238,12 @@ static int32_t hJoinInitValColsInfo(SHJoinTableCtx* pTable, SNodeList* pList) {
if (NULL == pTable->valVarCols) {
pTable->valVarCols = taosArrayInit(pTable->valNum, sizeof(int32_t));
if (NULL == pTable->valVarCols) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
}
taosArrayPush(pTable->valVarCols, &i);
if (NULL == taosArrayPush(pTable->valVarCols, &i)) {
return terrno;
}
}
pTable->valCols[i].bytes = pColNode->node.resType.bytes;
if (!pTable->valCols[i].keyCol && !pTable->valCols[i].vardata) {
@ -332,7 +334,7 @@ static int32_t hJoinInitTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode*
return code;
}
memcpy(&pTable->inputStat, pStat, sizeof(*pStat));
TAOS_MEMCPY(&pTable->inputStat, pStat, sizeof(*pStat));
HJ_ERR_RET(hJoinInitPrimExprCtx(pTable->primExpr, &pTable->primCtx, pTable));
@ -416,7 +418,9 @@ static FORCE_INLINE int32_t hJoinAddPageToBufs(SArray* pRowBufs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pRowBufs, &page);
if (NULL == taosArrayPush(pRowBufs, &page)) {
return terrno;
}
return TSDB_CODE_SUCCESS;
}
@ -464,12 +468,21 @@ static void hJoinDestroyKeyHash(SSHashObj** ppHash) {
*ppHash = NULL;
}
static FORCE_INLINE char* hJoinRetrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) {
static FORCE_INLINE int32_t hJoinRetrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow, char** ppData) {
*ppData = NULL;
if ((uint16_t)-1 == pRow->pageId) {
return NULL;
return TSDB_CODE_SUCCESS;
}
SBufPageInfo *pPage = taosArrayGet(pRowBufs, pRow->pageId);
return pPage->data + pRow->offset;
if (NULL == pPage) {
qError("fail to get %d page, total:%d", pRow->pageId, (int32_t)taosArrayGetSize(pRowBufs));
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
*ppData = pPage->data + pRow->offset;
return TSDB_CODE_SUCCESS;
}
static int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) {
@ -479,9 +492,11 @@ static int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum
int32_t probeIdx = 0;
SBufRowInfo* pRow = pStart;
int32_t code = 0;
char* pData = NULL;
for (int32_t r = 0; r < rowNum; ++r) {
char* pData = hJoinRetrieveColDataFromRowBufs(pJoin->pRowBufs, pRow);
HJ_ERR_RET(hJoinRetrieveColDataFromRowBufs(pJoin->pRowBufs, pRow, &pData));
char* pValData = pData + pBuild->valBitMapSize;
char* pKeyData = pProbe->keyData;
buildIdx = buildValIdx = probeIdx = 0;
@ -544,7 +559,7 @@ int32_t hJoinCopyNMatchRowsToBlock(SHJoinOperatorInfo* pJoin, SSDataBlock* pRes,
SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[probeIdx].srcSlot);
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot);
colDataAssignNRows(pDst, pRes->info.rows, pSrc, startIdx, rows);
QRY_ERR_RET(colDataAssignNRows(pDst, pRes->info.rows, pSrc, startIdx, rows));
probeIdx++;
}
@ -606,11 +621,11 @@ bool hJoinCopyKeyColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx, size_t *p
}
if (pTable->keyCols[i].vardata) {
pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData));
TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, varDataTLen(pData));
bufLen += varDataTLen(pData);
} else {
pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx;
memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes);
TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes);
bufLen += pTable->keyCols[i].bytes;
}
}
@ -627,6 +642,10 @@ bool hJoinCopyKeyColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx, size_t *p
static int32_t hJoinSetKeyColsData(SSDataBlock* pBlock, SHJoinTableCtx* pTable) {
for (int32_t i = 0; i < pTable->keyNum; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot);
if (NULL == pCol) {
qError("fail to get %d col, total:%d", pTable->keyCols[i].srcSlot, (int32_t)taosArrayGetSize(pBlock->pDataBlock));
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot, pCol->info.type, pTable->keyCols[i].vardata);
return TSDB_CODE_INVALID_PARA;
@ -654,6 +673,10 @@ static int32_t hJoinSetValColsData(SSDataBlock* pBlock, SHJoinTableCtx* pTable)
continue;
}
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->valCols[i].srcSlot);
if (NULL == pCol) {
qError("fail to get %d col, total:%d", pTable->valCols[i].srcSlot, (int32_t)taosArrayGetSize(pBlock->pDataBlock));
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (pTable->valCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->valCols[i].srcSlot, pCol->info.type, pTable->valCols[i].vardata);
return TSDB_CODE_INVALID_PARA;
@ -683,7 +706,7 @@ static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableCtx* pTable, int32
char *pData = NULL;
size_t bufLen = pTable->valBitMapSize;
memset(pTable->valData, 0, pTable->valBitMapSize);
TAOS_MEMSET(pTable->valData, 0, pTable->valBitMapSize);
for (int32_t i = 0, m = 0; i < pTable->valNum; ++i) {
if (pTable->valCols[i].keyCol) {
continue;
@ -693,7 +716,7 @@ static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableCtx* pTable, int32
colDataSetNull_f(pTable->valData, m);
} else {
pData = pTable->valCols[i].data + pTable->valCols[i].offset[rowIdx];
memcpy(pTable->valData + bufLen, pData, varDataTLen(pData));
TAOS_MEMCPY(pTable->valData + bufLen, pData, varDataTLen(pData));
bufLen += varDataTLen(pData);
}
} else {
@ -701,7 +724,7 @@ static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableCtx* pTable, int32
colDataSetNull_f(pTable->valData, m);
} else {
pData = pTable->valCols[i].data + pTable->valCols[i].bytes * rowIdx;
memcpy(pTable->valData + bufLen, pData, pTable->valCols[i].bytes);
TAOS_MEMCPY(pTable->valData + bufLen, pData, pTable->valCols[i].bytes);
bufLen += pTable->valCols[i].bytes;
}
}
@ -1009,7 +1032,11 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) {
}
if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
doFilter(pRes, pJoin->pFinFilter, NULL);
code = doFilter(pRes, pJoin->pFinFilter, NULL);
if (code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
}
if (pRes->info.rows > 0) {
return pRes;
@ -1037,7 +1064,11 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) {
}
if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
doFilter(pRes, pJoin->pFinFilter, NULL);
code = doFilter(pRes, pJoin->pFinFilter, NULL);
if (code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
}
if (pRes->info.rows > 0) {
@ -1118,13 +1149,25 @@ static uint32_t hJoinGetFinBlkCapacity(SHJoinOperatorInfo* pJoin, SHashJoinPhysi
int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode) {
pJoin->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
if (NULL == pJoin->finBlk) {
QRY_ERR_RET(terrno);
}
ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0);
blockDataEnsureCapacity(pJoin->finBlk, hJoinGetFinBlkCapacity(pJoin, pJoinNode));
int32_t code = blockDataEnsureCapacity(pJoin->finBlk, hJoinGetFinBlkCapacity(pJoin, pJoinNode));
if (TSDB_CODE_SUCCESS != code) {
QRY_ERR_RET(terrno);
}
if (NULL != pJoin->pPreFilter) {
pJoin->midBlk = createOneDataBlock(pJoin->finBlk, false);
blockDataEnsureCapacity(pJoin->midBlk, pJoin->finBlk->info.capacity);
if (NULL == pJoin->finBlk) {
QRY_ERR_RET(terrno);
}
code = blockDataEnsureCapacity(pJoin->midBlk, pJoin->finBlk->info.capacity);
if (TSDB_CODE_SUCCESS != code) {
QRY_ERR_RET(terrno);
}
}
pJoin->blkThreshold = pJoin->finBlk->info.capacity * HJOIN_BLK_THRESHOLD_RATIO;
@ -1151,8 +1194,8 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]);
hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]);
HJ_ERR_JRET(hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]));
HJ_ERR_JRET(hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]));
hJoinSetBuildAndProbeTable(pInfo, pJoinNode);
@ -1182,6 +1225,7 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
return pOperator;
_return:
if (pInfo != NULL) {
destroyHashJoinOperator(pInfo);
}

View File

@ -357,7 +357,7 @@ static int32_t mOuterJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop)
do {
startRowIdx = build->grpRowIdx;
mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
(void)mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
if (pCtx->midBlk->info.rows > 0) {
if (build->rowBitmapSize > 0) {
@ -1283,7 +1283,7 @@ static int32_t mSemiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* p
do {
blockDataCleanup(pCtx->midBlk);
mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
(void)mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
if (pCtx->midBlk->info.rows > 0) {
MJ_ERR_RET(mJoinFilterAndKeepSingleRow(pCtx->midBlk, pCtx->pJoin->pPreFilter));
@ -1358,7 +1358,7 @@ static int32_t mSemiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
build->pHashCurGrp = *(SArray**)pGrp;
ASSERT(1 == taosArrayGetSize(build->pHashCurGrp));
build->grpRowIdx = 0;
mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build);
(void)mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build);
ASSERT(build->grpRowIdx < 0);
}
@ -1616,7 +1616,7 @@ static int32_t mAntiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* p
do {
blockDataCleanup(pCtx->midBlk);
mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
(void)mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
if (pCtx->midBlk->info.rows > 0) {
MJ_ERR_RET(mJoinFilterAndNoKeepRows(pCtx->midBlk, pCtx->pJoin->pPreFilter));
@ -2102,7 +2102,9 @@ int32_t mAsofBackwardHandleGrpRemains(SMJoinWindowCtx* pCtx) {
return (pCtx->lastEqGrp) ? mAsofBackwardDumpUpdateEqRows(pCtx, pCtx->pJoin, false, true) : mAsofBackwardDumpGrpCache(pCtx);
}
static bool mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
static int32_t mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx, bool* newBlock) {
*newBlock = false;
bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false;
@ -2116,7 +2118,7 @@ static bool mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo*
mJoinSetDone(pOperator);
}
return false;
return TSDB_CODE_SUCCESS;
}
break;
@ -2124,13 +2126,18 @@ static bool mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo*
if (buildGot && NULL == pCtx->cache.outBlk) {
pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false);
blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit);
if (NULL == pCtx->cache.outBlk) {
MJ_ERR_RET(terrno);
}
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit));
}
pCtx->probeGrp.blk = pJoin->probe->blk;
pCtx->buildGrp.blk = pJoin->build->blk;
return true;
*newBlock = true;
return TSDB_CODE_SUCCESS;
}
@ -2142,6 +2149,7 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) {
int64_t buildTs = 0;
SColumnInfoData* pBuildCol = NULL;
SColumnInfoData* pProbeCol = NULL;
bool newBlock = false;
blockDataCleanup(pCtx->finBlk);
@ -2154,7 +2162,8 @@ SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) {
}
do {
if (!mAsofBackwardRetrieve(pOperator, pJoin, pCtx)) {
MJ_ERR_JRET(mAsofBackwardRetrieve(pOperator, pJoin, pCtx, &newBlock));
if (!newBlock) {
if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
continue;
}
@ -2271,7 +2280,7 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) {
pGrp->readIdx = 0;
//pGrp->endIdx = pGrp->blk->info.rows - 1;
} else {
taosArrayPop(pCache->grps);
(void)taosArrayPop(pCache->grps);
pGrp = taosArrayGet(pCache->grps, 0);
ASSERT(pGrp->blk == pCache->outBlk);
//pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx;
@ -2479,7 +2488,9 @@ int32_t mAsofForwardSkipBuildGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoi
return TSDB_CODE_SUCCESS;
}
static bool mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx, bool* newBlock) {
*newBlock = false;
bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false;
@ -2516,7 +2527,10 @@ static bool mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p
if (buildGot && pJoin->build->newBlk) {
if (NULL == pCtx->cache.outBlk) {
pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false);
blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit);
if (NULL == pCtx->cache.outBlk) {
MJ_ERR_RET(terrno);
}
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit));
}
MJOIN_PUSH_BLK_TO_CACHE(&pCtx->cache, pJoin->build->blk);
@ -2537,6 +2551,7 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) {
int64_t buildTs = 0;
SColumnInfoData* pBuildCol = NULL;
SColumnInfoData* pProbeCol = NULL;
bool newBlock = false;
blockDataCleanup(pCtx->finBlk);
@ -2549,7 +2564,8 @@ SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) {
}
do {
if (!mAsofForwardRetrieve(pOperator, pJoin, pCtx)) {
MJ_ERR_JRET(mAsofForwardRetrieve(pOperator, pJoin, pCtx, &newBlock));
if (!newBlock) {
if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
continue;
}
@ -2643,7 +2659,7 @@ static FORCE_INLINE void mWinJoinPopFrontGroup(SMJoinWindowCtx* pCtx, SMJoinGrpR
if (pGrp->blk == pCtx->cache.outBlk) {
blockDataCleanup(pGrp->blk);
} else if (pGrp->clonedBlk) {
blockDataDestroy(pGrp->blk);
(void)blockDataDestroy(pGrp->blk);
}
taosArrayPopFrontBatch(pCtx->cache.grps, 1);
@ -2668,20 +2684,26 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) {
pGrp->readIdx = 0;
}
if (NULL == pGrp->blk) {
MJ_ERR_RET(terrno);
}
pGrp->clonedBlk = true;
}
return TSDB_CODE_SUCCESS;
}
static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
static int32_t mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx, bool* newBlock) {
*newBlock = false;
bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false;
do {
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
if (NULL == pJoin->build->blk) {
mWinJoinCloneCacheBlk(pCtx);
MJ_ERR_RET(mWinJoinCloneCacheBlk(pCtx));
}
buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
@ -2692,7 +2714,7 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin
mJoinSetDone(pOperator);
}
return false;
return TSDB_CODE_SUCCESS;
}
if (buildGot && pCtx->forwardRowsAcq) {
@ -2709,8 +2731,9 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin
} while (true);
pCtx->probeGrp.blk = pJoin->probe->blk;
*newBlock = true;
return true;
return TSDB_CODE_SUCCESS;
}
int32_t mWinJoinTryAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) {
@ -3220,6 +3243,7 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t probeTs = 0;
SColumnInfoData* pProbeCol = NULL;
bool newBlock = false;
blockDataCleanup(pCtx->finBlk);
@ -3232,7 +3256,8 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
}
do {
if (!mWinJoinRetrieve(pOperator, pJoin, pCtx)) {
MJ_ERR_JRET(mWinJoinRetrieve(pOperator, pJoin, pCtx, &newBlock));
if (!newBlock) {
if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
continue;
}
@ -3367,7 +3392,7 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
}
pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode));
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)));
pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO;
@ -3407,11 +3432,14 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0);
blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode));
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)));
if (pJoin->pFPreFilter) {
pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false);
blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity);
if (NULL == pCtx->midBlk) {
MJ_ERR_RET(terrno);
}
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity));
}
pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO;

View File

@ -1147,6 +1147,7 @@ void mJoinResetForBuildTable(SMJoinTableCtx* pTable) {
int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) {
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
SMJoinGrpRows* pGrp = NULL;
int32_t code = TSDB_CODE_SUCCESS;
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
return TSDB_CODE_SUCCESS;
@ -1207,7 +1208,12 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) {
pGrp->blk = createOneDataBlock(pTable->blk, true);
taosArrayPush(pTable->createdBlks, &pGrp->blk);
if (NULL == pGrp->blk) {
MJ_ERR_JRET(terrno);
}
if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
MJ_ERR_JRET(terrno);
}
} else {
if (!pTable->multiEqGrpRows) {
pGrp->endIdx = pGrp->beginIdx;