try integrating allocator

This commit is contained in:
Liu Jicong 2022-01-20 15:18:33 +08:00
parent fa73f1d497
commit 3203e8fe42
3 changed files with 8 additions and 25 deletions

View File

@ -198,17 +198,6 @@ typedef struct STqQueryMsg {
struct STqQueryMsg* next;
} STqQueryMsg;
typedef struct STqLogHandle {
void* logHandle;
void* (*openLogReader)(void* logHandle);
void (*closeLogReader)(void* logReader);
int32_t (*logRead)(void* logReader, void** data, int64_t ver);
int64_t (*logGetFirstVer)(void* logHandle);
int64_t (*logGetSnapshotVer)(void* logHandle);
int64_t (*logGetLastVer)(void* logHandle);
} STqLogHandle;
typedef struct STqCfg {
// TODO
} STqCfg;
@ -306,7 +295,6 @@ typedef struct STQ {
// the handle of meta kvstore
char* path;
STqCfg* tqConfig;
STqLogHandle* tqLogHandle;
STqMemRef tqMemRef;
STqMetaStore* tqMeta;
} STQ;
@ -323,7 +311,7 @@ int tqInit();
void tqCleanUp();
// open in each vnode
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac);
STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
void tqClose(STQ*);
// void* will be replace by a msg type

View File

@ -50,7 +50,7 @@ void tqCleanUp() {
taosTmrCleanUp(tqMgmt.timer);
}
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac) {
STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
STQ* pTq = malloc(sizeof(STQ));
if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
@ -58,20 +58,15 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemA
}
pTq->path = strdup(path);
pTq->tqConfig = tqConfig;
pTq->tqLogHandle = tqLogHandle;
#if 0
pTq->tqMemRef.pAllocatorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
if (pTq->tqMemRef.pAllocator == NULL) {
// TODO: error code of buffer pool
}
#endif
pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0);
if (pTq->tqMeta == NULL) {
free(pTq);
#if 0
allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
#endif
free(pTq);
return NULL;
}
@ -421,10 +416,10 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
// read from wal
void* raw = NULL;
/*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);
if (code < 0) {
/*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/
/*if (code < 0) {*/
// TODO: error
}
/*}*/
// get msgType
// if submitblk
pItem->executor->assign(pItem->executor->runtimeEnv, raw);

View File

@ -119,7 +119,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// TODO: Open TQ
sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, vBufPoolGetMAF(pVnode));
pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) {
// TODO: handle error
return -1;