enh(stream): stream query is not limited by window count
This commit is contained in:
parent
3ace0b150b
commit
cf5f06bd68
|
@ -301,7 +301,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
|||
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
||||
|
||||
// too many time window in query
|
||||
if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
|
||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
|
||||
taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
|
||||
}
|
||||
|
||||
|
@ -3596,7 +3597,8 @@ void doDestroyExchangeOperatorInfo(void* param) {
|
|||
}
|
||||
|
||||
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
|
||||
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
|
||||
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType,
|
||||
int32_t order) {
|
||||
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
|
||||
|
||||
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
|
||||
|
@ -3633,7 +3635,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
||||
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
|
||||
|
||||
int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
|
||||
int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
int32_t type = convertFillType(pPhyFillNode->mode);
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
|
@ -3835,7 +3837,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum)
|
|||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool groupbyTbname(SNodeList* pGroupList) {
|
||||
bool groupbyTbname(SNodeList* pGroupList) {
|
||||
bool bytbname = false;
|
||||
if (LIST_LENGTH(pGroupList) > 0) {
|
||||
SNode* p = nodesListGetNode(pGroupList, 0);
|
||||
|
@ -3877,7 +3879,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
|||
bool assignUid = groupbyTbname(group);
|
||||
|
||||
int32_t groupNum = 0;
|
||||
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||
|
||||
for (int32_t i = 0; i < numOfTables; i++) {
|
||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
|
@ -4610,7 +4612,7 @@ void releaseQueryBuf(size_t numOfTables) {
|
|||
}
|
||||
|
||||
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
|
||||
SExplainExecInfo execInfo = {0};
|
||||
SExplainExecInfo execInfo = {0};
|
||||
SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
|
||||
|
||||
pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
|
||||
|
@ -4620,7 +4622,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
|
|||
pExplainInfo->verboseInfo = NULL;
|
||||
|
||||
if (operatorInfo->fpSet.getExplainFn) {
|
||||
int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
|
||||
int32_t code =
|
||||
operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
|
||||
if (code) {
|
||||
qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
|
||||
return code;
|
||||
|
@ -4631,7 +4634,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
|
|||
for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
|
||||
code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// taosMemoryFreeClear(*pRes);
|
||||
// taosMemoryFreeClear(*pRes);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -928,8 +928,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
|
||||
SResultRow* pResult = NULL;
|
||||
|
||||
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder);
|
||||
int32_t ret = TSDB_CODE_SUCCESS;
|
||||
STimeWindow win =
|
||||
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder);
|
||||
int32_t ret = TSDB_CODE_SUCCESS;
|
||||
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
|
||||
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
|
||||
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
|
||||
|
@ -1091,8 +1092,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, scanFlag, true);
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
|
||||
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
|
||||
|
||||
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
|
||||
}
|
||||
|
||||
|
@ -1791,8 +1792,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
}
|
||||
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
|
||||
pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
|
||||
pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
pInfo->interval = *pInterval;
|
||||
pInfo->execModel = pTaskInfo->execModel;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
|
@ -2180,7 +2181,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||
|
@ -2457,7 +2457,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
|||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc);
|
||||
|
||||
|
@ -2475,11 +2475,11 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
|||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->winSup.prevTs = INT64_MIN;
|
||||
pInfo->reptScan = false;
|
||||
pInfo->pCondition = pSessionNode->window.node.pConditions;
|
||||
pInfo->reptScan = false;
|
||||
pInfo->pCondition = pSessionNode->window.node.pConditions;
|
||||
|
||||
pOperator->name = "SessionWindowAggOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
|
||||
|
@ -3155,7 +3155,7 @@ void destroyStateWinInfo(void* ptr) {
|
|||
if (ptr == NULL) {
|
||||
return;
|
||||
}
|
||||
SStateWindowInfo* pWin = (SStateWindowInfo*) ptr;
|
||||
SStateWindowInfo* pWin = (SStateWindowInfo*)ptr;
|
||||
taosMemoryFreeClear(pWin->stateKey.pData);
|
||||
}
|
||||
|
||||
|
@ -3465,7 +3465,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
|
|||
assert(pWinInfo->win.skey <= pWinInfo->win.ekey);
|
||||
// too many time window in query
|
||||
int32_t size = taosArrayGetSize(pAggSup->pCurWins);
|
||||
if (size > MAX_INTERVAL_TIME_WINDOW) {
|
||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && size > MAX_INTERVAL_TIME_WINDOW) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
|
||||
}
|
||||
|
||||
|
@ -3647,8 +3647,8 @@ void deleteWindow(SArray* pWinInfos, int32_t index, FDelete fp) {
|
|||
taosArrayRemove(pWinInfos, index);
|
||||
}
|
||||
|
||||
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap,
|
||||
SArray* result, FDelete fp) {
|
||||
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap, SArray* result,
|
||||
FDelete fp) {
|
||||
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
|
||||
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
|
@ -4673,7 +4673,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
|
|||
currTs = tsCols[currPos];
|
||||
currWin.skey = currTs;
|
||||
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit,
|
||||
iaInfo->interval.precision) - 1;
|
||||
iaInfo->interval.precision) -
|
||||
1;
|
||||
|
||||
startPos = currPos;
|
||||
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
|
||||
|
@ -4933,8 +4934,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
|
|||
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
|
||||
SResultRow* pResult = NULL;
|
||||
|
||||
STimeWindow win =
|
||||
getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, iaInfo->inputOrder);
|
||||
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
|
||||
iaInfo->inputOrder);
|
||||
|
||||
int32_t ret =
|
||||
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
|
||||
|
@ -4975,7 +4976,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
|
|||
STimeWindow nextWin = win;
|
||||
while (1) {
|
||||
int32_t prevEndPos = forwardRows - 1 + startPos;
|
||||
startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder);
|
||||
startPos =
|
||||
getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue