Merge pull request #9195 from taosdata/feature/vnode

more
This commit is contained in:
Hongze Cheng 2021-12-19 11:26:54 +08:00 committed by GitHub
commit 67ed87e2fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 34 additions and 11 deletions

View File

@ -24,26 +24,26 @@ extern "C" {
// TYPES EXPOSED // TYPES EXPOSED
typedef struct STsdb STsdb; typedef struct STsdb STsdb;
typedef struct STsdbCfg STsdbCfg;
typedef struct STsdbCfg {
uint64_t lruCacheSize;
uint32_t keep0;
uint32_t keep1;
uint32_t keep2;
} STsdbCfg;
// STsdb // STsdb
STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF); STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF);
void tsdbClose(STsdb *); void tsdbClose(STsdb *);
void tsdbRemove(const char *path); void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg); int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg);
int tsdbPrepareCommit(STsdb *pTsdb);
int tsdbCommit(STsdb *pTsdb);
// STsdbCfg // STsdbCfg
int tsdbOptionsInit(STsdbCfg *); int tsdbOptionsInit(STsdbCfg *);
void tsdbOptionsClear(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *);
/* ------------------------ STRUCT DEFINITIONS ------------------------ */
struct STsdbCfg {
uint64_t lruCacheSize;
uint32_t keep0;
uint32_t keep1;
uint32_t keep2;
};
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -39,6 +39,9 @@ typedef struct SMemAllocator {
TD_MEM_ALCT(SMemAllocator); TD_MEM_ALCT(SMemAllocator);
} SMemAllocator; } SMemAllocator;
#define tMalloc(pMA, SIZE) TD_MA_MALLOC(PMA, SIZE)
#define tFree(pMA, PTR) TD_MA_FREE(PMA, PTR)
typedef struct SMemAllocatorFactory { typedef struct SMemAllocatorFactory {
void *impl; void *impl;
SMemAllocator *(*create)(struct SMemAllocatorFactory *); SMemAllocator *(*create)(struct SMemAllocatorFactory *);

View File

@ -25,6 +25,10 @@ int vnodeAsyncCommit(SVnode *pVnode) {
pTask->execute = vnodeCommit; // TODO pTask->execute = vnodeCommit; // TODO
pTask->arg = pVnode; // TODO pTask->arg = pVnode; // TODO
tsdbPrepareCommit(pVnode->pTsdb);
// metaPrepareCommit(pVnode->pMeta);
// walPreapareCommit(pVnode->pWal);
vnodeScheduleTask(pTask); vnodeScheduleTask(pTask);
return 0; return 0;
} }
@ -32,6 +36,10 @@ int vnodeAsyncCommit(SVnode *pVnode) {
int vnodeCommit(void *arg) { int vnodeCommit(void *arg) {
SVnode *pVnode = (SVnode *)arg; SVnode *pVnode = (SVnode *)arg;
metaCommit(pVnode->pMeta);
tqCommit(pVnode->pTq);
tsdbCommit(pVnode->pTq);
vnodeBufPoolRecycle(pVnode); vnodeBufPoolRecycle(pVnode);
// TODO // TODO
return 0; return 0;

View File

@ -50,7 +50,7 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
pTq->tqConfig = tqConfig; pTq->tqConfig = tqConfig;
pTq->tqLogReader = tqLogReader; pTq->tqLogReader = tqLogReader;
pTq->tqMemRef.pAlloctorFactory = allocFac; pTq->tqMemRef.pAlloctorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac); // pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
if (pTq->tqMemRef.pAllocator == NULL) { if (pTq->tqMemRef.pAllocator == NULL) {
// TODO // TODO
} }

View File

@ -15,7 +15,19 @@
#include "tsdbDef.h" #include "tsdbDef.h"
int tsdbPrepareCommit(STsdb *pTsdb) {
if (pTsdb->mem == NULL) return 0;
// tsem_wait(&(pTsdb->canCommit));
ASSERT(pTsdb->imem == NULL);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
}
int tsdbCommit(STsdb *pTsdb) { int tsdbCommit(STsdb *pTsdb) {
// TODO // TODO
pTsdb->imem = NULL;
// tsem_post(&(pTsdb->canCommit));
return 0; return 0;
} }