feature: add merge interval operator
This commit is contained in:
parent
c97dcbcf76
commit
62780bbfb4
|
@ -3236,10 +3236,6 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
|
||||||
|
|
||||||
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
|
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
|
||||||
iaInfo->interval.precision, &iaInfo->win);
|
iaInfo->interval.precision, &iaInfo->win);
|
||||||
//TODO: pBlock not process not finished
|
|
||||||
//TODO: different block group id or no group id
|
|
||||||
//TODO: the last datablock
|
|
||||||
//TODO: blockDataUpdateTsWindow(pBlock, 0);
|
|
||||||
|
|
||||||
int32_t ret =
|
int32_t ret =
|
||||||
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
|
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
|
||||||
|
@ -3370,7 +3366,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
if (pRes->info.rows == 0) {
|
if (pRes->info.rows == 0) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
} else {
|
} else {
|
||||||
//TODO: ts column index
|
|
||||||
blockDataUpdateTsWindow(pRes, 0);
|
blockDataUpdateTsWindow(pRes, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3387,36 +3382,38 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
|
||||||
if (miaInfo == NULL || pOperator == NULL) {
|
if (miaInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
SIntervalAggOperatorInfo *pInfo = &miaInfo->intervalAggOperatorInfo;
|
|
||||||
|
|
||||||
pInfo->win = pTaskInfo->window;
|
SIntervalAggOperatorInfo * iaInfo = &miaInfo->intervalAggOperatorInfo;
|
||||||
pInfo->order = TSDB_ORDER_ASC;
|
|
||||||
pInfo->interval = *pInterval;
|
|
||||||
pInfo->execModel = pTaskInfo->execModel;
|
|
||||||
pInfo->twAggSup = *pTwAggSupp;
|
|
||||||
|
|
||||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
iaInfo->win = pTaskInfo->window;
|
||||||
|
iaInfo->order = TSDB_ORDER_ASC;
|
||||||
|
iaInfo->interval = *pInterval;
|
||||||
|
|
||||||
|
iaInfo->execModel = pTaskInfo->execModel;
|
||||||
|
iaInfo->twAggSup = *pTwAggSupp;
|
||||||
|
|
||||||
|
iaInfo->primaryTsIndex = primaryTsSlotId;
|
||||||
miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
|
miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
initResultSizeInfo(pOperator, 4096);
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
|
||||||
int32_t code =
|
int32_t code =
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
initAggInfo(&iaInfo->binfo, &iaInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
|
|
||||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
|
initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
|
||||||
|
|
||||||
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pInfo->binfo.pCtx, numOfCols, pInfo);
|
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(iaInfo->binfo.pCtx, numOfCols, iaInfo);
|
||||||
if (pInfo->timeWindowInterpo) {
|
if (iaInfo->timeWindowInterpo) {
|
||||||
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
|
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
|
||||||
}
|
}
|
||||||
|
|
||||||
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
// iaInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
|
if (code != TSDB_CODE_SUCCESS /* || iaInfo->pTableQueryInfo == NULL*/) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
|
initResultRowInfo(&iaInfo->binfo.resultRowInfo, (int32_t)1);
|
||||||
|
|
||||||
pOperator->name = "TimeMergeIntervalAggOperator";
|
pOperator->name = "TimeMergeIntervalAggOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
|
||||||
|
@ -3425,10 +3422,10 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->numOfExprs = numOfCols;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = miaInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
|
||||||
destroyIntervalOperatorInfo, NULL, NULL, NULL);
|
destroyMergeIntervalOperatorInfo, NULL, NULL, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -3438,8 +3435,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
destroyMergeIntervalOperatorInfo(pInfo, numOfCols);
|
destroyMergeIntervalOperatorInfo(miaInfo, numOfCols);
|
||||||
taosMemoryFreeClear(pInfo);
|
taosMemoryFreeClear(miaInfo);
|
||||||
taosMemoryFreeClear(pOperator);
|
taosMemoryFreeClear(pOperator);
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
Loading…
Reference in New Issue