diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index c0412d2617..dac723aa58 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -241,7 +241,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); const char* blockDecode(SSDataBlock* pBlock, const char* pData); // for debug -char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); +char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, const char* taskIdStr); int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5188b1e27c..5ce763ab8f 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1771,7 +1771,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { } // for debug -char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) { +char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) { int32_t size = 2048*1024; *pDataBuf = taosMemoryCalloc(size, 1); char* dumpBuf = *pDataBuf; @@ -1780,9 +1780,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) int32_t rows = pDataBlock->info.rows; int32_t len = 0; len += snprintf(dumpBuf + len, size - len, - "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 + "%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n", - flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, + taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName); if (len >= size - 1) return dumpBuf; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index b22650d249..cce31688bc 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -412,7 +412,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d if (k == 0) { SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); void* colData = colDataGetData(pColData, j); - tqTrace("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData); + tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData); } if (IS_SET_NULL(pCol)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 33c9d845b9..485b4f01de 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -185,7 +185,7 @@ int32_t convertFillType(int32_t mode); int32_t resultrowComparAsc(const void* p1, const void* p2); int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI *pAPI); -void printDataBlock(SSDataBlock* pBlock, const char* flag); +void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr); void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order); void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index aa0c7945b0..54e86415f5 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2178,12 +2178,12 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags return TSDB_CODE_SUCCESS; } -void printDataBlock(SSDataBlock* pBlock, const char* flag) { +void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) { if (!pBlock || pBlock->info.rows == 0) { - qDebug("===stream===%s: Block is Null or Empty", flag); + qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag); return; } char* pBuf = NULL; - qDebug("%s", dumpBlockData(pBlock, flag, &pBuf)); + qDebug("%s", dumpBlockData(pBlock, flag, &pBuf, taskIdStr)); taosMemoryFree(pBuf); } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 80c88a803e..ed00329aed 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -1292,14 +1292,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) { doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes); if (pInfo->pRes->info.rows > 0) { - printDataBlock(pInfo->pRes, "stream fill"); + printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo)); return pInfo->pRes; } } if (pOperator->status == OP_RES_TO_RETURN) { doDeleteFillFinalize(pOperator); if (pInfo->pRes->info.rows > 0) { - printDataBlock(pInfo->pRes, "stream fill"); + printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo)); return pInfo->pRes; } setOperatorCompleted(pOperator); @@ -1317,12 +1317,12 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; pInfo->pFillInfo->preRowKey = INT64_MIN; if (pInfo->pRes->info.rows > 0) { - printDataBlock(pInfo->pRes, "stream fill"); + printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo)); return pInfo->pRes; } break; } - printDataBlock(pBlock, "stream fill recv"); + printDataBlock(pBlock, "stream fill recv", GET_TASKID(pTaskInfo)); if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) { pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId; @@ -1339,7 +1339,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { pInfo->pFillSup->hasDelete = true; doDeleteFillResult(pOperator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "stream fill delete"); + printDataBlock(pInfo->pDelRes, "stream fill delete", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } continue; @@ -1378,7 +1378,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { } pOperator->resultInfo.totalRows += pInfo->pRes->info.rows; - printDataBlock(pInfo->pRes, "stream fill"); + printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo)); return pInfo->pRes; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9228c923a6..4f0bedba3d 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -956,7 +956,8 @@ static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo static bool hasRemainTbName(SStreamPartitionOperatorInfo* pInfo) { return pInfo->pTbNameIte != NULL; } static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamPartitionOperatorInfo* pInfo = pOperator->info; SSDataBlock* pDest = pInfo->binfo.pRes; @@ -994,7 +995,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { pOperator->resultInfo.totalRows += pDest->info.rows; pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte); ASSERT(pDest->info.rows > 0); - printDataBlock(pDest, "stream partitionby"); + printDataBlock(pDest, "stream partitionby", GET_TASKID(pTaskInfo)); return pDest; } @@ -1115,7 +1116,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); return NULL; } - printDataBlock(pBlock, "stream partitionby recv"); + printDataBlock(pBlock, "stream partitionby recv", GET_TASKID(pTaskInfo)); switch (pBlock->info.type) { case STREAM_NORMAL: case STREAM_PULL_DATA: @@ -1125,7 +1126,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { case STREAM_DELETE_DATA: { copyDataBlock(pInfo->pDelRes, pBlock); pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; - printDataBlock(pInfo->pDelRes, "stream partitionby delete"); + printDataBlock(pInfo->pDelRes, "stream partitionby delete", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } break; default: diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9a836caa8c..85c9c39d11 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1343,7 +1343,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS if (rows == 0) { return TSDB_CODE_SUCCESS; } - + SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); @@ -1360,7 +1360,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS TSKEY startTs = srcStartTsCol[0]; TSKEY endTs = srcEndTsCol[0]; SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, ver); - printDataBlock(pPreRes, "pre res"); + printDataBlock(pPreRes, "pre res", GET_TASKID(pTaskInfo)); blockDataCleanup(pSrcBlock); int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows); if (code != TSDB_CODE_SUCCESS) { @@ -1375,7 +1375,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid, &groupId, NULL); } - printDataBlock(pSrcBlock, "new delete"); + printDataBlock(pSrcBlock, "new delete", GET_TASKID(pTaskInfo)); } uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData; srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; @@ -1921,38 +1921,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { switch (pInfo->scanMode) { case STREAM_SCAN_FROM_RES: { pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - printDataBlock(pInfo->pRecoverRes, "scan recover"); + printDataBlock(pInfo->pRecoverRes, "scan recover", GET_TASKID(pTaskInfo)); return pInfo->pRecoverRes; } break; - // case STREAM_SCAN_FROM_UPDATERES: { - // generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); - // prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); - // pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - // printDataBlock(pInfo->pUpdateRes, "recover update"); - // return pInfo->pUpdateRes; - // } break; - // case STREAM_SCAN_FROM_DELETE_DATA: { - // generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); - // prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); - // pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - // copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); - // pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - // printDataBlock(pInfo->pDeleteDataRes, "recover delete"); - // return pInfo->pDeleteDataRes; - // } break; - // case STREAM_SCAN_FROM_DATAREADER_RANGE: { - // SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); - // if (pSDB) { - // STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - // pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; - // checkUpdateData(pInfo, true, pSDB, false); - // printDataBlock(pSDB, "scan recover update"); - // calBlockTbName(pInfo, pSDB); - // return pSDB; - // } - // blockDataCleanup(pInfo->pUpdateDataRes); - // pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - // } break; default: break; } @@ -1971,12 +1942,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; - printDataBlock(pInfo->pCreateTbRes, "recover createTbl"); + printDataBlock(pInfo->pCreateTbRes, "recover createTbl", GET_TASKID(pTaskInfo)); return pInfo->pCreateTbRes; } qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows); - printDataBlock(pInfo->pRecoverRes, "scan recover"); + printDataBlock(pInfo->pRecoverRes, "scan recover", GET_TASKID(pTaskInfo)); return pInfo->pRecoverRes; } pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE; @@ -2032,7 +2003,7 @@ FETCH_NEXT_BLOCK: pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); } break; case STREAM_DELETE_DATA: { - printDataBlock(pBlock, "stream scan delete recv"); + printDataBlock(pBlock, "stream scan delete recv", GET_TASKID(pTaskInfo)); SSDataBlock* pDelBlock = NULL; if (pInfo->tqReader) { pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); @@ -2043,7 +2014,7 @@ FETCH_NEXT_BLOCK: setBlockGroupIdByUid(pInfo, pDelBlock); rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id); - printDataBlock(pDelBlock, "stream scan delete recv filtered"); + printDataBlock(pDelBlock, "stream scan delete recv filtered", GET_TASKID(pTaskInfo)); if (pDelBlock->info.rows == 0) { if (pInfo->tqReader) { blockDataDestroy(pDelBlock); @@ -2054,7 +2025,7 @@ FETCH_NEXT_BLOCK: if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) { generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; - printDataBlock(pDelBlock, "stream scan delete result"); + printDataBlock(pDelBlock, "stream scan delete result", GET_TASKID(pTaskInfo)); blockDataDestroy(pDelBlock); if (pInfo->pDeleteDataRes->info.rows > 0) { @@ -2069,7 +2040,7 @@ FETCH_NEXT_BLOCK: prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - printDataBlock(pDelBlock, "stream scan delete data"); + printDataBlock(pDelBlock, "stream scan delete data", GET_TASKID(pTaskInfo)); if (pInfo->tqReader) { blockDataDestroy(pDelBlock); } @@ -2120,7 +2091,7 @@ FETCH_NEXT_BLOCK: STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); - printDataBlock(pSDB, "stream scan update"); + printDataBlock(pSDB, "stream scan update", GET_TASKID(pTaskInfo)); calBlockTbName(pInfo, pSDB); return pSDB; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3abb4f44f4..992147ab10 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2589,20 +2589,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); if (pInfo->pPullDataRes->info.rows != 0) { // process the rest of the data - printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); return pInfo->pPullDataRes; } doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows != 0) { - printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); return pInfo->binfo.pRes; } @@ -2633,7 +2633,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } } @@ -2670,7 +2670,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { break; } pInfo->numOfDatapack++; - printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv"); + printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; @@ -2694,7 +2694,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_CLEAR) { pInfo->pDelRes->info.type = STREAM_CLEAR; } else { @@ -2758,20 +2758,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); if (pInfo->pPullDataRes->info.rows != 0) { // process the rest of the data - printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); return pInfo->pPullDataRes; } doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows != 0) { - printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); return pInfo->binfo.pRes; } @@ -3660,18 +3660,19 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; qDebug("===stream=== stream session agg"); if (pOperator->status == OP_EXEC_DONE) { return NULL; } else if (pOperator->status == OP_RES_TO_RETURN) { doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); + printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo)); return pBInfo->pRes; } @@ -3692,7 +3693,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } - printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv"); + printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -3770,13 +3771,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); + printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo)); return pBInfo->pRes; } @@ -3950,6 +3951,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { TSKEY maxTs = INT64_MIN; SExprSupp* pSup = &pOperator->exprSupp; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; qDebug("===stream=== stream session semi agg"); if (pOperator->status == OP_EXEC_DONE) { @@ -3959,13 +3961,13 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { { doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, "semi session"); + printDataBlock(pBInfo->pRes, "semi session", GET_TASKID(pTaskInfo)); return pBInfo->pRes; } doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "semi session delete"); + printDataBlock(pInfo->pDelRes, "semi session delete", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } @@ -3993,7 +3995,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; break; } - printDataBlock(pBlock, "semi session recv"); + printDataBlock(pBlock, "semi session recv", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -4042,13 +4044,13 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, "semi session"); + printDataBlock(pBInfo->pRes, "semi session", GET_TASKID(pTaskInfo)); return pBInfo->pRes; } doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "semi session delete"); + printDataBlock(pInfo->pDelRes, "semi session delete", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } @@ -4341,17 +4343,18 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; SStreamStateAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; qDebug("===stream=== stream state agg"); if (pOperator->status == OP_RES_TO_RETURN) { doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "single state delete"); + printDataBlock(pInfo->pDelRes, "single state delete", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, "single state"); + printDataBlock(pBInfo->pRes, "single state", GET_TASKID(pTaskInfo)); return pBInfo->pRes; } @@ -4372,7 +4375,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } - printDataBlock(pBlock, "single state recv"); + printDataBlock(pBlock, "single state recv", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -4425,13 +4428,13 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "single state delete"); + printDataBlock(pInfo->pDelRes, "single state delete", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, "single state"); + printDataBlock(pBInfo->pRes, "single state", GET_TASKID(pTaskInfo)); return pBInfo->pRes; } setOperatorCompleted(pOperator); @@ -5211,13 +5214,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "single interval delete"); + printDataBlock(pInfo->pDelRes, "single interval delete", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows > 0) { - printDataBlock(pInfo->binfo.pRes, "single interval"); + printDataBlock(pInfo->binfo.pRes, "single interval", GET_TASKID(pTaskInfo)); return pInfo->binfo.pRes; } @@ -5257,7 +5260,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } pInfo->numOfDatapack++; - printDataBlock(pBlock, "single interval recv"); + printDataBlock(pBlock, "single interval recv", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -5269,7 +5272,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { - printDataBlock(pBlock, "single interval"); + printDataBlock(pBlock, "single interval", GET_TASKID(pTaskInfo)); return pBlock; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); @@ -5323,13 +5326,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "single interval delete"); + printDataBlock(pInfo->pDelRes, "single interval delete", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows > 0) { - printDataBlock(pInfo->binfo.pRes, "single interval"); + printDataBlock(pInfo->binfo.pRes, "single interval", GET_TASKID(pTaskInfo)); return pInfo->binfo.pRes; }