From e710802f1f8fca12c797ab86efa4f2b63e33d8a1 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 5 Jan 2023 15:40:15 +0800 Subject: [PATCH 01/22] more code --- source/dnode/vnode/src/inc/vnd.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 21 ++++++++---- source/dnode/vnode/src/vnd/vnodeBufPool.c | 40 ++++++++++------------- source/dnode/vnode/src/vnd/vnodeCommit.c | 8 ++--- source/dnode/vnode/src/vnd/vnodeOpen.c | 2 +- 5 files changed, 38 insertions(+), 35 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 28797c5361..8c144b6fc4 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -61,7 +61,7 @@ struct SVBufPoolNode { }; struct SVBufPool { - SVBufPool* next; + SVBufPool* freeNext; SVnode* pVnode; TdThreadSpinlock* lock; volatile int32_t nRef; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e7f0a6d297..dd5e6fc575 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -87,6 +87,8 @@ typedef struct SCommitInfo SCommitInfo; #define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA2_DIR "rsma2" +#define VNODE_BUFPOOL_SEGMENTS 3 + #define VND_INFO_FNAME "vnode.json" // vnd.h @@ -326,16 +328,21 @@ struct STsdbKeepCfg { }; struct SVnode { - char* path; - SVnodeCfg config; - SVState state; - SVStatis statis; - STfs* pTfs; - SMsgCb msgCb; + char* path; + SVnodeCfg config; + SVState state; + SVStatis statis; + STfs* pTfs; + SMsgCb msgCb; + + // Buffer Pool TdThreadMutex mutex; TdThreadCond poolNotEmpty; - SVBufPool* pPool; + SVBufPool* aBufPool[VNODE_BUFPOOL_SEGMENTS]; + SVBufPool* freeList; SVBufPool* inUse; + SVBufPool* recycling; + SMeta* pMeta; SSma* pSma; STsdb* pTsdb; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 139ee23266..b9b273468c 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -16,8 +16,6 @@ #include "vnd.h" /* ------------------------ STRUCTURES ------------------------ */ -#define VNODE_BUFPOOL_SEGMENTS 3 - static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) { SVBufPool *pPool; @@ -44,7 +42,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) pPool->lock = NULL; } - pPool->next = NULL; + pPool->freeNext = NULL; pPool->pVnode = pVnode; pPool->nRef = 0; pPool->size = 0; @@ -69,22 +67,21 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) { } int vnodeOpenBufPool(SVnode *pVnode) { - SVBufPool *pPool = NULL; - int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; + int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; - ASSERT(pVnode->pPool == NULL); + ASSERT(pVnode->freeList == NULL); for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) { // create pool - if (vnodeBufPoolCreate(pVnode, size, &pPool)) { + if (vnodeBufPoolCreate(pVnode, size, &pVnode->aBufPool[i])) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vnodeCloseBufPool(pVnode); return -1; } - // add pool to vnode - pPool->next = pVnode->pPool; - pVnode->pPool = pPool; + // add to free list + pVnode->aBufPool[i]->freeNext = pVnode->freeList; + pVnode->freeList = pVnode->aBufPool[i]; } vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size); @@ -92,19 +89,18 @@ int vnodeOpenBufPool(SVnode *pVnode) { } int vnodeCloseBufPool(SVnode *pVnode) { - SVBufPool *pPool; - - for (pPool = pVnode->pPool; pPool; pPool = pVnode->pPool) { - pVnode->pPool = pPool->next; - vnodeBufPoolDestroy(pPool); + for (int32_t i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) { + if (pVnode->aBufPool[i]) { + vnodeBufPoolDestroy(pVnode->aBufPool[i]); + pVnode->aBufPool[i] = NULL; + } } - if (pVnode->inUse) { - vnodeBufPoolDestroy(pVnode->inUse); - pVnode->inUse = NULL; - } + pVnode->freeList = NULL; + ASSERT(pVnode->inUse == NULL); + ASSERT(pVnode->recycling == NULL); + vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode)); - return 0; } @@ -240,8 +236,8 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { } } - pPool->next = pVnode->pPool; - pVnode->pPool = pPool; + pPool->freeNext = pVnode->freeList; + pVnode->freeList = pPool; taosThreadCondSignal(&pVnode->poolNotEmpty); taosThreadMutexUnlock(&pVnode->mutex); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index ab7f73121f..428af47f0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -25,14 +25,14 @@ int vnodeBegin(SVnode *pVnode) { // alloc buffer pool taosThreadMutexLock(&pVnode->mutex); - while (pVnode->pPool == NULL) { + while (pVnode->freeList == NULL) { taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex); } - pVnode->inUse = pVnode->pPool; + pVnode->inUse = pVnode->freeList; pVnode->inUse->nRef = 1; - pVnode->pPool = pVnode->inUse->next; - pVnode->inUse->next = NULL; + pVnode->freeList = pVnode->inUse->freeNext; + pVnode->inUse->freeNext = NULL; taosThreadMutexUnlock(&pVnode->mutex); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index bec5d2977b..af5b7c281d 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -238,7 +238,7 @@ _err: if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); if (pVnode->pSma) smaClose(pVnode->pSma); if (pVnode->pMeta) metaClose(pVnode->pMeta); - if (pVnode->pPool) vnodeCloseBufPool(pVnode); + if (pVnode->freeList) vnodeCloseBufPool(pVnode); tsem_destroy(&(pVnode->canCommit)); taosMemoryFree(pVnode); From d7ec7ed936a7ece87899d6ae892859e87b14cb31 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 6 Jan 2023 15:24:27 +0800 Subject: [PATCH 02/22] more code --- source/dnode/vnode/src/inc/vnd.h | 2 ++ source/dnode/vnode/src/vnd/vnodeBufPool.c | 2 -- source/dnode/vnode/src/vnd/vnodeCommit.c | 44 +++++++++++++++++++---- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index e98e7736a9..a8fbff4597 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -62,6 +62,8 @@ struct SVBufPoolNode { struct SVBufPool { SVBufPool* freeNext; + SVBufPool* recycleNext; + SVBufPool* recyclePrev; SVnode* pVnode; TdThreadSpinlock* lock; volatile int32_t nRef; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index b9b273468c..8b6247163a 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -69,8 +69,6 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) { int vnodeOpenBufPool(SVnode *pVnode) { int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; - ASSERT(pVnode->freeList == NULL); - for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) { // create pool if (vnodeBufPoolCreate(pVnode, size, &pVnode->aBufPool[i])) { diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 428af47f0b..4114651904 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -21,18 +21,48 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeCommitImpl(SCommitInfo *pInfo); +#define WAIT_TIME_MILI_SEC 10 // miliseconds + int vnodeBegin(SVnode *pVnode) { // alloc buffer pool taosThreadMutexLock(&pVnode->mutex); - while (pVnode->freeList == NULL) { - taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex); - } + int32_t nTry = 0; + for (;;) { + while (pVnode->freeList == NULL) { + vDebug("vgId:%d no free buffer pool, try to wait %d...", TD_VID(pVnode), ++nTry); - pVnode->inUse = pVnode->freeList; - pVnode->inUse->nRef = 1; - pVnode->freeList = pVnode->inUse->freeNext; - pVnode->inUse->freeNext = NULL; + struct timeval tv; + struct timespec ts; + + taosGetTimeOfDay(&tv); + ts.tv_sec = tv.tv_sec; + ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000; + + int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts); + if (rc == ETIMEDOUT) { // time out, break + break; + } else if (rc != 0) { // error occurred + terrno = TAOS_SYSTEM_ERROR(rc); + vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno)); + taosThreadMutexUnlock(&pVnode->mutex); + return -1; + } + } + + if (pVnode->freeList) { + // allocate from free list + pVnode->inUse = pVnode->freeList; + pVnode->inUse->nRef = 1; + pVnode->freeList = pVnode->inUse->freeNext; + pVnode->inUse->freeNext = NULL; + break; + } else if (nTry == 1) { + // try to recycle a buffer pool + vInfo("vgId:%d no free buffer pool, try to recycle...", TD_VID(pVnode)); + ASSERT(false); // TODO: condition of nTry == 1 may not be reasonable + } + } taosThreadMutexUnlock(&pVnode->mutex); From fda6fccac837219ad926c3a9ac50f009516a5b81 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 9 Jan 2023 09:24:26 +0800 Subject: [PATCH 03/22] more code --- source/dnode/vnode/src/inc/vnodeInt.h | 4 +++- source/dnode/vnode/src/vnd/vnodeBufPool.c | 4 ---- source/dnode/vnode/src/vnd/vnodeCommit.c | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index dd5e6fc575..6fb6802966 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -341,7 +341,9 @@ struct SVnode { SVBufPool* aBufPool[VNODE_BUFPOOL_SEGMENTS]; SVBufPool* freeList; SVBufPool* inUse; - SVBufPool* recycling; + SVBufPool* onCommit; + SVBufPool* recycleHead; + SVBufPool* recycleTail; SMeta* pMeta; SSma* pSma; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 8b6247163a..3ae5755b49 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -94,10 +94,6 @@ int vnodeCloseBufPool(SVnode *pVnode) { } } - pVnode->freeList = NULL; - ASSERT(pVnode->inUse == NULL); - ASSERT(pVnode->recycling == NULL); - vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode)); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 4114651904..4f28199105 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -29,7 +29,7 @@ int vnodeBegin(SVnode *pVnode) { int32_t nTry = 0; for (;;) { - while (pVnode->freeList == NULL) { + while (pVnode->freeList == NULL) { // move here below vDebug("vgId:%d no free buffer pool, try to wait %d...", TD_VID(pVnode), ++nTry); struct timeval tv; From 375545435c9dc8eed7afa669b6386caa0a049758 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 9 Jan 2023 10:09:31 +0800 Subject: [PATCH 04/22] adjust more code --- source/dnode/vnode/src/vnd/vnodeCommit.c | 63 ++++++++++++++---------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 4f28199105..e22add192d 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -23,44 +23,53 @@ static int vnodeCommitImpl(SCommitInfo *pInfo); #define WAIT_TIME_MILI_SEC 10 // miliseconds +static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { + int32_t code = 0; + ASSERT(0); // TODO +_exit: + return code; +} int vnodeBegin(SVnode *pVnode) { // alloc buffer pool taosThreadMutexLock(&pVnode->mutex); int32_t nTry = 0; - for (;;) { - while (pVnode->freeList == NULL) { // move here below - vDebug("vgId:%d no free buffer pool, try to wait %d...", TD_VID(pVnode), ++nTry); - - struct timeval tv; - struct timespec ts; - - taosGetTimeOfDay(&tv); - ts.tv_sec = tv.tv_sec; - ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000; - - int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts); - if (rc == ETIMEDOUT) { // time out, break - break; - } else if (rc != 0) { // error occurred - terrno = TAOS_SYSTEM_ERROR(rc); - vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno)); - taosThreadMutexUnlock(&pVnode->mutex); - return -1; - } - } - + while (++nTry) { if (pVnode->freeList) { - // allocate from free list + vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p", TD_VID(pVnode), nTry, pVnode->freeList); + pVnode->inUse = pVnode->freeList; pVnode->inUse->nRef = 1; pVnode->freeList = pVnode->inUse->freeNext; pVnode->inUse->freeNext = NULL; break; - } else if (nTry == 1) { - // try to recycle a buffer pool - vInfo("vgId:%d no free buffer pool, try to recycle...", TD_VID(pVnode)); - ASSERT(false); // TODO: condition of nTry == 1 may not be reasonable + } else { + vInfo("vgId:%d no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry); + + terrno = vnodeTryRecycleBufPool(pVnode); + if (terrno != TSDB_CODE_SUCCESS) { + vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno)); + taosThreadMutexUnlock(&pVnode->mutex); + return -1; + } + + if (pVnode->freeList == NULL) { + vDebug("vgId:%d no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC); + + struct timeval tv; + struct timespec ts; + taosGetTimeOfDay(&tv); + ts.tv_sec = tv.tv_sec; + ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000; + + int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts); + if (rc && rc != ETIMEDOUT) { + terrno = TAOS_SYSTEM_ERROR(rc); + vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno)); + taosThreadMutexUnlock(&pVnode->mutex); + return -1; + } + } } } From 15995e7631af81fb1ffaabb1e40c3c0e1c3c0122 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 9 Jan 2023 14:54:49 +0800 Subject: [PATCH 05/22] more code --- source/dnode/vnode/src/inc/vnd.h | 10 ++- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/vnd/vnodeBufPool.c | 90 ++++++++++++++--------- source/dnode/vnode/src/vnd/vnodeCommit.c | 73 ++++++++++++++++-- 4 files changed, 129 insertions(+), 45 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index a8fbff4597..32dfd5a41c 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -61,12 +61,14 @@ struct SVBufPoolNode { }; struct SVBufPool { - SVBufPool* freeNext; - SVBufPool* recycleNext; - SVBufPool* recyclePrev; + SVBufPool* freeNext; + SVBufPool* recycleNext; + SVBufPool* recyclePrev; + SVnode* pVnode; - TdThreadSpinlock* lock; + int32_t id; volatile int32_t nRef; + TdThreadSpinlock* lock; int64_t size; uint8_t* ptr; SVBufPoolNode* pTail; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 6fb6802966..3b751d506f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -344,6 +344,7 @@ struct SVnode { SVBufPool* onCommit; SVBufPool* recycleHead; SVBufPool* recycleTail; + SVBufPool* onRecycle; SMeta* pMeta; SSma* pSma; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 3ae5755b49..3a02a4a0bf 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -16,7 +16,7 @@ #include "vnd.h" /* ------------------------ STRUCTURES ------------------------ */ -static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) { +static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) { SVBufPool *pPool; pPool = taosMemoryMalloc(sizeof(SVBufPool) + size); @@ -24,6 +24,14 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + memset(pPool, 0, sizeof(SVBufPool)); + pPool->pVnode = pVnode; + pPool->id = id; + pPool->ptr = pPool->node.data; + pPool->pTail = &pPool->node; + pPool->node.prev = NULL; + pPool->node.pnext = &pPool->pTail; + pPool->node.size = size; if (VND_IS_RSMA(pVnode)) { pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock)); @@ -42,16 +50,6 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) pPool->lock = NULL; } - pPool->freeNext = NULL; - pPool->pVnode = pVnode; - pPool->nRef = 0; - pPool->size = 0; - pPool->ptr = pPool->node.data; - pPool->pTail = &pPool->node; - pPool->node.prev = NULL; - pPool->node.pnext = &pPool->pTail; - pPool->node.size = size; - *ppPool = pPool; return 0; } @@ -71,7 +69,7 @@ int vnodeOpenBufPool(SVnode *pVnode) { for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) { // create pool - if (vnodeBufPoolCreate(pVnode, size, &pVnode->aBufPool[i])) { + if (vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i])) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vnodeCloseBufPool(pVnode); return -1; @@ -206,34 +204,54 @@ void vnodeBufPoolRef(SVBufPool *pPool) { } void vnodeBufPoolUnRef(SVBufPool *pPool) { - if (pPool == NULL) { - return; - } - int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); - if (nRef == 0) { - SVnode *pVnode = pPool->pVnode; + if (pPool == NULL) return; - vnodeBufPoolReset(pPool); + if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) return; - taosThreadMutexLock(&pVnode->mutex); + SVnode *pVnode = pPool->pVnode; - int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; - if (pPool->node.size != size) { - SVBufPool *pPoolT = NULL; - if (vnodeBufPoolCreate(pVnode, size, &pPoolT) < 0) { - vWarn("vgId:%d, try to change buf pools size from %" PRId64 " to %" PRId64 " since %s", TD_VID(pVnode), - pPool->node.size, size, tstrerror(errno)); - } else { - vnodeBufPoolDestroy(pPool); - pPool = pPoolT; - vDebug("vgId:%d, change buf pools size from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->node.size, size); - } + taosThreadMutexLock(&pVnode->mutex); + + // remove from recycle list or on-recycle position + if (pVnode->onRecycle == pPool) { + pVnode->onRecycle = NULL; + } else { + if (pPool->recyclePrev) { + pPool->recyclePrev->recycleNext = pPool->recycleNext; + } else { + pVnode->recycleHead = pPool->recycleNext; } - pPool->freeNext = pVnode->freeList; - pVnode->freeList = pPool; - taosThreadCondSignal(&pVnode->poolNotEmpty); - - taosThreadMutexUnlock(&pVnode->mutex); + if (pPool->recycleNext) { + pPool->recycleNext->recyclePrev = pPool->recyclePrev; + } else { + pVnode->recycleTail = pPool->recyclePrev; + } + pPool->recyclePrev = pPool->recycleNext = NULL; } + + // change the pool size if need + int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; + if (pPool->node.size != size) { + SVBufPool *pNewPool = NULL; + if (vnodeBufPoolCreate(pVnode, pPool->id, size, &pNewPool) < 0) { + vWarn("vgId:%d failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s", + TD_VID(pVnode), pPool->id, pPool->node.size, size, tstrerror(errno)); + } else { + vInfo("vgId:%d buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id, + pPool->node.size, size); + + vnodeBufPoolDestroy(pPool); + pPool = pNewPool; + pVnode->aBufPool[pPool->id] = pPool; + } + } + + // add to free list + vnodeBufPoolReset(pPool); + pPool->freeNext = pVnode->freeList; + pVnode->freeList = pPool; + taosThreadCondSignal(&pVnode->poolNotEmpty); + + taosThreadMutexUnlock(&pVnode->mutex); } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index e22add192d..f16f9ae1ef 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -25,7 +25,34 @@ static int vnodeCommitImpl(SCommitInfo *pInfo); static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { int32_t code = 0; - ASSERT(0); // TODO + + if (pVnode->onRecycle) { + vDebug("vgId:%d buffer pool %p of id %d is on recycling", TD_VID(pVnode), pVnode->onRecycle, pVnode->onRecycle->id); + goto _exit; + } + + if (pVnode->recycleHead) { + vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead, + pVnode->recycleHead->id); + + // pop the header buffer pool for recycling + pVnode->onRecycle = pVnode->recycleHead; + if (pVnode->recycleHead == pVnode->recycleTail) { + pVnode->recycleHead = pVnode->recycleTail = NULL; + } else { + pVnode->recycleHead = pVnode->recycleHead->recycleNext; + pVnode->recycleHead->recyclePrev = NULL; + } + pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL; + + { + // TODO: do recycle the buffer pool + ASSERT(0); + } + } else { + vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); + } + _exit: return code; } @@ -36,7 +63,8 @@ int vnodeBegin(SVnode *pVnode) { int32_t nTry = 0; while (++nTry) { if (pVnode->freeList) { - vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p", TD_VID(pVnode), nTry, pVnode->freeList); + vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList, + pVnode->freeList->id); pVnode->inUse = pVnode->freeList; pVnode->inUse->nRef = 1; @@ -259,8 +287,11 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { code = smaPrepareAsyncCommit(pVnode->pSma); if (code) goto _exit; - vnodeBufPoolUnRef(pVnode->inUse); + taosThreadMutexLock(&pVnode->mutex); + ASSERT(pVnode->onCommit == NULL); + pVnode->onCommit = pVnode->inUse; pVnode->inUse = NULL; + taosThreadMutexUnlock(&pVnode->mutex); _exit: if (code) { @@ -272,19 +303,51 @@ _exit: return code; } - static int32_t vnodeCommitTask(void *arg) { int32_t code = 0; SCommitInfo *pInfo = (SCommitInfo *)arg; + SVnode *pVnode = pInfo->pVnode; // commit code = vnodeCommitImpl(pInfo); if (code) goto _exit; + // recycle buffer pool + SVBufPool *pPool = pVnode->onCommit; + + taosThreadMutexLock(&pVnode->mutex); + + pVnode->onCommit = NULL; + int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); + if (nRef == 0) { + // add to free list + vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); + + vnodeBufPoolReset(pPool); + pPool->freeNext = pVnode->freeList; + pVnode->freeList = pPool; + taosThreadCondSignal(&pVnode->poolNotEmpty); + } else { + // add to recycle list + vDebug("vgId:%d buffer pool %p of id %d is added to recycle list", TD_VID(pVnode), pPool, pPool->id); + + if (pVnode->recycleTail == NULL) { + pPool->recyclePrev = pPool->recycleNext = NULL; + pVnode->recycleHead = pVnode->recycleTail = pPool; + } else { + pPool->recyclePrev = pVnode->recycleTail; + pPool->recycleNext = NULL; + pVnode->recycleTail->recycleNext = pPool; + pVnode->recycleTail = pPool; + } + } + + taosThreadMutexUnlock(&pVnode->mutex); + _exit: // end commit - tsem_post(&pInfo->pVnode->canCommit); + tsem_post(&pVnode->canCommit); taosMemoryFree(pInfo); return code; } From 2e443c395eed0a198a92004ed0303086456a4f2d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 9 Jan 2023 15:51:30 +0800 Subject: [PATCH 06/22] more code --- source/dnode/vnode/src/inc/tsdb.h | 15 +---- source/dnode/vnode/src/inc/vnd.h | 5 ++ source/dnode/vnode/src/inc/vnodeInt.h | 11 ++++ source/dnode/vnode/src/tsdb/tsdbMemTable.c | 75 ++++++++++------------ source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- source/dnode/vnode/src/vnd/vnodeBufPool.c | 47 ++++++++++++++ 6 files changed, 101 insertions(+), 54 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a58301adf2..70dbadeeea 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -65,7 +65,6 @@ typedef struct SSmaInfo SSmaInfo; typedef struct SBlockCol SBlockCol; typedef struct SVersionRange SVersionRange; typedef struct SLDataIter SLDataIter; -typedef struct SQueryNode SQueryNode; typedef struct SDiskCol SDiskCol; typedef struct SDiskData SDiskData; typedef struct SDiskDataBuilder SDiskDataBuilder; @@ -209,12 +208,10 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in uint8_t **ppBuf); // tsdbMemTable ============================================================================================== // SMemTable -typedef int32_t (*_tsdb_reseek_func_t)(void *pQHandle); - int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode); +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _query_reseek_func_t reseek, SQueryNode **ppNode); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter @@ -293,7 +290,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== -int32_t tsdbTakeReadSnap(STsdbReader *pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap **ppSnap); +int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap); // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); @@ -369,13 +366,6 @@ struct STbData { STbData *next; }; -struct SQueryNode { - SQueryNode *pNext; - SQueryNode **ppNext; - void *pQHandle; - _tsdb_reseek_func_t reseek; -}; - struct SMemTable { SRWLatch latch; STsdb *pTsdb; @@ -392,7 +382,6 @@ struct SMemTable { int32_t nBucket; STbData **aBucket; }; - SQueryNode qList; }; struct TSDBROW { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 32dfd5a41c..b900e02bfd 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -65,6 +65,11 @@ struct SVBufPool { SVBufPool* recycleNext; SVBufPool* recyclePrev; + // query handle list + TdThreadMutex mutex; + int32_t nQuery; + SQueryNode qList; + SVnode* pVnode; int32_t id; volatile int32_t nRef; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 3b751d506f..ba46d84667 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -76,6 +76,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader; typedef struct SRSmaSnapWriter SRSmaSnapWriter; typedef struct SSnapDataHdr SSnapDataHdr; typedef struct SCommitInfo SCommitInfo; +typedef struct SQueryNode SQueryNode; #define VNODE_META_DIR "meta" #define VNODE_TSDB_DIR "tsdb" @@ -92,6 +93,13 @@ typedef struct SCommitInfo SCommitInfo; #define VND_INFO_FNAME "vnode.json" // vnd.h +typedef int32_t (*_query_reseek_func_t)(void* pQHandle); +struct SQueryNode { + SQueryNode* pNext; + SQueryNode** ppNext; + void* pQHandle; + _query_reseek_func_t reseek; +}; void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size); @@ -100,6 +108,9 @@ void vnodeBufPoolRef(SVBufPool* pPool); void vnodeBufPoolUnRef(SVBufPool* pPool); int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo); +int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, void* pQHandle, _query_reseek_func_t reseekFn); +int32_t vnodeBufPoolDeregisterQuery(SVBufPool* pPool); + // meta typedef struct SMCtbCursor SMCtbCursor; typedef struct SMStbCursor SMStbCursor; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 6bbaa31ed3..0b762cca1e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -64,8 +64,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { taosMemoryFree(pMemTable); goto _err; } - pMemTable->qList.pNext = &pMemTable->qList; - pMemTable->qList.ppNext = &pMemTable->qList.pNext; + // pMemTable->qList.pNext = &pMemTable->qList; + // pMemTable->qList.ppNext = &pMemTable->qList.pNext; vnodeBufPoolRef(pMemTable->pPool); *ppMemTable = pMemTable; @@ -749,39 +749,34 @@ _exit: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _query_reseek_func_t reseek, SQueryNode **ppNode) { int32_t code = 0; int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); - /* - // register handle (todo: take concurrency in consideration) - *ppNode = taosMemoryMalloc(sizeof(SQueryNode)); - if (*ppNode == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - (*ppNode)->pQHandle = pQHandle; - (*ppNode)->reseek = reseek; - (*ppNode)->pNext = pMemTable->qList.pNext; - (*ppNode)->ppNext = &pMemTable->qList.pNext; - pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext; - pMemTable->qList.pNext = *ppNode; - */ + + vnodeBufPoolRegisterQuery(pMemTable->pPool, pQHandle, reseek); + _exit: return code; } int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { int32_t code = 0; - /* + + if (pNode) { + vnodeBufPoolDeregisterQuery(pMemTable->pPool); + } + +#if 0 // unregister handle (todo: take concurrency in consideration) if (pNode) { pNode->pNext->ppNext = pNode->ppNext; *pNode->ppNext = pNode->pNext; taosMemoryFree(pNode); } - */ +#endif + int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); if (nRef == 0) { tsdbMemTableDestroy(pMemTable); @@ -828,28 +823,28 @@ _exit: return aTbDataP; } -int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { - int32_t code = 0; +// int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { +// int32_t code = 0; - SQueryNode *pNode = pMemTable->qList.pNext; - while (1) { - ASSERT(pNode != &pMemTable->qList); - SQueryNode *pNextNode = pNode->pNext; +// SQueryNode *pNode = pMemTable->qList.pNext; +// while (1) { +// ASSERT(pNode != &pMemTable->qList); +// SQueryNode *pNextNode = pNode->pNext; - if (pNextNode == &pMemTable->qList) { - code = (*pNode->reseek)(pNode->pQHandle); - if (code) goto _exit; - break; - } else { - code = (*pNode->reseek)(pNode->pQHandle); - if (code) goto _exit; - pNode = pMemTable->qList.pNext; - ASSERT(pNode == pNextNode); - } - } +// if (pNextNode == &pMemTable->qList) { +// code = (*pNode->reseek)(pNode->pQHandle); +// if (code) goto _exit; +// break; +// } else { +// code = (*pNode->reseek)(pNode->pQHandle); +// if (code) goto _exit; +// pNode = pMemTable->qList.pNext; +// ASSERT(pNode == pNextNode); +// } +// } - // NOTE: Take care here, pMemTable is destroyed +// // NOTE: Take care here, pMemTable is destroyed -_exit: - return code; -} +// _exit: +// return code; +// } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 87b5e59bc0..b52c75f158 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4614,7 +4614,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 return TSDB_CODE_SUCCESS; } -int32_t tsdbTakeReadSnap(STsdbReader* pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap** ppSnap) { +int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) { int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; SVersionRange* pRange = &pReader->verRange; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 3a02a4a0bf..e519c69d44 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -25,6 +25,13 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPoo return -1; } memset(pPool, 0, sizeof(SVBufPool)); + + // query handle list + taosThreadMutexInit(&pPool->mutex, NULL); + pPool->nQuery = 0; + pPool->qList.pNext = &pPool->qList; + pPool->qList.ppNext = &pPool->qList.pNext; + pPool->pVnode = pVnode; pPool->id = id; pPool->ptr = pPool->node.data; @@ -60,6 +67,7 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) { taosThreadSpinDestroy(pPool->lock); taosMemoryFree((void *)pPool->lock); } + taosThreadMutexDestroy(&pPool->mutex); taosMemoryFree(pPool); return 0; } @@ -97,6 +105,7 @@ int vnodeCloseBufPool(SVnode *pVnode) { } void vnodeBufPoolReset(SVBufPool *pPool) { + ASSERT(pPool->nQuery == 0); for (SVBufPoolNode *pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) { ASSERT(pNode->pnext == &pPool->pTail); pNode->prev->pnext = &pPool->pTail; @@ -255,3 +264,41 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { taosThreadMutexUnlock(&pVnode->mutex); } + +int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, void *pQHandle, _query_reseek_func_t reseekFn) { + int32_t code = 0; + + SQueryNode *pQNode = taosMemoryMalloc(sizeof(*pQNode)); + if (pQNode == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pQNode->pQHandle = pQHandle; + pQNode->reseek = reseekFn; + + taosThreadMutexLock(&pPool->mutex); + + pQNode->pNext = pPool->qList.pNext; + pQNode->ppNext = &pPool->qList.pNext; + pPool->qList.pNext->ppNext = &pQNode->pNext; + pPool->qList.pNext = pQNode; + pPool->nQuery++; + + taosThreadMutexUnlock(&pPool->mutex); + +_exit: + return code; +} + +int32_t vnodeBufPoolDeregisterQuery(SVBufPool *pPool) { + int32_t code = 0; + + taosThreadMutexLock(&pPool->mutex); + + ASSERT(0); + + taosThreadMutexUnlock(&pPool->mutex); + +_exit: + return code; +} \ No newline at end of file From f8c9ae50b822d859452d19000186d72dcc9553d5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 9 Jan 2023 16:06:07 +0800 Subject: [PATCH 07/22] more code --- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 11 +---------- source/dnode/vnode/src/vnd/vnodeBufPool.c | 6 ++++-- source/dnode/vnode/src/vnd/vnodeCommit.c | 15 ++++++++++++--- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index ba46d84667..e8771f15b5 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -109,7 +109,7 @@ void vnodeBufPoolUnRef(SVBufPool* pPool); int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo); int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, void* pQHandle, _query_reseek_func_t reseekFn); -int32_t vnodeBufPoolDeregisterQuery(SVBufPool* pPool); +int32_t vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode); // meta typedef struct SMCtbCursor SMCtbCursor; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 0b762cca1e..f661bf5ddc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -765,18 +765,9 @@ int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { int32_t code = 0; if (pNode) { - vnodeBufPoolDeregisterQuery(pMemTable->pPool); + vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode); } -#if 0 - // unregister handle (todo: take concurrency in consideration) - if (pNode) { - pNode->pNext->ppNext = pNode->ppNext; - *pNode->ppNext = pNode->pNext; - taosMemoryFree(pNode); - } -#endif - int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); if (nRef == 0) { tsdbMemTableDestroy(pMemTable); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index e519c69d44..00706ad12d 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -290,12 +290,14 @@ _exit: return code; } -int32_t vnodeBufPoolDeregisterQuery(SVBufPool *pPool) { +int32_t vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { int32_t code = 0; taosThreadMutexLock(&pPool->mutex); - ASSERT(0); + pQNode->pNext->ppNext = pQNode->ppNext; + *pQNode->ppNext = pQNode->pNext; + pPool->nQuery--; taosThreadMutexUnlock(&pPool->mutex); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index f16f9ae1ef..8cc50876cb 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -45,10 +45,19 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { } pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL; - { - // TODO: do recycle the buffer pool - ASSERT(0); + // do recycle the buffer pool + SVBufPool *pPool = pVnode->onRecycle; + + taosThreadMutexLock(&pPool->mutex); + + SQueryNode *pNode = pPool->qList.pNext; + while (pNode != &pPool->qList) { + // TODO: refact/finish here + pNode->reseek(pNode->pQHandle); + pNode = pNode->pNext; } + + taosThreadMutexUnlock(&pPool->mutex); } else { vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); } From ad9e8a9aa3345fae2e37402f420186894100a37f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 9 Jan 2023 19:03:09 +0800 Subject: [PATCH 08/22] more code --- include/util/taoserror.h | 1 + source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 4 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 57 ++++++++---- source/dnode/vnode/src/vnd/vnodeBufPool.c | 16 ++-- source/dnode/vnode/src/vnd/vnodeCommit.c | 100 ++++++++++++--------- source/util/src/terror.c | 1 + 8 files changed, 109 insertions(+), 74 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 59bba315f4..52d8a75ee0 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -415,6 +415,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528) #define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529) #define TSDB_CODE_VND_DUP_REQUEST TAOS_DEF_ERROR_CODE(0, 0x0530) +#define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 70dbadeeea..fbc78e1104 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -211,7 +211,7 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _query_reseek_func_t reseek, SQueryNode **ppNode); +int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e8771f15b5..2069ee070c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -108,7 +108,7 @@ void vnodeBufPoolRef(SVBufPool* pPool); void vnodeBufPoolUnRef(SVBufPool* pPool); int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo); -int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, void* pQHandle, _query_reseek_func_t reseekFn); +int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode); int32_t vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode); // meta diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index f661bf5ddc..8d87db9a15 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -749,13 +749,13 @@ _exit: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _query_reseek_func_t reseek, SQueryNode **ppNode) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) { int32_t code = 0; int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); - vnodeBufPoolRegisterQuery(pMemTable->pPool, pQHandle, reseek); + vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode); _exit: return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b52c75f158..e3dfaaa667 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4620,46 +4620,67 @@ int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsd SVersionRange* pRange = &pReader->verRange; // alloc - *ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); - if (*ppSnap == NULL) { + STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(*pSnap)); + if (pSnap == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } // lock - code = taosThreadRwlockRdlock(&pTsdb->rwLock); - if (code) { - code = TAOS_SYSTEM_ERROR(code); - goto _exit; - } + taosThreadRwlockRdlock(&pTsdb->rwLock); // take snapshot if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { - tsdbRefMemTable(pTsdb->mem, pReader, reseek, &(*ppSnap)->pNode); - (*ppSnap)->pMem = pTsdb->mem; + pSnap->pMem = pTsdb->mem; + pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode)); + if (pSnap->pNode == NULL) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pSnap->pNode->pQHandle = pReader; + pSnap->pNode->reseek = reseek; + + tsdbRefMemTable(pTsdb->mem, pSnap->pNode); } if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { - tsdbRefMemTable(pTsdb->imem, pReader, reseek, &(*ppSnap)->pINode); - (*ppSnap)->pIMem = pTsdb->imem; + pSnap->pIMem = pTsdb->imem; + pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode)); + if (pSnap->pINode == NULL) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pSnap->pINode->pQHandle = pReader; + pSnap->pINode->reseek = reseek; + + tsdbRefMemTable(pTsdb->imem, pSnap->pINode); } // fs - code = tsdbFSRef(pTsdb, &(*ppSnap)->fs); + code = tsdbFSRef(pTsdb, &pSnap->fs); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); goto _exit; } // unlock - code = taosThreadRwlockUnlock(&pTsdb->rwLock); - if (code) { - code = TAOS_SYSTEM_ERROR(code); - goto _exit; - } + taosThreadRwlockUnlock(&pTsdb->rwLock); tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); + _exit: + if (code) { + *ppSnap = NULL; + if (pSnap) { + if (pSnap->pNode) taosMemoryFree(pSnap->pNode); + if (pSnap->pINode) taosMemoryFree(pSnap->pINode); + taosMemoryFree(pSnap); + } + } else { + *ppSnap = pSnap; + } return code; } @@ -4676,6 +4697,8 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { } tsdbFSUnref(pTsdb, &pSnap->fs); + if (pSnap->pNode) taosMemoryFree(pSnap->pNode); + if (pSnap->pINode) taosMemoryFree(pSnap->pINode); taosMemoryFree(pSnap); } tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 00706ad12d..d05f126413 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -215,12 +215,12 @@ void vnodeBufPoolRef(SVBufPool *pPool) { void vnodeBufPoolUnRef(SVBufPool *pPool) { if (pPool == NULL) return; - if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) return; - SVnode *pVnode = pPool->pVnode; taosThreadMutexLock(&pVnode->mutex); + if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit; + // remove from recycle list or on-recycle position if (pVnode->onRecycle == pPool) { pVnode->onRecycle = NULL; @@ -262,20 +262,14 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { pVnode->freeList = pPool; taosThreadCondSignal(&pVnode->poolNotEmpty); +_exit: taosThreadMutexUnlock(&pVnode->mutex); + return; } -int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, void *pQHandle, _query_reseek_func_t reseekFn) { +int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { int32_t code = 0; - SQueryNode *pQNode = taosMemoryMalloc(sizeof(*pQNode)); - if (pQNode == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - pQNode->pQHandle = pQHandle; - pQNode->reseek = reseekFn; - taosThreadMutexLock(&pPool->mutex); pQNode->pNext = pPool->qList.pNext; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 8cc50876cb..00ea7fbd05 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -26,43 +26,57 @@ static int vnodeCommitImpl(SCommitInfo *pInfo); static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { int32_t code = 0; - if (pVnode->onRecycle) { - vDebug("vgId:%d buffer pool %p of id %d is on recycling", TD_VID(pVnode), pVnode->onRecycle, pVnode->onRecycle->id); - goto _exit; - } - - if (pVnode->recycleHead) { - vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead, - pVnode->recycleHead->id); - - // pop the header buffer pool for recycling - pVnode->onRecycle = pVnode->recycleHead; - if (pVnode->recycleHead == pVnode->recycleTail) { - pVnode->recycleHead = pVnode->recycleTail = NULL; + if (pVnode->onRecycle == NULL) { + if (pVnode->recycleHead == NULL) { + vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); + goto _exit; } else { - pVnode->recycleHead = pVnode->recycleHead->recycleNext; - pVnode->recycleHead->recyclePrev = NULL; + vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead, + pVnode->recycleHead->id); + + pVnode->onRecycle = pVnode->recycleHead; + if (pVnode->recycleHead == pVnode->recycleTail) { + pVnode->recycleHead = pVnode->recycleTail = NULL; + } else { + pVnode->recycleHead = pVnode->recycleHead->recycleNext; + pVnode->recycleHead->recyclePrev = NULL; + } + pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL; } - pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL; - - // do recycle the buffer pool - SVBufPool *pPool = pVnode->onRecycle; - - taosThreadMutexLock(&pPool->mutex); - - SQueryNode *pNode = pPool->qList.pNext; - while (pNode != &pPool->qList) { - // TODO: refact/finish here - pNode->reseek(pNode->pQHandle); - pNode = pNode->pNext; - } - - taosThreadMutexUnlock(&pPool->mutex); - } else { - vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); } + // do recycle the buffer pool + SVBufPool *pPool = pVnode->onRecycle; + vDebug("vgId:%d buffer pool %p of id %d on recycle", TD_VID(pVnode), pPool, pPool->id); + + taosThreadMutexLock(&pPool->mutex); + + SQueryNode *pNode = pPool->qList.pNext; + while (pNode != &pPool->qList) { + int32_t rc = pNode->reseek(pNode->pQHandle); + if (rc == 0) { + SQueryNode *pTNode = pNode->pNext; + pNode->pNext->ppNext = pNode->ppNext; + *pNode->ppNext = pNode->pNext; + pPool->nQuery--; + pNode = pTNode; + } else if (rc == TSDB_CODE_VND_QUERY_BUSY) { + pNode = pNode->pNext; + } else { + taosThreadMutexUnlock(&pPool->mutex); + code = rc; + goto _exit; + } + } + + taosThreadMutexUnlock(&pPool->mutex); + + // TODO: if (pPool->nQuery == 0) add to free list + _exit: + if (code) { + vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code)); + } return code; } int vnodeBegin(SVnode *pVnode) { @@ -267,6 +281,12 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { tsem_wait(&pVnode->canCommit); + taosThreadMutexLock(&pVnode->mutex); + ASSERT(pVnode->onCommit == NULL); + pVnode->onCommit = pVnode->inUse; + pVnode->inUse = NULL; + taosThreadMutexUnlock(&pVnode->mutex); + pVnode->state.commitTerm = pVnode->state.applyTerm; pInfo->info.config = pVnode->config; @@ -296,12 +316,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { code = smaPrepareAsyncCommit(pVnode->pSma); if (code) goto _exit; - taosThreadMutexLock(&pVnode->mutex); - ASSERT(pVnode->onCommit == NULL); - pVnode->onCommit = pVnode->inUse; - pVnode->inUse = NULL; - taosThreadMutexUnlock(&pVnode->mutex); - _exit: if (code) { vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino, @@ -323,12 +337,12 @@ static int32_t vnodeCommitTask(void *arg) { if (code) goto _exit; // recycle buffer pool - SVBufPool *pPool = pVnode->onCommit; - taosThreadMutexLock(&pVnode->mutex); + SVBufPool *pPool = pVnode->onCommit; + int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); + pVnode->onCommit = NULL; - int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); if (nRef == 0) { // add to free list vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); @@ -337,7 +351,7 @@ static int32_t vnodeCommitTask(void *arg) { pPool->freeNext = pVnode->freeList; pVnode->freeList = pPool; taosThreadCondSignal(&pVnode->poolNotEmpty); - } else { + } else if (nRef > 0) { // add to recycle list vDebug("vgId:%d buffer pool %p of id %d is added to recycle list", TD_VID(pVnode), pPool, pPool->id); @@ -350,6 +364,8 @@ static int32_t vnodeCommitTask(void *arg) { pVnode->recycleTail->recycleNext = pPool; pVnode->recycleTail = pPool; } + } else { + ASSERT(0); } taosThreadMutexUnlock(&pVnode->mutex); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a142bb76d7..bab3edc870 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -320,6 +320,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subsc TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool") TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped") TAOS_DEFINE_ERROR(TSDB_CODE_VND_DUP_REQUEST, "Duplicate write request") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_QUERY_BUSY, "Query busy") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") From 1dfeaf8b183b7c7970150da44dc0d6fcf6928558 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 10 Jan 2023 09:53:38 +0800 Subject: [PATCH 09/22] refact code --- source/dnode/vnode/src/vnd/vnodeCommit.c | 41 +++++++++++++++++------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 7d874f1d36..de6bbdbf40 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -79,12 +79,15 @@ _exit: } return code; } -int vnodeBegin(SVnode *pVnode) { - // alloc buffer pool +static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { + int32_t code = 0; + taosThreadMutexLock(&pVnode->mutex); int32_t nTry = 0; - while (++nTry) { + for (;;) { + ++nTry; + if (pVnode->freeList) { vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList, pVnode->freeList->id); @@ -126,26 +129,42 @@ int vnodeBegin(SVnode *pVnode) { taosThreadMutexUnlock(&pVnode->mutex); +_exit: + return code; +} +int vnodeBegin(SVnode *pVnode) { + int32_t code = 0; + int32_t lino = 0; + pVnode->state.commitID++; + + // alloc buffer pool + code = vnodeGetBufPoolToUse(pVnode); + TSDB_CHECK_CODE(code, lino, _exit); + // begin meta if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) { - vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } // begin tsdb if (tsdbBegin(pVnode->pTsdb) < 0) { - vError("vgId:%d, failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } // begin sma if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) { - vError("vgId:%d, failed to begin sma since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } - return 0; +_exit: + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + return code; } void vnodeUpdCommitSched(SVnode *pVnode) { @@ -160,7 +179,7 @@ int vnodeShouldCommit(SVnode *pVnode) { } SVCommitSched *pSched = &pVnode->commitSched; - int64_t nowMs = taosGetMonoTimestampMs(); + int64_t nowMs = taosGetMonoTimestampMs(); return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || (pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs)); From 851dba5166a6be1d90664cafab1c13ef8857a157 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 10 Jan 2023 09:56:38 +0800 Subject: [PATCH 10/22] refact code --- source/dnode/vnode/src/vnd/vnodeCommit.c | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index de6bbdbf40..8657b80bde 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -81,6 +81,7 @@ _exit: } static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { int32_t code = 0; + int32_t lino = 0; taosThreadMutexLock(&pVnode->mutex); @@ -100,12 +101,8 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { } else { vInfo("vgId:%d no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry); - terrno = vnodeTryRecycleBufPool(pVnode); - if (terrno != TSDB_CODE_SUCCESS) { - vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno)); - taosThreadMutexUnlock(&pVnode->mutex); - return -1; - } + code = vnodeTryRecycleBufPool(pVnode); + TSDB_CHECK_CODE(code, lino, _exit); if (pVnode->freeList == NULL) { vDebug("vgId:%d no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC); @@ -118,18 +115,18 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts); if (rc && rc != ETIMEDOUT) { - terrno = TAOS_SYSTEM_ERROR(rc); - vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno)); - taosThreadMutexUnlock(&pVnode->mutex); - return -1; + code = TAOS_SYSTEM_ERROR(rc); + TSDB_CHECK_CODE(code, lino, _exit); } } } } - taosThreadMutexUnlock(&pVnode->mutex); - _exit: + taosThreadMutexUnlock(&pVnode->mutex); + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } return code; } int vnodeBegin(SVnode *pVnode) { From 3178843b72aa14239d044d3f39a0d34bf9092438 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 10 Jan 2023 10:11:17 +0800 Subject: [PATCH 11/22] tsdb/cache: reseek with busy always --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 1c1626ba5c..1eea398fdf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -208,13 +208,21 @@ static int32_t tsdbCacheQueryReseek(void* pQHandle) { int32_t code = 0; SCacheRowsReader* pReader = pQHandle; - taosThreadMutexLock(&pReader->readerMutex); + code = taosThreadMutexTryLock(&pReader->readerMutex); + if (code == 0) { + // pause current reader's state if not paused, save ts & version for resuming + // just wait for the big all tables' snapshot untaking for now - // pause current reader's state if not paused, save ts & version for resuming - // just wait for the big all tables' snapshot untaking for now + code = TSDB_CODE_VND_QUERY_BUSY; - taosThreadMutexUnlock(&pReader->readerMutex); - return code; + taosThreadMutexUnlock(&pReader->readerMutex); + + return code; + } else if (code == EBUSY) { + return TSDB_CODE_VND_QUERY_BUSY; + } else { + return -1; + } } int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { From d69ec98723f91af0d149aca37428d62c68f4a515 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 10 Jan 2023 10:38:44 +0800 Subject: [PATCH 12/22] tsdb/read: reseek with try lock mutex --- source/dnode/vnode/src/tsdb/tsdbRead.c | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 68e6a0d86b..785243d365 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4068,18 +4068,23 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) { int32_t code = 0; STsdbReader* pReader = pQHandle; - taosThreadMutexLock(&pReader->readerMutex); + code = taosThreadMutexTryLock(&pReader->readerMutex); + if (code == 0) { + if (pReader->suspended) { + taosThreadMutexUnlock(&pReader->readerMutex); + return code; + } + + tsdbReaderSuspend(pReader); - if (pReader->suspended) { taosThreadMutexUnlock(&pReader->readerMutex); + return code; + } else if (code == EBUSY) { + return TSDB_CODE_VND_QUERY_BUSY; + } else { + return -1; } - - tsdbReaderSuspend(pReader); - - taosThreadMutexUnlock(&pReader->readerMutex); - - return code; } int32_t tsdbReaderResume(STsdbReader* pReader) { From f26f661f47920f050a7d3d8a6af954afc38439b4 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 10 Jan 2023 11:01:41 +0800 Subject: [PATCH 13/22] more code --- source/dnode/vnode/src/inc/vnd.h | 2 + source/dnode/vnode/src/vnd/vnodeBufPool.c | 100 ++++++++++++++++------ source/dnode/vnode/src/vnd/vnodeCommit.c | 66 ++++---------- 3 files changed, 94 insertions(+), 74 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index fafb146ad8..e0d0e9408b 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -83,6 +83,8 @@ struct SVBufPool { int32_t vnodeOpenBufPool(SVnode* pVnode); int32_t vnodeCloseBufPool(SVnode* pVnode); void vnodeBufPoolReset(SVBufPool* pPool); +void vnodeBufPoolAddToFreeList(SVBufPool* pPool); +int32_t vnodeBufPoolRecycle(SVBufPool* pPool); // vnodeQuery.c int32_t vnodeQueryOpen(SVnode* pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index d05f126413..2aff4a1927 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -212,34 +212,9 @@ void vnodeBufPoolRef(SVBufPool *pPool) { ASSERT(nRef > 0); } -void vnodeBufPoolUnRef(SVBufPool *pPool) { - if (pPool == NULL) return; - +void vnodeBufPoolAddToFreeList(SVBufPool *pPool) { SVnode *pVnode = pPool->pVnode; - taosThreadMutexLock(&pVnode->mutex); - - if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit; - - // remove from recycle list or on-recycle position - if (pVnode->onRecycle == pPool) { - pVnode->onRecycle = NULL; - } else { - if (pPool->recyclePrev) { - pPool->recyclePrev->recycleNext = pPool->recycleNext; - } else { - pVnode->recycleHead = pPool->recycleNext; - } - - if (pPool->recycleNext) { - pPool->recycleNext->recyclePrev = pPool->recyclePrev; - } else { - pVnode->recycleTail = pPool->recyclePrev; - } - pPool->recyclePrev = pPool->recycleNext = NULL; - } - - // change the pool size if need int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; if (pPool->node.size != size) { SVBufPool *pNewPool = NULL; @@ -257,10 +232,41 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { } // add to free list + vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); vnodeBufPoolReset(pPool); pPool->freeNext = pVnode->freeList; pVnode->freeList = pPool; taosThreadCondSignal(&pVnode->poolNotEmpty); +} + +void vnodeBufPoolUnRef(SVBufPool *pPool) { + if (pPool == NULL) return; + + SVnode *pVnode = pPool->pVnode; + + taosThreadMutexLock(&pVnode->mutex); + + if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit; + + // remove from recycle queue or on-recycle position + if (pVnode->onRecycle == pPool) { + pVnode->onRecycle = NULL; + } else { + if (pPool->recyclePrev) { + pPool->recyclePrev->recycleNext = pPool->recycleNext; + } else { + pVnode->recycleHead = pPool->recycleNext; + } + + if (pPool->recycleNext) { + pPool->recycleNext->recyclePrev = pPool->recyclePrev; + } else { + pVnode->recycleTail = pPool->recyclePrev; + } + pPool->recyclePrev = pPool->recycleNext = NULL; + } + + vnodeBufPoolAddToFreeList(pPool); _exit: taosThreadMutexUnlock(&pVnode->mutex); @@ -295,6 +301,48 @@ int32_t vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { taosThreadMutexUnlock(&pPool->mutex); +_exit: + return code; +} + +int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { + int32_t code = 0; + + bool canRecycle; + SVnode *pVnode = pPool->pVnode; + + vDebug("vgId:%d recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id); + + taosThreadMutexLock(&pPool->mutex); + + SQueryNode *pNode = pPool->qList.pNext; + while (pNode != &pPool->qList) { + int32_t rc = pNode->reseek(pNode->pQHandle); + if (rc == 0) { + SQueryNode *pTNode = pNode->pNext; + pNode->pNext->ppNext = pNode->ppNext; + *pNode->ppNext = pNode->pNext; + pPool->nQuery--; + pNode = pTNode; + } else if (rc == TSDB_CODE_VND_QUERY_BUSY) { + pNode = pNode->pNext; + } else { + taosThreadMutexUnlock(&pPool->mutex); + code = rc; + goto _exit; + } + } + + canRecycle = (pPool->nQuery == 0); + + taosThreadMutexUnlock(&pPool->mutex); + + if (canRecycle) { + ASSERT(atomic_load_32(&pPool->nRef) == 0); + pVnode->onRecycle = NULL; + vnodeBufPoolAddToFreeList(pPool); + } + _exit: return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 8657b80bde..d3fe8e6cc5 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -31,7 +31,7 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); goto _exit; } else { - vDebug("vgId:%d buffer pool %p of id %d on recycle list, try to recycle", TD_VID(pVnode), pVnode->recycleHead, + vDebug("vgId:%d buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead, pVnode->recycleHead->id); pVnode->onRecycle = pVnode->recycleHead; @@ -45,33 +45,8 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { } } - // do recycle the buffer pool - SVBufPool *pPool = pVnode->onRecycle; - vDebug("vgId:%d buffer pool %p of id %d on recycle", TD_VID(pVnode), pPool, pPool->id); - - taosThreadMutexLock(&pPool->mutex); - - SQueryNode *pNode = pPool->qList.pNext; - while (pNode != &pPool->qList) { - int32_t rc = pNode->reseek(pNode->pQHandle); - if (rc == 0) { - SQueryNode *pTNode = pNode->pNext; - pNode->pNext->ppNext = pNode->ppNext; - *pNode->ppNext = pNode->pNext; - pPool->nQuery--; - pNode = pTNode; - } else if (rc == TSDB_CODE_VND_QUERY_BUSY) { - pNode = pNode->pNext; - } else { - taosThreadMutexUnlock(&pPool->mutex); - code = rc; - goto _exit; - } - } - - taosThreadMutexUnlock(&pPool->mutex); - - // TODO: if (pPool->nQuery == 0) add to free list + code = vnodeBufPoolRecycle(pVnode->onRecycle); + if (code) goto _exit; _exit: if (code) { @@ -360,17 +335,7 @@ _exit: return code; } -static int32_t vnodeCommitTask(void *arg) { - int32_t code = 0; - - SCommitInfo *pInfo = (SCommitInfo *)arg; - SVnode *pVnode = pInfo->pVnode; - - // commit - code = vnodeCommitImpl(pInfo); - if (code) goto _exit; - - // recycle buffer pool +static void vnodeReturnBufPool(SVnode *pVnode) { taosThreadMutexLock(&pVnode->mutex); SVBufPool *pPool = pVnode->onCommit; @@ -378,16 +343,9 @@ static int32_t vnodeCommitTask(void *arg) { pVnode->onCommit = NULL; if (nRef == 0) { - // add to free list - vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); - - vnodeBufPoolReset(pPool); - pPool->freeNext = pVnode->freeList; - pVnode->freeList = pPool; - taosThreadCondSignal(&pVnode->poolNotEmpty); + vnodeBufPoolAddToFreeList(pPool); } else if (nRef > 0) { - // add to recycle list - vDebug("vgId:%d buffer pool %p of id %d is added to recycle list", TD_VID(pVnode), pPool, pPool->id); + vDebug("vgId:%d buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id); if (pVnode->recycleTail == NULL) { pPool->recyclePrev = pPool->recycleNext = NULL; @@ -403,6 +361,18 @@ static int32_t vnodeCommitTask(void *arg) { } taosThreadMutexUnlock(&pVnode->mutex); +} +static int32_t vnodeCommitTask(void *arg) { + int32_t code = 0; + + SCommitInfo *pInfo = (SCommitInfo *)arg; + SVnode *pVnode = pInfo->pVnode; + + // commit + code = vnodeCommitImpl(pInfo); + if (code) goto _exit; + + vnodeReturnBufPool(pVnode); _exit: // end commit From bff8c216ccb92e6d5ae7ce88ad646dc6fa725b62 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 10 Jan 2023 11:36:40 +0800 Subject: [PATCH 14/22] more code --- source/dnode/vnode/src/inc/tsdb.h | 4 +-- source/dnode/vnode/src/inc/vnodeInt.h | 4 +-- source/dnode/vnode/src/tsdb/tsdbCommit.c | 4 +-- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 13 ++++---- source/dnode/vnode/src/tsdb/tsdbOpen.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 +-- source/dnode/vnode/src/vnd/vnodeBufPool.c | 36 +++++++--------------- 7 files changed, 26 insertions(+), 41 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index fbc78e1104..4610ea93a0 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -209,10 +209,10 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in // tsdbMemTable ============================================================================================== // SMemTable int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); -void tsdbMemTableDestroy(SMemTable *pMemTable); +void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode); -int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index ed9ce45995..9c5f47b731 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -105,11 +105,11 @@ void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size); void vnodeBufPoolFree(SVBufPool* pPool, void* p); void vnodeBufPoolRef(SVBufPool* pPool); -void vnodeBufPoolUnRef(SVBufPool* pPool); +void vnodeBufPoolUnRef(SVBufPool* pPool, bool proactive); int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo); int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode); -int32_t vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode); +void vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode, bool proactive); // meta typedef struct SMCtbCursor SMCtbCursor; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 31d6fd85c6..d15f848cfd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -175,7 +175,7 @@ int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) { pTsdb->imem = NULL; taosThreadRwlockUnlock(&pTsdb->rwLock); - tsdbUnrefMemTable(pMemTable, NULL); + tsdbUnrefMemTable(pMemTable, NULL, true); goto _exit; } @@ -1664,7 +1664,7 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) { // unlock taosThreadRwlockUnlock(&pTsdb->rwLock); if (pMemTable) { - tsdbUnrefMemTable(pMemTable, NULL); + tsdbUnrefMemTable(pMemTable, NULL, true); } _exit: diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 8d87db9a15..d68afbed1f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -76,9 +76,9 @@ _err: return code; } -void tsdbMemTableDestroy(SMemTable *pMemTable) { +void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive) { if (pMemTable) { - vnodeBufPoolUnRef(pMemTable->pPool); + vnodeBufPoolUnRef(pMemTable->pPool, proactive); taosMemoryFree(pMemTable->aBucket); taosMemoryFree(pMemTable); } @@ -761,16 +761,15 @@ _exit: return code; } -int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) { int32_t code = 0; if (pNode) { - vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode); + vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive); } - int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); - if (nRef == 0) { - tsdbMemTableDestroy(pMemTable); + if (atomic_sub_fetch_32(&pMemTable->nRef, 1) == 0) { + tsdbMemTableDestroy(pMemTable, proactive); } return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index f71b5b6706..8901f64459 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -88,7 +88,7 @@ _err: int tsdbClose(STsdb **pTsdb) { if (*pTsdb) { taosThreadRwlockWrlock(&(*pTsdb)->rwLock); - tsdbMemTableDestroy((*pTsdb)->mem); + tsdbMemTableDestroy((*pTsdb)->mem, true); (*pTsdb)->mem = NULL; taosThreadRwlockUnlock(&(*pTsdb)->rwLock); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 785243d365..d8e514963f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4719,11 +4719,11 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { if (pSnap) { if (pSnap->pMem) { - tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode); + tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, true); } if (pSnap->pIMem) { - tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode); + tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, true); } tsdbFSUnref(pTsdb, &pSnap->fs); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 2aff4a1927..1c5493cb6a 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -239,12 +239,12 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) { taosThreadCondSignal(&pVnode->poolNotEmpty); } -void vnodeBufPoolUnRef(SVBufPool *pPool) { +void vnodeBufPoolUnRef(SVBufPool *pPool, bool proactive) { if (pPool == NULL) return; SVnode *pVnode = pPool->pVnode; - taosThreadMutexLock(&pVnode->mutex); + if (proactive) taosThreadMutexLock(&pVnode->mutex); if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit; @@ -252,6 +252,8 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { if (pVnode->onRecycle == pPool) { pVnode->onRecycle = NULL; } else { + ASSERT(proactive); + if (pPool->recyclePrev) { pPool->recyclePrev->recycleNext = pPool->recycleNext; } else { @@ -269,7 +271,7 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { vnodeBufPoolAddToFreeList(pPool); _exit: - taosThreadMutexUnlock(&pVnode->mutex); + if (proactive) taosThreadMutexUnlock(&pVnode->mutex); return; } @@ -290,25 +292,21 @@ _exit: return code; } -int32_t vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { +void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) { int32_t code = 0; - taosThreadMutexLock(&pPool->mutex); + if (proactive) taosThreadMutexLock(&pPool->mutex); pQNode->pNext->ppNext = pQNode->ppNext; *pQNode->ppNext = pQNode->pNext; pPool->nQuery--; - taosThreadMutexUnlock(&pPool->mutex); - -_exit: - return code; + if (proactive) taosThreadMutexUnlock(&pPool->mutex); } int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { int32_t code = 0; - bool canRecycle; SVnode *pVnode = pPool->pVnode; vDebug("vgId:%d recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id); @@ -317,15 +315,11 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { SQueryNode *pNode = pPool->qList.pNext; while (pNode != &pPool->qList) { + SQueryNode *pTNode = pNode->pNext; + int32_t rc = pNode->reseek(pNode->pQHandle); - if (rc == 0) { - SQueryNode *pTNode = pNode->pNext; - pNode->pNext->ppNext = pNode->ppNext; - *pNode->ppNext = pNode->pNext; - pPool->nQuery--; + if (rc == 0 || rc == TSDB_CODE_VND_QUERY_BUSY) { pNode = pTNode; - } else if (rc == TSDB_CODE_VND_QUERY_BUSY) { - pNode = pNode->pNext; } else { taosThreadMutexUnlock(&pPool->mutex); code = rc; @@ -333,16 +327,8 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { } } - canRecycle = (pPool->nQuery == 0); - taosThreadMutexUnlock(&pPool->mutex); - if (canRecycle) { - ASSERT(atomic_load_32(&pPool->nRef) == 0); - pVnode->onRecycle = NULL; - vnodeBufPoolAddToFreeList(pPool); - } - _exit: return code; } \ No newline at end of file From e48bdbfd0f1b69bce52dad85155c571b14307133 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 10 Jan 2023 11:52:40 +0800 Subject: [PATCH 15/22] tsdb/log-query: new proactive flag for mem table unref --- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 13 +++++++------ 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4610ea93a0..8c8775548d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -291,7 +291,7 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); -void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap); +void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 1eea398fdf..6a5acecfc3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -383,7 +383,7 @@ _end: tsdbDataFReaderClose(&pr->pDataFReader); resetLastBlockLoadInfo(pr->pLoadInfo); - tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap); + tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true); taosThreadMutexUnlock(&pr->readerMutex); for (int32_t j = 0; j < pr->numOfCols; ++j) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d8e514963f..d84ce5206e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3953,7 +3953,7 @@ void tsdbReaderClose(STsdbReader* pReader) { } qTrace("tsdb/reader: %p, untake snapshot", pReader); - tsdbUntakeReadSnap(pReader, pReader->pReadSnap); + tsdbUntakeReadSnap(pReader, pReader->pReadSnap, true); taosThreadMutexDestroy(&pReader->readerMutex); @@ -4052,7 +4052,7 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { } } - tsdbUntakeReadSnap(pReader, pReader->pReadSnap); + tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false); pReader->suspended = true; @@ -4083,7 +4083,8 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) { } else if (code == EBUSY) { return TSDB_CODE_VND_QUERY_BUSY; } else { - return -1; + terrno = TAOS_SYSTEM_ERROR(code); + return TSDB_CODE_FAILED; } } @@ -4714,16 +4715,16 @@ _exit: return code; } -void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { +void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proactive) { STsdb* pTsdb = pReader->pTsdb; if (pSnap) { if (pSnap->pMem) { - tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, true); + tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive); } if (pSnap->pIMem) { - tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, true); + tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive); } tsdbFSUnref(pTsdb, &pSnap->fs); From 2e161729b63f5e29fed954fee260ebbd20c2f4e7 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 10 Jan 2023 11:56:09 +0800 Subject: [PATCH 16/22] refact code --- source/dnode/vnode/src/vnd/vnodeBufPool.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 1c5493cb6a..80a422ca3b 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -321,14 +321,12 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { if (rc == 0 || rc == TSDB_CODE_VND_QUERY_BUSY) { pNode = pTNode; } else { - taosThreadMutexUnlock(&pPool->mutex); code = rc; goto _exit; } } - taosThreadMutexUnlock(&pPool->mutex); - _exit: + taosThreadMutexUnlock(&pPool->mutex); return code; } \ No newline at end of file From 086ec246883c6a7212ad21d5bbbc01c1f981611f Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 10 Jan 2023 14:43:51 +0800 Subject: [PATCH 17/22] tsdb/reader: reset reader status with lock protected --- source/dnode/vnode/src/tsdb/tsdbRead.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d84ce5206e..b27acdfb63 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4434,15 +4434,13 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { SReaderStatus* pStatus = &pReader->status; - qTrace("tsdb/read: %p, take read mutex", pReader); + qTrace("tsdb/reader-reset: %p, take read mutex", pReader); taosThreadMutexLock(&pReader->readerMutex); if (pReader->suspended) { tsdbReaderResume(pReader); } - taosThreadMutexUnlock(&pReader->readerMutex); - if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) { tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap); return TSDB_CODE_SUCCESS; @@ -4480,6 +4478,9 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { if (code != TSDB_CODE_SUCCESS) { tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); + + taosThreadMutexUnlock(&pReader->readerMutex); + return code; } } @@ -4489,6 +4490,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey, pReader->idStr); + taosThreadMutexUnlock(&pReader->readerMutex); + return code; } From 4032340e9244478fc2e7eb5f529d0499cecf8390 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 10 Jan 2023 14:43:51 +0800 Subject: [PATCH 18/22] tsdb/reader: reset reader status with lock protected --- source/dnode/vnode/src/tsdb/tsdbRead.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d84ce5206e..1156bae50f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4434,17 +4434,18 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { SReaderStatus* pStatus = &pReader->status; - qTrace("tsdb/read: %p, take read mutex", pReader); + qTrace("tsdb/reader-reset: %p, take read mutex", pReader); taosThreadMutexLock(&pReader->readerMutex); if (pReader->suspended) { tsdbReaderResume(pReader); } - taosThreadMutexUnlock(&pReader->readerMutex); - if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) { tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap); + + taosThreadMutexUnlock(&pReader->readerMutex); + return TSDB_CODE_SUCCESS; } @@ -4480,6 +4481,9 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { if (code != TSDB_CODE_SUCCESS) { tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); + + taosThreadMutexUnlock(&pReader->readerMutex); + return code; } } @@ -4489,6 +4493,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey, pReader->idStr); + taosThreadMutexUnlock(&pReader->readerMutex); + return code; } From 1154f90aa0c6841c0e0758c6a04b583575430732 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 10 Jan 2023 17:00:25 +0800 Subject: [PATCH 19/22] fix: cond wait EINVAL --- source/dnode/vnode/src/vnd/vnodeCommit.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index d3fe8e6cc5..8cbeadb229 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -85,8 +85,13 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { struct timeval tv; struct timespec ts; taosGetTimeOfDay(&tv); - ts.tv_sec = tv.tv_sec; ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000; + if (ts.tv_nsec > 999999999l) { + ts.tv_sec = tv.tv_sec + 1; + ts.tv_nsec -= 1000000000l; + } else { + ts.tv_sec = tv.tv_sec; + } int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts); if (rc && rc != ETIMEDOUT) { @@ -134,6 +139,7 @@ int vnodeBegin(SVnode *pVnode) { _exit: if (code) { + terrno = code; vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } return code; From 020b79a094f37945e0043884719afbfd24c8d20f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 10 Jan 2023 17:04:23 +0800 Subject: [PATCH 20/22] little fix --- source/dnode/vnode/src/vnd/vnodeCommit.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 8cbeadb229..4473d4e9aa 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -74,7 +74,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { pVnode->inUse->freeNext = NULL; break; } else { - vInfo("vgId:%d no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry); + vDebug("vgId:%d no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry); code = vnodeTryRecycleBufPool(pVnode); TSDB_CHECK_CODE(code, lino, _exit); From 5024fc1e9f1f01afbc6ccc1724015efc5b6d33c2 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 10 Jan 2023 18:40:15 +0800 Subject: [PATCH 21/22] tsdb/suspend: fix table block scan info --- source/dnode/vnode/src/tsdb/tsdbRead.c | 31 +++++++++++++++++--------- source/dnode/vnode/src/vnd/vnodeSvr.c | 6 ++--- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1156bae50f..5ba44decb4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4001,7 +4001,8 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { if (pStatus->loadFromFile) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); if (pBlockInfo != NULL) { - pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + pBlockScanInfo = + *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); if (pBlockScanInfo == NULL) { code = TSDB_CODE_INVALID_PARA; tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, @@ -4015,20 +4016,28 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { tsdbDataFReaderClose(&pReader->pFileReader); // resetDataBlockScanInfo excluding lastKey - STableBlockScanInfo* p = NULL; + STableBlockScanInfo** p = NULL; while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) { - p->iterInit = false; - p->iiter.hasVal = false; - if (p->iter.iter != NULL) { - p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); + STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; + + pInfo->iterInit = false; + pInfo->iter.hasVal = false; + pInfo->iiter.hasVal = false; + + if (pInfo->iter.iter != NULL) { + pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter); } - p->delSkyline = taosArrayDestroy(p->delSkyline); - // p->lastKey = ts; + if (pInfo->iiter.iter != NULL) { + pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter); + } + + pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); + // pInfo->lastKey = ts; } } else { - pBlockScanInfo = *pStatus->pTableIter; + pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter; if (pBlockScanInfo) { // save lastKey to restore memory iterator STimeWindow w = pReader->pResBlock->info.window; @@ -4053,10 +4062,12 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { } tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false); + pReader->pReadSnap = NULL; pReader->suspended = true; - tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo->uid, pReader->idStr); + tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0, + pReader->idStr); return code; _err: diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index deb9da2050..6abc144b91 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -191,13 +191,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp void *pReq; int32_t len; int32_t ret; - + /* if (!pVnode->inUse) { terrno = TSDB_CODE_VND_NO_AVAIL_BUFPOOL; vError("vgId:%d, not ready to write since %s", TD_VID(pVnode), terrstr()); return -1; } - + */ if (version <= pVnode->state.applied) { vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version, pVnode->state.applied); @@ -1001,7 +1001,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq code = terrno; goto _exit; } - pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid; // update uid if table exist for using below + pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid; // update uid if table exist for using below } } From ba68d5d7b3424f5ca690373a67c39ea4b75b6091 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 11 Jan 2023 14:03:51 +0800 Subject: [PATCH 22/22] vnode: disable buffer pool recycling for 3.0 merging --- source/dnode/vnode/src/vnd/vnodeCommit.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 4473d4e9aa..63c05ce88c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -75,10 +75,10 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { break; } else { vDebug("vgId:%d no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry); - + /* code = vnodeTryRecycleBufPool(pVnode); TSDB_CHECK_CODE(code, lino, _exit); - + */ if (pVnode->freeList == NULL) { vDebug("vgId:%d no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC);