more code
This commit is contained in:
parent
823b1c3fea
commit
ee56c50069
|
@ -631,7 +631,7 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
||||||
terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED;
|
terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED;
|
||||||
|
|
||||||
if (pAlter->buffer > 0 && pAlter->buffer != pDb->cfg.buffer) {
|
if (pAlter->buffer > 0 && pAlter->buffer != pDb->cfg.buffer) {
|
||||||
#if 1
|
#if 0
|
||||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
return terrno;
|
return terrno;
|
||||||
#else
|
#else
|
||||||
|
@ -641,7 +641,7 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pAlter->pages > 0 && pAlter->pages != pDb->cfg.pages) {
|
if (pAlter->pages > 0 && pAlter->pages != pDb->cfg.pages) {
|
||||||
#if 1
|
#if 0
|
||||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
return terrno;
|
return terrno;
|
||||||
#else
|
#else
|
||||||
|
@ -1311,7 +1311,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
mInfo("db:%s, vgroup version changed from %d to %d", pDbVgVersion->dbFName, pDbVgVersion->vgVersion,
|
mInfo("db:%s, vgroup version changed from %d to %d", pDbVgVersion->dbFName, pDbVgVersion->vgVersion,
|
||||||
pDb->vgVersion);
|
pDb->vgVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
||||||
|
|
|
@ -72,7 +72,7 @@ struct SVBufPool {
|
||||||
SVBufPoolNode node;
|
SVBufPoolNode node;
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size);
|
int32_t vnodeOpenBufPool(SVnode* pVnode);
|
||||||
int32_t vnodeCloseBufPool(SVnode* pVnode);
|
int32_t vnodeCloseBufPool(SVnode* pVnode);
|
||||||
void vnodeBufPoolReset(SVBufPool* pPool);
|
void vnodeBufPoolReset(SVBufPool* pPool);
|
||||||
|
|
||||||
|
|
|
@ -16,20 +16,53 @@
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
/* ------------------------ STRUCTURES ------------------------ */
|
/* ------------------------ STRUCTURES ------------------------ */
|
||||||
|
#define VNODE_BUFPOOL_SEGMENTS 3
|
||||||
|
|
||||||
static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool);
|
static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) {
|
||||||
static int vnodeBufPoolDestroy(SVBufPool *pPool);
|
SVBufPool *pPool;
|
||||||
|
|
||||||
int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
|
pPool = taosMemoryMalloc(sizeof(SVBufPool) + size);
|
||||||
|
if (pPool == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosThreadSpinInit(&pPool->lock, 0) != 0) {
|
||||||
|
taosMemoryFree(pPool);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pPool->next = NULL;
|
||||||
|
pPool->pVnode = pVnode;
|
||||||
|
pPool->nRef = 0;
|
||||||
|
pPool->size = 0;
|
||||||
|
pPool->ptr = pPool->node.data;
|
||||||
|
pPool->pTail = &pPool->node;
|
||||||
|
pPool->node.prev = NULL;
|
||||||
|
pPool->node.pnext = &pPool->pTail;
|
||||||
|
pPool->node.size = size;
|
||||||
|
|
||||||
|
*ppPool = pPool;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int vnodeBufPoolDestroy(SVBufPool *pPool) {
|
||||||
|
vnodeBufPoolReset(pPool);
|
||||||
|
taosThreadSpinDestroy(&pPool->lock);
|
||||||
|
taosMemoryFree(pPool);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int vnodeOpenBufPool(SVnode *pVnode) {
|
||||||
SVBufPool *pPool = NULL;
|
SVBufPool *pPool = NULL;
|
||||||
int ret;
|
int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
|
||||||
|
|
||||||
ASSERT(pVnode->pPool == NULL);
|
ASSERT(pVnode->pPool == NULL);
|
||||||
|
|
||||||
for (int i = 0; i < 3; i++) {
|
for (int i = 0; i < 3; i++) {
|
||||||
// create pool
|
// create pool
|
||||||
ret = vnodeBufPoolCreate(pVnode, size, &pPool);
|
if (vnodeBufPoolCreate(pVnode, size, &pPool)) {
|
||||||
if (ret < 0) {
|
|
||||||
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
vnodeCloseBufPool(pVnode);
|
vnodeCloseBufPool(pVnode);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -41,7 +74,6 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size);
|
vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,9 +95,7 @@ int vnodeCloseBufPool(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeBufPoolReset(SVBufPool *pPool) {
|
void vnodeBufPoolReset(SVBufPool *pPool) {
|
||||||
SVBufPoolNode *pNode;
|
for (SVBufPoolNode *pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) {
|
||||||
|
|
||||||
for (pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) {
|
|
||||||
ASSERT(pNode->pnext == &pPool->pTail);
|
ASSERT(pNode->pnext == &pPool->pTail);
|
||||||
pNode->prev->pnext = &pPool->pTail;
|
pNode->prev->pnext = &pPool->pTail;
|
||||||
pPool->pTail = pNode->prev;
|
pPool->pTail = pNode->prev;
|
||||||
|
@ -81,7 +111,7 @@ void vnodeBufPoolReset(SVBufPool *pPool) {
|
||||||
|
|
||||||
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
|
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
|
||||||
SVBufPoolNode *pNode;
|
SVBufPoolNode *pNode;
|
||||||
void *p;
|
void *p = NULL;
|
||||||
taosThreadSpinLock(&pPool->lock);
|
taosThreadSpinLock(&pPool->lock);
|
||||||
if (pPool->node.size >= pPool->ptr - pPool->node.data + size) {
|
if (pPool->node.size >= pPool->ptr - pPool->node.data + size) {
|
||||||
// allocate from the anchor node
|
// allocate from the anchor node
|
||||||
|
@ -124,43 +154,6 @@ void vnodeBufPoolFree(SVBufPool *pPool, void *p) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// STATIC METHODS -------------------
|
|
||||||
static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) {
|
|
||||||
SVBufPool *pPool;
|
|
||||||
|
|
||||||
pPool = taosMemoryMalloc(sizeof(SVBufPool) + size);
|
|
||||||
if (pPool == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosThreadSpinInit(&pPool->lock, 0) != 0) {
|
|
||||||
taosMemoryFree(pPool);
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pPool->next = NULL;
|
|
||||||
pPool->pVnode = pVnode;
|
|
||||||
pPool->nRef = 0;
|
|
||||||
pPool->size = 0;
|
|
||||||
pPool->ptr = pPool->node.data;
|
|
||||||
pPool->pTail = &pPool->node;
|
|
||||||
pPool->node.prev = NULL;
|
|
||||||
pPool->node.pnext = &pPool->pTail;
|
|
||||||
pPool->node.size = size;
|
|
||||||
|
|
||||||
*ppPool = pPool;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int vnodeBufPoolDestroy(SVBufPool *pPool) {
|
|
||||||
vnodeBufPoolReset(pPool);
|
|
||||||
taosThreadSpinDestroy(&pPool->lock);
|
|
||||||
taosMemoryFree(pPool);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void vnodeBufPoolRef(SVBufPool *pPool) {
|
void vnodeBufPoolRef(SVBufPool *pPool) {
|
||||||
int32_t nRef = atomic_fetch_add_32(&pPool->nRef, 1);
|
int32_t nRef = atomic_fetch_add_32(&pPool->nRef, 1);
|
||||||
ASSERT(nRef > 0);
|
ASSERT(nRef > 0);
|
||||||
|
@ -175,6 +168,19 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
|
||||||
|
|
||||||
taosThreadMutexLock(&pVnode->mutex);
|
taosThreadMutexLock(&pVnode->mutex);
|
||||||
|
|
||||||
|
int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
|
||||||
|
if (pPool->node.size != size) {
|
||||||
|
SVBufPool *pPoolT = NULL;
|
||||||
|
if (vnodeBufPoolCreate(pVnode, size, &pPoolT) < 0) {
|
||||||
|
vWarn("vgId:%d try to change buf pools size from %" PRId64 " to %" PRId64 " since %s", TD_VID(pVnode),
|
||||||
|
pPool->node.size, size, tstrerror(errno));
|
||||||
|
} else {
|
||||||
|
vnodeBufPoolDestroy(pPool);
|
||||||
|
pPool = pPoolT;
|
||||||
|
vDebug("vgId:%d change buf pools size from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->node.size, size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pPool->next = pVnode->pPool;
|
pPool->next = pVnode->pPool;
|
||||||
pVnode->pPool = pPool;
|
pVnode->pPool = pPool;
|
||||||
taosThreadCondSignal(&pVnode->poolNotEmpty);
|
taosThreadCondSignal(&pVnode->poolNotEmpty);
|
||||||
|
|
|
@ -96,7 +96,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
|
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
|
||||||
|
|
||||||
// open buffer pool
|
// open buffer pool
|
||||||
if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) {
|
if (vnodeOpenBufPool(pVnode) < 0) {
|
||||||
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1040,6 +1040,14 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
|
||||||
tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
|
tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pVnode->config.szBuf != alterReq.buffer * 1024LL * 1024LL) {
|
||||||
|
pVnode->config.szBuf = alterReq.buffer * 1024LL * 1024LL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pVnode->config.szCache != alterReq.pages) {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
if (pVnode->config.cacheLast != alterReq.cacheLast) {
|
if (pVnode->config.cacheLast != alterReq.cacheLast) {
|
||||||
pVnode->config.cacheLast = alterReq.cacheLast;
|
pVnode->config.cacheLast = alterReq.cacheLast;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue