From a48968e5e8f61c630e7bf4f41a204b51110c8fa8 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 3 Nov 2023 14:33:45 +0800 Subject: [PATCH] enh: rsma checkpoint --- include/common/tmsg.h | 5 -- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 7 +-- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/sma.h | 12 ++-- source/dnode/vnode/src/sma/smaCommit.c | 7 ++- source/dnode/vnode/src/sma/smaOpen.c | 18 +++--- source/dnode/vnode/src/sma/smaRollup.c | 67 +++++++++++++-------- source/dnode/vnode/src/tsdb/tsdbRead2.c | 6 +- source/dnode/vnode/src/vnd/vnodeCfg.c | 29 ++++----- source/libs/stream/src/streamMeta.c | 5 +- 10 files changed, 80 insertions(+), 78 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 57906c1695..4ef4273631 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -445,11 +445,6 @@ typedef struct SRetention { int8_t keepUnit; } SRetention; -typedef struct SRetentionEx { - SRetention rtn; - int64_t checkpointId; -} SRetentionEx; - #define RETENTION_VALID(l, r) ((((l) == 0 && (r)->freq >= 0) || ((r)->freq > 0)) && ((r)->keep > 0)) #pragma pack(push, 1) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 5a4b341662..c4d525a871 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -134,11 +134,10 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->tsdbCfg.minRows = pCreate->minRows; pCfg->tsdbCfg.maxRows = pCreate->maxRows; for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) { - SRetentionEx *pRetention = &pCfg->tsdbCfg.retentions[i]; - memcpy(&pRetention->rtn, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention)); - pRetention->checkpointId = -1; + SRetention *pRetention = &pCfg->tsdbCfg.retentions[i]; + memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention)); if (i == 0) { - if ((pRetention->rtn.freq >= 0 && pRetention->rtn.keep > 0)) pCfg->isRsma = 1; + if ((pRetention->freq >= 0 && pRetention->keep > 0)) pCfg->isRsma = 1; } } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e92fc04f6e..6a0c991be4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -287,7 +287,7 @@ struct STsdbCfg { int32_t keep1; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead int32_t keep2; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead int32_t keepTimeOffset; // just for save config, use STsdbKeepCfg in STsdb instead - SRetentionEx retentions[TSDB_RETENTION_MAX]; + SRetention retentions[TSDB_RETENTION_MAX]; }; typedef struct { diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index bce5e1b0b2..63d6e7e5c2 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -155,7 +155,7 @@ struct SRSmaInfo { int64_t lastRecv; // ms int8_t assigned; // 0 idle, 1 assgined for exec int8_t delFlag; - int16_t padding; + int8_t padding; T_REF_DECLARE() SRSmaInfoItem items[TSDB_RETENTION_L2]; void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t @@ -163,12 +163,10 @@ struct SRSmaInfo { STaosQall *qall; // buffer qall of SubmitReq }; -#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items) -#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1) -#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1) -#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i]) -#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i]) -#define RSMA_INFO_ITEM(r, i) (&(r)->items[i]) +#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1) +#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1) +#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i]) +#define RSMA_INFO_ITEM(r, i) (&(r)->items[i]) enum { TASK_TRIGGER_STAT_INIT = 0, diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 652aab3c01..fad2e4d7e9 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -156,10 +156,10 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { nLoops = 0; while (1) { if (atomic_load_32(&pRSmaStat->nFetchAll) <= 0) { - smaDebug("vgId:%d, rsma commit:%d, fetch tasks are all finished", SMA_VID(pSma), isCommit); + smaDebug("vgId:%d, rsma commit, type:%d, fetch tasks are all finished", SMA_VID(pSma), isCommit); break; } else { - smaDebug("vgId:%d, rsma commit%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit); + smaDebug("vgId:%d, rsma commit, type:%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit); } TD_SMA_LOOPS_CHECK(nLoops, 1000); } @@ -175,6 +175,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { while (atomic_load_64(&pRSmaStat->nBufItems) > 0) { TD_SMA_LOOPS_CHECK(nLoops, 1000); } + smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); if (!isCommit) goto _exit; @@ -183,7 +184,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + // all rsma results are written completely STsdb *pTsdb = NULL; diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index cea4ccb1b7..c0af670a17 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -16,13 +16,13 @@ #include "sma.h" #include "tsdb.h" -static int32_t smaEvalDays(SVnode *pVnode, SRetentionEx *r, int8_t level, int8_t precision, int32_t duration); +static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration); static int32_t smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type); static int32_t rsmaRestore(SSma *pSma); #define SMA_SET_KEEP_CFG(v, l) \ do { \ - SRetention *r = &(pCfg->retentions[l].rtn); \ + SRetention *r = &(pCfg->retentions[l]); \ pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \ pKeepCfg->keep0 = pKeepCfg->keep2; \ pKeepCfg->keep1 = pKeepCfg->keep2; \ @@ -32,7 +32,7 @@ static int32_t rsmaRestore(SSma *pSma); #define SMA_OPEN_RSMA_IMPL(v, l, force) \ do { \ - SRetention *r = &(((SRetentionEx *)VND_RETENTIONS(v) + l)->rtn); \ + SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ if (!RETENTION_VALID(l, r)) { \ if (l == 0) { \ code = TSDB_CODE_INVALID_PARA; \ @@ -59,9 +59,9 @@ static int32_t rsmaRestore(SSma *pSma); * @param duration * @return int32_t */ -static int32_t smaEvalDays(SVnode *pVnode, SRetentionEx *r, int8_t level, int8_t precision, int32_t duration) { - int32_t freqDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->rtn.freq, precision, TIME_UNIT_MINUTE); - int32_t keepDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->rtn.keep, precision, TIME_UNIT_MINUTE); +static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration) { + int32_t freqDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->freq, precision, TIME_UNIT_MINUTE); + int32_t keepDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->keep, precision, TIME_UNIT_MINUTE); int32_t days = duration; // min if (days < freqDuration) { @@ -76,10 +76,10 @@ static int32_t smaEvalDays(SVnode *pVnode, SRetentionEx *r, int8_t level, int8_t goto _exit; } - freqDuration = convertTimeFromPrecisionToUnit((r + level)->rtn.freq, precision, TIME_UNIT_MINUTE); - keepDuration = convertTimeFromPrecisionToUnit((r + level)->rtn.keep, precision, TIME_UNIT_MINUTE); + freqDuration = convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE); + keepDuration = convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE); - int32_t nFreqTimes = (r + level)->rtn.freq / (60 * 1000); // use 60s for freq of 1st level + int32_t nFreqTimes = (r + level)->freq / (60 * 1000); // use 60s for freq of 1st level days *= (nFreqTimes > 1 ? nFreqTimes : 1); if (days < freqDuration) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index cd93dda4fb..8f81829dc2 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -232,6 +232,29 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui return TSDB_CODE_SUCCESS; } +static int64_t tdRSmaTaskGetCheckpointId(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) { + int64_t checkpointId = -1; + STaskId id = {.streamId = streamId, .taskId = taskId}; + taosRLockLatch(&pMeta->lock); + SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (ppTask && *ppTask) { + checkpointId = (*ppTask)->chkInfo.checkpointId; + } + taosRUnLockLatch(&pMeta->lock); + return checkpointId; +} + +static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) { + streamMetaUnregisterTask(pMeta, streamId, taskId); + taosWLockLatch(&pMeta->lock); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } + taosWUnLockLatch(&pMeta->lock); + smaDebug("vgId:%d rsma task:%" PRIi64 ",%d dropped, remain tasks:%d", pMeta->vgId, streamId, taskId, numOfTasks); +} + static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx) { if ((param->qmsgLen > 0) && param->qmsg[idx]) { @@ -267,7 +290,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pStreamTask->pMeta = pVnode->pTq->pStreamMeta; pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_TASK_FLAG) + 1); sprintf(pStreamTask->exec.qmsg, "%s", RSMA_TASK_FLAG); - pStreamTask->chkInfo.checkpointId = pTsdbCfg->retentions[idx + 1].checkpointId; + pStreamTask->chkInfo.checkpointId = + tdRSmaTaskGetCheckpointId(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId); pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); if (!pStreamState) { terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; @@ -275,6 +299,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat } pItem->pStreamState = pStreamState; + tdRSmaTaskRemove(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId); + SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState}; initStorageAPI(&handle.api); pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0); @@ -1129,19 +1155,22 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { TSDB_CHECK_CODE(code, lino, _exit); } - if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) { - smaInfo("vgId:%d checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10, - nStreamFlushed, nTaskInfo); - taosHashCancelIterate(pInfoHash, infoHash); - goto _checkpoint; + if (streamFlushed) { + pRSmaInfo->items[i].streamFlushed = 1; + if (++nStreamFlushed >= nTaskInfo) { + smaInfo("vgId:%d rsma commit, checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode), + nSleep * 10, nStreamFlushed, nTaskInfo); + taosHashCancelIterate(pInfoHash, infoHash); + goto _checkpoint; + } } } } } taosUsleep(10); ++nSleep; - smaDebug("vgId:%d, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10, - nStreamFlushed, nTaskInfo); + smaDebug("vgId:%d, rsma commit, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode), + nSleep * 10, nStreamFlushed, nTaskInfo); } } while (0); @@ -1149,7 +1178,6 @@ _checkpoint: // stream state: build checkpoint in backend do { void *infoHash = NULL; - while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; if (RSMA_INFO_IS_DEL(pRSmaInfo)) { @@ -1170,29 +1198,16 @@ _checkpoint: } taosWLockLatch(&pTask->pMeta->lock); - if (streamMetaSaveTask(pTask->pMeta, pTask) != 0) { + if (0 != streamMetaSaveTask(pTask->pMeta, pTask) || 0 != streamMetaCommit(pTask->pMeta)) { taosWUnLockLatch(&pTask->pMeta->lock); - code = TSDB_CODE_OUT_OF_MEMORY; - taosHashCancelIterate(pInfoHash, infoHash); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (streamMetaCommit(pTask->pMeta) != 0) { - taosWUnLockLatch(&pTask->pMeta->lock); - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY; taosHashCancelIterate(pInfoHash, infoHash); TSDB_CHECK_CODE(code, lino, _exit); } taosWUnLockLatch(&pTask->pMeta->lock); - // save checkpointId to vnode.json - (pVnode->config.tsdbCfg.retentions + i + 1)->checkpointId = pTask->checkpointingId; - - smaInfo("vgId:%d, commit task:%p, build stream checkpoint success, table:%" PRIi64 - ", level:%d, checkpointId:%" PRIi64, - TD_VID(pVnode), pTask, pRSmaInfo->suid, i + 1, pTask->checkpointingId); - - + smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint/task:%" PRIi64 "/%p, table:%" PRIi64 ", level:%d", + TD_VID(pVnode), pTask->checkpointingId, pTask, pRSmaInfo->suid, i + 1); } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index be88a5a435..d1919d95ba 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -49,7 +49,7 @@ static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo STsdbReader* pReader); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost); -static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetentionEx* retentions, const char* idstr, +static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, int8_t* pLevel); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); @@ -3140,7 +3140,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } } -static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetentionEx* retentions, const char* idStr, +static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr, int8_t* pLevel) { if (VND_IS_RSMA(pVnode)) { int8_t level = 0; @@ -3151,7 +3151,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetentionEx* r : 1000000L); for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) { - SRetention* pRetention = &((retentions + level)->rtn); + SRetention* pRetention = retentions + level; if (pRetention->keep <= 0) { if (level > 0) { --level; diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index d429eb2a94..07bfa6c719 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -106,24 +106,23 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset) < 0) return -1; - if (pCfg->tsdbCfg.retentions[0].rtn.keep > 0) { + if (pCfg->tsdbCfg.retentions[0].keep > 0) { int32_t nRetention = 1; - if (pCfg->tsdbCfg.retentions[1].rtn.freq > 0) { + if (pCfg->tsdbCfg.retentions[1].freq > 0) { ++nRetention; - if (pCfg->tsdbCfg.retentions[2].rtn.freq > 0) { + if (pCfg->tsdbCfg.retentions[2].freq > 0) { ++nRetention; } } SJson *pNodeRetentions = tjsonCreateArray(); tjsonAddItemToObject(pJson, "retentions", pNodeRetentions); for (int32_t i = 0; i < nRetention; ++i) { - SJson *pNodeRetention = tjsonCreateObject(); - const SRetentionEx *pRetention = pCfg->tsdbCfg.retentions + i; - tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->rtn.freq); - tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->rtn.freqUnit); - tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->rtn.keep); - tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->rtn.keepUnit); - tjsonAddIntegerToObject(pNodeRetention, "checkpointId", pRetention->checkpointId); + SJson *pNodeRetention = tjsonCreateObject(); + const SRetention *pRetention = pCfg->tsdbCfg.retentions + i; + tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq); + tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit); + tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->keep); + tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->keepUnit); tjsonAddItemToArray(pNodeRetentions, pNodeRetention); } } @@ -232,12 +231,10 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { for (int32_t i = 0; i < nRetention; ++i) { SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i); ASSERT(pNodeRetention != NULL); - SRetentionEx *pRetention = &(pCfg->tsdbCfg.retentions[i]); - tjsonGetNumberValue(pNodeRetention, "freq", pRetention->rtn.freq, code); - tjsonGetNumberValue(pNodeRetention, "freqUnit", pRetention->rtn.freqUnit, code); - tjsonGetNumberValue(pNodeRetention, "keep", pRetention->rtn.keep, code); - tjsonGetNumberValue(pNodeRetention, "keepUnit", pRetention->rtn.keepUnit, code); - tjsonGetNumberValue(pNodeRetention, "checkpointId", pRetention->checkpointId, code); + tjsonGetNumberValue(pNodeRetention, "freq", (pCfg->tsdbCfg.retentions)[i].freq, code); + tjsonGetNumberValue(pNodeRetention, "freqUnit", (pCfg->tsdbCfg.retentions)[i].freqUnit, code); + tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep, code); + tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit, code); } tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code); if (code < 0) return -1; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7f023f2451..f788e244cd 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -687,10 +687,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { } tDecoderClear(&decoder); - if (0 == strcmp(pTask->exec.qmsg, "rsma_task")) { - tFreeStreamTask(pTask); - continue; - } else if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { + if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { int32_t taskId = pTask->id.taskId; tFreeStreamTask(pTask);