feature: add merge interval operator
This commit is contained in:
parent
9193eaf891
commit
25636d6201
|
@ -895,6 +895,9 @@ int64_t getSmaWaterMark(int64_t interval, double filesFactor);
|
||||||
bool isSmaStream(int8_t triggerType);
|
bool isSmaStream(int8_t triggerType);
|
||||||
|
|
||||||
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
|
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
|
||||||
|
int32_t finalizeResultRowIntoSDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||||
|
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
||||||
|
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -1955,6 +1955,51 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t finalizeResultRowIntoSDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||||
|
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
||||||
|
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
||||||
|
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
|
||||||
|
SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
|
||||||
|
|
||||||
|
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
|
||||||
|
if (pRow->numOfRows == 0) {
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < numOfExprs; ++j) {
|
||||||
|
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
|
||||||
|
|
||||||
|
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
|
||||||
|
if (pCtx[j].fpSet.finalize) {
|
||||||
|
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||||
|
if (TAOS_FAILED(code)) {
|
||||||
|
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||||
|
// do nothing, todo refactor
|
||||||
|
} else {
|
||||||
|
// expand the result into multiple rows. E.g., _wstartts, top(k, 20)
|
||||||
|
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
|
||||||
|
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
|
||||||
|
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
|
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
|
||||||
SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
|
SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
|
||||||
int32_t numOfExprs) {
|
int32_t numOfExprs) {
|
||||||
|
|
|
@ -3175,3 +3175,229 @@ _error:
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct SMergeIntervalAggOperatorInfo {
|
||||||
|
SIntervalAggOperatorInfo intervalAggOperatorInfo;
|
||||||
|
|
||||||
|
SHashObj* groupIntervalHash;
|
||||||
|
} SMergeIntervalAggOperatorInfo;
|
||||||
|
|
||||||
|
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
SMergeIntervalAggOperatorInfo* pInfo = (SMergeIntervalAggOperatorInfo*)param;
|
||||||
|
taosHashCleanup(pInfo->groupIntervalHash);
|
||||||
|
destroyIntervalOperatorInfo(&pInfo->intervalAggOperatorInfo, numOfOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
|
||||||
|
int32_t scanFlag, SSDataBlock* pResultBlock) {
|
||||||
|
SMergeIntervalAggOperatorInfo *miaInfo = pOperatorInfo->info;
|
||||||
|
SIntervalAggOperatorInfo * pInfo = &miaInfo->intervalAggOperatorInfo;
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||||
|
|
||||||
|
int32_t startPos = 0;
|
||||||
|
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
||||||
|
int64_t* tsCols = extractTsCol(pBlock, pInfo);
|
||||||
|
uint64_t tableGroupId = pBlock->info.groupId;
|
||||||
|
bool ascScan = (pInfo->order == TSDB_ORDER_ASC);
|
||||||
|
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
|
||||||
|
SResultRow* pResult = NULL;
|
||||||
|
|
||||||
|
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &pInfo->interval,
|
||||||
|
pInfo->interval.precision, &pInfo->win);
|
||||||
|
//TODO: pResultBlock full
|
||||||
|
//TODO: pBlock not process not finished
|
||||||
|
//TODO: different block group id or no group id
|
||||||
|
//TODO: lastWin may be none, p1 shall not be null
|
||||||
|
//TODO: the last datablock
|
||||||
|
//TODO: blockDataUpdateTsWindow(pBlock, 0);
|
||||||
|
|
||||||
|
int32_t ret =
|
||||||
|
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx,
|
||||||
|
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
TSKEY ekey = ascScan ? win.ekey : win.skey;
|
||||||
|
int32_t forwardRows =
|
||||||
|
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
|
||||||
|
ASSERT(forwardRows > 0);
|
||||||
|
|
||||||
|
// prev time window not interpolation yet.
|
||||||
|
if (pInfo->timeWindowInterpo) {
|
||||||
|
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
|
||||||
|
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
|
||||||
|
|
||||||
|
// restore current time window
|
||||||
|
ret =
|
||||||
|
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx,
|
||||||
|
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
// window start key interpolation
|
||||||
|
doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &win, startPos, forwardRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
|
||||||
|
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
|
||||||
|
pBlock->info.rows, numOfOutput, pInfo->order);
|
||||||
|
|
||||||
|
doCloseWindow(pResultRowInfo, pInfo, pResult);
|
||||||
|
STimeWindow *lastWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
|
||||||
|
if (ascScan && win.skey > lastWin->ekey || (!ascScan) && win.skey < lastWin->ekey) {
|
||||||
|
SET_RES_WINDOW_KEY(pInfo->aggSup.keyBuf, &lastWin->skey, TSDB_KEYSIZE, tableGroupId);
|
||||||
|
SResultRowPosition* p1 =
|
||||||
|
(SResultRowPosition*)taosHashGet(pInfo->aggSup.pResultRowHashTable, pInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||||
|
finalizeResultRowIntoSDataBlock(pInfo->aggSup.pResultBuf, p1,
|
||||||
|
pInfo->binfo.pCtx, pOperatorInfo->pExpr, pOperatorInfo->numOfExprs, pInfo->binfo.rowCellInfoOffset,
|
||||||
|
pResultBlock, pTaskInfo);
|
||||||
|
taosHashRemove(pInfo->aggSup.pResultRowHashTable, pInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||||
|
|
||||||
|
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), &win, sizeof(STimeWindow));
|
||||||
|
}
|
||||||
|
|
||||||
|
STimeWindow nextWin = win;
|
||||||
|
while (1) {
|
||||||
|
int32_t prevEndPos = forwardRows - 1 + startPos;
|
||||||
|
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order);
|
||||||
|
if (startPos < 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// null data, failed to allocate more memory buffer
|
||||||
|
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
|
||||||
|
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset,
|
||||||
|
&pInfo->aggSup, pTaskInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
ekey = ascScan ? nextWin.ekey : nextWin.skey;
|
||||||
|
forwardRows =
|
||||||
|
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
|
||||||
|
|
||||||
|
// window start(end) key interpolation
|
||||||
|
doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
|
||||||
|
forwardRows);
|
||||||
|
|
||||||
|
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||||
|
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||||
|
tsCols, pBlock->info.rows, numOfOutput, pInfo->order);
|
||||||
|
doCloseWindow(pResultRowInfo, pInfo, pResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->timeWindowInterpo) {
|
||||||
|
saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
|
||||||
|
SIntervalAggOperatorInfo *pInfo = &miaInfo->intervalAggOperatorInfo;
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||||
|
blockDataCleanup(pRes);
|
||||||
|
|
||||||
|
int32_t scanFlag = MAIN_SCAN;
|
||||||
|
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
if (pBlock == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
getTableScanInfo(pOperator, &pInfo->order, &scanFlag);
|
||||||
|
|
||||||
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, scanFlag, true);
|
||||||
|
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
||||||
|
|
||||||
|
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
|
||||||
|
doMergeIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRes->info.rows == 0) {
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t rows = pRes->info.rows;
|
||||||
|
pOperator->resultInfo.totalRows += rows;
|
||||||
|
return (rows == 0) ? NULL : pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
|
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
||||||
|
SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (miaInfo == NULL || pOperator == NULL) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
SIntervalAggOperatorInfo *pInfo = &miaInfo->intervalAggOperatorInfo;
|
||||||
|
|
||||||
|
pInfo->win = pTaskInfo->window;
|
||||||
|
pInfo->order = TSDB_ORDER_ASC;
|
||||||
|
pInfo->interval = *pInterval;
|
||||||
|
pInfo->execModel = pTaskInfo->execModel;
|
||||||
|
pInfo->twAggSup = *pTwAggSupp;
|
||||||
|
|
||||||
|
pInfo->primaryTsIndex = primaryTsSlotId;
|
||||||
|
miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
|
||||||
|
int32_t code =
|
||||||
|
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
|
|
||||||
|
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
|
||||||
|
|
||||||
|
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pInfo->binfo.pCtx, numOfCols, pInfo);
|
||||||
|
if (pInfo->timeWindowInterpo) {
|
||||||
|
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
|
||||||
|
}
|
||||||
|
|
||||||
|
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
|
||||||
|
|
||||||
|
pOperator->name = "TimeMergeIntervalAggOperator";
|
||||||
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
|
||||||
|
pOperator->blocking = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->pExpr = pExprInfo;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
pOperator->numOfExprs = numOfCols;
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, doStreamIntervalAgg, NULL,
|
||||||
|
destroyIntervalOperatorInfo, NULL, NULL, NULL);
|
||||||
|
|
||||||
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pOperator;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
destroyMergeIntervalOperatorInfo(pInfo, numOfCols);
|
||||||
|
taosMemoryFreeClear(pInfo);
|
||||||
|
taosMemoryFreeClear(pOperator);
|
||||||
|
pTaskInfo->code = code;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue