fix: compile errors

This commit is contained in:
dapan1121 2023-12-14 14:13:01 +08:00
parent 7171d38415
commit 6d24c08651
9 changed files with 364 additions and 269 deletions

View File

@ -190,12 +190,14 @@ int32_t getJsonValueLen(const char* data);
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData); int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue); int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue);
void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows);
int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
uint32_t numOfRows, bool isNull); uint32_t numOfRows, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, int32_t numOfRow2); const SColumnInfoData* pSource, int32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
const SDataBlockInfo* pBlockInfo); const SDataBlockInfo* pBlockInfo);
int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, int32_t numOfRows);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);

View File

@ -484,6 +484,7 @@ typedef struct SSortMergeJoinPhysiNode {
SNode* pColOnCond; SNode* pColOnCond;
SNode* pFullOnCond; SNode* pFullOnCond;
SNodeList* pTargets; SNodeList* pTargets;
SQueryStat inputStat[2];
} SSortMergeJoinPhysiNode; } SSortMergeJoinPhysiNode;
typedef struct SHashJoinPhysiNode { typedef struct SHashJoinPhysiNode {

View File

@ -540,7 +540,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI
} else { } else {
if (pSrc->hasNull) { if (pSrc->hasNull) {
if (0 == BitPos(dstIdx) && 0 == BitPos(srcIdx)) { if (0 == BitPos(dstIdx) && 0 == BitPos(srcIdx)) {
memcpy(BMCharPos(pDst->nullbitmap, dstIdx), BMCharPos(pSrc->nullbitmap, srcIdx), BitmapLen(numOfRows)); memcpy(&BMCharPos(pDst->nullbitmap, dstIdx), &BMCharPos(pSrc->nullbitmap, srcIdx), BitmapLen(numOfRows));
if (!pDst->hasNull) { if (!pDst->hasNull) {
int32_t nullBytes = BitmapLen(numOfRows); int32_t nullBytes = BitmapLen(numOfRows);
int32_t startPos = CharPos(dstIdx); int32_t startPos = CharPos(dstIdx);
@ -554,7 +554,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI
} else if (BitPos(dstIdx) == BitPos(srcIdx)) { } else if (BitPos(dstIdx) == BitPos(srcIdx)) {
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
if (0 == BitPos(dstIdx)) { if (0 == BitPos(dstIdx)) {
memcpy(BMCharPos(pDst->nullbitmap, dstIdx + i), BMCharPos(pSrc->nullbitmap, srcIdx + i), BitmapLen(numOfRows - i)); memcpy(&BMCharPos(pDst->nullbitmap, dstIdx + i), &BMCharPos(pSrc->nullbitmap, srcIdx + i), BitmapLen(numOfRows - i));
if (!pDst->hasNull) { if (!pDst->hasNull) {
int32_t nullBytes = BitmapLen(numOfRows - i); int32_t nullBytes = BitmapLen(numOfRows - i);
int32_t startPos = CharPos(dstIdx + i); int32_t startPos = CharPos(dstIdx + i);
@ -586,7 +586,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI
} }
} }
} else { } else {
memset(BMCharPos(pDst->nullbitmap, dstIdx), 0, BitmapLen(numOfRows)); memset(&BMCharPos(pDst->nullbitmap, dstIdx), 0, BitmapLen(numOfRows));
} }
if (pSrc->pData != NULL) { if (pSrc->pData != NULL) {

View File

@ -23,6 +23,8 @@ extern "C" {
#define MJOIN_HJOIN_CART_THRESHOLD 16 #define MJOIN_HJOIN_CART_THRESHOLD 16
#define MJOIN_BLK_SIZE_LIMIT 10485760 #define MJOIN_BLK_SIZE_LIMIT 10485760
struct SMJoinOperatorInfo;
typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*); typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*);
typedef enum EJoinTableType { typedef enum EJoinTableType {
@ -55,7 +57,7 @@ typedef struct SMJoinColInfo {
} SMJoinColInfo; } SMJoinColInfo;
typedef struct SMJoinTableInfo { typedef struct SMJoinTableCtx {
EJoinTableType type; EJoinTableType type;
int32_t downStreamIdx; int32_t downStreamIdx;
SOperatorInfo* downStream; SOperatorInfo* downStream;
@ -87,19 +89,22 @@ typedef struct SMJoinTableInfo {
SSDataBlock* blk; SSDataBlock* blk;
int32_t blkRowIdx; int32_t blkRowIdx;
// merge join
int64_t grpTotalRows; int64_t grpTotalRows;
int32_t grpIdx; int32_t grpIdx;
SArray* eqGrps; SArray* eqGrps;
SArray* createdBlks; SArray* createdBlks;
// hash join
int32_t grpArrayIdx; int32_t grpArrayIdx;
SArray* pGrpArrays; SArray* pGrpArrays;
// hash join
int32_t grpRowIdx; int32_t grpRowIdx;
SArray* pHashCurGrp; SArray* pHashCurGrp;
SSHashObj* pGrpHash; SSHashObj* pGrpHash;
} SMJoinTableInfo; } SMJoinTableCtx;
typedef struct SMJoinGrpRows { typedef struct SMJoinGrpRows {
SSDataBlock* blk; SSDataBlock* blk;
@ -110,6 +115,7 @@ typedef struct SMJoinGrpRows {
} SMJoinGrpRows; } SMJoinGrpRows;
typedef struct SMJoinMergeCtx { typedef struct SMJoinMergeCtx {
struct SMJoinOperatorInfo* pJoin;
bool hashCan; bool hashCan;
bool keepOrder; bool keepOrder;
bool grpRemains; bool grpRemains;
@ -121,7 +127,6 @@ typedef struct SMJoinMergeCtx {
int64_t lastEqTs; int64_t lastEqTs;
SMJoinGrpRows probeNEqGrp; SMJoinGrpRows probeNEqGrp;
bool hashJoin; bool hashJoin;
SMJoinOperatorInfo* pJoin;
} SMJoinMergeCtx; } SMJoinMergeCtx;
typedef struct SMJoinWinCtx { typedef struct SMJoinWinCtx {
@ -161,11 +166,9 @@ typedef struct SMJoinOperatorInfo {
int32_t subType; int32_t subType;
int32_t inputTsOrder; int32_t inputTsOrder;
int32_t errCode; int32_t errCode;
SMJoinTableInfo tbs[2]; SMJoinTableCtx tbs[2];
SMJoinTableInfo* build; SMJoinTableCtx* build;
SMJoinTableInfo* probe; SMJoinTableCtx* probe;
int32_t pResColNum;
int8_t* pResColMap;
SFilterInfo* pFPreFilter; SFilterInfo* pFPreFilter;
SFilterInfo* pPreFilter; SFilterInfo* pPreFilter;
SFilterInfo* pFinFilter; SFilterInfo* pFinFilter;

View File

@ -32,8 +32,6 @@ SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo**
if (p) { if (p) {
p[0] = pDownstream[0]; p[0] = pDownstream[0];
p[1] = pDownstream[0]; p[1] = pDownstream[0];
pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(p[0], 0);
pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(p[1], 1);
} }
return p; return p;
@ -47,15 +45,12 @@ int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDown
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
*numOfDownstream = 2; *numOfDownstream = 2;
} else {
pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0);
pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 0);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mJoinInitPrimKeyInfo(SMJoinTableInfo* pTable, int32_t slotId) { static int32_t mJoinInitPrimKeyInfo(SMJoinTableCtx* pTable, int32_t slotId) {
pTable->primCol = taosMemoryMalloc(sizeof(SMJoinColInfo)); pTable->primCol = taosMemoryMalloc(sizeof(SMJoinColInfo));
if (NULL == pTable->primCol) { if (NULL == pTable->primCol) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -66,71 +61,7 @@ static int32_t mJoinInitPrimKeyInfo(SMJoinTableInfo* pTable, int32_t slotId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void mJoinGetValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { static int32_t mJoinInitKeyColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) {
*colNum = 0;
SNode* pNode = NULL;
FOREACH(pNode, pList) {
STargetNode* pTarget = (STargetNode*)pNode;
SColumnNode* pCol = (SColumnNode*)pTarget->pExpr;
if (pCol->dataBlockId == blkId) {
(*colNum)++;
}
}
}
static int32_t mJoinInitValColsInfo(SMJoinTableInfo* pTable, SNodeList* pList) {
mJoinGetValColNum(pList, pTable->blkId, &pTable->valNum);
if (pTable->valNum == 0) {
return TSDB_CODE_SUCCESS;
}
pTable->valCols = taosMemoryMalloc(pTable->valNum * sizeof(SMJoinColInfo));
if (NULL == pTable->valCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t i = 0;
int32_t colNum = 0;
SNode* pNode = NULL;
FOREACH(pNode, pList) {
STargetNode* pTarget = (STargetNode*)pNode;
SColumnNode* pColNode = (SColumnNode*)pTarget->pExpr;
if (pColNode->dataBlockId == pTable->blkId) {
if (valColInKeyCols(pColNode->slotId, pTable->keyNum, pTable->keyCols, &pTable->valCols[i].srcSlot)) {
pTable->valCols[i].keyCol = true;
} else {
pTable->valCols[i].keyCol = false;
pTable->valCols[i].srcSlot = pColNode->slotId;
pTable->valColExist = true;
colNum++;
}
pTable->valCols[i].dstSlot = pTarget->slotId;
pTable->valCols[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
if (pTable->valCols[i].vardata) {
if (NULL == pTable->valVarCols) {
pTable->valVarCols = taosArrayInit(pTable->valNum, sizeof(int32_t));
if (NULL == pTable->valVarCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
taosArrayPush(pTable->valVarCols, &i);
}
pTable->valCols[i].bytes = pColNode->node.resType.bytes;
if (!pTable->valCols[i].keyCol && !pTable->valCols[i].vardata) {
pTable->valBufSize += pColNode->node.resType.bytes;
}
i++;
}
}
pTable->valBitMapSize = BitmapLen(colNum);
pTable->valBufSize += pTable->valBitMapSize;
return TSDB_CODE_SUCCESS;
}
static int32_t mJoinInitKeyColsInfo(SMJoinTableInfo* pTable, SNodeList* pList) {
pTable->keyNum = LIST_LENGTH(pList); pTable->keyNum = LIST_LENGTH(pList);
pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SMJoinColInfo)); pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SMJoinColInfo));
@ -161,7 +92,7 @@ static int32_t mJoinInitKeyColsInfo(SMJoinTableInfo* pTable, SNodeList* pList) {
} }
static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) {
SMJoinTableInfo* pTable = &pJoin->tbs[idx]; SMJoinTableCtx* pTable = &pJoin->tbs[idx];
pTable->downStream = pDownstream[idx]; pTable->downStream = pDownstream[idx];
pTable->blkId = pDownstream[idx]->resultDataBlockId; pTable->blkId = pDownstream[idx]->resultDataBlockId;
int32_t code = mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId); int32_t code = mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId);
@ -172,11 +103,12 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi
if (code) { if (code) {
return code; return code;
} }
/*
code = mJoinInitValColsInfo(pTable, pJoinNode->pTargets); code = mJoinInitValColsInfo(pTable, pJoinNode->pTargets);
if (code) { if (code) {
return code; return code;
} }
*/
memcpy(&pTable->inputStat, pStat, sizeof(*pStat)); memcpy(&pTable->inputStat, pStat, sizeof(*pStat));
pTable->eqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows)); pTable->eqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows));
@ -232,28 +164,6 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
pInfo->probe->type = E_JOIN_TB_PROBE; pInfo->probe->type = E_JOIN_TB_PROBE;
} }
static int32_t mJoinBuildResColMap(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) {
pInfo->pResColNum = pJoinNode->pTargets->length;
pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t));
if (NULL == pInfo->pResColMap) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNode* pNode = NULL;
int32_t i = 0;
FOREACH(pNode, pJoinNode->pTargets) {
STargetNode* pTarget = (STargetNode*)pNode;
SColumnNode* pCol = (SColumnNode*)pTarget->pExpr;
if (pCol->dataBlockId == pInfo->build->blkId) {
pInfo->pResColMap[i] = 1;
}
i++;
}
return TSDB_CODE_SUCCESS;
}
static int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { static int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
@ -261,7 +171,7 @@ static int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiN
pCtx->hashCan = pJoin->probe->keyNum > 0; pCtx->hashCan = pJoin->probe->keyNum > 0;
pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc.totalRowSize)); blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize));
if (pJoin->pFPreFilter) { if (pJoin->pFPreFilter) {
pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false); pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false);
@ -297,14 +207,14 @@ static void mJoinSetDone(SOperatorInfo* pOperator) {
} }
static int32_t mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp) { static int32_t mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp) {
SMJoinTableInfo* probe = pJoin->probe; SMJoinTableCtx* probe = pJoin->probe;
SMJoinTableInfo* build = pJoin->build; SMJoinTableCtx* build = pJoin->build;
int32_t currRows = append ? pRes->info.rows : 0; int32_t currRows = append ? pRes->info.rows : 0;
int32_t firstRows = GRP_REMAIN_ROWS(pGrp); int32_t firstRows = GRP_REMAIN_ROWS(pGrp);
for (int32_t c = 0; c < probe->finNum; ++c) { for (int32_t c = 0; c < probe->finNum; ++c) {
SMJoinColMap* pFirstCol = probe->finCols + c; SMJoinColMap* pFirstCol = probe->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(pGrp->blk, pFirstCol->srcSlot); SColumnInfoData* pInCol = taosArrayGet(pGrp->blk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows); colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows);
} }
@ -320,15 +230,15 @@ static int32_t mLeftJoinGrpNonEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRe
} }
static int32_t mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) { static int32_t mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) {
SMJoinTableInfo* probe = pJoin->probe; SMJoinTableCtx* probe = pJoin->probe;
SMJoinTableInfo* build = pJoin->build; SMJoinTableCtx* build = pJoin->build;
int32_t currRows = append ? pRes->info.rows : 0; int32_t currRows = append ? pRes->info.rows : 0;
int32_t firstRows = GRP_REMAIN_ROWS(pFirst); int32_t firstRows = GRP_REMAIN_ROWS(pFirst);
int32_t secondRows = GRP_REMAIN_ROWS(pSecond); int32_t secondRows = GRP_REMAIN_ROWS(pSecond);
for (int32_t c = 0; c < probe->finNum; ++c) { for (int32_t c = 0; c < probe->finNum; ++c) {
SMJoinColMap* pFirstCol = probe->finCols + c; SMJoinColMap* pFirstCol = probe->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(pFirst->blk, pFirstCol->srcSlot); SColumnInfoData* pInCol = taosArrayGet(pFirst->blk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot); SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
for (int32_t r = 0; r < firstRows; ++r) { for (int32_t r = 0; r < firstRows; ++r) {
if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) { if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) {
@ -341,7 +251,7 @@ static int32_t mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes,
for (int32_t c = 0; c < build->finNum; ++c) { for (int32_t c = 0; c < build->finNum; ++c) {
SMJoinColMap* pSecondCol = build->finCols + c; SMJoinColMap* pSecondCol = build->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(pSecond->blk, pSecondCol->srcSlot); SColumnInfoData* pInCol = taosArrayGet(pSecond->blk->pDataBlock, pSecondCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot); SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
for (int32_t r = 0; r < firstRows; ++r) { for (int32_t r = 0; r < firstRows; ++r) {
colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows); colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows);
@ -355,8 +265,8 @@ static int32_t mLeftJoinGrpEqCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes,
static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) { static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
SMJoinTableInfo* probe = pCtx->pJoin->probe; SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableInfo* build = pCtx->pJoin->build; SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
@ -407,7 +317,7 @@ static int32_t mLeftJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
static int32_t mLeftJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) { static int32_t mLeftJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
SSDataBlock* pLess = NULL; SSDataBlock* pLess = NULL;
SSDataBlock* pMore = NULL; SSDataBlock* pMore = NULL;
if ((*ppMid)->info.rows < ppFin->info.rows) { if ((*ppMid)->info.rows < (*ppFin)->info.rows) {
pLess = (*ppMid); pLess = (*ppMid);
pMore = (*ppFin); pMore = (*ppFin);
} else { } else {
@ -418,7 +328,7 @@ static int32_t mLeftJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMi
int32_t totalRows = pMore->info.rows + pLess->info.rows; int32_t totalRows = pMore->info.rows + pLess->info.rows;
if (totalRows <= pMore->info.capacity) { if (totalRows <= pMore->info.capacity) {
MJ_ERR_RET(blockDataMerge(pMore, pLess)); MJ_ERR_RET(blockDataMerge(pMore, pLess));
tDataBlkReset(pLess); blockDataReset(pLess);
pCtx->midRemains = false; pCtx->midRemains = false;
} else { } else {
int32_t copyRows = pMore->info.capacity - pMore->info.rows; int32_t copyRows = pMore->info.capacity - pMore->info.rows;
@ -434,8 +344,8 @@ static int32_t mLeftJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMi
} }
static int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) { static int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableInfo* probe = pCtx->pJoin->probe; SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableInfo* build = pCtx->pJoin->build; SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
int32_t buildGrpNum = taosArrayGetSize(build->eqGrps); int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
int32_t probeEndIdx = probeGrp->endIdx; int32_t probeEndIdx = probeGrp->endIdx;
@ -498,7 +408,7 @@ static int32_t mLeftJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
} }
} }
if (GRP_DONE(probeGrp->readIdx) || BLK_IS_FULL(pCtx->finBlk)) { if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) {
break; break;
} }
@ -541,20 +451,20 @@ static int32_t mLeftJoinNonEqCart(SMJoinMergeCtx* pCtx) {
static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableInfo* ppTb) { static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb) {
if ((*ppTb)->dsFetchDone) { if (pTb->dsFetchDone) {
return (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) ? false : true; return (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) ? false : true;
} }
if (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) { if (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) {
(*ppBlk) = getNextBlockFromDownstreamRemain(pJoin->pOperator, (*ppTb)->downStreamIdx); (*ppBlk) = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTb->downStreamIdx);
(*ppTb)->dsInitDone = true; pTb->dsInitDone = true;
qDebug("%s merge join %s table got %" PRId64 " rows block", GET_TASKID(pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(ppTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0); qDebug("%s merge join %s table got %" PRId64 " rows block", GET_TASKID(pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0);
*pIdx = 0; *pIdx = 0;
if (NULL == (*ppBlk)) { if (NULL == (*ppBlk)) {
(*ppTb)->dsFetchDone = true; pTb->dsFetchDone = true;
} }
return ((*ppBlk) == NULL) ? false : true; return ((*ppBlk) == NULL) ? false : true;
@ -565,12 +475,12 @@ static bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBl
static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, &pJoin->probe); bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false; bool buildGot = false;
do { do {
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, &pJoin->build); buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
} }
if (!probeGot) { if (!probeGot) {
@ -592,50 +502,22 @@ static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi
return true; return true;
} }
static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp, static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) {
int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) { int32_t blkNum = taosArrayGetSize(pCreatedBlks);
int32_t numRows = pBlock->info.rows; for (int32_t i = 0; i < blkNum; ++i) {
ASSERT(startPos < numRows); blockDataDestroy(*(SSDataBlock**)TARRAY_GET_ELEM(pCreatedBlks, i));
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId);
int32_t i = startPos;
for (; i < numRows; ++i) {
char* pNextVal = colDataGetData(pCol, i);
if (timestamp != *(int64_t*)pNextVal) {
break;
}
} }
int32_t endPos = i; taosArrayClear(pCreatedBlks);
*pEndPos = endPos;
if (endPos - startPos == 0) {
return 0;
}
SSDataBlock* block = pBlock;
bool createdNewBlock = false;
if (endPos == numRows) {
block = blockDataExtractBlock(pBlock, startPos, endPos - startPos);
taosArrayPush(createdBlocks, &block);
createdNewBlock = true;
}
SRowLocation location = {0};
for (int32_t j = startPos; j < endPos; ++j) {
location.pDataBlock = block;
location.pos = (createdNewBlock ? j - startPos : j);
taosArrayPush(rowLocations, &location);
}
return 0;
} }
static void mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) {
static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp, bool* wholeBlk, bool restart) {
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
SMJoinGrpRows* pGrp = NULL; SMJoinGrpRows* pGrp = NULL;
if (restart) { if (restart) {
pTable->grpTotalRows = 0; pTable->grpTotalRows = 0;
pTable->grpIdx = 0; pTable->grpIdx = 0;
mJoinDestroyCreatedBlks(pTable->createdBlks);
pGrp = taosArrayGet(pTable->eqGrps, 0); pGrp = taosArrayGet(pTable->eqGrps, 0);
} else { } else {
pGrp = taosArrayReserve(pTable->eqGrps, 1); pGrp = taosArrayReserve(pTable->eqGrps, 1);
@ -675,7 +557,7 @@ static void mJoinBuildEqGroups(SOperatorInfo* pOperator, SMJoinTableInfo* pTable
} }
static int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableInfo* pTable, int64_t timestamp) { static int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp) {
bool wholeBlk = false; bool wholeBlk = false;
mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true); mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true);
@ -698,7 +580,7 @@ static int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableInfo* pTable) { static int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
for (int32_t i = 0; i < pTable->keyNum; ++i) { for (int32_t i = 0; i < pTable->keyNum; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot);
if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
@ -719,7 +601,7 @@ static int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableInfo* pTable)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE true mJoinCopyKeyColsDataToBuf(SMJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { static FORCE_INLINE bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen) {
char *pData = NULL; char *pData = NULL;
size_t bufLen = 0; size_t bufLen = 0;
@ -760,7 +642,7 @@ static FORCE_INLINE true mJoinCopyKeyColsDataToBuf(SMJoinTableInfo* pTable, int3
return false; return false;
} }
static int32_t mJoinGetAvailableGrpArray(SMJoinTableInfo* pTable, SArray** ppRes) { static int32_t mJoinGetAvailableGrpArray(SMJoinTableCtx* pTable, SArray** ppRes) {
do { do {
if (pTable->grpArrayIdx < taosArrayGetSize(pTable->pGrpArrays)) { if (pTable->grpArrayIdx < taosArrayGetSize(pTable->pGrpArrays)) {
*ppRes = taosArrayGetP(pTable->pGrpArrays, pTable->grpArrayIdx++); *ppRes = taosArrayGetP(pTable->pGrpArrays, pTable->grpArrayIdx++);
@ -779,7 +661,7 @@ static int32_t mJoinGetAvailableGrpArray(SMJoinTableInfo* pTable, SArray** ppRes
} }
static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) { static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) {
SMJoinTableInfo* pBuild = pJoin->build; SMJoinTableCtx* pBuild = pJoin->build;
SMJoinRowPos pos = {pBlock, rowIdx}; SMJoinRowPos pos = {pBlock, rowIdx};
SArray** pGrpRows = tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen); SArray** pGrpRows = tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen);
if (!pGrpRows) { if (!pGrpRows) {
@ -796,7 +678,7 @@ static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDat
} }
static int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableInfo* pTable) { static int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
size_t bufLen = 0; size_t bufLen = 0;
tSimpleHashClear(pJoin->build->pGrpHash); tSimpleHashClear(pJoin->build->pGrpHash);
@ -820,7 +702,7 @@ static int32_t mJoinMakeBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableInfo* probe, SMJoinTableInfo* build) { static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build) {
int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity; int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity;
if (rowsLeft <= 0) { if (rowsLeft <= 0) {
return false; return false;
@ -837,7 +719,7 @@ static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, boo
for (int32_t c = 0; c < probe->finNum; ++c) { for (int32_t c = 0; c < probe->finNum; ++c) {
SMJoinColMap* pFirstCol = probe->finCols + c; SMJoinColMap* pFirstCol = probe->finCols + c;
SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk, pFirstCol->srcSlot); SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot); SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot);
if (colDataIsNull_s(pInCol, probeGrp->readIdx)) { if (colDataIsNull_s(pInCol, probeGrp->readIdx)) {
colDataSetNItemsNull(pOutCol, currRows, actRows); colDataSetNItemsNull(pOutCol, currRows, actRows);
@ -851,7 +733,7 @@ static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, boo
SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot); SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot);
for (int32_t r = 0; r < actRows; ++r) { for (int32_t r = 0; r < actRows; ++r) {
SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, r); SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, r);
SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk, pSecondCol->srcSlot); SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot);
colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1); colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1);
} }
} }
@ -872,8 +754,8 @@ static bool mLeftJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, boo
} }
static int32_t mLeftJoinHashFullCart(SMJoinMergeCtx* pCtx) { static int32_t mLeftJoinHashFullCart(SMJoinMergeCtx* pCtx) {
SMJoinTableInfo* probe = pCtx->pJoin->probe; SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableInfo* build = pCtx->pJoin->build; SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
if (build->grpRowIdx >= 0) { if (build->grpRowIdx >= 0) {
@ -916,8 +798,8 @@ _return:
} }
static int32_t mLeftJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) { static int32_t mLeftJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) {
SMJoinTableInfo* probe = pCtx->pJoin->probe; SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableInfo* build = pCtx->pJoin->build; SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx); SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
blockDataReset(pCtx->midBlk); blockDataReset(pCtx->midBlk);
@ -970,8 +852,8 @@ static int32_t mLeftJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop)
static int32_t mLeftJoinHashSeqCart(SMJoinMergeCtx* pCtx) { static int32_t mLeftJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
SMJoinTableInfo* probe = pCtx->pJoin->probe; SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableInfo* build = pCtx->pJoin->build; SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0); SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
bool contLoop = false; bool contLoop = false;
@ -1039,14 +921,14 @@ static int32_t mLeftJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp,
pCtx->hashJoin = true; pCtx->hashJoin = true;
return mLeftJoinHashCart(pJoin, pCtx); return mLeftJoinHashCart(pCtx);
} }
return mLeftJoinMergeCart(pJoin, pCtx); return mLeftJoinMergeCart(pCtx);
} }
static bool mLeftJoinHandleMidRemains(SMJoinMergeCtx* pCtx) { static bool mLeftJoinHandleMidRemains(SMJoinMergeCtx* pCtx) {
ASSERT(0 < pCtx->midBlk.info.rows); ASSERT(0 < pCtx->midBlk->info.rows);
TSWAP(pCtx->midBlk, pCtx->finBlk); TSWAP(pCtx->midBlk, pCtx->finBlk);
@ -1161,16 +1043,46 @@ _return:
return pCtx->finBlk; return pCtx->finBlk;
} }
void mJoinResetTableCtx(SMJoinTableCtx* pCtx) {
pCtx->dsInitDone = false;
pCtx->dsFetchDone = false;
mJoinDestroyCreatedBlks(pCtx->createdBlks);
tSimpleHashClear(pCtx->pGrpHash);
}
void mJoinResetMergeCtx(SMJoinMergeCtx* pCtx) {
pCtx->grpRemains = false;
pCtx->midRemains = false;
pCtx->lastEqGrp = false;
pCtx->lastEqTs = INT64_MIN;
pCtx->hashJoin = false;
}
void mJoinResetCtx(SMJoinOperatorInfo* pJoin) {
mJoinResetMergeCtx(&pJoin->ctx.mergeCtx);
}
void mJoinResetOperator(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
mJoinResetTableCtx(pJoin->build);
mJoinResetTableCtx(pJoin->probe);
mJoinResetCtx(pJoin);
pOperator->status = OP_OPENED;
}
SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) { SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinOperatorInfo* pJoin = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) { if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) {
qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoin->resRows); qDebug("%s merge join done", GET_TASKID(pOperator->pTaskInfo));
return NULL; return NULL;
} else { } else {
resetMergeJoinOperator(pOperator); mJoinResetOperator(pOperator);
qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo)); qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo));
} }
} }
@ -1207,6 +1119,13 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
} }
void destroyMergeJoinOperator(void* param) {
SMJoinOperatorInfo* pJoinOperator = (SMJoinOperatorInfo*)param;
taosMemoryFreeClear(param);
}
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo)); SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo));
@ -1216,14 +1135,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) { if (pOperator == NULL || pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _return;
} }
pInfo->pOperator = pOperator; pInfo->pOperator = pOperator;
code = mJoinInitDownstreamInfo(pInfo, pDownstream, numOfDownstream, newDownstreams); MJ_ERR_JRET(mJoinInitDownstreamInfo(pInfo, pDownstream, &numOfDownstream, &newDownstreams));
if (TSDB_CODE_SUCCESS != code) {
goto _error;
}
setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
@ -1231,39 +1147,19 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]);
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]);
MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode)); MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode));
code = mJoinBuildResColMap(pInfo, pJoinNode);
if (code) {
goto _error;
}
code = initHJoinBufPages(pInfo);
if (code) {
goto _error;
}
if (pJoinNode->pFullOnCond != NULL) { if (pJoinNode->pFullOnCond != NULL) {
code = filterInitFromNode(pJoinNode->pFullOnCond, &pInfo->pFPreFilter, 0); MJ_ERR_JRET(filterInitFromNode(pJoinNode->pFullOnCond, &pInfo->pFPreFilter, 0));
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
} }
if (pJoinNode->pColOnCond != NULL) { if (pJoinNode->pColOnCond != NULL) {
code = filterInitFromNode(pJoinNode->pColOnCond, &pInfo->pPreFilter, 0); MJ_ERR_JRET(filterInitFromNode(pJoinNode->pColOnCond, &pInfo->pPreFilter, 0));
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
} }
if (pJoinNode->node.pConditions != NULL) { if (pJoinNode->node.pConditions != NULL) {
code = filterInitFromNode(pJoinNode->node.pConditions, &pInfo->pFinFilter, 0); MJ_ERR_JRET(filterInitFromNode(pJoinNode->node.pConditions, &pInfo->pFinFilter, 0));
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
} }
if (pJoinNode->node.inputTsOrder == ORDER_ASC) { if (pJoinNode->node.inputTsOrder == ORDER_ASC) {
@ -1276,10 +1172,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, pDownstream, numOfDownstream); MJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (newDownstreams) { if (newDownstreams) {
taosMemoryFree(pDownstream); taosMemoryFree(pDownstream);
pOperator->numOfRealDownstream = 1; pOperator->numOfRealDownstream = 1;
@ -1289,7 +1183,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
return pOperator; return pOperator;
_error: _return:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyMergeJoinOperator(pInfo); destroyMergeJoinOperator(pInfo);
} }
@ -1302,24 +1196,3 @@ _error:
return NULL; return NULL;
} }
void destroyMergeJoinOperator(void* param) {
SMJoinOperatorInfo* pJoinOperator = (SMJoinOperatorInfo*)param;
if (pJoinOperator->pColEqualOnConditions != NULL) {
mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable);
taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf);
taosArrayDestroy(pJoinOperator->rightEqOnCondCols);
taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf);
taosArrayDestroy(pJoinOperator->leftEqOnCondCols);
}
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks);
taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks);
taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations);
taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations);
pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
taosMemoryFreeClear(param);
}

View File

@ -2072,15 +2072,24 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
} }
static const char* jkJoinPhysiPlanJoinType = "JoinType"; static const char* jkJoinPhysiPlanJoinType = "JoinType";
static const char* jkJoinPhysiPlanSubType = "SubType";
static const char* jkJoinPhysiPlanWinOffset = "WindowOffset";
static const char* jkJoinPhysiPlanJoinLimit = "JoinLimit";
static const char* jkJoinPhysiPlanLeftPrimSlotId = "LeftPrimSlotId";
static const char* jkJoinPhysiPlanRightPrimSlotId = "RightPrimSlotId";
static const char* jkJoinPhysiPlanLeftEqCols = "LeftEqCols";
static const char* jkJoinPhysiPlanRightEqCols = "RightEqCols";
static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder"; static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder";
static const char* jkJoinPhysiPlanOnLeftCols = "OnLeftColumns"; static const char* jkJoinPhysiPlanOnLeftCols = "OnLeftColumns";
static const char* jkJoinPhysiPlanOnRightCols = "OnRightColumns"; static const char* jkJoinPhysiPlanOnRightCols = "OnRightColumns";
static const char* jkJoinPhysiPlanPrimKeyCondition = "PrimKeyCondition"; static const char* jkJoinPhysiPlanPrimKeyCondition = "PrimKeyCondition";
static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions";
static const char* jkJoinPhysiPlanTargets = "Targets"; static const char* jkJoinPhysiPlanTargets = "Targets";
static const char* jkJoinPhysiPlanColEqualOnConditions = "ColumnEqualOnConditions"; static const char* jkJoinPhysiPlanColOnConditions = "ColumnOnConditions";
static const char* jkJoinPhysiPlanInputRowNum = "InputRowNum"; static const char* jkJoinPhysiPlanLeftInputRowNum = "LeftInputRowNum";
static const char* jkJoinPhysiPlanInputRowSize = "InputRowSize"; static const char* jkJoinPhysiPlanRightInputRowNum = "RightInputRowNum";
static const char* jkJoinPhysiPlanLeftInputRowSize = "LeftInputRowSize";
static const char* jkJoinPhysiPlanRightInputRowSize = "RightInputRowSize";
static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) {
const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj;
@ -2090,7 +2099,25 @@ static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType); code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanPrimKeyCondition, nodeToJson, pNode->pPrimKeyCond); code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanSubType, pNode->subType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanWinOffset, nodeToJson, pNode->pWindowOffset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanJoinLimit, nodeToJson, pNode->pJLimit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftPrimSlotId, pNode->leftPrimSlotId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightPrimSlotId, pNode->rightPrimSlotId);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkJoinPhysiPlanLeftEqCols, pNode->pEqLeft);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkJoinPhysiPlanRightEqCols, pNode->pEqRight);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pFullOnCond); code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pFullOnCond);
@ -2099,8 +2126,21 @@ static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) {
code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanColEqualOnConditions, nodeToJson, pNode->pColEqCond); code = tjsonAddObject(pJson, jkJoinPhysiPlanColOnConditions, nodeToJson, pNode->pColOnCond);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftInputRowNum, pNode->inputStat[0].inputRowNum);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftInputRowSize, pNode->inputStat[0].inputRowSize);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightInputRowNum, pNode->inputStat[1].inputRowNum);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize);
}
return code; return code;
} }
@ -2112,17 +2152,48 @@ static int32_t jsonToPhysiMergeJoinNode(const SJson* pJson, void* pObj) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code); tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pFullOnCond); tjsonGetNumberValue(pJson, jkJoinPhysiPlanSubType, pNode->subType, code);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanPrimKeyCondition, &pNode->pPrimKeyCond); code = jsonToNodeObject(pJson, jkJoinPhysiPlanWinOffset, &pNode->pWindowOffset);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanJoinLimit, &pNode->pJLimit);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftPrimSlotId, pNode->leftPrimSlotId, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightPrimSlotId, pNode->rightPrimSlotId, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkJoinPhysiPlanLeftEqCols, &pNode->pEqLeft);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkJoinPhysiPlanRightEqCols, &pNode->pEqRight);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pFullOnCond);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanColEqualOnConditions, &pNode->pColEqCond); code = jsonToNodeObject(pJson, jkJoinPhysiPlanColOnConditions, &pNode->pColOnCond);
} }
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftInputRowNum, pNode->inputStat[0].inputRowNum, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftInputRowSize, pNode->inputStat[0].inputRowSize, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightInputRowNum, pNode->inputStat[1].inputRowNum, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize, code);
}
return code; return code;
} }
@ -2146,16 +2217,16 @@ static int32_t physiHashJoinNodeToJson(const void* pObj, SJson* pJson) {
code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputRowNum, pNode->inputStat[0].inputRowNum); code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftInputRowNum, pNode->inputStat[0].inputRowNum);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputRowSize, pNode->inputStat[0].inputRowSize); code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftInputRowSize, pNode->inputStat[0].inputRowSize);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputRowNum, pNode->inputStat[1].inputRowNum); code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightInputRowNum, pNode->inputStat[1].inputRowNum);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputRowSize, pNode->inputStat[1].inputRowSize); code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize);
} }
return code; return code;
} }
@ -2181,16 +2252,16 @@ static int32_t jsonToPhysiHashJoinNode(const SJson* pJson, void* pObj) {
code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputRowNum, pNode->inputStat[0].inputRowNum, code); tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftInputRowNum, pNode->inputStat[0].inputRowNum, code);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputRowSize, pNode->inputStat[0].inputRowSize, code); tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftInputRowSize, pNode->inputStat[0].inputRowSize, code);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputRowNum, pNode->inputStat[1].inputRowNum, code); tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightInputRowNum, pNode->inputStat[1].inputRowNum, code);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputRowSize, pNode->inputStat[1].inputRowSize, code); tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize, code);
} }
return code; return code;
} }

View File

@ -2400,11 +2400,20 @@ static int32_t msgToPhysiProjectNode(STlvDecoder* pDecoder, void* pObj) {
enum { enum {
PHY_SORT_MERGE_JOIN_CODE_BASE_NODE = 1, PHY_SORT_MERGE_JOIN_CODE_BASE_NODE = 1,
PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE, PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE,
PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION, PHY_SORT_MERGE_JOIN_CODE_SUB_TYPE,
PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, PHY_SORT_MERGE_JOIN_CODE_WINDOW_OFFSET,
PHY_SORT_MERGE_JOIN_CODE_JOIN_LIMIT,
PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_SLOT_ID,
PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_SLOT_ID,
PHY_SORT_MERGE_JOIN_CODE_LEFT_EQ_COLS,
PHY_SORT_MERGE_JOIN_CODE_RIGHT_EQ_COLS,
PHY_SORT_MERGE_JOIN_CODE_FULL_ON_CONDITIONS,
PHY_SORT_MERGE_JOIN_CODE_TARGETS, PHY_SORT_MERGE_JOIN_CODE_TARGETS,
PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, PHY_SORT_MERGE_JOIN_CODE_COL_ON_CONDITIONS,
PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM0,
PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE0,
PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM1,
PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1
}; };
static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2415,17 +2424,48 @@ static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder)
code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE, pNode->joinType); code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE, pNode->joinType);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION, nodeToMsg, pNode->pPrimKeyCond); code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_SUB_TYPE, pNode->subType);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pFullOnCond); code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_WINDOW_OFFSET, nodeToMsg, pNode->pWindowOffset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_JOIN_LIMIT, nodeToMsg, pNode->pJLimit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_SLOT_ID, pNode->leftPrimSlotId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_SLOT_ID, pNode->rightPrimSlotId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_LEFT_EQ_COLS, nodeListToMsg, pNode->pEqLeft);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_RIGHT_EQ_COLS, nodeListToMsg, pNode->pEqRight);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_COL_ON_CONDITIONS, nodeToMsg, pNode->pColOnCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_FULL_ON_CONDITIONS, nodeToMsg, pNode->pFullOnCond);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pColEqCond); code = tlvEncodeI64(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM0, pNode->inputStat[0].inputRowNum);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM1, pNode->inputStat[1].inputRowNum);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE0, pNode->inputStat[0].inputRowSize);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1, pNode->inputStat[1].inputRowSize);
}
return code; return code;
} }
@ -2442,17 +2482,47 @@ static int32_t msgToPhysiMergeJoinNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE: case PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE:
code = tlvDecodeEnum(pTlv, &pNode->joinType, sizeof(pNode->joinType)); code = tlvDecodeEnum(pTlv, &pNode->joinType, sizeof(pNode->joinType));
break; break;
case PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION: case PHY_SORT_MERGE_JOIN_CODE_SUB_TYPE:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pPrimKeyCond); code = tlvDecodeEnum(pTlv, &pNode->subType, sizeof(pNode->subType));
break; break;
case PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS: case PHY_SORT_MERGE_JOIN_CODE_WINDOW_OFFSET:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pWindowOffset);
break;
case PHY_SORT_MERGE_JOIN_CODE_JOIN_LIMIT:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pJLimit);
break;
case PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_SLOT_ID:
code = tlvDecodeI32(pTlv, &pNode->leftPrimSlotId);
break;
case PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_SLOT_ID:
code = tlvDecodeI32(pTlv, &pNode->rightPrimSlotId);
break;
case PHY_SORT_MERGE_JOIN_CODE_LEFT_EQ_COLS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pEqLeft);
break;
case PHY_SORT_MERGE_JOIN_CODE_RIGHT_EQ_COLS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pEqRight);
break;
case PHY_SORT_MERGE_JOIN_CODE_COL_ON_CONDITIONS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pColOnCond);
break;
case PHY_SORT_MERGE_JOIN_CODE_FULL_ON_CONDITIONS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pFullOnCond); code = msgToNodeFromTlv(pTlv, (void**)&pNode->pFullOnCond);
break; break;
case PHY_SORT_MERGE_JOIN_CODE_TARGETS: case PHY_SORT_MERGE_JOIN_CODE_TARGETS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
break; break;
case PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS: case PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM0:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pColEqCond); code = tlvDecodeI64(pTlv, &pNode->inputStat[0].inputRowNum);
break;
case PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM1:
code = tlvDecodeI64(pTlv, &pNode->inputStat[1].inputRowNum);
break;
case PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE0:
code = tlvDecodeI32(pTlv, &pNode->inputStat[0].inputRowSize);
break;
case PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1:
code = tlvDecodeI32(pTlv, &pNode->inputStat[1].inputRowSize);
break; break;
default: default:
break; break;

View File

@ -1339,10 +1339,15 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SSortMergeJoinPhysiNode* pPhyNode = (SSortMergeJoinPhysiNode*)pNode; SSortMergeJoinPhysiNode* pPhyNode = (SSortMergeJoinPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode); destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pWindowOffset);
nodesDestroyNode(pPhyNode->pJLimit);
nodesDestroyList(pPhyNode->pEqLeft);
nodesDestroyList(pPhyNode->pEqRight);
nodesDestroyNode(pPhyNode->pPrimKeyCond); nodesDestroyNode(pPhyNode->pPrimKeyCond);
nodesDestroyNode(pPhyNode->pFullOnCond); nodesDestroyNode(pPhyNode->pFullOnCond);
nodesDestroyList(pPhyNode->pTargets); nodesDestroyList(pPhyNode->pTargets);
nodesDestroyNode(pPhyNode->pColEqCond); nodesDestroyNode(pPhyNode->pColEqCond);
nodesDestroyNode(pPhyNode->pColOnCond);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: {

View File

@ -760,6 +760,71 @@ static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SData
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, SNodeList** ppLeft, SNodeList** ppRight) {
if (QUERY_NODE_OPERATOR == nodeType(pEqCond) && ((SOperatorNode*)pEqCond)->opType == OP_TYPE_EQUAL) {
SOperatorNode* pOp = (SOperatorNode*)pEqCond;
if (leftBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) {
nodesListMakeStrictAppend(ppLeft, nodesCloneNode(pOp->pLeft));
} else if (rightBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) {
nodesListMakeStrictAppend(ppRight, nodesCloneNode(pOp->pLeft));
} else {
planError("invalid col equal list, leftBlockId:%d", ((SColumnNode*)pOp->pLeft)->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
if (leftBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) {
nodesListMakeStrictAppend(ppLeft, nodesCloneNode(pOp->pLeft));
} else if (rightBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) {
nodesListMakeStrictAppend(ppRight, nodesCloneNode(pOp->pLeft));
} else {
planError("invalid col equal list, rightBlockId:%d", ((SColumnNode*)pOp->pRight)->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
} else if (QUERY_NODE_LOGIC_CONDITION == nodeType(pEqCond) && ((SLogicConditionNode*)pEqCond)->condType == LOGIC_COND_TYPE_AND) {
SLogicConditionNode* pLogic = (SLogicConditionNode*)pEqCond;
SNode* pNode = NULL;
FOREACH(pNode, pLogic->pParameterList) {
int32_t code = setColEqList(pNode, leftBlkId, rightBlkId, ppLeft, ppRight);
if (code) {
return code;
}
}
} else {
planError("invalid col equal cond, type:%d", nodeType(pEqCond));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
static int32_t setColEqCond(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, int32_t* pLeftId, int32_t* pRightId) {
if (QUERY_NODE_OPERATOR == nodeType(pEqCond) && ((SOperatorNode*)pEqCond)->opType == OP_TYPE_EQUAL) {
SOperatorNode* pOp = (SOperatorNode*)pEqCond;
if (leftBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) {
*pLeftId = ((SColumnNode*)pOp->pLeft)->slotId;
} else if (rightBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) {
*pRightId = ((SColumnNode*)pOp->pLeft)->slotId;
} else {
planError("invalid primary key col equal cond, leftBlockId:%d", ((SColumnNode*)pOp->pLeft)->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
if (leftBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) {
*pLeftId = ((SColumnNode*)pOp->pRight)->slotId;
} else if (rightBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) {
*pRightId = ((SColumnNode*)pOp->pRight)->slotId;
} else {
planError("invalid primary key col equal cond, rightBlockId:%d", ((SColumnNode*)pOp->pRight)->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
} else {
planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
SPhysiNode** pPhyNode) { SPhysiNode** pPhyNode) {
SSortMergeJoinPhysiNode* pJoin = SSortMergeJoinPhysiNode* pJoin =
@ -784,6 +849,9 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
&pJoin->pPrimKeyCond); &pJoin->pPrimKeyCond);
if (TSDB_CODE_SUCCESS == code) {
code = setColEqCond(pJoin->pPrimKeyCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, &pJoin->leftPrimSlotId, &pJoin->rightPrimSlotId);
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -806,11 +874,13 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
} }
*/ */
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) { if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId,
pJoinLogicNode->pColEqCond, &pJoin->pColEqCond); pJoinLogicNode->pColEqCond, &pJoin->pColEqCond);
if (TSDB_CODE_SUCCESS == code) {
code = setColEqList(pJoin->pColEqCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, &pJoin->pEqLeft, &pJoin->pEqRight);
}
} }
if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) { if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) {
code = mergeEqCond(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond); code = mergeEqCond(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond);
} }