refactor stream scan log

This commit is contained in:
liuyao 2023-08-15 15:42:50 +08:00
parent 1b4e51ee4a
commit 37e6100204
3 changed files with 26 additions and 36 deletions

View File

@ -2180,39 +2180,30 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
char* getStreamOpName(uint16_t opType) { char* getStreamOpName(uint16_t opType) {
switch (opType) { switch (opType) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: { case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return "stream scan"; return "stream scan";
}; case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: { return "project";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
return "interval single"; return "interval single";
}; case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
return "interval final"; return "interval final";
}; case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: {
return "interval semi"; return "interval semi";
}; case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: {
return "stream fill"; return "stream fill";
} case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: {
return "session single"; return "session single";
}; case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: {
return "session semi"; return "session semi";
}; case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
return "session final"; return "session final";
}; case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
return "state single"; return "state single";
}; case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: {
return "stream partitionby"; return "stream partitionby";
}; case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: {
return "stream event"; return "stream event";
};
} }
return ""; return "";
} }

View File

@ -372,6 +372,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
} }
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
}
return (p->info.rows > 0) ? p : NULL; return (p->info.rows > 0) ? p : NULL;
} }

View File

@ -1921,7 +1921,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
switch (pInfo->scanMode) { switch (pInfo->scanMode) {
case STREAM_SCAN_FROM_RES: { case STREAM_SCAN_FROM_RES: {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
printDataBlock(pInfo->pRecoverRes, "scan recover", GET_TASKID(pTaskInfo)); printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo));
return pInfo->pRecoverRes; return pInfo->pRecoverRes;
} break; } break;
default: default:
@ -1932,22 +1932,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pInfo->pRecoverRes != NULL) { if (pInfo->pRecoverRes != NULL) {
calBlockTbName(pInfo, pInfo->pRecoverRes); calBlockTbName(pInfo, pInfo->pRecoverRes);
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
// if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
// } else {
// pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer);
// doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
// }
} }
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
printDataBlock(pInfo->pCreateTbRes, "recover createTbl", GET_TASKID(pTaskInfo)); printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo));
return pInfo->pCreateTbRes; return pInfo->pCreateTbRes;
} }
qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows); qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
printDataBlock(pInfo->pRecoverRes, "scan recover", GET_TASKID(pTaskInfo)); printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo));
return pInfo->pRecoverRes; return pInfo->pRecoverRes;
} }
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
@ -2003,7 +1998,7 @@ FETCH_NEXT_BLOCK:
pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
} break; } break;
case STREAM_DELETE_DATA: { case STREAM_DELETE_DATA: {
printDataBlock(pBlock, "stream scan delete recv", GET_TASKID(pTaskInfo)); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo));
SSDataBlock* pDelBlock = NULL; SSDataBlock* pDelBlock = NULL;
if (pInfo->tqReader) { if (pInfo->tqReader) {
pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
@ -2014,7 +2009,7 @@ FETCH_NEXT_BLOCK:
setBlockGroupIdByUid(pInfo, pDelBlock); setBlockGroupIdByUid(pInfo, pDelBlock);
rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id); rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
printDataBlock(pDelBlock, "stream scan delete recv filtered", GET_TASKID(pTaskInfo)); printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete recv filtered", GET_TASKID(pTaskInfo));
if (pDelBlock->info.rows == 0) { if (pDelBlock->info.rows == 0) {
if (pInfo->tqReader) { if (pInfo->tqReader) {
blockDataDestroy(pDelBlock); blockDataDestroy(pDelBlock);
@ -2025,7 +2020,7 @@ FETCH_NEXT_BLOCK:
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) { if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pDelBlock, "stream scan delete result", GET_TASKID(pTaskInfo)); printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
blockDataDestroy(pDelBlock); blockDataDestroy(pDelBlock);
if (pInfo->pDeleteDataRes->info.rows > 0) { if (pInfo->pDeleteDataRes->info.rows > 0) {
@ -2040,7 +2035,7 @@ FETCH_NEXT_BLOCK:
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
printDataBlock(pDelBlock, "stream scan delete data", GET_TASKID(pTaskInfo)); printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
if (pInfo->tqReader) { if (pInfo->tqReader) {
blockDataDestroy(pDelBlock); blockDataDestroy(pDelBlock);
} }
@ -2055,7 +2050,7 @@ FETCH_NEXT_BLOCK:
default: default:
break; break;
} }
// printDataBlock(pBlock, "stream scan recv"); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
return pBlock; return pBlock;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id); qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
@ -2091,7 +2086,7 @@ FETCH_NEXT_BLOCK:
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false); checkUpdateData(pInfo, true, pSDB, false);
printDataBlock(pSDB, "stream scan update", GET_TASKID(pTaskInfo)); printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo));
calBlockTbName(pInfo, pSDB); calBlockTbName(pInfo, pSDB);
return pSDB; return pSDB;
} }