fix stream restart crash
This commit is contained in:
parent
471580ec5e
commit
95469124f8
|
@ -273,7 +273,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
|
||||||
pBackend->pTask = pTask;
|
pBackend->pTask = pTask;
|
||||||
pBackend->pMeta = pMeta;
|
pBackend->pMeta = pMeta;
|
||||||
|
|
||||||
pTask->chkInfo.processedVer = processVer;
|
if (processVer != -1) pTask->chkInfo.processedVer = processVer;
|
||||||
|
|
||||||
taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
|
taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
|
||||||
taosThreadMutexUnlock(&pMeta->backendMutex);
|
taosThreadMutexUnlock(&pMeta->backendMutex);
|
||||||
|
@ -905,7 +905,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
|
code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
stError("failed to load s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno));
|
stError("failed to load s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -990,7 +990,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
||||||
streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
|
streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
|
||||||
|
|
||||||
stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
|
stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
|
||||||
vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);
|
vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);
|
||||||
|
|
||||||
// wait for the stream meta hb function stopping
|
// wait for the stream meta hb function stopping
|
||||||
streamMetaWaitForHbTmrQuit(pMeta);
|
streamMetaWaitForHbTmrQuit(pMeta);
|
||||||
|
@ -1175,7 +1175,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%"PRId64, vgId, numOfTasks, now);
|
stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
|
||||||
|
|
||||||
if (numOfTasks == 0) {
|
if (numOfTasks == 0) {
|
||||||
stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
|
stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
|
||||||
|
|
Loading…
Reference in New Issue