fix: synchronize access to pVnode->inUse
This commit is contained in:
parent
6bb9ee9b4e
commit
a78556a3e6
|
@ -94,6 +94,7 @@ int vnodeOpenBufPool(SVnode *pVnode) {
|
||||||
int vnodeCloseBufPool(SVnode *pVnode) {
|
int vnodeCloseBufPool(SVnode *pVnode) {
|
||||||
SVBufPool *pPool;
|
SVBufPool *pPool;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pVnode->mutex);
|
||||||
for (pPool = pVnode->pPool; pPool; pPool = pVnode->pPool) {
|
for (pPool = pVnode->pPool; pPool; pPool = pVnode->pPool) {
|
||||||
pVnode->pPool = pPool->next;
|
pVnode->pPool = pPool->next;
|
||||||
vnodeBufPoolDestroy(pPool);
|
vnodeBufPoolDestroy(pPool);
|
||||||
|
@ -103,8 +104,9 @@ int vnodeCloseBufPool(SVnode *pVnode) {
|
||||||
vnodeBufPoolDestroy(pVnode->inUse);
|
vnodeBufPoolDestroy(pVnode->inUse);
|
||||||
pVnode->inUse = NULL;
|
pVnode->inUse = NULL;
|
||||||
}
|
}
|
||||||
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
|
taosThreadMutexUnlock(&pVnode->mutex);
|
||||||
|
|
||||||
|
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -244,6 +246,9 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
|
||||||
pVnode->pPool = pPool;
|
pVnode->pPool = pPool;
|
||||||
taosThreadCondSignal(&pVnode->poolNotEmpty);
|
taosThreadCondSignal(&pVnode->poolNotEmpty);
|
||||||
|
|
||||||
|
if (pVnode->inUse == pPool) {
|
||||||
|
pVnode->inUse = NULL;
|
||||||
|
}
|
||||||
taosThreadMutexUnlock(&pVnode->mutex);
|
taosThreadMutexUnlock(&pVnode->mutex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,22 +87,21 @@ void vnodeUpdCommitSched(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeShouldCommit(SVnode *pVnode) {
|
int vnodeShouldCommit(SVnode *pVnode) {
|
||||||
if (!pVnode->inUse || !osDataSpaceAvailable()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
SVCommitSched *pSched = &pVnode->commitSched;
|
SVCommitSched *pSched = &pVnode->commitSched;
|
||||||
int64_t nowMs = taosGetMonoTimestampMs();
|
int64_t nowMs = taosGetMonoTimestampMs();
|
||||||
|
bool diskAvail = osDataSpaceAvailable();
|
||||||
|
bool needCommit = false;
|
||||||
|
|
||||||
return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
|
taosThreadMutexLock(&pVnode->mutex);
|
||||||
(pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));
|
if (!pVnode->inUse || !diskAvail) {
|
||||||
}
|
goto _out;
|
||||||
|
|
||||||
int vnodeShouldCommitOld(SVnode *pVnode) {
|
|
||||||
if (pVnode->inUse) {
|
|
||||||
return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size);
|
|
||||||
}
|
}
|
||||||
return false;
|
needCommit =
|
||||||
|
(((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
|
||||||
|
(pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));
|
||||||
|
_out:
|
||||||
|
taosThreadMutexUnlock(&pVnode->mutex);
|
||||||
|
return needCommit;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
|
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
|
||||||
|
@ -259,7 +258,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
||||||
metaPrepareAsyncCommit(pVnode->pMeta);
|
metaPrepareAsyncCommit(pVnode->pMeta);
|
||||||
|
|
||||||
vnodeBufPoolUnRef(pVnode->inUse);
|
vnodeBufPoolUnRef(pVnode->inUse);
|
||||||
pVnode->inUse = NULL;
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
Loading…
Reference in New Issue