fix: return code validation

This commit is contained in:
dapan1121 2024-07-22 11:06:24 +08:00
parent 558d22dff4
commit 45348b15be
4 changed files with 503 additions and 94 deletions

View File

@ -432,6 +432,15 @@ typedef struct SMJoinOperatorInfo {
} \
} while (0)
#define MJ_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
} \
return _code; \
} while (0)
void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin);
@ -459,7 +468,7 @@ int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pT
int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable);
int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable);
int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp);
bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build);
int32_t mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build, bool* cont);
int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond);
int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx);
int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp);

View File

@ -149,7 +149,7 @@ void blockDataDeepCleanup(SSDataBlock* pDataBlock) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
if (NULL == p) {
qError("fail to get %dth col in dataBlock, numOfCols:%d", i, numOfCols);
qError("fail to get %dth col in dataBlock, numOfCols:%d", i, (int32_t)numOfCols);
continue;
}
taosMemoryFreeClear(p->pData);
@ -352,7 +352,7 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
}
pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId));
if (NULL == pBufInfo) {
qError("fail to get blk %d from pCache->pDirtyBlk", pBufInfo->basic.blkId);
qError("fail to get blk %" PRId64 " from pCache->pDirtyBlk", pBufInfo->basic.blkId);
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
int32_t code = TSDB_CODE_SUCCESS;
@ -462,7 +462,7 @@ void blockDataDeepClear(SSDataBlock* pDataBlock) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
if (NULL == p) {
qError("fail to get %d col from pDataBlock, numOfCols:%d", i, numOfCols);
qError("fail to get %d col from pDataBlock, numOfCols:%d", i, (int32_t)numOfCols);
continue;
}
p->pData = NULL;
@ -1239,7 +1239,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP
*ppSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
if (NULL == *ppSession) {
qError("fail to get session %d from pSessions", pGcParam->sessionId);
qError("fail to get session %" PRId64 " from pSessions", pGcParam->sessionId);
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}

View File

@ -72,9 +72,15 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
if ((!pCtx->seqWinGrp) && 0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) {
SMJoinGrpRows* pFirstBuild = taosArrayGet(cache->grps, 0);
if (NULL == pFirstBuild) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (pFirstBuild->readIdx == pFirstBuild->beginIdx) {
for (; cache->grpIdx < buildGrpNum; ++cache->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx);
if (NULL == buildGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
buildGrp->readIdx = buildGrp->beginIdx;
}
@ -89,6 +95,9 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
probeGrp->endIdx = probeGrp->readIdx;
for (; cache->grpIdx < buildGrpNum && rowsLeft > 0; ++cache->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx);
if (NULL == buildGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
@ -134,7 +143,8 @@ static int32_t mOuterJoinHashFullCart(SMJoinMergeCtx* pCtx) {
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
if (build->grpRowIdx >= 0) {
bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build);
bool contLoop = false;
MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop));
if (build->grpRowIdx < 0) {
probeGrp->readIdx++;
}
@ -171,7 +181,8 @@ static int32_t mOuterJoinHashFullCart(SMJoinMergeCtx* pCtx) {
}
build->grpRowIdx = 0;
bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build);
bool contLoop = false;
MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop));
if (!contLoop) {
if (build->grpRowIdx < 0) {
probeGrp->readIdx++;
@ -193,15 +204,27 @@ static int32_t mOuterJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
if (NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
int32_t probeEndIdx = probeGrp->endIdx;
if (0 == build->grpIdx && probeRows * build->grpTotalRows <= rowsLeft) {
SMJoinGrpRows* pFirstBuild = taosArrayGet(build->eqGrps, 0);
if (NULL == pFirstBuild) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (pFirstBuild->readIdx == pFirstBuild->beginIdx) {
for (; build->grpIdx < buildGrpNum; ++build->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
if (NULL == buildGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
buildGrp->readIdx = buildGrp->beginIdx;
}
@ -215,6 +238,9 @@ static int32_t mOuterJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
probeGrp->endIdx = probeGrp->readIdx;
for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
if (NULL == buildGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
@ -254,6 +280,10 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
if (NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
int32_t probeEndIdx = probeGrp->endIdx;
int32_t rowsLeft = pCtx->midBlk->info.capacity;
@ -274,6 +304,10 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
if (NULL == buildGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (startRowIdx < 0) {
startRowIdx = buildGrp->readIdx;
}
@ -351,13 +385,17 @@ static int32_t mOuterJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop)
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
if (NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
int32_t startRowIdx = 0;
//blockDataCleanup(pCtx->midBlk);
do {
startRowIdx = build->grpRowIdx;
(void)mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
MJ_ERR_RET(mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build, NULL));
if (pCtx->midBlk->info.rows > 0) {
if (build->rowBitmapSize > 0) {
@ -406,6 +444,10 @@ static int32_t mOuterJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
if (NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
bool contLoop = false;
if (build->grpRowIdx >= 0) {
@ -500,7 +542,15 @@ static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi
if (buildGot) {
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
if (NULL == pProbeCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
if (NULL == pBuildCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) {
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
buildGot = false;
@ -649,15 +699,27 @@ static int32_t mInnerJoinMergeCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
if (NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
int32_t probeEndIdx = probeGrp->endIdx;
if (0 == build->grpIdx && probeRows * build->grpTotalRows <= rowsLeft) {
SMJoinGrpRows* pFirstBuild = taosArrayGet(build->eqGrps, 0);
if (NULL == pFirstBuild) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (pFirstBuild->readIdx == pFirstBuild->beginIdx) {
for (; build->grpIdx < buildGrpNum; ++build->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
if (NULL == buildGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
buildGrp->readIdx = buildGrp->beginIdx;
}
@ -671,6 +733,9 @@ static int32_t mInnerJoinMergeCart(SMJoinMergeCtx* pCtx) {
probeGrp->endIdx = probeGrp->readIdx;
for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
if (NULL == buildGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
@ -711,9 +776,13 @@ static int32_t mInnerJoinHashCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
if (NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (build->grpRowIdx >= 0) {
bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build);
bool contLoop = false;
MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop));
if (build->grpRowIdx < 0) {
probeGrp->readIdx++;
}
@ -734,7 +803,8 @@ static int32_t mInnerJoinHashCart(SMJoinMergeCtx* pCtx) {
if (NULL != pGrp) {
build->pHashCurGrp = *pGrp;
build->grpRowIdx = 0;
bool contLoop = mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build);
bool contLoop = false;
MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop));
if (!contLoop) {
if (build->grpRowIdx < 0) {
probeGrp->readIdx++;
@ -772,7 +842,15 @@ static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJo
if (buildGot) {
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
if (NULL == pProbeCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
if (NULL == pBuildCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) {
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
buildGot = false;
@ -903,6 +981,10 @@ static int32_t mFullJoinMergeCart(SMJoinMergeCtx* pCtx) {
static FORCE_INLINE int32_t mFullJoinOutputHashRow(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, int32_t idx) {
SMJoinGrpRows grp = {0};
SMJoinRowPos* pPos = taosArrayGet(pGrpRows->pRows, idx);
if (NULL == pPos) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
grp.blk = pPos->pBlk;
grp.readIdx = pPos->pos;
grp.endIdx = pPos->pos;
@ -1042,6 +1124,10 @@ static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) {
grpDone = false;
SMJoinGrpRows* pGrpRows = taosArrayGet(build->eqGrps, pNMatch->grpIdx);
if (NULL == pGrpRows) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (pGrpRows->allRowsMatch) {
continue;
}
@ -1283,7 +1369,7 @@ static int32_t mSemiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* p
do {
blockDataCleanup(pCtx->midBlk);
(void)mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
MJ_ERR_RET(mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build, NULL));
if (pCtx->midBlk->info.rows > 0) {
MJ_ERR_RET(mJoinFilterAndKeepSingleRow(pCtx->midBlk, pCtx->pJoin->pPreFilter));
@ -1312,6 +1398,9 @@ static int32_t mSemiJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
if (NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
size_t bufLen = 0;
int32_t probeEndIdx = probeGrp->endIdx;
@ -1343,6 +1432,10 @@ static int32_t mSemiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
if (NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
size_t bufLen = 0;
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) {
@ -1358,7 +1451,7 @@ static int32_t mSemiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
build->pHashCurGrp = *(SArray**)pGrp;
ASSERT(1 == taosArrayGetSize(build->pHashCurGrp));
build->grpRowIdx = 0;
(void)mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build);
MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, NULL));
ASSERT(build->grpRowIdx < 0);
}
@ -1372,6 +1465,10 @@ static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
if (NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SMJoinGrpRows* buildGrp = NULL;
int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
int32_t probeEndIdx = probeGrp->endIdx;
@ -1387,6 +1484,9 @@ static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
blockDataCleanup(pCtx->midBlk);
for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
if (NULL == buildGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
@ -1448,6 +1548,10 @@ static int32_t mSemiJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, 0);
if (NULL == buildGrp || NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
int32_t probeEndIdx = probeGrp->endIdx;
@ -1584,6 +1688,10 @@ static int32_t mAntiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
if (NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
size_t bufLen = 0;
int32_t probeEndIdx = probeGrp->endIdx;
@ -1616,7 +1724,7 @@ static int32_t mAntiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* p
do {
blockDataCleanup(pCtx->midBlk);
(void)mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build);
MJ_ERR_RET(mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build, NULL));
if (pCtx->midBlk->info.rows > 0) {
MJ_ERR_RET(mJoinFilterAndNoKeepRows(pCtx->midBlk, pCtx->pJoin->pPreFilter));
@ -1642,6 +1750,10 @@ static int32_t mAntiJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
if (NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
size_t bufLen = 0;
int32_t probeEndIdx = probeGrp->endIdx;
@ -1682,6 +1794,10 @@ static int32_t mAntiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
if (NULL == probeGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SMJoinGrpRows* buildGrp = NULL;
int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
int32_t probeEndIdx = probeGrp->endIdx;
@ -1697,6 +1813,10 @@ static int32_t mAntiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
blockDataCleanup(pCtx->midBlk);
for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
if (NULL == buildGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
@ -1889,6 +2009,9 @@ int32_t mAsofBackwardAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWin
grp.blk = pTable->blk;
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
return TSDB_CODE_SUCCESS;
@ -2249,6 +2372,10 @@ int32_t mAsofForwardTrimCacheBlk(SMJoinWindowCtx* pCtx) {
}
SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, 0);
if (NULL == pGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (pGrp->blk == pCtx->cache.outBlk && pCtx->pJoin->build->blkRowIdx > 0) {
MJ_ERR_RET(blockDataTrimFirstRows(pGrp->blk, pCtx->pJoin->build->blkRowIdx));
pCtx->pJoin->build->blkRowIdx = 0;
@ -2271,6 +2398,10 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) {
int32_t grpNum = taosArrayGetSize(pCache->grps);
if (grpNum >= 1) {
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, grpNum - 1);
if (NULL == pGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (pGrp->blk != pCache->outBlk) {
int32_t beginIdx = (1 == grpNum) ? build->blkRowIdx : 0;
MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pGrp->blk, beginIdx, pGrp->blk->info.rows - beginIdx));
@ -2282,6 +2413,10 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) {
} else {
(void)taosArrayPop(pCache->grps);
pGrp = taosArrayGet(pCache->grps, 0);
if (NULL == pGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
ASSERT(pGrp->blk == pCache->outBlk);
//pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx;
}
@ -2320,16 +2455,20 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
void mAsofForwardUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) {
int32_t mAsofForwardUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) {
int32_t grpNum = taosArrayGetSize(pCtx->cache.grps);
if (grpNum <= 0) {
return;
return TSDB_CODE_SUCCESS;
}
SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, 0);
if (NULL == pGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (1 == grpNum) {
pGrp->endIdx = pGrp->beginIdx + TMIN(pGrp->blk->info.rows - pGrp->beginIdx, pCtx->jLimit) - 1;
return;
return TSDB_CODE_SUCCESS;
}
ASSERT(pCtx->jLimit > (pGrp->blk->info.rows - pGrp->beginIdx));
@ -2338,7 +2477,13 @@ void mAsofForwardUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) {
int64_t remainRows = pCtx->jLimit - (pGrp->endIdx - pGrp->beginIdx + 1);
pGrp = taosArrayGet(pCtx->cache.grps, 1);
if (NULL == pGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
pGrp->endIdx = pGrp->beginIdx + TMIN(pGrp->blk->info.rows, remainRows) - 1;
return TSDB_CODE_SUCCESS;
}
int32_t mAsofForwardFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) {
@ -2347,13 +2492,16 @@ int32_t mAsofForwardFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) {
MJ_ERR_RET(mAsofForwardChkFillGrpCache(pCtx));
}
mAsofForwardUpdateBuildGrpEndIdx(pCtx);
MJ_ERR_RET(mAsofForwardUpdateBuildGrpEndIdx(pCtx));
return mWinJoinDumpGrpCache(pCtx);
}
int32_t mAsofForwardSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) {
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
*wholeBlk = false;
@ -2513,6 +2661,10 @@ static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo
if (buildGot) {
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
if (NULL == pProbeCol || NULL == pBuildCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) {
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
MJOIN_POP_TB_BLK(&pCtx->cache);
@ -2674,6 +2826,10 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) {
}
SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGetLast(pGrpArray);
if (NULL == pGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (!pGrp->clonedBlk) {
if (0 == pGrp->beginIdx) {
pGrp->blk = createOneDataBlock(pGrp->blk, true);
@ -2720,6 +2876,10 @@ static int32_t mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJ
if (buildGot && pCtx->forwardRowsAcq) {
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
if (NULL == pProbeCol || NULL == pBuildCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) {
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
buildGot = false;
@ -2739,6 +2899,10 @@ static int32_t mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJ
int32_t mWinJoinTryAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) {
SSDataBlock* pBlk = build->blk;
SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId);
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (pCtx->ascTs) {
if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) < pCtx->winBeginTs) {
*winEnd = false;
@ -2759,6 +2923,9 @@ int32_t mWinJoinTryAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache,
if (*((int64_t*)pCol->pData + build->blkRowIdx) <= pCtx->winEndTs) {
SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
if (NULL == pGrp) {
MJ_ERR_RET(terrno);
}
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = pGrp->beginIdx;
@ -2795,7 +2962,10 @@ int32_t mWinJoinTryAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache,
if (*((int64_t*)pCol->pData + build->blkRowIdx) >= pCtx->winBeginTs) {
SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
if (NULL == pGrp) {
MJ_ERR_RET(terrno);
}
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = pGrp->beginIdx;
@ -2864,7 +3034,15 @@ int32_t mWinJoinMoveAscWinBegin(SMJoinWindowCtx* pCtx) {
int32_t grpNum = taosArrayGetSize(pCache->grps);
for (int32_t i = 0; i < grpNum; ++i) {
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
if (NULL == pGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) < pCtx->winBeginTs) {
mWinJoinPopFrontGroup(pCtx, pGrp);
grpNum--;
@ -2900,6 +3078,7 @@ int32_t mWinJoinMoveAscWinBegin(SMJoinWindowCtx* pCtx) {
pCache->grps = pCache->grpsQueue;
pCache->rowNum = 1;
pCache->grpsQueue = NULL;
continue;
}
@ -2916,7 +3095,15 @@ int32_t mWinJoinMoveDescWinBegin(SMJoinWindowCtx* pCtx) {
int32_t grpNum = taosArrayGetSize(pCache->grps);
for (int32_t i = 0; i < grpNum; ++i) {
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
if (NULL == pGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) > pCtx->winEndTs) {
mWinJoinPopFrontGroup(pCtx, pGrp);
@ -2953,6 +3140,7 @@ int32_t mWinJoinMoveDescWinBegin(SMJoinWindowCtx* pCtx) {
pCache->grps = pCache->grpsQueue;
pCache->rowNum = 1;
pCache->grpsQueue = NULL;
continue;
}
@ -2986,6 +3174,10 @@ void mWinJoinRemoveOverflowGrp(SMJoinWindowCtx* pCtx) {
int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) {
SSDataBlock* pBlk = build->blk;
SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId);
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
if (pCtx->ascTs) {
@ -3001,6 +3193,9 @@ int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, S
if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) <= pCtx->winEndTs) {
SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
if (NULL == pGrp) {
MJ_ERR_RET(terrno);
}
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = pBlk->info.rows - 1;
@ -3028,6 +3223,9 @@ int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, S
}
SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
if (NULL == pGrp) {
MJ_ERR_RET(terrno);
}
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = build->blkRowIdx - 1;
@ -3048,6 +3246,9 @@ int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, S
if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) >= pCtx->winBeginTs) {
SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
if (NULL == pGrp) {
MJ_ERR_RET(terrno);
}
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = pBlk->info.rows - 1;
@ -3070,7 +3271,10 @@ int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, S
}
SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
if (NULL == pGrp) {
MJ_ERR_RET(terrno);
}
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = build->blkRowIdx - 1;
@ -3130,7 +3334,15 @@ int32_t mWinJoinMoveAscWinEnd(SMJoinWindowCtx* pCtx) {
}
SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps);
if (NULL == pGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) <= pCtx->winEndTs) {
pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1;
if (pCache->rowNum >= pCtx->jLimit) {
@ -3173,7 +3385,15 @@ int32_t mWinJoinMoveDescWinEnd(SMJoinWindowCtx* pCtx) {
}
SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps);
if (NULL == pGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) >= pCtx->winBeginTs) {
pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1;
pGrp->endIdx = pGrp->blk->info.rows - 1;
@ -3217,6 +3437,10 @@ int32_t mWinJoinTrimDumpGrpCache(SMJoinWindowCtx* pCtx) {
int32_t buildGrpNum = taosArrayGetSize(cache->grps);
for (int32_t i = 0; i < buildGrpNum && skipRows > 0; ++i) {
SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, i);
if (NULL == buildGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (skipRows >= GRP_REMAIN_ROWS(buildGrp)) {
skipRows -= GRP_REMAIN_ROWS(buildGrp);
mWinJoinPopFrontGroup(pCtx, buildGrp);
@ -3318,7 +3542,7 @@ int32_t mJoinInitWindowCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin,
pCache->grps = taosArrayInit(2, sizeof(SMJoinGrpRows));
if (NULL == pCache->grps) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
//taosArrayReserve(pTable->eqGrps, 1);
@ -3392,6 +3616,10 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
}
pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
if (NULL == pCtx->finBlk) {
MJ_ERR_RET(terrno);
}
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)));
pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO;
@ -3430,6 +3658,10 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
}
pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
if (NULL == pCtx->finBlk) {
MJ_ERR_RET(terrno);
}
ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0);
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)));

View File

@ -30,6 +30,9 @@
int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) {
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
pGrp->beginIdx = pTable->blkRowIdx;
pGrp->readIdx = pTable->blkRowIdx;
@ -59,12 +62,16 @@ int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBl
}
void mJoinTrimKeepFirstRow(SSDataBlock* pBlock) {
int32_t mJoinTrimKeepFirstRow(SSDataBlock* pBlock) {
int32_t bmLen = BitmapLen(pBlock->info.rows);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
if (NULL == pDst) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
// it is a reserved column for scalar function, and no data in this column yet.
if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
continue;
@ -86,7 +93,7 @@ void mJoinTrimKeepFirstRow(SSDataBlock* pBlock) {
} else {
bool isNull = colDataIsNull_f(pDst->nullbitmap, 0);
memset(pDst->nullbitmap, 0, bmLen);
TAOS_MEMSET(pDst->nullbitmap, 0, bmLen);
if (isNull) {
colDataSetNull_f(pDst->nullbitmap, 0);
}
@ -94,11 +101,14 @@ void mJoinTrimKeepFirstRow(SSDataBlock* pBlock) {
}
pBlock->info.rows = 1;
return TSDB_CODE_SUCCESS;
}
void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
int32_t mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
// int32_t totalRows = pBlock->info.rows;
int32_t code = TSDB_CODE_SUCCESS;
int32_t bmLen = BitmapLen(totalRows);
char* pBitmap = NULL;
int32_t maxRows = 0;
@ -106,6 +116,10 @@ void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBo
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
if (NULL == pDst) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
// it is a reserved column for scalar function, and no data in this column yet.
if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
continue;
@ -135,8 +149,12 @@ void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBo
len = varDataTLen(p1);
}
char* p2 = taosMemoryMalloc(len);
memcpy(p2, p1, len);
colDataSetVal(pDst, numOfRows, p2, false);
TAOS_MEMCPY(p2, p1, len);
code = colDataSetVal(pDst, numOfRows, p2, false);
if (code) {
taosMemoryFreeClear(p2);
MJ_ERR_RET(terrno);
}
taosMemoryFree(p2);
}
numOfRows += 1;
@ -150,10 +168,13 @@ void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBo
} else {
if (pBitmap == NULL) {
pBitmap = taosMemoryCalloc(1, bmLen);
if (NULL == pBitmap) {
MJ_ERR_RET(terrno);
}
}
memcpy(pBitmap, pDst->nullbitmap, bmLen);
memset(pDst->nullbitmap, 0, bmLen);
TAOS_MEMCPY(pBitmap, pDst->nullbitmap, bmLen);
TAOS_MEMSET(pDst->nullbitmap, 0, bmLen);
int32_t j = 0;
@ -243,6 +264,8 @@ void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBo
if (pBitmap != NULL) {
taosMemoryFree(pBitmap);
}
return TSDB_CODE_SUCCESS;
}
@ -290,8 +313,10 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo
code = TSDB_CODE_SUCCESS;
_err:
colDataDestroy(p);
taosMemoryFree(p);
return code;
}
@ -300,18 +325,19 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
SColumnInfoData* p = NULL;
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
code = filterSetDataFromSlotId(pFilterInfo, &param1);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
goto _return;
}
int32_t status = 0;
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
goto _return;
}
int32_t rowNum = 0;
@ -320,6 +346,9 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM
if (status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED) {
for (int32_t i = startGrpIdx; i < grpNum && rowNum < pBlock->info.rows; startRowIdx = 0, ++i) {
SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, i);
if (NULL == buildGrp) {
MJ_ERR_JRET(terrno);
}
if (buildGrp->allRowsMatch) {
rowNum += buildGrp->endIdx - startRowIdx + 1;
continue;
@ -350,9 +379,11 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM
code = TSDB_CODE_SUCCESS;
_err:
_return:
colDataDestroy(p);
taosMemoryFree(p);
return code;
}
@ -366,29 +397,31 @@ int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInf
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
goto _return;
}
int32_t status = 0;
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
goto _return;
}
if (status == FILTER_RESULT_ALL_QUALIFIED) {
pBlock->info.rows = 1;
mJoinTrimKeepFirstRow(pBlock);
MJ_ERR_JRET(mJoinTrimKeepFirstRow(pBlock));
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
pBlock->info.rows = 0;
} else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
mJoinTrimKeepOneRow(pBlock, pBlock->info.rows, (bool*)p->pData);
MJ_ERR_JRET(mJoinTrimKeepOneRow(pBlock, pBlock->info.rows, (bool*)p->pData));
}
code = TSDB_CODE_SUCCESS;
_err:
_return:
colDataDestroy(p);
taosMemoryFree(p);
return code;
}
@ -418,8 +451,10 @@ int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo)
code = TSDB_CODE_SUCCESS;
_err:
colDataDestroy(p);
taosMemoryFree(p);
return code;
}
@ -480,12 +515,20 @@ int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app
SMJoinColMap* pFirstCol = probe->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(pGrp->blk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows);
if (NULL == pInCol || NULL == pOutCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows));
}
for (int32_t c = 0; c < build->finNum; ++c) {
SMJoinColMap* pSecondCol = build->finCols + c;
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
if (NULL == pOutCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
colDataSetNItemsNull(pOutCol, currRows, firstRows);
}
@ -536,6 +579,10 @@ int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app
SMJoinColMap* pFirstCol = probe->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(pFirst->blk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
if (NULL == pInCol || NULL == pOutCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
for (int32_t r = 0; r < firstRows; ++r) {
if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) {
colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows);
@ -543,7 +590,7 @@ int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app
ASSERT(pRes->info.capacity >= (pRes->info.rows + firstRows * secondRows));
uint32_t startOffset = (IS_VAR_DATA_TYPE(pOutCol->info.type)) ? pOutCol->varmeta.length : ((currRows + r * secondRows) * pOutCol->info.bytes);
ASSERT((startOffset + 1 * pOutCol->info.bytes) <= pRes->info.capacity * pOutCol->info.bytes);
colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->readIdx + r), secondRows, true);
MJ_ERR_RET(colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->readIdx + r), secondRows, true));
}
}
}
@ -552,28 +599,40 @@ int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app
SMJoinColMap* pSecondCol = build->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(pSecond->blk->pDataBlock, pSecondCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
if (NULL == pInCol || NULL == pOutCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
for (int32_t r = 0; r < firstRows; ++r) {
colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows);
MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows));
}
}
pRes->info.rows = append ? (pRes->info.rows + firstRows * secondRows) : firstRows * secondRows;
return TSDB_CODE_SUCCESS;
}
bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build) {
int32_t mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build, bool* cont) {
if (NULL != cont) {
*cont = false;
}
int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity;
if (rowsLeft <= 0) {
return false;
return TSDB_CODE_SUCCESS;
}
int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp);
int32_t grpRows = buildGrpRows - build->grpRowIdx;
if (grpRows <= 0 || build->grpRowIdx < 0) {
build->grpRowIdx = -1;
return true;
if (NULL != cont) {
*cont = true;
}
return TSDB_CODE_SUCCESS;
}
int32_t actRows = TMIN(grpRows, rowsLeft);
@ -583,10 +642,14 @@ bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, S
SMJoinColMap* pFirstCol = probe->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot);
if (NULL == pInCol || NULL == pOutCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (colDataIsNull_s(pInCol, probeGrp->readIdx)) {
colDataSetNItemsNull(pOutCol, currRows, actRows);
} else {
colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, true);
MJ_ERR_RET(colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, true));
}
}
@ -595,8 +658,16 @@ bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, S
SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot);
for (int32_t r = 0; r < actRows; ++r) {
SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, build->grpRowIdx + r);
if (NULL == pRow) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot);
colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1);
if (NULL == pInCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1));
}
}
@ -609,16 +680,24 @@ bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, S
}
if (actRows == rowsLeft) {
return false;
return TSDB_CODE_SUCCESS;
}
return true;
if (NULL != cont) {
*cont = true;
}
return TSDB_CODE_SUCCESS;
}
int32_t mJoinAllocGrpRowBitmap(SMJoinTableCtx* pTb) {
int32_t grpNum = taosArrayGetSize(pTb->eqGrps);
for (int32_t i = 0; i < grpNum; ++i) {
SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGet(pTb->eqGrps, i);
if (NULL == pGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
MJ_ERR_RET(mJoinGetRowBitmapOffset(pTb, pGrp->endIdx - pGrp->beginIdx + 1, &pGrp->rowBitmapOffset));
pGrp->rowMatchNum = 0;
}
@ -632,9 +711,9 @@ int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastB
pCtx->lastEqGrp = true;
mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true);
MJ_ERR_RET(mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true));
if (!lastBuildGrp) {
mJoinRetrieveEqGrpRows(pJoin, pJoin->build, timestamp);
MJ_ERR_RET(mJoinRetrieveEqGrpRows(pJoin, pJoin->build, timestamp));
} else {
pJoin->build->grpIdx = 0;
}
@ -661,7 +740,7 @@ int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastB
pCtx->hashJoin = false;
if (!lastBuildGrp && pJoin->build->rowBitmapSize > 0) {
mJoinAllocGrpRowBitmap(pJoin->build);
MJ_ERR_RET(mJoinAllocGrpRowBitmap(pJoin->build));
}
return (*pCtx->mergeCartFp)(pCtx);
@ -721,7 +800,7 @@ int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo*** pDow
*newDownstreams = true;
*pDownstream = mJoinBuildDownstreams(pInfo, *pDownstream);
if (NULL == *pDownstream) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
*numOfDownstream = 2;
}
@ -732,7 +811,7 @@ int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo*** pDow
static int32_t mJoinInitPrimKeyInfo(SMJoinTableCtx* pTable, int32_t slotId) {
pTable->primCol = taosMemoryMalloc(sizeof(SMJoinColInfo));
if (NULL == pTable->primCol) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pTable->primCol->srcSlot = slotId;
@ -745,7 +824,7 @@ static int32_t mJoinInitColsInfo(int32_t* colNum, int64_t* rowSize, SMJoinColInf
*pCols = taosMemoryMalloc((*colNum) * sizeof(SMJoinColInfo));
if (NULL == *pCols) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
*rowSize = 0;
@ -779,7 +858,7 @@ static int32_t mJoinInitKeyColsInfo(SMJoinTableCtx* pTable, SNodeList* pList, bo
pTable->keyBuf = taosMemoryMalloc(TMAX(rowSize, pTable->keyNullSize));
if (NULL == pTable->keyBuf) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
}
@ -790,7 +869,7 @@ static int32_t mJoinInitKeyColsInfo(SMJoinTableCtx* pTable, SNodeList* pList, bo
static int32_t mJoinInitFinColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) {
pTable->finCols = taosMemoryMalloc(LIST_LENGTH(pList) * sizeof(SMJoinColMap));
if (NULL == pTable->finCols) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
int32_t i = 0;
@ -837,8 +916,20 @@ static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoi
}
SValueNode* pUnit = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
SValueNode* pCurrTz = (5 == pFunc->pParameterList->length) ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2) : NULL;
if (NULL == pUnit) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
SValueNode* pCurrTz = NULL;
if (5 == pFunc->pParameterList->length){
pCurrTz = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2);
if (NULL == pCurrTz) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
}
SValueNode* pTimeZone = (5 == pFunc->pParameterList->length) ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 4) : (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3);
if (NULL == pTimeZone) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
pCtx->truncateUnit = pUnit->typeData;
if ((NULL == pCurrTz || 1 == pCurrTz->typeData) && pCtx->truncateUnit >= (86400 * TSDB_TICK_PER_SECOND(pFunc->node.resType.precision))) {
@ -859,24 +950,32 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi
MJ_ERR_RET(mJoinInitKeyColsInfo(pTable, (0 == idx) ? pJoinNode->pEqLeft : pJoinNode->pEqRight, JOIN_TYPE_FULL == pJoin->joinType));
MJ_ERR_RET(mJoinInitFinColsInfo(pTable, pJoinNode->pTargets));
memcpy(&pTable->inputStat, pStat, sizeof(*pStat));
TAOS_MEMCPY(&pTable->inputStat, pStat, sizeof(*pStat));
pTable->eqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows));
//taosArrayReserve(pTable->eqGrps, 1);
if (NULL == pTable->eqGrps) {
return terrno;
}
if (E_JOIN_TB_BUILD == pTable->type) {
pTable->createdBlks = taosArrayInit(8, POINTER_BYTES);
if (NULL == pTable->createdBlks) {
return terrno;
}
pTable->pGrpArrays = taosArrayInit(32, POINTER_BYTES);
if (NULL == pTable->pGrpArrays) {
return terrno;
}
pTable->pGrpHash = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
if (NULL == pTable->createdBlks || NULL == pTable->pGrpArrays || NULL == pTable->pGrpHash) {
return TSDB_CODE_OUT_OF_MEMORY;
if (NULL == pTable->pGrpHash) {
return terrno;
}
if (pJoin->pFPreFilter && IS_FULL_OUTER_JOIN(pJoin->joinType, pJoin->subType)) {
pTable->rowBitmapSize = MJOIN_ROW_BITMAP_SIZE;
pTable->pRowBitmap = taosMemoryMalloc(pTable->rowBitmapSize);
if (NULL == pTable->pRowBitmap) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
}
@ -945,7 +1044,15 @@ int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
SMJoinPrimExprCtx* pCtx = &pTable->primCtx;
SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot);
if (NULL == pPrimIn) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId);
if (NULL == pPrimOut) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
if (0 != pCtx->timezoneUnit) {
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] - (((int64_t*)pPrimIn->pData)[i] + pCtx->timezoneUnit) % pCtx->truncateUnit;
@ -961,6 +1068,7 @@ int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
SSDataBlock* pTmp = NULL;
int32_t code = TSDB_CODE_SUCCESS;
int32_t dsIdx = pTable->downStreamIdx;
if (E_JOIN_TB_PROBE == pTable->type) {
if (pTable->remainInBlk) {
@ -1028,7 +1136,12 @@ SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTa
_return:
mJoinLaunchPrimExpr(pTmp, pTable);
code = mJoinLaunchPrimExpr(pTmp, pTable);
if (code) {
pJoin->errCode = code;
T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
}
return pTmp;
}
@ -1041,7 +1154,11 @@ static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SM
if (NULL == pTmp) {
pTable->dsFetchDone = true;
} else {
mJoinLaunchPrimExpr(pTmp, pTable);
int32_t code = mJoinLaunchPrimExpr(pTmp, pTable);
if (code) {
pJoin->errCode = code;
T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
}
}
return pTmp;
@ -1107,7 +1224,7 @@ bool mJoinRetrieveBlk(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** pp
static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) {
int32_t blkNum = taosArrayGetSize(pCreatedBlks);
for (int32_t i = 0; i < blkNum; ++i) {
blockDataDestroy(*(SSDataBlock**)TARRAY_GET_ELEM(pCreatedBlks, i));
(void)blockDataDestroy(*(SSDataBlock**)TARRAY_GET_ELEM(pCreatedBlks, i));
}
taosArrayClear(pCreatedBlks);
}
@ -1119,12 +1236,12 @@ int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t
int64_t newSize = reqSize * 1.1;
pTable->pRowBitmap = taosMemoryRealloc(pTable->pRowBitmap, newSize);
if (NULL == pTable->pRowBitmap) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pTable->rowBitmapSize = newSize;
}
memset(pTable->pRowBitmap + pTable->rowBitmapOffset, 0xFFFFFFFF, bitmapLen);
TAOS_MEMSET(pTable->pRowBitmap + pTable->rowBitmapOffset, 0xFFFFFFFF, bitmapLen);
*rowBitmapOffset = pTable->rowBitmapOffset;
pTable->rowBitmapOffset += bitmapLen;
@ -1140,12 +1257,16 @@ void mJoinResetForBuildTable(SMJoinTableCtx* pTable) {
taosArrayClear(pTable->eqGrps);
if (pTable->rowBitmapSize > 0) {
pTable->rowBitmapOffset = 1;
memset(&pTable->nMatchCtx, 0, sizeof(pTable->nMatchCtx));
TAOS_MEMSET(&pTable->nMatchCtx, 0, sizeof(pTable->nMatchCtx));
}
}
int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) {
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
SMJoinGrpRows* pGrp = NULL;
int32_t code = TSDB_CODE_SUCCESS;
@ -1159,7 +1280,10 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
bool keepGrp = true;
pGrp = taosArrayReserve(pTable->eqGrps, 1);
if (NULL == pGrp) {
MJ_ERR_RET(terrno);
}
pGrp->beginIdx = pTable->blkRowIdx++;
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = pGrp->beginIdx;
@ -1209,10 +1333,10 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) {
pGrp->blk = createOneDataBlock(pTable->blk, true);
if (NULL == pGrp->blk) {
MJ_ERR_JRET(terrno);
MJ_ERR_RET(terrno);
}
if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
MJ_ERR_JRET(terrno);
MJ_ERR_RET(terrno);
}
} else {
if (!pTable->multiEqGrpRows) {
@ -1236,10 +1360,16 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
pTable->eqRowNum += rowNum;
pGrp->blk = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, rowNum);
if (NULL == pGrp->blk) {
MJ_ERR_RET(terrno);
}
pGrp->endIdx -= pGrp->beginIdx;
pGrp->beginIdx = 0;
pGrp->readIdx = 0;
taosArrayPush(pTable->createdBlks, &pGrp->blk);
if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
MJ_ERR_RET(terrno);
}
}
}
@ -1248,19 +1378,19 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
_return:
if (pTable->noKeepEqGrpRows || !keepGrp || (!pTable->multiEqGrpRows && !restart)) {
taosArrayPop(pTable->eqGrps);
(void)taosArrayPop(pTable->eqGrps);
} else {
pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable, int64_t timestamp) {
bool wholeBlk = false;
mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true);
MJ_ERR_RET(mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true));
while (wholeBlk && !pTable->dsFetchDone) {
pTable->blk = (*pJoin->retrieveFp)(pJoin, pTable);
@ -1273,7 +1403,7 @@ int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable
}
wholeBlk = false;
mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, false);
MJ_ERR_RET(mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, false));
}
return TSDB_CODE_SUCCESS;
@ -1282,6 +1412,10 @@ int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable
int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
for (int32_t i = 0; i < pTable->keyNum; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot);
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot, pCol->info.type, pTable->keyCols[i].vardata);
return TSDB_CODE_INVALID_PARA;
@ -1326,15 +1460,15 @@ bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *p
}
if (pTable->keyCols[0].jsonData) {
pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
memcpy(pTable->keyBuf + bufLen, pData, getJsonValueLen(pData));
TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, getJsonValueLen(pData));
bufLen += getJsonValueLen(pData);
} else if (pTable->keyCols[i].vardata) {
pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData));
TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, varDataTLen(pData));
bufLen += varDataTLen(pData);
} else {
pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx;
memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes);
TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes);
bufLen += pTable->keyCols[i].bytes;
}
}
@ -1352,15 +1486,20 @@ static int32_t mJoinGetAvailableGrpArray(SMJoinTableCtx* pTable, SArray** ppRes)
do {
if (pTable->grpArrayIdx < taosArrayGetSize(pTable->pGrpArrays)) {
*ppRes = taosArrayGetP(pTable->pGrpArrays, pTable->grpArrayIdx++);
if (NULL == *ppRes) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
taosArrayClear(*ppRes);
return TSDB_CODE_SUCCESS;
}
SArray* pNew = taosArrayInit(4, sizeof(SMJoinRowPos));
if (NULL == pNew) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
if (NULL == taosArrayPush(pTable->pGrpArrays, &pNew)) {
return terrno;
}
taosArrayPush(pTable->pGrpArrays, &pNew);
} while (true);
return TSDB_CODE_SUCCESS;
@ -1374,10 +1513,14 @@ static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDat
SArray* pNewGrp = NULL;
MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pNewGrp));
taosArrayPush(pNewGrp, &pos);
tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, POINTER_BYTES);
if (NULL == taosArrayPush(pNewGrp, &pos)) {
return terrno;
}
MJ_ERR_RET(tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, POINTER_BYTES));
} else if (pBuild->multiRowsGrp) {
taosArrayPush(*pGrpRows, &pos);
if (NULL == taosArrayPush(*pGrpRows, &pos)) {
return terrno;
}
}
return TSDB_CODE_SUCCESS;
@ -1392,10 +1535,14 @@ static int32_t mJoinAddRowToFullHash(SMJoinOperatorInfo* pJoin, size_t keyLen, S
SMJoinHashGrpRows pNewGrp = {0};
MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pNewGrp.pRows));
taosArrayPush(pNewGrp.pRows, &pos);
tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, sizeof(pNewGrp));
if (NULL == taosArrayPush(pNewGrp.pRows, &pos)) {
return terrno;
}
MJ_ERR_RET(tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, sizeof(pNewGrp)));
} else {
taosArrayPush(pGrpRows->pRows, &pos);
if (NULL == taosArrayPush(pGrpRows->pRows, &pos)) {
return terrno;
}
}
return TSDB_CODE_SUCCESS;
@ -1412,6 +1559,9 @@ int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pT
int32_t grpNum = taosArrayGetSize(pTable->eqGrps);
for (int32_t g = 0; g < grpNum; ++g) {
SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g);
if (NULL == pGrp) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable));
int32_t grpRows = GRP_REMAIN_ROWS(pGrp);
@ -1440,6 +1590,10 @@ int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable
int32_t grpNum = taosArrayGetSize(pTable->eqGrps);
for (int32_t g = 0; g < grpNum; ++g) {
SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g);
if (NULL == pGrp) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable));
int32_t grpRows = GRP_REMAIN_ROWS(pGrp);
@ -1496,8 +1650,11 @@ void mWinJoinResetWindowCache(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache) {
for (int32_t i = 0; i < grpNum; ++i) {
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
if (NULL == pGrp) {
continue;
}
if (pGrp->blk != pCtx->cache.outBlk && pGrp->clonedBlk) {
blockDataDestroy(pGrp->blk);
(void)blockDataDestroy(pGrp->blk);
}
}
@ -1539,6 +1696,7 @@ void mJoinResetOperator(struct SOperatorInfo* pOperator) {
SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator->status == OP_EXEC_DONE) {
if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) {
qDebug("%s merge join done", GET_TASKID(pOperator->pTaskInfo));
@ -1567,7 +1725,11 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
pBlock->info.id.blockId = pJoin->outBlkId;
if (pJoin->pFinFilter != NULL) {
doFilter(pBlock, pJoin->pFinFilter, NULL);
code = doFilter(pBlock, pJoin->pFinFilter, NULL);
if (code) {
pJoin->errCode = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
}
}
if (pBlock->info.rows > 0 || pOperator->status == OP_EXEC_DONE) {
@ -1703,13 +1865,17 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
bool newDownstreams = false;
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo));
if (pInfo == NULL) {
code = terrno;
goto _return;
}
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL) {
code = terrno;
goto _return;
}
@ -1722,8 +1888,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
MJ_ERR_JRET(mJoinHandleConds(pInfo, pJoinNode));
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0], newDownstreams);
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1], newDownstreams);
MJ_ERR_JRET(mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0], newDownstreams));
MJ_ERR_JRET(mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1], newDownstreams));
MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode));
MJ_ERR_JRET(mJoinSetImplFp(pInfo));
@ -1742,6 +1908,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
return pOperator;
_return:
if (pInfo != NULL) {
destroyMergeJoinOperator(pInfo);
}
@ -1751,6 +1918,7 @@ _return:
taosMemoryFree(pOperator);
pTaskInfo->code = code;
return NULL;
}