Merge pull request #15432 from taosdata/szhou/fix/udf
feat: add input ts order for join operator
This commit is contained in:
commit
d154eaac0b
|
@ -804,6 +804,7 @@ typedef struct STagFilterOperatorInfo {
|
||||||
typedef struct SJoinOperatorInfo {
|
typedef struct SJoinOperatorInfo {
|
||||||
SSDataBlock *pRes;
|
SSDataBlock *pRes;
|
||||||
int32_t joinType;
|
int32_t joinType;
|
||||||
|
int32_t inputTsOrder;
|
||||||
|
|
||||||
SSDataBlock *pLeft;
|
SSDataBlock *pLeft;
|
||||||
int32_t leftPos;
|
int32_t leftPos;
|
||||||
|
|
|
@ -92,7 +92,7 @@ static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlo
|
||||||
|
|
||||||
static void releaseQueryBuf(size_t numOfTables);
|
static void releaseQueryBuf(size_t numOfTables);
|
||||||
|
|
||||||
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
|
static void destroyFillOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
|
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
|
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
|
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
|
@ -3553,16 +3553,17 @@ void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
|
|
||||||
cleanupAggSup(&pInfo->aggSup);
|
cleanupAggSup(&pInfo->aggSup);
|
||||||
|
cleanupExprSupp(&pInfo->scalarExprSup);
|
||||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
|
void destroyFillOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
|
SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
|
||||||
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
||||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||||
taosMemoryFreeClear(pInfo->p);
|
taosMemoryFreeClear(pInfo->p);
|
||||||
|
taosArrayDestroy(pInfo->pColMatchColInfo);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3653,7 +3654,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroyFillOperatorInfo, NULL, NULL, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
|
@ -77,6 +77,15 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
|
||||||
pInfo->pCondAfterMerge = NULL;
|
pInfo->pCondAfterMerge = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->inputTsOrder = TSDB_ORDER_ASC;
|
||||||
|
if (pJoinNode->inputTsOrder == ORDER_ASC) {
|
||||||
|
pInfo->inputTsOrder = TSDB_ORDER_ASC;
|
||||||
|
} else if (pJoinNode->inputTsOrder == ORDER_DESC) {
|
||||||
|
pInfo->inputTsOrder = TSDB_ORDER_DESC;
|
||||||
|
}
|
||||||
|
//TODO: remove this when JoinNode inputTsOrder is ready
|
||||||
|
pInfo->inputTsOrder = TSDB_ORDER_ASC;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
|
||||||
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
||||||
|
@ -107,11 +116,42 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow) {
|
||||||
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
|
||||||
|
|
||||||
|
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i];
|
||||||
|
|
||||||
|
int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
|
||||||
|
int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
|
||||||
|
int32_t rowIndex = -1;
|
||||||
|
|
||||||
|
SColumnInfoData* pSrc = NULL;
|
||||||
|
if (pJoinInfo->pLeft->info.blockId == blockId) {
|
||||||
|
pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId);
|
||||||
|
rowIndex = pJoinInfo->leftPos;
|
||||||
|
} else {
|
||||||
|
pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId);
|
||||||
|
rowIndex = pJoinInfo->rightPos;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (colDataIsNull_s(pSrc, rowIndex)) {
|
||||||
|
colDataAppendNULL(pDst, currRow);
|
||||||
|
} else {
|
||||||
|
char* p = colDataGetData(pSrc, rowIndex);
|
||||||
|
colDataAppend(pDst, currRow, p, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
|
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
|
||||||
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
|
|
||||||
int32_t nrows = 0;
|
int32_t nrows = pRes->info.rows;
|
||||||
|
|
||||||
|
bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
// todo extract method
|
// todo extract method
|
||||||
|
@ -146,43 +186,20 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
|
||||||
// only the timestamp match support for ordinary table
|
// only the timestamp match support for ordinary table
|
||||||
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
|
if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
|
||||||
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
doJoinOneRow(pOperator, pRes, nrows);
|
||||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
|
|
||||||
|
|
||||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i];
|
|
||||||
|
|
||||||
int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
|
|
||||||
int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
|
|
||||||
int32_t rowIndex = -1;
|
|
||||||
|
|
||||||
SColumnInfoData* pSrc = NULL;
|
|
||||||
if (pJoinInfo->pLeft->info.blockId == blockId) {
|
|
||||||
pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId);
|
|
||||||
rowIndex = pJoinInfo->leftPos;
|
|
||||||
} else {
|
|
||||||
pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId);
|
|
||||||
rowIndex = pJoinInfo->rightPos;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (colDataIsNull_s(pSrc, rowIndex)) {
|
|
||||||
colDataAppendNULL(pDst, nrows);
|
|
||||||
} else {
|
|
||||||
char* p = colDataGetData(pSrc, rowIndex);
|
|
||||||
colDataAppend(pDst, nrows, p, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pJoinInfo->leftPos += 1;
|
pJoinInfo->leftPos += 1;
|
||||||
pJoinInfo->rightPos += 1;
|
pJoinInfo->rightPos += 1;
|
||||||
|
|
||||||
nrows += 1;
|
nrows += 1;
|
||||||
} else if (*(int64_t*)pLeftVal < *(int64_t*)pRightVal) {
|
} else if (asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal ||
|
||||||
|
!asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal) {
|
||||||
pJoinInfo->leftPos += 1;
|
pJoinInfo->leftPos += 1;
|
||||||
|
|
||||||
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else if (*(int64_t*)pLeftVal > *(int64_t*)pRightVal) {
|
} else if (asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal ||
|
||||||
|
!asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal) {
|
||||||
pJoinInfo->rightPos += 1;
|
pJoinInfo->rightPos += 1;
|
||||||
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
|
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -514,8 +514,15 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
|
||||||
if (pFillInfo == NULL) {
|
if (pFillInfo == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->prev); ++i) {
|
||||||
|
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
|
||||||
|
taosMemoryFree(pKey->pData);
|
||||||
|
}
|
||||||
taosArrayDestroy(pFillInfo->prev);
|
taosArrayDestroy(pFillInfo->prev);
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->next); ++i) {
|
||||||
|
SGroupKeys* pKey = taosArrayGet(pFillInfo->next, i);
|
||||||
|
taosMemoryFree(pKey->pData);
|
||||||
|
}
|
||||||
taosArrayDestroy(pFillInfo->next);
|
taosArrayDestroy(pFillInfo->next);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
|
for (int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
|
||||||
|
|
Loading…
Reference in New Issue