From 4c6bc4d2c3f33686f138d75101371191cf405f5e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Aug 2023 16:10:03 +0800 Subject: [PATCH] fix(stream): continue check wal when meeting empty delete block msg. --- source/dnode/vnode/src/tq/tqRead.c | 107 +++++++++++++++++------------ 1 file changed, 62 insertions(+), 45 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 43f38ade97..d3157dc3b0 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -296,53 +296,70 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) { } int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) { - int32_t code = walNextValidMsg(pReader); - if (code != TSDB_CODE_SUCCESS) { + int32_t code = 0; + SWalCont* pCont = &pReader->pHead->head; + + while(1) { + code = walNextValidMsg(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int64_t ver = pCont->version; + if (ver > maxVer) { + tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id); + return TSDB_CODE_SUCCESS; + } + + if (pCont->msgType == TDMT_VND_SUBMIT) { + void* pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg)); + int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg); + + void* data = taosMemoryMalloc(len); + if (data == NULL) { + // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry + code = TSDB_CODE_OUT_OF_MEMORY; + terrno = code; + + tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); + return code; + } + + memcpy(data, pBody, len); + SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; + + *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT); + if (*pItem == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + terrno = code; + tqError("%s failed to create data submit for stream since out of memory", id); + return code; + } + } else if (pCont->msgType == TDMT_VND_DELETE) { + void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); + int32_t len = pCont->bodyLen - sizeof(SMsgHead); + + code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); + if (code == TSDB_CODE_SUCCESS) { + if (*pItem == NULL) { + tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver); + // we need to continue check next data in the wal files. + continue; + } else { + tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver); + } + } else { + terrno = code; + tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code)); + return code; + } + + } else { + ASSERT(0); + } + return code; } - - int64_t ver = pReader->pHead->head.version; - if (ver > maxVer) { - tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id); - return TSDB_CODE_SUCCESS; - } - - if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) { - void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - - void* data = taosMemoryMalloc(len); - if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); - return -1; - } - - memcpy(data, pBody, len); - SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; - - *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT); - if (*pItem == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("%s failed to create data submit for stream since out of memory", id); - return terrno; - } - } else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) { - void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead)); - int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead); - - code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); - if (code != TSDB_CODE_SUCCESS) { - tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code)); - } else { - tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver); - } - } else { - ASSERT(0); - } - - return 0; } // todo ignore the error in wal?