Merge branch '3.0' of github.com:taosdata/tdengine into 3.0

This commit is contained in:
Haojun Liao 2021-10-29 11:26:04 +08:00
commit a0d8b7df5f
24 changed files with 686 additions and 411 deletions

View File

@ -721,6 +721,8 @@ typedef struct {
int32_t daysToKeep2; int32_t daysToKeep2;
int32_t minRowsPerFileBlock; int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock; int32_t maxRowsPerFileBlock;
int32_t fsyncPeriod;
int8_t reserved[16];
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
int8_t cacheLastRow; int8_t cacheLastRow;
@ -728,8 +730,7 @@ typedef struct {
int8_t walLevel; int8_t walLevel;
int8_t replica; int8_t replica;
int8_t quorum; int8_t quorum;
int8_t reserved[9]; int8_t selfIndex;
int32_t fsyncPeriod;
SVnodeDesc nodes[TSDB_MAX_REPLICA]; SVnodeDesc nodes[TSDB_MAX_REPLICA];
} SCreateVnodeMsg, SAlterVnodeMsg; } SCreateVnodeMsg, SAlterVnodeMsg;

View File

@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include "taosdef.h" #include "taosdef.h"
typedef int64_t SyncNodeId; typedef int32_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;
typedef int64_t SyncIndex; typedef int64_t SyncIndex;
typedef uint64_t SSyncTerm; typedef uint64_t SSyncTerm;
@ -46,36 +46,36 @@ typedef struct {
} SNodeInfo; } SNodeInfo;
typedef struct { typedef struct {
int selfIndex; int32_t selfIndex;
int nNode; int32_t replica;
SNodeInfo* nodeInfo; SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
} SSyncCluster; } SSyncCluster;
typedef struct { typedef struct {
int32_t selfIndex; int32_t selfIndex;
int nNode; int32_t replica;
SNodeInfo* node; SNodeInfo node[TSDB_MAX_REPLICA];
ESyncRole* role; ESyncRole role[TSDB_MAX_REPLICA];
} SNodesRole; } SNodesRole;
typedef struct SSyncFSM { typedef struct SSyncFSM {
void* pData; void* pData;
// apply committed log, bufs will be free by raft module // apply committed log, bufs will be free by raft module
int (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData); int32_t (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData);
// cluster commit callback // cluster commit callback
int (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData); int32_t (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData);
// fsm return snapshot in ppBuf, bufs will be free by raft module // fsm return snapshot in ppBuf, bufs will be free by raft module
// TODO: getSnapshot SHOULD be async? // TODO: getSnapshot SHOULD be async?
int (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int* objId, bool* isLast); int32_t (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int32_t* objId, bool* isLast);
// fsm apply snapshot with pBuf data // fsm apply snapshot with pBuf data
int (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int objId, bool isLast); int32_t (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int32_t objId, bool isLast);
// call when restore snapshot and log done // call when restore snapshot and log done
int (*onRestoreDone)(struct SSyncFSM* fsm); int32_t (*onRestoreDone)(struct SSyncFSM* fsm);
void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf); void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf);
@ -118,25 +118,21 @@ typedef struct SSyncClusterConfig {
typedef struct SStateManager { typedef struct SStateManager {
void* pData; void* pData;
void (*saveServerState)(struct SStateManager* stateMng, const SSyncServerState* state); int32_t (*saveServerState)(struct SStateManager* stateMng, SSyncServerState* state);
const SSyncServerState* (*readServerState)(struct SStateManager* stateMng); int32_t (*readServerState)(struct SStateManager* stateMng, SSyncServerState* state);
void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); // void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster);
const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); // const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng);
} SStateManager; } SStateManager;
typedef struct { typedef struct {
SyncGroupId vgId; SyncGroupId vgId;
SyncIndex snapshotIndex; SyncIndex snapshotIndex;
SSyncCluster syncCfg; SSyncCluster syncCfg;
SSyncFSM fsm; SSyncFSM fsm;
SSyncLogStore logStore; SSyncLogStore logStore;
SStateManager stateManager; SStateManager stateManager;
} SSyncInfo; } SSyncInfo;
@ -146,14 +142,15 @@ typedef struct SSyncNode SSyncNode;
int32_t syncInit(); int32_t syncInit();
void syncCleanUp(); void syncCleanUp();
SSyncNode syncStart(const SSyncInfo*); SSyncNode* syncStart(const SSyncInfo*);
void syncStop(SyncNodeId); void syncReconfig(const SSyncNode*, const SSyncCluster*);
void syncStop(const SSyncNode*);
int32_t syncPropose(SSyncNode syncNode, SSyncBuffer buffer, void* pData, bool isWeak); int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak);
int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); // int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); // int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
extern int32_t syncDebugFlag; extern int32_t syncDebugFlag;

View File

@ -44,41 +44,41 @@ typedef struct {
EWalType walLevel; // wal level EWalType walLevel; // wal level
} SWalCfg; } SWalCfg;
typedef void * twalh; // WAL HANDLE struct SWal;
typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg); typedef struct SWal SWal; // WAL HANDLE
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
//module initialization // module initialization
int32_t walInit(); int32_t walInit();
void walCleanUp(); void walCleanUp();
//handle open and ctl // handle open and ctl
twalh walOpen(char *path, SWalCfg *pCfg); SWal *walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(twalh, SWalCfg *pCfg); int32_t walAlter(SWal *, SWalCfg *pCfg);
void walStop(twalh); void walClose(SWal *);
void walClose(twalh);
//write // write
//int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen); // int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
int64_t walWrite(twalh, void* body, int32_t bodyLen); int64_t walWrite(SWal *, int64_t index, void *body, int32_t bodyLen);
int64_t walWriteBatch(twalh, void** bodies, int32_t* bodyLen, int32_t batchSize); int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize);
//apis for lifecycle management // apis for lifecycle management
void walFsync(twalh, bool force); void walFsync(SWal *, bool force);
int32_t walCommit(twalh, int64_t ver); int32_t walCommit(SWal *, int64_t ver);
//truncate after // truncate after
int32_t walRollback(twalh, int64_t ver); int32_t walRollback(SWal *, int64_t ver);
//notify that previous log can be pruned safely // notify that previous log can be pruned safely
int32_t walPrune(twalh, int64_t ver); int32_t walPrune(SWal *, int64_t ver);
//read // read
int32_t walRead(twalh, SWalHead **, int64_t ver); int32_t walRead(SWal *, SWalHead **, int64_t ver);
int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum); int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
//lifecycle check // lifecycle check
int32_t walFirstVer(twalh); int32_t walFirstVer(SWal *);
int32_t walPersistedVer(twalh); int32_t walPersistedVer(SWal *);
int32_t walLastVer(twalh); int32_t walLastVer(SWal *);
//int32_t walDataCorrupted(twalh); // int32_t walDataCorrupted(SWal*);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -17,113 +17,114 @@
#define _TD_TQ_H_ #define _TD_TQ_H_
#include "os.h" #include "os.h"
#include "tutil.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct tmqMsgHead { typedef struct TmqMsgHead {
int32_t protoVer; int32_t protoVer;
int32_t msgType; int32_t msgType;
int64_t cgId; int64_t cgId;
int64_t clientId; int64_t clientId;
} tmqMsgHead; } TmqMsgHead;
typedef struct tmqOneAck { typedef struct TmqOneAck {
int64_t topicId; int64_t topicId;
int64_t consumeOffset; int64_t consumeOffset;
} tmqOneAck; } TmqOneAck;
typedef struct tmqAcks { typedef struct TmqAcks {
int32_t ackNum; int32_t ackNum;
//should be sorted //should be sorted
tmqOneAck acks[]; TmqOneAck acks[];
} tmqAcks; } TmqAcks;
//TODO: put msgs into common //TODO: put msgs into common
typedef struct tmqConnectReq { typedef struct TmqConnectReq {
tmqMsgHead head; TmqMsgHead head;
tmqAcks acks; TmqAcks acks;
} tmqConnectReq; } TmqConnectReq;
typedef struct tmqConnectRsp { typedef struct TmqConnectRsp {
tmqMsgHead head; TmqMsgHead head;
int8_t status; int8_t status;
} tmqConnectRsp; } TmqConnectRsp;
typedef struct tmqDisconnectReq { typedef struct TmqDisconnectReq {
tmqMsgHead head; TmqMsgHead head;
} tmqDisconnectReq; } TmqDiscconectReq;
typedef struct tmqDisconnectRsp { typedef struct TmqDisconnectRsp {
tmqMsgHead head; TmqMsgHead head;
int8_t status; int8_t status;
} tmqDiconnectRsp; } TmqDisconnectRsp;
typedef struct tmqConsumeReq { typedef struct TmqConsumeReq {
tmqMsgHead head; TmqMsgHead head;
tmqAcks acks; TmqAcks acks;
} tmqConsumeReq; } TmqConsumeReq;
typedef struct tmqMsgContent { typedef struct TmqMsgContent {
int64_t topicId; int64_t topicId;
int64_t msgLen; int64_t msgLen;
char msg[]; char msg[];
} tmqMsgContent; } TmqMsgContent;
typedef struct tmqConsumeRsp { typedef struct TmqConsumeRsp {
tmqMsgHead head; TmqMsgHead head;
int64_t bodySize; int64_t bodySize;
tmqMsgContent msgs[]; TmqMsgContent msgs[];
} tmqConsumeRsp; } TmqConsumeRsp;
typedef struct tmqMnodeSubscribeReq { typedef struct TmqSubscribeReq {
tmqMsgHead head; TmqMsgHead head;
int64_t topicLen; int32_t topicNum;
char topic[]; int64_t topic[];
} tmqSubscribeReq; } TmqSubscribeReq;
typedef struct tmqMnodeSubscribeRsp { typedef struct tmqSubscribeRsp {
tmqMsgHead head; TmqMsgHead head;
int64_t vgId; int64_t vgId;
char ep[]; //TSDB_EP_LEN char ep[TSDB_EP_LEN]; //TSDB_EP_LEN
} tmqSubscribeRsp; } TmqSubscribeRsp;
typedef struct tmqHeartbeatReq { typedef struct TmqHeartbeatReq {
} tmqHeartbeatReq; } TmqHeartbeatReq;
typedef struct tmqHeartbeatRsp { typedef struct TmqHeartbeatRsp {
} tmqHeartbeatRsp; } TmqHeartbeatRsp;
typedef struct tqTopicVhandle { typedef struct TqTopicVhandle {
//name int64_t topicId;
//
//executor for filter //executor for filter
// void* filterExec;
//callback for mnode //callback for mnode
// //trigger when vnode list associated topic change
} tqTopicVhandle; void* (*mCallback)(void*, void*);
} TqTopicVhandle;
typedef struct STQ { typedef struct STQ {
//the collection of group handle //the collection of group handle
//the handle of kvstore
} STQ; } STQ;
#define TQ_BUFFER_SIZE 8 #define TQ_BUFFER_SIZE 8
//TODO: define a serializer and deserializer //TODO: define a serializer and deserializer
typedef struct tqBufferItem { typedef struct TqBufferItem {
int64_t offset; int64_t offset;
//executors are identical but not concurrent //executors are identical but not concurrent
//so it must be a copy in each item //so it must be a copy in each item
void* executor; void* executor;
int64_t size; int64_t size;
void* content; void* content;
} tqBufferItem; } TqBufferItem;
typedef struct tqBufferHandle { typedef struct TqBufferHandle {
//char* topic; //c style, end with '\0' //char* topic; //c style, end with '\0'
//int64_t cgId; //int64_t cgId;
//void* ahandle; //void* ahandle;
@ -131,32 +132,32 @@ typedef struct tqBufferHandle {
int64_t topicId; int64_t topicId;
int32_t head; int32_t head;
int32_t tail; int32_t tail;
tqBufferItem buffer[TQ_BUFFER_SIZE]; TqBufferItem buffer[TQ_BUFFER_SIZE];
} tqBufferHandle; } TqBufferHandle;
typedef struct tqListHandle { typedef struct TqListHandle {
tqBufferHandle* bufHandle; TqBufferHandle bufHandle;
struct tqListHandle* next; struct TqListHandle* next;
} tqListHandle; } TqListHandle;
typedef struct tqGroupHandle { typedef struct TqGroupHandle {
int64_t cId; int64_t cId;
int64_t cgId; int64_t cgId;
void* ahandle; void* ahandle;
int32_t topicNum; int32_t topicNum;
tqListHandle *head; TqListHandle *head;
} tqGroupHandle; } TqGroupHandle;
typedef struct tqQueryExec { typedef struct TqQueryExec {
void* src; void* src;
tqBufferItem* dest; TqBufferItem* dest;
void* executor; void* executor;
} tqQueryExec; } TqQueryExec;
typedef struct tqQueryMsg { typedef struct TqQueryMsg {
tqQueryExec *exec; TqQueryExec *exec;
struct tqQueryMsg *next; struct TqQueryMsg *next;
} tqQueryMsg; } TqQueryMsg;
//init in each vnode //init in each vnode
STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); STQ* tqInit(void* ref_func(void*), void* unref_func(void*));
@ -166,32 +167,30 @@ void tqCleanUp(STQ*);
int tqPushMsg(STQ*, void* msg, int64_t version); int tqPushMsg(STQ*, void* msg, int64_t version);
int tqCommit(STQ*); int tqCommit(STQ*);
int tqConsume(STQ*, tmqConsumeReq*); int tqConsume(STQ*, TmqConsumeReq*);
tqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqMoveOffsetToNext(tqGroupHandle*); int tqMoveOffsetToNext(TqGroupHandle*);
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
int tqRegisterContext(tqGroupHandle*, void*); int tqRegisterContext(TqGroupHandle*, void* ahandle);
int tqLaunchQuery(tqGroupHandle*); int tqLaunchQuery(TqGroupHandle*);
int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); int tqSendLaunchQuery(TqGroupHandle*);
int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset); int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes);
int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset); void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr);
int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset); void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr);
int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset); void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr);
int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle); const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *ghandle);
int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle); const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle);
int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle); const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem);
int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem);
int tqGetGHandleSSize(const tqGroupHandle *gHandle); int tqGetGHandleSSize(const TqGroupHandle *gHandle);
int tqListHandleSSize(const tqListHandle *listHandle); int tqBufHandleSSize();
int tqBufHandleSSize(const tqBufferHandle *bufHandle); int tqBufItemSSize();
int tqBufItemSSize(const tqBufferItem *bufItem);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -15,5 +15,12 @@
#include "sync.h" #include "sync.h"
int32_t syncInit() {return 0;} int32_t syncInit() { return 0; }
void syncCleanUp() {} void syncCleanUp() {}
SSyncNode* syncStart(const SSyncInfo* pInfo) { return NULL; }
void syncStop(const SSyncNode* pNode) {}
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}

View File

@ -4,9 +4,9 @@ target_include_directories(
wal wal
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal" PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
PRIVATE "${CMAKE_SOURCE_DIR}/include/os"
) )
target_link_libraries( target_link_libraries(
os wal
PUBLIC os
) )

View File

@ -19,6 +19,18 @@ int32_t walInit() { return 0; }
void walCleanUp() {} void walCleanUp() {}
twalh walOpen(char *path, SWalCfg *pCfg) { return NULL; } SWal *walOpen(char *path, SWalCfg *pCfg) { return NULL; }
int32_t walAlter(twalh pWal, SWalCfg *pCfg) { return 0; } int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; }
void walClose(SWal *pWal) {}
void walFsync(SWal *pWal, bool force) {}
int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {}
int32_t walCommit(SWal *pWal, int64_t ver) { return 0; }
int32_t walRollback(SWal *pWal, int64_t ver) { return 0; }
int32_t walPrune(SWal *pWal, int64_t ver) { return 0; }

View File

@ -24,7 +24,7 @@ extern "C" {
tmr_h mnodeGetTimer(); tmr_h mnodeGetTimer();
int32_t mnodeGetDnodeId(); int32_t mnodeGetDnodeId();
char *mnodeGetClusterId(); int64_t mnodeGetClusterId();
EMnStatus mnodeGetStatus(); EMnStatus mnodeGetStatus();
void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);

View File

@ -202,12 +202,13 @@ static void mnodeSendTelemetryReport() {
return; return;
} }
char clusterId[TSDB_CLUSTER_ID_LEN] = {0}; int64_t clusterId = mnodeGetClusterId();
mnodeGetClusterId(clusterId); char clusterIdStr[20] = {0};
snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId);
SBufferWriter bw = tbufInitWriter(NULL, false); SBufferWriter bw = tbufInitWriter(NULL, false);
mnodeBeginObject(&bw); mnodeBeginObject(&bw);
mnodeAddStringField(&bw, "instanceId", clusterId); mnodeAddStringField(&bw, "instanceId", clusterIdStr);
mnodeAddIntField(&bw, "reportVersion", 1); mnodeAddIntField(&bw, "reportVersion", 1);
mnodeAddOsInfo(&bw); mnodeAddOsInfo(&bw);
mnodeAddCpuInfo(&bw); mnodeAddCpuInfo(&bw);

View File

@ -39,7 +39,7 @@
static struct { static struct {
int32_t state; int32_t state;
int32_t dnodeId; int32_t dnodeId;
char clusterId[TSDB_CLUSTER_ID_LEN]; int64_t clusterId;
tmr_h timer; tmr_h timer;
SMnodeFp fp; SMnodeFp fp;
SSteps * steps1; SSteps * steps1;
@ -50,7 +50,7 @@ tmr_h mnodeGetTimer() { return tsMint.timer; }
int32_t mnodeGetDnodeId() { return tsMint.dnodeId; } int32_t mnodeGetDnodeId() { return tsMint.dnodeId; }
char *mnodeGetClusterId() { return tsMint.clusterId; } int64_t mnodeGetClusterId() { return tsMint.clusterId; }
EMnStatus mnodeGetStatus() { return tsMint.state; } EMnStatus mnodeGetStatus() { return tsMint.state; }
@ -71,12 +71,14 @@ int32_t mnodeGetStatistics(SMnodeStat *stat) { return 0; }
static int32_t mnodeSetPara(SMnodePara para) { static int32_t mnodeSetPara(SMnodePara para) {
tsMint.fp = para.fp; tsMint.fp = para.fp;
tsMint.dnodeId = para.dnodeId; tsMint.dnodeId = para.dnodeId;
strncpy(tsMint.clusterId, para.clusterId, TSDB_CLUSTER_ID_LEN); tsMint.clusterId = para.clusterId;
if (tsMint.fp.SendMsgToDnode == NULL) return -1; if (tsMint.fp.SendMsgToDnode == NULL) return -1;
if (tsMint.fp.SendMsgToMnode == NULL) return -1; if (tsMint.fp.SendMsgToMnode == NULL) return -1;
if (tsMint.fp.SendRedirectMsg == NULL) return -1; if (tsMint.fp.SendRedirectMsg == NULL) return -1;
if (tsMint.fp.GetDnodeEp == NULL) return -1;
if (tsMint.dnodeId < 0) return -1; if (tsMint.dnodeId < 0) return -1;
if (tsMint.clusterId < 0) return -1;
return 0; return 0;
} }
@ -141,7 +143,7 @@ static void mnodeCleanupStep2() { taosStepCleanup(tsMint.steps2); }
static bool mnodeNeedDeploy() { static bool mnodeNeedDeploy() {
if (tsMint.dnodeId > 0) return false; if (tsMint.dnodeId > 0) return false;
if (tsMint.clusterId[0] != 0) return false; if (tsMint.clusterId > 0) return false;
if (strcmp(tsFirst, tsLocalEp) != 0) return false; if (strcmp(tsFirst, tsLocalEp) != 0) return false;
return true; return true;
} }
@ -154,7 +156,7 @@ int32_t mnodeDeploy() {
tsMint.state = MN_STATUS_INIT; tsMint.state = MN_STATUS_INIT;
} }
if (tsMint.dnodeId <= 0 || tsMint.clusterId[0] == 0) { if (tsMint.dnodeId <= 0 || tsMint.clusterId <= 0) {
mError("failed to deploy mnode since cluster not ready"); mError("failed to deploy mnode since cluster not ready");
return TSDB_CODE_MND_NOT_READY; return TSDB_CODE_MND_NOT_READY;
} }

View File

@ -23,8 +23,8 @@ extern "C" {
int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg); int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg);
int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg); int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg);
int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState); int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState);
int32_t vnodeWriteTerm(int32_t vgid, SSyncServerState *pState); int32_t vnodeSaveState(int32_t vgid, SSyncServerState *pState);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -62,19 +62,14 @@ typedef struct STsdbCfg {
typedef struct SMetaCfg { typedef struct SMetaCfg {
} SMetaCfg; } SMetaCfg;
typedef struct SSyncCluster {
int8_t replica;
int8_t quorum;
SNodeInfo nodes[TSDB_MAX_REPLICA];
} SSyncCfg;
typedef struct SVnodeCfg { typedef struct SVnodeCfg {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int8_t dropped; int8_t dropped;
int8_t quorum;
SWalCfg wal; SWalCfg wal;
STsdbCfg tsdb; STsdbCfg tsdb;
SMetaCfg meta; SMetaCfg meta;
SSyncCfg sync; SSyncCluster sync;
} SVnodeCfg; } SVnodeCfg;
typedef struct { typedef struct {
@ -84,9 +79,9 @@ typedef struct {
SMeta *pMeta; SMeta *pMeta;
STsdb *pTsdb; STsdb *pTsdb;
STQ *pTQ; STQ *pTQ;
twalh pWal; SWal *pWal;
void *pQuery; void *pQuery;
SyncNodeId syncNode; SSyncNode *pSync;
taos_queue pWriteQ; // write queue taos_queue pWriteQ; // write queue
taos_queue pQueryQ; // read query queue taos_queue pQueryQ; // read query queue
taos_queue pFetchQ; // read fetch/cancel queue taos_queue pFetchQ; // read fetch/cancel queue

View File

@ -64,6 +64,7 @@ SMeta *metaOpen(SMetaOpts *pMetaOpts) {
// TODO: need to figure out how to persist the START UID // TODO: need to figure out how to persist the START UID
tableUidGeneratorInit(&(pMeta->uidGenerator), IVLD_TB_UID); tableUidGeneratorInit(&(pMeta->uidGenerator), IVLD_TB_UID);
return pMeta;
} }
void metaClose(SMeta *pMeta) { void metaClose(SMeta *pMeta) {

View File

@ -30,149 +30,156 @@ int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg) {
fp = fopen(file, "r"); fp = fopen(file, "r");
if (!fp) { if (!fp) {
vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", vgId, file, strerror(errno)); vError("vgId:%d, failed to open vnode cfg file:%s to read since %s", vgId, file, strerror(errno));
ret = TAOS_SYSTEM_ERROR(errno); ret = TAOS_SYSTEM_ERROR(errno);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
len = (int32_t)fread(content, 1, maxLen, fp); len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) { if (len <= 0) {
vError("vgId:%d, failed to read %s, content is null", vgId, file); vError("vgId:%d, failed to read %s since content is null", vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
content[len] = 0; content[len] = 0;
root = cJSON_Parse(content); root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
vError("vgId:%d, failed to read %s, invalid json format", vgId, file); vError("vgId:%d, failed to read %s since invalid json format", vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
cJSON *db = cJSON_GetObjectItem(root, "db"); cJSON *db = cJSON_GetObjectItem(root, "db");
if (!db || db->type != cJSON_String || db->valuestring == NULL) { if (!db || db->type != cJSON_String || db->valuestring == NULL) {
vError("vgId:%d, failed to read %s, db not found", vgId, file); vError("vgId:%d, failed to read %s since db not found", vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
tstrncpy(pCfg->db, db->valuestring, sizeof(pCfg->db)); tstrncpy(pCfg->db, db->valuestring, sizeof(pCfg->db));
cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) { if (!dropped || dropped->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, dropped not found", vgId, file); vError("vgId:%d, failed to read %s since dropped not found", vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
pCfg->dropped = (int32_t)dropped->valueint; pCfg->dropped = (int32_t)dropped->valueint;
cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, cacheBlockSize not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.cacheBlockSize = (int32_t)cacheBlockSize->valueint;
cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks");
if (!totalBlocks || totalBlocks->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, totalBlocks not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.totalBlocks = (int32_t)totalBlocks->valueint;
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysPerFile not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.daysPerFile = (int32_t)daysPerFile->valueint;
cJSON *daysToKeep0 = cJSON_GetObjectItem(root, "daysToKeep0");
if (!daysToKeep0 || daysToKeep0->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysToKeep0 not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.daysToKeep0 = (int32_t)daysToKeep0->valueint;
cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1");
if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysToKeep1 not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.daysToKeep1 = (int32_t)daysToKeep1->valueint;
cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2");
if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysToKeep2 not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.daysToKeep2 = (int32_t)daysToKeep2->valueint;
cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, minRowsPerFileBlock not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.minRowsPerFileBlock = (int32_t)minRowsPerFileBlock->valueint;
cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock");
if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, maxRowsPerFileBlock not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.maxRowsPerFileBlock = (int32_t)maxRowsPerFileBlock->valueint;
cJSON *precision = cJSON_GetObjectItem(root, "precision");
if (!precision || precision->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, precision not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.precision = (int8_t)precision->valueint;
cJSON *compression = cJSON_GetObjectItem(root, "compression");
if (!compression || compression->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, compression not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.compression = (int8_t)compression->valueint;
cJSON *update = cJSON_GetObjectItem(root, "update");
if (!update || update->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, update not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.update = (int8_t)update->valueint;
cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow");
if (!cacheLastRow || cacheLastRow->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, cacheLastRow not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.cacheLastRow = (int8_t)cacheLastRow->valueint;
cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel");
if (!walLevel || walLevel->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, walLevel not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->wal.walLevel = (int8_t)walLevel->valueint;
cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsyncPeriod");
if (!walLevel || walLevel->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, fsyncPeriod not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->wal.fsyncPeriod = (int32_t)fsyncPeriod->valueint;
cJSON *replica = cJSON_GetObjectItem(root, "replica");
if (!replica || replica->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, replica not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->sync.replica = (int8_t)replica->valueint;
cJSON *quorum = cJSON_GetObjectItem(root, "quorum"); cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
if (!quorum || quorum->type != cJSON_Number) { if (!quorum || quorum->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, quorum not found", vgId, file); vError("vgId: %d, failed to read %s, quorum not found", vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
pCfg->sync.quorum = (int8_t)quorum->valueint; pCfg->quorum = (int8_t)quorum->valueint;
cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since cacheBlockSize not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.cacheBlockSize = (int32_t)cacheBlockSize->valueint;
cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks");
if (!totalBlocks || totalBlocks->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since totalBlocks not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.totalBlocks = (int32_t)totalBlocks->valueint;
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since daysPerFile not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.daysPerFile = (int32_t)daysPerFile->valueint;
cJSON *daysToKeep0 = cJSON_GetObjectItem(root, "daysToKeep0");
if (!daysToKeep0 || daysToKeep0->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since daysToKeep0 not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.daysToKeep0 = (int32_t)daysToKeep0->valueint;
cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1");
if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since daysToKeep1 not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.daysToKeep1 = (int32_t)daysToKeep1->valueint;
cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2");
if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since daysToKeep2 not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.daysToKeep2 = (int32_t)daysToKeep2->valueint;
cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since minRowsPerFileBlock not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.minRowsPerFileBlock = (int32_t)minRowsPerFileBlock->valueint;
cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock");
if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since maxRowsPerFileBlock not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.maxRowsPerFileBlock = (int32_t)maxRowsPerFileBlock->valueint;
cJSON *precision = cJSON_GetObjectItem(root, "precision");
if (!precision || precision->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since precision not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.precision = (int8_t)precision->valueint;
cJSON *compression = cJSON_GetObjectItem(root, "compression");
if (!compression || compression->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since compression not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.compression = (int8_t)compression->valueint;
cJSON *update = cJSON_GetObjectItem(root, "update");
if (!update || update->type != cJSON_Number) {
vError("vgId: %d, failed to read %s since update not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.update = (int8_t)update->valueint;
cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow");
if (!cacheLastRow || cacheLastRow->type != cJSON_Number) {
vError("vgId: %d, failed to read %s since cacheLastRow not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.cacheLastRow = (int8_t)cacheLastRow->valueint;
cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel");
if (!walLevel || walLevel->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since walLevel not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->wal.walLevel = (int8_t)walLevel->valueint;
cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsyncPeriod");
if (!walLevel || walLevel->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since fsyncPeriod not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->wal.fsyncPeriod = (int32_t)fsyncPeriod->valueint;
cJSON *selfIndex = cJSON_GetObjectItem(root, "selfIndex");
if (!selfIndex || selfIndex->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since selfIndex not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->sync.selfIndex = selfIndex->valueint;
cJSON *replica = cJSON_GetObjectItem(root, "replica");
if (!replica || replica->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since replica not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->sync.replica = replica->valueint;
cJSON *nodes = cJSON_GetObjectItem(root, "nodes"); cJSON *nodes = cJSON_GetObjectItem(root, "nodes");
if (!nodes || nodes->type != cJSON_Array) { if (!nodes || nodes->type != cJSON_Array) {
@ -182,28 +189,35 @@ int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg) {
int size = cJSON_GetArraySize(nodes); int size = cJSON_GetArraySize(nodes);
if (size != pCfg->sync.replica) { if (size != pCfg->sync.replica) {
vError("vgId:%d, failed to read %s, nodes size not matched", vgId, file); vError("vgId:%d, failed to read %s since nodes size not matched", vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
for (int i = 0; i < size; ++i) { for (int i = 0; i < size; ++i) {
cJSON *nodeInfo = cJSON_GetArrayItem(nodes, i); cJSON *nodeInfo = cJSON_GetArrayItem(nodes, i);
if (nodeInfo == NULL) continue; if (nodeInfo == NULL) continue;
SNodeInfo *node = &pCfg->sync.nodes[i]; SNodeInfo *node = &pCfg->sync.nodeInfo[i];
cJSON *port = cJSON_GetObjectItem(nodeInfo, "port"); cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "id");
if (!port || port->type != cJSON_Number) { if (!nodeId || nodeId->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, port not found", vgId, file); vError("vgId:%d, failed to read %s since nodeId not found", vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
node->nodePort = (uint16_t)port->valueint; node->nodeId = nodeId->valueint;
cJSON *fqdn = cJSON_GetObjectItem(nodeInfo, "fqdn"); cJSON *nodePort = cJSON_GetObjectItem(nodeInfo, "port");
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { if (!nodePort || nodePort->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, fqdn not found", vgId, file); vError("vgId:%d, failed to read %s sincenodePort not found", vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
tstrncpy(node->nodeFqdn, fqdn->valuestring, TSDB_FQDN_LEN); node->nodePort = (uint16_t)nodePort->valueint;
cJSON *nodeFqdn = cJSON_GetObjectItem(nodeInfo, "fqdn");
if (!nodeFqdn || nodeFqdn->type != cJSON_String || nodeFqdn->valuestring == NULL) {
vError("vgId:%d, failed to read %s since nodeFqdn not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
tstrncpy(node->nodeFqdn, nodeFqdn->valuestring, TSDB_FQDN_LEN);
} }
ret = TSDB_CODE_SUCCESS; ret = TSDB_CODE_SUCCESS;
@ -238,6 +252,7 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) {
len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", vgId); len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", vgId);
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pCfg->db); len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pCfg->db);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pCfg->dropped); len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pCfg->dropped);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pCfg->quorum);
// tsdb // tsdb
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pCfg->tsdb.cacheBlockSize); len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pCfg->tsdb.cacheBlockSize);
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pCfg->tsdb.totalBlocks); len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pCfg->tsdb.totalBlocks);
@ -255,11 +270,12 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) {
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pCfg->wal.walLevel); len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pCfg->wal.walLevel);
len += snprintf(content + len, maxLen - len, " \"fsyncPeriod\": %d,\n", pCfg->wal.fsyncPeriod); len += snprintf(content + len, maxLen - len, " \"fsyncPeriod\": %d,\n", pCfg->wal.fsyncPeriod);
// sync // sync
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pCfg->sync.quorum);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pCfg->sync.replica); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pCfg->sync.replica);
len += snprintf(content + len, maxLen - len, " \"selfIndex\": %d,\n", pCfg->sync.selfIndex);
len += snprintf(content + len, maxLen - len, " \"nodes\": [{\n"); len += snprintf(content + len, maxLen - len, " \"nodes\": [{\n");
for (int32_t i = 0; i < pCfg->sync.replica; i++) { for (int32_t i = 0; i < pCfg->sync.replica; i++) {
SNodeInfo *node = &pCfg->sync.nodes[i]; SNodeInfo *node = &pCfg->sync.nodeInfo[i];
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", node->nodeId);
len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", node->nodePort); len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", node->nodePort);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\"\n", node->nodeFqdn); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\"\n", node->nodeFqdn);
if (i < pCfg->sync.replica - 1) { if (i < pCfg->sync.replica - 1) {
@ -280,7 +296,7 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) { int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState) {
int32_t ret = TSDB_CODE_VND_APP_ERROR; int32_t ret = TSDB_CODE_VND_APP_ERROR;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 100; int32_t maxLen = 100;
@ -289,7 +305,7 @@ int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) {
FILE *fp = NULL; FILE *fp = NULL;
char file[PATH_MAX + 30] = {0}; char file[PATH_MAX + 30] = {0};
sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId); sprintf(file, "%s/vnode%d/state.json", tsVnodeDir, vgId);
len = (int32_t)fread(content, 1, maxLen, fp); len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) { if (len <= 0) {
@ -304,20 +320,20 @@ int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) {
} }
cJSON *term = cJSON_GetObjectItem(root, "term"); cJSON *term = cJSON_GetObjectItem(root, "term");
if (!term || term->type != cJSON_Number) { if (!term || term->type != cJSON_String) {
vError("vgId:%d, failed to read %s since term not found", vgId, file); vError("vgId:%d, failed to read %s since term not found", vgId, file);
goto PARSE_TERM_ERROR; goto PARSE_TERM_ERROR;
} }
pState->term = (uint64_t)term->valueint; pState->term = atoll(term->valuestring);
cJSON *voteFor = cJSON_GetObjectItem(root, "voteFor"); cJSON *voteFor = cJSON_GetObjectItem(root, "voteFor");
if (!voteFor || voteFor->type != cJSON_Number) { if (!voteFor || voteFor->type != cJSON_String) {
vError("vgId:%d, failed to read %s since voteFor not found", vgId, file); vError("vgId:%d, failed to read %s since voteFor not found", vgId, file);
goto PARSE_TERM_ERROR; goto PARSE_TERM_ERROR;
} }
pState->voteFor = (int64_t)voteFor->valueint; pState->voteFor = atoi(voteFor->valuestring);
vInfo("vgId:%d, read %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term); vInfo("vgId:%d, read %s success, voteFor:%d, term:%" PRIu64, vgId, file, pState->voteFor, pState->term);
PARSE_TERM_ERROR: PARSE_TERM_ERROR:
if (content != NULL) free(content); if (content != NULL) free(content);
@ -327,9 +343,9 @@ PARSE_TERM_ERROR:
return ret; return ret;
} }
int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) { int32_t vnodeSaveState(int32_t vgId, SSyncServerState *pState) {
char file[PATH_MAX + 30] = {0}; char file[PATH_MAX + 30] = {0};
sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId); sprintf(file, "%s/vnode%d/state.json", tsVnodeDir, vgId);
FILE *fp = fopen(file, "w"); FILE *fp = fopen(file, "w");
if (!fp) { if (!fp) {
@ -342,8 +358,8 @@ int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) {
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"term\": %" PRIu64 "\n", pState->term); len += snprintf(content + len, maxLen - len, " \"term\": \"%" PRIu64 "\",\n", pState->term);
len += snprintf(content + len, maxLen - len, " \"voteFor\": %" PRIu64 "\n", pState->voteFor); len += snprintf(content + len, maxLen - len, " \"voteFor\": \"%d\"\n", pState->voteFor);
len += snprintf(content + len, maxLen - len, "}\n"); len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp); fwrite(content, 1, len, fp);
@ -351,6 +367,6 @@ int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) {
fclose(fp); fclose(fp);
free(content); free(content);
vInfo("vgId:%d, write %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term); vInfo("vgId:%d, write %s success, voteFor:%d, term:%" PRIu64, vgId, file, pState->voteFor, pState->term);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -108,6 +108,11 @@ static void vnodeDestroyVnode(SVnode *pVnode) {
int32_t code = 0; int32_t code = 0;
int32_t vgId = pVnode->vgId; int32_t vgId = pVnode->vgId;
if (pVnode->pSync != NULL) {
syncStop(pVnode->pSync);
pVnode->pSync = NULL;
}
if (pVnode->pQuery) { if (pVnode->pQuery) {
// todo // todo
} }
@ -125,7 +130,8 @@ static void vnodeDestroyVnode(SVnode *pVnode) {
} }
if (pVnode->pWal) { if (pVnode->pWal) {
// todo walClose(pVnode->pWal);
pVnode->pWal = NULL;
} }
if (pVnode->allocator) { if (pVnode->allocator) {
@ -161,6 +167,56 @@ static void vnodeCleanupVnode(SVnode *pVnode) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
static inline int32_t vnodeLogWrite(struct SSyncLogStore *logStore, SyncIndex index, SSyncBuffer *pBuf) {
SVnode *pVnode = logStore->pData; // vnode status can be checked here
return walWrite(pVnode->pWal, index, pBuf->data, (int32_t)pBuf->len);
}
static inline int32_t vnodeLogCommit(struct SSyncLogStore *logStore, SyncIndex index) {
SVnode *pVnode = logStore->pData; // vnode status can be checked here
return walCommit(pVnode->pWal, index);
}
static inline int32_t vnodeLogPrune(struct SSyncLogStore *logStore, SyncIndex index) {
SVnode *pVnode = logStore->pData; // vnode status can be checked here
return walPrune(pVnode->pWal, index);
}
static inline int32_t vnodeLogRollback(struct SSyncLogStore *logStore, SyncIndex index) {
SVnode *pVnode = logStore->pData; // vnode status can be checked here
return walRollback(pVnode->pWal, index);
}
static inline int32_t vnodeSaveServerState(struct SStateManager *stateMng, SSyncServerState *pState) {
SVnode *pVnode = stateMng->pData;
return vnodeSaveState(pVnode->vgId, pState);
}
static inline int32_t vnodeReadServerState(struct SStateManager *stateMng, SSyncServerState *pState) {
SVnode *pVnode = stateMng->pData;
return vnodeSaveState(pVnode->vgId, pState);
}
static inline int32_t vnodeApplyLog(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
return 0;
}
static inline int32_t vnodeOnClusterChanged(struct SSyncFSM *fsm, const SSyncCluster *cluster, void *pData) { return 0; }
static inline int32_t vnodeGetSnapshot(struct SSyncFSM *fsm, SSyncBuffer **ppBuf, int32_t *objId, bool *isLast) {
return 0;
}
static inline int32_t vnodeApplySnapshot(struct SSyncFSM *fsm, SSyncBuffer *pBuf, int32_t objId, bool isLast) {
return 0;
}
static inline int32_t vnodeOnRestoreDone(struct SSyncFSM *fsm) { return 0; }
static inline void vnodeOnRollback(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf) {}
static inline void vnodeOnRoleChanged(struct SSyncFSM *fsm, const SNodesRole *pRole) {}
static int32_t vnodeOpenVnode(int32_t vgId) { static int32_t vnodeOpenVnode(int32_t vgId) {
int32_t code = 0; int32_t code = 0;
@ -177,6 +233,9 @@ static int32_t vnodeOpenVnode(int32_t vgId) {
pVnode->role = TAOS_SYNC_ROLE_CANDIDATE; pVnode->role = TAOS_SYNC_ROLE_CANDIDATE;
pthread_mutex_init(&pVnode->statusMutex, NULL); pthread_mutex_init(&pVnode->statusMutex, NULL);
vDebug("vgId:%d, vnode is opened", pVnode->vgId);
taosHashPut(tsVnode.hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnode *));
code = vnodeReadCfg(vgId, &pVnode->cfg); code = vnodeReadCfg(vgId, &pVnode->cfg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, failed to read config file, set cfgVersion to 0", pVnode->vgId); vError("vgId:%d, failed to read config file, set cfgVersion to 0", pVnode->vgId);
@ -185,7 +244,7 @@ static int32_t vnodeOpenVnode(int32_t vgId) {
return 0; return 0;
} }
code = vnodeReadTerm(vgId, &pVnode->term); code = vnodeSaveState(vgId, &pVnode->term);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, failed to read term file since %s", pVnode->vgId, tstrerror(code)); vError("vgId:%d, failed to read term file since %s", pVnode->vgId, tstrerror(code));
pVnode->cfg.dropped = 1; pVnode->cfg.dropped = 1;
@ -209,8 +268,33 @@ static int32_t vnodeOpenVnode(int32_t vgId) {
return terrno; return terrno;
} }
vDebug("vgId:%d, vnode is opened", pVnode->vgId); // create sync node
taosHashPut(tsVnode.hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnode *)); SSyncInfo syncInfo = {0};
syncInfo.vgId = vgId;
syncInfo.snapshotIndex = 0; // todo, from tsdb
memcpy(&syncInfo.syncCfg, &pVnode->cfg.sync, sizeof(SSyncCluster));
syncInfo.fsm.pData = pVnode;
syncInfo.fsm.applyLog = vnodeApplyLog;
syncInfo.fsm.onClusterChanged = vnodeOnClusterChanged;
syncInfo.fsm.getSnapshot = vnodeGetSnapshot;
syncInfo.fsm.applySnapshot = vnodeApplySnapshot;
syncInfo.fsm.onRestoreDone = vnodeOnRestoreDone;
syncInfo.fsm.onRollback = vnodeOnRollback;
syncInfo.fsm.onRoleChanged = vnodeOnRoleChanged;
syncInfo.logStore.pData = pVnode;
syncInfo.logStore.logWrite = vnodeLogWrite;
syncInfo.logStore.logCommit = vnodeLogCommit;
syncInfo.logStore.logPrune = vnodeLogPrune;
syncInfo.logStore.logRollback = vnodeLogRollback;
syncInfo.stateManager.pData = pVnode;
syncInfo.stateManager.saveServerState = vnodeSaveServerState;
syncInfo.stateManager.readServerState = vnodeReadServerState;
pVnode->pSync = syncStart(&syncInfo);
if (pVnode->pSync == NULL) {
vnodeCleanupVnode(pVnode);
return terrno;
}
vnodeSetReadyStatus(pVnode); vnodeSetReadyStatus(pVnode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -313,7 +397,7 @@ int32_t vnodeAlterVnode(SVnode * pVnode, SVnodeCfg *pCfg) {
} }
if (syncChanged) { if (syncChanged) {
// todo syncReconfig(pVnode->pSync, &pVnode->cfg.sync);
} }
vnodeRelease(pVnode); vnodeRelease(pVnode);

View File

@ -31,6 +31,7 @@ static int32_t vnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCf
*vgId = htonl(pCreate->vgId); *vgId = htonl(pCreate->vgId);
pCfg->dropped = 0; pCfg->dropped = 0;
pCfg->quorum = pCreate->quorum;
tstrncpy(pCfg->db, pCreate->db, sizeof(pCfg->db)); tstrncpy(pCfg->db, pCreate->db, sizeof(pCfg->db));
pCfg->tsdb.cacheBlockSize = htonl(pCreate->cacheBlockSize); pCfg->tsdb.cacheBlockSize = htonl(pCreate->cacheBlockSize);
@ -50,11 +51,11 @@ static int32_t vnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCf
pCfg->wal.walLevel = pCreate->walLevel; pCfg->wal.walLevel = pCreate->walLevel;
pCfg->sync.replica = pCreate->replica; pCfg->sync.replica = pCreate->replica;
pCfg->sync.quorum = pCreate->quorum; pCfg->sync.selfIndex = pCreate->selfIndex;
for (int32_t j = 0; j < pCreate->replica; ++j) { for (int32_t j = 0; j < pCreate->replica; ++j) {
pCfg->sync.nodes[j].nodePort = htons(pCreate->nodes[j].port); pCfg->sync.nodeInfo[j].nodePort = htons(pCreate->nodes[j].port);
tstrncpy(pCfg->sync.nodes[j].nodeFqdn, pCreate->nodes[j].fqdn, TSDB_FQDN_LEN); tstrncpy(pCfg->sync.nodeInfo[j].nodeFqdn, pCreate->nodes[j].fqdn, TSDB_FQDN_LEN);
} }
return 0; return 0;

View File

@ -217,8 +217,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) {
//parse message and optionally move offset //parse message and optionally move offset
void* pMsg = pRead->pCont; void* pMsg = pRead->pCont;
tmqConsumeReq *pConsumeMsg = (tmqConsumeReq*) pMsg; TmqConsumeReq *pConsumeMsg = (TmqConsumeReq*) pMsg;
tmqMsgHead msgHead = pConsumeMsg->head; TmqMsgHead msgHead = pConsumeMsg->head;
//extract head //extract head
STQ *pTq = pVnode->pTQ; STQ *pTq = pVnode->pTQ;
/*tqBufferHandle *pHandle = tqGetHandle(pTq, msgHead.clientId);*/ /*tqBufferHandle *pHandle = tqGetHandle(pTq, msgHead.clientId);*/

View File

@ -3,11 +3,12 @@ add_library(tq ${TQ_SRC})
target_include_directories( target_include_directories(
tq tq
PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq"
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
PRIVATE "${CMAKE_SOURCE_DIR}/include/os"
) )
target_link_libraries( target_link_libraries(
wal tq
PUBLIC wal
PUBLIC os
PUBLIC util
) )

View File

@ -0,0 +1,14 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -18,7 +18,6 @@
#include "tq.h" #include "tq.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif

View File

@ -0,0 +1,80 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TQ_META_STORE_H_
#define _TQ_META_STORE_H_
#include "os.h"
#define TQ_INUSE_SIZE 0xFF
#define TQ_PAGE_SIZE 4096
#ifdef __cplusplus
extern "C" {
#endif
typedef struct TqMetaHandle {
int64_t key;
int64_t offset;
void *valueInUse;
void *valueInTxn;
} TqMetaHandle;
typedef struct TqMetaList {
TqMetaHandle handle;
struct TqMetaList* next;
struct TqMetaList* inTxnPrev;
struct TqMetaList* inTxnNext;
struct TqMetaList* unpersistPrev;
struct TqMetaList* unpersistNext;
} TqMetaList;
typedef struct TqMetaStore {
TqMetaList* inUse[TQ_INUSE_SIZE];
TqMetaList* unpersistHead;
//deserializer
//serializer
//deleter
} TqMetaStore;
typedef struct TqMetaPageBuf {
int16_t offset;
char buffer[TQ_PAGE_SIZE];
} TqMetaPageBuf;
TqMetaStore* TqStoreOpen(const char* path, void* serializer(void* ), void* deserializer(void*));
int32_t TqStoreClose(TqMetaStore*);
int32_t TqStoreDelete(TqMetaStore*);
int32_t TqStoreCommitAll(TqMetaStore*);
int32_t TqStorePersist(TqMetaStore*);
TqMetaHandle* TqHandleGetInUse(TqMetaStore*, int64_t key);
int32_t TqHandlePutInUse(TqMetaStore*, TqMetaHandle* handle);
TqMetaHandle* TqHandleGetInTxn(TqMetaStore*, int64_t key);
int32_t TqHandlePutInTxn(TqMetaStore*, TqMetaHandle* handle);
//delete in-use-handle, make in-txn-handle in use
int32_t TqHandleCommit(TqMetaStore*, int64_t key);
//delete in-txn-handle
int32_t TqHandleAbort(TqMetaStore*, int64_t key);
//delete in-use-handle
int32_t TqHandleDel(TqMetaStore*, int64_t key);
//delete in-use-handle and in-txn-handle
int32_t TqHandleClear(TqMetaStore*, int64_t key);
#ifdef __cplusplus
}
#endif
#endif /* ifndef _TQ_META_STORE_H_ */

View File

@ -22,18 +22,18 @@
// //
//handle management message //handle management message
// //
static int tqProtoCheck(tmqMsgHead *pMsg) { static int tqProtoCheck(TmqMsgHead *pMsg) {
return pMsg->protoVer == 0; return pMsg->protoVer == 0;
} }
static int tqAckOneTopic(tqBufferHandle *bhandle, tmqOneAck *pAck, tqQueryMsg** ppQuery) { static int tqAckOneTopic(TqBufferHandle *bhandle, TmqOneAck *pAck, TqQueryMsg** ppQuery) {
//clean old item and move forward //clean old item and move forward
int32_t consumeOffset = pAck->consumeOffset; int32_t consumeOffset = pAck->consumeOffset;
int idx = consumeOffset % TQ_BUFFER_SIZE; int idx = consumeOffset % TQ_BUFFER_SIZE;
ASSERT(bhandle->buffer[idx].content && bhandle->buffer[idx].executor); ASSERT(bhandle->buffer[idx].content && bhandle->buffer[idx].executor);
tfree(bhandle->buffer[idx].content); tfree(bhandle->buffer[idx].content);
if( 1 /* TODO: need to launch new query */) { if( 1 /* TODO: need to launch new query */) {
tqQueryMsg* pNewQuery = malloc(sizeof(tqQueryMsg)); TqQueryMsg* pNewQuery = malloc(sizeof(TqQueryMsg));
if(pNewQuery == NULL) { if(pNewQuery == NULL) {
//TODO: memory insufficient //TODO: memory insufficient
return -1; return -1;
@ -49,19 +49,19 @@ static int tqAckOneTopic(tqBufferHandle *bhandle, tmqOneAck *pAck, tqQueryMsg**
return 0; return 0;
} }
static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) { static int tqAck(TqGroupHandle* ghandle, TmqAcks* pAcks) {
int32_t ackNum = pAcks->ackNum; int32_t ackNum = pAcks->ackNum;
tmqOneAck *acks = pAcks->acks; TmqOneAck *acks = pAcks->acks;
//double ptr for acks and list //double ptr for acks and list
int i = 0; int i = 0;
tqListHandle* node = ghandle->head; TqListHandle* node = ghandle->head;
int ackCnt = 0; int ackCnt = 0;
tqQueryMsg *pQuery = NULL; TqQueryMsg *pQuery = NULL;
while(i < ackNum && node->next) { while(i < ackNum && node->next) {
if(acks[i].topicId == node->next->bufHandle->topicId) { if(acks[i].topicId == node->next->bufHandle.topicId) {
ackCnt++; ackCnt++;
tqAckOneTopic(node->next->bufHandle, &acks[i], &pQuery); tqAckOneTopic(&node->next->bufHandle, &acks[i], &pQuery);
} else if(acks[i].topicId < node->next->bufHandle->topicId) { } else if(acks[i].topicId < node->next->bufHandle.topicId) {
i++; i++;
} else { } else {
node = node->next; node = node->next;
@ -73,12 +73,12 @@ static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) {
return ackCnt; return ackCnt;
} }
static int tqCommitTCGroup(tqGroupHandle* handle) { static int tqCommitTCGroup(TqGroupHandle* handle) {
//persist modification into disk //persist modification into disk
return 0; return 0;
} }
int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, tqGroupHandle** handle) { int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) {
//create in disk //create in disk
return 0; return 0;
} }
@ -99,13 +99,13 @@ int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
return 0; return 0;
} }
static int tqFetch(tqGroupHandle* ghandle, void** msg) { static int tqFetch(TqGroupHandle* ghandle, void** msg) {
tqListHandle* head = ghandle->head; TqListHandle* head = ghandle->head;
tqListHandle* node = head; TqListHandle* node = head;
int totSize = 0; int totSize = 0;
//TODO: make it a macro //TODO: make it a macro
int sizeLimit = 4 * 1024; int sizeLimit = 4 * 1024;
tmqMsgContent* buffer = malloc(sizeLimit); TmqMsgContent* buffer = malloc(sizeLimit);
if(buffer == NULL) { if(buffer == NULL) {
//TODO:memory insufficient //TODO:memory insufficient
return -1; return -1;
@ -114,7 +114,7 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) {
//until all topic iterated or msgs over sizeLimit //until all topic iterated or msgs over sizeLimit
while(node->next) { while(node->next) {
node = node->next; node = node->next;
tqBufferHandle* bufHandle = node->bufHandle; TqBufferHandle* bufHandle = &node->bufHandle;
int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE; int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
if(bufHandle->buffer[idx].content != NULL && if(bufHandle->buffer[idx].content != NULL &&
bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset
@ -140,28 +140,23 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) {
} }
} }
} }
if(totSize == 0) {
//no msg
return -1;
}
return totSize; return totSize;
} }
tqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) {
return NULL; return NULL;
} }
int tqLaunchQuery(tqGroupHandle* ghandle) { int tqLaunchQuery(TqGroupHandle* ghandle) {
return 0; return 0;
} }
int tqSendLaunchQuery(STQ* pTq, int64_t topicId, int64_t cgId, void* query) { int tqSendLaunchQuery(TqGroupHandle* gHandle) {
return 0; return 0;
} }
/*int tqMoveOffsetToNext(tqGroupHandle* ghandle) {*/ /*int tqMoveOffsetToNext(TqGroupHandle* ghandle) {*/
/*return 0;*/ /*return 0;*/
/*}*/ /*}*/
@ -176,13 +171,13 @@ int tqCommit(STQ* pTq) {
return 0; return 0;
} }
int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
if(!tqProtoCheck((tmqMsgHead *)pMsg)) { if(!tqProtoCheck((TmqMsgHead *)pMsg)) {
//proto version invalid //proto version invalid
return -1; return -1;
} }
int64_t clientId = pMsg->head.clientId; int64_t clientId = pMsg->head.clientId;
tqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId); TqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId);
if(ghandle == NULL) { if(ghandle == NULL) {
//client not connect //client not connect
return -1; return -1;
@ -194,9 +189,9 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) {
} }
} }
tmqConsumeRsp *pRsp = (tmqConsumeRsp*) pMsg; TmqConsumeRsp *pRsp = (TmqConsumeRsp*) pMsg;
if(tqFetch(ghandle, (void**)&pRsp->msgs) < 0) { if(tqFetch(ghandle, (void**)&pRsp->msgs) <= 0) {
//fetch error //fetch error
return -1; return -1;
} }
@ -209,14 +204,9 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) {
return 0; return 0;
} }
int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes) {
int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset) {
//calculate size //calculate size
int sz = tqGetGHandleSSize(gHandle); int sz = tqGetGHandleSSize(gHandle);
if(sz <= 0) {
//TODO: err
return -1;
}
void* ptr = realloc(*ppBytes, sz); void* ptr = realloc(*ppBytes, sz);
if(ptr == NULL) { if(ptr == NULL) {
free(ppBytes); free(ppBytes);
@ -224,29 +214,30 @@ int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offse
return -1; return -1;
} }
*ppBytes = ptr; *ppBytes = ptr;
//do serialize //do serialization
*(int64_t*)ptr = gHandle->cId; *(int64_t*)ptr = gHandle->cId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = gHandle->cgId; *(int64_t*)ptr = gHandle->cgId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int32_t*)ptr = gHandle->topicNum; *(int32_t*)ptr = gHandle->topicNum;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
if(gHandle->topicNum > 0) { if(gHandle->topicNum > 0) {
tqSerializeListHandle(gHandle->head, ppBytes, ptr - *ppBytes); tqSerializeListHandle(gHandle->head, ptr);
} }
return 0; return 0;
} }
int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset) { void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr) {
void* ptr = POINTER_SHIFT(*ppBytes, offset); TqListHandle *node = listHandle;
tqListHandle *node = listHandle; ASSERT(node != NULL);
while(node->next) { while(node) {
ptr = tqSerializeBufHandle(&node->bufHandle, ptr);
node = node->next; node = node->next;
offset = tqSerializeBufHandle(node->bufHandle, ppBytes, offset);
} }
return offset; return ptr;
} }
int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset) {
void *ptr = POINTER_SHIFT(*ppBytes, offset); void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr) {
*(int64_t*)ptr = bufHandle->nextConsumeOffset; *(int64_t*)ptr = bufHandle->nextConsumeOffset;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = bufHandle->topicId; *(int64_t*)ptr = bufHandle->topicId;
@ -256,41 +247,87 @@ int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offs
*(int32_t*)ptr = bufHandle->tail; *(int32_t*)ptr = bufHandle->tail;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
for(int i = 0; i < TQ_BUFFER_SIZE; i++) { for(int i = 0; i < TQ_BUFFER_SIZE; i++) {
int sz = tqSerializeBufItem(&bufHandle->buffer[i], ppBytes, ptr - *ppBytes); ptr = tqSerializeBufItem(&bufHandle->buffer[i], ptr);
ptr = POINTER_SHIFT(ptr, sz);
} }
return ptr - *ppBytes; return ptr;
} }
int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset) { void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) {
void *ptr = POINTER_SHIFT(*ppBytes, offset);
//TODO: do we need serialize this? //TODO: do we need serialize this?
return 0; //mainly for executor
return ptr;
} }
int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle) { const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *gHandle) {
return 0; const void* ptr = pBytes;
} gHandle->cId = *(int64_t*)ptr;
int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle) { ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
return 0; gHandle->cgId = *(int64_t*)ptr;
} ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle) { gHandle->ahandle = NULL;
return 0; gHandle->topicNum = *(int32_t*)ptr;
} ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem) { gHandle->head = NULL;
return 0; TqListHandle *node = gHandle->head;
for(int i = 0; i < gHandle->topicNum; i++) {
if(gHandle->head == NULL) {
if((node = malloc(sizeof(TqListHandle))) == NULL) {
//TODO: error
return NULL;
}
node->next= NULL;
ptr = tqDeserializeBufHandle(ptr, &node->bufHandle);
gHandle->head = node;
} else {
node->next = malloc(sizeof(TqListHandle));
if(node->next == NULL) {
//TODO: error
return NULL;
}
node->next->next = NULL;
ptr = tqDeserializeBufHandle(ptr, &node->next->bufHandle);
node = node->next;
}
}
return ptr;
} }
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle) {
const void* ptr = pBytes;
bufHandle->nextConsumeOffset = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
bufHandle->topicId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
bufHandle->head = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
bufHandle->tail = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
for(int i = 0; i < TQ_BUFFER_SIZE; i++) {
ptr = tqDeserializeBufItem(ptr, &bufHandle->buffer[i]);
}
return ptr;
}
int tqGetGHandleSSize(const tqGroupHandle *gHandle) { const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) {
return 0; return pBytes;
} }
int tqListHandleSSize(const tqListHandle *listHandle) {
return 0; //TODO: make this a macro
} int tqGetGHandleSSize(const TqGroupHandle *gHandle) {
int tqBufHandleSSize(const tqBufferHandle *bufHandle) { return sizeof(int64_t) * 2
return 0; + sizeof(int32_t)
} + gHandle->topicNum * tqBufHandleSSize();
int tqBufItemSSize(const tqBufferItem *bufItem) { }
//TODO: make this a macro
int tqBufHandleSSize() {
return sizeof(int64_t) * 2
+ sizeof(int32_t) * 2
+ TQ_BUFFER_SIZE * tqBufItemSSize();
}
int tqBufItemSSize() {
//TODO: do this need serialization?
//mainly for executor
return 0; return 0;
} }

View File

@ -0,0 +1,14 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -0,0 +1,14 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/