Merge pull request #13695 from taosdata/szhou/feature/merge-interval
feature: add merge interval operator
This commit is contained in:
commit
a862b0f9fe
|
@ -809,6 +809,11 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream);
|
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream);
|
||||||
|
|
||||||
|
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
|
SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
||||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
@ -907,6 +912,9 @@ int64_t getSmaWaterMark(int64_t interval, double filesFactor);
|
||||||
bool isSmaStream(int8_t triggerType);
|
bool isSmaStream(int8_t triggerType);
|
||||||
|
|
||||||
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
|
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
|
||||||
|
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||||
|
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
||||||
|
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -1956,6 +1956,57 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||||
|
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
|
||||||
|
const int32_t* rowCellOffset, SSDataBlock* pBlock,
|
||||||
|
SExecTaskInfo* pTaskInfo) {
|
||||||
|
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
|
||||||
|
SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
|
||||||
|
|
||||||
|
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
|
||||||
|
if (pRow->numOfRows == 0) {
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||||
|
int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25);
|
||||||
|
if (TAOS_FAILED(code)) {
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
|
qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < numOfExprs; ++j) {
|
||||||
|
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
|
||||||
|
|
||||||
|
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
|
||||||
|
if (pCtx[j].fpSet.finalize) {
|
||||||
|
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||||
|
if (TAOS_FAILED(code)) {
|
||||||
|
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||||
|
// do nothing, todo refactor
|
||||||
|
} else {
|
||||||
|
// expand the result into multiple rows. E.g., _wstartts, top(k, 20)
|
||||||
|
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
|
||||||
|
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
|
||||||
|
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
|
pBlock->info.rows += pRow->numOfRows;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
|
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
|
||||||
SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
|
SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
|
||||||
int32_t numOfExprs) {
|
int32_t numOfExprs) {
|
||||||
|
@ -4689,6 +4740,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
pOptr =
|
pOptr =
|
||||||
createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
|
createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
|
||||||
|
|
||||||
|
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
|
||||||
|
SMergeIntervalPhysiNode * pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
|
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
|
|
||||||
|
SInterval interval = {.interval = pIntervalPhyNode->interval,
|
||||||
|
.sliding = pIntervalPhyNode->sliding,
|
||||||
|
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||||
|
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||||
|
.offset = pIntervalPhyNode->offset,
|
||||||
|
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
|
||||||
|
|
||||||
|
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||||
|
pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
|
||||||
qDebug("[******]create Semi");
|
qDebug("[******]create Semi");
|
||||||
int32_t children = 0;
|
int32_t children = 0;
|
||||||
|
|
|
@ -1881,8 +1881,8 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock,
|
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId,
|
||||||
int32_t tableGroupId, SArray* pUpdated) {
|
SArray* pUpdated) {
|
||||||
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
|
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
|
||||||
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
|
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
|
||||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||||
|
@ -1897,7 +1897,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
||||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||||
tsCols = (int64_t*)pColDataInfo->pData;
|
tsCols = (int64_t*)pColDataInfo->pData;
|
||||||
} else {
|
} else {
|
||||||
return ;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
|
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
|
||||||
|
@ -1914,13 +1914,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
||||||
pos->groupId = tableGroupId;
|
pos->groupId = tableGroupId;
|
||||||
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
||||||
*(int64_t*)pos->key = pResult->win.skey;
|
*(int64_t*)pos->key = pResult->win.skey;
|
||||||
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos,
|
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
|
||||||
nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
TSDB_ORDER_ASC);
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
|
||||||
saveResult(pResult, tableGroupId, pUpdated);
|
saveResult(pResult, tableGroupId, pUpdated);
|
||||||
}
|
}
|
||||||
// window start(end) key interpolation
|
// window start(end) key interpolation
|
||||||
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows);
|
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
|
||||||
|
// forwardRows);
|
||||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||||
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||||
|
@ -2049,10 +2050,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
int32_t childIndex = getChildIndex(pBlock);
|
int32_t childIndex = getChildIndex(pBlock);
|
||||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
||||||
SIntervalAggOperatorInfo* pChildInfo = pChildOp->info;
|
SIntervalAggOperatorInfo* pChildInfo = pChildOp->info;
|
||||||
doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval,
|
doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval, pChildInfo->primaryTsIndex,
|
||||||
pChildInfo->primaryTsIndex, pChildOp->numOfExprs, pBlock, NULL);
|
pChildOp->numOfExprs, pBlock, NULL);
|
||||||
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId,
|
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs,
|
||||||
pOperator->numOfExprs, pOperator->pTaskInfo);
|
pOperator->pTaskInfo);
|
||||||
taosArrayDestroy(pUpWins);
|
taosArrayDestroy(pUpWins);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -2062,7 +2063,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (isFinalInterval(pInfo)) {
|
if (isFinalInterval(pInfo)) {
|
||||||
int32_t chIndex = getChildIndex(pBlock);
|
int32_t chIndex = getChildIndex(pBlock);
|
||||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
// if chIndex + 1 - size > 0, add new child
|
// if chIndex + 1 - size > 0, add new child
|
||||||
for (int32_t i = 0; i < chIndex + 1 - size; i++) {
|
for (int32_t i = 0; i < chIndex + 1 - size; i++) {
|
||||||
|
@ -2072,7 +2073,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||||
}
|
}
|
||||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
|
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
|
||||||
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
|
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
|
||||||
setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
|
setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
|
||||||
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
|
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
|
||||||
|
@ -2080,12 +2081,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
|
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
|
||||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isFinalInterval(pInfo)) {
|
if (isFinalInterval(pInfo)) {
|
||||||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup,
|
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed);
|
||||||
&pInfo->interval, pClosed);
|
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset);
|
||||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed,
|
|
||||||
pInfo->binfo.rowCellInfoOffset);
|
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
taosArrayAddAll(pUpdated, pClosed);
|
taosArrayAddAll(pUpdated, pClosed);
|
||||||
}
|
}
|
||||||
|
@ -2109,31 +2108,32 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
|
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
|
||||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||||
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
|
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
pInfo->order = TSDB_ORDER_ASC;
|
pInfo->order = TSDB_ORDER_ASC;
|
||||||
pInfo->interval = (SInterval) {.interval = pIntervalPhyNode->interval,
|
pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
|
||||||
.sliding = pIntervalPhyNode->sliding,
|
.sliding = pIntervalPhyNode->sliding,
|
||||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||||
.offset = pIntervalPhyNode->offset,
|
.offset = pIntervalPhyNode->offset,
|
||||||
.precision =
|
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
|
||||||
((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
|
pInfo->twAggSup = (STimeWindowAggSupp){
|
||||||
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
|
.waterMark = pIntervalPhyNode->window.watermark,
|
||||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||||
.maxTs = INT64_MIN,
|
.maxTs = INT64_MIN,
|
||||||
.winMap = NULL, };
|
.winMap = NULL,
|
||||||
|
};
|
||||||
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
initResultSizeInfo(pOperator, 4096);
|
initResultSizeInfo(pOperator, 4096);
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols,
|
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols,
|
||||||
pResBlock, keyBufSize, pTaskInfo->id.str);
|
pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
|
@ -2160,7 +2160,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
if (!isFinalInterval(pInfo)) {
|
if (!isFinalInterval(pInfo)) {
|
||||||
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||||
}
|
}
|
||||||
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);\
|
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
pInfo->pUpdateRes->info.type = STREAM_REPROCESS;
|
pInfo->pUpdateRes->info.type = STREAM_REPROCESS;
|
||||||
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
||||||
pInfo->pPhyNode = nodesCloneNode(pPhyNode);
|
pInfo->pPhyNode = nodesCloneNode(pPhyNode);
|
||||||
|
@ -2174,9 +2174,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->numOfExprs = numOfCols;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL,
|
pOperator->fpSet =
|
||||||
destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow,
|
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||||
NULL);
|
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2216,8 +2216,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo,
|
int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) {
|
||||||
int32_t numOfCols, SSDataBlock* pResultBlock) {
|
|
||||||
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
|
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
|
||||||
pBasicInfo->pRes = pResultBlock;
|
pBasicInfo->pRes = pResultBlock;
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
@ -3195,3 +3194,281 @@ _error:
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct SMergeIntervalAggOperatorInfo {
|
||||||
|
SIntervalAggOperatorInfo intervalAggOperatorInfo;
|
||||||
|
|
||||||
|
SHashObj* groupIntervalHash;
|
||||||
|
bool hasGroupId;
|
||||||
|
uint64_t groupId;
|
||||||
|
SSDataBlock* prefetchedBlock;
|
||||||
|
bool inputBlocksFinished;
|
||||||
|
} SMergeIntervalAggOperatorInfo;
|
||||||
|
|
||||||
|
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
|
||||||
|
taosHashCleanup(miaInfo->groupIntervalHash);
|
||||||
|
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
|
||||||
|
STimeWindow* newWin) {
|
||||||
|
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
|
||||||
|
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||||
|
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
|
||||||
|
|
||||||
|
STimeWindow* prevWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
|
||||||
|
if (prevWin == NULL) {
|
||||||
|
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newWin == NULL || (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) ) {
|
||||||
|
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId);
|
||||||
|
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
|
||||||
|
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||||
|
ASSERT(p1 != NULL);
|
||||||
|
|
||||||
|
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr,
|
||||||
|
pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock,
|
||||||
|
pTaskInfo);
|
||||||
|
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||||
|
if (newWin == NULL) {
|
||||||
|
taosHashRemove(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
|
||||||
|
} else {
|
||||||
|
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
|
||||||
|
int32_t scanFlag, SSDataBlock* pResultBlock) {
|
||||||
|
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
|
||||||
|
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||||
|
|
||||||
|
int32_t startPos = 0;
|
||||||
|
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
||||||
|
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
|
||||||
|
uint64_t tableGroupId = pBlock->info.groupId;
|
||||||
|
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
|
||||||
|
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
|
||||||
|
SResultRow* pResult = NULL;
|
||||||
|
|
||||||
|
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
|
||||||
|
iaInfo->interval.precision, &iaInfo->win);
|
||||||
|
|
||||||
|
int32_t ret =
|
||||||
|
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
|
||||||
|
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
TSKEY ekey = ascScan ? win.ekey : win.skey;
|
||||||
|
int32_t forwardRows =
|
||||||
|
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
|
||||||
|
ASSERT(forwardRows > 0);
|
||||||
|
|
||||||
|
// prev time window not interpolation yet.
|
||||||
|
if (iaInfo->timeWindowInterpo) {
|
||||||
|
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
|
||||||
|
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
|
||||||
|
|
||||||
|
// restore current time window
|
||||||
|
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
|
||||||
|
iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup,
|
||||||
|
pTaskInfo);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
// window start key interpolation
|
||||||
|
doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &win, startPos, forwardRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
|
||||||
|
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
|
||||||
|
pBlock->info.rows, numOfOutput, iaInfo->order);
|
||||||
|
doCloseWindow(pResultRowInfo, iaInfo, pResult);
|
||||||
|
|
||||||
|
// output previous interval results after this interval (&win) is closed
|
||||||
|
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
|
||||||
|
|
||||||
|
STimeWindow nextWin = win;
|
||||||
|
while (1) {
|
||||||
|
int32_t prevEndPos = forwardRows - 1 + startPos;
|
||||||
|
startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order);
|
||||||
|
if (startPos < 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// null data, failed to allocate more memory buffer
|
||||||
|
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
|
||||||
|
iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset,
|
||||||
|
&iaInfo->aggSup, pTaskInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
ekey = ascScan ? nextWin.ekey : nextWin.skey;
|
||||||
|
forwardRows =
|
||||||
|
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
|
||||||
|
|
||||||
|
// window start(end) key interpolation
|
||||||
|
doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &nextWin, startPos,
|
||||||
|
forwardRows);
|
||||||
|
|
||||||
|
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||||
|
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||||
|
tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
|
||||||
|
doCloseWindow(pResultRowInfo, iaInfo, pResult);
|
||||||
|
|
||||||
|
// output previous interval results after this interval (&nextWin) is closed
|
||||||
|
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iaInfo->timeWindowInterpo) {
|
||||||
|
saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
|
||||||
|
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pRes = iaInfo->binfo.pRes;
|
||||||
|
blockDataCleanup(pRes);
|
||||||
|
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
if (!miaInfo->inputBlocksFinished) {
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
int32_t scanFlag = MAIN_SCAN;
|
||||||
|
while (1) {
|
||||||
|
SSDataBlock* pBlock = NULL;
|
||||||
|
if (miaInfo->prefetchedBlock == NULL) {
|
||||||
|
pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
} else {
|
||||||
|
pBlock = miaInfo->prefetchedBlock;
|
||||||
|
miaInfo->groupId = pBlock->info.groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBlock == NULL) {
|
||||||
|
miaInfo->inputBlocksFinished = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!miaInfo->hasGroupId) {
|
||||||
|
miaInfo->hasGroupId = true;
|
||||||
|
miaInfo->groupId = pBlock->info.groupId;
|
||||||
|
} else if (miaInfo->groupId != pBlock->info.groupId) {
|
||||||
|
miaInfo->prefetchedBlock = pBlock;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
getTableScanInfo(pOperator, &iaInfo->order, &scanFlag);
|
||||||
|
setInputDataBlock(pOperator, iaInfo->binfo.pCtx, pBlock, iaInfo->order, scanFlag, true);
|
||||||
|
STableQueryInfo* pTableQueryInfo = iaInfo->pCurrent;
|
||||||
|
|
||||||
|
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
|
||||||
|
doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
|
||||||
|
|
||||||
|
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pRes->info.groupId = miaInfo->groupId;
|
||||||
|
} else {
|
||||||
|
void* p = taosHashIterate(miaInfo->groupIntervalHash, NULL);
|
||||||
|
if (p != NULL) {
|
||||||
|
size_t len = 0;
|
||||||
|
uint64_t* pKey = taosHashGetKey(p, &len);
|
||||||
|
outputPrevIntervalResult(pOperator, *pKey, pRes, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRes->info.rows == 0) {
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t rows = pRes->info.rows;
|
||||||
|
pOperator->resultInfo.totalRows += rows;
|
||||||
|
return (rows == 0) ? NULL : pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
|
SExecTaskInfo* pTaskInfo) {
|
||||||
|
SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (miaInfo == NULL || pOperator == NULL) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
|
||||||
|
|
||||||
|
iaInfo->win = pTaskInfo->window;
|
||||||
|
iaInfo->order = TSDB_ORDER_ASC;
|
||||||
|
iaInfo->interval = *pInterval;
|
||||||
|
|
||||||
|
iaInfo->execModel = pTaskInfo->execModel;
|
||||||
|
|
||||||
|
iaInfo->primaryTsIndex = primaryTsSlotId;
|
||||||
|
miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
|
||||||
|
int32_t code =
|
||||||
|
initAggInfo(&iaInfo->binfo, &iaInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
|
|
||||||
|
initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
|
||||||
|
|
||||||
|
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(iaInfo->binfo.pCtx, numOfCols, iaInfo);
|
||||||
|
if (iaInfo->timeWindowInterpo) {
|
||||||
|
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
|
||||||
|
}
|
||||||
|
|
||||||
|
// iaInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS /* || iaInfo->pTableQueryInfo == NULL*/) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
initResultRowInfo(&iaInfo->binfo.resultRowInfo, (int32_t)1);
|
||||||
|
|
||||||
|
pOperator->name = "TimeMergeIntervalAggOperator";
|
||||||
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
|
||||||
|
pOperator->blocking = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->pExpr = pExprInfo;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
pOperator->numOfExprs = numOfCols;
|
||||||
|
pOperator->info = miaInfo;
|
||||||
|
|
||||||
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
|
||||||
|
destroyMergeIntervalOperatorInfo, NULL, NULL, NULL);
|
||||||
|
|
||||||
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pOperator;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
destroyMergeIntervalOperatorInfo(miaInfo, numOfCols);
|
||||||
|
taosMemoryFreeClear(miaInfo);
|
||||||
|
taosMemoryFreeClear(pOperator);
|
||||||
|
pTaskInfo->code = code;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue