From fb52cfa816f6428bd33ba3c225afc4475043df90 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 20 Jul 2022 19:31:56 +0800 Subject: [PATCH] fix(stream): memory error --- source/dnode/vnode/src/inc/tq.h | 4 ++++ source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 23 ++++++++++++++++++++++- source/libs/executor/src/scanoperator.c | 1 + 4 files changed, 28 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 07bee22a1f..abac77dc01 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -108,6 +108,10 @@ typedef struct { // exec STqExecHandle execHandle; + + // prevent drop + int64_t ntbUid; + SArray* colIdList; // SArray } STqHandle; struct STQ { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index fd0f97a638..d785376925 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -142,6 +142,7 @@ void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); +int32_t tqCheckColModifiable(STQ* pTq, int32_t colId); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 208b5d3fa0..b2679ee245 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -208,6 +208,26 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { return 0; } +int32_t tqCheckColModifiable(STQ* pTq, int32_t colId) { + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pTq->handles, pIter); + if (pIter == NULL) break; + STqHandle* pExec = (STqHandle*)pIter; + if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + int32_t sz = taosArrayGetSize(pExec->colIdList); + for (int32_t i = 0; i < sz; i++) { + int32_t forbidColId = *(int32_t*)taosArrayGet(pExec->colIdList, i); + if (forbidColId == colId) { + taosHashCancelIterate(pTq->handles, pIter); + return -1; + } + } + } + } + return 0; +} + static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) { pRsp->reqOffset = pReq->reqOffset; @@ -752,8 +772,9 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); if (ppTask) { + SStreamTask* pTask = *ppTask; taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); - atomic_store_8(&(*ppTask)->taskStatus, TASK_STATUS__DROPPING); + atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); } // todo // clear queue diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2e78b61b8c..fcadffa4d0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1519,6 +1519,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { blockDataDestroy(pStreamScan->pPullDataRes); blockDataDestroy(pStreamScan->pDeleteDataRes); taosArrayDestroy(pStreamScan->pBlockLists); + taosArrayDestroy(pStreamScan->tsArray); taosMemoryFree(pStreamScan); }