From 963287798dcacd32ef03b70f83f3813c006e0c67 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 17 Aug 2022 19:44:53 +0800 Subject: [PATCH] refactor(mnode): drop stream task --- include/libs/stream/tstream.h | 1 - source/dnode/mnode/impl/src/mndStb.c | 7 ++++- source/dnode/vnode/src/tq/tqSink.c | 44 +++++++++++++++------------- source/libs/stream/src/streamMeta.c | 8 +++++ source/libs/stream/src/streamQueue.c | 5 ++-- source/libs/stream/src/streamTask.c | 8 +++++ 6 files changed, 48 insertions(+), 25 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 484d0991f2..384c6a289f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -216,7 +216,6 @@ typedef struct { void* vnode; FTbSink* tbSinkFunc; STSchema* pTSchema; - SHashObj* pHash; // groupId to tbuid } STaskSinkTb; typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 59c6d65953..6083a76981 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2044,7 +2044,12 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - if (pStream->smaId == 0 && pStream->targetStbUid == suid) { + if (pStream->smaId != 0) { + sdbRelease(pSdb, pStream); + continue; + } + + if (pStream->targetStbUid == suid) { sdbRelease(pSdb, pStream); return -1; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 55630511bf..522bf46aa1 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -231,34 +231,35 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ASSERT(pTask->tbSink.pTSchema); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, - pTask->tbSink.stbFullName, &deleteReq); + SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, + pTask->tbSink.stbFullName, &deleteReq); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); - int32_t code; - int32_t len; - tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - if (code < 0) { - // - ASSERT(0); - } - SEncoder encoder; - void* buf = rpcMallocCont(len + sizeof(SMsgHead)); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncoderInit(&encoder, abuf, len); - tEncodeSBatchDeleteReq(&encoder, &deleteReq); - tEncoderClear(&encoder); - - ((SMsgHead*)buf)->vgId = pVnode->config.vgId; - if (taosArrayGetSize(deleteReq.deleteReqs) != 0) { + int32_t code; + int32_t len; + tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); + if (code < 0) { + // + ASSERT(0); + } + SEncoder encoder; + void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); + void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); + tEncoderInit(&encoder, abuf, len); + tEncodeSBatchDeleteReq(&encoder, &deleteReq); + tEncoderClear(&encoder); + + ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId; + SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, - .pCont = buf, + .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead), }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + rpcFreeCont(serializedDeleteReq); tqDebug("failed to put into write-queue since %s", terrstr()); } } @@ -268,11 +269,12 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { // build write msg SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, - .pCont = pReq, - .contLen = ntohl(pReq->length), + .pCont = submitReq, + .contLen = ntohl(submitReq->length), }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + rpcFreeCont(submitReq); tqDebug("failed to put into write-queue since %s", terrstr()); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f34f68ffc6..5ff700546c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -15,6 +15,7 @@ #include "executor.h" #include "tstream.h" +#include "ttimer.h" SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -166,13 +167,20 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { /*return -1;*/ } + if (pTask->triggerParam != 0) { + taosTmrStop(pTask->timer); + } + while (1) { int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING); if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { tFreeSStreamTask(pTask); break; + } else if (schedStatus == TASK_SCHED_STATUS__DROPPING) { + break; } + taosMsleep(10); } } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 45b78a8c6e..ac10c82587 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "tstream.h" +#include "streamInc.h" SStreamQueue* streamQueueOpen() { SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); @@ -36,11 +36,12 @@ void streamQueueClose(SStreamQueue* queue) { while (1) { void* qItem = streamQueueNextItem(queue); if (qItem) { - taosFreeQitem(qItem); + streamFreeQitem(qItem); } else { break; } } taosFreeQall(queue->qall); taosCloseQueue(queue->queue); + taosMemoryFree(queue); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 0c35c1408e..d588d90543 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -156,5 +156,13 @@ void tFreeSStreamTask(SStreamTask* pTask) { if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); if (pTask->exec.executor) qDestroyTask(pTask->exec.executor); + taosArrayDestroy(pTask->childEpInfo); + if (pTask->outputType == TASK_OUTPUT__TABLE) { + tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper); + taosMemoryFree(pTask->tbSink.pTSchema); + } + if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); + } taosMemoryFree(pTask); }