refactor: do some internal refactor.
This commit is contained in:
parent
a170b3fa9d
commit
18fcb33465
|
@ -13,7 +13,6 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <libs/function/function.h>
|
|
||||||
#include "filter.h"
|
#include "filter.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
@ -1601,9 +1600,6 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId);
|
|
||||||
static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVariant* tag, int16_t type, int16_t bytes);
|
|
||||||
|
|
||||||
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
|
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
|
||||||
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
|
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
|
||||||
uint32_t status = BLK_DATA_NOT_LOAD;
|
uint32_t status = BLK_DATA_NOT_LOAD;
|
||||||
|
@ -1771,100 +1767,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* set tag value in SqlFunctionCtx
|
|
||||||
* e.g.,tag information into input buffer
|
|
||||||
*/
|
|
||||||
static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVariant* tag, int16_t type, int16_t bytes) {
|
|
||||||
taosVariantDestroy(tag);
|
|
||||||
|
|
||||||
char* val = NULL;
|
|
||||||
// if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
|
|
||||||
// val = tsdbGetTableName(pTable);
|
|
||||||
// assert(val != NULL);
|
|
||||||
// } else {
|
|
||||||
// val = tsdbGetTableTagVal(pTable, tagColId, type, bytes);
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (val == NULL || isNull(val, type)) {
|
|
||||||
tag->nType = TSDB_DATA_TYPE_NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
int32_t maxLen = bytes - VARSTR_HEADER_SIZE;
|
|
||||||
int32_t len = (varDataLen(val) > maxLen) ? maxLen : varDataLen(val);
|
|
||||||
taosVariantCreateFromBinary(tag, varDataVal(val), len, type);
|
|
||||||
// taosVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type);
|
|
||||||
} else {
|
|
||||||
taosVariantCreateFromBinary(tag, val, bytes, type);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId) {
|
|
||||||
assert(pTagColList != NULL && numOfTags > 0);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTags; ++i) {
|
|
||||||
if (pTagColList[i].colId == colId) {
|
|
||||||
return &pTagColList[i];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|
||||||
SExprInfo* pExpr = pOperatorInfo->pExpr;
|
|
||||||
SExprInfo* pExprInfo = &pExpr[0];
|
|
||||||
int32_t functionId = getExprFunctionId(pExprInfo);
|
|
||||||
#if 0
|
|
||||||
if (pQueryAttr->numOfOutput == 1 && functionId == FUNCTION_TS_COMP && pQueryAttr->stableQuery) {
|
|
||||||
assert(pExprInfo->base.numOfParams == 1);
|
|
||||||
|
|
||||||
// int16_t tagColId = (int16_t)pExprInfo->base.param[0].i;
|
|
||||||
int16_t tagColId = -1;
|
|
||||||
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagColId);
|
|
||||||
|
|
||||||
doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// set tag value, by which the results are aggregated.
|
|
||||||
int32_t offset = 0;
|
|
||||||
memset(pRuntimeEnv->tagVal, 0, pQueryAttr->tagLen);
|
|
||||||
|
|
||||||
for (int32_t idx = 0; idx < numOfOutput; ++idx) {
|
|
||||||
SExprInfo* pLocalExprInfo = &pExpr[idx];
|
|
||||||
|
|
||||||
// ts_comp column required the tag value for join filter
|
|
||||||
if (!TSDB_COL_IS_TAG(pLocalExprInfo->base.pParam[0].pCol->flag)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo use tag column index to optimize performance
|
|
||||||
doSetTagValueInParam(pTable, pLocalExprInfo->base.pParam[0].pCol->colId, &pCtx[idx].tag,
|
|
||||||
pLocalExprInfo->base.resSchema.type, pLocalExprInfo->base.resSchema.bytes);
|
|
||||||
|
|
||||||
if (IS_NUMERIC_TYPE(pLocalExprInfo->base.resSchema.type) ||
|
|
||||||
pLocalExprInfo->base.resSchema.type == TSDB_DATA_TYPE_BOOL ||
|
|
||||||
pLocalExprInfo->base.resSchema.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
|
||||||
memcpy(pRuntimeEnv->tagVal + offset, &pCtx[idx].tag.i, pLocalExprInfo->base.resSchema.bytes);
|
|
||||||
} else {
|
|
||||||
if (pCtx[idx].tag.pz != NULL) {
|
|
||||||
memcpy(pRuntimeEnv->tagVal + offset, pCtx[idx].tag.pz, pCtx[idx].tag.nLen);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
offset += pLocalExprInfo->base.resSchema.bytes;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the tsBuf start position before check each data block
|
|
||||||
if (pRuntimeEnv->pTsBuf != NULL) {
|
|
||||||
setCtxTagForJoin(pRuntimeEnv, &pCtx[0], pExprInfo, pTable);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
void copyToSDataBlock(SSDataBlock* pBlock, int32_t* offset, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pResBuf) {
|
void copyToSDataBlock(SSDataBlock* pBlock, int32_t* offset, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pResBuf) {
|
||||||
pBlock->info.rows = 0;
|
pBlock->info.rows = 0;
|
||||||
|
|
||||||
|
@ -4038,12 +3940,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
if (pProjectInfo->existDataBlock) { // TODO refactor
|
if (pProjectInfo->existDataBlock) { // TODO refactor
|
||||||
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
|
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
|
||||||
pProjectInfo->existDataBlock = NULL;
|
pProjectInfo->existDataBlock = NULL;
|
||||||
*newgroup = true;
|
|
||||||
|
|
||||||
// todo dynamic set tags
|
|
||||||
// if (pTableQueryInfo != NULL) {
|
|
||||||
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
|
||||||
|
@ -4084,13 +3980,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo set tags
|
|
||||||
|
|
||||||
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
|
||||||
// if (pTableQueryInfo != NULL) {
|
|
||||||
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
||||||
|
|
||||||
|
@ -4430,10 +4319,6 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
doDestroyBasicInfo(pInfo, numOfOutput);
|
doDestroyBasicInfo(pInfo, numOfOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
|
|
||||||
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
|
|
||||||
}
|
|
||||||
|
|
||||||
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
|
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
|
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
|
||||||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||||
|
@ -4778,7 +4663,6 @@ static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
|
|
||||||
static SArray* createSortInfo(SNodeList* pNodeList);
|
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||||
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
|
|
||||||
|
|
||||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||||
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
||||||
|
@ -5447,150 +5331,3 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
|
|
||||||
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
|
||||||
|
|
||||||
SSDataBlock* pRes = pJoinInfo->pRes;
|
|
||||||
blockDataCleanup(pRes);
|
|
||||||
blockDataEnsureCapacity(pRes, 4096);
|
|
||||||
|
|
||||||
int32_t nrows = 0;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
|
||||||
SOperatorInfo* ds1 = pOperator->pDownstream[0];
|
|
||||||
publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
|
||||||
pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
|
|
||||||
publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
|
||||||
|
|
||||||
pJoinInfo->leftPos = 0;
|
|
||||||
if (pJoinInfo->pLeft == NULL) {
|
|
||||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
|
|
||||||
SOperatorInfo* ds2 = pOperator->pDownstream[1];
|
|
||||||
publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
|
||||||
pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2);
|
|
||||||
publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
|
||||||
|
|
||||||
pJoinInfo->rightPos = 0;
|
|
||||||
if (pJoinInfo->pRight == NULL) {
|
|
||||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
|
|
||||||
char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
|
|
||||||
|
|
||||||
SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
|
|
||||||
char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
|
|
||||||
|
|
||||||
// only the timestamp match support for ordinary table
|
|
||||||
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
|
||||||
if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
|
|
||||||
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
|
||||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
|
|
||||||
|
|
||||||
SExprInfo* pExprInfo = &pOperator->pExpr[i];
|
|
||||||
|
|
||||||
int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
|
|
||||||
int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
|
|
||||||
int32_t rowIndex = -1;
|
|
||||||
|
|
||||||
SColumnInfoData* pSrc = NULL;
|
|
||||||
if (pJoinInfo->pLeft->info.blockId == blockId) {
|
|
||||||
pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId);
|
|
||||||
rowIndex = pJoinInfo->leftPos;
|
|
||||||
} else {
|
|
||||||
pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId);
|
|
||||||
rowIndex = pJoinInfo->rightPos;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (colDataIsNull_s(pSrc, rowIndex)) {
|
|
||||||
colDataAppendNULL(pDst, nrows);
|
|
||||||
} else {
|
|
||||||
char* p = colDataGetData(pSrc, rowIndex);
|
|
||||||
colDataAppend(pDst, nrows, p, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pJoinInfo->leftPos += 1;
|
|
||||||
pJoinInfo->rightPos += 1;
|
|
||||||
|
|
||||||
nrows += 1;
|
|
||||||
} else if (*(int64_t*)pLeftVal < *(int64_t*)pRightVal) {
|
|
||||||
pJoinInfo->leftPos += 1;
|
|
||||||
|
|
||||||
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
} else if (*(int64_t*)pLeftVal > *(int64_t*)pRightVal) {
|
|
||||||
pJoinInfo->rightPos += 1;
|
|
||||||
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
|
||||||
pRes->info.rows = nrows;
|
|
||||||
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return (pRes->info.rows > 0) ? pRes : NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
|
|
||||||
int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition,
|
|
||||||
SExecTaskInfo* pTaskInfo) {
|
|
||||||
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
|
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
|
||||||
if (pOperator == NULL || pInfo == NULL) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, 4096);
|
|
||||||
|
|
||||||
pInfo->pRes = pResBlock;
|
|
||||||
pOperator->name = "MergeJoinOperator";
|
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
|
|
||||||
pOperator->blocking = false;
|
|
||||||
pOperator->status = OP_NOT_OPENED;
|
|
||||||
pOperator->pExpr = pExprInfo;
|
|
||||||
pOperator->numOfExprs = numOfCols;
|
|
||||||
pOperator->info = pInfo;
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
|
||||||
|
|
||||||
SOperatorNode* pNode = (SOperatorNode*)pOnCondition;
|
|
||||||
setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
|
|
||||||
setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
|
|
||||||
|
|
||||||
pOperator->fpSet =
|
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
|
|
||||||
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pOperator;
|
|
||||||
|
|
||||||
_error:
|
|
||||||
taosMemoryFree(pInfo);
|
|
||||||
taosMemoryFree(pOperator);
|
|
||||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
|
|
||||||
pColumn->slotId = pColumnNode->slotId;
|
|
||||||
pColumn->type = pColumnNode->node.resType.type;
|
|
||||||
pColumn->bytes = pColumnNode->node.resType.bytes;
|
|
||||||
pColumn->precision = pColumnNode->node.resType.precision;
|
|
||||||
pColumn->scale = pColumnNode->node.resType.scale;
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,184 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "function.h"
|
||||||
|
#include "os.h"
|
||||||
|
#include "querynodes.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
#include "executorimpl.h"
|
||||||
|
#include "tcompare.h"
|
||||||
|
#include "thash.h"
|
||||||
|
#include "ttypes.h"
|
||||||
|
|
||||||
|
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
|
||||||
|
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
|
||||||
|
static void destroyMergeJoinOperator(void* param, int32_t numOfOutput);
|
||||||
|
|
||||||
|
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
|
||||||
|
int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition,
|
||||||
|
SExecTaskInfo* pTaskInfo) {
|
||||||
|
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (pOperator == NULL || pInfo == NULL) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
|
||||||
|
pInfo->pRes = pResBlock;
|
||||||
|
pOperator->name = "MergeJoinOperator";
|
||||||
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
|
||||||
|
pOperator->blocking = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->pExpr = pExprInfo;
|
||||||
|
pOperator->numOfExprs = numOfCols;
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) {
|
||||||
|
SOperatorNode* pNode = (SOperatorNode*)pOnCondition;
|
||||||
|
setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
|
||||||
|
setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
|
||||||
|
} else if (nodeType(pOnCondition) == QUERY_NODE_LOGIC_CONDITION) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
pOperator->fpSet =
|
||||||
|
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
|
||||||
|
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pOperator;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
taosMemoryFree(pInfo);
|
||||||
|
taosMemoryFree(pOperator);
|
||||||
|
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
|
||||||
|
pColumn->slotId = pColumnNode->slotId;
|
||||||
|
pColumn->type = pColumnNode->node.resType.type;
|
||||||
|
pColumn->bytes = pColumnNode->node.resType.bytes;
|
||||||
|
pColumn->precision = pColumnNode->node.resType.precision;
|
||||||
|
pColumn->scale = pColumnNode->node.resType.scale;
|
||||||
|
}
|
||||||
|
|
||||||
|
void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
|
||||||
|
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
|
||||||
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
|
|
||||||
|
SSDataBlock* pRes = pJoinInfo->pRes;
|
||||||
|
blockDataCleanup(pRes);
|
||||||
|
blockDataEnsureCapacity(pRes, 4096);
|
||||||
|
|
||||||
|
int32_t nrows = 0;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
// todo extract method
|
||||||
|
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
||||||
|
SOperatorInfo* ds1 = pOperator->pDownstream[0];
|
||||||
|
publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
|
pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
|
||||||
|
publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||||
|
|
||||||
|
pJoinInfo->leftPos = 0;
|
||||||
|
if (pJoinInfo->pLeft == NULL) {
|
||||||
|
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
|
||||||
|
SOperatorInfo* ds2 = pOperator->pDownstream[1];
|
||||||
|
publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
|
pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2);
|
||||||
|
publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||||
|
|
||||||
|
pJoinInfo->rightPos = 0;
|
||||||
|
if (pJoinInfo->pRight == NULL) {
|
||||||
|
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
|
||||||
|
char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
|
||||||
|
|
||||||
|
SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
|
||||||
|
char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
|
||||||
|
|
||||||
|
// only the timestamp match support for ordinary table
|
||||||
|
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
|
if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
|
||||||
|
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
|
||||||
|
|
||||||
|
SExprInfo* pExprInfo = &pOperator->pExpr[i];
|
||||||
|
|
||||||
|
int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
|
||||||
|
int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
|
||||||
|
int32_t rowIndex = -1;
|
||||||
|
|
||||||
|
SColumnInfoData* pSrc = NULL;
|
||||||
|
if (pJoinInfo->pLeft->info.blockId == blockId) {
|
||||||
|
pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId);
|
||||||
|
rowIndex = pJoinInfo->leftPos;
|
||||||
|
} else {
|
||||||
|
pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId);
|
||||||
|
rowIndex = pJoinInfo->rightPos;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (colDataIsNull_s(pSrc, rowIndex)) {
|
||||||
|
colDataAppendNULL(pDst, nrows);
|
||||||
|
} else {
|
||||||
|
char* p = colDataGetData(pSrc, rowIndex);
|
||||||
|
colDataAppend(pDst, nrows, p, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pJoinInfo->leftPos += 1;
|
||||||
|
pJoinInfo->rightPos += 1;
|
||||||
|
|
||||||
|
nrows += 1;
|
||||||
|
} else if (*(int64_t*)pLeftVal < *(int64_t*)pRightVal) {
|
||||||
|
pJoinInfo->leftPos += 1;
|
||||||
|
|
||||||
|
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else if (*(int64_t*)pLeftVal > *(int64_t*)pRightVal) {
|
||||||
|
pJoinInfo->rightPos += 1;
|
||||||
|
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
|
pRes->info.rows = nrows;
|
||||||
|
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return (pRes->info.rows > 0) ? pRes : NULL;
|
||||||
|
}
|
|
@ -773,7 +773,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
||||||
|
@ -1062,8 +1061,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the
|
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the
|
||||||
// caller. Note that all the time window are not close till now.
|
// caller. Note that all the time window are not close till now.
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
if (pInfo->invertible) {
|
if (pInfo->invertible) {
|
||||||
|
@ -1377,7 +1374,6 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs);
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
|
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
|
||||||
|
|
Loading…
Reference in New Issue