From d585f34ea9c202b7b6860b324f4215e65ba5c7e3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 May 2023 18:48:50 +0800 Subject: [PATCH 1/6] fix(tmq): avoid seek to previous position. --- include/libs/wal/wal.h | 3 ++- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/dnode/vnode/src/tq/tqRestore.c | 34 +++++++++++++++++++-------- source/libs/wal/src/walRead.c | 24 +++++++++---------- 4 files changed, 39 insertions(+), 24 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 46dc179295..0a1f5d51d4 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -193,9 +193,10 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond); void walCloseReader(SWalReader *pRead); void walReadReset(SWalReader *pReader); int32_t walReadVer(SWalReader *pRead, int64_t ver); -int32_t walReadSeekVer(SWalReader *pRead, int64_t ver); +int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver); int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader* pReader); +int64_t walReaderGetValidFirstVer(const SWalReader* pReader); // only for tq usage void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9b6d97d6e2..313c92fce8 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -294,7 +294,7 @@ void tqCloseReader(STqReader* pReader) { } int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { - if (walReadSeekVer(pReader->pWalReader, ver) < 0) { + if (walReaderSeekVer(pReader->pWalReader, ver) < 0) { return -1; } tqDebug("wal reader seek to ver:%"PRId64" %s", ver, id); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 0bb33b1215..3ad7041a1b 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -107,26 +107,40 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = false; // seek the stored version and extract data from WAL - int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); - if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit - SWal *pWal = pTask->exec.pWalReader->pWal; - if (pTask->chkInfo.currentVer < pWal->vers.firstVer ) { - pTask->chkInfo.currentVer = pWal->vers.firstVer; - code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); - if (code != TSDB_CODE_SUCCESS) { + int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); + if (pTask->chkInfo.currentVer < firstVer) { + pTask->chkInfo.currentVer = firstVer; + tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, + pTask->id.idStr, firstVer, pTask->chkInfo.currentVer); + + // todo need retry if failed + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + } else { + int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (currentVer != -1) { + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit streamMetaReleaseTask(pStreamMeta, pTask); continue; } + + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); } - streamMetaReleaseTask(pStreamMeta, pTask); - continue; } // append the data for the stream tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); SPackedData packData = {0}; - code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); + int32_t code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); if (code != TSDB_CODE_SUCCESS) { // failed, continue streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 37d97b35a6..ac80b828a0 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -62,9 +62,6 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { void walCloseReader(SWalReader *pReader) { taosCloseFile(&pReader->pIdxFile); taosCloseFile(&pReader->pLogFile); - /*if (pReader->cond.enableRef) {*/ - /*taosHashRemove(pReader->pWal->pRefHash, &pReader->readerId, sizeof(int64_t));*/ - /*}*/ taosMemoryFreeClear(pReader->pHead); taosMemoryFree(pReader); } @@ -74,20 +71,22 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t lastVer = walGetLastVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal); + if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); -// taosMsleep(10); } -// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; + int64_t endVer = TMIN(appliedVer, committedVer); wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64", end index:%" PRId64, pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + while (fetchVer <= endVer) { if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; } + if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT || (IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) { if (walFetchBodyNew(pReader) < 0) { @@ -98,13 +97,16 @@ int32_t walNextValidMsg(SWalReader *pReader) { if (walSkipFetchBodyNew(pReader) < 0) { return -1; } + fetchVer = pReader->curVersion; } } + return -1; } int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; } +int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); } static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { int64_t ret = 0; @@ -206,7 +208,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { return 0; } -int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { +int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) { SWal *pWal = pReader->pWal; if (ver == pReader->curVersion) { wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver); @@ -236,7 +238,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer); if (pRead->curVersion != fetchVer) { - if (walReadSeekVer(pRead, fetchVer) < 0) { + if (walReaderSeekVer(pRead, fetchVer) < 0) { return -1; } seeked = true; @@ -276,6 +278,7 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + pReader->pHead = ptr; pReadHead = &pReader->pHead->head; pReader->capacity = pReadHead->bodyLen; @@ -291,14 +294,11 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } -// pRead->curInvalid = 1; return -1; } if (walValidBodyCksum(pReader->pHead) != 0) { wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver); -// pRead->curInvalid = 1; - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -340,7 +340,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { } if (pRead->curVersion != ver) { - code = walReadSeekVer(pRead, ver); + code = walReaderSeekVer(pRead, ver); if (code < 0) { // pRead->curVersion = ver; // pRead->curInvalid = 1; @@ -475,7 +475,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { taosThreadMutexLock(&pReader->mutex); if (pReader->curVersion != ver) { - if (walReadSeekVer(pReader, ver) < 0) { + if (walReaderSeekVer(pReader, ver) < 0) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); taosThreadMutexUnlock(&pReader->mutex); return -1; From 5e16db4e19a942c11765bd4083982c64f73c65ee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 May 2023 08:54:03 +0800 Subject: [PATCH 2/6] fix(stream):extract delete msg from wal. --- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/tq.h | 3 +- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 62 ++++++++++++++++++++++++++- source/dnode/vnode/src/tq/tqPush.c | 4 +- source/dnode/vnode/src/tq/tqRead.c | 45 +++++++++++++------ source/dnode/vnode/src/tq/tqRestore.c | 19 ++++---- source/dnode/vnode/src/tq/tqUtil.c | 4 +- source/libs/wal/src/walRead.c | 2 +- 9 files changed, 110 insertions(+), 33 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 88460cd3ca..c8e1784eec 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -258,7 +258,7 @@ int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); int32_t tqNextBlockInWal(STqReader* pReader); bool tqNextBlockImpl(STqReader *pReader); -int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); +int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index dece28de6b..115ecfed38 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -182,8 +182,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqStreamTasksScanWal(STQ* pTq); // tq util +int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); +int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); bool tqIsHandleExecuting(STqHandle* pHandle); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1aea479511..0173db976d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -213,7 +213,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessSubmitReqForSubscribe(STQ* pTq); -int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); +int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f3da17553b..be0dc1ec45 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -942,7 +942,66 @@ int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } -int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { +int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) { + SDecoder* pCoder = &(SDecoder){0}; + SDeleteRes* pRes = &(SDeleteRes){0}; + + pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); + if (pRes->uidList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + tDecoderInit(pCoder, (uint8_t*)pData, len); + tDecodeDeleteRes(pCoder, pRes); + tDecoderClear(pCoder); + + int32_t numOfTables = taosArrayGetSize(pRes->uidList); + if (numOfTables == 0 || pRes->affectedRows == 0) { + taosArrayDestroy(pRes->uidList); + return TSDB_CODE_SUCCESS; + } + + SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); + blockDataEnsureCapacity(pDelBlock, numOfTables); + pDelBlock->info.rows = numOfTables; + pDelBlock->info.version = ver; + + for (int32_t i = 0; i < numOfTables; i++) { + // start key column + SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX); + colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false); // end key column + SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX); + colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false); + // uid column + SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX); + int64_t* pUid = taosArrayGet(pRes->uidList, i); + colDataSetVal(pUidCol, i, (const char*)pUid, false); + + colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i); + colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i); + colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i); + } + + taosArrayDestroy(pRes->uidList); + + int32_t* pRef = taosMemoryMalloc(sizeof(int32_t)); + *pRef = 1; + + *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); + if (pRefBlock == NULL) { + taosMemoryFree(pRef); + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK; + (*pRefBlock)->pBlock = pDelBlock; + (*pRefBlock)->dataRef = pRef; + atomic_add_fetch_32((*pRefBlock)->dataRef, 1); + + return TSDB_CODE_SUCCESS; +} + +int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { bool failed = false; SDecoder* pCoder = &(SDecoder){0}; SDeleteRes* pRes = &(SDeleteRes){0}; @@ -962,6 +1021,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { taosArrayDestroy(pRes->uidList); return 0; } + SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); blockDataEnsureCapacity(pDelBlock, sz); pDelBlock->info.rows = sz; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 85fcb4fd80..50d94c5ed5 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -34,12 +34,12 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v return 0; } - if (msgType == TDMT_VND_SUBMIT) { + if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE) { tqStartStreamTasks(pTq); } if (msgType == TDMT_VND_DELETE) { - tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver); +// tqProcessDeleteDataReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver); } } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 313c92fce8..a26a749af3 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -301,25 +301,44 @@ int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { return 0; } -int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) { - if (walNextValidMsg(pReader) < 0) { - return -1; +int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) { + int32_t code = walNextValidMsg(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; } - void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); int64_t ver = pReader->pHead->head.version; - void* data = taosMemoryMalloc(len); - if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); - return -1; + if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) { + void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); + int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); + + void* data = taosMemoryMalloc(len); + if (data == NULL) { + // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); + return -1; + } + + memcpy(data, pBody, len); + SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; + + *pItem = (SStreamQueueItem*)streamDataSubmitNew(data1, STREAM_INPUT__DATA_SUBMIT); + if (*pItem == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("%s failed to create data submit for stream since out of memory", id); + return terrno; + } + } else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) { + void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead)); + int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead); + + extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); + } else { + ASSERT(0); } - memcpy(data, pBody, len); - *pPackedData = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 3ad7041a1b..7db078140d 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -124,7 +124,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); } else { int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - if (currentVer != -1) { + if (currentVer == -1) { int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit streamMetaReleaseTask(pStreamMeta, pTask); @@ -137,26 +137,24 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } // append the data for the stream - tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); +// tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); - SPackedData packData = {0}; - int32_t code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); + SStreamQueueItem* pItem = NULL; + int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { // failed, continue streamMetaReleaseTask(pStreamMeta, pTask); continue; } - SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT); - if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr); + // delete ignore + if (pItem == NULL) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } noNewDataInWal = false; - code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); + code = tqAddInputBlockNLaunchTask(pTask, pItem); if (code == TSDB_CODE_SUCCESS) { pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, @@ -165,8 +163,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { tqError("s-task:%s append input queue failed, ver:%"PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); } - streamDataSubmitDestroy(p); - taosFreeQitem(p); + streamMetaReleaseTask(pStreamMeta, pTask); } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index c5bf4268a7..27e817c50f 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -26,10 +26,10 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { return taosStrdup(buf); } -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) { +int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) { int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); if (code < 0) { - tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver); + tqError("s-task:%s failed to put into queue, too many, next ver:%" PRId64, pTask->id.idStr, /*pPackedData->ver*/ 0L); return -1; } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index ac80b828a0..bfb2865444 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -87,7 +87,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { return -1; } - if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT || + if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT || pReader->pHead->head.msgType == TDMT_VND_DELETE || (IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) { if (walFetchBodyNew(pReader) < 0) { return -1; From e1700d00dc1ecf34f3bf31a75d6f6b17a1a47373 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 May 2023 09:26:19 +0800 Subject: [PATCH 3/6] fix(stream): avoid memory leak. --- source/libs/stream/src/stream.c | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 9ed297bd6b..7047461ca8 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -288,14 +288,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int8_t type = pItem->type; if (type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem); - if (pSubmitBlock == NULL) { - qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask); - terrno = TSDB_CODE_OUT_OF_MEMORY; - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); - return -1; - } - + SStreamDataSubmit2* pSubmitBlock = (SStreamDataSubmit2*)pItem; int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, From 30e7cb5860fca3398d21cab5bd11070fc804f1aa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 May 2023 10:42:02 +0800 Subject: [PATCH 4/6] fix(tmq): avoid return delete msg for table subscription. --- include/libs/wal/wal.h | 1 + source/dnode/vnode/src/tq/tq.c | 3 ++- source/dnode/vnode/src/tq/tqScan.c | 1 + source/libs/wal/src/walRead.c | 5 +++-- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 0a1f5d51d4..9c90c6b5c0 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -135,6 +135,7 @@ typedef struct { // int8_t scanUncommited; int8_t scanNotApplied; int8_t scanMeta; + int8_t deleteMsg; int8_t enableRef; } SWalFilterCond; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index be0dc1ec45..81d64fb98a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -648,7 +648,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + SWalFilterCond cond = {.deleteMsg = 1}; + pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); } streamSetupTrigger(pTask); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 800bcc8b71..e4b2fa8821 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -82,6 +82,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; qStreamSetOpen(task); + tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId); if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) { tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr()); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index bfb2865444..d63208522f 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -87,8 +87,9 @@ int32_t walNextValidMsg(SWalReader *pReader) { return -1; } - if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT || pReader->pHead->head.msgType == TDMT_VND_DELETE || - (IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) { + int32_t type = pReader->pHead->head.msgType; + if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || + (IS_META_MSG(type) && pReader->cond.scanMeta)) { if (walFetchBodyNew(pReader) < 0) { return -1; } From e35d145734406d1fb624b59f8cdc84816dbf434c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 May 2023 13:59:13 +0800 Subject: [PATCH 5/6] fix(stream): fix memory leak. --- include/libs/stream/tstream.h | 1 - source/dnode/vnode/src/tq/tq.c | 12 ++---------- source/dnode/vnode/src/tq/tqUtil.c | 2 +- source/libs/executor/src/scanoperator.c | 1 - source/libs/stream/src/streamData.c | 8 +------- 5 files changed, 4 insertions(+), 20 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5fd9a8b12b..0d021e2fa2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -135,7 +135,6 @@ typedef struct { typedef struct { int8_t type; int64_t ver; - int32_t* dataRef; SSDataBlock* pBlock; } SStreamRefDataBlock; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 81d64fb98a..81bb5034f1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -947,6 +947,8 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream SDecoder* pCoder = &(SDecoder){0}; SDeleteRes* pRes = &(SDeleteRes){0}; + *pRefBlock = NULL; + pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); if (pRes->uidList == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -984,21 +986,13 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream } taosArrayDestroy(pRes->uidList); - - int32_t* pRef = taosMemoryMalloc(sizeof(int32_t)); - *pRef = 1; - *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); if (pRefBlock == NULL) { - taosMemoryFree(pRef); return TSDB_CODE_OUT_OF_MEMORY; } (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK; (*pRefBlock)->pBlock = pDelBlock; - (*pRefBlock)->dataRef = pRef; - atomic_add_fetch_32((*pRefBlock)->dataRef, 1); - return TSDB_CODE_SUCCESS; } @@ -1069,8 +1063,6 @@ int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK; pRefBlock->pBlock = pDelBlock; - pRefBlock->dataRef = pRef; - atomic_add_fetch_32(pRefBlock->dataRef, 1); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) { atomic_sub_fetch_32(pRef, 1); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 27e817c50f..c74a25eba1 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -29,7 +29,7 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) { int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); if (code < 0) { - tqError("s-task:%s failed to put into queue, too many, next ver:%" PRId64, pTask->id.idStr, /*pPackedData->ver*/ 0L); + tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr); return -1; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7cb3c00c1a..5abab30e7c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1912,7 +1912,6 @@ FETCH_NEXT_BLOCK: if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); - /*pOperator->status = OP_EXEC_DONE;*/ return NULL; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index ae616260f3..96022850a3 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -194,13 +194,7 @@ void streamFreeQitem(SStreamQueueItem* data) { taosFreeQitem(pMerge); } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data; - - int32_t ref = atomic_sub_fetch_32(pRefBlock->dataRef, 1); - ASSERT(ref >= 0); - if (ref == 0) { - blockDataDestroy(pRefBlock->pBlock); - taosMemoryFree(pRefBlock->dataRef); - } + blockDataDestroy(pRefBlock->pBlock); taosFreeQitem(pRefBlock); } } From f3f52c02323a01c194d96296724e9708639e0fc2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 May 2023 10:43:23 +0800 Subject: [PATCH 6/6] fix(stream): add some logs and set the wal offset to be the ended version when finishing the step2 recover stage. --- source/dnode/vnode/src/tq/tq.c | 5 ++++- source/libs/executor/src/scanoperator.c | 6 +++--- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamRecover.c | 11 +++++++++-- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 81bb5034f1..6724e2d303 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -853,7 +853,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { memcpy(serializedReq, &req, len); // dispatch msg - tqDebug("s-task:%s start recover block stage", pTask->id.idStr); + tqDebug("s-task:%s start to recover blocking stage", pTask->id.idStr); SRpcMsg rpcMsg = { .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq}; @@ -877,6 +877,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } + qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion); + walReaderSeekVer(pTask->exec.pWalReader, sversion); + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5abab30e7c..aa006e3960 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1792,7 +1792,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamScanInfo* pInfo = pOperator->info; - qDebug("stream scan called"); + qDebug("stream scan started, %s", GET_TASKID(pTaskInfo)); if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { @@ -1801,13 +1801,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) { pTSInfo->base.cond.startVersion = 0; pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; - qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion, + qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion); pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1; } else { pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1; pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2; - qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion, + qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion); pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e10562f5cb..e093d45074 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -60,7 +60,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("st-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); + qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 0324580885..26429ea764 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -224,12 +224,19 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { void* exec = pTask->exec.pExecutor; + const char* id = pTask->id.idStr; - qDebug("s-task:%s recover step2(blocking stage) started", pTask->id.idStr); + int64_t st = taosGetTimestampMs(); + qDebug("s-task:%s recover step2(blocking stage) started", id); if (qStreamSourceRecoverStep2(exec, ver) < 0) { } - return streamScanExec(pTask, 100); + int32_t code = streamScanExec(pTask, 100); + + double el = (taosGetTimestampMs() - st) / 1000.0; + qDebug("s-task:%s recover step2(blocking stage) ended, elapsed time:%.2fs", id, el); + + return code; } int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {