fix(stream): avoid lock in timer to avoid sync operation blocks.
This commit is contained in:
parent
1ced6d0d65
commit
e1c4451f81
|
@ -788,6 +788,7 @@ void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta);
|
||||||
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId);
|
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId);
|
||||||
|
|
||||||
void streamMetaRLock(SStreamMeta* pMeta);
|
void streamMetaRLock(SStreamMeta* pMeta);
|
||||||
|
int32_t streamMetaTryRlock(SStreamMeta* pMeta);
|
||||||
void streamMetaRUnLock(SStreamMeta* pMeta);
|
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||||
void streamMetaWLock(SStreamMeta* pMeta);
|
void streamMetaWLock(SStreamMeta* pMeta);
|
||||||
void streamMetaWUnLock(SStreamMeta* pMeta);
|
void streamMetaWUnLock(SStreamMeta* pMeta);
|
||||||
|
|
|
@ -146,9 +146,13 @@ static void doStartScanWal(void* param, void* tmrId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaRLock(pMeta);
|
code = streamMetaTryRlock(pMeta);
|
||||||
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
if (code == 0) {
|
||||||
streamMetaRUnLock(pMeta);
|
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
streamMetaRUnLock(pMeta);
|
||||||
|
} else {
|
||||||
|
numOfTasks = 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (numOfTasks == 0) {
|
if (numOfTasks == 0) {
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -167,7 +171,6 @@ static void doStartScanWal(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
|
||||||
streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
|
streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
|
||||||
tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
|
tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,15 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamMetaTryRlock(SStreamMeta* pMeta) {
|
||||||
|
int32_t code = taosThreadRwlockTryRdlock(&pMeta->lock);
|
||||||
|
if (code) {
|
||||||
|
stError("vgId:%d try meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
void streamMetaWLock(SStreamMeta* pMeta) {
|
void streamMetaWLock(SStreamMeta* pMeta) {
|
||||||
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
|
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
|
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
|
||||||
|
|
Loading…
Reference in New Issue