fix(stream): to avoid repeatly start checkpoint timer if previous timer is not started yet.

This commit is contained in:
Haojun Liao 2024-07-18 14:57:39 +08:00
parent 53ed030967
commit fb3fe03c1f
5 changed files with 102 additions and 63 deletions

View File

@ -48,24 +48,30 @@ extern "C" {
#define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, stDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef struct SStreamTmrInfo {
int32_t activeCounter; // make sure only launch one checkpoint trigger check tmr
tmr_h tmrHandle;
int64_t launchChkptId;
int8_t isActive;
} SStreamTmrInfo;
struct SActiveCheckpointInfo {
TdThreadMutex lock;
int32_t transId;
int64_t firstRecvTs; // first time to recv checkpoint trigger info
int64_t activeId; // current active checkpoint id
int64_t failedId;
bool dispatchTrigger;
SArray* pDispatchTriggerList; // SArray<STaskTriggerSendInfo>
SArray* pReadyMsgList; // SArray<STaskCheckpointReadyInfo*>
int8_t allUpstreamTriggerRecv;
SArray* pCheckpointReadyRecvList; // SArray<STaskDownstreamReadyInfo>
int32_t checkCounter;
tmr_h pChkptTriggerTmr;
int32_t sendReadyCheckCounter;
tmr_h pSendReadyMsgTmr;
int64_t sendReadyTmrChkptId;
TdThreadMutex lock;
int32_t transId;
int64_t firstRecvTs; // first time to recv checkpoint trigger info
int64_t activeId; // current active checkpoint id
int64_t failedId;
bool dispatchTrigger;
SArray* pDispatchTriggerList; // SArray<STaskTriggerSendInfo>
SArray* pReadyMsgList; // SArray<STaskCheckpointReadyInfo*>
int8_t allUpstreamTriggerRecv;
SArray* pCheckpointReadyRecvList; // SArray<STaskDownstreamReadyInfo>
SStreamTmrInfo chkptTriggerMsgTmr;
SStreamTmrInfo chkptReadyMsgTmr;
};
int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask);
typedef struct {
int8_t type;
SSDataBlock* pBlock;

View File

@ -265,14 +265,26 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
return code;
}
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
// if previous launched timer not started yet, not start a new timer
// todo: fix this bug: previous set checkpoint-trigger check tmr is running, while we happen to try to launch
// a new checkpoint-trigger timer right now.
// And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint
// procedure to be stucked.
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
if (pActiveInfo->pChkptTriggerTmr == NULL) {
pActiveInfo->pChkptTriggerTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer);
} else {
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr);
if (pTmrInfo->tmrHandle == NULL) {
pTmrInfo->tmrHandle = taosTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer);
} else {
taosTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle);
}
pTmrInfo->launchChkptId = pActiveInfo->activeId;
} else { // already launched, do nothing
stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
}
}
@ -741,27 +753,28 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (++pActiveInfo->checkCounter < 100) {
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr);
if (++pTmrInfo->activeCounter < 50) {
taosTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle);
return;
}
pActiveInfo->checkCounter = 0;
pTmrInfo->activeCounter = 0;
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
taosThreadMutexUnlock(&pTask->lock);
@ -771,7 +784,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// checkpoint-trigger recv flag is set, quit
if (pActiveInfo->allUpstreamTriggerRecv) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
ref);
@ -779,7 +792,6 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
taosThreadMutexUnlock(&pTask->lock);
taosThreadMutexLock(&pActiveInfo->lock);
@ -820,9 +832,9 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// check every 100ms
if (size > 0) {
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr);
taosTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle);
} else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
}

View File

@ -777,31 +777,32 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
}
static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
if (++pActiveInfo->sendReadyCheckCounter < 100) {
taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr);
if (++pTmrInfo->activeCounter < 50) {
taosTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle);
return;
}
pActiveInfo->sendReadyCheckCounter = 0;
stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id);
pTmrInfo->activeCounter = 0;
stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id);
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId,
pState->name, ref);
taosThreadMutexUnlock(&pTask->lock);
@ -815,11 +816,12 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
if (pActiveInfo->sendReadyTmrChkptId < pActiveInfo->activeId) {
if (pTmrInfo->launchChkptId < pActiveInfo->activeId) {
taosThreadMutexUnlock(&pActiveInfo->lock);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stWarn("s-task:%s vgId:%d tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d",
id, vgId, pActiveInfo->sendReadyTmrChkptId, ref);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d",
id, vgId, pTmrInfo->launchChkptId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
@ -828,7 +830,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) {
taosThreadMutexUnlock(&pActiveInfo->lock);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
@ -871,10 +873,10 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
}
}
taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr);
taosTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle);
taosThreadMutexUnlock(&pActiveInfo->lock);
} else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug(
"s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg "
"and quit from timer, ref:%d",
@ -916,18 +918,25 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
// start to check if checkpoint ready msg has successfully received by upstream tasks.
if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->info.taskLevel == TASK_LEVEL__AGG) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
if (pActiveInfo->pSendReadyMsgTmr == NULL) {
pActiveInfo->pSendReadyMsgTmr = taosTmrStart(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer);
int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
if (pTmrInfo->tmrHandle == NULL) {
pTmrInfo->tmrHandle = taosTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer);
} else {
taosTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle);
}
// mark the timer monitor checkpointId
pTmrInfo->launchChkptId = pActiveInfo->activeId;
} else {
taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr);
stError("s-task:%s previous checkpoint-ready monitor tmr is set, not start new one", pTask->id.idStr);
}
// mark the timer monitor checkpointId
pActiveInfo->sendReadyTmrChkptId = pActiveInfo->activeId;
}
taosThreadMutexUnlock(&pActiveInfo->lock);

View File

@ -1064,14 +1064,16 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
pInfo->pCheckpointReadyRecvList = NULL;
if (pInfo->pChkptTriggerTmr != NULL) {
taosTmrStop(pInfo->pChkptTriggerTmr);
pInfo->pChkptTriggerTmr = NULL;
SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
if (pTriggerTmr->tmrHandle != NULL) {
taosTmrStop(pTriggerTmr->tmrHandle);
pTriggerTmr->tmrHandle = NULL;
}
if (pInfo->pSendReadyMsgTmr != NULL) {
taosTmrStop(pInfo->pSendReadyMsgTmr);
pInfo->pSendReadyMsgTmr = NULL;
SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
if (pReadyTmr->tmrHandle != NULL) {
taosTmrStop(pReadyTmr->tmrHandle);
pReadyTmr->tmrHandle = NULL;
}
taosMemoryFree(pInfo);

View File

@ -38,3 +38,13 @@ void streamTimerCleanUp() {
tmr_h streamTimerGetInstance() {
return streamTimer;
}
int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask) {
pInfo->activeCounter = 0;
pInfo->launchChkptId = 0;
atomic_store_8(&pInfo->isActive, 0);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
ASSERT(ref >= 0);
return ref;
}