From ee56c5006924ef66dbff5e0f1e8dfdc3ffcf33ce Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 9 Oct 2022 13:52:44 +0800 Subject: [PATCH] more code --- source/dnode/mnode/impl/src/mndDb.c | 6 +- source/dnode/vnode/src/inc/vnd.h | 2 +- source/dnode/vnode/src/vnd/vnodeBufPool.c | 102 ++++++++++++---------- source/dnode/vnode/src/vnd/vnodeOpen.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 ++ 5 files changed, 67 insertions(+), 53 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 9f0cb0b510..adc5ff2b42 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -631,7 +631,7 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) { terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED; if (pAlter->buffer > 0 && pAlter->buffer != pDb->cfg.buffer) { -#if 1 +#if 0 terrno = TSDB_CODE_OPS_NOT_SUPPORT; return terrno; #else @@ -641,7 +641,7 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) { } if (pAlter->pages > 0 && pAlter->pages != pDb->cfg.pages) { -#if 1 +#if 0 terrno = TSDB_CODE_OPS_NOT_SUPPORT; return terrno; #else @@ -1311,7 +1311,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, continue; } else { mInfo("db:%s, vgroup version changed from %d to %d", pDbVgVersion->dbFName, pDbVgVersion->vgVersion, - pDb->vgVersion); + pDb->vgVersion); } usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo)); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 900d29b97e..aca99ecd2f 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -72,7 +72,7 @@ struct SVBufPool { SVBufPoolNode node; }; -int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size); +int32_t vnodeOpenBufPool(SVnode* pVnode); int32_t vnodeCloseBufPool(SVnode* pVnode); void vnodeBufPoolReset(SVBufPool* pPool); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 6e02425b55..3c0a041197 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -16,20 +16,53 @@ #include "vnd.h" /* ------------------------ STRUCTURES ------------------------ */ +#define VNODE_BUFPOOL_SEGMENTS 3 -static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool); -static int vnodeBufPoolDestroy(SVBufPool *pPool); +static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) { + SVBufPool *pPool; -int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { + pPool = taosMemoryMalloc(sizeof(SVBufPool) + size); + if (pPool == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (taosThreadSpinInit(&pPool->lock, 0) != 0) { + taosMemoryFree(pPool); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + pPool->next = NULL; + pPool->pVnode = pVnode; + pPool->nRef = 0; + pPool->size = 0; + pPool->ptr = pPool->node.data; + pPool->pTail = &pPool->node; + pPool->node.prev = NULL; + pPool->node.pnext = &pPool->pTail; + pPool->node.size = size; + + *ppPool = pPool; + return 0; +} + +static int vnodeBufPoolDestroy(SVBufPool *pPool) { + vnodeBufPoolReset(pPool); + taosThreadSpinDestroy(&pPool->lock); + taosMemoryFree(pPool); + return 0; +} + +int vnodeOpenBufPool(SVnode *pVnode) { SVBufPool *pPool = NULL; - int ret; + int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; ASSERT(pVnode->pPool == NULL); for (int i = 0; i < 3; i++) { // create pool - ret = vnodeBufPoolCreate(pVnode, size, &pPool); - if (ret < 0) { + if (vnodeBufPoolCreate(pVnode, size, &pPool)) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vnodeCloseBufPool(pVnode); return -1; @@ -41,7 +74,6 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { } vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size); - return 0; } @@ -63,9 +95,7 @@ int vnodeCloseBufPool(SVnode *pVnode) { } void vnodeBufPoolReset(SVBufPool *pPool) { - SVBufPoolNode *pNode; - - for (pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) { + for (SVBufPoolNode *pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) { ASSERT(pNode->pnext == &pPool->pTail); pNode->prev->pnext = &pPool->pTail; pPool->pTail = pNode->prev; @@ -81,7 +111,7 @@ void vnodeBufPoolReset(SVBufPool *pPool) { void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { SVBufPoolNode *pNode; - void *p; + void *p = NULL; taosThreadSpinLock(&pPool->lock); if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { // allocate from the anchor node @@ -124,43 +154,6 @@ void vnodeBufPoolFree(SVBufPool *pPool, void *p) { } } -// STATIC METHODS ------------------- -static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) { - SVBufPool *pPool; - - pPool = taosMemoryMalloc(sizeof(SVBufPool) + size); - if (pPool == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - if (taosThreadSpinInit(&pPool->lock, 0) != 0) { - taosMemoryFree(pPool); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - pPool->next = NULL; - pPool->pVnode = pVnode; - pPool->nRef = 0; - pPool->size = 0; - pPool->ptr = pPool->node.data; - pPool->pTail = &pPool->node; - pPool->node.prev = NULL; - pPool->node.pnext = &pPool->pTail; - pPool->node.size = size; - - *ppPool = pPool; - return 0; -} - -static int vnodeBufPoolDestroy(SVBufPool *pPool) { - vnodeBufPoolReset(pPool); - taosThreadSpinDestroy(&pPool->lock); - taosMemoryFree(pPool); - return 0; -} - void vnodeBufPoolRef(SVBufPool *pPool) { int32_t nRef = atomic_fetch_add_32(&pPool->nRef, 1); ASSERT(nRef > 0); @@ -175,6 +168,19 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { taosThreadMutexLock(&pVnode->mutex); + int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; + if (pPool->node.size != size) { + SVBufPool *pPoolT = NULL; + if (vnodeBufPoolCreate(pVnode, size, &pPoolT) < 0) { + vWarn("vgId:%d try to change buf pools size from %" PRId64 " to %" PRId64 " since %s", TD_VID(pVnode), + pPool->node.size, size, tstrerror(errno)); + } else { + vnodeBufPoolDestroy(pPool); + pPool = pPoolT; + vDebug("vgId:%d change buf pools size from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->node.size, size); + } + } + pPool->next = pVnode->pPool; pVnode->pPool = pPool; taosThreadCondSignal(&pVnode->poolNotEmpty); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index b5307cecf2..616aa39bdf 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -96,7 +96,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { taosThreadCondInit(&pVnode->poolNotEmpty, NULL); // open buffer pool - if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) { + if (vnodeOpenBufPool(pVnode) < 0) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 28093dfc70..aea68cd462 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1040,6 +1040,14 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024); } + if (pVnode->config.szBuf != alterReq.buffer * 1024LL * 1024LL) { + pVnode->config.szBuf = alterReq.buffer * 1024LL * 1024LL; + } + + if (pVnode->config.szCache != alterReq.pages) { + // TODO + } + if (pVnode->config.cacheLast != alterReq.cacheLast) { pVnode->config.cacheLast = alterReq.cacheLast; }