diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0c93365c1c..0537702a75 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2080,17 +2080,35 @@ FETCH_NEXT_BLOCK: int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); NEXT_SUBMIT_BLK: - while (1) { - bool hasResult = tqNextBlockInWal(pInfo->tqReader, id); - SSDataBlock* pRes = pInfo->tqReader->pResBlock; + if (pInfo->tqReader->msg.msgStr == NULL) { + if (pInfo->validBlockIndex >= totalBlocks) { + updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); + doClearBufferedBlocks(pInfo); + + qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id); + return NULL; + } + + int32_t current = pInfo->validBlockIndex++; + SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); + + qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id); + if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { + qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id); + continue; + } + } blockDataCleanup(pBlock); - if (hasResult) { - qDebug("stream scan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, - pTaskInfo->streamInfo.currentOffset.version); - setBlockIntoRes(pInfo, pRes, true); + while (tqNextBlockImpl(pInfo->tqReader, id)) { + int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id); + if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { + continue; + } + + setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, false); if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; @@ -2101,16 +2119,16 @@ FETCH_NEXT_BLOCK: doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); - } else { - updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); - doClearBufferedBlocks(pInfo); - qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id); - return NULL; + if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { + break; + } } if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { break; + } else { + continue; } } @@ -2118,7 +2136,7 @@ FETCH_NEXT_BLOCK: pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; - qDebug("stream scan get source rows:%" PRId64 ", %s", pBlockInfo->rows, id); + qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id); if (pBlockInfo->rows > 0) { return pBlock; } @@ -2128,83 +2146,8 @@ FETCH_NEXT_BLOCK: } goto NEXT_SUBMIT_BLK; - - // } else { - // qDebug("stream scan get none from log, return, version:%" PRId64, - // pTaskInfo->streamInfo.currentOffset.version); return NULL; - // } - - // while (1) { - // if (pInfo->tqReader->msg.msgStr == NULL) { - // if (pInfo->validBlockIndex >= totalBlocks) { - // updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); - // doClearBufferedBlocks(pInfo); - // - // qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id); - // return NULL; - // } - // - // int32_t current = pInfo->validBlockIndex++; - // SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); - // - // qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id); - // if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { - // qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, - // current, totalBlocks, id); continue; - // } - // } - // - // blockDataCleanup(pInfo->pRes); - // - // while (tqNextBlockImpl(pInfo->tqReader, id)) { - // int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id); - // if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { - // continue; - // } - // - // setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, false); - // - // if (pInfo->pCreateTbRes->info.rows > 0) { - // pInfo->scanMode = STREAM_SCAN_FROM_RES; - // return pInfo->pCreateTbRes; - // } - // - // doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); - // doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); - // pInfo->pRes->info.dataLoad = 1; - // blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); - // - // if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { - // break; - // } - // } - // - // if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { - // break; - // } else { - // continue; - // } - // } - // - // // record the scan action. - // pInfo->numOfExec++; - // pOperator->resultInfo.totalRows += pBlockInfo->rows; - // - // qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id); - // if (pBlockInfo->rows > 0) { - // return pInfo->pRes; - // } - // - // if (pInfo->pUpdateDataRes->info.rows > 0) { - // goto FETCH_NEXT_BLOCK; - // } - // - // goto NEXT_SUBMIT_BLK; - // } else { - // ASSERT(0); - // return NULL; - // } } + return NULL; } @@ -2293,44 +2236,6 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { return NULL; } - // else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { - // int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1; - // - // while(1){ - // if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { - // qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer); - // pTaskInfo->streamInfo.lastStatus.version = fetchVer; - // pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; - // return NULL; - // } - // SWalCont* pHead = &pInfo->pCkHead->head; - // qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); - // - // if (pHead->msgType == TDMT_VND_SUBMIT) { - // SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - // tqReaderSetDataMsg(pInfo->tqReader, pCont, 0); - // SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, - // &pInfo->pRes); if(block){ - // pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; - // pTaskInfo->streamInfo.lastStatus.version = fetchVer; - // qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); - // return block; - // }else{ - // fetchVer++; - // } - // } else{ - // ASSERT(pInfo->sContext->withMeta); - // ASSERT(IS_META_MSG(pHead->msgType)); - // qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); - // pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; - // pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; - // pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType; - // pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen; - // pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen); - // memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen); - // return NULL; - // } - // } return NULL; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 20abcca197..d2b2c7d840 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -27,6 +27,7 @@ SStreamQueue* streamQueueOpen(int64_t cap) { taosSetQueueCapacity(pQueue->queue, cap); taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024); return pQueue; + FAIL: if (pQueue->queue) taosCloseQueue(pQueue->queue); if (pQueue->qall) taosFreeQall(pQueue->qall);