diff --git a/.gitignore b/.gitignore index 5141448ee0..0b98a1b161 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ build/ +compile_commands.json +.cache .ycm_extra_conf.py .vscode/ .idea/ @@ -96,4 +98,4 @@ tramp TAGS deps/* -!deps/CMakeLists.txt \ No newline at end of file +!deps/CMakeLists.txt diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 37cd263783..b6fd5a70d9 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -58,18 +58,25 @@ void walStop(twalh); void walClose(twalh); //write -int64_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); -void walFsync(twalh, bool forceHint); -//int32_t walCommit(twalh, int64_t ver); -//int32_t walRollback(twalh, int64_t ver); +int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen); +int64_t walWrite(twalh, void* body, int32_t bodyLen); +int64_t walWriteBatch(twalh, void* body, int32_t* bodyLen, int32_t batchSize); + +//apis for lifecycle management +void walFsync(twalh, bool force); +int32_t walCommit(twalh, int64_t ver); +//truncate after +int32_t walRollback(twalh, int64_t ver); +//notify that previous log can be pruned safely +int32_t walPrune(twalh, int64_t ver); //read int32_t walRead(twalh, SWalHead **, int64_t ver); int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum); -//life cycle -int32_t walDataPersisted(twalh, int64_t ver); +//lifecycle check int32_t walFirstVer(twalh); +int32_t walPersistedVer(twalh); int32_t walLastVer(twalh); //int32_t walDataCorrupted(twalh); diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index dd355c8381..eb9c57c581 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -22,8 +22,26 @@ extern "C" { #endif -typedef struct STQ STQ; +typedef struct tqTopicVhandle { + //name + // + //executor for filter + // + //callback for mnode + // +} tqTopic; +typedef struct STQ { + //the set for topics + //key=topicName: str + //value=tqTopicVhandle + + //a map + //key= + //value=consumeOffset: int64_t +} STQ; + +//init in each vnode STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); void tqCleanUp(STQ*); diff --git a/source/server/vnode/inc/vnodeMain.h b/source/server/vnode/inc/vnodeMain.h index 093d07b013..0b41812215 100644 --- a/source/server/vnode/inc/vnodeMain.h +++ b/source/server/vnode/inc/vnodeMain.h @@ -16,10 +16,11 @@ #ifndef _TD_VNODE_MAIN_H_ #define _TD_VNODE_MAIN_H_ +#include "vnodeInt.h" + #ifdef __cplusplus extern "C" { #endif -#include "vnodeInt.h" int32_t vnodeInitMain(); void vnodeCleanupMain(); diff --git a/source/server/vnode/inc/vnodeWrite.h b/source/server/vnode/inc/vnodeWrite.h index 48acf750c1..0bb670de5b 100644 --- a/source/server/vnode/inc/vnodeWrite.h +++ b/source/server/vnode/inc/vnodeWrite.h @@ -37,4 +37,4 @@ void vnodeWaitWriteCompleted(SVnode *pVnode); } #endif -#endif /*_TD_VNODE_WRITE_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_WRITE_H_*/ diff --git a/source/server/vnode/tq/CMakeLists.txt b/source/server/vnode/tq/CMakeLists.txt index 9577007400..0c15e23d33 100644 --- a/source/server/vnode/tq/CMakeLists.txt +++ b/source/server/vnode/tq/CMakeLists.txt @@ -3,10 +3,11 @@ add_library(tq ${TQ_SRC}) target_include_directories( tq PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_SOURCE_DIR}/include/os" ) target_link_libraries( - os + wal ) diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index a51f0b03af..c42bcfef43 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -18,23 +18,38 @@ #include "tq.h" +#define TQ_BUFFER_SIZE 8 + #ifdef __cplusplus extern "C" { #endif -//implement the array index -//implement the ring buffer +typedef struct tqBufferItem { + int64_t offset; + void *content; +} tqBufferItem; + + +typedef struct tqGroupHandle { + char* topic; + void* ahandle; + int64_t cgId; + int64_t consumeOffset; + int32_t head; + int32_t tail; + tqBufferItem buffer[TQ_BUFFER_SIZE]; +} tqGroupHandle; //create persistent storage for meta info such as consuming offset //return value > 0: cgId //return value <= 0: error code -int tqCreateGroup(STQ*); +int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle); //create ring buffer in memory and load consuming offset -int tqOpenGroup(STQ*, int cgId); +int tqOpenTCGroup(STQ*, const char* topic, int cgId); //destroy ring buffer and persist consuming offset -int tqCloseGroup(STQ*, int cgId); +int tqCloseTCGroup(STQ*, const char* topic, int cgId); //delete persistent storage for meta info -int tqDropGroup(STQ*, int cgId); +int tqDropTCGroup(STQ*, const char* topic, int cgId); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 3255f3fb3a..2ef2a4b6ea 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -22,12 +22,65 @@ // //handle management message +static tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) { + //look in memory + // + //not found, try to restore from disk + // + //still not found + return NULL; +} + +static int tqCommitTCGroup(tqGroupHandle* handle) { + //persist into disk + return 0; +} + +int tqCreateTCGroup(STQ *pTq, const char* topic, int cgId, tqGroupHandle** handle) { + return 0; +} + +int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) { + int code; + tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); + if(handle == NULL) { + code = tqCreateTCGroup(pTq, topic, cgId, &handle); + if(code != 0) { + return code; + } + } + ASSERT(handle != NULL); + + //put into STQ + + return 0; +} + +int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) { + tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); + return tqCommitTCGroup(handle); +} + +int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) { + //delete from disk + return 0; +} + int tqPushMsg(STQ* pTq , void* p, int64_t version) { //add reference - // + //judge and launch new query return 0; } int tqCommit(STQ* pTq) { + //do nothing + return 0; +} + +int tqHandleMsg(STQ* pTq, void*msg) { + //parse msg and extract topic and cgId + //lookup handle + //confirm message and send to consumer + //judge and launch new query return 0; }