refactor(stream): optimize interval
This commit is contained in:
parent
692fdf3397
commit
d251180fae
|
@ -924,9 +924,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
|
|
||||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
||||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
|
||||||
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
|
||||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
|
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
|
||||||
SExecTaskInfo* pTaskInfo);
|
SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
|
|
@ -871,7 +871,6 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_
|
||||||
|
|
||||||
static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) {
|
static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) {
|
||||||
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);
|
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);
|
||||||
;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
|
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
|
||||||
|
@ -1595,7 +1594,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
|
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
@ -1886,62 +1885,6 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
|
||||||
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
|
||||||
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
|
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
|
||||||
pInfo->inputOrder = TSDB_ORDER_ASC;
|
|
||||||
pInfo->interval = *pInterval;
|
|
||||||
pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
|
|
||||||
pInfo->win = pTaskInfo->window;
|
|
||||||
pInfo->twAggSup = *pTwAggSupp;
|
|
||||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
|
||||||
|
|
||||||
int32_t numOfRows = 4096;
|
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
|
||||||
|
|
||||||
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
|
|
||||||
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
|
||||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
|
||||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
|
||||||
|
|
||||||
pOperator->name = "StreamTimeIntervalAggOperator";
|
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
|
|
||||||
pOperator->blocking = true;
|
|
||||||
pOperator->status = OP_NOT_OPENED;
|
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
|
||||||
pOperator->info = pInfo;
|
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL,
|
|
||||||
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pOperator;
|
|
||||||
|
|
||||||
_error:
|
|
||||||
destroyIntervalOperatorInfo(pInfo, numOfCols);
|
|
||||||
taosMemoryFreeClear(pOperator);
|
|
||||||
pTaskInfo->code = code;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo handle multiple timeline cases. assume no timeline interweaving
|
// todo handle multiple timeline cases. assume no timeline interweaving
|
||||||
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -3109,9 +3052,10 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
|
||||||
|
|
||||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
||||||
TSKEY maxTs = INT64_MIN;
|
TSKEY maxTs = INT64_MIN;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue