From 37e6100204f309204462632178e9a5820b749e89 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 15 Aug 2023 15:42:50 +0800 Subject: [PATCH] refactor stream scan log --- source/libs/executor/src/executil.c | 35 ++++++++-------------- source/libs/executor/src/projectoperator.c | 4 +++ source/libs/executor/src/scanoperator.c | 23 ++++++-------- 3 files changed, 26 insertions(+), 36 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 9b4d0c1725..0344143fdb 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2180,39 +2180,30 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags char* getStreamOpName(uint16_t opType) { switch (opType) { - case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: { + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: return "stream scan"; - }; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: { + case QUERY_NODE_PHYSICAL_PLAN_PROJECT: + return "project"; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: return "interval single"; - }; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: { + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: return "interval final"; - }; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: { + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: return "interval semi"; - }; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: { + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: return "stream fill"; - } - case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: { + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: return "session single"; - }; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: { + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: return "session semi"; - }; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: { + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: return "session final"; - }; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: { + case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: return "state single"; - }; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: { + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: return "stream partitionby"; - }; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: { + case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: return "stream event"; - }; } return ""; } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 1cc377b3ee..8e31bc042f 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -372,6 +372,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + } + return (p->info.rows > 0) ? p : NULL; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 86a368721b..4a8ef0565e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1921,7 +1921,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { switch (pInfo->scanMode) { case STREAM_SCAN_FROM_RES: { pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - printDataBlock(pInfo->pRecoverRes, "scan recover", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo)); return pInfo->pRecoverRes; } break; default: @@ -1932,22 +1932,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pInfo->pRecoverRes != NULL) { calBlockTbName(pInfo, pInfo->pRecoverRes); if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { - // if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); - // } else { - // pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer); - // doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes); - // } } if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; - printDataBlock(pInfo->pCreateTbRes, "recover createTbl", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo)); return pInfo->pCreateTbRes; } qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows); - printDataBlock(pInfo->pRecoverRes, "scan recover", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo)); return pInfo->pRecoverRes; } pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE; @@ -2003,7 +1998,7 @@ FETCH_NEXT_BLOCK: pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); } break; case STREAM_DELETE_DATA: { - printDataBlock(pBlock, "stream scan delete recv", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo)); SSDataBlock* pDelBlock = NULL; if (pInfo->tqReader) { pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); @@ -2014,7 +2009,7 @@ FETCH_NEXT_BLOCK: setBlockGroupIdByUid(pInfo, pDelBlock); rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id); - printDataBlock(pDelBlock, "stream scan delete recv filtered", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete recv filtered", GET_TASKID(pTaskInfo)); if (pDelBlock->info.rows == 0) { if (pInfo->tqReader) { blockDataDestroy(pDelBlock); @@ -2025,7 +2020,7 @@ FETCH_NEXT_BLOCK: if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) { generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; - printDataBlock(pDelBlock, "stream scan delete result", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo)); blockDataDestroy(pDelBlock); if (pInfo->pDeleteDataRes->info.rows > 0) { @@ -2040,7 +2035,7 @@ FETCH_NEXT_BLOCK: prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - printDataBlock(pDelBlock, "stream scan delete data", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo)); if (pInfo->tqReader) { blockDataDestroy(pDelBlock); } @@ -2055,7 +2050,7 @@ FETCH_NEXT_BLOCK: default: break; } - // printDataBlock(pBlock, "stream scan recv"); + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); return pBlock; } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { qDebug("stream scan mode:%d, %s", pInfo->scanMode, id); @@ -2091,7 +2086,7 @@ FETCH_NEXT_BLOCK: STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); - printDataBlock(pSDB, "stream scan update", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo)); calBlockTbName(pInfo, pSDB); return pSDB; }