diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 773fc0e1af..edea7724b5 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -504,7 +504,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap } int32_t tqRetrieveDataBlock(STqReader* pReader, const char* idstr) { - tqDebug("tq reader retrieve data block %p, index:%d, total:%d, %s", pReader->msg.msgStr, pReader->nextBlk, + tqDebug("tq reader retrieve data block %p, index:%d/%d, %s", pReader->msg.msgStr, pReader->nextBlk, (int32_t)taosArrayGetSize(pReader->submit.aSubmitTbData), idstr); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b818e35e64..5a539106b3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2046,17 +2046,18 @@ FETCH_NEXT_BLOCK: return pInfo->pUpdateRes; } + const char* id = GET_TASKID(pTaskInfo); SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - - int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists); + int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); NEXT_SUBMIT_BLK: while (1) { if (pInfo->tqReader->msg.msgStr == NULL) { - if (pInfo->validBlockIndex >= totBlockNum) { + if (pInfo->validBlockIndex >= totalBlocks) { updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); doClearBufferedBlocks(pInfo); - qDebug("stream scan return empty, consume block %d", totBlockNum); + + qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id); void* buff = NULL; // int32_t len = streamScanOperatorEncode(pInfo, &buff); // if (len > 0) { @@ -2068,17 +2069,18 @@ FETCH_NEXT_BLOCK: int32_t current = pInfo->validBlockIndex++; SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); + + qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id); if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { - qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current, - totBlockNum); + qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id); continue; } } blockDataCleanup(pInfo->pRes); - while (tqNextBlockImpl(pInfo->tqReader, pTaskInfo->id.str)) { - int32_t code = tqRetrieveDataBlock(pInfo->tqReader, pTaskInfo->id.str); + while (tqNextBlockImpl(pInfo->tqReader, id)) { + int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { continue; } @@ -2099,6 +2101,7 @@ FETCH_NEXT_BLOCK: break; } } + if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { break; } else { @@ -2110,7 +2113,7 @@ FETCH_NEXT_BLOCK: pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; - qDebug("scan rows: %" PRId64", %s", pBlockInfo->rows, pTaskInfo->id.str); + qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id); if (pBlockInfo->rows > 0) { return pInfo->pRes; }