Merge branch '3.0' into 3.0_refact

This commit is contained in:
Hongze Cheng 2021-10-14 16:05:37 +08:00
commit 35f0be5a63
8 changed files with 115 additions and 18 deletions

2
.gitignore vendored
View File

@ -1,4 +1,6 @@
build/ build/
compile_commands.json
.cache
.ycm_extra_conf.py .ycm_extra_conf.py
.vscode/ .vscode/
.idea/ .idea/

View File

@ -58,18 +58,25 @@ void walStop(twalh);
void walClose(twalh); void walClose(twalh);
//write //write
int64_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen);
void walFsync(twalh, bool forceHint); int64_t walWrite(twalh, void* body, int32_t bodyLen);
//int32_t walCommit(twalh, int64_t ver); int64_t walWriteBatch(twalh, void* body, int32_t* bodyLen, int32_t batchSize);
//int32_t walRollback(twalh, int64_t ver);
//apis for lifecycle management
void walFsync(twalh, bool force);
int32_t walCommit(twalh, int64_t ver);
//truncate after
int32_t walRollback(twalh, int64_t ver);
//notify that previous log can be pruned safely
int32_t walPrune(twalh, int64_t ver);
//read //read
int32_t walRead(twalh, SWalHead **, int64_t ver); int32_t walRead(twalh, SWalHead **, int64_t ver);
int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum); int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum);
//life cycle //lifecycle check
int32_t walDataPersisted(twalh, int64_t ver);
int32_t walFirstVer(twalh); int32_t walFirstVer(twalh);
int32_t walPersistedVer(twalh);
int32_t walLastVer(twalh); int32_t walLastVer(twalh);
//int32_t walDataCorrupted(twalh); //int32_t walDataCorrupted(twalh);

View File

@ -22,8 +22,26 @@
extern "C" { extern "C" {
#endif #endif
typedef struct STQ STQ; typedef struct tqTopicVhandle {
//name
//
//executor for filter
//
//callback for mnode
//
} tqTopic;
typedef struct STQ {
//the set for topics
//key=topicName: str
//value=tqTopicVhandle
//a map
//key=<topic: str, cgId: int64_t>
//value=consumeOffset: int64_t
} STQ;
//init in each vnode
STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); STQ* tqInit(void* ref_func(void*), void* unref_func(void*));
void tqCleanUp(STQ*); void tqCleanUp(STQ*);

View File

@ -16,10 +16,11 @@
#ifndef _TD_VNODE_MAIN_H_ #ifndef _TD_VNODE_MAIN_H_
#define _TD_VNODE_MAIN_H_ #define _TD_VNODE_MAIN_H_
#include "vnodeInt.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "vnodeInt.h"
int32_t vnodeInitMain(); int32_t vnodeInitMain();
void vnodeCleanupMain(); void vnodeCleanupMain();

View File

@ -3,10 +3,11 @@ add_library(tq ${TQ_SRC})
target_include_directories( target_include_directories(
tq tq
PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq"
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
PRIVATE "${CMAKE_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_SOURCE_DIR}/include/os"
) )
target_link_libraries( target_link_libraries(
os wal
) )

View File

@ -18,23 +18,38 @@
#include "tq.h" #include "tq.h"
#define TQ_BUFFER_SIZE 8
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
//implement the array index typedef struct tqBufferItem {
//implement the ring buffer int64_t offset;
void *content;
} tqBufferItem;
typedef struct tqGroupHandle {
char* topic;
void* ahandle;
int64_t cgId;
int64_t consumeOffset;
int32_t head;
int32_t tail;
tqBufferItem buffer[TQ_BUFFER_SIZE];
} tqGroupHandle;
//create persistent storage for meta info such as consuming offset //create persistent storage for meta info such as consuming offset
//return value > 0: cgId //return value > 0: cgId
//return value <= 0: error code //return value <= 0: error code
int tqCreateGroup(STQ*); int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle);
//create ring buffer in memory and load consuming offset //create ring buffer in memory and load consuming offset
int tqOpenGroup(STQ*, int cgId); int tqOpenTCGroup(STQ*, const char* topic, int cgId);
//destroy ring buffer and persist consuming offset //destroy ring buffer and persist consuming offset
int tqCloseGroup(STQ*, int cgId); int tqCloseTCGroup(STQ*, const char* topic, int cgId);
//delete persistent storage for meta info //delete persistent storage for meta info
int tqDropGroup(STQ*, int cgId); int tqDropTCGroup(STQ*, const char* topic, int cgId);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -22,12 +22,65 @@
// //
//handle management message //handle management message
static tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) {
//look in memory
//
//not found, try to restore from disk
//
//still not found
return NULL;
}
static int tqCommitTCGroup(tqGroupHandle* handle) {
//persist into disk
return 0;
}
int tqCreateTCGroup(STQ *pTq, const char* topic, int cgId, tqGroupHandle** handle) {
return 0;
}
int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) {
int code;
tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);
if(handle == NULL) {
code = tqCreateTCGroup(pTq, topic, cgId, &handle);
if(code != 0) {
return code;
}
}
ASSERT(handle != NULL);
//put into STQ
return 0;
}
int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {
tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);
return tqCommitTCGroup(handle);
}
int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) {
//delete from disk
return 0;
}
int tqPushMsg(STQ* pTq , void* p, int64_t version) { int tqPushMsg(STQ* pTq , void* p, int64_t version) {
//add reference //add reference
// //judge and launch new query
return 0; return 0;
} }
int tqCommit(STQ* pTq) { int tqCommit(STQ* pTq) {
//do nothing
return 0;
}
int tqHandleMsg(STQ* pTq, void*msg) {
//parse msg and extract topic and cgId
//lookup handle
//confirm message and send to consumer
//judge and launch new query
return 0; return 0;
} }