From 853e6e29888afcf14ce4a743c9b82f1c19850bb9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 17 Aug 2022 19:19:58 +0800 Subject: [PATCH 1/4] refactor(mnode): drop stream task --- examples/c/stream_demo.c | 7 +++--- include/common/tcommon.h | 1 + include/libs/stream/tstream.h | 5 +++++ include/util/taoserror.h | 1 + source/dnode/mnode/impl/src/mndScheduler.c | 2 ++ source/dnode/mnode/impl/src/mndStb.c | 11 ++++++---- source/dnode/vnode/src/tq/tq.c | 14 +++--------- source/libs/stream/inc/streamInc.h | 1 - source/libs/stream/src/stream.c | 8 +++++-- source/libs/stream/src/streamMeta.c | 25 ++++++++++++++++------ source/libs/stream/src/streamQueue.c | 4 +++- source/libs/stream/src/streamTask.c | 4 ++-- source/util/src/terror.c | 1 + 13 files changed, 53 insertions(+), 31 deletions(-) diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index dd4fbc8d2d..2fcf4dd62c 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -98,10 +98,9 @@ int32_t create_stream() { /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ - pRes = - taos_query(pConn, - "create stream stream1 trigger max_delay 10s into outstb as select _wstart, sum(k) from st1 partition " - "by tbname session(ts, 10s) "); + pRes = taos_query(pConn, + "create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, " + "count(k) from st1 partition by tbname interval(20s) "); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tcommon.h b/include/common/tcommon.h index e04d9d5e86..dbe020f7ec 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -60,6 +60,7 @@ enum { STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__GET_RES, STREAM_INPUT__CHECKPOINT, + STREAM_INPUT__DESTROY, }; typedef enum EStreamType { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e6fcb021d5..484d0991f2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -53,6 +53,7 @@ enum { TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE, TASK_SCHED_STATUS__FAILED, + TASK_SCHED_STATUS__DROPPING, }; enum { @@ -127,6 +128,10 @@ typedef struct { int8_t type; } SStreamCheckpoint; +typedef struct { + int8_t type; +} SStreamTaskDestroy; + typedef struct { int8_t type; SSDataBlock* pBlock; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index d7ec3697af..c3796fbadd 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -291,6 +291,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1) #define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2) #define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3) +#define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a24b7ef459..3bfd7eb596 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -424,6 +424,8 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } mndAddTaskToTaskSet(taskSourceLevel, pTask); + pTask->triggerParam = 0; + // source pTask->taskLevel = TASK_LEVEL__SOURCE; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index dd2b595c29..59c6d65953 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2021,8 +2021,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, FOREACH(pNode, pNodeList) { SColumnNode *pCol = (SColumnNode *)pNode; - if (pCol->tableId != suid) { - mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId); + if (pCol->tableId == suid) { sdbRelease(pSdb, pTopic); nodesDestroyNode(pAst); return -1; @@ -2045,6 +2044,11 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; + if (pStream->smaId == 0 && pStream->targetStbUid == suid) { + sdbRelease(pSdb, pStream); + return -1; + } + SNode *pAst = NULL; if (nodesStringToNode(pStream->ast, &pAst) != 0) { ASSERT(0); @@ -2057,8 +2061,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, FOREACH(pNode, pNodeList) { SColumnNode *pCol = (SColumnNode *)pNode; - if (pCol->tableId != suid) { - mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId); + if (pCol->tableId == suid) { sdbRelease(pSdb, pStream); nodesDestroyNode(pAst); return -1; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a98fea1988..c6bc8e6e59 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -628,8 +628,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { - int32_t code = 0; - if (pTask->taskLevel == TASK_LEVEL__AGG) { ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); } @@ -640,8 +638,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { pTask->outputQueue = streamQueueOpen(); if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { - code = -1; - goto FAIL; + return -1; } pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; @@ -686,14 +683,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { streamSetupTrigger(pTask); - tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId, + tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId, pTask->selfChildId); - -FAIL: - if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); - if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); - // TODO free executor - return code; + return 0; } int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 3776cb261f..6e30eeaa86 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -32,7 +32,6 @@ typedef struct { static SStreamGlobalEnv streamEnv; -int32_t streamExec(SStreamTask* pTask); int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch); int32_t streamDispatch(SStreamTask* pTask); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 6da7d4fd59..d6e87c2736 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -185,7 +185,9 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S tFreeStreamDispatchReq(pReq); if (exec) { - streamTryExec(pTask); + if (streamTryExec(pTask) < 0) { + return -1; + } if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamDispatch(pTask); @@ -221,7 +223,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { } int32_t streamProcessRunReq(SStreamTask* pTask) { - streamTryExec(pTask); + if (streamTryExec(pTask) < 0) { + return -1; + } if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamDispatch(pTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 64a9537e6c..f34f68ffc6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -99,16 +99,19 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* goto FAIL; } - taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); + if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + goto FAIL; + } if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) { + taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t)); ASSERT(0); - return -1; + goto FAIL; } return 0; FAIL: - if (pTask) taosMemoryFree(pTask); + if (pTask) tFreeSStreamTask(pTask); return -1; } @@ -158,11 +161,21 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* pTask = *ppTask; taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); + + if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) { + /*return -1;*/ + } + + while (1) { + int8_t schedStatus = + atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING); + if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { + tFreeSStreamTask(pTask); + break; + } + } } - if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) { - /*return -1;*/ - } return 0; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 6819e5329f..45b78a8c6e 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -38,7 +38,9 @@ void streamQueueClose(SStreamQueue* queue) { if (qItem) { taosFreeQitem(qItem); } else { - return; + break; } } + taosFreeQall(queue->qall); + taosCloseQueue(queue->queue); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 638d39e5cc..0c35c1408e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -152,8 +152,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } void tFreeSStreamTask(SStreamTask* pTask) { - streamQueueClose(pTask->inputQueue); - streamQueueClose(pTask->outputQueue); + if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); + if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); if (pTask->exec.executor) qDestroyTask(pTask->exec.executor); taosMemoryFree(pTask); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 7b06967940..f6b62b5ea8 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -293,6 +293,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_MUST_BE_DELETED, "Stream must be dropped first") // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists") From 963287798dcacd32ef03b70f83f3813c006e0c67 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 17 Aug 2022 19:44:53 +0800 Subject: [PATCH 2/4] refactor(mnode): drop stream task --- include/libs/stream/tstream.h | 1 - source/dnode/mnode/impl/src/mndStb.c | 7 ++++- source/dnode/vnode/src/tq/tqSink.c | 44 +++++++++++++++------------- source/libs/stream/src/streamMeta.c | 8 +++++ source/libs/stream/src/streamQueue.c | 5 ++-- source/libs/stream/src/streamTask.c | 8 +++++ 6 files changed, 48 insertions(+), 25 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 484d0991f2..384c6a289f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -216,7 +216,6 @@ typedef struct { void* vnode; FTbSink* tbSinkFunc; STSchema* pTSchema; - SHashObj* pHash; // groupId to tbuid } STaskSinkTb; typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 59c6d65953..6083a76981 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2044,7 +2044,12 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - if (pStream->smaId == 0 && pStream->targetStbUid == suid) { + if (pStream->smaId != 0) { + sdbRelease(pSdb, pStream); + continue; + } + + if (pStream->targetStbUid == suid) { sdbRelease(pSdb, pStream); return -1; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 55630511bf..522bf46aa1 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -231,34 +231,35 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ASSERT(pTask->tbSink.pTSchema); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, - pTask->tbSink.stbFullName, &deleteReq); + SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, + pTask->tbSink.stbFullName, &deleteReq); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); - int32_t code; - int32_t len; - tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - if (code < 0) { - // - ASSERT(0); - } - SEncoder encoder; - void* buf = rpcMallocCont(len + sizeof(SMsgHead)); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncoderInit(&encoder, abuf, len); - tEncodeSBatchDeleteReq(&encoder, &deleteReq); - tEncoderClear(&encoder); - - ((SMsgHead*)buf)->vgId = pVnode->config.vgId; - if (taosArrayGetSize(deleteReq.deleteReqs) != 0) { + int32_t code; + int32_t len; + tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); + if (code < 0) { + // + ASSERT(0); + } + SEncoder encoder; + void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); + void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); + tEncoderInit(&encoder, abuf, len); + tEncodeSBatchDeleteReq(&encoder, &deleteReq); + tEncoderClear(&encoder); + + ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId; + SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, - .pCont = buf, + .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead), }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + rpcFreeCont(serializedDeleteReq); tqDebug("failed to put into write-queue since %s", terrstr()); } } @@ -268,11 +269,12 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { // build write msg SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, - .pCont = pReq, - .contLen = ntohl(pReq->length), + .pCont = submitReq, + .contLen = ntohl(submitReq->length), }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + rpcFreeCont(submitReq); tqDebug("failed to put into write-queue since %s", terrstr()); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f34f68ffc6..5ff700546c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -15,6 +15,7 @@ #include "executor.h" #include "tstream.h" +#include "ttimer.h" SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -166,13 +167,20 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { /*return -1;*/ } + if (pTask->triggerParam != 0) { + taosTmrStop(pTask->timer); + } + while (1) { int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING); if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { tFreeSStreamTask(pTask); break; + } else if (schedStatus == TASK_SCHED_STATUS__DROPPING) { + break; } + taosMsleep(10); } } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 45b78a8c6e..ac10c82587 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "tstream.h" +#include "streamInc.h" SStreamQueue* streamQueueOpen() { SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); @@ -36,11 +36,12 @@ void streamQueueClose(SStreamQueue* queue) { while (1) { void* qItem = streamQueueNextItem(queue); if (qItem) { - taosFreeQitem(qItem); + streamFreeQitem(qItem); } else { break; } } taosFreeQall(queue->qall); taosCloseQueue(queue->queue); + taosMemoryFree(queue); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 0c35c1408e..d588d90543 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -156,5 +156,13 @@ void tFreeSStreamTask(SStreamTask* pTask) { if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); if (pTask->exec.executor) qDestroyTask(pTask->exec.executor); + taosArrayDestroy(pTask->childEpInfo); + if (pTask->outputType == TASK_OUTPUT__TABLE) { + tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper); + taosMemoryFree(pTask->tbSink.pTSchema); + } + if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); + } taosMemoryFree(pTask); } From a2757eb4f381ff4dc7b561fa1ca90caa572aa90a Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 18 Aug 2022 14:26:11 +0800 Subject: [PATCH 3/4] fix(mnode): memory leak --- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/inc/mndStb.h | 1 + source/dnode/mnode/impl/src/mndDef.c | 19 +++++++++++++++++++ source/dnode/mnode/impl/src/mndStb.c | 16 ++++++++++------ source/dnode/mnode/impl/src/mndStream.c | 13 ++++++++++++- 5 files changed, 43 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 455da6a40e..8cff7fe48e 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -636,6 +636,7 @@ typedef struct { int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj); +void tFreeStreamObj(SStreamObj* pObj); typedef struct { char streamName[TSDB_STREAM_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/inc/mndStb.h b/source/dnode/mnode/impl/inc/mndStb.h index 44a7fdadde..010199a89f 100644 --- a/source/dnode/mnode/impl/inc/mndStb.h +++ b/source/dnode/mnode/impl/inc/mndStb.h @@ -34,6 +34,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate); SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName); int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb); int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb); +void mndFreeStb(SStbObj *pStb); void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst); void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 08ce161409..e6f1a40993 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -116,6 +116,25 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { return 0; } +void tFreeStreamObj(SStreamObj *pStream) { + taosMemoryFree(pStream->sql); + taosMemoryFree(pStream->ast); + taosMemoryFree(pStream->physicalPlan); + if (pStream->outputSchema.nCols) taosMemoryFree(pStream->outputSchema.pSchema); + + int32_t sz = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < sz; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + int32_t taskSz = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < taskSz; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + tFreeSStreamTask(pTask); + } + taosArrayDestroy(pLevel); + } + taosArrayDestroy(pStream->tasks); +} + SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); if (pVgEpNew == NULL) return NULL; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 6083a76981..e0f2b83160 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -266,6 +266,15 @@ _OVER: return pRow; } +void mndFreeStb(SStbObj *pStb) { + taosArrayDestroy(pStb->pFuncs); + taosMemoryFreeClear(pStb->pColumns); + taosMemoryFreeClear(pStb->pTags); + taosMemoryFreeClear(pStb->comment); + taosMemoryFreeClear(pStb->pAst1); + taosMemoryFreeClear(pStb->pAst2); +} + static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) { mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb); return 0; @@ -273,12 +282,7 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) { static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) { mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb); - taosArrayDestroy(pStb->pFuncs); - taosMemoryFreeClear(pStb->pColumns); - taosMemoryFreeClear(pStb->pTags); - taosMemoryFreeClear(pStb->comment); - taosMemoryFreeClear(pStb->pAst1); - taosMemoryFreeClear(pStb->pAst2); + mndFreeStb(pStb); return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8c453e0c88..6dc8e2072b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -167,6 +167,9 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) { static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) { mTrace("stream:%s, perform delete action", pStream->name); + taosWLockLatch(&pStream->lock); + tFreeStreamObj(pStream); + taosWUnLockLatch(&pStream->lock); return 0; } @@ -493,10 +496,17 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre stbObj.uid = pStream->targetStbUid; - if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER; + if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) { + mndFreeStb(&stbObj); + goto _OVER; + } + + tFreeSMCreateStbReq(&createReq); + mndFreeStb(&stbObj); return 0; _OVER: + tFreeSMCreateStbReq(&createReq); mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); return -1; @@ -715,6 +725,7 @@ _OVER: mndReleaseDb(pMnode, pDb); tFreeSCMCreateStreamReq(&createStreamReq); + tFreeStreamObj(&streamObj); return code; } From d9e978814ad66b87478a88e53a506afe57a02a88 Mon Sep 17 00:00:00 2001 From: Hui Li <52318143+plum-lihui@users.noreply.github.com> Date: Thu, 18 Aug 2022 15:33:12 +0800 Subject: [PATCH 4/4] ci: rust dir The 2.x and 3.x have the same file name but different contents. --- Jenkinsfile2 | 1 + 1 file changed, 1 insertion(+) diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 754617f99f..12e806c87a 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -43,6 +43,7 @@ def pre_test(){ cd ${WKC} git reset --hard git clean -fxd + rm -rf examples/rust/ git remote prune origin git fetch '''