From 5e14c85609bb947434dafa4c4e5cd4bc59dd9f93 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 27 Oct 2021 17:49:40 +0800 Subject: [PATCH 1/6] add tqconsume and tqserialize --- include/server/vnode/tq/tq.h | 122 ++++++++++---- source/server/vnode/tq/inc/tqInt.h | 4 +- source/server/vnode/tq/src/tq.c | 252 ++++++++++++++++++++++++----- 3 files changed, 300 insertions(+), 78 deletions(-) diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index f30ba75c42..c90991ae10 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -23,64 +23,79 @@ extern "C" { #endif typedef struct tmqMsgHead { - int32_t headLen; int32_t protoVer; - int64_t cgId; - int64_t topicId; - int64_t clientId; - int32_t checksum; int32_t msgType; + int64_t cgId; + int64_t clientId; } tmqMsgHead; +typedef struct tmqOneAck { + int64_t topicId; + int64_t consumeOffset; +} tmqOneAck; + +typedef struct tmqAcks { + int32_t ackNum; + //should be sorted + tmqOneAck acks[]; +} tmqAcks; + //TODO: put msgs into common typedef struct tmqConnectReq { tmqMsgHead head; + tmqAcks acks; } tmqConnectReq; -typedef struct tmqConnectResp { +typedef struct tmqConnectRsp { tmqMsgHead head; int8_t status; -} tmqConnectResp; +} tmqConnectRsp; typedef struct tmqDisconnectReq { tmqMsgHead head; } tmqDisconnectReq; -typedef struct tmqDisconnectResp { +typedef struct tmqDisconnectRsp { tmqMsgHead head; int8_t status; -} tmqDiconnectResp; +} tmqDiconnectRsp; typedef struct tmqConsumeReq { tmqMsgHead head; - int64_t commitOffset; + tmqAcks acks; } tmqConsumeReq; -typedef struct tmqConsumeResp { - tmqMsgHead head; - char content[]; -} tmqConsumeResp; +typedef struct tmqMsgContent { + int64_t topicId; + int64_t msgLen; + char msg[]; +} tmqMsgContent; + +typedef struct tmqConsumeRsp { + tmqMsgHead head; + int64_t bodySize; + tmqMsgContent msgs[]; +} tmqConsumeRsp; -// typedef struct tmqMnodeSubscribeReq { tmqMsgHead head; int64_t topicLen; char topic[]; } tmqSubscribeReq; -typedef struct tmqMnodeSubscribeResp { +typedef struct tmqMnodeSubscribeRsp { tmqMsgHead head; int64_t vgId; char ep[]; //TSDB_EP_LEN -} tmqSubscribeResp; +} tmqSubscribeRsp; typedef struct tmqHeartbeatReq { } tmqHeartbeatReq; -typedef struct tmqHeartbeatResp { +typedef struct tmqHeartbeatRsp { -} tmqHeartbeatResp; +} tmqHeartbeatRsp; typedef struct tqTopicVhandle { //name @@ -92,33 +107,57 @@ typedef struct tqTopicVhandle { } tqTopicVhandle; typedef struct STQ { - //the set for topics - //key=topicName: str - //value=tqTopicVhandle + //the collection of group handle - //a map - //key= - //value=consumeOffset: int64_t } STQ; #define TQ_BUFFER_SIZE 8 +//TODO: define a serializer and deserializer typedef struct tqBufferItem { int64_t offset; + //executors are identical but not concurrent + //so it must be a copy in each item void* executor; + int64_t size; void* content; } tqBufferItem; -typedef struct tqGroupHandle { - char* topic; //c style, end with '\0' - int64_t cgId; - void* ahandle; - int64_t consumeOffset; +typedef struct tqBufferHandle { + //char* topic; //c style, end with '\0' + //int64_t cgId; + //void* ahandle; + int64_t nextConsumeOffset; + int64_t topicId; int32_t head; int32_t tail; tqBufferItem buffer[TQ_BUFFER_SIZE]; +} tqBufferHandle; + +typedef struct tqListHandle { + tqBufferHandle* bufHandle; + struct tqListHandle* next; +} tqListHandle; + +typedef struct tqGroupHandle { + int64_t cId; + int64_t cgId; + void* ahandle; + int32_t topicNum; + tqListHandle *head; } tqGroupHandle; +typedef struct tqQueryExec { + void* src; + tqBufferItem* dest; + void* executor; +} tqQueryExec; + +typedef struct tqQueryMsg { + tqQueryExec *exec; + struct tqQueryMsg *next; +} tqQueryMsg; + //init in each vnode STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); void tqCleanUp(STQ*); @@ -127,20 +166,33 @@ void tqCleanUp(STQ*); int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); -//void* will be replace by a msg type -int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg); +int tqConsume(STQ*, tmqConsumeReq*); -tqGroupHandle* tqFindGHandleBycId(STQ*, int64_t cId); +tqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqMoveOffsetToNext(tqGroupHandle*); int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); -int tqFetchMsg(tqGroupHandle*, void*); int tqRegisterContext(tqGroupHandle*, void*); -int tqLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); +int tqLaunchQuery(tqGroupHandle*); int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); +int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset); +int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset); +int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset); +int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset); + +int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle); +int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle); +int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle); +int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem); + +int tqGetGHandleSSize(const tqGroupHandle *gHandle); +int tqListHandleSSize(const tqListHandle *listHandle); +int tqBufHandleSSize(const tqBufferHandle *bufHandle); +int tqBufItemSSize(const tqBufferItem *bufItem); + #ifdef __cplusplus } #endif diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index 0896e7afab..d19e9ec81e 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -26,13 +26,13 @@ extern "C" { //create persistent storage for meta info such as consuming offset //return value > 0: cgId //return value <= 0: error code -int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle); +//int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle); //create ring buffer in memory and load consuming offset //int tqOpenTCGroup(STQ*, const char* topic, int cgId); //destroy ring buffer and persist consuming offset //int tqCloseTCGroup(STQ*, const char* topic, int cgId); //delete persistent storage for meta info -int tqDropTCGroup(STQ*, const char* topic, int cgId); +//int tqDropTCGroup(STQ*, const char* topic, int cgId); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index e0f2fc545e..b0a1c08673 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -21,64 +21,136 @@ //send to fetch queue // //handle management message +// +static int tqProtoCheck(tmqMsgHead *pMsg) { + return pMsg->protoVer == 0; +} -tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) { - //look in memory - // - //not found, try to restore from disk - // - //still not found - return NULL; +static int tqAckOneTopic(tqBufferHandle *bhandle, tmqOneAck *pAck, tqQueryMsg** ppQuery) { + //clean old item and move forward + int32_t consumeOffset = pAck->consumeOffset; + int idx = consumeOffset % TQ_BUFFER_SIZE; + ASSERT(bhandle->buffer[idx].content && bhandle->buffer[idx].executor); + tfree(bhandle->buffer[idx].content); + if( 1 /* TODO: need to launch new query */) { + tqQueryMsg* pNewQuery = malloc(sizeof(tqQueryMsg)); + if(pNewQuery == NULL) { + //TODO: memory insufficient + return -1; + } + //TODO: lock executor + pNewQuery->exec->executor = bhandle->buffer[idx].executor; + //TODO: read from wal and assign to src + pNewQuery->exec->src = 0; + pNewQuery->exec->dest = &bhandle->buffer[idx]; + pNewQuery->next = *ppQuery; + *ppQuery = pNewQuery; + } + return 0; +} + +static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) { + int32_t ackNum = pAcks->ackNum; + tmqOneAck *acks = pAcks->acks; + //double ptr for acks and list + int i = 0; + tqListHandle* node = ghandle->head; + int ackCnt = 0; + tqQueryMsg *pQuery = NULL; + while(i < ackNum && node->next) { + if(acks[i].topicId == node->next->bufHandle->topicId) { + ackCnt++; + tqAckOneTopic(node->next->bufHandle, &acks[i], &pQuery); + } else if(acks[i].topicId < node->next->bufHandle->topicId) { + i++; + } else { + node = node->next; + } + } + if(pQuery) { + //post message + } + return ackCnt; } static int tqCommitTCGroup(tqGroupHandle* handle) { - //persist into disk + //persist modification into disk return 0; } -int tqCreateTCGroup(STQ *pTq, const char* topic, int cgId, tqGroupHandle** handle) { +int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, tqGroupHandle** handle) { + //create in disk return 0; } -int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) { - int code; - tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); - if(handle == NULL) { - code = tqCreateTCGroup(pTq, topic, cgId, &handle); - if(code != 0) { - return code; - } - } - ASSERT(handle != NULL); - - //put into STQ - +int tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { + //look up in disk + //create + //open return 0; } -/*int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {*/ - /*tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);*/ - /*return tqCommitTCGroup(handle);*/ -/*}*/ +int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { + return 0; +} -int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) { +int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { //delete from disk return 0; } +static int tqFetch(tqGroupHandle* ghandle, void** msg) { + tqListHandle* head = ghandle->head; + tqListHandle* node = head; + int totSize = 0; + //TODO: make it a macro + int sizeLimit = 4 * 1024; + tmqMsgContent* buffer = malloc(sizeLimit); + if(buffer == NULL) { + //TODO:memory insufficient + return -1; + } + //iterate the list to get msgs of all topics + //until all topic iterated or msgs over sizeLimit + while(node->next) { + node = node->next; + tqBufferHandle* bufHandle = node->bufHandle; + int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE; + if(bufHandle->buffer[idx].content != NULL && + bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset + ) { + totSize += bufHandle->buffer[idx].size; + if(totSize > sizeLimit) { + void *ptr = realloc(buffer, totSize); + if(ptr == NULL) { + totSize -= bufHandle->buffer[idx].size; + //TODO:memory insufficient + //return msgs already copied + break; + } + } + *((int64_t*)buffer) = bufHandle->topicId; + buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); + *((int64_t*)buffer) = bufHandle->buffer[idx].size; + buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); + memcpy(buffer, bufHandle->buffer[idx].content, bufHandle->buffer[idx].size); + buffer = POINTER_SHIFT(buffer, bufHandle->buffer[idx].size); + if(totSize > sizeLimit) { + break; + } + } + } + if(totSize == 0) { + //no msg + return -1; + } -int tqFetchMsg(tqGroupHandle* handle, void* msg) { - return 0; + return totSize; } -int tqMoveOffsetToNext(tqGroupHandle* handle) { - return 0; -} - - -tqGroupHandle* tqFindGHandleBycId(STQ* pTq, int64_t cId) { - return NULL; -} +/*int tqMoveOffsetToNext(tqGroupHandle* ghandle) {*/ + /*return 0;*/ +/*}*/ int tqPushMsg(STQ* pTq , void* p, int64_t version) { //add reference @@ -91,10 +163,108 @@ int tqCommit(STQ* pTq) { return 0; } -int tqHandleConsumeMsg(STQ* pTq, tmqConsumeReq* msg) { - //parse msg and extract topic and cgId - //lookup handle - //confirm message and send to consumer +int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { + if(!tqProtoCheck((tmqMsgHead *)pMsg)) { + //proto version invalid + return -1; + } + int64_t clientId = pMsg->head.clientId; + tqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId); + if(ghandle == NULL) { + //client not connect + return -1; + } + if(pMsg->acks.ackNum != 0) { + if(tqAck(ghandle, &pMsg->acks) != 0) { + //ack not success + return -1; + } + } + + tmqConsumeRsp *pRsp = (tmqConsumeRsp*) pMsg; + + if(tqFetch(ghandle, (void**)&pRsp->msgs) < 0) { + //fetch error + return -1; + } + //judge and launch new query + if(tqLaunchQuery(ghandle)) { + //launch query error + return -1; + } return 0; } + + +int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset) { + //calculate size + int sz = tqGetGHandleSSize(gHandle); + if(sz <= 0) { + //TODO: err + return -1; + } + void* ptr = realloc(*ppBytes, sz); + if(ptr == NULL) { + free(ppBytes); + //TODO: memory err + return -1; + } + *ppBytes = ptr; + //do serialize + *(int64_t*)ptr = gHandle->cId; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + *(int64_t*)ptr = gHandle->cgId; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + *(int32_t*)ptr = gHandle->topicNum; + if(gHandle->topicNum > 0) { + tqSerializeListHandle(gHandle->head, ppBytes, ptr - *ppBytes); + } + return 0; +} + +int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset) { + void* ptr = POINTER_SHIFT(*ppBytes, offset); + tqListHandle *node = listHandle; + while(node->next) { + node = node->next; + offset = tqSerializeBufHandle(node->bufHandle, ppBytes, offset); + } + return offset; +} +int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset) { + void *ptr = POINTER_SHIFT(*ppBytes, offset); + *(int64_t*)ptr = bufHandle->nextConsumeOffset; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + *(int64_t*)ptr = bufHandle->topicId; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + *(int32_t*)ptr = bufHandle->head; + ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); + *(int32_t*)ptr = bufHandle->tail; + ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); + for(int i = 0; i < TQ_BUFFER_SIZE; i++) { + int sz = tqSerializeBufItem(&bufHandle->buffer[i], ppBytes, ptr - *ppBytes); + ptr = POINTER_SHIFT(ptr, sz); + } + return ptr - *ppBytes; +} + +int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset) { + void *ptr = POINTER_SHIFT(*ppBytes, offset); + //TODO: do we need serialize this? + return 0; +} + +int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle) { + return 0; +} +int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle) { + return 0; +} +int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle) { + return 0; +} +int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem) { + return 0; +} + From 35869f2ed3070a70895ce87c17e8a628d27c495c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 27 Oct 2021 17:53:47 +0800 Subject: [PATCH 2/6] fix compile --- source/server/vnode/src/vnodeReadMsg.c | 12 ++++++------ source/server/vnode/tq/src/tq.c | 26 ++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index 21ecde3326..1835b4f558 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -225,16 +225,16 @@ int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { tmqMsgHead msgHead = pConsumeMsg->head; //extract head STQ *pTq = pVnode->pTQ; - tqGroupHandle *pHandle = tqFindGHandleBycId(pTq, msgHead.clientId); + /*tqBufferHandle *pHandle = tqGetHandle(pTq, msgHead.clientId);*/ //return msg if offset not moved - if(pConsumeMsg->commitOffset == pHandle->consumeOffset) { + /*if(pConsumeMsg->commitOffset == pHandle->consumeOffset) {*/ //return msg - return 0; - } + /*return 0;*/ + /*}*/ //or move offset - tqMoveOffsetToNext(pHandle); + /*tqMoveOffsetToNext(pHandle);*/ //fetch or register context - tqFetchMsg(pHandle, pRead); + /*tqFetchMsg(pHandle, pRead);*/ //judge mode, tail read or catch up read /*int64_t lastVer = walLastVer(pVnode->wal);*/ //launch new query diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index b0a1c08673..52702057d6 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -148,6 +148,19 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) { return totSize; } + +tqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { + return NULL; +} + +int tqLaunchQuery(tqGroupHandle* ghandle) { + return 0; +} + +int tqSendLaunchQuery(STQ* pTq, int64_t topicId, int64_t cgId, void* query) { + return 0; +} + /*int tqMoveOffsetToNext(tqGroupHandle* ghandle) {*/ /*return 0;*/ /*}*/ @@ -268,3 +281,16 @@ int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem) { return 0; } + +int tqGetGHandleSSize(const tqGroupHandle *gHandle) { + return 0; +} +int tqListHandleSSize(const tqListHandle *listHandle) { + return 0; +} +int tqBufHandleSSize(const tqBufferHandle *bufHandle) { + return 0; +} +int tqBufItemSSize(const tqBufferItem *bufItem) { + return 0; +} From 4845ca7f9921e2a2638b4eba639132d7a6fe06fd Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 28 Oct 2021 09:56:20 +0800 Subject: [PATCH 3/6] [raft]refactor raft interface,add log store methods --- include/libs/sync/sync.h | 50 ++++++++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a0602ec1b0..30583686c5 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -22,7 +22,6 @@ extern "C" { #include #include "taosdef.h" -#include "wal.h" typedef int64_t SyncNodeId; typedef int32_t SyncGroupId; @@ -41,6 +40,7 @@ typedef struct { } SSyncBuffer; typedef struct { + SyncNodeId nodeId; uint16_t nodePort; // node sync Port char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN } SNodeInfo; @@ -83,11 +83,38 @@ typedef struct SSyncFSM { } SSyncFSM; +typedef struct SSyncLogStore { + void* pData; + + // write log with given index + int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf); + + // mark log with given index has been commtted + int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index); + + // prune log before given index + int32_t (*logPrune)(struct SSyncLogStore* logStore, SyncIndex index); + + // rollback log after given index + int32_t (*logRollback)(struct SSyncLogStore* logStore, SyncIndex index); +} SSyncLogStore; + typedef struct SSyncServerState { - SNodeInfo voteFor; + SyncNodeId voteFor; SSyncTerm term; } SSyncServerState; +typedef struct SSyncClusterConfig { + // Log index number of current cluster config. + SyncIndex index; + + // Log index number of previous cluster config. + SyncIndex prevIndex; + + // current cluster + const SSyncCluster* cluster; +} SSyncClusterConfig; + typedef struct SStateManager { void* pData; @@ -95,35 +122,38 @@ typedef struct SStateManager { const SSyncServerState* (*readServerState)(struct SStateManager* stateMng); - void (*saveCluster)(struct SStateManager* stateMng, const SSyncCluster* cluster); + void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); - const SSyncCluster* (*readCluster)(struct SStateManager* stateMng); + const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); } SStateManager; typedef struct { SyncGroupId vgId; - twalh walHandle; - SyncIndex snapshotIndex; SSyncCluster syncCfg; SSyncFSM fsm; + SSyncLogStore logStore; + SStateManager stateManager; } SSyncInfo; +struct SSyncNode; +typedef struct SSyncNode SSyncNode; + int32_t syncInit(); void syncCleanUp(); -SyncNodeId syncStart(const SSyncInfo*); +SSyncNode syncStart(const SSyncInfo*); void syncStop(SyncNodeId); -int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void* pData, bool isWeak); +int32_t syncPropose(SSyncNode syncNode, SSyncBuffer buffer, void* pData, bool isWeak); -int32_t syncAddNode(SyncNodeId nodeId, const SNodeInfo *pNode); +int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); -int32_t syncRemoveNode(SyncNodeId nodeId, const SNodeInfo *pNode); +int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); extern int32_t syncDebugFlag; From c50a21cb79f9642bf1a694359798c164dcf991f4 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 28 Oct 2021 10:03:52 +0800 Subject: [PATCH 4/6] merge from 3.0 From d84f44c1499642c0bc975fbd50d40cd8cb01aaec Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 28 Oct 2021 11:36:51 +0800 Subject: [PATCH 5/6] minor changes --- include/util/taoserror.h | 8 +-- source/server/vnode/inc/vnodeInt.h | 2 - source/server/vnode/src/vnodeFile.c | 83 ++++++++++++---------------- source/server/vnode/src/vnodeInt.c | 35 ++++++++++++ source/server/vnode/src/vnodeMain.c | 36 ------------ source/server/vnode/src/vnodeWrite.c | 2 - source/util/src/terror.c | 8 +-- 7 files changed, 78 insertions(+), 96 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index cf8cd510c5..76c5f575a5 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -233,11 +233,11 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) //"Missing data file") #define TSDB_CODE_VND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0508) //"Out of memory") #define TSDB_CODE_VND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0509) //"Unexpected generic error in vnode") -#define TSDB_CODE_VND_INVALID_VRESION_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) //"Invalid version file") -#define TSDB_CODE_VND_IS_FULL TAOS_DEF_ERROR_CODE(0, 0x050B) //"Database memory is full for commit failed") -#define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) //"Database memory is full for waiting commit") +#define TSDB_CODE_VND_INVALID_CFG_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) //"Invalid config file) +#define TSDB_CODE_VND_INVALID_TERM_FILE TAOS_DEF_ERROR_CODE(0, 0x050B) //"Invalid term file") +#define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) //"Database memory is full") #define TSDB_CODE_VND_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x050D) //"Database is dropping") -#define TSDB_CODE_VND_IS_BALANCING TAOS_DEF_ERROR_CODE(0, 0x050E) //"Database is balancing") +#define TSDB_CODE_VND_IS_UPDATING TAOS_DEF_ERROR_CODE(0, 0x050E) //"Database is updating") #define TSDB_CODE_VND_IS_CLOSING TAOS_DEF_ERROR_CODE(0, 0x0510) //"Database is closing") #define TSDB_CODE_VND_NOT_SYNCED TAOS_DEF_ERROR_CODE(0, 0x0511) //"Database suspended") #define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied") diff --git a/source/server/vnode/inc/vnodeInt.h b/source/server/vnode/inc/vnodeInt.h index 245455ef18..d94c5ba2b5 100644 --- a/source/server/vnode/inc/vnodeInt.h +++ b/source/server/vnode/inc/vnodeInt.h @@ -77,8 +77,6 @@ typedef struct SVnodeCfg { SSyncCfg sync; } SVnodeCfg; - - typedef struct { int32_t vgId; // global vnode group ID int32_t refCount; // reference count diff --git a/source/server/vnode/src/vnodeFile.c b/source/server/vnode/src/vnodeFile.c index 8453c985c3..9835e3e0fb 100644 --- a/source/server/vnode/src/vnodeFile.c +++ b/source/server/vnode/src/vnodeFile.c @@ -280,90 +280,77 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) { return TSDB_CODE_SUCCESS; } -int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState){ -#if 0 +int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) { + int32_t ret = TSDB_CODE_VND_APP_ERROR; int32_t len = 0; int32_t maxLen = 100; - char * content = calloc(1, maxLen + 1); - cJSON * root = NULL; - FILE * fp = NULL; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + FILE *fp = NULL; - terrno = TSDB_CODE_VND_INVALID_VRESION_FILE; - char file[TSDB_FILENAME_LEN + 30] = {0}; - sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); - - fp = fopen(file, "r"); - if (!fp) { - if (errno != ENOENT) { - vError("vgId:%d, failed to read %s, error:%s", pVnode->vgId, file, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - } else { - terrno = TSDB_CODE_SUCCESS; - } - goto PARSE_VER_ERROR; - } + char file[PATH_MAX + 30] = {0}; + sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId); len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file); - goto PARSE_VER_ERROR; + vError("vgId:%d, failed to read %s since content is null", vgId, file); + goto PARSE_TERM_ERROR; } root = cJSON_Parse(content); if (root == NULL) { - vError("vgId:%d, failed to read %s, invalid json format", pVnode->vgId, file); - goto PARSE_VER_ERROR; + vError("vgId:%d, failed to read %s since invalid json format", vgId, file); + goto PARSE_TERM_ERROR; } - cJSON *ver = cJSON_GetObjectItem(root, "version"); - if (!ver || ver->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file); - goto PARSE_VER_ERROR; + cJSON *term = cJSON_GetObjectItem(root, "term"); + if (!term || term->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since term not found", vgId, file); + goto PARSE_TERM_ERROR; } -#if 0 - pVnode->version = (uint64_t)ver->valueint; + pState->term = (uint64_t)term->valueint; - terrno = TSDB_CODE_SUCCESS; - vInfo("vgId:%d, read %s successfully, fver:%" PRIu64, pVnode->vgId, file, pVnode->version); -#endif + cJSON *voteFor = cJSON_GetObjectItem(root, "voteFor"); + if (!voteFor || voteFor->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since voteFor not found", vgId, file); + goto PARSE_TERM_ERROR; + } + pState->voteFor = (int64_t)voteFor->valueint; -PARSE_VER_ERROR: + vInfo("vgId:%d, read %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term); + +PARSE_TERM_ERROR: if (content != NULL) free(content); if (root != NULL) cJSON_Delete(root); if (fp != NULL) fclose(fp); - return terrno; -#endif - return 0; + return ret; } -int32_t vnodeWriteTerm(int32_t vgid, SSyncServerState *pState) { -#if 0 - char file[TSDB_FILENAME_LEN + 30] = {0}; - sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); +int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) { + char file[PATH_MAX + 30] = {0}; + sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId); FILE *fp = fopen(file, "w"); if (!fp) { - vError("vgId:%d, failed to write %s, reason:%s", pVnode->vgId, file, strerror(errno)); + vError("vgId:%d, failed to write %s since %s", vgId, file, strerror(errno)); return -1; } int32_t len = 0; int32_t maxLen = 100; - char * content = calloc(1, maxLen + 1); + char *content = calloc(1, maxLen + 1); -#if 0 len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"version\": %" PRIu64 "\n", pVnode->fversion); + len += snprintf(content + len, maxLen - len, " \"term\": %" PRIu64 "\n", pState->term); + len += snprintf(content + len, maxLen - len, " \"voteFor\": %" PRIu64 "\n", pState->voteFor); len += snprintf(content + len, maxLen - len, "}\n"); -#endif + fwrite(content, 1, len, fp); taosFsyncFile(fileno(fp)); fclose(fp); free(content); - terrno = 0; - // vInfo("vgId:%d, successed to write %s, fver:%" PRIu64, pVnode->vgId, file, pVnode->fversion); -#endif + vInfo("vgId:%d, write %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term); return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/server/vnode/src/vnodeInt.c b/source/server/vnode/src/vnodeInt.c index 5a5ba4df01..ed295160ba 100644 --- a/source/server/vnode/src/vnodeInt.c +++ b/source/server/vnode/src/vnodeInt.c @@ -24,6 +24,7 @@ static struct { SSteps *steps; SVnodeFp fp; + void (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); } tsVint; void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) { @@ -36,7 +37,41 @@ void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) { void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsVint.fp.SendMsgToMnode)(rpcMsg); } +void vnodeProcessMsg(SRpcMsg *pMsg) { + if (tsVint.msgFp[pMsg->msgType]) { + (*tsVint.msgFp[pMsg->msgType])(pMsg); + } else { + assert(0); + } +} + +static void vnodeInitMsgFp() { + tsVint.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; + // mq related + tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; + // mq related end + tsVint.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; + tsVint.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; +} + int32_t vnodeInit(SVnodePara para) { + vnodeInitMsgFp(); tsVint.fp = para.fp; struct SSteps *steps = taosStepInit(8, NULL); diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index bfe2df9e43..5143f04c5b 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -44,7 +44,6 @@ static struct { SHashObj *hash; int32_t openVnodes; int32_t totalVnodes; - void (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); } tsVnode; static bool vnodeSetInitStatus(SVnode *pVnode) { @@ -566,34 +565,7 @@ void vnodeRelease(SVnode *pVnode) { } } -static void vnodeInitMsgFp() { - tsVnode.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; - // mq related - tsVnode.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; - // mq related end - tsVnode.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; -} - int32_t vnodeInitMain() { - vnodeInitMsgFp(); - tsVnode.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (tsVnode.hash == NULL) { vError("failed to init vnode mgmt"); @@ -654,11 +626,3 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) { } } } - -void vnodeProcessMsg(SRpcMsg *pMsg) { - if (tsVnode.msgFp[pMsg->msgType]) { - (*tsVnode.msgFp[pMsg->msgType])(pMsg); - } else { - assert(0); - } -} diff --git a/source/server/vnode/src/vnodeWrite.c b/source/server/vnode/src/vnodeWrite.c index 70fa9bff80..f3258af0bf 100644 --- a/source/server/vnode/src/vnodeWrite.c +++ b/source/server/vnode/src/vnodeWrite.c @@ -15,8 +15,6 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "tqueue.h" -#include "tworker.h" #include "vnodeMain.h" #include "vnodeWrite.h" #include "vnodeWriteMsg.h" diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 68cd067fb9..8e5d7a47fd 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -245,11 +245,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, "No write permission f TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, "Missing data file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, "Out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, "Unexpected generic error in vnode") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, "Invalid version file") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, "Database memory is full for commit failed") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, "Database memory is full for waiting commit") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_CFG_FILE, "Invalid config file") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TERM_FILE, "Invalid term file") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, "Database memory is full") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_DROPPING, "Database is dropping") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_BALANCING, "Database is balancing") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_UPDATING, "Database is updating") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_CLOSING, "Database is closing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied") From 18697d841b9898d7e1be64f85de031637a748e3a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 28 Oct 2021 11:46:27 +0800 Subject: [PATCH 6/6] add progress step --- include/server/dnode/dnode.h | 5 +++++ include/server/vnode/vnode.h | 5 +++++ source/server/dnode/src/dnodeInt.c | 3 ++- source/server/vnode/inc/vnodeInt.h | 1 + source/server/vnode/src/vnodeInt.c | 2 ++ 5 files changed, 15 insertions(+), 1 deletion(-) diff --git a/include/server/dnode/dnode.h b/include/server/dnode/dnode.h index 3499913afa..bc0d1e89b0 100644 --- a/include/server/dnode/dnode.h +++ b/include/server/dnode/dnode.h @@ -67,6 +67,11 @@ void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell); */ void dnodeGetEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); +/** + * Report the startup progress. + */ +void dnodeReportStartup(char *name, char *desc); + #ifdef __cplusplus } #endif diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index 00decfe338..ecb1412b06 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -46,6 +46,11 @@ typedef struct { */ void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); + /** + * Report the startup progress. + */ + void (*ReportStartup)(char *name, char *desc); + } SVnodeFp; typedef struct { diff --git a/source/server/dnode/src/dnodeInt.c b/source/server/dnode/src/dnodeInt.c index d294143e57..7b0b87368e 100644 --- a/source/server/dnode/src/dnodeInt.c +++ b/source/server/dnode/src/dnodeInt.c @@ -37,7 +37,7 @@ EDnStat dnodeGetRunStat() { return tsDnode.runStatus; } void dnodeSetRunStat(EDnStat stat) { tsDnode.runStatus = stat; } -static void dnodeReportStartup(char *name, char *desc) { +void dnodeReportStartup(char *name, char *desc) { SStartupStep *startup = &tsDnode.startup; tstrncpy(startup->name, name, strlen(startup->name)); tstrncpy(startup->desc, desc, strlen(startup->desc)); @@ -58,6 +58,7 @@ static int32_t dnodeInitVnode() { para.fp.GetDnodeEp = dnodeGetEp; para.fp.SendMsgToDnode = dnodeSendMsgToDnode; para.fp.SendMsgToMnode = dnodeSendMsgToMnode; + para.fp.ReportStartup = dnodeReportStartup; return vnodeInit(para); } diff --git a/source/server/vnode/inc/vnodeInt.h b/source/server/vnode/inc/vnodeInt.h index d94c5ba2b5..3c7487f681 100644 --- a/source/server/vnode/inc/vnodeInt.h +++ b/source/server/vnode/inc/vnodeInt.h @@ -112,6 +112,7 @@ typedef struct { void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg); void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); +void vnodeReportStartup(char *name, char *desc); #ifdef __cplusplus } diff --git a/source/server/vnode/src/vnodeInt.c b/source/server/vnode/src/vnodeInt.c index ed295160ba..9e1739a68e 100644 --- a/source/server/vnode/src/vnodeInt.c +++ b/source/server/vnode/src/vnodeInt.c @@ -37,6 +37,8 @@ void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) { void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsVint.fp.SendMsgToMnode)(rpcMsg); } +void vnodeReportStartup(char *name, char *desc) { (*tsVint.fp.ReportStartup)(name, desc); } + void vnodeProcessMsg(SRpcMsg *pMsg) { if (tsVint.msgFp[pMsg->msgType]) { (*tsVint.msgFp[pMsg->msgType])(pMsg);