From a9bf05bc018d338103c01874dc41b64fdd4ac054 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 15 Nov 2022 18:32:08 +0800 Subject: [PATCH] refactor: stream meta ref count --- include/libs/stream/tstream.h | 7 +++- source/dnode/mnode/impl/src/mndStream.c | 1 + source/dnode/snode/src/snode.c | 23 ++++++++---- source/dnode/vnode/src/tq/tq.c | 48 ++++++++++++++++++------- source/libs/executor/src/scanoperator.c | 2 ++ source/libs/stream/src/streamMeta.c | 41 +++++++++++++++++++++ source/libs/wal/src/walRef.c | 4 +-- 7 files changed, 103 insertions(+), 23 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0354078b7b..ecd1b6f916 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -338,7 +338,7 @@ typedef struct SStreamTask { int32_t recoverWaitingUpstream; int64_t checkReqId; SArray* checkReqIds; // shuffle - + int32_t refCnt; } SStreamTask; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -565,6 +565,7 @@ typedef struct SStreamMeta { TXN txn; FTaskExpand* expandFunc; int32_t vgId; + SRWLatch lock; } SStreamMeta; SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); @@ -575,6 +576,10 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, c int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); +SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); +void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); +void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId); + int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c649180285..2cfb81a817 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -738,6 +738,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); return TSDB_CODE_ACTION_IN_PROGRESS; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index f53350c10b..aa55204ae5 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -36,13 +36,14 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { int32_t taskId = req.taskId; - SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; streamProcessDispatchReq(pTask, &req, &rsp, false); + streamMetaReleaseTask(pSnode->pMeta, pTask); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); return; @@ -63,6 +64,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); + pTask->refCnt = 1; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen(); @@ -166,15 +168,17 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; - return streamMetaRemoveTask(pSnode->pMeta, pReq->taskId); + streamMetaRemoveTask1(pSnode->pMeta, pReq->taskId); + return 0; } int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) { SStreamTaskRunReq *pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); if (pTask) { streamProcessRunReq(pTask); + streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } else { return -1; @@ -191,13 +195,14 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) { tDecodeStreamDispatchReq(&decoder, &req); int32_t taskId = req.taskId; - SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; streamProcessDispatchReq(pTask, &req, &rsp, exec); + streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } else { return -1; @@ -215,13 +220,14 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { tDecodeStreamRetrieveReq(&decoder, &req); tDecoderClear(&decoder); int32_t taskId = req.dstTaskId; - SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; streamProcessRetrieveReq(pTask, &req, &rsp); + streamMetaReleaseTask(pSnode->pMeta, pTask); tDeleteStreamRetrieveReq(&req); return 0; } else { @@ -232,9 +238,10 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = ntohl(pRsp->upstreamTaskId); - SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); + streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } else { return -1; @@ -274,15 +281,17 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderClear(&decoder); // find task - SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, req.taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.taskId); if (pTask == NULL) { return -1; } // do process request if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) { + streamMetaReleaseTask(pSnode->pMeta, pTask); return -1; } + streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1cc771dbb4..419d4c3b9b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -882,6 +882,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); } + pTask->refCnt = 1; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputQueue = streamQueueOpen(); @@ -975,13 +976,15 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { .upstreamNodeId = req.upstreamNodeId, .upstreamTaskId = req.upstreamTaskId, }; - SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { rsp.status = 1; } else { rsp.status = 0; } + streamMetaReleaseTask(pTq->pStreamMeta, pTask); + tqDebug("tq recv task check req(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d", rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); @@ -1027,12 +1030,14 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_ tqDebug("tq recv task check rsp(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d", rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); - SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, rsp.upstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId); if (pTask == NULL) { return -1; } - return streamProcessTaskCheckRsp(pTask, &rsp, version); + code = streamProcessTaskCheckRsp(pTask, &rsp, version); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); + return code; } int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { @@ -1077,15 +1082,17 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { int32_t msgLen = pMsg->contLen; SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg; - SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask == NULL) { return -1; } + ASSERT(pReq->taskId == pTask->taskId); // check param int64_t fillVer1 = pTask->startVer; if (fillVer1 <= 0) { ASSERT(0); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; } @@ -1096,10 +1103,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { SStreamRecoverStep2Req req; code = streamBuildSourceRecover2Req(pTask, &req); if (code < 0) { + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; } - ASSERT(pReq->taskId == pTask->taskId); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); // serialize msg int32_t len = sizeof(SStreamRecoverStep1Req); @@ -1127,7 +1135,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t code; SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg; - SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask == NULL) { return -1; } @@ -1135,27 +1143,33 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m // do recovery step 2 code = streamSourceRecoverScanStep2(pTask, version); if (code < 0) { + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; } // restore param code = streamRestoreParam(pTask); if (code < 0) { + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; } // set status normal code = streamSetStatusNormal(pTask); if (code < 0) { + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; } // dispatch recover finish req to all related downstream task code = streamDispatchRecoverFinishReq(pTask); if (code < 0) { + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; } + streamMetaReleaseTask(pTq->pStreamMeta, pTask); + return 0; } @@ -1172,15 +1186,17 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) { tDecoderClear(&decoder); // find task - SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, req.taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); if (pTask == NULL) { return -1; } // do process request if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) { + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; } + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } @@ -1354,9 +1370,10 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask) { streamProcessRunReq(pTask); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } else { return -1; @@ -1373,13 +1390,14 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { tDecodeStreamDispatchReq(&decoder, &req); int32_t taskId = req.taskId; - SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; streamProcessDispatchReq(pTask, &req, &rsp, exec); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } else { return -1; @@ -1389,10 +1407,11 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = ntohl(pRsp->upstreamTaskId); - SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); tqDebug("recv dispatch rsp, code: %x", pMsg->code); if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } else { return -1; @@ -1401,7 +1420,8 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); + streamMetaRemoveTask1(pTq->pStreamMeta, pReq->taskId); + return 0; } int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { @@ -1414,13 +1434,14 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { tDecodeStreamRetrieveReq(&decoder, &req); tDecoderClear(&decoder); int32_t taskId = req.dstTaskId; - SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; streamProcessRetrieveReq(pTask, &req, &rsp); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); tDeleteStreamRetrieveReq(&req); return 0; } else { @@ -1452,13 +1473,14 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { int32_t taskId = req.taskId; - SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; streamProcessDispatchReq(pTask, &req, &rsp, false); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); return 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d2e59ea8e1..77d2aeba28 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2385,6 +2385,8 @@ static void destroyStreamScanOperatorInfo(void* param) { taosMemoryFree(pStreamScan->pPseudoExpr); } + cleanupExprSupp(&pStreamScan->tbnameCalSup); + updateInfoDestroy(pStreamScan->pUpdateInfo); blockDataDestroy(pStreamScan->pRes); blockDataDestroy(pStreamScan->pUpdateRes); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index da082b7f74..5ec8828c05 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -169,6 +169,47 @@ SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) { } } +SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { + taosRLockLatch(&pMeta->lock); + + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + SStreamTask* pTask = *ppTask; + if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__DROPPING) { + atomic_add_fetch_32(&pTask->refCnt, 1); + taosRUnLockLatch(&pMeta->lock); + return pTask; + } else { + taosRUnLockLatch(&pMeta->lock); + return NULL; + } + } + taosRUnLockLatch(&pMeta->lock); + return NULL; +} + +void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { + int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1); + ASSERT(left >= 0); + if (left == 0) { + ASSERT(atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING); + tFreeSStreamTask(pTask); + } +} + +void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId) { + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + SStreamTask* pTask = *ppTask; + taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); + atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); + + taosWLockLatch(&pMeta->lock); + streamMetaReleaseTask(pMeta, pTask); + taosWUnLockLatch(&pMeta->lock); + } +} + int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index f9994fd315..eae5d9f1a7 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -32,7 +32,7 @@ SWalRef *walOpenRef(SWal *pWal) { return pRef; } -#if 0 +#if 1 void walCloseRef(SWal *pWal, int64_t refId) { SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t)); if (ppRef == NULL) return; @@ -67,7 +67,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) { return 0; } -#if 0 +#if 1 void walUnrefVer(SWalRef *pRef) { pRef->refId = -1; pRef->refFile = -1;