From 9dbb925ad78232caff63caac43250976b7a26014 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 28 Oct 2021 11:53:25 +0800 Subject: [PATCH 1/6] add serialization and deserialization for tq --- include/server/vnode/tq/tq.h | 24 +++--- source/server/vnode/tq/src/tq.c | 141 ++++++++++++++++++++------------ 2 files changed, 100 insertions(+), 65 deletions(-) diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index c90991ae10..1e219e0983 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -135,7 +135,7 @@ typedef struct tqBufferHandle { } tqBufferHandle; typedef struct tqListHandle { - tqBufferHandle* bufHandle; + tqBufferHandle bufHandle; struct tqListHandle* next; } tqListHandle; @@ -176,22 +176,20 @@ int tqMoveOffsetToNext(tqGroupHandle*); int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); int tqRegisterContext(tqGroupHandle*, void*); int tqLaunchQuery(tqGroupHandle*); -int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); +int tqSendLaunchQuery(tqGroupHandle*); -int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset); -int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset); -int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset); -int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset); +int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes); +void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr); +void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr); +void* tqSerializeBufItem(tqBufferItem *bufItem, void* ptr); -int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle); -int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle); -int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle); -int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem); +const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *ghandle); +const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle); +const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem *bufItem); int tqGetGHandleSSize(const tqGroupHandle *gHandle); -int tqListHandleSSize(const tqListHandle *listHandle); -int tqBufHandleSSize(const tqBufferHandle *bufHandle); -int tqBufItemSSize(const tqBufferItem *bufItem); +int tqBufHandleSSize(); +int tqBufItemSSize(); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 52702057d6..fc15a676d8 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -58,10 +58,10 @@ static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) { int ackCnt = 0; tqQueryMsg *pQuery = NULL; while(i < ackNum && node->next) { - if(acks[i].topicId == node->next->bufHandle->topicId) { + if(acks[i].topicId == node->next->bufHandle.topicId) { ackCnt++; - tqAckOneTopic(node->next->bufHandle, &acks[i], &pQuery); - } else if(acks[i].topicId < node->next->bufHandle->topicId) { + tqAckOneTopic(&node->next->bufHandle, &acks[i], &pQuery); + } else if(acks[i].topicId < node->next->bufHandle.topicId) { i++; } else { node = node->next; @@ -114,7 +114,7 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) { //until all topic iterated or msgs over sizeLimit while(node->next) { node = node->next; - tqBufferHandle* bufHandle = node->bufHandle; + tqBufferHandle* bufHandle = &node->bufHandle; int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE; if(bufHandle->buffer[idx].content != NULL && bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset @@ -140,11 +140,6 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) { } } } - if(totSize == 0) { - //no msg - return -1; - } - return totSize; } @@ -157,7 +152,7 @@ int tqLaunchQuery(tqGroupHandle* ghandle) { return 0; } -int tqSendLaunchQuery(STQ* pTq, int64_t topicId, int64_t cgId, void* query) { +int tqSendLaunchQuery(tqGroupHandle* gHandle) { return 0; } @@ -196,7 +191,7 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { tmqConsumeRsp *pRsp = (tmqConsumeRsp*) pMsg; - if(tqFetch(ghandle, (void**)&pRsp->msgs) < 0) { + if(tqFetch(ghandle, (void**)&pRsp->msgs) <= 0) { //fetch error return -1; } @@ -209,14 +204,9 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { return 0; } - -int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset) { +int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes) { //calculate size int sz = tqGetGHandleSSize(gHandle); - if(sz <= 0) { - //TODO: err - return -1; - } void* ptr = realloc(*ppBytes, sz); if(ptr == NULL) { free(ppBytes); @@ -224,29 +214,30 @@ int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offse return -1; } *ppBytes = ptr; - //do serialize + //do serialization *(int64_t*)ptr = gHandle->cId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); *(int64_t*)ptr = gHandle->cgId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); *(int32_t*)ptr = gHandle->topicNum; + ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); if(gHandle->topicNum > 0) { - tqSerializeListHandle(gHandle->head, ppBytes, ptr - *ppBytes); + tqSerializeListHandle(gHandle->head, ptr); } return 0; } -int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset) { - void* ptr = POINTER_SHIFT(*ppBytes, offset); +void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr) { tqListHandle *node = listHandle; - while(node->next) { + ASSERT(node != NULL); + while(node) { + ptr = tqSerializeBufHandle(&node->bufHandle, ptr); node = node->next; - offset = tqSerializeBufHandle(node->bufHandle, ppBytes, offset); } - return offset; + return ptr; } -int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset) { - void *ptr = POINTER_SHIFT(*ppBytes, offset); + +void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr) { *(int64_t*)ptr = bufHandle->nextConsumeOffset; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); *(int64_t*)ptr = bufHandle->topicId; @@ -256,41 +247,87 @@ int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offs *(int32_t*)ptr = bufHandle->tail; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); for(int i = 0; i < TQ_BUFFER_SIZE; i++) { - int sz = tqSerializeBufItem(&bufHandle->buffer[i], ppBytes, ptr - *ppBytes); - ptr = POINTER_SHIFT(ptr, sz); + ptr = tqSerializeBufItem(&bufHandle->buffer[i], ptr); } - return ptr - *ppBytes; + return ptr; } -int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset) { - void *ptr = POINTER_SHIFT(*ppBytes, offset); +void* tqSerializeBufItem(tqBufferItem *bufItem, void* ptr) { //TODO: do we need serialize this? - return 0; + //mainly for executor + return ptr; } -int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle) { - return 0; -} -int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle) { - return 0; -} -int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle) { - return 0; -} -int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem) { - return 0; +const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle) { + const void* ptr = pBytes; + gHandle->cId = *(int64_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + gHandle->cgId = *(int64_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + gHandle->ahandle = NULL; + gHandle->topicNum = *(int32_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); + gHandle->head = NULL; + tqListHandle *node = gHandle->head; + for(int i = 0; i < gHandle->topicNum; i++) { + if(gHandle->head == NULL) { + if((node = malloc(sizeof(tqListHandle))) == NULL) { + //TODO: error + return NULL; + } + node->next= NULL; + ptr = tqDeserializeBufHandle(ptr, &node->bufHandle); + gHandle->head = node; + } else { + node->next = malloc(sizeof(tqListHandle)); + if(node->next == NULL) { + //TODO: error + return NULL; + } + node->next->next = NULL; + ptr = tqDeserializeBufHandle(ptr, &node->next->bufHandle); + node = node->next; + } + } + return ptr; } +const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle) { + const void* ptr = pBytes; + bufHandle->nextConsumeOffset = *(int64_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + bufHandle->topicId = *(int64_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + bufHandle->head = *(int32_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); + bufHandle->tail = *(int32_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); + for(int i = 0; i < TQ_BUFFER_SIZE; i++) { + ptr = tqDeserializeBufItem(ptr, &bufHandle->buffer[i]); + } + return ptr; +} +const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem *bufItem) { + return pBytes; +} + +//TODO: make this a macro int tqGetGHandleSSize(const tqGroupHandle *gHandle) { - return 0; -} -int tqListHandleSSize(const tqListHandle *listHandle) { - return 0; -} -int tqBufHandleSSize(const tqBufferHandle *bufHandle) { - return 0; -} -int tqBufItemSSize(const tqBufferItem *bufItem) { + return sizeof(int64_t) * 2 + + sizeof(int32_t) + + gHandle->topicNum * tqBufHandleSSize(); +} + +//TODO: make this a macro +int tqBufHandleSSize() { + return sizeof(int64_t) * 2 + + sizeof(int32_t) * 2 + + TQ_BUFFER_SIZE * tqBufItemSSize(); +} + +int tqBufItemSSize() { + //TODO: do this need serialization? + //mainly for executor return 0; } From 825239081f113fc5ad4bf55e88a43c8fe668bcd9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 28 Oct 2021 14:48:12 +0800 Subject: [PATCH 2/6] refact: change struct name --- include/server/vnode/tq/tq.h | 150 ++++++++++++------------- source/server/vnode/src/vnodeReadMsg.c | 4 +- source/server/vnode/tq/src/tq.c | 68 +++++------ 3 files changed, 111 insertions(+), 111 deletions(-) diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 1e219e0983..495383684a 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -22,89 +22,89 @@ extern "C" { #endif -typedef struct tmqMsgHead { +typedef struct TmqMsgHead { int32_t protoVer; int32_t msgType; int64_t cgId; int64_t clientId; -} tmqMsgHead; +} TmqMsgHead; -typedef struct tmqOneAck { +typedef struct TmqOneAck { int64_t topicId; int64_t consumeOffset; -} tmqOneAck; +} TmqOneAck; -typedef struct tmqAcks { +typedef struct TmqAcks { int32_t ackNum; //should be sorted - tmqOneAck acks[]; -} tmqAcks; + TmqOneAck acks[]; +} TmqAcks; //TODO: put msgs into common -typedef struct tmqConnectReq { - tmqMsgHead head; - tmqAcks acks; -} tmqConnectReq; +typedef struct TmqConnectReq { + TmqMsgHead head; + TmqAcks acks; +} TmqConnectReq; -typedef struct tmqConnectRsp { - tmqMsgHead head; +typedef struct TmqConnectRsp { + TmqMsgHead head; int8_t status; -} tmqConnectRsp; +} TmqConnectRsp; -typedef struct tmqDisconnectReq { - tmqMsgHead head; -} tmqDisconnectReq; +typedef struct TmqDisconnectReq { + TmqMsgHead head; +} TmqDiscconectReq; -typedef struct tmqDisconnectRsp { - tmqMsgHead head; +typedef struct TmqDisconnectRsp { + TmqMsgHead head; int8_t status; -} tmqDiconnectRsp; +} TmqDisconnectRsp; -typedef struct tmqConsumeReq { - tmqMsgHead head; - tmqAcks acks; -} tmqConsumeReq; +typedef struct TmqConsumeReq { + TmqMsgHead head; + TmqAcks acks; +} TmqConsumeReq; -typedef struct tmqMsgContent { +typedef struct TmqMsgContent { int64_t topicId; int64_t msgLen; char msg[]; -} tmqMsgContent; +} TmqMsgContent; -typedef struct tmqConsumeRsp { - tmqMsgHead head; +typedef struct TmqConsumeRsp { + TmqMsgHead head; int64_t bodySize; - tmqMsgContent msgs[]; -} tmqConsumeRsp; + TmqMsgContent msgs[]; +} TmqConsumeRsp; -typedef struct tmqMnodeSubscribeReq { - tmqMsgHead head; +typedef struct TmqSubscribeReq { + TmqMsgHead head; int64_t topicLen; char topic[]; -} tmqSubscribeReq; +} TmqSubscribeReq; -typedef struct tmqMnodeSubscribeRsp { - tmqMsgHead head; +typedef struct tmqSubscribeRsp { + TmqMsgHead head; int64_t vgId; char ep[]; //TSDB_EP_LEN -} tmqSubscribeRsp; +} TmqSubscribeRsp; -typedef struct tmqHeartbeatReq { +typedef struct TmqHeartbeatReq { -} tmqHeartbeatReq; +} TmqHeartbeatReq; -typedef struct tmqHeartbeatRsp { +typedef struct TmqHeartbeatRsp { -} tmqHeartbeatRsp; +} TmqHeartbeatRsp; -typedef struct tqTopicVhandle { +typedef struct TqTopicVhandle { //name // //executor for filter // //callback for mnode // -} tqTopicVhandle; +} TqTopicVhandle; typedef struct STQ { //the collection of group handle @@ -114,16 +114,16 @@ typedef struct STQ { #define TQ_BUFFER_SIZE 8 //TODO: define a serializer and deserializer -typedef struct tqBufferItem { +typedef struct TqBufferItem { int64_t offset; //executors are identical but not concurrent //so it must be a copy in each item void* executor; int64_t size; void* content; -} tqBufferItem; +} TqBufferItem; -typedef struct tqBufferHandle { +typedef struct TqBufferHandle { //char* topic; //c style, end with '\0' //int64_t cgId; //void* ahandle; @@ -131,32 +131,32 @@ typedef struct tqBufferHandle { int64_t topicId; int32_t head; int32_t tail; - tqBufferItem buffer[TQ_BUFFER_SIZE]; -} tqBufferHandle; + TqBufferItem buffer[TQ_BUFFER_SIZE]; +} TqBufferHandle; -typedef struct tqListHandle { - tqBufferHandle bufHandle; - struct tqListHandle* next; -} tqListHandle; +typedef struct TqListHandle { + TqBufferHandle bufHandle; + struct TqListHandle* next; +} TqListHandle; -typedef struct tqGroupHandle { +typedef struct TqGroupHandle { int64_t cId; int64_t cgId; void* ahandle; int32_t topicNum; - tqListHandle *head; -} tqGroupHandle; + TqListHandle *head; +} TqGroupHandle; -typedef struct tqQueryExec { +typedef struct TqQueryExec { void* src; - tqBufferItem* dest; + TqBufferItem* dest; void* executor; -} tqQueryExec; +} TqQueryExec; -typedef struct tqQueryMsg { - tqQueryExec *exec; - struct tqQueryMsg *next; -} tqQueryMsg; +typedef struct TqQueryMsg { + TqQueryExec *exec; + struct TqQueryMsg *next; +} TqQueryMsg; //init in each vnode STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); @@ -166,28 +166,28 @@ void tqCleanUp(STQ*); int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); -int tqConsume(STQ*, tmqConsumeReq*); +int tqConsume(STQ*, TmqConsumeReq*); -tqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); +TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); -int tqMoveOffsetToNext(tqGroupHandle*); +int tqMoveOffsetToNext(TqGroupHandle*); int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); -int tqRegisterContext(tqGroupHandle*, void*); -int tqLaunchQuery(tqGroupHandle*); -int tqSendLaunchQuery(tqGroupHandle*); +int tqRegisterContext(TqGroupHandle*, void* ahandle); +int tqLaunchQuery(TqGroupHandle*); +int tqSendLaunchQuery(TqGroupHandle*); -int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes); -void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr); -void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr); -void* tqSerializeBufItem(tqBufferItem *bufItem, void* ptr); +int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes); +void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr); +void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr); +void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr); -const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *ghandle); -const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle); -const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem *bufItem); +const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *ghandle); +const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle); +const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem); -int tqGetGHandleSSize(const tqGroupHandle *gHandle); +int tqGetGHandleSSize(const TqGroupHandle *gHandle); int tqBufHandleSSize(); int tqBufItemSSize(); diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index 1835b4f558..3626626791 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -221,8 +221,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { //parse message and optionally move offset void* pMsg = pRead->pCont; - tmqConsumeReq *pConsumeMsg = (tmqConsumeReq*) pMsg; - tmqMsgHead msgHead = pConsumeMsg->head; + TmqConsumeReq *pConsumeMsg = (TmqConsumeReq*) pMsg; + TmqMsgHead msgHead = pConsumeMsg->head; //extract head STQ *pTq = pVnode->pTQ; /*tqBufferHandle *pHandle = tqGetHandle(pTq, msgHead.clientId);*/ diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index fc15a676d8..7ecdfe7f19 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -22,18 +22,18 @@ // //handle management message // -static int tqProtoCheck(tmqMsgHead *pMsg) { +static int tqProtoCheck(TmqMsgHead *pMsg) { return pMsg->protoVer == 0; } -static int tqAckOneTopic(tqBufferHandle *bhandle, tmqOneAck *pAck, tqQueryMsg** ppQuery) { +static int tqAckOneTopic(TqBufferHandle *bhandle, TmqOneAck *pAck, TqQueryMsg** ppQuery) { //clean old item and move forward int32_t consumeOffset = pAck->consumeOffset; int idx = consumeOffset % TQ_BUFFER_SIZE; ASSERT(bhandle->buffer[idx].content && bhandle->buffer[idx].executor); tfree(bhandle->buffer[idx].content); if( 1 /* TODO: need to launch new query */) { - tqQueryMsg* pNewQuery = malloc(sizeof(tqQueryMsg)); + TqQueryMsg* pNewQuery = malloc(sizeof(TqQueryMsg)); if(pNewQuery == NULL) { //TODO: memory insufficient return -1; @@ -49,14 +49,14 @@ static int tqAckOneTopic(tqBufferHandle *bhandle, tmqOneAck *pAck, tqQueryMsg** return 0; } -static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) { +static int tqAck(TqGroupHandle* ghandle, TmqAcks* pAcks) { int32_t ackNum = pAcks->ackNum; - tmqOneAck *acks = pAcks->acks; + TmqOneAck *acks = pAcks->acks; //double ptr for acks and list int i = 0; - tqListHandle* node = ghandle->head; + TqListHandle* node = ghandle->head; int ackCnt = 0; - tqQueryMsg *pQuery = NULL; + TqQueryMsg *pQuery = NULL; while(i < ackNum && node->next) { if(acks[i].topicId == node->next->bufHandle.topicId) { ackCnt++; @@ -73,12 +73,12 @@ static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) { return ackCnt; } -static int tqCommitTCGroup(tqGroupHandle* handle) { +static int tqCommitTCGroup(TqGroupHandle* handle) { //persist modification into disk return 0; } -int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, tqGroupHandle** handle) { +int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) { //create in disk return 0; } @@ -99,13 +99,13 @@ int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { return 0; } -static int tqFetch(tqGroupHandle* ghandle, void** msg) { - tqListHandle* head = ghandle->head; - tqListHandle* node = head; +static int tqFetch(TqGroupHandle* ghandle, void** msg) { + TqListHandle* head = ghandle->head; + TqListHandle* node = head; int totSize = 0; //TODO: make it a macro int sizeLimit = 4 * 1024; - tmqMsgContent* buffer = malloc(sizeLimit); + TmqMsgContent* buffer = malloc(sizeLimit); if(buffer == NULL) { //TODO:memory insufficient return -1; @@ -114,7 +114,7 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) { //until all topic iterated or msgs over sizeLimit while(node->next) { node = node->next; - tqBufferHandle* bufHandle = &node->bufHandle; + TqBufferHandle* bufHandle = &node->bufHandle; int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE; if(bufHandle->buffer[idx].content != NULL && bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset @@ -144,19 +144,19 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) { } -tqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { +TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { return NULL; } -int tqLaunchQuery(tqGroupHandle* ghandle) { +int tqLaunchQuery(TqGroupHandle* ghandle) { return 0; } -int tqSendLaunchQuery(tqGroupHandle* gHandle) { +int tqSendLaunchQuery(TqGroupHandle* gHandle) { return 0; } -/*int tqMoveOffsetToNext(tqGroupHandle* ghandle) {*/ +/*int tqMoveOffsetToNext(TqGroupHandle* ghandle) {*/ /*return 0;*/ /*}*/ @@ -171,13 +171,13 @@ int tqCommit(STQ* pTq) { return 0; } -int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { - if(!tqProtoCheck((tmqMsgHead *)pMsg)) { +int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) { + if(!tqProtoCheck((TmqMsgHead *)pMsg)) { //proto version invalid return -1; } int64_t clientId = pMsg->head.clientId; - tqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId); + TqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId); if(ghandle == NULL) { //client not connect return -1; @@ -189,7 +189,7 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { } } - tmqConsumeRsp *pRsp = (tmqConsumeRsp*) pMsg; + TmqConsumeRsp *pRsp = (TmqConsumeRsp*) pMsg; if(tqFetch(ghandle, (void**)&pRsp->msgs) <= 0) { //fetch error @@ -204,7 +204,7 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { return 0; } -int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes) { +int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes) { //calculate size int sz = tqGetGHandleSSize(gHandle); void* ptr = realloc(*ppBytes, sz); @@ -227,8 +227,8 @@ int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes) { return 0; } -void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr) { - tqListHandle *node = listHandle; +void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr) { + TqListHandle *node = listHandle; ASSERT(node != NULL); while(node) { ptr = tqSerializeBufHandle(&node->bufHandle, ptr); @@ -237,7 +237,7 @@ void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr) { return ptr; } -void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr) { +void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr) { *(int64_t*)ptr = bufHandle->nextConsumeOffset; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); *(int64_t*)ptr = bufHandle->topicId; @@ -252,13 +252,13 @@ void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr) { return ptr; } -void* tqSerializeBufItem(tqBufferItem *bufItem, void* ptr) { +void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) { //TODO: do we need serialize this? //mainly for executor return ptr; } -const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle) { +const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *gHandle) { const void* ptr = pBytes; gHandle->cId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); @@ -268,10 +268,10 @@ const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle) gHandle->topicNum = *(int32_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); gHandle->head = NULL; - tqListHandle *node = gHandle->head; + TqListHandle *node = gHandle->head; for(int i = 0; i < gHandle->topicNum; i++) { if(gHandle->head == NULL) { - if((node = malloc(sizeof(tqListHandle))) == NULL) { + if((node = malloc(sizeof(TqListHandle))) == NULL) { //TODO: error return NULL; } @@ -279,7 +279,7 @@ const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle) ptr = tqDeserializeBufHandle(ptr, &node->bufHandle); gHandle->head = node; } else { - node->next = malloc(sizeof(tqListHandle)); + node->next = malloc(sizeof(TqListHandle)); if(node->next == NULL) { //TODO: error return NULL; @@ -292,7 +292,7 @@ const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle) return ptr; } -const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle) { +const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle) { const void* ptr = pBytes; bufHandle->nextConsumeOffset = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); @@ -308,12 +308,12 @@ const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle return ptr; } -const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem *bufItem) { +const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) { return pBytes; } //TODO: make this a macro -int tqGetGHandleSSize(const tqGroupHandle *gHandle) { +int tqGetGHandleSSize(const TqGroupHandle *gHandle) { return sizeof(int64_t) * 2 + sizeof(int32_t) + gHandle->topicNum * tqBufHandleSSize(); From 8ad7c2fd26ee7502d81381b9209f14ea29c1cd96 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 28 Oct 2021 15:50:53 +0800 Subject: [PATCH 3/6] add sync interface --- include/common/taosmsg.h | 5 +- include/libs/sync/sync.h | 59 +++--- source/libs/sync/src/sync.c | 11 +- source/server/mnode/inc/mnodeInt.h | 2 +- source/server/mnode/src/mnodeTelem.c | 7 +- source/server/mnode/src/mondeInt.c | 12 +- source/server/vnode/inc/vnodeInt.h | 21 +- source/server/vnode/src/vnodeFile.c | 278 ++++++++++++++------------- source/server/vnode/src/vnodeMain.c | 40 +++- source/server/vnode/src/vnodeMgmt.c | 9 +- 10 files changed, 249 insertions(+), 195 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 50594fac00..d571153c1a 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -721,6 +721,8 @@ typedef struct { int32_t daysToKeep2; int32_t minRowsPerFileBlock; int32_t maxRowsPerFileBlock; + int32_t fsyncPeriod; + int8_t reserved[16]; int8_t precision; int8_t compression; int8_t cacheLastRow; @@ -728,8 +730,7 @@ typedef struct { int8_t walLevel; int8_t replica; int8_t quorum; - int8_t reserved[9]; - int32_t fsyncPeriod; + int8_t selfIndex; SVnodeDesc nodes[TSDB_MAX_REPLICA]; } SCreateVnodeMsg, SAlterVnodeMsg; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 30583686c5..e8a8dee866 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -23,9 +23,9 @@ extern "C" { #include #include "taosdef.h" -typedef int64_t SyncNodeId; -typedef int32_t SyncGroupId; -typedef int64_t SyncIndex; +typedef int32_t SyncNodeId; +typedef int32_t SyncGroupId; +typedef int64_t SyncIndex; typedef uint64_t SSyncTerm; typedef enum { @@ -41,21 +41,21 @@ typedef struct { typedef struct { SyncNodeId nodeId; - uint16_t nodePort; // node sync Port - char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN + uint16_t nodePort; // node sync Port + char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN } SNodeInfo; typedef struct { - int selfIndex; - int nNode; - SNodeInfo* nodeInfo; + int selfIndex; + int replica; + SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; } SSyncCluster; typedef struct { - int32_t selfIndex; - int nNode; - SNodeInfo* node; - ESyncRole* role; + int32_t selfIndex; + int replica; + SNodeInfo node[TSDB_MAX_REPLICA]; + ESyncRole role[TSDB_MAX_REPLICA]; } SNodesRole; typedef struct SSyncFSM { @@ -101,13 +101,13 @@ typedef struct SSyncLogStore { typedef struct SSyncServerState { SyncNodeId voteFor; - SSyncTerm term; + SSyncTerm term; } SSyncServerState; typedef struct SSyncClusterConfig { // Log index number of current cluster config. SyncIndex index; - + // Log index number of previous cluster config. SyncIndex prevIndex; @@ -122,21 +122,17 @@ typedef struct SStateManager { const SSyncServerState* (*readServerState)(struct SStateManager* stateMng); - void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); + // void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); - const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); + // const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); } SStateManager; typedef struct { - SyncGroupId vgId; - - SyncIndex snapshotIndex; - SSyncCluster syncCfg; - - SSyncFSM fsm; - + SyncGroupId vgId; + SyncIndex snapshotIndex; + SSyncCluster syncCfg; + SSyncFSM fsm; SSyncLogStore logStore; - SStateManager stateManager; } SSyncInfo; @@ -146,19 +142,20 @@ typedef struct SSyncNode SSyncNode; int32_t syncInit(); void syncCleanUp(); -SSyncNode syncStart(const SSyncInfo*); -void syncStop(SyncNodeId); +SSyncNode* syncStart(const SSyncInfo*); +void syncReconfig(const SSyncNode*, const SSyncCluster*); +void syncStop(const SSyncNode*); -int32_t syncPropose(SSyncNode syncNode, SSyncBuffer buffer, void* pData, bool isWeak); +int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak); -int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); +//int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); -int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); +//int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); -extern int32_t syncDebugFlag; +extern int32_t syncDebugFlag; #ifdef __cplusplus } #endif -#endif /*_TD_LIBS_SYNC_H*/ +#endif /*_TD_LIBS_SYNC_H*/ diff --git a/source/libs/sync/src/sync.c b/source/libs/sync/src/sync.c index 4b3ca11e4b..879f2d4f6d 100644 --- a/source/libs/sync/src/sync.c +++ b/source/libs/sync/src/sync.c @@ -15,5 +15,12 @@ #include "sync.h" -int32_t syncInit() {return 0;} -void syncCleanUp() {} \ No newline at end of file +int32_t syncInit() { return 0; } + +void syncCleanUp() {} + +SSyncNode* syncStart(const SSyncInfo* pInfo) { return NULL; } + +void syncStop(const SSyncNode* pNode) {} + +void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {} \ No newline at end of file diff --git a/source/server/mnode/inc/mnodeInt.h b/source/server/mnode/inc/mnodeInt.h index 42d3c53fa2..0ce47cbe36 100644 --- a/source/server/mnode/inc/mnodeInt.h +++ b/source/server/mnode/inc/mnodeInt.h @@ -24,7 +24,7 @@ extern "C" { tmr_h mnodeGetTimer(); int32_t mnodeGetDnodeId(); -char *mnodeGetClusterId(); +int64_t mnodeGetClusterId(); EMnStatus mnodeGetStatus(); void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); diff --git a/source/server/mnode/src/mnodeTelem.c b/source/server/mnode/src/mnodeTelem.c index cb292342c7..8b8e4f9ce0 100644 --- a/source/server/mnode/src/mnodeTelem.c +++ b/source/server/mnode/src/mnodeTelem.c @@ -202,12 +202,13 @@ static void mnodeSendTelemetryReport() { return; } - char clusterId[TSDB_CLUSTER_ID_LEN] = {0}; - mnodeGetClusterId(clusterId); + int64_t clusterId = mnodeGetClusterId(); + char clusterIdStr[20] = {0}; + snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId); SBufferWriter bw = tbufInitWriter(NULL, false); mnodeBeginObject(&bw); - mnodeAddStringField(&bw, "instanceId", clusterId); + mnodeAddStringField(&bw, "instanceId", clusterIdStr); mnodeAddIntField(&bw, "reportVersion", 1); mnodeAddOsInfo(&bw); mnodeAddCpuInfo(&bw); diff --git a/source/server/mnode/src/mondeInt.c b/source/server/mnode/src/mondeInt.c index 37af26f604..343384ba67 100644 --- a/source/server/mnode/src/mondeInt.c +++ b/source/server/mnode/src/mondeInt.c @@ -39,7 +39,7 @@ static struct { int32_t state; int32_t dnodeId; - char clusterId[TSDB_CLUSTER_ID_LEN]; + int64_t clusterId; tmr_h timer; SMnodeFp fp; SSteps * steps1; @@ -50,7 +50,7 @@ tmr_h mnodeGetTimer() { return tsMint.timer; } int32_t mnodeGetDnodeId() { return tsMint.dnodeId; } -char *mnodeGetClusterId() { return tsMint.clusterId; } +int64_t mnodeGetClusterId() { return tsMint.clusterId; } EMnStatus mnodeGetStatus() { return tsMint.state; } @@ -71,12 +71,14 @@ int32_t mnodeGetStatistics(SMnodeStat *stat) { return 0; } static int32_t mnodeSetPara(SMnodePara para) { tsMint.fp = para.fp; tsMint.dnodeId = para.dnodeId; - strncpy(tsMint.clusterId, para.clusterId, TSDB_CLUSTER_ID_LEN); + tsMint.clusterId = para.clusterId; if (tsMint.fp.SendMsgToDnode == NULL) return -1; if (tsMint.fp.SendMsgToMnode == NULL) return -1; if (tsMint.fp.SendRedirectMsg == NULL) return -1; + if (tsMint.fp.GetDnodeEp == NULL) return -1; if (tsMint.dnodeId < 0) return -1; + if (tsMint.clusterId < 0) return -1; return 0; } @@ -141,7 +143,7 @@ static void mnodeCleanupStep2() { taosStepCleanup(tsMint.steps2); } static bool mnodeNeedDeploy() { if (tsMint.dnodeId > 0) return false; - if (tsMint.clusterId[0] != 0) return false; + if (tsMint.clusterId > 0) return false; if (strcmp(tsFirst, tsLocalEp) != 0) return false; return true; } @@ -154,7 +156,7 @@ int32_t mnodeDeploy() { tsMint.state = MN_STATUS_INIT; } - if (tsMint.dnodeId <= 0 || tsMint.clusterId[0] == 0) { + if (tsMint.dnodeId <= 0 || tsMint.clusterId <= 0) { mError("failed to deploy mnode since cluster not ready"); return TSDB_CODE_MND_NOT_READY; } diff --git a/source/server/vnode/inc/vnodeInt.h b/source/server/vnode/inc/vnodeInt.h index 3c7487f681..ac6c77041f 100644 --- a/source/server/vnode/inc/vnodeInt.h +++ b/source/server/vnode/inc/vnodeInt.h @@ -62,19 +62,14 @@ typedef struct STsdbCfg { typedef struct SMetaCfg { } SMetaCfg; -typedef struct SSyncCluster { - int8_t replica; - int8_t quorum; - SNodeInfo nodes[TSDB_MAX_REPLICA]; -} SSyncCfg; - typedef struct SVnodeCfg { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; - int8_t dropped; - SWalCfg wal; - STsdbCfg tsdb; - SMetaCfg meta; - SSyncCfg sync; + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + int8_t dropped; + int8_t quorum; + SWalCfg wal; + STsdbCfg tsdb; + SMetaCfg meta; + SSyncCluster sync; } SVnodeCfg; typedef struct { @@ -86,7 +81,7 @@ typedef struct { STQ *pTQ; twalh pWal; void *pQuery; - SyncNodeId syncNode; + SSyncNode *pSync; taos_queue pWriteQ; // write queue taos_queue pQueryQ; // read query queue taos_queue pFetchQ; // read fetch/cancel queue diff --git a/source/server/vnode/src/vnodeFile.c b/source/server/vnode/src/vnodeFile.c index 9835e3e0fb..a77c99ec34 100644 --- a/source/server/vnode/src/vnodeFile.c +++ b/source/server/vnode/src/vnodeFile.c @@ -30,149 +30,156 @@ int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg) { fp = fopen(file, "r"); if (!fp) { - vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", vgId, file, strerror(errno)); + vError("vgId:%d, failed to open vnode cfg file:%s to read since %s", vgId, file, strerror(errno)); ret = TAOS_SYSTEM_ERROR(errno); goto PARSE_VCFG_ERROR; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - vError("vgId:%d, failed to read %s, content is null", vgId, file); + vError("vgId:%d, failed to read %s since content is null", vgId, file); goto PARSE_VCFG_ERROR; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - vError("vgId:%d, failed to read %s, invalid json format", vgId, file); + vError("vgId:%d, failed to read %s since invalid json format", vgId, file); goto PARSE_VCFG_ERROR; } cJSON *db = cJSON_GetObjectItem(root, "db"); if (!db || db->type != cJSON_String || db->valuestring == NULL) { - vError("vgId:%d, failed to read %s, db not found", vgId, file); + vError("vgId:%d, failed to read %s since db not found", vgId, file); goto PARSE_VCFG_ERROR; } tstrncpy(pCfg->db, db->valuestring, sizeof(pCfg->db)); cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, dropped not found", vgId, file); + vError("vgId:%d, failed to read %s since dropped not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->dropped = (int32_t)dropped->valueint; - cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize"); - if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, cacheBlockSize not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->tsdb.cacheBlockSize = (int32_t)cacheBlockSize->valueint; - - cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks"); - if (!totalBlocks || totalBlocks->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, totalBlocks not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->tsdb.totalBlocks = (int32_t)totalBlocks->valueint; - - cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); - if (!daysPerFile || daysPerFile->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, daysPerFile not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->tsdb.daysPerFile = (int32_t)daysPerFile->valueint; - - cJSON *daysToKeep0 = cJSON_GetObjectItem(root, "daysToKeep0"); - if (!daysToKeep0 || daysToKeep0->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, daysToKeep0 not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->tsdb.daysToKeep0 = (int32_t)daysToKeep0->valueint; - - cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1"); - if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, daysToKeep1 not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->tsdb.daysToKeep1 = (int32_t)daysToKeep1->valueint; - - cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2"); - if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, daysToKeep2 not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->tsdb.daysToKeep2 = (int32_t)daysToKeep2->valueint; - - cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock"); - if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, minRowsPerFileBlock not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->tsdb.minRowsPerFileBlock = (int32_t)minRowsPerFileBlock->valueint; - - cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock"); - if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, maxRowsPerFileBlock not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->tsdb.maxRowsPerFileBlock = (int32_t)maxRowsPerFileBlock->valueint; - - cJSON *precision = cJSON_GetObjectItem(root, "precision"); - if (!precision || precision->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, precision not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->tsdb.precision = (int8_t)precision->valueint; - - cJSON *compression = cJSON_GetObjectItem(root, "compression"); - if (!compression || compression->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, compression not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->tsdb.compression = (int8_t)compression->valueint; - - cJSON *update = cJSON_GetObjectItem(root, "update"); - if (!update || update->type != cJSON_Number) { - vError("vgId: %d, failed to read %s, update not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->tsdb.update = (int8_t)update->valueint; - - cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow"); - if (!cacheLastRow || cacheLastRow->type != cJSON_Number) { - vError("vgId: %d, failed to read %s, cacheLastRow not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->tsdb.cacheLastRow = (int8_t)cacheLastRow->valueint; - - cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel"); - if (!walLevel || walLevel->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, walLevel not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->wal.walLevel = (int8_t)walLevel->valueint; - - cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsyncPeriod"); - if (!walLevel || walLevel->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, fsyncPeriod not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->wal.fsyncPeriod = (int32_t)fsyncPeriod->valueint; - - cJSON *replica = cJSON_GetObjectItem(root, "replica"); - if (!replica || replica->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, replica not found", vgId, file); - goto PARSE_VCFG_ERROR; - } - pCfg->sync.replica = (int8_t)replica->valueint; - cJSON *quorum = cJSON_GetObjectItem(root, "quorum"); if (!quorum || quorum->type != cJSON_Number) { vError("vgId: %d, failed to read %s, quorum not found", vgId, file); goto PARSE_VCFG_ERROR; } - pCfg->sync.quorum = (int8_t)quorum->valueint; + pCfg->quorum = (int8_t)quorum->valueint; + + cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize"); + if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since cacheBlockSize not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->tsdb.cacheBlockSize = (int32_t)cacheBlockSize->valueint; + + cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks"); + if (!totalBlocks || totalBlocks->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since totalBlocks not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->tsdb.totalBlocks = (int32_t)totalBlocks->valueint; + + cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); + if (!daysPerFile || daysPerFile->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since daysPerFile not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->tsdb.daysPerFile = (int32_t)daysPerFile->valueint; + + cJSON *daysToKeep0 = cJSON_GetObjectItem(root, "daysToKeep0"); + if (!daysToKeep0 || daysToKeep0->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since daysToKeep0 not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->tsdb.daysToKeep0 = (int32_t)daysToKeep0->valueint; + + cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1"); + if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since daysToKeep1 not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->tsdb.daysToKeep1 = (int32_t)daysToKeep1->valueint; + + cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2"); + if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since daysToKeep2 not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->tsdb.daysToKeep2 = (int32_t)daysToKeep2->valueint; + + cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock"); + if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since minRowsPerFileBlock not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->tsdb.minRowsPerFileBlock = (int32_t)minRowsPerFileBlock->valueint; + + cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock"); + if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since maxRowsPerFileBlock not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->tsdb.maxRowsPerFileBlock = (int32_t)maxRowsPerFileBlock->valueint; + + cJSON *precision = cJSON_GetObjectItem(root, "precision"); + if (!precision || precision->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since precision not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->tsdb.precision = (int8_t)precision->valueint; + + cJSON *compression = cJSON_GetObjectItem(root, "compression"); + if (!compression || compression->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since compression not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->tsdb.compression = (int8_t)compression->valueint; + + cJSON *update = cJSON_GetObjectItem(root, "update"); + if (!update || update->type != cJSON_Number) { + vError("vgId: %d, failed to read %s since update not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->tsdb.update = (int8_t)update->valueint; + + cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow"); + if (!cacheLastRow || cacheLastRow->type != cJSON_Number) { + vError("vgId: %d, failed to read %s since cacheLastRow not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->tsdb.cacheLastRow = (int8_t)cacheLastRow->valueint; + + cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel"); + if (!walLevel || walLevel->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since walLevel not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->wal.walLevel = (int8_t)walLevel->valueint; + + cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsyncPeriod"); + if (!walLevel || walLevel->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since fsyncPeriod not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->wal.fsyncPeriod = (int32_t)fsyncPeriod->valueint; + + cJSON *selfIndex = cJSON_GetObjectItem(root, "selfIndex"); + if (!selfIndex || selfIndex->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since selfIndex not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->sync.selfIndex = selfIndex->valueint; + + cJSON *replica = cJSON_GetObjectItem(root, "replica"); + if (!replica || replica->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since replica not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->sync.replica = replica->valueint; cJSON *nodes = cJSON_GetObjectItem(root, "nodes"); if (!nodes || nodes->type != cJSON_Array) { @@ -182,28 +189,35 @@ int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg) { int size = cJSON_GetArraySize(nodes); if (size != pCfg->sync.replica) { - vError("vgId:%d, failed to read %s, nodes size not matched", vgId, file); + vError("vgId:%d, failed to read %s since nodes size not matched", vgId, file); goto PARSE_VCFG_ERROR; } for (int i = 0; i < size; ++i) { cJSON *nodeInfo = cJSON_GetArrayItem(nodes, i); if (nodeInfo == NULL) continue; - SNodeInfo *node = &pCfg->sync.nodes[i]; + SNodeInfo *node = &pCfg->sync.nodeInfo[i]; - cJSON *port = cJSON_GetObjectItem(nodeInfo, "port"); - if (!port || port->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, port not found", vgId, file); + cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "id"); + if (!nodeId || nodeId->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since nodeId not found", vgId, file); goto PARSE_VCFG_ERROR; } - node->nodePort = (uint16_t)port->valueint; + node->nodeId = nodeId->valueint; - cJSON *fqdn = cJSON_GetObjectItem(nodeInfo, "fqdn"); - if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { - vError("vgId:%d, failed to read %s, fqdn not found", vgId, file); + cJSON *nodePort = cJSON_GetObjectItem(nodeInfo, "port"); + if (!nodePort || nodePort->type != cJSON_Number) { + vError("vgId:%d, failed to read %s sincenodePort not found", vgId, file); goto PARSE_VCFG_ERROR; } - tstrncpy(node->nodeFqdn, fqdn->valuestring, TSDB_FQDN_LEN); + node->nodePort = (uint16_t)nodePort->valueint; + + cJSON *nodeFqdn = cJSON_GetObjectItem(nodeInfo, "fqdn"); + if (!nodeFqdn || nodeFqdn->type != cJSON_String || nodeFqdn->valuestring == NULL) { + vError("vgId:%d, failed to read %s since nodeFqdn not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + tstrncpy(node->nodeFqdn, nodeFqdn->valuestring, TSDB_FQDN_LEN); } ret = TSDB_CODE_SUCCESS; @@ -238,6 +252,7 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) { len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", vgId); len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pCfg->db); len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pCfg->dropped); + len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pCfg->quorum); // tsdb len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pCfg->tsdb.cacheBlockSize); len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pCfg->tsdb.totalBlocks); @@ -255,11 +270,12 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) { len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pCfg->wal.walLevel); len += snprintf(content + len, maxLen - len, " \"fsyncPeriod\": %d,\n", pCfg->wal.fsyncPeriod); // sync - len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pCfg->sync.quorum); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pCfg->sync.replica); + len += snprintf(content + len, maxLen - len, " \"selfIndex\": %d,\n", pCfg->sync.selfIndex); len += snprintf(content + len, maxLen - len, " \"nodes\": [{\n"); for (int32_t i = 0; i < pCfg->sync.replica; i++) { - SNodeInfo *node = &pCfg->sync.nodes[i]; + SNodeInfo *node = &pCfg->sync.nodeInfo[i]; + len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", node->nodeId); len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", node->nodePort); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\"\n", node->nodeFqdn); if (i < pCfg->sync.replica - 1) { @@ -304,20 +320,20 @@ int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) { } cJSON *term = cJSON_GetObjectItem(root, "term"); - if (!term || term->type != cJSON_Number) { + if (!term || term->type != cJSON_String) { vError("vgId:%d, failed to read %s since term not found", vgId, file); goto PARSE_TERM_ERROR; } - pState->term = (uint64_t)term->valueint; + pState->term = atoll(term->valuestring); cJSON *voteFor = cJSON_GetObjectItem(root, "voteFor"); - if (!voteFor || voteFor->type != cJSON_Number) { + if (!voteFor || voteFor->type != cJSON_String) { vError("vgId:%d, failed to read %s since voteFor not found", vgId, file); goto PARSE_TERM_ERROR; } - pState->voteFor = (int64_t)voteFor->valueint; + pState->voteFor = atoi(voteFor->valuestring); - vInfo("vgId:%d, read %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term); + vInfo("vgId:%d, read %s success, voteFor:%d, term:%" PRIu64, vgId, file, pState->voteFor, pState->term); PARSE_TERM_ERROR: if (content != NULL) free(content); @@ -342,8 +358,8 @@ int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) { char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"term\": %" PRIu64 "\n", pState->term); - len += snprintf(content + len, maxLen - len, " \"voteFor\": %" PRIu64 "\n", pState->voteFor); + len += snprintf(content + len, maxLen - len, " \"term\": \"%" PRIu64 "\",\n", pState->term); + len += snprintf(content + len, maxLen - len, " \"voteFor\": \"%d\"\n", pState->voteFor); len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); @@ -351,6 +367,6 @@ int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) { fclose(fp); free(content); - vInfo("vgId:%d, write %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term); + vInfo("vgId:%d, write %s success, voteFor:%d, term:%" PRIu64, vgId, file, pState->voteFor, pState->term); return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index 5143f04c5b..c08ae7708a 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -108,6 +108,11 @@ static void vnodeDestroyVnode(SVnode *pVnode) { int32_t code = 0; int32_t vgId = pVnode->vgId; + if (pVnode->pSync != NULL) { + syncStop(pVnode->pSync); + pVnode->pSync = NULL; + } + if (pVnode->pQuery) { // todo } @@ -177,6 +182,9 @@ static int32_t vnodeOpenVnode(int32_t vgId) { pVnode->role = TAOS_SYNC_ROLE_CANDIDATE; pthread_mutex_init(&pVnode->statusMutex, NULL); + vDebug("vgId:%d, vnode is opened", pVnode->vgId); + taosHashPut(tsVnode.hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnode *)); + code = vnodeReadCfg(vgId, &pVnode->cfg); if (code != TSDB_CODE_SUCCESS) { vError("vgId:%d, failed to read config file, set cfgVersion to 0", pVnode->vgId); @@ -209,8 +217,34 @@ static int32_t vnodeOpenVnode(int32_t vgId) { return terrno; } - vDebug("vgId:%d, vnode is opened", pVnode->vgId); - taosHashPut(tsVnode.hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnode *)); + // create sync node + SSyncInfo syncInfo = {0}; + syncInfo.vgId = vgId; + syncInfo.snapshotIndex = 0; // todo, from tsdb + memcpy(&syncInfo.syncCfg, &pVnode->cfg.sync, sizeof(SSyncCluster)); + syncInfo.fsm.pData = pVnode; + syncInfo.fsm.applyLog = NULL; + syncInfo.fsm.onClusterChanged = NULL; + syncInfo.fsm.getSnapshot = NULL; + syncInfo.fsm.applySnapshot = NULL; + syncInfo.fsm.onRestoreDone = NULL; + syncInfo.fsm.onRollback = NULL; + syncInfo.logStore.pData = pVnode; + syncInfo.logStore.logWrite = NULL; + syncInfo.logStore.logCommit = NULL; + syncInfo.logStore.logPrune = NULL; + syncInfo.logStore.logRollback = NULL; + syncInfo.stateManager.pData = pVnode; + syncInfo.stateManager.saveServerState = NULL; + syncInfo.stateManager.readServerState = NULL; + // syncInfo.stateManager.saveCluster = NULL; + // syncInfo.stateManager.readCluster = NULL; + + pVnode->pSync = syncStart(&syncInfo); + if (pVnode->pSync == NULL) { + vnodeCleanupVnode(pVnode); + return terrno; + } vnodeSetReadyStatus(pVnode); return TSDB_CODE_SUCCESS; @@ -313,7 +347,7 @@ int32_t vnodeAlterVnode(SVnode * pVnode, SVnodeCfg *pCfg) { } if (syncChanged) { - // todo + syncReconfig(pVnode->pSync, &pVnode->cfg.sync); } vnodeRelease(pVnode); diff --git a/source/server/vnode/src/vnodeMgmt.c b/source/server/vnode/src/vnodeMgmt.c index d20e36641e..e0e76d5b56 100644 --- a/source/server/vnode/src/vnodeMgmt.c +++ b/source/server/vnode/src/vnodeMgmt.c @@ -31,6 +31,7 @@ static int32_t vnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCf *vgId = htonl(pCreate->vgId); pCfg->dropped = 0; + pCfg->quorum = pCreate->quorum; tstrncpy(pCfg->db, pCreate->db, sizeof(pCfg->db)); pCfg->tsdb.cacheBlockSize = htonl(pCreate->cacheBlockSize); @@ -50,11 +51,11 @@ static int32_t vnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCf pCfg->wal.walLevel = pCreate->walLevel; pCfg->sync.replica = pCreate->replica; - pCfg->sync.quorum = pCreate->quorum; - + pCfg->sync.selfIndex = pCreate->selfIndex; + for (int32_t j = 0; j < pCreate->replica; ++j) { - pCfg->sync.nodes[j].nodePort = htons(pCreate->nodes[j].port); - tstrncpy(pCfg->sync.nodes[j].nodeFqdn, pCreate->nodes[j].fqdn, TSDB_FQDN_LEN); + pCfg->sync.nodeInfo[j].nodePort = htons(pCreate->nodes[j].port); + tstrncpy(pCfg->sync.nodeInfo[j].nodeFqdn, pCreate->nodes[j].fqdn, TSDB_FQDN_LEN); } return 0; From 775da462d47c024c0618938a47114741ed90ce03 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 28 Oct 2021 15:55:52 +0800 Subject: [PATCH 4/6] add tq files, fix cmake file --- include/server/vnode/tq/tq.h | 17 ++++++----- source/libs/wal/CMakeLists.txt | 4 +-- source/server/vnode/meta/src/metaMain.c | 1 + source/server/vnode/tq/CMakeLists.txt | 7 +++-- source/server/vnode/tq/inc/tqCommit.h | 14 +++++++++ source/server/vnode/tq/inc/tqInt.h | 1 - source/server/vnode/tq/inc/tqMetaStore.h | 38 ++++++++++++++++++++++++ source/server/vnode/tq/src/tqCommit.c | 14 +++++++++ source/server/vnode/tq/src/tqMetaStore.c | 14 +++++++++ 9 files changed, 96 insertions(+), 14 deletions(-) create mode 100644 source/server/vnode/tq/inc/tqCommit.h create mode 100644 source/server/vnode/tq/inc/tqMetaStore.h create mode 100644 source/server/vnode/tq/src/tqCommit.c create mode 100644 source/server/vnode/tq/src/tqMetaStore.c diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 495383684a..3aeaf9acb6 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -17,6 +17,7 @@ #define _TD_TQ_H_ #include "os.h" +#include "tutil.h" #ifdef __cplusplus extern "C" { @@ -79,14 +80,14 @@ typedef struct TmqConsumeRsp { typedef struct TmqSubscribeReq { TmqMsgHead head; - int64_t topicLen; - char topic[]; + int32_t topicNum; + int64_t topic[]; } TmqSubscribeReq; typedef struct tmqSubscribeRsp { TmqMsgHead head; int64_t vgId; - char ep[]; //TSDB_EP_LEN + char ep[TSDB_EP_LEN]; //TSDB_EP_LEN } TmqSubscribeRsp; typedef struct TmqHeartbeatReq { @@ -98,17 +99,17 @@ typedef struct TmqHeartbeatRsp { } TmqHeartbeatRsp; typedef struct TqTopicVhandle { - //name - // + int64_t topicId; //executor for filter - // + void* filterExec; //callback for mnode - // + //trigger when vnode list associated topic change + void* (*mCallback)(void*, void*); } TqTopicVhandle; typedef struct STQ { //the collection of group handle - + //the handle of kvstore } STQ; #define TQ_BUFFER_SIZE 8 diff --git a/source/libs/wal/CMakeLists.txt b/source/libs/wal/CMakeLists.txt index fbcdff59ee..4af8bac7f9 100644 --- a/source/libs/wal/CMakeLists.txt +++ b/source/libs/wal/CMakeLists.txt @@ -4,9 +4,9 @@ target_include_directories( wal PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" - PRIVATE "${CMAKE_SOURCE_DIR}/include/os" ) target_link_libraries( - os + wal + PUBLIC os ) diff --git a/source/server/vnode/meta/src/metaMain.c b/source/server/vnode/meta/src/metaMain.c index 612ef49a04..4efcd67908 100644 --- a/source/server/vnode/meta/src/metaMain.c +++ b/source/server/vnode/meta/src/metaMain.c @@ -64,6 +64,7 @@ SMeta *metaOpen(SMetaOpts *pMetaOpts) { // TODO: need to figure out how to persist the START UID tableUidGeneratorInit(&(pMeta->uidGenerator), IVLD_TB_UID); + return pMeta; } void metaClose(SMeta *pMeta) { diff --git a/source/server/vnode/tq/CMakeLists.txt b/source/server/vnode/tq/CMakeLists.txt index 0c15e23d33..441fe46244 100644 --- a/source/server/vnode/tq/CMakeLists.txt +++ b/source/server/vnode/tq/CMakeLists.txt @@ -3,11 +3,12 @@ add_library(tq ${TQ_SRC}) target_include_directories( tq PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" - PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" - PRIVATE "${CMAKE_SOURCE_DIR}/include/os" ) target_link_libraries( - wal + tq + PUBLIC wal + PUBLIC os + PUBLIC util ) diff --git a/source/server/vnode/tq/inc/tqCommit.h b/source/server/vnode/tq/inc/tqCommit.h new file mode 100644 index 0000000000..f2f48bbc8a --- /dev/null +++ b/source/server/vnode/tq/inc/tqCommit.h @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index d19e9ec81e..100149c0ea 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -18,7 +18,6 @@ #include "tq.h" - #ifdef __cplusplus extern "C" { #endif diff --git a/source/server/vnode/tq/inc/tqMetaStore.h b/source/server/vnode/tq/inc/tqMetaStore.h new file mode 100644 index 0000000000..2c752b8f8f --- /dev/null +++ b/source/server/vnode/tq/inc/tqMetaStore.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TQ_META_STORE_H_ +#define _TQ_META_STORE_H_ + +#include "os.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct TqKvHandle { + int64_t key; + int64_t offset; + void *valueInUse; + void *valueInTxn; + //serializer +} TqKvHandle; + + +#ifdef __cplusplus +} +#endif + +#endif /* ifndef _TQ_META_STORE_H_ */ diff --git a/source/server/vnode/tq/src/tqCommit.c b/source/server/vnode/tq/src/tqCommit.c new file mode 100644 index 0000000000..f2f48bbc8a --- /dev/null +++ b/source/server/vnode/tq/src/tqCommit.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ diff --git a/source/server/vnode/tq/src/tqMetaStore.c b/source/server/vnode/tq/src/tqMetaStore.c new file mode 100644 index 0000000000..f2f48bbc8a --- /dev/null +++ b/source/server/vnode/tq/src/tqMetaStore.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ From d95e8e1f6ceb5c972073ec74e3dc0469b3977662 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 28 Oct 2021 16:57:02 +0800 Subject: [PATCH 5/6] Call wal and sync code in vnode --- include/libs/sync/sync.h | 24 ++++----- include/libs/wal/wal.h | 58 ++++++++++---------- source/libs/wal/src/wal.c | 16 +++++- source/server/vnode/inc/vnodeFile.h | 4 +- source/server/vnode/inc/vnodeInt.h | 2 +- source/server/vnode/src/vnodeFile.c | 8 +-- source/server/vnode/src/vnodeMain.c | 84 +++++++++++++++++++++++------ 7 files changed, 129 insertions(+), 67 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index e8a8dee866..9ffd74c229 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -46,14 +46,14 @@ typedef struct { } SNodeInfo; typedef struct { - int selfIndex; - int replica; + int32_t selfIndex; + int32_t replica; SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; } SSyncCluster; typedef struct { int32_t selfIndex; - int replica; + int32_t replica; SNodeInfo node[TSDB_MAX_REPLICA]; ESyncRole role[TSDB_MAX_REPLICA]; } SNodesRole; @@ -62,20 +62,20 @@ typedef struct SSyncFSM { void* pData; // apply committed log, bufs will be free by raft module - int (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData); + int32_t (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData); // cluster commit callback - int (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData); + int32_t (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData); // fsm return snapshot in ppBuf, bufs will be free by raft module // TODO: getSnapshot SHOULD be async? - int (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int* objId, bool* isLast); + int32_t (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int32_t* objId, bool* isLast); // fsm apply snapshot with pBuf data - int (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int objId, bool isLast); + int32_t (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int32_t objId, bool isLast); // call when restore snapshot and log done - int (*onRestoreDone)(struct SSyncFSM* fsm); + int32_t (*onRestoreDone)(struct SSyncFSM* fsm); void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf); @@ -118,9 +118,9 @@ typedef struct SSyncClusterConfig { typedef struct SStateManager { void* pData; - void (*saveServerState)(struct SStateManager* stateMng, const SSyncServerState* state); + int32_t (*saveServerState)(struct SStateManager* stateMng, SSyncServerState* state); - const SSyncServerState* (*readServerState)(struct SStateManager* stateMng); + int32_t (*readServerState)(struct SStateManager* stateMng, SSyncServerState* state); // void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); @@ -148,9 +148,9 @@ void syncStop(const SSyncNode*); int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak); -//int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); +// int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); -//int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); +// int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); extern int32_t syncDebugFlag; diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 9a3310922d..143bdf0710 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -44,41 +44,41 @@ typedef struct { EWalType walLevel; // wal level } SWalCfg; -typedef void * twalh; // WAL HANDLE -typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg); +struct SWal; +typedef struct SWal SWal; // WAL HANDLE +typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); -//module initialization -int32_t walInit(); -void walCleanUp(); +// module initialization +int32_t walInit(); +void walCleanUp(); -//handle open and ctl -twalh walOpen(char *path, SWalCfg *pCfg); -int32_t walAlter(twalh, SWalCfg *pCfg); -void walStop(twalh); -void walClose(twalh); +// handle open and ctl +SWal *walOpen(char *path, SWalCfg *pCfg); +int32_t walAlter(SWal *, SWalCfg *pCfg); +void walClose(SWal *); -//write -//int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen); -int64_t walWrite(twalh, void* body, int32_t bodyLen); -int64_t walWriteBatch(twalh, void** bodies, int32_t* bodyLen, int32_t batchSize); +// write +// int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen); +int64_t walWrite(SWal *, int64_t index, void *body, int32_t bodyLen); +int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize); -//apis for lifecycle management -void walFsync(twalh, bool force); -int32_t walCommit(twalh, int64_t ver); -//truncate after -int32_t walRollback(twalh, int64_t ver); -//notify that previous log can be pruned safely -int32_t walPrune(twalh, int64_t ver); +// apis for lifecycle management +void walFsync(SWal *, bool force); +int32_t walCommit(SWal *, int64_t ver); +// truncate after +int32_t walRollback(SWal *, int64_t ver); +// notify that previous log can be pruned safely +int32_t walPrune(SWal *, int64_t ver); -//read -int32_t walRead(twalh, SWalHead **, int64_t ver); -int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum); +// read +int32_t walRead(SWal *, SWalHead **, int64_t ver); +int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); -//lifecycle check -int32_t walFirstVer(twalh); -int32_t walPersistedVer(twalh); -int32_t walLastVer(twalh); -//int32_t walDataCorrupted(twalh); +// lifecycle check +int32_t walFirstVer(SWal *); +int32_t walPersistedVer(SWal *); +int32_t walLastVer(SWal *); +// int32_t walDataCorrupted(SWal*); #ifdef __cplusplus } diff --git a/source/libs/wal/src/wal.c b/source/libs/wal/src/wal.c index 8c0fc2b775..9331cce20b 100644 --- a/source/libs/wal/src/wal.c +++ b/source/libs/wal/src/wal.c @@ -19,6 +19,18 @@ int32_t walInit() { return 0; } void walCleanUp() {} -twalh walOpen(char *path, SWalCfg *pCfg) { return NULL; } +SWal *walOpen(char *path, SWalCfg *pCfg) { return NULL; } -int32_t walAlter(twalh pWal, SWalCfg *pCfg) { return 0; } \ No newline at end of file +int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; } + +void walClose(SWal *pWal) {} + +void walFsync(SWal *pWal, bool force) {} + +int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {} + +int32_t walCommit(SWal *pWal, int64_t ver) { return 0; } + +int32_t walRollback(SWal *pWal, int64_t ver) { return 0; } + +int32_t walPrune(SWal *pWal, int64_t ver) { return 0; } \ No newline at end of file diff --git a/source/server/vnode/inc/vnodeFile.h b/source/server/vnode/inc/vnodeFile.h index 31364d8c03..bea28324ee 100644 --- a/source/server/vnode/inc/vnodeFile.h +++ b/source/server/vnode/inc/vnodeFile.h @@ -23,8 +23,8 @@ extern "C" { int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg); int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg); -int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState); -int32_t vnodeWriteTerm(int32_t vgid, SSyncServerState *pState); +int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState); +int32_t vnodeSaveState(int32_t vgid, SSyncServerState *pState); #ifdef __cplusplus } diff --git a/source/server/vnode/inc/vnodeInt.h b/source/server/vnode/inc/vnodeInt.h index ac6c77041f..90d9e7105e 100644 --- a/source/server/vnode/inc/vnodeInt.h +++ b/source/server/vnode/inc/vnodeInt.h @@ -79,7 +79,7 @@ typedef struct { SMeta *pMeta; STsdb *pTsdb; STQ *pTQ; - twalh pWal; + SWal *pWal; void *pQuery; SSyncNode *pSync; taos_queue pWriteQ; // write queue diff --git a/source/server/vnode/src/vnodeFile.c b/source/server/vnode/src/vnodeFile.c index a77c99ec34..ddcbd2689d 100644 --- a/source/server/vnode/src/vnodeFile.c +++ b/source/server/vnode/src/vnodeFile.c @@ -296,7 +296,7 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) { return TSDB_CODE_SUCCESS; } -int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) { +int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState) { int32_t ret = TSDB_CODE_VND_APP_ERROR; int32_t len = 0; int32_t maxLen = 100; @@ -305,7 +305,7 @@ int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) { FILE *fp = NULL; char file[PATH_MAX + 30] = {0}; - sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId); + sprintf(file, "%s/vnode%d/state.json", tsVnodeDir, vgId); len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { @@ -343,9 +343,9 @@ PARSE_TERM_ERROR: return ret; } -int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) { +int32_t vnodeSaveState(int32_t vgId, SSyncServerState *pState) { char file[PATH_MAX + 30] = {0}; - sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId); + sprintf(file, "%s/vnode%d/state.json", tsVnodeDir, vgId); FILE *fp = fopen(file, "w"); if (!fp) { diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index c08ae7708a..ced93ea6a7 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -130,7 +130,8 @@ static void vnodeDestroyVnode(SVnode *pVnode) { } if (pVnode->pWal) { - // todo + walClose(pVnode->pWal); + pVnode->pWal = NULL; } if (pVnode->allocator) { @@ -166,6 +167,56 @@ static void vnodeCleanupVnode(SVnode *pVnode) { vnodeRelease(pVnode); } +static inline int32_t vnodeLogWrite(struct SSyncLogStore *logStore, SyncIndex index, SSyncBuffer *pBuf) { + SVnode *pVnode = logStore->pData; // vnode status can be checked here + return walWrite(pVnode->pWal, index, pBuf->data, (int32_t)pBuf->len); +} + +static inline int32_t vnodeLogCommit(struct SSyncLogStore *logStore, SyncIndex index) { + SVnode *pVnode = logStore->pData; // vnode status can be checked here + return walCommit(pVnode->pWal, index); +} + +static inline int32_t vnodeLogPrune(struct SSyncLogStore *logStore, SyncIndex index) { + SVnode *pVnode = logStore->pData; // vnode status can be checked here + return walPrune(pVnode->pWal, index); +} + +static inline int32_t vnodeLogRollback(struct SSyncLogStore *logStore, SyncIndex index) { + SVnode *pVnode = logStore->pData; // vnode status can be checked here + return walRollback(pVnode->pWal, index); +} + +static inline int32_t vnodeSaveServerState(struct SStateManager *stateMng, SSyncServerState *pState) { + SVnode *pVnode = stateMng->pData; + return vnodeSaveState(pVnode->vgId, pState); +} + +static inline int32_t vnodeReadServerState(struct SStateManager *stateMng, SSyncServerState *pState) { + SVnode *pVnode = stateMng->pData; + return vnodeSaveState(pVnode->vgId, pState); +} + +static inline int32_t vnodeApplyLog(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) { + return 0; +} + +static inline int32_t vnodeOnClusterChanged(struct SSyncFSM *fsm, const SSyncCluster *cluster, void *pData) { return 0; } + +static inline int32_t vnodeGetSnapshot(struct SSyncFSM *fsm, SSyncBuffer **ppBuf, int32_t *objId, bool *isLast) { + return 0; +} + +static inline int32_t vnodeApplySnapshot(struct SSyncFSM *fsm, SSyncBuffer *pBuf, int32_t objId, bool isLast) { + return 0; +} + +static inline int32_t vnodeOnRestoreDone(struct SSyncFSM *fsm) { return 0; } + +static inline void vnodeOnRollback(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf) {} + +static inline void vnodeOnRoleChanged(struct SSyncFSM *fsm, const SNodesRole *pRole) {} + static int32_t vnodeOpenVnode(int32_t vgId) { int32_t code = 0; @@ -193,7 +244,7 @@ static int32_t vnodeOpenVnode(int32_t vgId) { return 0; } - code = vnodeReadTerm(vgId, &pVnode->term); + code = vnodeSaveState(vgId, &pVnode->term); if (code != TSDB_CODE_SUCCESS) { vError("vgId:%d, failed to read term file since %s", pVnode->vgId, tstrerror(code)); pVnode->cfg.dropped = 1; @@ -220,25 +271,24 @@ static int32_t vnodeOpenVnode(int32_t vgId) { // create sync node SSyncInfo syncInfo = {0}; syncInfo.vgId = vgId; - syncInfo.snapshotIndex = 0; // todo, from tsdb + syncInfo.snapshotIndex = 0; // todo, from tsdb memcpy(&syncInfo.syncCfg, &pVnode->cfg.sync, sizeof(SSyncCluster)); syncInfo.fsm.pData = pVnode; - syncInfo.fsm.applyLog = NULL; - syncInfo.fsm.onClusterChanged = NULL; - syncInfo.fsm.getSnapshot = NULL; - syncInfo.fsm.applySnapshot = NULL; - syncInfo.fsm.onRestoreDone = NULL; - syncInfo.fsm.onRollback = NULL; + syncInfo.fsm.applyLog = vnodeApplyLog; + syncInfo.fsm.onClusterChanged = vnodeOnClusterChanged; + syncInfo.fsm.getSnapshot = vnodeGetSnapshot; + syncInfo.fsm.applySnapshot = vnodeApplySnapshot; + syncInfo.fsm.onRestoreDone = vnodeOnRestoreDone; + syncInfo.fsm.onRollback = vnodeOnRollback; + syncInfo.fsm.onRoleChanged = vnodeOnRoleChanged; syncInfo.logStore.pData = pVnode; - syncInfo.logStore.logWrite = NULL; - syncInfo.logStore.logCommit = NULL; - syncInfo.logStore.logPrune = NULL; - syncInfo.logStore.logRollback = NULL; + syncInfo.logStore.logWrite = vnodeLogWrite; + syncInfo.logStore.logCommit = vnodeLogCommit; + syncInfo.logStore.logPrune = vnodeLogPrune; + syncInfo.logStore.logRollback = vnodeLogRollback; syncInfo.stateManager.pData = pVnode; - syncInfo.stateManager.saveServerState = NULL; - syncInfo.stateManager.readServerState = NULL; - // syncInfo.stateManager.saveCluster = NULL; - // syncInfo.stateManager.readCluster = NULL; + syncInfo.stateManager.saveServerState = vnodeSaveServerState; + syncInfo.stateManager.readServerState = vnodeReadServerState; pVnode->pSync = syncStart(&syncInfo); if (pVnode->pSync == NULL) { From f32eadf4ef72d386aff0fc026f7297e13b25ced2 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 28 Oct 2021 17:32:44 +0800 Subject: [PATCH 6/6] add header for tqMetaStore --- source/server/vnode/tq/inc/tqMetaStore.h | 48 ++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/source/server/vnode/tq/inc/tqMetaStore.h b/source/server/vnode/tq/inc/tqMetaStore.h index 2c752b8f8f..6319b32a10 100644 --- a/source/server/vnode/tq/inc/tqMetaStore.h +++ b/source/server/vnode/tq/inc/tqMetaStore.h @@ -18,18 +18,60 @@ #include "os.h" +#define TQ_INUSE_SIZE 0xFF +#define TQ_PAGE_SIZE 4096 + #ifdef __cplusplus extern "C" { #endif -typedef struct TqKvHandle { +typedef struct TqMetaHandle { int64_t key; int64_t offset; void *valueInUse; void *valueInTxn; - //serializer -} TqKvHandle; +} TqMetaHandle; +typedef struct TqMetaList { + TqMetaHandle handle; + struct TqMetaList* next; + struct TqMetaList* inTxnPrev; + struct TqMetaList* inTxnNext; + struct TqMetaList* unpersistPrev; + struct TqMetaList* unpersistNext; +} TqMetaList; + +typedef struct TqMetaStore { + TqMetaList* inUse[TQ_INUSE_SIZE]; + TqMetaList* unpersistHead; + //deserializer + //serializer + //deleter +} TqMetaStore; + +typedef struct TqMetaPageBuf { + int16_t offset; + char buffer[TQ_PAGE_SIZE]; +} TqMetaPageBuf; + +TqMetaStore* TqStoreOpen(const char* path, void* serializer(void* ), void* deserializer(void*)); +int32_t TqStoreClose(TqMetaStore*); +int32_t TqStoreDelete(TqMetaStore*); +int32_t TqStoreCommitAll(TqMetaStore*); +int32_t TqStorePersist(TqMetaStore*); + +TqMetaHandle* TqHandleGetInUse(TqMetaStore*, int64_t key); +int32_t TqHandlePutInUse(TqMetaStore*, TqMetaHandle* handle); +TqMetaHandle* TqHandleGetInTxn(TqMetaStore*, int64_t key); +int32_t TqHandlePutInTxn(TqMetaStore*, TqMetaHandle* handle); +//delete in-use-handle, make in-txn-handle in use +int32_t TqHandleCommit(TqMetaStore*, int64_t key); +//delete in-txn-handle +int32_t TqHandleAbort(TqMetaStore*, int64_t key); +//delete in-use-handle +int32_t TqHandleDel(TqMetaStore*, int64_t key); +//delete in-use-handle and in-txn-handle +int32_t TqHandleClear(TqMetaStore*, int64_t key); #ifdef __cplusplus }