fix(stream): fix dead lock.
This commit is contained in:
parent
ea6e78cbaa
commit
e734569de0
|
@ -811,7 +811,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
|
|
||||||
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
||||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
if (pStartInfo->restartCount > 0) {
|
if (pStartInfo->restartCount > 0) {
|
||||||
pStartInfo->restartCount -= 1;
|
pStartInfo->restartCount -= 1;
|
||||||
|
@ -820,10 +820,10 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
||||||
tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", pMeta->vgId, pMeta->role,
|
tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", pMeta->vgId, pMeta->role,
|
||||||
pStartInfo->restartCount);
|
pStartInfo->restartCount);
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
|
restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
|
||||||
} else {
|
} else {
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
tqDebug("vgId:%d start all tasks completed", pMeta->vgId);
|
tqDebug("vgId:%d start all tasks completed", pMeta->vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -348,18 +348,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
pMeta->startInfo.completeFn = fn;
|
pMeta->startInfo.completeFn = fn;
|
||||||
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
// pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
|
|
||||||
// pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
|
|
||||||
// while (pMeta->streamBackend == NULL) {
|
|
||||||
// qError("vgId:%d failed to init stream backend", pMeta->vgId);
|
|
||||||
// taosMsleep(2 * 1000);
|
|
||||||
// qInfo("vgId:%d retry to init stream backend", pMeta->vgId);
|
|
||||||
// pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
|
|
||||||
// if (pMeta->streamBackend == NULL) {
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
|
||||||
|
|
||||||
pMeta->numOfPausedTasks = 0;
|
pMeta->numOfPausedTasks = 0;
|
||||||
pMeta->numOfStreamTasks = 0;
|
pMeta->numOfStreamTasks = 0;
|
||||||
stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
|
stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
|
||||||
|
|
|
@ -1107,12 +1107,13 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
|
||||||
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
|
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
|
||||||
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
|
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
|
||||||
streamMetaResetStartInfo(pStartInfo);
|
streamMetaResetStartInfo(pStartInfo);
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
pStartInfo->completeFn(pMeta);
|
pStartInfo->completeFn(pMeta);
|
||||||
} else {
|
} else {
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);
|
stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue