fix(streamMeta): commit tdb after load tasks

This commit is contained in:
Minglei Jin 2024-06-26 11:21:31 +08:00
parent 59c8656e61
commit 3014c66965
1 changed files with 17 additions and 12 deletions

View File

@ -557,7 +557,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
return -1; return -1;
} }
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){ if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
pTask->ver = SSTREAM_TASK_VER; pTask->ver = SSTREAM_TASK_VER;
} }
@ -907,7 +907,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (p == NULL) { if (p == NULL) {
code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
if (code < 0) { if (code < 0) {
stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); stError("failed to expand s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
continue; continue;
} }
@ -958,6 +958,8 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
taosArrayDestroy(pRecycleList); taosArrayDestroy(pRecycleList);
(void)streamMetaCommit(pMeta);
} }
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
@ -1052,12 +1054,14 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
} }
if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) { if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) {
entry.checkpointInfo.failed = ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; entry.checkpointInfo.failed =
((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId; entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId;
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId; entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId;
if (entry.checkpointInfo.failed) { if (entry.checkpointInfo.failed) {
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId); stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr,
(*pTask)->chkInfo.pActiveInfo->transId);
} }
} }
@ -1384,7 +1388,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%"PRId64, vgId, numOfTasks, now); stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%" PRId64, vgId, numOfTasks, now);
if (numOfTasks == 0) { if (numOfTasks == 0) {
stInfo("vgId:%d no tasks to be started", pMeta->vgId); stInfo("vgId:%d no tasks to be started", pMeta->vgId);
@ -1725,7 +1729,8 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt
taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0); taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0);
stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64 stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
" ms", id, vgId, transId, el); " ms",
id, vgId, transId, el);
} else { } else {
stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
id, vgId, transId, el); id, vgId, transId, el);