fix(stream): fix the deadlock.
This commit is contained in:
parent
3f5d6c4868
commit
8c15f59d68
|
@ -3439,7 +3439,6 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
|
|||
}
|
||||
|
||||
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||
stDebug("streamStateGetGroupKVByCur_rocksdb");
|
||||
if (!pCur) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -3885,7 +3884,6 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con
|
|||
}
|
||||
|
||||
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
||||
stDebug("streamStateSessionGetKVByCur_rocksdb");
|
||||
if (!pCur) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -3985,7 +3983,6 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
|
|||
return NULL;
|
||||
}
|
||||
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||
stDebug("streamStateFillGetKVByCur_rocksdb");
|
||||
if (!pCur) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -900,9 +900,6 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) {
|
|||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
|
||||
ref);
|
||||
|
||||
// streamMutexUnlock(&pTask->lock);
|
||||
// streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -911,9 +908,6 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) {
|
|||
stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
|
||||
", quit, ref:%d",
|
||||
id, vgId, pTmrInfo->launchChkptId, ref);
|
||||
|
||||
// streamMutexUnlock(&pActiveInfo->lock);
|
||||
// streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -922,9 +916,6 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) {
|
|||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d",
|
||||
id, vgId, ref);
|
||||
|
||||
// streamMutexUnlock(&pActiveInfo->lock);
|
||||
// streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1020,7 +1011,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
|||
|
||||
int32_t code = doChkptStatusCheck(pTask);
|
||||
if (code) {
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue