From 4b10393c81af31e1411155f0c3c6d239cd12a446 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 18 Aug 2023 18:02:08 +0800 Subject: [PATCH 1/4] fix(stream): fix memory leak and do some internal refactor. --- include/libs/stream/tstream.h | 2 +- source/libs/stream/src/streamQueue.c | 21 ++++++++++----------- source/libs/stream/src/streamTask.c | 9 +++++++-- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 02bb65b762..b47288bf45 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -189,7 +189,7 @@ int32_t streamInit(); void streamCleanUp(); SStreamQueue* streamQueueOpen(int64_t cap); -void streamQueueClose(SStreamQueue* queue); +void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) { ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index aaf9fdec72..22e09693c8 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -35,18 +35,17 @@ FAIL: return NULL; } -void streamQueueClose(SStreamQueue* queue) { - while (1) { - void* qItem = streamQueueNextItem(queue); - if (qItem) { - streamFreeQitem(qItem); - } else { - break; - } +void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { + qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue)); + + void* qItem = NULL; + while ((qItem = streamQueueNextItem(pQueue)) != NULL) { + streamFreeQitem(qItem); } - taosFreeQall(queue->qall); - taosCloseQueue(queue->queue); - taosMemoryFree(queue); + + taosFreeQall(pQueue->qall); + taosCloseQueue(pQueue->queue); + taosMemoryFree(pQueue); } #if 0 diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 9056fa8d93..eda9c1f2bb 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -220,11 +220,11 @@ void tFreeStreamTask(SStreamTask* pTask) { int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputQueue) { - streamQueueClose(pTask->inputQueue); + streamQueueClose(pTask->inputQueue, pTask->id.taskId); } if (pTask->outputInfo.queue) { - streamQueueClose(pTask->outputInfo.queue); + streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId); } if (pTask->exec.qmsg) { @@ -255,6 +255,11 @@ void tFreeStreamTask(SStreamTask* pTask) { streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); } + if (pTask->msgInfo.pData != NULL) { + destroyStreamDataBlock(pTask->msgInfo.pData); + pTask->msgInfo.pData = NULL; + } + if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr); } From 718bb4a8815df5fbe82fc67c1acec7c3674ed8c8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 18 Aug 2023 22:40:04 +0800 Subject: [PATCH 2/4] fix(stream): fix the invalid free. --- source/libs/stream/src/streamDispatch.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index bb32173404..3d21db2d79 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -766,6 +766,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } streamFreeQitem(pTask->msgInfo.pData); + pTask->msgInfo.pData = NULL; return TSDB_CODE_SUCCESS; } From 523bad3951f2138d9d1298092df3cfa671b8aded Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Aug 2023 15:45:43 +0800 Subject: [PATCH 3/4] fix(stream): continue check wal when meeting empty delete block msg. --- source/dnode/vnode/src/tq/tqRead.c | 107 +++++++++++++++++------------ 1 file changed, 62 insertions(+), 45 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 43f38ade97..d3157dc3b0 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -296,53 +296,70 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) { } int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) { - int32_t code = walNextValidMsg(pReader); - if (code != TSDB_CODE_SUCCESS) { + int32_t code = 0; + SWalCont* pCont = &pReader->pHead->head; + + while(1) { + code = walNextValidMsg(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int64_t ver = pCont->version; + if (ver > maxVer) { + tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id); + return TSDB_CODE_SUCCESS; + } + + if (pCont->msgType == TDMT_VND_SUBMIT) { + void* pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg)); + int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg); + + void* data = taosMemoryMalloc(len); + if (data == NULL) { + // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry + code = TSDB_CODE_OUT_OF_MEMORY; + terrno = code; + + tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); + return code; + } + + memcpy(data, pBody, len); + SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; + + *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT); + if (*pItem == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + terrno = code; + tqError("%s failed to create data submit for stream since out of memory", id); + return code; + } + } else if (pCont->msgType == TDMT_VND_DELETE) { + void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); + int32_t len = pCont->bodyLen - sizeof(SMsgHead); + + code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); + if (code == TSDB_CODE_SUCCESS) { + if (*pItem == NULL) { + tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver); + // we need to continue check next data in the wal files. + continue; + } else { + tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver); + } + } else { + terrno = code; + tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code)); + return code; + } + + } else { + ASSERT(0); + } + return code; } - - int64_t ver = pReader->pHead->head.version; - if (ver > maxVer) { - tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id); - return TSDB_CODE_SUCCESS; - } - - if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) { - void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - - void* data = taosMemoryMalloc(len); - if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); - return -1; - } - - memcpy(data, pBody, len); - SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; - - *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT); - if (*pItem == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("%s failed to create data submit for stream since out of memory", id); - return terrno; - } - } else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) { - void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead)); - int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead); - - code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); - if (code != TSDB_CODE_SUCCESS) { - tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code)); - } else { - tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver); - } - } else { - ASSERT(0); - } - - return 0; } // todo ignore the error in wal? From 7d9d4ae8505a56e0e2ed1249e9122138718fb0e1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Aug 2023 16:26:30 +0800 Subject: [PATCH 4/4] fix(stream): fix the ptr ref. --- source/dnode/vnode/src/tq/tqRead.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index d3157dc3b0..ae150750df 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -297,7 +297,6 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) { int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) { int32_t code = 0; - SWalCont* pCont = &pReader->pHead->head; while(1) { code = walNextValidMsg(pReader); @@ -305,6 +304,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con return code; } + SWalCont* pCont = &pReader->pHead->head; int64_t ver = pCont->version; if (ver > maxVer) { tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);