feature: add merge interval operator
This commit is contained in:
parent
25636d6201
commit
c97dcbcf76
|
@ -895,7 +895,7 @@ int64_t getSmaWaterMark(int64_t interval, double filesFactor);
|
|||
bool isSmaStream(int8_t triggerType);
|
||||
|
||||
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
|
||||
int32_t finalizeResultRowIntoSDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
||||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
|
|
|
@ -1955,7 +1955,7 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
|
|||
}
|
||||
}
|
||||
|
||||
int32_t finalizeResultRowIntoSDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
||||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
||||
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
|
||||
|
@ -1967,9 +1967,13 @@ int32_t finalizeResultRowIntoSDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition*
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||
releaseBufPage(pBuf, page);
|
||||
return -1;
|
||||
while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||
int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25);
|
||||
if (TAOS_FAILED(code)) {
|
||||
releaseBufPage(pBuf, page);
|
||||
qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExprs; ++j) {
|
||||
|
|
|
@ -3180,117 +3180,140 @@ typedef struct SMergeIntervalAggOperatorInfo {
|
|||
SIntervalAggOperatorInfo intervalAggOperatorInfo;
|
||||
|
||||
SHashObj* groupIntervalHash;
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
SSDataBlock *prefetchedBlock;
|
||||
} SMergeIntervalAggOperatorInfo;
|
||||
|
||||
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SMergeIntervalAggOperatorInfo* pInfo = (SMergeIntervalAggOperatorInfo*)param;
|
||||
taosHashCleanup(pInfo->groupIntervalHash);
|
||||
destroyIntervalOperatorInfo(&pInfo->intervalAggOperatorInfo, numOfOutput);
|
||||
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
|
||||
taosHashCleanup(miaInfo->groupIntervalHash);
|
||||
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
|
||||
}
|
||||
|
||||
static int32_t outputPrevIntervalResult(SOperatorInfo * pOperatorInfo, uint64_t tableGroupId, SSDataBlock *pResultBlock, STimeWindow* newWin) {
|
||||
SMergeIntervalAggOperatorInfo *miaInfo = pOperatorInfo->info;
|
||||
SIntervalAggOperatorInfo * iaInfo = &miaInfo->intervalAggOperatorInfo;
|
||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
|
||||
|
||||
STimeWindow *prevWin= taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
|
||||
if (prevWin == NULL) {
|
||||
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) {
|
||||
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||
ASSERT(p1 != NULL);
|
||||
|
||||
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr,
|
||||
pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock,
|
||||
pTaskInfo);
|
||||
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||
|
||||
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
|
||||
int32_t scanFlag, SSDataBlock* pResultBlock) {
|
||||
SMergeIntervalAggOperatorInfo *miaInfo = pOperatorInfo->info;
|
||||
SIntervalAggOperatorInfo * pInfo = &miaInfo->intervalAggOperatorInfo;
|
||||
SIntervalAggOperatorInfo * iaInfo = &miaInfo->intervalAggOperatorInfo;
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||
|
||||
int32_t startPos = 0;
|
||||
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
||||
int64_t* tsCols = extractTsCol(pBlock, pInfo);
|
||||
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
|
||||
uint64_t tableGroupId = pBlock->info.groupId;
|
||||
bool ascScan = (pInfo->order == TSDB_ORDER_ASC);
|
||||
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
|
||||
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
|
||||
SResultRow* pResult = NULL;
|
||||
|
||||
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &pInfo->interval,
|
||||
pInfo->interval.precision, &pInfo->win);
|
||||
//TODO: pResultBlock full
|
||||
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
|
||||
iaInfo->interval.precision, &iaInfo->win);
|
||||
//TODO: pBlock not process not finished
|
||||
//TODO: different block group id or no group id
|
||||
//TODO: lastWin may be none, p1 shall not be null
|
||||
//TODO: the last datablock
|
||||
//TODO: blockDataUpdateTsWindow(pBlock, 0);
|
||||
|
||||
int32_t ret =
|
||||
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx,
|
||||
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
|
||||
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo);
|
||||
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
TSKEY ekey = ascScan ? win.ekey : win.skey;
|
||||
int32_t forwardRows =
|
||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
|
||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
|
||||
ASSERT(forwardRows > 0);
|
||||
|
||||
// prev time window not interpolation yet.
|
||||
if (pInfo->timeWindowInterpo) {
|
||||
if (iaInfo->timeWindowInterpo) {
|
||||
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
|
||||
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
|
||||
|
||||
// restore current time window
|
||||
ret =
|
||||
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx,
|
||||
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
|
||||
iaInfo->binfo.pCtx,
|
||||
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
// window start key interpolation
|
||||
doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &win, startPos, forwardRows);
|
||||
doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &win, startPos, forwardRows);
|
||||
}
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
|
||||
pBlock->info.rows, numOfOutput, pInfo->order);
|
||||
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
|
||||
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
|
||||
pBlock->info.rows, numOfOutput, iaInfo->order);
|
||||
doCloseWindow(pResultRowInfo, iaInfo, pResult);
|
||||
|
||||
doCloseWindow(pResultRowInfo, pInfo, pResult);
|
||||
STimeWindow *lastWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
|
||||
if (ascScan && win.skey > lastWin->ekey || (!ascScan) && win.skey < lastWin->ekey) {
|
||||
SET_RES_WINDOW_KEY(pInfo->aggSup.keyBuf, &lastWin->skey, TSDB_KEYSIZE, tableGroupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pInfo->aggSup.pResultRowHashTable, pInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||
finalizeResultRowIntoSDataBlock(pInfo->aggSup.pResultBuf, p1,
|
||||
pInfo->binfo.pCtx, pOperatorInfo->pExpr, pOperatorInfo->numOfExprs, pInfo->binfo.rowCellInfoOffset,
|
||||
pResultBlock, pTaskInfo);
|
||||
taosHashRemove(pInfo->aggSup.pResultRowHashTable, pInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||
|
||||
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), &win, sizeof(STimeWindow));
|
||||
}
|
||||
// output previous interval results after this interval (&win) is closed
|
||||
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
|
||||
|
||||
STimeWindow nextWin = win;
|
||||
while (1) {
|
||||
int32_t prevEndPos = forwardRows - 1 + startPos;
|
||||
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order);
|
||||
startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
// null data, failed to allocate more memory buffer
|
||||
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
|
||||
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset,
|
||||
&pInfo->aggSup, pTaskInfo);
|
||||
iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset,
|
||||
&iaInfo->aggSup, pTaskInfo);
|
||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
ekey = ascScan ? nextWin.ekey : nextWin.skey;
|
||||
forwardRows =
|
||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
|
||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
|
||||
|
||||
// window start(end) key interpolation
|
||||
doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
|
||||
doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &nextWin, startPos,
|
||||
forwardRows);
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||
tsCols, pBlock->info.rows, numOfOutput, pInfo->order);
|
||||
doCloseWindow(pResultRowInfo, pInfo, pResult);
|
||||
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||
tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
|
||||
doCloseWindow(pResultRowInfo, iaInfo, pResult);
|
||||
|
||||
// output previous interval results after this interval (&nextWin) is closed
|
||||
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
|
||||
}
|
||||
|
||||
if (pInfo->timeWindowInterpo) {
|
||||
saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
|
||||
if (iaInfo->timeWindowInterpo) {
|
||||
saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3299,36 +3322,56 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
|
||||
SIntervalAggOperatorInfo *pInfo = &miaInfo->intervalAggOperatorInfo;
|
||||
SIntervalAggOperatorInfo *iaInfo = &miaInfo->intervalAggOperatorInfo;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
SSDataBlock* pRes = iaInfo->binfo.pRes;
|
||||
blockDataCleanup(pRes);
|
||||
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
SSDataBlock *pBlock = NULL;
|
||||
if (miaInfo->prefetchedBlock == NULL) {
|
||||
pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
} else {
|
||||
pBlock = miaInfo->prefetchedBlock;
|
||||
miaInfo->groupId = pBlock->info.groupId;
|
||||
}
|
||||
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
getTableScanInfo(pOperator, &pInfo->order, &scanFlag);
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, scanFlag, true);
|
||||
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
||||
if (!miaInfo->hasGroupId) {
|
||||
miaInfo->hasGroupId = true;
|
||||
miaInfo->groupId = pBlock->info.groupId;
|
||||
} else if (miaInfo->groupId != pBlock->info.groupId) {
|
||||
miaInfo->prefetchedBlock = pBlock;
|
||||
break;
|
||||
}
|
||||
|
||||
getTableScanInfo(pOperator, &iaInfo->order, &scanFlag);
|
||||
setInputDataBlock(pOperator, iaInfo->binfo.pCtx, pBlock, iaInfo->order, scanFlag, true);
|
||||
STableQueryInfo* pTableQueryInfo = iaInfo->pCurrent;
|
||||
|
||||
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
|
||||
doMergeIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
|
||||
doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
|
||||
|
||||
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pRes->info.groupId = miaInfo->groupId;
|
||||
if (pRes->info.rows == 0) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
} else {
|
||||
//TODO: ts column index
|
||||
blockDataUpdateTsWindow(pRes, 0);
|
||||
}
|
||||
|
||||
size_t rows = pRes->info.rows;
|
||||
|
@ -3384,7 +3427,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
|
|||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, doStreamIntervalAgg, NULL,
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
|
||||
destroyIntervalOperatorInfo, NULL, NULL, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
|
Loading…
Reference in New Issue