fix(stream): check the task status checkpoint-ready msg send timer.

This commit is contained in:
Haojun Liao 2024-06-21 18:44:54 +08:00
parent 5966b4f83f
commit 5a9500055d
3 changed files with 17 additions and 6 deletions

View File

@ -2053,7 +2053,11 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
stInfo("%s newly create db in state-backend", key); stInfo("%s newly create db in state-backend", key);
// pre create db // pre create db
pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err); pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
if (pTaskDb->db == NULL) goto _EXIT; if (pTaskDb->db == NULL) {
stError("%s open state-backend failed, reason:%s", key, err);
goto _EXIT;
}
rocksdb_close(pTaskDb->db); rocksdb_close(pTaskDb->db);
if (cfNames != NULL) { if (cfNames != NULL) {

View File

@ -737,14 +737,13 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
} }
pActiveInfo->checkCounter = 0; pActiveInfo->checkCounter = 0;
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, pTask->id.idStr, vgId, now); stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) { if (pState->state != TASK_STATUS__CK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", pTask->id.idStr, stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
vgId, ref);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask); streamMetaReleaseTask(pTask->pMeta, pTask);
@ -754,8 +753,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// checkpoint-trigger recv flag is set, quit // checkpoint-trigger recv flag is set, quit
if (pActiveInfo->allUpstreamTriggerRecv) { if (pActiveInfo->allUpstreamTriggerRecv) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
pTask->id.idStr, vgId, ref); ref);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask); streamMetaReleaseTask(pTask->pMeta, pTask);

View File

@ -796,6 +796,14 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id); stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id);
taosThreadMutexLock(&pActiveInfo->lock); taosThreadMutexLock(&pActiveInfo->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId, ref);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
SArray* pList = pActiveInfo->pReadyMsgList; SArray* pList = pActiveInfo->pReadyMsgList;
SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t)); SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t));