diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 88460cd3ca..0b355a2b0f 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -261,7 +261,7 @@ bool tqNextBlockImpl(STqReader *pReader); int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); -int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); +int32_t tqRetrieveDataBlock(STqReader *pReader, const char* idstr); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2b5ca59408..9c89bd3f82 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -503,13 +503,11 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap return 0; } -int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { - tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); +int32_t tqRetrieveDataBlock(STqReader* pReader, const char* idstr) { + tqDebug("tq reader retrieve data block %p, index:%d, total:%d, %s", pReader->msg.msgStr, pReader->nextBlk, + (int32_t)taosArrayGetSize(pReader->submit.aSubmitTbData), idstr); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++); - if (pSubmitTbDataRet) { - *pSubmitTbDataRet = pSubmitTbData; - } SSDataBlock* pBlock = pReader->pResBlock; blockDataCleanup(pBlock); @@ -672,8 +670,6 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j); while (1) { SColVal colVal; - tqDebug("start to extract column id:%d, index:%d", pColData->info.colId, sourceIdx); - tRowGet(pRow, pTSchema, sourceIdx, &colVal); if (colVal.cid < pColData->info.colId) { tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d", diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e1fa7a282b..f7e730242a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2078,7 +2078,7 @@ FETCH_NEXT_BLOCK: blockDataCleanup(pInfo->pRes); while (tqNextBlockImpl(pInfo->tqReader)) { - int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL); + int32_t code = tqRetrieveDataBlock(pInfo->tqReader, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { continue; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ff51a5a6ae..7b10a1fc99 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -272,7 +272,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { times++; taosMsleep(1); - qDebug("===stream===try agian batchSize:%d", batchSize); + qDebug("===stream===try again batchSize:%d", batchSize); continue; }