From 6c944bb1927fa157469e640c5a9cd018e7ebb5d7 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 2 Nov 2023 20:32:54 +0800 Subject: [PATCH] enh: skip rsma_task during load stream tasks --- source/dnode/vnode/src/sma/smaRollup.c | 5 +++-- source/libs/stream/src/streamMeta.c | 5 ++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 8882dada9a..cd93dda4fb 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -22,6 +22,7 @@ #define RSMA_FETCH_DELAY_MAX (120000) // ms #define RSMA_FETCH_ACTIVE_MAX (1000) // ms #define RSMA_FETCH_INTERVAL (5000) // ms +#define RSMA_TASK_FLAG "rsma_task" #define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel) @@ -264,8 +265,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pStreamTask->id.streamId = pRSmaInfo->suid + idx; pStreamTask->chkInfo.startTs = taosGetTimestampMs(); pStreamTask->pMeta = pVnode->pTq->pStreamMeta; - pStreamTask->exec.qmsg = taosMemoryMalloc(2); - sprintf(pStreamTask->exec.qmsg, "%d", idx); + pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_TASK_FLAG) + 1); + sprintf(pStreamTask->exec.qmsg, "%s", RSMA_TASK_FLAG); pStreamTask->chkInfo.checkpointId = pTsdbCfg->retentions[idx + 1].checkpointId; pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); if (!pStreamState) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f788e244cd..7f023f2451 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -687,7 +687,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { } tDecoderClear(&decoder); - if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { + if (0 == strcmp(pTask->exec.qmsg, "rsma_task")) { + tFreeStreamTask(pTask); + continue; + } else if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { int32_t taskId = pTask->id.taskId; tFreeStreamTask(pTask);