Merge branch 'fix/long_query' of https://github.com/taosdata/TDengine into fix/long_query
This commit is contained in:
commit
f0e9656b62
|
@ -79,12 +79,16 @@ _exit:
|
|||
}
|
||||
return code;
|
||||
}
|
||||
int vnodeBegin(SVnode *pVnode) {
|
||||
// alloc buffer pool
|
||||
static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
taosThreadMutexLock(&pVnode->mutex);
|
||||
|
||||
int32_t nTry = 0;
|
||||
while (++nTry) {
|
||||
for (;;) {
|
||||
++nTry;
|
||||
|
||||
if (pVnode->freeList) {
|
||||
vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList,
|
||||
pVnode->freeList->id);
|
||||
|
@ -97,12 +101,8 @@ int vnodeBegin(SVnode *pVnode) {
|
|||
} else {
|
||||
vInfo("vgId:%d no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry);
|
||||
|
||||
terrno = vnodeTryRecycleBufPool(pVnode);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno));
|
||||
taosThreadMutexUnlock(&pVnode->mutex);
|
||||
return -1;
|
||||
}
|
||||
code = vnodeTryRecycleBufPool(pVnode);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pVnode->freeList == NULL) {
|
||||
vDebug("vgId:%d no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC);
|
||||
|
@ -115,37 +115,53 @@ int vnodeBegin(SVnode *pVnode) {
|
|||
|
||||
int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts);
|
||||
if (rc && rc != ETIMEDOUT) {
|
||||
terrno = TAOS_SYSTEM_ERROR(rc);
|
||||
vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno));
|
||||
taosThreadMutexUnlock(&pVnode->mutex);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(rc);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
taosThreadMutexUnlock(&pVnode->mutex);
|
||||
if (code) {
|
||||
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
int vnodeBegin(SVnode *pVnode) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
pVnode->state.commitID++;
|
||||
|
||||
// alloc buffer pool
|
||||
code = vnodeGetBufPoolToUse(pVnode);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// begin meta
|
||||
if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) {
|
||||
vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// begin tsdb
|
||||
if (tsdbBegin(pVnode->pTsdb) < 0) {
|
||||
vError("vgId:%d, failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// begin sma
|
||||
if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) {
|
||||
vError("vgId:%d, failed to begin sma since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
return 0;
|
||||
_exit:
|
||||
if (code) {
|
||||
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
void vnodeUpdCommitSched(SVnode *pVnode) {
|
||||
|
@ -160,7 +176,7 @@ int vnodeShouldCommit(SVnode *pVnode) {
|
|||
}
|
||||
|
||||
SVCommitSched *pSched = &pVnode->commitSched;
|
||||
int64_t nowMs = taosGetMonoTimestampMs();
|
||||
int64_t nowMs = taosGetMonoTimestampMs();
|
||||
|
||||
return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
|
||||
(pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));
|
||||
|
|
Loading…
Reference in New Issue