enh(stream): check checkpoint-trigger msg every 100ms.

This commit is contained in:
Haojun Liao 2024-05-29 00:30:42 +08:00
parent d1d868f239
commit 225dfb1a93
4 changed files with 39 additions and 10 deletions

View File

@ -61,7 +61,9 @@ struct SActiveCheckpointInfo {
bool dispatchTrigger;
SArray* pDispatchTriggerList; // SArray<STaskTriggerSendInfo>
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
tmr_h pCheckTmr;
int32_t checkCounter;
tmr_h pCheckTmr;
};
typedef struct {

View File

@ -285,7 +285,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start check-rsp monit, ref:%d ", pTask->id.idStr, ref);
stDebug("s-task:%s start check-rsp monitor, ref:%d ", pTask->id.idStr, ref);
if (pInfo->checkRspTmr == NULL) {
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer);

View File

@ -36,8 +36,9 @@ static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, con
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType);
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
static void checkpointTriggerMonitorFn(void* param, void* tmrId);
static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType);
static void checkpointTriggerMonitorFn(void* param, void* tmrId);
bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
@ -194,11 +195,13 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
return code;
}
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start check-rsp monitor in 10s, ref:%d ", pTask->id.idStr, ref);
SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
if (pActive->pCheckTmr == NULL) {
pActive->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 10000, pTask, streamTimer);
pActive->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer);
} else {
taosTmrReset(checkpointTriggerMonitorFn, 10000, pTask, streamTimer, &pActive->pCheckTmr);
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActive->pCheckTmr);
}
}
@ -575,17 +578,33 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor start, ts:%" PRId64, pTask->id.idStr, vgId, now);
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (++pActiveInfo->checkCounter < 100) {
return;
}
pActiveInfo->checkCounter = 0;
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, pTask->id.idStr, vgId, now);
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) {
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger", pTask->id.idStr, vgId);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
taosThreadMutexUnlock(&pTask->lock);
@ -622,8 +641,12 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// check every 100ms
if (size > 0) {
taosTmrReset(checkpointTriggerMonitorFn, 10000, pTask, streamTimer, &pActiveInfo->pCheckTmr);
stDebug("s-task:%s start monitor trigger in 10sec", pTask->id.idStr);
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pCheckTmr);
} else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
}
taosArrayDestroy(pNotSendList);

View File

@ -597,6 +597,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
// dispatch checkpoint msg to all downstream tasks
int32_t type = pInput->type;
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
if (pTask->pMeta->vgId == 2) {
taosSsleep(20);
}
streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
continue;
}