From 8b12d4d3dad000c91fef10a218db6231a72f3ee3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 5 May 2023 15:02:03 +0800 Subject: [PATCH] fix(stream): secure the delete task operation. TD-1198 --- source/dnode/vnode/src/tq/tq.c | 2 ++ source/dnode/vnode/src/tq/tqRead.c | 1 - source/dnode/vnode/src/tq/tqRestore.c | 1 + source/libs/stream/src/streamExec.c | 1 + source/libs/stream/src/streamMeta.c | 4 +++- 5 files changed, 7 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 12b81b6c3f..6b46a6a12f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1326,7 +1326,9 @@ int32_t tqStartStreamTasks(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; + taosWLockLatch(&pMeta->lock); + int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); if (numOfTasks == 0) { tqInfo("vgId:%d no stream tasks exists", vgId); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index ead00dcc35..0deac6e82c 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1039,6 +1039,5 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } taosWUnLockLatch(&pTq->pStreamMeta->lock); - return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 58cb7b9e63..c3c7f7ba7b 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -36,6 +36,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { if (shouldIdle) { taosWLockLatch(&pMeta->lock); + pMeta->walScanCounter -= 1; times = pMeta->walScanCounter; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f33e126068..aa7aaf93d4 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -313,6 +313,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; taosWLockLatch(&pTask->pMeta->lock); + streamMetaSaveTask(pTask->pMeta, pTask); if (streamMetaCommit(pTask->pMeta) < 0) { taosWUnLockLatch(&pTask->pMeta->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 822ae2a485..0d797f0bcb 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -216,12 +216,14 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { SStreamTask* pTask = *ppTask; + + taosWLockLatch(&pMeta->lock); + taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP); - taosWLockLatch(&pMeta->lock); streamMetaReleaseTask(pMeta, pTask); taosWUnLockLatch(&pMeta->lock); }