This commit is contained in:
Hongze Cheng 2021-11-12 10:53:52 +08:00
parent 7277784852
commit 15d9a46600
12 changed files with 274 additions and 89 deletions

View File

@ -28,24 +28,24 @@ typedef struct SMetaOptions SMetaOptions;
typedef struct STbOptions STbOptions; typedef struct STbOptions STbOptions;
// SMeta operations // SMeta operations
SMeta *metaOpen(const char *path, const SMetaOptions *); SMeta *metaOpen(const char *path, const SMetaOptions *pOptions);
void metaClose(SMeta *); void metaClose(SMeta *pMeta);
void metaRemove(const char *path); void metaRemove(const char *path);
int metaCreateTable(SMeta *pMeta, const STbOptions *); int metaCreateTable(SMeta *pMeta, const STbOptions *pTbOptions);
int metaDropTable(SMeta *pMeta, tb_uid_t uid); int metaDropTable(SMeta *pMeta, tb_uid_t uid);
int metaCommit(SMeta *); int metaCommit(SMeta *pMeta);
// Options // Options
void metaOptionsInit(SMetaOptions *); void metaOptionsInit(SMetaOptions *pOptions);
void metaOptionsClear(SMetaOptions *); void metaOptionsClear(SMetaOptions *pOptions);
// STableOpts // STableOpts
#define META_TABLE_OPTS_DECLARE(name) STableOpts name = {0} #define META_TABLE_OPTS_DECLARE(name) STableOpts name = {0}
void metaNormalTableOptsInit(STbOptions *, const char *name, const STSchema *pSchema); void metaNormalTableOptsInit(STbOptions *pTbOptions, const char *name, const STSchema *pSchema);
void metaSuperTableOptsInit(STbOptions *, const char *name, tb_uid_t uid, const STSchema *pSchema, void metaSuperTableOptsInit(STbOptions *pTbOptions, const char *name, tb_uid_t uid, const STSchema *pSchema,
const STSchema *pTagSchema); const STSchema *pTagSchema);
void metaChildTableOptsInit(STbOptions *, const char *name, tb_uid_t suid, const SKVRow tags); void metaChildTableOptsInit(STbOptions *pTbOptions, const char *name, tb_uid_t suid, const SKVRow tags);
void metaTableOptsClear(STbOptions *); void metaTableOptsClear(STbOptions *pTbOptions);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -70,6 +70,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
* *
* @param pVnode The vnode object. * @param pVnode The vnode object.
* @param pMsg The request message * @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure * @return int 0 for success, -1 for failure
*/ */
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);

View File

@ -25,20 +25,14 @@ extern "C" {
typedef struct SMemAllocator SMemAllocator; typedef struct SMemAllocator SMemAllocator;
#define MALLOCATOR_APIS \ #define MALLOCATOR_APIS \
void *impl; \
void *(*malloc)(SMemAllocator *, size_t size); \ void *(*malloc)(SMemAllocator *, size_t size); \
void *(*calloc)(SMemAllocator *, size_t nmemb, size_t size); \ void *(*calloc)(SMemAllocator *, size_t nmemb, size_t size); \
void *(*realloc)(SMemAllocator *, void *ptr, size_t size); \ void *(*realloc)(SMemAllocator *, void *ptr, size_t size); \
void (*free)(SMemAllocator *, void *ptr); \ void (*free)(SMemAllocator *, void *ptr); \
size_t (*usage)(SMemAllocator *); size_t (*usage)(SMemAllocator *);
// Interfaces to implement
typedef struct {
MALLOCATOR_APIS
} SMemAllocatorIf;
struct SMemAllocator { struct SMemAllocator {
void * impl;
size_t usize;
MALLOCATOR_APIS MALLOCATOR_APIS
}; };

View File

@ -46,9 +46,10 @@ typedef struct {
#define isListEmpty(l) ((l)->numOfEles == 0) #define isListEmpty(l) ((l)->numOfEles == 0)
#define listNodeFree(n) free(n) #define listNodeFree(n) free(n)
void tdListInit(SList *list, int eleSize);
void tdListEmpty(SList *list);
SList * tdListNew(int eleSize); SList * tdListNew(int eleSize);
void * tdListFree(SList *list); void * tdListFree(SList *list);
void tdListEmpty(SList *list);
void tdListPrependNode(SList *list, SListNode *node); void tdListPrependNode(SList *list, SListNode *node);
void tdListAppendNode(SList *list, SListNode *node); void tdListAppendNode(SList *list, SListNode *node);
int tdListPrepend(SList *list, void *data); int tdListPrepend(SList *list, void *data);

View File

@ -13,27 +13,27 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_VNODE_ALLOCATOR_POOL_H_ #ifndef _TD_VNODE_BUFFER_POOL_H_
#define _TD_VNODE_ALLOCATOR_POOL_H_ #define _TD_VNODE_BUFFER_POOL_H_
#include "tlist.h"
#include "vnode.h" #include "vnode.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct { typedef struct SVBufPool SVBufPool;
int nexta;
int enda;
SMemAllocator *free[3];
SMemAllocator *used[3];
} SVAllocatorPool;
int vnodeOpenAllocatorPool(SVnode *pVnode); int vnodeOpenBufPool(SVnode *pVnode);
void vnodeCloseAllocatorPool(SVnode *pVnode); void vnodeCloseBufPool(SVnode *pVnode);
SMemAllocator *vnodeCreateMemAllocator(SVnode *pVnode);
void vnodeDestroyMemAllocator(SMemAllocator *pma);
void vnodeRefMemAllocator(SMemAllocator *pma);
void vnodeUnrefMemAllocator(SMemAllocator *pma);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_VNODE_ALLOCATOR_POOL_H_*/ #endif /*_TD_VNODE_BUFFER_POOL_H_*/

View File

@ -22,7 +22,7 @@
#include "wal.h" #include "wal.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeAllocatorPool.h" #include "vnodeBufferPool.h"
#include "vnodeCommit.h" #include "vnodeCommit.h"
#include "vnodeFileSystem.h" #include "vnodeFileSystem.h"
#include "vnodeOptions.h" #include "vnodeOptions.h"
@ -34,17 +34,16 @@ extern "C" {
#endif #endif
struct SVnode { struct SVnode {
char* path; char* path;
SVnodeOptions options; SVnodeOptions options;
SVState state; SVState state;
SVAllocatorPool* pool; SVBufPool* pBufPool;
SMemAllocator* inuse; SMeta* pMeta;
SMeta* pMeta; STsdb* pTsdb;
STsdb* pTsdb; STQ* pTq;
STQ* pTq; SWal* pWal;
SWal* pWal; SVnodeSync* pSync;
SVnodeSync* pSync; SVnodeFS* pFs;
SVnodeFS* pFs;
}; };
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -20,6 +20,20 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SVnodeReq SVnodeReq;
typedef struct SVnodeRsp SVnodeRsp;
typedef enum {
} EVReqT;
struct SVnodeReq {
/* TODO */
};
struct SVnodeRsp {
/* TODO */
};
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -1,37 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeDef.h"
int vnodeOpenAllocatorPool(SVnode *pVnode) {
// TODO
return 0;
}
void vnodeCloseAllocatorPool(SVnode *pVnode) {
if (pVnode->pool) {
}
}
/* ------------------------ STATIC METHODS ------------------------ */
static SVAllocatorPool *vapCreate() {
SVAllocatorPool *pPool = NULL;
/* TODO */
return pPool;
}
static void vapDestroy() {
// TODO
}

View File

@ -0,0 +1,211 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeDef.h"
/* ------------------------ STRUCTURES ------------------------ */
struct SVBufPool {
SList free;
SList incycle;
SListNode *inuse;
};
typedef enum { E_V_HEAP_ALLOCATOR = 0, E_V_ARENA_ALLOCATOR } EVMemAllocatorT;
typedef struct {
} SVHeapAllocator;
typedef struct SVArenaNode {
struct SVArenaNode *prev;
uint64_t size;
void * ptr;
char data[];
} SVArenaNode;
typedef struct {
SVArenaNode *inuse;
SVArenaNode node;
} SVArenaAllocator;
typedef struct {
uint64_t capacity;
EVMemAllocatorT type;
T_REF_DECLARE()
union {
SVHeapAllocator vha;
SVArenaAllocator vaa;
};
} SVMemAllocator;
static SListNode *vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type);
static void vBufPoolFreeNode(SListNode *pNode);
static int vArenaAllocatorInit(SVArenaAllocator *pvaa);
static void vArenaAllocatorClear(SVArenaAllocator *pvaa);
static int vHeapAllocatorInit(SVHeapAllocator *pvha);
static void vHeapAllocatorClear(SVHeapAllocator *pvha);
int vnodeOpenBufPool(SVnode *pVnode) {
uint64_t capacity;
EVMemAllocatorT type = E_V_ARENA_ALLOCATOR;
if ((pVnode->pBufPool = (SVBufPool *)calloc(1, sizeof(SVBufPool))) == NULL) {
/* TODO */
return -1;
}
tdListInit(&(pVnode->pBufPool->free), 0);
tdListInit(&(pVnode->pBufPool->incycle), 0);
capacity = pVnode->options.wsize / 3;
if (pVnode->options.isHeapAllocator) {
type = E_V_HEAP_ALLOCATOR;
}
for (int i = 0; i < 3; i++) {
SListNode *pNode = vBufPoolNewNode(capacity, type);
if (pNode == NULL) {
vnodeCloseBufPool(pVnode);
return -1;
}
tdListAppendNode(&(pVnode->pBufPool->free), pNode);
}
pVnode->pBufPool->inuse = tdListPopHead(&(pVnode->pBufPool->free));
return 0;
}
void vnodeCloseBufPool(SVnode *pVnode) {
SListNode *pNode;
if (pVnode->pBufPool) {
// Clear free list
while ((pNode = tdListPopHead(&(pVnode->pBufPool->free))) != NULL) {
vBufPoolFreeNode(pNode);
}
// Clear incycle list
while ((pNode = tdListPopHead(&(pVnode->pBufPool->incycle))) != NULL) {
vBufPoolFreeNode(pNode);
}
// Free inuse node
vBufPoolFreeNode(pVnode->pBufPool->inuse);
free(pVnode->pBufPool);
pVnode->pBufPool = NULL;
}
}
SMemAllocator *vnodeCreateMemAllocator(SVnode *pVnode) {
SMemAllocator *pma;
pma = (SMemAllocator *)calloc(1, sizeof(*pma));
if (pma == NULL) {
/* TODO */
return NULL;
}
pma->impl = pVnode;
if (pVnode->options.isHeapAllocator) {
/* TODO */
pma->malloc = NULL;
pma->calloc = NULL;
pma->realloc = NULL;
pma->free = NULL;
pma->usage = NULL;
} else {
/* TODO */
pma->malloc = NULL;
pma->calloc = NULL;
pma->realloc = NULL;
pma->free = NULL;
pma->usage = NULL;
}
return pma;
}
void vnodeDestroyMemAllocator(SMemAllocator *pma) { tfree(pma); }
void vnodeRefMemAllocator(SMemAllocator *pma) {
SVnode * pVnode = (SVnode *)pma->impl;
SVMemAllocator *pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data);
T_REF_INC(pvma);
}
void vnodeUnrefMemAllocator(SMemAllocator *pma) {
SVnode * pVnode = (SVnode *)pma->impl;
SVMemAllocator *pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data);
if (T_REF_DEC(pvma) == 0) {
/* TODO */
}
}
/* ------------------------ STATIC METHODS ------------------------ */
static SListNode *vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type) {
SListNode * pNode;
SVMemAllocator *pvma;
pNode = (SListNode *)calloc(1, sizeof(*pNode) + sizeof(SVMemAllocator));
if (pNode == NULL) {
return NULL;
}
pvma = (SVMemAllocator *)(pNode->data);
pvma->capacity = capacity;
pvma->type = type;
switch (type) {
case E_V_HEAP_ALLOCATOR:
vHeapAllocatorInit(&(pvma->vha));
break;
case E_V_ARENA_ALLOCATOR:
vArenaAllocatorInit(&(pvma->vaa));
break;
default:
ASSERT(0);
}
return pNode;
}
static void vBufPoolFreeNode(SListNode *pNode) {
if (pNode) {
free(pNode);
}
}
// --------------- For arena allocator
static int vArenaAllocatorInit(SVArenaAllocator *pvaa) {
// TODO
return 0;
}
static void vArenaAllocatorClear(SVArenaAllocator *pvaa) {
// TODO
}
// --------------- For heap allocator
static int vHeapAllocatorInit(SVHeapAllocator *pvha) {
// TODO
return 0;
}
static void vHeapAllocatorClear(SVHeapAllocator *pvha) {
// TODO
}

View File

@ -87,12 +87,6 @@ static void vnodeFree(SVnode *pVnode) {
static int vnodeOpenImpl(SVnode *pVnode) { static int vnodeOpenImpl(SVnode *pVnode) {
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
// Open allocator pool
if (vnodeOpenAllocatorPool(pVnode) < 0) {
// TODO: handle error
return -1;
}
// Open meta // Open meta
sprintf(dir, "%s/meta", pVnode->path); sprintf(dir, "%s/meta", pVnode->path);
pVnode->pMeta = metaOpen(dir, &(pVnode->options.metaOptions)); pVnode->pMeta = metaOpen(dir, &(pVnode->options.metaOptions));
@ -117,7 +111,6 @@ static int vnodeOpenImpl(SVnode *pVnode) {
static void vnodeCloseImpl(SVnode *pVnode) { static void vnodeCloseImpl(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
vnodeCloseAllocatorPool(pVnode);
// TODO: Close TQ // TODO: Close TQ
tsdbClose(pVnode->pTsdb); tsdbClose(pVnode->pTsdb);
metaClose(pVnode->pMeta); metaClose(pVnode->pMeta);

View File

@ -21,6 +21,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
} }
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
#if 0
int reqType; /* TODO */ int reqType; /* TODO */
size_t reqSize; /* TODO */ size_t reqSize; /* TODO */
uint64_t reqVersion = 0; /* TODO */ uint64_t reqVersion = 0; /* TODO */
@ -45,7 +46,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case TSDB_MSG_TYPE_DROP_TABLE: case TSDB_MSG_TYPE_DROP_TABLE:
code = metaDropTable(pVnode->pMeta, 0 /* TODO */); code = metaDropTable(pVnode->pMeta, 0 /* TODO */);
break; break;
case TSDB_MSG_TYPE_SUBMIT:
/* TODO */ /* TODO */
break;
default: default:
break; break;
} }
@ -57,6 +60,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
return code; return code;
#endif
return 0;
} }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */

View File

@ -13,16 +13,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "tlist.h" #include "tlist.h"
#include "os.h"
void tdListInit(SList *list, int eleSize) {
list->eleSize = eleSize;
list->numOfEles = 0;
list->head = list->tail = NULL;
}
SList *tdListNew(int eleSize) { SList *tdListNew(int eleSize) {
SList *list = (SList *)malloc(sizeof(SList)); SList *list = (SList *)malloc(sizeof(SList));
if (list == NULL) return NULL; if (list == NULL) return NULL;
list->eleSize = eleSize; tdListInit(list, eleSize);
list->numOfEles = 0;
list->head = list->tail = NULL;
return list; return list;
} }