From d1c289823f0f42c98d78342e633c1ba132713362 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 14 Aug 2023 17:51:20 +0800 Subject: [PATCH] refactor stream session window --- source/libs/executor/inc/executil.h | 7 +- source/libs/executor/src/executil.c | 53 ++++ source/libs/executor/src/filloperator.c | 12 +- source/libs/executor/src/groupoperator.c | 6 +- .../executor/src/streamtimewindowoperator.c | 233 +++++++----------- 5 files changed, 149 insertions(+), 162 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 3339772b1c..e92414d0a2 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -183,9 +183,10 @@ void cleanupQueryTableDataCond(SQueryTableDataCond* pCond); int32_t convertFillType(int32_t mode); int32_t resultrowComparAsc(const void* p1, const void* p2); -int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI *pAPI); - -void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr); +int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI* pAPI); +char* getStreamOpName(uint16_t opType); +void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr); +void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr); void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order); void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 8e0569e0f5..9b4d0c1725 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2178,6 +2178,45 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags return TSDB_CODE_SUCCESS; } +char* getStreamOpName(uint16_t opType) { + switch (opType) { + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: { + return "stream scan"; + }; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: { + return "interval single"; + }; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: { + return "interval final"; + }; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: { + return "interval semi"; + }; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: { + return "stream fill"; + } + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: { + return "session single"; + }; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: { + return "session semi"; + }; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: { + return "session final"; + }; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: { + return "state single"; + }; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: { + return "stream partitionby"; + }; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: { + return "stream event"; + }; + } + return ""; +} + void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) { if (!pBlock || pBlock->info.rows == 0) { qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag); @@ -2188,6 +2227,20 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr taosMemoryFree(pBuf); } +void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) { + if (!pBlock || pBlock->info.rows == 0) { + qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag); + return; + } + if (qDebugFlag & DEBUG_DEBUG) { + char* pBuf = NULL; + char flagBuf[64]; + snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr); + qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr)); + taosMemoryFree(pBuf); + } +} + TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; } void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) { diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index ed00329aed..1788027bd6 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -1292,14 +1292,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) { doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes); if (pInfo->pRes->info.rows > 0) { - printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pRes; } } if (pOperator->status == OP_RES_TO_RETURN) { doDeleteFillFinalize(pOperator); if (pInfo->pRes->info.rows > 0) { - printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pRes; } setOperatorCompleted(pOperator); @@ -1317,12 +1317,12 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; pInfo->pFillInfo->preRowKey = INT64_MIN; if (pInfo->pRes->info.rows > 0) { - printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pRes; } break; } - printDataBlock(pBlock, "stream fill recv", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) { pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId; @@ -1339,7 +1339,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { pInfo->pFillSup->hasDelete = true; doDeleteFillResult(pOperator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "stream fill delete", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } continue; @@ -1378,7 +1378,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { } pOperator->resultInfo.totalRows += pInfo->pRes->info.rows; - printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pRes; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 4f0bedba3d..d097c58835 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -995,7 +995,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { pOperator->resultInfo.totalRows += pDest->info.rows; pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte); ASSERT(pDest->info.rows > 0); - printDataBlock(pDest, "stream partitionby", GET_TASKID(pTaskInfo)); + printDataBlock(pDest, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pDest; } @@ -1116,7 +1116,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); return NULL; } - printDataBlock(pBlock, "stream partitionby recv", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); switch (pBlock->info.type) { case STREAM_NORMAL: case STREAM_PULL_DATA: @@ -1126,7 +1126,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { case STREAM_DELETE_DATA: { copyDataBlock(pInfo->pDelRes, pBlock); pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; - printDataBlock(pInfo->pDelRes, "stream partitionby delete", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } break; default: diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 2832af5d10..a718373f60 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -913,18 +913,7 @@ static void resetUnCloseWinInfo(SSHashObj* winMap) { pPos->beUsed = true; } } -static char* getStreamOpName(uint16_t opType) { - switch (opType) { - case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: - return "interval single"; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: - return "interval final"; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: - return "interval semi"; - default: - return ""; - } -} + static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; @@ -1030,14 +1019,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; - qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, - IS_FINAL_INTERVAL_OP(pOperator) ? "interval final" : "interval semi", pInfo->numOfDatapack); + qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType), + pInfo->numOfDatapack); pInfo->numOfDatapack = 0; break; } pInfo->numOfDatapack++; - printDataBlock(pBlock, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final recv" : "interval semi recv", - GET_TASKID(pTaskInfo)); + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; @@ -1061,8 +1049,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data - printDataBlock(pInfo->pDelRes, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final" : "interval semi", - GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_CLEAR) { pInfo->pDelRes->info.type = STREAM_CLEAR; } else { @@ -1260,20 +1247,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->numOfChild = numOfChild; - pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode); - if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { - pOperator->name = "StreamFinalIntervalOperator"; - } else { - // semi interval operator does not catch result - pOperator->name = "StreamSemiIntervalOperator"; - } - - if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { - pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - } - pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo)); pInfo->pullIndex = 0; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -1299,6 +1274,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->recvGetAll = false; pOperator->operatorType = pPhyNode->type; + if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { + pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; + } + pOperator->name = getStreamOpName(pOperator->operatorType); pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; @@ -1984,6 +1963,25 @@ void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* cleanupGroupResInfo(pGroupResInfo); } } + +static SSDataBlock* buildSessionResult(SOperatorInfo* pOperator) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SOptrBasicInfo* pBInfo = &pInfo->binfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + if (pInfo->pDelRes->info.rows > 0) { + printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pDelRes; + } + doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); + if (pBInfo->pRes->info.rows > 0) { + printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pBInfo->pRes; + } + return NULL; +} + void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { int32_t size = taosArrayGetSize(pAllWins); if (size == 0) { @@ -2011,23 +2009,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SOptrBasicInfo* pBInfo = &pInfo->binfo; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - qDebug("===stream=== stream session agg"); + qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status); if (pOperator->status == OP_EXEC_DONE) { return NULL; } else if (pOperator->status == OP_RES_TO_RETURN) { - doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); - if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", - GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; + SSDataBlock* opRes = buildSessionResult(pOperator); + if (opRes) { + return opRes; } - doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); - if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", - GET_TASKID(pTaskInfo)); - return pBInfo->pRes; - } - setOperatorCompleted(pOperator); return NULL; } @@ -2045,8 +2034,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } - printDataBlock(pBlock, IS_FINAL_SESSION_OP(pOperator) ? "final session recv" : "single session recv", - GET_TASKID(pTaskInfo)); + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -2117,24 +2105,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { pInfo->pUpdated = NULL; blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); -#if 0 - char* pBuf = streamStateSessionDump(pAggSup->pState); - qDebug("===stream===final session%s", pBuf); - taosMemoryFree(pBuf); -#endif - - doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); - if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", - GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; - } - - doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); - if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", - GET_TASKID(pTaskInfo)); - return pBInfo->pRes; + SSDataBlock* opRes = buildSessionResult(pOperator); + if (opRes) { + return opRes; } setOperatorCompleted(pOperator); @@ -2276,8 +2249,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh if (pHandle) { pInfo->isHistoryOp = pHandle->fillHistory; } - - setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; + setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL); @@ -2312,22 +2285,15 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - qDebug("===stream=== stream session semi agg"); + qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status); if (pOperator->status == OP_EXEC_DONE) { return NULL; } { - doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); - if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, "semi session", GET_TASKID(pTaskInfo)); - return pBInfo->pRes; - } - - doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); - if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "semi session delete", GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; + SSDataBlock* opRes = buildSessionResult(pOperator); + if (opRes) { + return opRes; } if (pOperator->status == OP_RES_TO_RETURN) { @@ -2354,7 +2320,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; break; } - printDataBlock(pBlock, "semi session recv", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -2395,22 +2361,9 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { pInfo->pUpdated = NULL; blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); -#if 0 - char* pBuf = streamStateSessionDump(pAggSup->pState); - qDebug("===stream===semi session%s", pBuf); - taosMemoryFree(pBuf); -#endif - - doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); - if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, "semi session", GET_TASKID(pTaskInfo)); - return pBInfo->pRes; - } - - doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); - if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "semi session delete", GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; + SSDataBlock* opRes = buildSessionResult(pOperator); + if (opRes) { + return opRes; } clearFunctionContext(&pOperator->exprSupp); @@ -2432,7 +2385,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; pOperator->operatorType = pPhyNode->type; - char* name = getStreamOpName(pOperator->operatorType); if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); @@ -2440,8 +2392,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL); } - setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); - setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); if (numOfChild > 0) { pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); @@ -2697,6 +2648,25 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } } +static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) { + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + SOptrBasicInfo* pBInfo = &pInfo->binfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + if (pInfo->pDelRes->info.rows > 0) { + printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pDelRes; + } + + doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes); + if (pBInfo->pRes->info.rows > 0) { + printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pBInfo->pRes; + } + return NULL; +} + static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2708,16 +2678,9 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; qDebug("===stream=== stream state agg"); if (pOperator->status == OP_RES_TO_RETURN) { - doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); - if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "single state delete", GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; - } - - doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes); - if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, "single state", GET_TASKID(pTaskInfo)); - return pBInfo->pRes; + SSDataBlock* resBlock = buildStateResult(pOperator); + if (resBlock != NULL) { + return resBlock; } setOperatorCompleted(pOperator); @@ -2782,22 +2745,9 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { pInfo->pUpdated = NULL; blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); -#if 0 - char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState); - qDebug("===stream===final session%s", pBuf); - taosMemoryFree(pBuf); -#endif - - doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); - if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "single state delete", GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; - } - - doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes); - if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, "single state", GET_TASKID(pTaskInfo)); - return pBInfo->pRes; + SSDataBlock* resBlock = buildStateResult(pOperator); + if (resBlock != NULL) { + return resBlock; } setOperatorCompleted(pOperator); return NULL; @@ -3015,24 +2965,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExprSupp* pSup = &pOperator->exprSupp; - SExprSupp* pSup = &pOperator->exprSupp; + qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status); if (pOperator->status == OP_EXEC_DONE) { return NULL; } if (pOperator->status == OP_RES_TO_RETURN) { - doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); - if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "single interval delete", GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; - } - - doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); - if (pInfo->binfo.pRes->info.rows > 0) { - printDataBlock(pInfo->binfo.pRes, "single interval", GET_TASKID(pTaskInfo)); - return pInfo->binfo.pRes; + SSDataBlock* resBlock = buildIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; } if (pInfo->recvGetAll) { @@ -3065,25 +3009,26 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { - qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack); + qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType), + pInfo->numOfDatapack); pInfo->numOfDatapack = 0; break; } pInfo->numOfDatapack++; - printDataBlock(pBlock, "single interval recv", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { - qDebug("===stream===single interval recv|block type STREAM_GET_ALL"); + qDebug("===stream===%s recv|block type STREAM_GET_ALL", getStreamOpName(pOperator->operatorType)); pInfo->recvGetAll = true; getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { - printDataBlock(pBlock, "single interval", GET_TASKID(pTaskInfo)); + printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pBlock; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); @@ -3129,19 +3074,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; - doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); - if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "single interval delete", GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; - } - - doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); - if (pInfo->binfo.pRes->info.rows > 0) { - printDataBlock(pInfo->binfo.pRes, "single interval", GET_TASKID(pTaskInfo)); - return pInfo->binfo.pRes; - } - - return NULL; + return buildIntervalResult(pOperator); } SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,