244 lines
6.8 KiB
C
244 lines
6.8 KiB
C
/*
|
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
*
|
|
* This program is free software: you can use, redistribute, and/or modify
|
|
* it under the terms of the GNU Affero General Public License, version 3
|
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
*
|
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "vnd.h"
|
|
|
|
/* ------------------------ STRUCTURES ------------------------ */
|
|
static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) {
|
|
SVBufPool *pPool;
|
|
|
|
pPool = taosMemoryMalloc(sizeof(SVBufPool) + size);
|
|
if (pPool == NULL) {
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
return -1;
|
|
}
|
|
|
|
if (VND_IS_RSMA(pVnode)) {
|
|
pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock));
|
|
if (!pPool->lock) {
|
|
taosMemoryFree(pPool);
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
return -1;
|
|
}
|
|
if (taosThreadSpinInit(pPool->lock, 0) != 0) {
|
|
taosMemoryFree((void *)pPool->lock);
|
|
taosMemoryFree(pPool);
|
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
return -1;
|
|
}
|
|
} else {
|
|
pPool->lock = NULL;
|
|
}
|
|
|
|
pPool->freeNext = NULL;
|
|
pPool->pVnode = pVnode;
|
|
pPool->nRef = 0;
|
|
pPool->size = 0;
|
|
pPool->ptr = pPool->node.data;
|
|
pPool->pTail = &pPool->node;
|
|
pPool->node.prev = NULL;
|
|
pPool->node.pnext = &pPool->pTail;
|
|
pPool->node.size = size;
|
|
|
|
*ppPool = pPool;
|
|
return 0;
|
|
}
|
|
|
|
static int vnodeBufPoolDestroy(SVBufPool *pPool) {
|
|
vnodeBufPoolReset(pPool);
|
|
if (pPool->lock) {
|
|
taosThreadSpinDestroy(pPool->lock);
|
|
taosMemoryFree((void *)pPool->lock);
|
|
}
|
|
taosMemoryFree(pPool);
|
|
return 0;
|
|
}
|
|
|
|
int vnodeOpenBufPool(SVnode *pVnode) {
|
|
int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
|
|
|
|
for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
|
|
// create pool
|
|
if (vnodeBufPoolCreate(pVnode, size, &pVnode->aBufPool[i])) {
|
|
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
vnodeCloseBufPool(pVnode);
|
|
return -1;
|
|
}
|
|
|
|
// add to free list
|
|
pVnode->aBufPool[i]->freeNext = pVnode->freeList;
|
|
pVnode->freeList = pVnode->aBufPool[i];
|
|
}
|
|
|
|
vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size);
|
|
return 0;
|
|
}
|
|
|
|
int vnodeCloseBufPool(SVnode *pVnode) {
|
|
for (int32_t i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
|
|
if (pVnode->aBufPool[i]) {
|
|
vnodeBufPoolDestroy(pVnode->aBufPool[i]);
|
|
pVnode->aBufPool[i] = NULL;
|
|
}
|
|
}
|
|
|
|
pVnode->freeList = NULL;
|
|
ASSERT(pVnode->inUse == NULL);
|
|
ASSERT(pVnode->recycling == NULL);
|
|
|
|
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
|
|
return 0;
|
|
}
|
|
|
|
void vnodeBufPoolReset(SVBufPool *pPool) {
|
|
for (SVBufPoolNode *pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) {
|
|
ASSERT(pNode->pnext == &pPool->pTail);
|
|
pNode->prev->pnext = &pPool->pTail;
|
|
pPool->pTail = pNode->prev;
|
|
pPool->size = pPool->size - sizeof(*pNode) - pNode->size;
|
|
taosMemoryFree(pNode);
|
|
}
|
|
|
|
ASSERT(pPool->size == pPool->ptr - pPool->node.data);
|
|
|
|
pPool->size = 0;
|
|
pPool->ptr = pPool->node.data;
|
|
}
|
|
|
|
void *vnodeBufPoolMallocAligned(SVBufPool *pPool, int size) {
|
|
SVBufPoolNode *pNode;
|
|
void *p = NULL;
|
|
uint8_t *ptr = NULL;
|
|
int paddingLen = 0;
|
|
ASSERT(pPool != NULL);
|
|
|
|
if (pPool->lock) taosThreadSpinLock(pPool->lock);
|
|
|
|
ptr = pPool->ptr;
|
|
paddingLen = (((long)ptr + 7) & ~7) - (long)ptr;
|
|
|
|
if (pPool->node.size >= pPool->ptr - pPool->node.data + size + paddingLen) {
|
|
// allocate from the anchor node
|
|
p = pPool->ptr + paddingLen;
|
|
size += paddingLen;
|
|
pPool->ptr = pPool->ptr + size;
|
|
pPool->size += size;
|
|
} else {
|
|
// allocate a new node
|
|
pNode = taosMemoryMalloc(sizeof(*pNode) + size);
|
|
if (pNode == NULL) {
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
|
|
return NULL;
|
|
}
|
|
|
|
p = pNode->data;
|
|
pNode->size = size;
|
|
pNode->prev = pPool->pTail;
|
|
pNode->pnext = &pPool->pTail;
|
|
pPool->pTail->pnext = &pNode->prev;
|
|
pPool->pTail = pNode;
|
|
|
|
pPool->size = pPool->size + sizeof(*pNode) + size;
|
|
}
|
|
if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
|
|
return p;
|
|
}
|
|
|
|
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
|
|
SVBufPoolNode *pNode;
|
|
void *p = NULL;
|
|
ASSERT(pPool != NULL);
|
|
|
|
if (pPool->lock) taosThreadSpinLock(pPool->lock);
|
|
if (pPool->node.size >= pPool->ptr - pPool->node.data + size) {
|
|
// allocate from the anchor node
|
|
p = pPool->ptr;
|
|
pPool->ptr = pPool->ptr + size;
|
|
pPool->size += size;
|
|
} else {
|
|
// allocate a new node
|
|
pNode = taosMemoryMalloc(sizeof(*pNode) + size);
|
|
if (pNode == NULL) {
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
|
|
return NULL;
|
|
}
|
|
|
|
p = pNode->data;
|
|
pNode->size = size;
|
|
pNode->prev = pPool->pTail;
|
|
pNode->pnext = &pPool->pTail;
|
|
pPool->pTail->pnext = &pNode->prev;
|
|
pPool->pTail = pNode;
|
|
|
|
pPool->size = pPool->size + sizeof(*pNode) + size;
|
|
}
|
|
if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
|
|
return p;
|
|
}
|
|
|
|
void vnodeBufPoolFree(SVBufPool *pPool, void *p) {
|
|
// uint8_t *ptr = (uint8_t *)p;
|
|
// SVBufPoolNode *pNode;
|
|
|
|
// if (ptr < pPool->node.data || ptr >= pPool->node.data + pPool->node.size) {
|
|
// pNode = &((SVBufPoolNode *)p)[-1];
|
|
// *pNode->pnext = pNode->prev;
|
|
// pNode->prev->pnext = pNode->pnext;
|
|
|
|
// pPool->size = pPool->size - sizeof(*pNode) - pNode->size;
|
|
// taosMemoryFree(pNode);
|
|
// }
|
|
}
|
|
|
|
void vnodeBufPoolRef(SVBufPool *pPool) {
|
|
int32_t nRef = atomic_fetch_add_32(&pPool->nRef, 1);
|
|
ASSERT(nRef > 0);
|
|
}
|
|
|
|
void vnodeBufPoolUnRef(SVBufPool *pPool) {
|
|
if (pPool == NULL) {
|
|
return;
|
|
}
|
|
int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
|
|
if (nRef == 0) {
|
|
SVnode *pVnode = pPool->pVnode;
|
|
|
|
vnodeBufPoolReset(pPool);
|
|
|
|
taosThreadMutexLock(&pVnode->mutex);
|
|
|
|
int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
|
|
if (pPool->node.size != size) {
|
|
SVBufPool *pPoolT = NULL;
|
|
if (vnodeBufPoolCreate(pVnode, size, &pPoolT) < 0) {
|
|
vWarn("vgId:%d, try to change buf pools size from %" PRId64 " to %" PRId64 " since %s", TD_VID(pVnode),
|
|
pPool->node.size, size, tstrerror(errno));
|
|
} else {
|
|
vnodeBufPoolDestroy(pPool);
|
|
pPool = pPoolT;
|
|
vDebug("vgId:%d, change buf pools size from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->node.size, size);
|
|
}
|
|
}
|
|
|
|
pPool->freeNext = pVnode->freeList;
|
|
pVnode->freeList = pPool;
|
|
taosThreadCondSignal(&pVnode->poolNotEmpty);
|
|
|
|
taosThreadMutexUnlock(&pVnode->mutex);
|
|
}
|
|
}
|