fix:merge datablock if data in same wal version

This commit is contained in:
wangmm0220 2023-10-10 16:55:36 +08:00
parent 9113c3c3b6
commit 440fa77282
1 changed files with 28 additions and 54 deletions

View File

@ -366,82 +366,56 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
// todo ignore the error in wal?
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
SWalReader* pWalReader = pReader->pWalReader;
SSDataBlock* pDataBlock = NULL;
uint64_t st = taosGetTimestampMs();
while (1) {
SArray* pBlockList = pReader->submit.aSubmitTbData;
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
// try next message in wal file
// todo always retry to avoid read failure caused by wal file deletion
if (walNextValidMsg(pWalReader) < 0) {
return false;
}
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pWalReader->pHead->head.version;
SDecoder decoder = {0};
tDecoderInit(&decoder, pBody, bodyLen);
{
int32_t nSubmitTbData = taosArrayGetSize(pReader->submit.aSubmitTbData);
for (int32_t i = 0; i < nSubmitTbData; i++) {
SSubmitTbData* pData = taosArrayGet(pReader->submit.aSubmitTbData, i);
if (pData->pCreateTbReq != NULL) {
taosArrayDestroy(pData->pCreateTbReq->ctb.tagName);
taosMemoryFreeClear(pData->pCreateTbReq);
}
pData->aRowP = taosArrayDestroy(pData->aRowP);
}
pReader->submit.aSubmitTbData = taosArrayDestroy(pReader->submit.aSubmitTbData);
}
if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) {
tDecoderClear(&decoder);
tqError("decode wal file error, msgLen:%d, ver:%" PRId64, bodyLen, ver);
return false;
}
tDecoderClear(&decoder);
pReader->nextBlk = 0;
// try next message in wal file
if (walNextValidMsg(pWalReader) < 0) {
return false;
}
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pWalReader->pHead->head.version;
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
pReader->nextBlk = 0;
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) {
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk);
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk,
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) {
SSDataBlock* pRes = NULL;
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
return true;
}
}
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) {
tqTrace("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);
if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
SSDataBlock* pRes = NULL;
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
return true;
if(pDataBlock == NULL){
pDataBlock = createOneDataBlock(pRes, true);
}else{
blockDataMerge(pDataBlock, pRes);
}
}
} else {
pReader->nextBlk += 1;
tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
}
}
qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->msg.msgStr = NULL;
if(pDataBlock != NULL){
blockDataCleanup(pReader->pResBlock);
copyDataBlock(pReader->pResBlock, pDataBlock);
blockDataDestroy(pDataBlock);
return true;
}else{
qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
}
if(taosGetTimestampMs() - st > 1000){
return false;
}