fix(stream): fix deadlock.

This commit is contained in:
Haojun Liao 2023-11-13 18:20:30 +08:00
parent a1cae2c6cc
commit f99e22b91f
5 changed files with 32 additions and 9 deletions

View File

@ -825,6 +825,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta); int32_t streamMetaReopen(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);

View File

@ -1974,6 +1974,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
return -1; return -1;
} }
streamMetaInitBackend(pMeta);
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId); tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);

View File

@ -169,10 +169,15 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
} }
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
streamMetaWLock(pWriter->pTq->pStreamMeta);
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta); int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta);
if (code == 0) { if (code == 0) {
streamMetaInitBackend(pWriter->pTq->pStreamMeta);
code = streamStateLoadTasks(pWriter); code = streamStateLoadTasks(pWriter);
} }
streamMetaWUnLock(pWriter->pTq->pStreamMeta);
tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
return code; return code;

View File

@ -144,7 +144,6 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
} }
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta); code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to reopen stream meta", vgId); tqError("vgId:%d failed to reopen stream meta", vgId);
@ -153,6 +152,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
return code; return code;
} }
streamMetaInitBackend(pMeta);
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.); tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.);

View File

@ -262,18 +262,33 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
} }
} }
// todo: not wait in a critical region taosMemoryFree(defaultPath);
while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId)) == NULL) { taosMemoryFree(newPath);
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
taosMsleep(100); return 0;
}
// todo refactor: the lock shoud be restricted in one function
void streamMetaInitBackend(SStreamMeta* pMeta) {
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
if (pMeta->streamBackend == NULL) {
streamMetaWUnLock(pMeta);
while (1) {
streamMetaWLock(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
if (pMeta->streamBackend != NULL) {
break;
}
streamMetaWUnLock(pMeta);
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
taosMsleep(100);
}
} }
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
streamBackendLoadCheckpointInfo(pMeta); streamBackendLoadCheckpointInfo(pMeta);
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
return 0;
} }
void streamMetaClear(SStreamMeta* pMeta) { void streamMetaClear(SStreamMeta* pMeta) {