From 440fa772828229b99d21851e067d827a30f91381 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 10 Oct 2023 16:55:36 +0800 Subject: [PATCH] fix:merge datablock if data in same wal version --- source/dnode/vnode/src/tq/tqRead.c | 82 ++++++++++-------------------- 1 file changed, 28 insertions(+), 54 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 3b052a3edd..1c2561b1a8 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -366,82 +366,56 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con // todo ignore the error in wal? bool tqNextBlockInWal(STqReader* pReader, const char* id) { SWalReader* pWalReader = pReader->pWalReader; + SSDataBlock* pDataBlock = NULL; uint64_t st = taosGetTimestampMs(); while (1) { - SArray* pBlockList = pReader->submit.aSubmitTbData; - if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) { - // try next message in wal file - // todo always retry to avoid read failure caused by wal file deletion - if (walNextValidMsg(pWalReader) < 0) { - return false; - } - - void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - int64_t ver = pWalReader->pHead->head.version; - - SDecoder decoder = {0}; - tDecoderInit(&decoder, pBody, bodyLen); - - { - int32_t nSubmitTbData = taosArrayGetSize(pReader->submit.aSubmitTbData); - for (int32_t i = 0; i < nSubmitTbData; i++) { - SSubmitTbData* pData = taosArrayGet(pReader->submit.aSubmitTbData, i); - if (pData->pCreateTbReq != NULL) { - taosArrayDestroy(pData->pCreateTbReq->ctb.tagName); - taosMemoryFreeClear(pData->pCreateTbReq); - } - pData->aRowP = taosArrayDestroy(pData->aRowP); - } - pReader->submit.aSubmitTbData = taosArrayDestroy(pReader->submit.aSubmitTbData); - } - - if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) { - tDecoderClear(&decoder); - tqError("decode wal file error, msgLen:%d, ver:%" PRId64, bodyLen, ver); - return false; - } - - tDecoderClear(&decoder); - pReader->nextBlk = 0; + // try next message in wal file + if (walNextValidMsg(pWalReader) < 0) { + return false; } + void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); + int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); + int64_t ver = pWalReader->pHead->head.version; + + tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver); + pReader->nextBlk = 0; int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < numOfBlocks) { - tqTrace("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk, - numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk); + tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, + numOfBlocks, pReader->msg.msgLen, pReader->msg.ver); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); - if (pReader->tbIdHash == NULL) { - SSDataBlock* pRes = NULL; - int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL); - if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) { - return true; - } - } - - void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); - if (ret != NULL) { - tqTrace("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver); - + if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) { + tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid); SSDataBlock* pRes = NULL; int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL); if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) { - return true; + if(pDataBlock == NULL){ + pDataBlock = createOneDataBlock(pRes, true); + }else{ + blockDataMerge(pDataBlock, pRes); + } } } else { pReader->nextBlk += 1; tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid); } } - - qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id); tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); - pReader->msg.msgStr = NULL; + if(pDataBlock != NULL){ + blockDataCleanup(pReader->pResBlock); + copyDataBlock(pReader->pResBlock, pDataBlock); + blockDataDestroy(pDataBlock); + return true; + }else{ + qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id); + } + if(taosGetTimestampMs() - st > 1000){ return false; }