/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
|
|
|
|
#include "executorInt.h"
|
|
#include "filter.h"
|
|
#include "function.h"
|
|
#include "operator.h"
|
|
#include "os.h"
|
|
#include "querynodes.h"
|
|
#include "querytask.h"
|
|
#include "tcompare.h"
|
|
#include "tdatablock.h"
|
|
#include "thash.h"
|
|
#include "tmsg.h"
|
|
#include "ttypes.h"
|
|
#include "mergejoin.h"
|
|
|
|
/*
 * Build a two-entry downstream array in which both entries refer to the
 * single operator pDownstream[0], and record the result block ids of that
 * operator (index 0 and index 1) in pInfo->downstreamResBlkId.
 *
 * Returns the newly allocated array, or NULL when the allocation fails (in
 * which case pInfo is left untouched). The array is heap allocated; the caller
 * is expected to release it with taosMemoryFree().
 */
SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) {
  SOperatorInfo** pPair = taosMemoryMalloc(2 * POINTER_BYTES);
  if (NULL == pPair) {
    return NULL;
  }

  // Both join inputs are served by the same physical downstream operator.
  SOperatorInfo* pOnly = pDownstream[0];
  pPair[0] = pOnly;
  pPair[1] = pOnly;

  pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pOnly, 0);
  pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pOnly, 1);

  return pPair;
}
|
|
|
|
/*
 * Record the result block ids of the join's downstream operators in pInfo.
 *
 * With two (or more) downstreams, block id 0 of pDownstream[0] and
 * pDownstream[1] is recorded and nothing else changes.
 *
 * With exactly one downstream, a two-entry array aliasing that operator is
 * built through mJoinBuildDownstreams(), *numOfDownstream becomes 2 and
 * *newDownstreams becomes true.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot
 * be allocated. On failure the out-parameters are left unchanged, so the
 * caller is never told it owns an array that was not created.
 *
 * NOTE(review): pDownstream is passed by value, so the array built in the
 * single-downstream case is not visible to the caller through this signature.
 * A caller that needs the rebuilt array should call mJoinBuildDownstreams()
 * directly, or this signature should be widened to SOperatorInfo***.
 */
int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t *numOfDownstream, bool *newDownstreams) {
  if (1 != *numOfDownstream) {
    pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0);
    pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 0);
    return TSDB_CODE_SUCCESS;
  }

  pDownstream = mJoinBuildDownstreams(pInfo, pDownstream);
  if (NULL == pDownstream) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Only report the new array once it really exists.
  *newDownstreams = true;
  *numOfDownstream = 2;

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Allocate the primary-key column descriptor of pTable and store slotId as
 * its source slot. Only srcSlot is written; the remaining fields of the
 * descriptor are not initialised here.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the descriptor
 * cannot be allocated.
 */
static int32_t mJoinInitPrimKeyInfo(SMJoinTableInfo* pTable, int32_t slotId) {
  pTable->primCol = taosMemoryMalloc(sizeof(SMJoinColInfo));
  if (pTable->primCol != NULL) {
    pTable->primCol->srcSlot = slotId;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_OUT_OF_MEMORY;
}
|
|
|
|
/*
 * Count the target nodes in pList whose column expression belongs to data
 * block blkId and store the count in *colNum.
 *
 * Every list element is treated as an STargetNode whose pExpr is an
 * SColumnNode.
 */
static void mJoinGetValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) {
  int32_t matched = 0;

  SNode* pItem = NULL;
  FOREACH(pItem, pList) {
    SColumnNode* pColumn = (SColumnNode*)((STargetNode*)pItem)->pExpr;
    if (blkId == pColumn->dataBlockId) {
      ++matched;
    }
  }

  *colNum = matched;
}
|
|
|
|
/*
 * Build the value-column descriptors of pTable from the join target list.
 *
 * Every target in pList whose column belongs to pTable->blkId gets one entry
 * in pTable->valCols holding its source slot, destination slot, byte width,
 * and the keyCol / vardata flags. Additionally:
 *   - the index of every variable-length column is appended to
 *     pTable->valVarCols (created on first use);
 *   - pTable->valBufSize grows by the byte width of every column that is
 *     neither a key column nor variable-length, plus the bitmap length;
 *   - pTable->valBitMapSize is the bitmap length for the number of non-key
 *     value columns.
 *
 * valColInKeyCols() is defined elsewhere; it is presumably reporting whether
 * the slot is one of the key columns and filling in srcSlot in that case --
 * TODO confirm against its definition.
 *
 * Returns TSDB_CODE_SUCCESS (also when the table has no value column), or
 * TSDB_CODE_OUT_OF_MEMORY when an allocation or array append fails.
 */
static int32_t mJoinInitValColsInfo(SMJoinTableInfo* pTable, SNodeList* pList) {
  mJoinGetValColNum(pList, pTable->blkId, &pTable->valNum);
  if (pTable->valNum == 0) {
    return TSDB_CODE_SUCCESS;
  }

  pTable->valCols = taosMemoryMalloc(pTable->valNum * sizeof(SMJoinColInfo));
  if (NULL == pTable->valCols) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t i = 0;       // index of the next valCols entry to fill
  int32_t colNum = 0;  // number of value columns that are not key columns
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    STargetNode* pTarget = (STargetNode*)pNode;
    SColumnNode* pColNode = (SColumnNode*)pTarget->pExpr;
    if (pColNode->dataBlockId != pTable->blkId) {
      continue;  // target belongs to the other join input
    }

    if (valColInKeyCols(pColNode->slotId, pTable->keyNum, pTable->keyCols, &pTable->valCols[i].srcSlot)) {
      pTable->valCols[i].keyCol = true;
    } else {
      pTable->valCols[i].keyCol = false;
      pTable->valCols[i].srcSlot = pColNode->slotId;
      pTable->valColExist = true;
      colNum++;
    }

    pTable->valCols[i].dstSlot = pTarget->slotId;
    pTable->valCols[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
    if (pTable->valCols[i].vardata) {
      if (NULL == pTable->valVarCols) {
        pTable->valVarCols = taosArrayInit(pTable->valNum, sizeof(int32_t));
        if (NULL == pTable->valVarCols) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      // A failed append would silently lose this column index, so report it.
      if (NULL == taosArrayPush(pTable->valVarCols, &i)) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    pTable->valCols[i].bytes = pColNode->node.resType.bytes;
    if (!pTable->valCols[i].keyCol && !pTable->valCols[i].vardata) {
      pTable->valBufSize += pColNode->node.resType.bytes;
    }

    i++;
  }

  pTable->valBitMapSize = BitmapLen(colNum);
  pTable->valBufSize += pTable->valBitMapSize;

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Build the equality-key column descriptors of pTable from pList, whose
 * elements are treated as SColumnNode.
 *
 * For every key column the source slot, byte width and vardata flag are
 * recorded in pTable->keyCols. When there is more than one key column, a
 * buffer large enough for the sum of all key widths is allocated in
 * pTable->keyBuf.
 *
 * When the list holds no key column, keyCols is set to NULL and the function
 * succeeds without allocating: a zero-sized allocation may legitimately
 * return NULL, which must not be reported as out-of-memory.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t mJoinInitKeyColsInfo(SMJoinTableInfo* pTable, SNodeList* pList) {
  pTable->keyNum = LIST_LENGTH(pList);
  if (pTable->keyNum <= 0) {
    pTable->keyCols = NULL;
    return TSDB_CODE_SUCCESS;
  }

  pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SMJoinColInfo));
  if (NULL == pTable->keyCols) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t bufSize = 0;  // total byte width of all key columns
  int32_t i = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    SColumnNode* pColNode = (SColumnNode*)pNode;
    pTable->keyCols[i].srcSlot = pColNode->slotId;
    pTable->keyCols[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
    pTable->keyCols[i].bytes = pColNode->node.resType.bytes;
    bufSize += pColNode->node.resType.bytes;
    ++i;
  }

  // The key buffer is only allocated for multi-column keys.
  if (pTable->keyNum > 1) {
    pTable->keyBuf = taosMemoryMalloc(bufSize);
    if (NULL == pTable->keyBuf) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Initialise pJoin->tbs[idx], the per-input table descriptor of the join.
 *
 * idx 0 is the left input and idx 1 the right input: the primary-key slot and
 * the equality-key list are taken from the matching "left"/"right" members of
 * pJoinNode. The descriptor also receives the downstream operator, its result
 * block id, the value-column layout derived from pJoinNode->pTargets, and a
 * copy of *pStat.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error code reported by one of the
 * initialisation helpers.
 */
static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) {
  SMJoinTableInfo* pTable = &pJoin->tbs[idx];
  pTable->downStream = pDownstream[idx];
  pTable->blkId = pDownstream[idx]->resultDataBlockId;

  // Field name fixed to leftPrimSlotId, matching rightPrimSlotId / pEqLeft.
  int32_t code = mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId);
  if (code) {
    return code;
  }

  code = mJoinInitKeyColsInfo(pTable, (0 == idx) ? pJoinNode->pEqLeft : pJoinNode->pEqRight);
  if (code) {
    return code;
  }

  // Must run after the key columns are known: value columns are matched against them.
  code = mJoinInitValColsInfo(pTable, pJoinNode->pTargets);
  if (code) {
    return code;
  }

  memcpy(&pTable->inputStat, pStat, sizeof(*pStat));

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Copy the join type and sub type from the plan node, decide which of the two
 * inputs is the build side and which the probe side, and select the join
 * processing function.
 *
 * Side selection:
 *   - INNER / FULL: table 0 is the build side unless its input row count is
 *     greater than that of table 1;
 *   - LEFT:  table 1 is the build side;
 *   - RIGHT and every other join type: table 0 is the build side.
 *
 * ASOF and WIN sub types use mJoinProcessWinJoin; everything else uses
 * mJoinProcessMergeJoin.
 */
static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) {
  pInfo->joinType = pJoinNode->joinType;
  pInfo->subType = pJoinNode->subType;

  bool buildOnRight = false;
  switch (pInfo->joinType) {
    case JOIN_TYPE_INNER:
    case JOIN_TYPE_FULL:
      buildOnRight = !(pInfo->tbs[0].inputStat.inputRowNum <= pInfo->tbs[1].inputStat.inputRowNum);
      break;
    case JOIN_TYPE_LEFT:
      buildOnRight = true;
      break;
    case JOIN_TYPE_RIGHT:
    default:
      buildOnRight = false;
      break;
  }

  const int32_t buildIdx = buildOnRight ? 1 : 0;
  const int32_t probeIdx = buildOnRight ? 0 : 1;

  if (pInfo->subType == JOIN_STYPE_ASOF || pInfo->subType == JOIN_STYPE_WIN) {
    pInfo->joinFp = mJoinProcessWinJoin;
  } else {
    pInfo->joinFp = mJoinProcessMergeJoin;
  }

  pInfo->pBuild = &pInfo->tbs[buildIdx];
  pInfo->pProbe = &pInfo->tbs[probeIdx];

  pInfo->pBuild->downStreamIdx = buildIdx;
  pInfo->pProbe->downStreamIdx = probeIdx;
}
|
|
|
|
/*
 * Build pInfo->pResColMap, one int8_t per join target: an entry is 1 when the
 * target column comes from the build table's data block and 0 otherwise.
 * pInfo->pResColNum receives the number of targets.
 *
 * Must be called after pInfo->pBuild has been set.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the map cannot be
 * allocated.
 */
static int32_t mJoinBuildResColMap(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) {
  pInfo->pResColNum = pJoinNode->pTargets->length;

  // Zero-filled, so only build-side columns need to be marked below.
  pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t));
  if (NULL == pInfo->pResColMap) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t idx = 0;
  SNode*  pItem = NULL;
  FOREACH(pItem, pJoinNode->pTargets) {
    SColumnNode* pSrcCol = (SColumnNode*)((STargetNode*)pItem)->pExpr;
    if (pInfo->pBuild->blkId == pSrcCol->dataBlockId) {
      pInfo->pResColMap[idx] = 1;
    }
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
|
|
/*
 * Allocate one row-buffer page of HASH_JOIN_DEFAULT_PAGE_SIZE bytes and append
 * its descriptor (offset 0) to pRowBufs.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when either the page
 * buffer or the array slot cannot be allocated. On failure nothing is added
 * to the array and the page buffer is released.
 */
static FORCE_INLINE int32_t mJoinAddPageToBufList(SArray* pRowBufs) {
  SBufPageInfo page = {0};
  page.pageSize = HASH_JOIN_DEFAULT_PAGE_SIZE;
  page.offset = 0;
  page.data = taosMemoryMalloc(page.pageSize);
  if (NULL == page.data) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // The array stores a copy of the descriptor; if the append fails the page
  // buffer would otherwise be leaked.
  if (NULL == taosArrayPush(pRowBufs, &page)) {
    taosMemoryFree(page.data);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
|
|
|
|
/*
 * Create the row-buffer page list of the join operator (initial capacity 32)
 * and add the first page to it.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the list cannot be created, otherwise
 * the result of mJoinAddPageToBufList().
 */
static int32_t mJoinInitBufPages(SMJoinOperatorInfo* pInfo) {
  pInfo->pRowBufs = taosArrayInit(32, sizeof(SBufPageInfo));

  return (NULL != pInfo->pRowBufs) ? mJoinAddPageToBufList(pInfo->pRowBufs) : TSDB_CODE_OUT_OF_MEMORY;
}
|
|
|
|
/*
 * Advance the two sorted inputs by timestamp and append joined rows to the
 * operator's result block until the result threshold is reached or no further
 * timestamp pair is available.
 *
 * Per iteration:
 *   - equal timestamps: the matching ranges of both sides are joined;
 *   - the left side is behind (in scan order): the left position advances;
 *   - otherwise: the right position advances.
 * When rowCtx.rowRemains is set, the timestamp saved in rowCtx is reused for
 * both sides instead of fetching a new pair.
 *
 * Returns the operator's result block (pJoinInfo->pRes), whose row count
 * reflects the rows produced so far.
 *
 * NOTE(review): the scan order is read from pJoinInfo->inputOrder while the
 * operator constructor in this file fills pJoinInfo->inputTsOrder -- confirm
 * which field is intended.
 */
static SSDataBlock* mJoinHanleMergeJoin(SOperatorInfo* pOperator) {
  SMJoinOperatorInfo* pJoinInfo = pOperator->info;
  // The result block was referenced without a declaration; it is the
  // operator-owned block also used by mJoinMainProcess().
  SSDataBlock* pRes = pJoinInfo->pRes;

  int32_t nrows = pRes->info.rows;

  bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? true : false;

  while (1) {
    int64_t leftTs = 0;
    int64_t rightTs = 0;
    if (pJoinInfo->rowCtx.rowRemains) {
      leftTs = pJoinInfo->rowCtx.ts;
      rightTs = pJoinInfo->rowCtx.ts;
    } else {
      bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
      if (!hasNextTs) {
        break;
      }
    }

    if (leftTs == rightTs) {
      mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
    } else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) {
      pJoinInfo->leftPos += 1;

      // Left block exhausted and the result is not full yet: fetch more input.
      if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
        continue;
      }
    } else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) {
      pJoinInfo->rightPos += 1;

      // Right block exhausted and the result is not full yet: fetch more input.
      if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
        continue;
      }
    }

    // The result block is always the same one; only its bookkeeping is refreshed.
    pRes->info.rows = nrows;
    pRes->info.dataLoad = 1;
    pRes->info.scanFlag = MAIN_SCAN;
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
  }

  // The function is declared to return a block but previously fell off the end.
  return pRes;
}
|
|
|
|
|
|
SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
|
|
SMJoinOperatorInfo* pJoinInfo = pOperator->info;
|
|
if (pOperator->status == OP_EXEC_DONE) {
|
|
if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) {
|
|
qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoinInfo->resRows);
|
|
return NULL;
|
|
} else {
|
|
resetMergeJoinOperator(pOperator);
|
|
qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo));
|
|
}
|
|
}
|
|
|
|
int64_t st = 0;
|
|
if (pOperator->cost.openCost == 0) {
|
|
st = taosGetTimestampUs();
|
|
}
|
|
|
|
SSDataBlock* pRes = pJoinInfo->pRes;
|
|
blockDataCleanup(pRes);
|
|
|
|
while (true) {
|
|
int32_t numOfRowsBefore = pRes->info.rows;
|
|
mJoinImpl(pOperator, pRes);
|
|
int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore;
|
|
if (numOfNewRows == 0) {
|
|
break;
|
|
}
|
|
if (pJoinInfo->pFinFilter != NULL) {
|
|
doFilter(pRes, pJoinInfo->pFinFilter, NULL);
|
|
}
|
|
if (pRes->info.rows > 0 || pOperator->status == OP_EXEC_DONE) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (pOperator->cost.openCost == 0) {
|
|
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
|
}
|
|
|
|
if (pRes->info.rows > 0) {
|
|
pJoinInfo->resRows += pRes->info.rows;
|
|
qDebug("%s merge join returns res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pRes->info.rows);
|
|
return pRes;
|
|
} else {
|
|
qDebug("%s total merge join res rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), pJoinInfo->resRows);
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
|
|
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
|
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
|
|
SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo));
|
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
|
bool newDownstreams = false;
|
|
|
|
int32_t code = TSDB_CODE_SUCCESS;
|
|
if (pOperator == NULL || pInfo == NULL) {
|
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
goto _error;
|
|
}
|
|
|
|
code = mJoinInitDownstreamInfo(pInfo, pDownstream, numOfDownstream, newDownstreams);
|
|
if (TSDB_CODE_SUCCESS != code) {
|
|
goto _error;
|
|
}
|
|
|
|
int32_t numOfCols = 0;
|
|
pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
|
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
|
|
|
setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
|
|
|
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]);
|
|
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]);
|
|
|
|
mJoinSetBuildAndProbeTable(pInfo, pJoinNode);
|
|
|
|
code = mJoinBuildResColMap(pInfo, pJoinNode);
|
|
if (code) {
|
|
goto _error;
|
|
}
|
|
|
|
code = initHJoinBufPages(pInfo);
|
|
if (code) {
|
|
goto _error;
|
|
}
|
|
|
|
if (pJoinNode->pColOnCond != NULL) {
|
|
code = filterInitFromNode(pJoinNode->pColOnCond, &pInfo->pPreFilter, 0);
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
goto _error;
|
|
}
|
|
}
|
|
|
|
if (pJoinNode->node.pConditions != NULL) {
|
|
code = filterInitFromNode(pJoinNode->node.pConditions, &pInfo->pFinFilter, 0);
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
goto _error;
|
|
}
|
|
}
|
|
|
|
if (pJoinNode->node.inputTsOrder == ORDER_ASC) {
|
|
pInfo->inputTsOrder = TSDB_ORDER_ASC;
|
|
} else if (pJoinNode->node.inputTsOrder == ORDER_DESC) {
|
|
pInfo->inputTsOrder = TSDB_ORDER_DESC;
|
|
} else {
|
|
pInfo->inputTsOrder = TSDB_ORDER_ASC;
|
|
}
|
|
|
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
|
|
|
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
goto _error;
|
|
}
|
|
if (newDownstreams) {
|
|
taosMemoryFree(pDownstream);
|
|
pOperator->numOfRealDownstream = 1;
|
|
} else {
|
|
pOperator->numOfRealDownstream = 2;
|
|
}
|
|
|
|
return pOperator;
|
|
|
|
_error:
|
|
if (pInfo != NULL) {
|
|
destroyMergeJoinOperator(pInfo);
|
|
}
|
|
if (newDownstreams) {
|
|
taosMemoryFree(pDownstream);
|
|
}
|
|
|
|
taosMemoryFree(pOperator);
|
|
pTaskInfo->code = code;
|
|
return NULL;
|
|
}
|
|
|
|
void destroyMergeJoinOperator(void* param) {
|
|
SMJoinOperatorInfo* pJoinOperator = (SMJoinOperatorInfo*)param;
|
|
if (pJoinOperator->pColEqualOnConditions != NULL) {
|
|
mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable);
|
|
taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf);
|
|
taosArrayDestroy(pJoinOperator->rightEqOnCondCols);
|
|
|
|
taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf);
|
|
taosArrayDestroy(pJoinOperator->leftEqOnCondCols);
|
|
}
|
|
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
|
|
|
|
taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks);
|
|
taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks);
|
|
taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations);
|
|
taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations);
|
|
|
|
pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
|
|
taosMemoryFreeClear(param);
|
|
}
|
|
|