fix(stream): fix deadlock.

This commit is contained in:
Haojun Liao 2023-11-13 18:20:30 +08:00
parent 44da5833bb
commit aad8a2eb87
5 changed files with 32 additions and 9 deletions

View File

@ -825,6 +825,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);

View File

@ -1969,6 +1969,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
streamMetaInitBackend(pMeta);
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta);

View File

@ -169,10 +169,15 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
}
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
streamMetaWLock(pWriter->pTq->pStreamMeta);
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta);
if (code == 0) {
streamMetaInitBackend(pWriter->pTq->pStreamMeta);
code = streamStateLoadTasks(pWriter);
}
streamMetaWUnLock(pWriter->pTq->pStreamMeta);
tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
taosMemoryFree(pWriter);
return code;

View File

@ -144,7 +144,6 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
}
streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to reopen stream meta", vgId);
@ -153,6 +152,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
return code;
}
streamMetaInitBackend(pMeta);
int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);

View File

@ -263,18 +263,33 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
}
}
// todo: not wait in a critical region
while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId)) == NULL) {
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
taosMsleep(100);
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
return 0;
}
// todo refactor: the lock shoud be restricted in one function
void streamMetaInitBackend(SStreamMeta* pMeta) {
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
if (pMeta->streamBackend == NULL) {
streamMetaWUnLock(pMeta);
while (1) {
streamMetaWLock(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
if (pMeta->streamBackend != NULL) {
break;
}
streamMetaWUnLock(pMeta);
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
taosMsleep(100);
}
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
streamBackendLoadCheckpointInfo(pMeta);
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
return 0;
}
void streamMetaClear(SStreamMeta* pMeta) {