diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 02bb65b762..b47288bf45 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -189,7 +189,7 @@ int32_t streamInit(); void streamCleanUp(); SStreamQueue* streamQueueOpen(int64_t cap); -void streamQueueClose(SStreamQueue* queue); +void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) { ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 7c58431b57..c15a92cc2e 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -296,53 +296,70 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) { } int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) { - int32_t code = walNextValidMsg(pReader); - if (code != TSDB_CODE_SUCCESS) { + int32_t code = 0; + + while(1) { + code = walNextValidMsg(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + SWalCont* pCont = &pReader->pHead->head; + int64_t ver = pCont->version; + if (ver > maxVer) { + tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id); + return TSDB_CODE_SUCCESS; + } + + if (pCont->msgType == TDMT_VND_SUBMIT) { + void* pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg)); + int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg); + + void* data = taosMemoryMalloc(len); + if (data == NULL) { + // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry + code = TSDB_CODE_OUT_OF_MEMORY; + terrno = code; + + tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); + return code; + } + + memcpy(data, pBody, len); + SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; + + *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT); + if (*pItem == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + terrno = code; + tqError("%s failed to create data submit for stream since out of memory", id); + return code; + } + } else if (pCont->msgType == TDMT_VND_DELETE) { + void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); + int32_t len = pCont->bodyLen - sizeof(SMsgHead); + + code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); + if (code == TSDB_CODE_SUCCESS) { + if (*pItem == NULL) { + tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver); + // we need to continue check next data in the wal files. + continue; + } else { + tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver); + } + } else { + terrno = code; + tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code)); + return code; + } + + } else { + ASSERT(0); + } + return code; } - - int64_t ver = pReader->pHead->head.version; - if (ver > maxVer) { - tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id); - return TSDB_CODE_SUCCESS; - } - - if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) { - void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - - void* data = taosMemoryMalloc(len); - if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); - return -1; - } - - memcpy(data, pBody, len); - SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; - - *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT); - if (*pItem == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("%s failed to create data submit for stream since out of memory", id); - return terrno; - } - } else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) { - void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead)); - int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead); - - code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); - if (code != TSDB_CODE_SUCCESS) { - tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code)); - } else { - tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver); - } - } else { - ASSERT(0); - } - - return 0; } // todo ignore the error in wal? diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 694b0808f2..0864eb3c28 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -780,6 +780,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } streamFreeQitem(pTask->msgInfo.pData); + pTask->msgInfo.pData = NULL; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 65135ec9a1..e28c93b8b1 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -35,18 +35,17 @@ FAIL: return NULL; } -void streamQueueClose(SStreamQueue* queue) { - while (1) { - void* qItem = streamQueueNextItem(queue); - if (qItem) { - streamFreeQitem(qItem); - } else { - break; - } +void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { + qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue)); + + void* qItem = NULL; + while ((qItem = streamQueueNextItem(pQueue)) != NULL) { + streamFreeQitem(qItem); } - taosFreeQall(queue->qall); - taosCloseQueue(queue->queue); - taosMemoryFree(queue); + + taosFreeQall(pQueue->qall); + taosCloseQueue(pQueue->queue); + taosMemoryFree(pQueue); } #if 0 diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 122d18e9f0..232ca132ab 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -236,11 +236,11 @@ void tFreeStreamTask(SStreamTask* pTask) { int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputQueue) { - streamQueueClose(pTask->inputQueue); + streamQueueClose(pTask->inputQueue, pTask->id.taskId); } if (pTask->outputInfo.queue) { - streamQueueClose(pTask->outputInfo.queue); + streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId); } if (pTask->exec.qmsg) { @@ -271,6 +271,11 @@ void tFreeStreamTask(SStreamTask* pTask) { streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); } + if (pTask->msgInfo.pData != NULL) { + destroyStreamDataBlock(pTask->msgInfo.pData); + pTask->msgInfo.pData = NULL; + } + if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr); }