fix(stream): fix error in stream read.

This commit is contained in:
Haojun Liao 2023-05-18 16:59:18 +08:00
parent 5d5b2bb16a
commit 8a328b6636
2 changed files with 33 additions and 127 deletions

View File

@ -2080,17 +2080,35 @@ FETCH_NEXT_BLOCK:
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
NEXT_SUBMIT_BLK: NEXT_SUBMIT_BLK:
while (1) { while (1) {
bool hasResult = tqNextBlockInWal(pInfo->tqReader, id); if (pInfo->tqReader->msg.msgStr == NULL) {
SSDataBlock* pRes = pInfo->tqReader->pResBlock; if (pInfo->validBlockIndex >= totalBlocks) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo);
qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
return NULL;
}
int32_t current = pInfo->validBlockIndex++;
SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id);
if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id);
continue;
}
}
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
if (hasResult) { while (tqNextBlockImpl(pInfo->tqReader, id)) {
qDebug("stream scan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id);
pTaskInfo->streamInfo.currentOffset.version); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
setBlockIntoRes(pInfo, pRes, true); continue;
}
setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, false);
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
@ -2101,16 +2119,16 @@ FETCH_NEXT_BLOCK:
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
pBlock->info.dataLoad = 1; pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
} else {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo);
qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id); if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
return NULL; break;
}
} }
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
break; break;
} else {
continue;
} }
} }
@ -2118,7 +2136,7 @@ FETCH_NEXT_BLOCK:
pInfo->numOfExec++; pInfo->numOfExec++;
pOperator->resultInfo.totalRows += pBlockInfo->rows; pOperator->resultInfo.totalRows += pBlockInfo->rows;
qDebug("stream scan get source rows:%" PRId64 ", %s", pBlockInfo->rows, id); qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id);
if (pBlockInfo->rows > 0) { if (pBlockInfo->rows > 0) {
return pBlock; return pBlock;
} }
@ -2128,83 +2146,8 @@ FETCH_NEXT_BLOCK:
} }
goto NEXT_SUBMIT_BLK; goto NEXT_SUBMIT_BLK;
// } else {
// qDebug("stream scan get none from log, return, version:%" PRId64,
// pTaskInfo->streamInfo.currentOffset.version); return NULL;
// }
// while (1) {
// if (pInfo->tqReader->msg.msgStr == NULL) {
// if (pInfo->validBlockIndex >= totalBlocks) {
// updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
// doClearBufferedBlocks(pInfo);
//
// qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
// return NULL;
// }
//
// int32_t current = pInfo->validBlockIndex++;
// SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
//
// qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id);
// if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
// qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit,
// current, totalBlocks, id); continue;
// }
// }
//
// blockDataCleanup(pInfo->pRes);
//
// while (tqNextBlockImpl(pInfo->tqReader, id)) {
// int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id);
// if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
// continue;
// }
//
// setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, false);
//
// if (pInfo->pCreateTbRes->info.rows > 0) {
// pInfo->scanMode = STREAM_SCAN_FROM_RES;
// return pInfo->pCreateTbRes;
// }
//
// doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
// doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
// pInfo->pRes->info.dataLoad = 1;
// blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
//
// if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
// break;
// }
// }
//
// if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
// break;
// } else {
// continue;
// }
// }
//
// // record the scan action.
// pInfo->numOfExec++;
// pOperator->resultInfo.totalRows += pBlockInfo->rows;
//
// qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id);
// if (pBlockInfo->rows > 0) {
// return pInfo->pRes;
// }
//
// if (pInfo->pUpdateDataRes->info.rows > 0) {
// goto FETCH_NEXT_BLOCK;
// }
//
// goto NEXT_SUBMIT_BLK;
// } else {
// ASSERT(0);
// return NULL;
// }
} }
return NULL; return NULL;
} }
@ -2293,44 +2236,6 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
// else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
// int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
//
// while(1){
// if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
// qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
// return NULL;
// }
// SWalCont* pHead = &pInfo->pCkHead->head;
// qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
//
// if (pHead->msgType == TDMT_VND_SUBMIT) {
// SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
// tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
// SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid,
// &pInfo->pRes); if(block){
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
// qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
// return block;
// }else{
// fetchVer++;
// }
// } else{
// ASSERT(pInfo->sContext->withMeta);
// ASSERT(IS_META_MSG(pHead->msgType));
// qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
// pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
// pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
// pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
// pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
// pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
// memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
// return NULL;
// }
// }
return NULL; return NULL;
} }

View File

@ -27,6 +27,7 @@ SStreamQueue* streamQueueOpen(int64_t cap) {
taosSetQueueCapacity(pQueue->queue, cap); taosSetQueueCapacity(pQueue->queue, cap);
taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024); taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024);
return pQueue; return pQueue;
FAIL: FAIL:
if (pQueue->queue) taosCloseQueue(pQueue->queue); if (pQueue->queue) taosCloseQueue(pQueue->queue);
if (pQueue->qall) taosFreeQall(pQueue->qall); if (pQueue->qall) taosFreeQall(pQueue->qall);