From 1309c489e278cf4d4e286e8b369fce534ae57934 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 Aug 2022 19:31:53 +0800 Subject: [PATCH] fix:error in tmq for snapshot --- examples/c/tmq_taosx.c | 4 +- include/libs/executor/executor.h | 1 - source/common/src/tdatablock.c | 1 + source/dnode/vnode/src/tq/tq.c | 10 ++- source/dnode/vnode/src/tq/tqExec.c | 4 +- source/dnode/vnode/src/tq/tqMeta.c | 1 - source/libs/executor/inc/executorimpl.h | 8 +- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/scanoperator.c | 98 +++++++++++++++---------- tests/test/c/tmq_taosx_ci.c | 3 +- 10 files changed, 78 insertions(+), 54 deletions(-) diff --git a/examples/c/tmq_taosx.c b/examples/c/tmq_taosx.c index d0def44269..9dd42bef6e 100644 --- a/examples/c/tmq_taosx.c +++ b/examples/c/tmq_taosx.c @@ -84,7 +84,7 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 4"); + pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 1"); if (taos_errno(pRes) != 0) { printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes)); return -1; @@ -98,7 +98,7 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create database if not exists abc1 vgroups 3"); + pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 9fb0967193..188c5de462 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -43,7 +43,6 @@ typedef struct SReadHandle { int32_t numOfVgroups; void* sContext; // SSnapContext* - void* pWalReader; SHashObj *pFilterOutTbUid; } SReadHandle; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 51c21eafa9..d92f454f21 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1228,6 +1228,7 @@ void blockDataFreeRes(SSDataBlock* pBlock) { } taosArrayDestroy(pBlock->pDataBlock); + pBlock->pDataBlock = NULL; taosMemoryFreeClear(pBlock->pBlockAgg); memset(&pBlock->info, 0, sizeof(SDataBlockInfo)); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fac2f7cee1..f672647b96 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -385,11 +385,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { code = -1; } + taosMemoryFree(metaRsp.metaRsp); goto OVER; } tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, no data", consumerId, pHandle->subKey, TD_VID(pTq->pVnode)); + + tqOffsetResetToLog(&dataRsp.rspOffset, metaRsp.rspOffset.version); + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + code = -1; + } OVER: // TODO wrap in destroy func @@ -404,8 +410,6 @@ OVER: taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree); } - taosMemoryFreeClear(metaRsp.metaRsp); - return code; } @@ -497,7 +501,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext)); handle.tqReader = pHandle->execHandle.pExecReader; - handle.pWalReader = ((STqReader*)handle.tqReader)->pWalReader; handle.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid; pHandle->execHandle.task = @@ -516,7 +519,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext)); handle.tqReader = pHandle->execHandle.pExecReader; - handle.pWalReader = ((STqReader*)handle.tqReader)->pWalReader; pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); } diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 7d94b2ed88..ccf1c30579 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -124,6 +124,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* if (pRsp->blockNum > 0){ qStreamExtractOffset(task, &pRsp->rspOffset); + tqDebug("task exec exited, get data"); break; } @@ -131,11 +132,12 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){ qStreamPrepareScan(task, &tmp->rspOffset, pHandle->execHandle.subType); tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META; + tqDebug("task exec change to get meta"); continue; } *pMetaRsp = *tmp; - tqDebug("task exec exited"); + tqDebug("task exec exited, get meta"); break; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index f7309b93ea..471393f4ae 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -92,7 +92,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext)); - reader.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); reader.tqReader = handle.execHandle.pExecReader; reader.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c393b17c0d..a936de589a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -498,13 +498,15 @@ typedef struct SStreamRawScanInfo{ // int64_t snapVersion; // void *metaInfo; // void *dataInfo; - + SVnode* vnode; SWalCkHead* pCkHead; - SReadHandle* readHandle; + bool needFetchLog; + bool hasDataInOneFetchVer; SSDataBlock pRes; // result SSDataBlock - uint64_t groupId; STsdbReader* dataReader; SSnapContext* sContext; + STqReader* tqReader; + SHashObj* pFilterOutTbUid; }SStreamRawScanInfo; typedef struct SSysTableScanInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 21b7043200..53a3151163 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -768,7 +768,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0}); - tsdbReaderOpen(pInfo->readHandle->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL); + tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL); qDebug("tsdb reader snapshot change to uid %ld ts %ld", pOffset->uid, pOffset->ts); }else if(pOffset->type == TMQ_OFFSET__SNAPSHOT_META){ diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f8c9247ce3..73745dd8a9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1464,6 +1464,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamRawScanInfo* pInfo = pOperator->info; pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta + pTaskInfo->streamInfo.metaRsp.metaRsp = NULL; qDebug("stream scan called"); if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA){ @@ -1529,29 +1530,49 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { return NULL; }else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { - if(pInfo->pCkHead == NULL){ - pInfo->pCkHead = taosMemoryCalloc(1, sizeof(SWalCkHead) + 2048); - if (pInfo->pCkHead == NULL) { - - } - walSetReaderCapacity(pInfo->readHandle->pWalReader, 2048); - } - int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version; - SWalCont* pHead = &pInfo->pCkHead->head; - if(pHead->msgType != TDMT_VND_SUBMIT){ - fetchVer++; - if (tqFetchLog(pInfo->readHandle->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { - return NULL; - } - qDebug("tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); - pHead = &pInfo->pCkHead->head; + while(1){ + if(pInfo->needFetchLog){ + fetchVer++; + if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { + qDebug("tmq poll: consumer log end. offset %" PRId64, fetchVer); + pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; + pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; + return NULL; + } + SWalCont* pHead = &pInfo->pCkHead->head; + qDebug("tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); - if(pHead->msgType == TDMT_VND_SUBMIT){ - SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - tqReaderSetDataMsg(pInfo->readHandle->tqReader, pCont, 0); - }else if(pInfo->sContext->withMeta){ + if (pHead->msgType == TDMT_VND_SUBMIT) { + SSubmitReq* pCont = (SSubmitReq*)&pHead->body; + tqReaderSetDataMsg(pInfo->tqReader, pCont, 0); + pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; + pTaskInfo->streamInfo.lastStatus.version = fetchVer; + pInfo->hasDataInOneFetchVer = false; + } + } + + SWalCont* pHead = &pInfo->pCkHead->head; + + if (pHead->msgType == TDMT_VND_SUBMIT) { + blockDataFreeRes(&pInfo->pRes); + SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes); + if(block){ + qDebug("fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); + pInfo->needFetchLog = false; + pInfo->hasDataInOneFetchVer = true; + return block; + }else{ + pInfo->needFetchLog = true; + + if(pInfo->hasDataInOneFetchVer){ + return block; + }else{ + continue; + } + } + } else if(pInfo->sContext->withMeta){ ASSERT(IS_META_MSG(pHead->msgType)); qDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; @@ -1562,23 +1583,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen); return NULL; } - } - if (pHead->msgType == TDMT_VND_SUBMIT) { - while(1){ - blockDataFreeRes(&pInfo->pRes); - SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->readHandle->tqReader, pInfo->readHandle->pFilterOutTbUid, &pInfo->pRes); - if(!block){ - fetchVer++; - if (tqFetchLog(pInfo->readHandle->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { - return NULL; - } - pHead = &pInfo->pCkHead->head; - SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - tqReaderSetDataMsg(pInfo->readHandle->tqReader, pCont, 0); - } - return block; - } + pInfo->needFetchLog = true; } } return NULL; @@ -1587,13 +1593,13 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { static void destroyRawScanOperatorInfo(void* param, int32_t numOfOutput) { SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param; taosMemoryFreeClear(pRawScan->pCkHead); - if (pRawScan->readHandle->tqReader) { - tqCloseReader(pRawScan->readHandle->tqReader); + if (pRawScan->tqReader) { + tqCloseReader(pRawScan->tqReader); } blockDataFreeRes(&pRawScan->pRes); tsdbReaderClose(pRawScan->dataReader); destroySnapContext(pRawScan->sContext); - taosHashCleanup(pRawScan->readHandle->pFilterOutTbUid); + taosHashCleanup(pRawScan->pFilterOutTbUid); taosMemoryFree(pRawScan); } @@ -1613,7 +1619,19 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT return NULL; } - pInfo->readHandle = pHandle; + pInfo->pCkHead = taosMemoryCalloc(1, sizeof(SWalCkHead) + 2048); + if (pInfo->pCkHead == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + pInfo->needFetchLog = true; + pInfo->hasDataInOneFetchVer = false; + + pInfo->vnode = pHandle->vnode; + pInfo->pFilterOutTbUid = pHandle->pFilterOutTbUid; + pInfo->tqReader = pHandle->tqReader; + walSetReaderCapacity(pInfo->tqReader->pWalReader, 2048); + pInfo->sContext = pHandle->sContext; pOperator->name = "RawStreamScanOperator"; // pOperator->blocking = false; diff --git a/tests/test/c/tmq_taosx_ci.c b/tests/test/c/tmq_taosx_ci.c index ece7ad4819..ee5af03f05 100644 --- a/tests/test/c/tmq_taosx_ci.c +++ b/tests/test/c/tmq_taosx_ci.c @@ -501,7 +501,8 @@ int main(int argc, char* argv[]) { if(argc == 3 && strcmp(argv[1], "-c") == 0) { strcpy(dir, argv[2]); }else{ - strcpy(dir, "../../../sim/psim/cfg"); +// strcpy(dir, "../../../sim/psim/cfg"); + strcpy(dir, "/var/log"); } printf("env init\n");