enh: skip rsma_task during load stream tasks

This commit is contained in:
kailixu 2023-11-02 20:32:54 +08:00
parent fa5d896787
commit 6c944bb192
2 changed files with 7 additions and 3 deletions

View File

@ -22,6 +22,7 @@
#define RSMA_FETCH_DELAY_MAX (120000) // ms
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms
#define RSMA_FETCH_INTERVAL (5000) // ms
#define RSMA_TASK_FLAG "rsma_task"
#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel)
@ -264,8 +265,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pStreamTask->id.streamId = pRSmaInfo->suid + idx;
pStreamTask->chkInfo.startTs = taosGetTimestampMs();
pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
pStreamTask->exec.qmsg = taosMemoryMalloc(2);
sprintf(pStreamTask->exec.qmsg, "%d", idx);
pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_TASK_FLAG) + 1);
sprintf(pStreamTask->exec.qmsg, "%s", RSMA_TASK_FLAG);
pStreamTask->chkInfo.checkpointId = pTsdbCfg->retentions[idx + 1].checkpointId;
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
if (!pStreamState) {

View File

@ -687,7 +687,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
}
tDecoderClear(&decoder);
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
if (0 == strcmp(pTask->exec.qmsg, "rsma_task")) {
tFreeStreamTask(pTask);
continue;
} else if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
int32_t taskId = pTask->id.taskId;
tFreeStreamTask(pTask);