refactor: enable the restart of stream tasks.

This commit is contained in:
Haojun Liao 2024-01-11 09:41:40 +08:00
parent 3b36dbbe18
commit 1c562bd535
2 changed files with 13 additions and 3 deletions

View File

@ -729,9 +729,9 @@ int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks) {
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
streamTaskResetStatus(*pTask);
if ((*pTask)->info.fillHistory == 1) {
streamResetParamForScanHistory(*pTask);
}
// if ((*pTask)->info.fillHistory == 1) {
// streamResetParamForScanHistory(*pTask);
// }
}
return 0;
@ -764,10 +764,19 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
}
streamMetaWLock(pMeta);
streamMetaClear(pMeta);
int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);
code = streamMetaLoadAllTasks(pMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
{
STaskStartInfo* pStartInfo = &pMeta->startInfo;
taosHashClear(pStartInfo->pReadyTaskSet);

View File

@ -469,6 +469,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
taosArrayClear(pMeta->pTaskList);
taosArrayClear(pMeta->chkpSaved);
taosArrayClear(pMeta->chkpInUse);
pMeta->numOfStreamTasks = 0;
pMeta->numOfPausedTasks = 0;