fix(stream): fix error.

This commit is contained in:
Haojun Liao 2023-08-11 14:17:09 +08:00
parent ba7825141f
commit 4afe603c6e
2 changed files with 12 additions and 2 deletions

View File

@ -262,9 +262,13 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
taosWLockLatch(&pMeta->lock);
int64_t keys[2];
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId));
SStreamId* pId = taosArrayGet(pMeta->pTaskList, i);
keys[0] = pId->streamId;
keys[1] = pId->taskId;
SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
int8_t prev = p->status.taskStatus;
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);

View File

@ -485,6 +485,12 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
pTask->status.stage += 1;
streamSetStatusNormal(pTask);
taosWLockLatch(&pTask->pMeta->lock);
streamMetaSaveTask(pTask->pMeta, pTask);
streamMetaCommit(pTask->pMeta);
taosWUnLockLatch(&pTask->pMeta->lock);
qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id,
pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus));