This commit is contained in:
yihaoDeng 2023-11-27 20:27:53 +08:00
parent 630f4f9d80
commit a4d475b582
5 changed files with 26 additions and 16 deletions

View File

@ -108,6 +108,7 @@ typedef enum {
TASK_LEVEL__SOURCE = 1, TASK_LEVEL__SOURCE = 1,
TASK_LEVEL__AGG, TASK_LEVEL__AGG,
TASK_LEVEL__SINK, TASK_LEVEL__SINK,
TASK_LEVEL_SMA,
} ETASK_LEVEL; } ETASK_LEVEL;
enum { enum {
@ -666,19 +667,19 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
typedef struct STaskStatusEntry { typedef struct STaskStatusEntry {
STaskId id; STaskId id;
int32_t status; int32_t status;
int32_t statusLastDuration; // to record the last duration of current status int32_t statusLastDuration; // to record the last duration of current status
int64_t stage; int64_t stage;
int32_t nodeId; int32_t nodeId;
int64_t verStart; // start version in WAL, only valid for source task int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task int64_t verEnd; // end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task int64_t processedVer; // only valid for source task
int32_t relatedHTask; // has related fill-history task int32_t relatedHTask; // has related fill-history task
int64_t activeCheckpointId; // current active checkpoint id int64_t activeCheckpointId; // current active checkpoint id
bool checkpointFailed; // denote if the checkpoint is failed or not bool checkpointFailed; // denote if the checkpoint is failed or not
double inputQUsed; // in MiB double inputQUsed; // in MiB
double inputRate; double inputRate;
double sinkQuota; // existed quota size for sink task double sinkQuota; // existed quota size for sink task
double sinkDataSize; // sink to dst data size double sinkDataSize; // sink to dst data size
} STaskStatusEntry; } STaskStatusEntry;
typedef struct SStreamHbMsg { typedef struct SStreamHbMsg {
@ -864,6 +865,8 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed); int8_t isSucceed);
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -248,7 +248,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
// internal // internal
int32_t tsTransPullupInterval = 2; int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointInterval = 40; int32_t tsStreamCheckpointInterval = 10;
float tsSinkDataRate = 2.0; float tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 15; int32_t tsStreamNodeCheckInterval = 15;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;

View File

@ -297,6 +297,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
sprintf(pStreamTask->exec.qmsg, "%s", RSMA_EXEC_TASK_FLAG); sprintf(pStreamTask->exec.qmsg, "%s", RSMA_EXEC_TASK_FLAG);
pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta); pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id); tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask);
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
if (!pStreamState) { if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
@ -1282,10 +1284,11 @@ _checkpoint:
if (pItem && pItem->pStreamTask) { if (pItem && pItem->pStreamTask) {
SStreamTask *pTask = pItem->pStreamTask; SStreamTask *pTask = pItem->pStreamTask;
// atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); // atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
// pTask->checkpointingId = checkpointId; pTask->chkInfo.checkpointingId = checkpointId;
pTask->chkInfo.checkpointId = checkpointId; // 1pTask->checkpointingId; pTask->chkInfo.checkpointId = checkpointId; // 1pTask->checkpointingId;
pTask->chkInfo.checkpointVer = pItem->submitReqVer; pTask->chkInfo.checkpointVer = pItem->submitReqVer;
pTask->info.triggerParam = pItem->fetchResultVer; pTask->info.triggerParam = pItem->fetchResultVer;
pTask->info.taskLevel = TASK_LEVEL_SMA;
if (!checkpointBuilt) { if (!checkpointBuilt) {
// the stream states share one checkpoint // the stream states share one checkpoint

View File

@ -32,8 +32,8 @@ typedef int32_t (*__state_trans_fn)(SStreamTask*);
typedef int32_t (*__state_trans_succ_fn)(SStreamTask*); typedef int32_t (*__state_trans_succ_fn)(SStreamTask*);
typedef struct SAttachedEventInfo { typedef struct SAttachedEventInfo {
ETaskStatus status; // required status that this event can be handled ETaskStatus status; // required status that this event can be handled
EStreamTaskEvent event; // the delayed handled event EStreamTaskEvent event; // the delayed handled event
} SAttachedEventInfo; } SAttachedEventInfo;
typedef struct STaskStateTrans { typedef struct STaskStateTrans {
@ -64,8 +64,8 @@ typedef struct SStreamEventInfo {
const char* name; const char* name;
} SStreamEventInfo; } SStreamEventInfo;
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); // SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
void* streamDestroyStateMachine(SStreamTaskSM* pSM); // void* streamDestroyStateMachine(SStreamTaskSM* pSM);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -291,6 +291,10 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
return code; return code;
} }
if (p->info.taskLevel > TASK_LEVEL__SINK) {
return code;
}
taosThreadMutexLock(&p->lock); taosThreadMutexLock(&p->lock);
ASSERT(p->chkInfo.checkpointId <= p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId && ASSERT(p->chkInfo.checkpointId <= p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId &&