diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 8eeeff4148..205d9d85f3 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -257,6 +257,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_FILL, QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b87af9ed04..51f624b7e8 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2189,6 +2189,8 @@ char* getStreamOpName(uint16_t opType) { return "interval final"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: return "interval semi"; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL: + return "interval mid"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: return "stream fill"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index a718373f60..06566633c4 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -3193,3 +3193,205 @@ _error: pTaskInfo->code = code; return NULL; } + +static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pUpdatedMap) { + SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; + pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); + + SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pSup = &pOperator->exprSupp; + int32_t numOfOutput = pSup->numOfExprs; + int32_t step = 1; + SRowBuffPos* pResPos = NULL; + SResultRow* pResult = NULL; + int32_t forwardRows = 1; + uint64_t groupId = pSDataBlock->info.id.groupId; + + SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); + TSKEY* tsCol = (int64_t*)pColDataInfo->pData; + + int32_t startPos = 0; + TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCol); + STimeWindow nextWin = getFinalTimeWindow(ts, &pInfo->interval); + + while (1) { + bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); + if ((pInfo->ignoreExpiredData && isClosed) || + !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { + startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCol, startPos, nextWin.ekey, &nextWin); + if (startPos < 0) { + break; + } + continue; + } + + int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput, + pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->statestore); + pResult = (SResultRow*)pResPos->pRowBuff; + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); + } + + SWinKey key = { + .ts = pResult->win.skey, + .groupId = groupId, + }; + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { + saveWinResult(&key, pResPos, pUpdatedMap); + } + + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES); + } + + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + pSDataBlock->info.rows, numOfOutput); + key.ts = nextWin.skey; + + if (pInfo->delKey.ts > key.ts) { + pInfo->delKey = key; + } + int32_t prevEndPos = (forwardRows - 1) * step + startPos; + if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) { + qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64 + ",maxKey %" PRId64, + pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); + blockDataUpdateTsWindow(pSDataBlock, 0); + + // timestamp of the data is incorrect + if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) { + qError("table uid %" PRIu64 " data block timestamp is out of range! minKey %" PRId64 ",maxKey %" PRId64, + pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); + } + } + startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCol, prevEndPos); + if (startPos < 0) { + break; + } + } +} + +static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + SExprSupp* pSup = &pOperator->exprSupp; + + qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status); + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } else if (pOperator->status == OP_RES_TO_RETURN) { + SSDataBlock* resBlock = buildIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; + } + + setOperatorCompleted(pOperator); + clearFunctionContext(&pOperator->exprSupp); + clearStreamIntervalOperator(pInfo); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); + qDebug("stask:%s ===stream===%s clear", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType)); + return NULL; + } else { + doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pDelRes; + } + } + + if (!pInfo->pUpdated) { + pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); + } + if (!pInfo->pUpdatedMap) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); + } + + while (1) { + if (isTaskKilled(pTaskInfo)) { + if (pInfo->pUpdated != NULL) { + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); + } + + if (pInfo->pUpdatedMap != NULL) { + tSimpleHashCleanup(pInfo->pUpdatedMap); + pInfo->pUpdatedMap = NULL; + } + + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + pOperator->status = OP_RES_TO_RETURN; + qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType), + pInfo->numOfDatapack); + pInfo->numOfDatapack = 0; + break; + } + pInfo->numOfDatapack++; + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + + if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { + pInfo->binfo.pRes->info.type = pBlock->info.type; + } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || + pBlock->info.type == STREAM_CLEAR) { + SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + removeResults(delWins, pInfo->pUpdatedMap); + taosArrayAddAll(pInfo->pDelWins, delWins); + taosArrayDestroy(delWins); + + doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + if (pBlock->info.type == STREAM_CLEAR) { + pInfo->pDelRes->info.type = STREAM_CLEAR; + } else { + pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; + } + return pInfo->pDelRes; + } + break; + } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + return pBlock; + } else { + ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + } + + if (pInfo->scalarSupp.pExprInfo != NULL) { + SExprSupp* pExprSup = &pInfo->scalarSupp; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + } + setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); + doStreamMidIntervalAggImpl(pOperator, pBlock, pInfo->pUpdatedMap); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark); + pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); + } + + removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); + pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; + + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { + taosArrayPush(pInfo->pUpdated, pIte); + } + + tSimpleHashCleanup(pInfo->pUpdatedMap); + pInfo->pUpdatedMap = NULL; + taosArraySort(pInfo->pUpdated, winPosCmprImpl); + + initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); + pInfo->pUpdated = NULL; + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + + return buildIntervalResult(pOperator); +}