From a4d475b582465d2594a3a22bcc15c249c891afeb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 27 Nov 2023 20:27:53 +0800 Subject: [PATCH] fix rsma --- include/libs/stream/tstream.h | 23 +++++++++++++---------- source/common/src/tglobal.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 5 ++++- source/libs/stream/inc/streamsm.h | 8 ++++---- source/libs/stream/src/streamCheckpoint.c | 4 ++++ 5 files changed, 26 insertions(+), 16 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2e94bbdacd..f91223b863 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -108,6 +108,7 @@ typedef enum { TASK_LEVEL__SOURCE = 1, TASK_LEVEL__AGG, TASK_LEVEL__SINK, + TASK_LEVEL_SMA, } ETASK_LEVEL; enum { @@ -666,19 +667,19 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea typedef struct STaskStatusEntry { STaskId id; int32_t status; - int32_t statusLastDuration; // to record the last duration of current status + int32_t statusLastDuration; // to record the last duration of current status int64_t stage; int32_t nodeId; - int64_t verStart; // start version in WAL, only valid for source task - int64_t verEnd; // end version in WAL, only valid for source task - int64_t processedVer; // only valid for source task - int32_t relatedHTask; // has related fill-history task - int64_t activeCheckpointId; // current active checkpoint id - bool checkpointFailed; // denote if the checkpoint is failed or not - double inputQUsed; // in MiB + int64_t verStart; // start version in WAL, only valid for source task + int64_t verEnd; // end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task + int32_t relatedHTask; // has related fill-history task + int64_t activeCheckpointId; // current active checkpoint id + bool checkpointFailed; // denote if the checkpoint is failed or not + double inputQUsed; // in MiB double inputRate; - double sinkQuota; // existed quota size for sink task - double sinkDataSize; // sink to dst data size + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dst data size } STaskStatusEntry; typedef struct SStreamHbMsg { @@ -864,6 +865,8 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int8_t isSucceed); +SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); +void* streamDestroyStateMachine(SStreamTaskSM* pSM); #ifdef __cplusplus } #endif diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c72a998f88..e8ff530d78 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -248,7 +248,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointInterval = 40; +int32_t tsStreamCheckpointInterval = 10; float tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 15; int32_t tsTtlUnit = 86400; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index c7623eca78..abe4c3f2fc 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -297,6 +297,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat sprintf(pStreamTask->exec.qmsg, "%s", RSMA_EXEC_TASK_FLAG); pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta); tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id); + pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask); + pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); if (!pStreamState) { terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; @@ -1282,10 +1284,11 @@ _checkpoint: if (pItem && pItem->pStreamTask) { SStreamTask *pTask = pItem->pStreamTask; // atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); - // pTask->checkpointingId = checkpointId; + pTask->chkInfo.checkpointingId = checkpointId; pTask->chkInfo.checkpointId = checkpointId; // 1pTask->checkpointingId; pTask->chkInfo.checkpointVer = pItem->submitReqVer; pTask->info.triggerParam = pItem->fetchResultVer; + pTask->info.taskLevel = TASK_LEVEL_SMA; if (!checkpointBuilt) { // the stream states share one checkpoint diff --git a/source/libs/stream/inc/streamsm.h b/source/libs/stream/inc/streamsm.h index be3665fde7..7be655fbed 100644 --- a/source/libs/stream/inc/streamsm.h +++ b/source/libs/stream/inc/streamsm.h @@ -32,8 +32,8 @@ typedef int32_t (*__state_trans_fn)(SStreamTask*); typedef int32_t (*__state_trans_succ_fn)(SStreamTask*); typedef struct SAttachedEventInfo { - ETaskStatus status; // required status that this event can be handled - EStreamTaskEvent event; // the delayed handled event + ETaskStatus status; // required status that this event can be handled + EStreamTaskEvent event; // the delayed handled event } SAttachedEventInfo; typedef struct STaskStateTrans { @@ -64,8 +64,8 @@ typedef struct SStreamEventInfo { const char* name; } SStreamEventInfo; -SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); -void* streamDestroyStateMachine(SStreamTaskSM* pSM); +// SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); +// void* streamDestroyStateMachine(SStreamTaskSM* pSM); #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7473e7a411..e2561de841 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -291,6 +291,10 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { return code; } + if (p->info.taskLevel > TASK_LEVEL__SINK) { + return code; + } + taosThreadMutexLock(&p->lock); ASSERT(p->chkInfo.checkpointId <= p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId &&