Merge remote-tracking branch 'origin/3.0' into feature/qnode
This commit is contained in:
commit
6adae940f4
|
@ -193,6 +193,9 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
|
||||||
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
|
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
|
||||||
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
|
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
|
||||||
|
|
||||||
|
|
||||||
|
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1076,6 +1076,27 @@ typedef struct STaskDropRsp {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
} STaskDropRsp;
|
} STaskDropRsp;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t igExists;
|
||||||
|
char* name;
|
||||||
|
char* phyPlan;
|
||||||
|
} SCMCreateTopicReq;
|
||||||
|
|
||||||
|
static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) {
|
||||||
|
int tlen = 0;
|
||||||
|
tlen += taosEncodeString(buf, pReq->name);
|
||||||
|
tlen += taosEncodeFixedI8(buf, pReq->igExists);
|
||||||
|
tlen += taosEncodeString(buf, pReq->phyPlan);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) {
|
||||||
|
buf = taosDecodeFixedI8(buf, &(pReq->igExists));
|
||||||
|
buf = taosDecodeString(buf, &(pReq->name));
|
||||||
|
buf = taosDecodeString(buf, &(pReq->phyPlan));
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
int8_t igExists;
|
int8_t igExists;
|
||||||
|
|
|
@ -49,10 +49,11 @@ typedef struct {
|
||||||
/**
|
/**
|
||||||
* @brief Start one Bnode in Dnode.
|
* @brief Start one Bnode in Dnode.
|
||||||
*
|
*
|
||||||
|
* @param path Path of the bnode.
|
||||||
* @param pOption Option of the bnode.
|
* @param pOption Option of the bnode.
|
||||||
* @return SBnode* The bnode object.
|
* @return SBnode* The bnode object.
|
||||||
*/
|
*/
|
||||||
SBnode *bndOpen(const SBnodeOpt *pOption);
|
SBnode *bndOpen(const char *path, const SBnodeOpt *pOption);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Stop Bnode in Dnode.
|
* @brief Stop Bnode in Dnode.
|
||||||
|
@ -79,6 +80,13 @@ int32_t bndGetLoad(SBnode *pBnode, SBnodeLoad *pLoad);
|
||||||
*/
|
*/
|
||||||
int32_t bndProcessWMsgs(SBnode *pBnode, SArray *pMsgs);
|
int32_t bndProcessWMsgs(SBnode *pBnode, SArray *pMsgs);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Drop a bnode.
|
||||||
|
*
|
||||||
|
* @param path Path of the bnode.
|
||||||
|
*/
|
||||||
|
void bndDestroy(const char *path);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -147,28 +147,12 @@ void mndCleanupMsg(SMnodeMsg *pMsg);
|
||||||
void mndSendRsp(SMnodeMsg *pMsg, int32_t code);
|
void mndSendRsp(SMnodeMsg *pMsg, int32_t code);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Process the read request.
|
* @brief Process the read, write, sync request.
|
||||||
*
|
*
|
||||||
* @param pMsg The request msg.
|
* @param pMsg The request msg.
|
||||||
* @return int32_t 0 for success, -1 for failure.
|
* @return int32_t 0 for success, -1 for failure.
|
||||||
*/
|
*/
|
||||||
void mndProcessReadMsg(SMnodeMsg *pMsg);
|
void mndProcessMsg(SMnodeMsg *pMsg);
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Process the write request.
|
|
||||||
*
|
|
||||||
* @param pMsg The request msg.
|
|
||||||
* @return int32_t 0 for success, -1 for failure.
|
|
||||||
*/
|
|
||||||
void mndProcessWriteMsg(SMnodeMsg *pMsg);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Process the sync request.
|
|
||||||
*
|
|
||||||
* @param pMsg The request msg.
|
|
||||||
* @return int32_t 0 for success, -1 for failure.
|
|
||||||
*/
|
|
||||||
void mndProcessSyncMsg(SMnodeMsg *pMsg);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,10 +49,11 @@ typedef struct {
|
||||||
/**
|
/**
|
||||||
* @brief Start one Snode in Dnode.
|
* @brief Start one Snode in Dnode.
|
||||||
*
|
*
|
||||||
|
* @param path Path of the snode.
|
||||||
* @param pOption Option of the snode.
|
* @param pOption Option of the snode.
|
||||||
* @return SSnode* The snode object.
|
* @return SSnode* The snode object.
|
||||||
*/
|
*/
|
||||||
SSnode *sndOpen(const SSnodeOpt *pOption);
|
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Stop Snode in Dnode.
|
* @brief Stop Snode in Dnode.
|
||||||
|
@ -78,7 +79,14 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad);
|
||||||
* @param pRsp The response message
|
* @param pRsp The response message
|
||||||
* @return int32_t 0 for success, -1 for failure
|
* @return int32_t 0 for success, -1 for failure
|
||||||
*/
|
*/
|
||||||
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Drop a snode.
|
||||||
|
*
|
||||||
|
* @param path Path of the snode.
|
||||||
|
*/
|
||||||
|
void sndDestroy(const char *path);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -251,6 +251,8 @@ typedef struct STqMetaStore {
|
||||||
STqMetaList* bucket[TQ_BUCKET_SIZE];
|
STqMetaList* bucket[TQ_BUCKET_SIZE];
|
||||||
// a table head
|
// a table head
|
||||||
STqMetaList* unpersistHead;
|
STqMetaList* unpersistHead;
|
||||||
|
// topics that are not connectted
|
||||||
|
STqMetaList* unconnectTopic;
|
||||||
|
|
||||||
// TODO:temporaral use, to be replaced by unified tfile
|
// TODO:temporaral use, to be replaced by unified tfile
|
||||||
int fileFd;
|
int fileFd;
|
||||||
|
|
|
@ -43,7 +43,7 @@ int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery);
|
||||||
|
|
||||||
bool qIsDdlQuery(const SQueryNode* pQuery);
|
bool qIsDdlQuery(const SQueryNode* pQuery);
|
||||||
|
|
||||||
void qDestoryQuery(SQueryNode* pQuery);
|
void qDestroyQuery(SQueryNode* pQuery);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that
|
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that
|
||||||
|
|
|
@ -230,6 +230,57 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
|
||||||
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
|
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {
|
||||||
|
STscObj* pTscObj = (STscObj*)taos;
|
||||||
|
SRequestObj* pRequest = NULL;
|
||||||
|
SQueryNode* pQuery = NULL;
|
||||||
|
SQueryDag* pDag = NULL;
|
||||||
|
char *dagStr = NULL;
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||||
|
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
|
||||||
|
//TODO: check sql valid
|
||||||
|
|
||||||
|
CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);
|
||||||
|
|
||||||
|
dagStr = qDagToString(pDag);
|
||||||
|
if(dagStr == NULL) {
|
||||||
|
//TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
SCMCreateTopicReq req = {
|
||||||
|
.name = (char*)name,
|
||||||
|
.igExists = 0,
|
||||||
|
.phyPlan = dagStr,
|
||||||
|
};
|
||||||
|
|
||||||
|
void* buf = NULL;
|
||||||
|
int tlen = tSerializeSCMCreateTopicReq(&buf, &req);
|
||||||
|
|
||||||
|
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
|
||||||
|
|
||||||
|
SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
|
||||||
|
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
|
||||||
|
|
||||||
|
int64_t transporterId = 0;
|
||||||
|
asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body);
|
||||||
|
|
||||||
|
tsem_wait(&pRequest->body.rspSem);
|
||||||
|
|
||||||
|
destroySendMsgInfo(body);
|
||||||
|
|
||||||
|
_return:
|
||||||
|
qDestroyQuery(pQuery);
|
||||||
|
qDestroyQueryDag(pDag);
|
||||||
|
destroySendMsgInfo(body);
|
||||||
|
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
|
||||||
|
pRequest->code = terrno;
|
||||||
|
}
|
||||||
|
return pRequest;
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
||||||
STscObj *pTscObj = (STscObj *)taos;
|
STscObj *pTscObj = (STscObj *)taos;
|
||||||
if (sqlLen > (size_t) tsMaxSQLStringLen) {
|
if (sqlLen > (size_t) tsMaxSQLStringLen) {
|
||||||
|
@ -257,7 +308,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
qDestoryQuery(pQuery);
|
qDestroyQuery(pQuery);
|
||||||
qDestroyQueryDag(pDag);
|
qDestroyQueryDag(pDag);
|
||||||
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
|
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
|
||||||
pRequest->code = terrno;
|
pRequest->code = terrno;
|
||||||
|
|
|
@ -399,6 +399,41 @@ TEST(testCase, drop_stable_Test) {
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(testCase, create_topic_Test) {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
assert(pConn != NULL);
|
||||||
|
|
||||||
|
TAOS_RES* pRes = taos_query(pConn, "create database abc1");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "use abc1");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in create stable, reason:%s\n", taos_errstr(pRes));
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||||
|
ASSERT_TRUE(pFields == NULL);
|
||||||
|
|
||||||
|
int32_t numOfFields = taos_num_fields(pRes);
|
||||||
|
ASSERT_EQ(numOfFields, 0);
|
||||||
|
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
char* sql = "select * from st1";
|
||||||
|
tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
||||||
|
taos_close(pConn);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
//TEST(testCase, show_table_Test) {
|
//TEST(testCase, show_table_Test) {
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
// assert(pConn != NULL);
|
// assert(pConn != NULL);
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#include "bndInt.h"
|
#include "bndInt.h"
|
||||||
|
|
||||||
SBnode *bndOpen(const SBnodeOpt *pOption) {
|
SBnode *bndOpen(const char *path, const SBnodeOpt *pOption) {
|
||||||
SBnode *pBnode = calloc(1, sizeof(SBnode));
|
SBnode *pBnode = calloc(1, sizeof(SBnode));
|
||||||
return pBnode;
|
return pBnode;
|
||||||
}
|
}
|
||||||
|
@ -25,3 +25,5 @@ void bndClose(SBnode *pBnode) { free(pBnode); }
|
||||||
int32_t bndGetLoad(SBnode *pBnode, SBnodeLoad *pLoad) { return 0; }
|
int32_t bndGetLoad(SBnode *pBnode, SBnodeLoad *pLoad) { return 0; }
|
||||||
|
|
||||||
int32_t bndProcessWMsgs(SBnode *pBnode, SArray *pMsgs) { return 0; }
|
int32_t bndProcessWMsgs(SBnode *pBnode, SArray *pMsgs) { return 0; }
|
||||||
|
|
||||||
|
void bndDestroy(const char *path) {}
|
||||||
|
|
|
@ -24,7 +24,7 @@ extern "C" {
|
||||||
int32_t dndInitBnode(SDnode *pDnode);
|
int32_t dndInitBnode(SDnode *pDnode);
|
||||||
void dndCleanupBnode(SDnode *pDnode);
|
void dndCleanupBnode(SDnode *pDnode);
|
||||||
|
|
||||||
ioid dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
|
int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
|
||||||
int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
|
int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
|
||||||
|
|
||||||
|
|
|
@ -54,18 +54,21 @@ extern int32_t dDebugFlag;
|
||||||
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
|
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat;
|
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat;
|
||||||
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EDndWorkerType;
|
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
|
||||||
typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
|
typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
EDndWorkerType type;
|
EWorkerType type;
|
||||||
const char *name;
|
const char *name;
|
||||||
int32_t minNum;
|
int32_t minNum;
|
||||||
int32_t maxNum;
|
int32_t maxNum;
|
||||||
FProcessItem fp;
|
void *queueFp;
|
||||||
SDnode *pDnode;
|
SDnode *pDnode;
|
||||||
taos_queue queue;
|
taos_queue queue;
|
||||||
SWorkerPool pool;
|
union {
|
||||||
|
SWorkerPool pool;
|
||||||
|
SMWorkerPool mpool;
|
||||||
|
};
|
||||||
} SDnodeWorker;
|
} SDnodeWorker;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -94,21 +97,17 @@ typedef struct {
|
||||||
} SDnodeMgmt;
|
} SDnodeMgmt;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
int8_t deployed;
|
int8_t deployed;
|
||||||
int8_t dropped;
|
int8_t dropped;
|
||||||
int8_t replica;
|
SMnode *pMnode;
|
||||||
int8_t selfIndex;
|
SRWLatch latch;
|
||||||
SReplica replicas[TSDB_MAX_REPLICA];
|
SDnodeWorker readWorker;
|
||||||
char *file;
|
SDnodeWorker writeWorker;
|
||||||
SMnode *pMnode;
|
SDnodeWorker syncWorker;
|
||||||
SRWLatch latch;
|
int8_t replica;
|
||||||
taos_queue pReadQ;
|
int8_t selfIndex;
|
||||||
taos_queue pWriteQ;
|
SReplica replicas[TSDB_MAX_REPLICA];
|
||||||
taos_queue pSyncQ;
|
|
||||||
SWorkerPool readPool;
|
|
||||||
SWorkerPool writePool;
|
|
||||||
SWorkerPool syncPool;
|
|
||||||
} SMnodeMgmt;
|
} SMnodeMgmt;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -122,25 +121,21 @@ typedef struct {
|
||||||
} SQnodeMgmt;
|
} SQnodeMgmt;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
int8_t deployed;
|
int8_t deployed;
|
||||||
int8_t dropped;
|
int8_t dropped;
|
||||||
char *file;
|
SSnode *pSnode;
|
||||||
SSnode *pSnode;
|
SRWLatch latch;
|
||||||
SRWLatch latch;
|
SDnodeWorker writeWorker;
|
||||||
taos_queue pWriteQ;
|
|
||||||
SWorkerPool writePool;
|
|
||||||
} SSnodeMgmt;
|
} SSnodeMgmt;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
int8_t deployed;
|
int8_t deployed;
|
||||||
int8_t dropped;
|
int8_t dropped;
|
||||||
char *file;
|
|
||||||
SBnode *pBnode;
|
SBnode *pBnode;
|
||||||
SRWLatch latch;
|
SRWLatch latch;
|
||||||
taos_queue pWriteQ;
|
SDnodeWorker writeWorker;
|
||||||
SMWorkerPool writePool;
|
|
||||||
} SBnodeMgmt;
|
} SBnodeMgmt;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -21,8 +21,8 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
#include "dndInt.h"
|
#include "dndInt.h"
|
||||||
|
|
||||||
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum,
|
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
|
||||||
int32_t maxNum, FProcessItem fp);
|
int32_t maxNum, void *queueFp);
|
||||||
void dndCleanupWorker(SDnodeWorker *pWorker);
|
void dndCleanupWorker(SDnodeWorker *pWorker);
|
||||||
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
|
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,379 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http:www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "dndBnode.h"
|
||||||
|
#include "dndDnode.h"
|
||||||
|
#include "dndTransport.h"
|
||||||
|
#include "dndWorker.h"
|
||||||
|
|
||||||
|
static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs);
|
||||||
|
|
||||||
|
static SBnode *dndAcquireBnode(SDnode *pDnode) {
|
||||||
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
|
SBnode *pBnode = NULL;
|
||||||
|
int32_t refCount = 0;
|
||||||
|
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
if (pMgmt->deployed && !pMgmt->dropped) {
|
||||||
|
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
|
||||||
|
pBnode = pMgmt->pBnode;
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
if (pBnode != NULL) {
|
||||||
|
dTrace("acquire bnode, refCount:%d", refCount);
|
||||||
|
}
|
||||||
|
return pBnode;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) {
|
||||||
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
|
int32_t refCount = 0;
|
||||||
|
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
if (pBnode != NULL) {
|
||||||
|
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
if (pBnode != NULL) {
|
||||||
|
dTrace("release bnode, refCount:%d", refCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dndReadBnodeFile(SDnode *pDnode) {
|
||||||
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
|
int32_t code = TSDB_CODE_DND_BNODE_READ_FILE_ERROR;
|
||||||
|
int32_t len = 0;
|
||||||
|
int32_t maxLen = 4096;
|
||||||
|
char *content = calloc(1, maxLen + 1);
|
||||||
|
cJSON *root = NULL;
|
||||||
|
|
||||||
|
char file[PATH_MAX + 20];
|
||||||
|
snprintf(file, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode);
|
||||||
|
|
||||||
|
FILE *fp = fopen(file, "r");
|
||||||
|
if (fp == NULL) {
|
||||||
|
dDebug("file %s not exist", file);
|
||||||
|
code = 0;
|
||||||
|
goto PRASE_BNODE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
len = (int32_t)fread(content, 1, maxLen, fp);
|
||||||
|
if (len <= 0) {
|
||||||
|
dError("failed to read %s since content is null", file);
|
||||||
|
goto PRASE_BNODE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
content[len] = 0;
|
||||||
|
root = cJSON_Parse(content);
|
||||||
|
if (root == NULL) {
|
||||||
|
dError("failed to read %s since invalid json format", file);
|
||||||
|
goto PRASE_BNODE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
|
||||||
|
if (!deployed || deployed->type != cJSON_Number) {
|
||||||
|
dError("failed to read %s since deployed not found", file);
|
||||||
|
goto PRASE_BNODE_OVER;
|
||||||
|
}
|
||||||
|
pMgmt->deployed = deployed->valueint;
|
||||||
|
|
||||||
|
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
|
||||||
|
if (!dropped || dropped->type != cJSON_Number) {
|
||||||
|
dError("failed to read %s since dropped not found", file);
|
||||||
|
goto PRASE_BNODE_OVER;
|
||||||
|
}
|
||||||
|
pMgmt->dropped = dropped->valueint;
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
|
||||||
|
|
||||||
|
PRASE_BNODE_OVER:
|
||||||
|
if (content != NULL) free(content);
|
||||||
|
if (root != NULL) cJSON_Delete(root);
|
||||||
|
if (fp != NULL) fclose(fp);
|
||||||
|
|
||||||
|
terrno = code;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dndWriteBnodeFile(SDnode *pDnode) {
|
||||||
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
|
|
||||||
|
char file[PATH_MAX + 20];
|
||||||
|
snprintf(file, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode);
|
||||||
|
|
||||||
|
FILE *fp = fopen(file, "w");
|
||||||
|
if (fp == NULL) {
|
||||||
|
terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR;
|
||||||
|
dError("failed to write %s since %s", file, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = 0;
|
||||||
|
int32_t maxLen = 4096;
|
||||||
|
char *content = calloc(1, maxLen + 1);
|
||||||
|
|
||||||
|
len += snprintf(content + len, maxLen - len, "{\n");
|
||||||
|
len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed);
|
||||||
|
len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped);
|
||||||
|
len += snprintf(content + len, maxLen - len, "}\n");
|
||||||
|
|
||||||
|
fwrite(content, 1, len, fp);
|
||||||
|
taosFsyncFile(fileno(fp));
|
||||||
|
fclose(fp);
|
||||||
|
free(content);
|
||||||
|
|
||||||
|
char realfile[PATH_MAX + 20];
|
||||||
|
snprintf(realfile, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode);
|
||||||
|
|
||||||
|
if (taosRenameFile(file, realfile) != 0) {
|
||||||
|
terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR;
|
||||||
|
dError("failed to rename %s since %s", file, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dndStartBnodeWorker(SDnode *pDnode) {
|
||||||
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
|
if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, dndProcessBnodeQueue) != 0) {
|
||||||
|
dError("failed to start bnode write worker since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndStopBnodeWorker(SDnode *pDnode) {
|
||||||
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
|
|
||||||
|
taosWLockLatch(&pMgmt->latch);
|
||||||
|
pMgmt->deployed = 0;
|
||||||
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
while (pMgmt->refCount > 1) {
|
||||||
|
taosMsleep(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
dndCleanupWorker(&pMgmt->writeWorker);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) {
|
||||||
|
pOption->pDnode = pDnode;
|
||||||
|
pOption->sendMsgToDnodeFp = dndSendMsgToDnode;
|
||||||
|
pOption->sendMsgToMnodeFp = dndSendMsgToMnode;
|
||||||
|
pOption->sendRedirectMsgFp = dndSendRedirectMsg;
|
||||||
|
pOption->dnodeId = dndGetDnodeId(pDnode);
|
||||||
|
pOption->clusterId = dndGetClusterId(pDnode);
|
||||||
|
pOption->cfg.sver = pDnode->opt.sver;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dndOpenBnode(SDnode *pDnode) {
|
||||||
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
|
SBnodeOpt option = {0};
|
||||||
|
dndBuildBnodeOption(pDnode, &option);
|
||||||
|
|
||||||
|
SBnode *pBnode = bndOpen(pDnode->dir.bnode, &option);
|
||||||
|
if (pBnode == NULL) {
|
||||||
|
dError("failed to open bnode since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dndStartBnodeWorker(pDnode) != 0) {
|
||||||
|
dError("failed to start bnode worker since %s", terrstr());
|
||||||
|
bndClose(pBnode);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pMgmt->deployed = 1;
|
||||||
|
if (dndWriteBnodeFile(pDnode) != 0) {
|
||||||
|
pMgmt->deployed = 0;
|
||||||
|
dError("failed to write bnode file since %s", terrstr());
|
||||||
|
dndStopBnodeWorker(pDnode);
|
||||||
|
bndClose(pBnode);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosWLockLatch(&pMgmt->latch);
|
||||||
|
pMgmt->pBnode = pBnode;
|
||||||
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
dInfo("bnode open successfully");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dndDropBnode(SDnode *pDnode) {
|
||||||
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
|
|
||||||
|
SBnode *pBnode = dndAcquireBnode(pDnode);
|
||||||
|
if (pBnode == NULL) {
|
||||||
|
dError("failed to drop bnode since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
pMgmt->dropped = 1;
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
if (dndWriteBnodeFile(pDnode) != 0) {
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
pMgmt->dropped = 0;
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
dndReleaseBnode(pDnode, pBnode);
|
||||||
|
dError("failed to drop bnode since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dndReleaseBnode(pDnode, pBnode);
|
||||||
|
dndStopBnodeWorker(pDnode);
|
||||||
|
pMgmt->deployed = 0;
|
||||||
|
dndWriteBnodeFile(pDnode);
|
||||||
|
bndClose(pBnode);
|
||||||
|
pMgmt->pBnode = NULL;
|
||||||
|
bndDestroy(pDnode->dir.bnode);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||||
|
SCreateBnodeInMsg *pMsg = pRpcMsg->pCont;
|
||||||
|
pMsg->dnodeId = htonl(pMsg->dnodeId);
|
||||||
|
|
||||||
|
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||||
|
terrno = TSDB_CODE_DND_BNODE_ID_INVALID;
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
return dndOpenBnode(pDnode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||||
|
SDropBnodeInMsg *pMsg = pRpcMsg->pCont;
|
||||||
|
pMsg->dnodeId = htonl(pMsg->dnodeId);
|
||||||
|
|
||||||
|
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||||
|
terrno = TSDB_CODE_DND_BNODE_ID_INVALID;
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
return dndDropBnode(pDnode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
|
SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
|
||||||
|
rpcSendResponse(&rpcRsp);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t code) {
|
||||||
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
|
SRpcMsg *pMsg = NULL;
|
||||||
|
taosGetQitem(qall, (void **)&pMsg);
|
||||||
|
dndSendBnodeErrorRsp(pMsg, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs) {
|
||||||
|
SBnode *pBnode = dndAcquireBnode(pDnode);
|
||||||
|
if (pBnode == NULL) {
|
||||||
|
dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
|
||||||
|
if (pArray == NULL) {
|
||||||
|
dndReleaseBnode(pDnode, pBnode);
|
||||||
|
dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
|
SRpcMsg *pMsg = NULL;
|
||||||
|
taosGetQitem(qall, (void **)&pMsg);
|
||||||
|
void *ptr = taosArrayPush(pArray, &pMsg);
|
||||||
|
if (ptr == NULL) {
|
||||||
|
dndSendBnodeErrorRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bndProcessWMsgs(pBnode, pArray);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < numOfMsgs; i++) {
|
||||||
|
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
|
dndReleaseBnode(pDnode, pBnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndWriteBnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) {
|
||||||
|
int32_t code = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;
|
||||||
|
|
||||||
|
SBnode *pBnode = dndAcquireBnode(pDnode);
|
||||||
|
if (pBnode != NULL) {
|
||||||
|
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
|
||||||
|
}
|
||||||
|
dndReleaseBnode(pDnode, pBnode);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
if (pMsg->msgType & 1u) {
|
||||||
|
SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
|
||||||
|
rpcSendResponse(&rsp);
|
||||||
|
}
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
dndWriteBnodeMsgToWorker(pDnode, &pDnode->bmgmt.writeWorker, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dndInitBnode(SDnode *pDnode) {
|
||||||
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
|
taosInitRWLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
if (dndReadBnodeFile(pDnode) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMgmt->dropped) {
|
||||||
|
dInfo("bnode has been deployed and needs to be deleted");
|
||||||
|
bndDestroy(pDnode->dir.bnode);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pMgmt->deployed) return 0;
|
||||||
|
|
||||||
|
return dndOpenBnode(pDnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void dndCleanupBnode(SDnode *pDnode) {
|
||||||
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
|
if (pMgmt->pBnode) {
|
||||||
|
dndStopBnodeWorker(pDnode);
|
||||||
|
bndClose(pMgmt->pBnode);
|
||||||
|
pMgmt->pBnode = NULL;
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,42 +17,9 @@
|
||||||
#include "dndMnode.h"
|
#include "dndMnode.h"
|
||||||
#include "dndDnode.h"
|
#include "dndDnode.h"
|
||||||
#include "dndTransport.h"
|
#include "dndTransport.h"
|
||||||
|
#include "dndWorker.h"
|
||||||
|
|
||||||
static int32_t dndInitMnodeReadWorker(SDnode *pDnode);
|
static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg);
|
||||||
static int32_t dndInitMnodeWriteWorker(SDnode *pDnode);
|
|
||||||
static int32_t dndInitMnodeSyncWorker(SDnode *pDnode);
|
|
||||||
static void dndCleanupMnodeReadWorker(SDnode *pDnode);
|
|
||||||
static void dndCleanupMnodeWriteWorker(SDnode *pDnode);
|
|
||||||
static void dndCleanupMnodeSyncWorker(SDnode *pDnode);
|
|
||||||
static void dndCleanupMnodeMgmtWorker(SDnode *pDnode);
|
|
||||||
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode);
|
|
||||||
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode);
|
|
||||||
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode);
|
|
||||||
static void dndFreeMnodeReadQueue(SDnode *pDnode);
|
|
||||||
static void dndFreeMnodeWriteQueue(SDnode *pDnode);
|
|
||||||
static void dndFreeMnodeSyncQueue(SDnode *pDnode);
|
|
||||||
static void dndFreeMnodeMgmtQueue(SDnode *pDnode);
|
|
||||||
|
|
||||||
static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg);
|
|
||||||
static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg);
|
|
||||||
static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg);
|
|
||||||
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg);
|
|
||||||
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
|
||||||
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
|
||||||
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
|
||||||
|
|
||||||
static int32_t dndStartMnodeWorker(SDnode *pDnode);
|
|
||||||
static void dndStopMnodeWorker(SDnode *pDnode);
|
|
||||||
|
|
||||||
static SMnode *dndAcquireMnode(SDnode *pDnode);
|
|
||||||
static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode);
|
|
||||||
|
|
||||||
static int32_t dndReadMnodeFile(SDnode *pDnode);
|
|
||||||
static int32_t dndWriteMnodeFile(SDnode *pDnode);
|
|
||||||
|
|
||||||
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption);
|
|
||||||
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption);
|
|
||||||
static int32_t dndDropMnode(SDnode *pDnode);
|
|
||||||
|
|
||||||
static SMnode *dndAcquireMnode(SDnode *pDnode) {
|
static SMnode *dndAcquireMnode(SDnode *pDnode) {
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||||
|
@ -97,49 +64,52 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) {
|
||||||
char *content = calloc(1, maxLen + 1);
|
char *content = calloc(1, maxLen + 1);
|
||||||
cJSON *root = NULL;
|
cJSON *root = NULL;
|
||||||
|
|
||||||
FILE *fp = fopen(pMgmt->file, "r");
|
char file[PATH_MAX + 20];
|
||||||
|
snprintf(file, PATH_MAX + 20, "%s/mnode.json", pDnode->dir.dnode);
|
||||||
|
|
||||||
|
FILE *fp = fopen(file, "r");
|
||||||
if (fp == NULL) {
|
if (fp == NULL) {
|
||||||
dDebug("file %s not exist", pMgmt->file);
|
dDebug("file %s not exist", file);
|
||||||
code = 0;
|
code = 0;
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
len = (int32_t)fread(content, 1, maxLen, fp);
|
len = (int32_t)fread(content, 1, maxLen, fp);
|
||||||
if (len <= 0) {
|
if (len <= 0) {
|
||||||
dError("failed to read %s since content is null", pMgmt->file);
|
dError("failed to read %s since content is null", file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
content[len] = 0;
|
content[len] = 0;
|
||||||
root = cJSON_Parse(content);
|
root = cJSON_Parse(content);
|
||||||
if (root == NULL) {
|
if (root == NULL) {
|
||||||
dError("failed to read %s since invalid json format", pMgmt->file);
|
dError("failed to read %s since invalid json format", file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
|
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
|
||||||
if (!deployed || deployed->type != cJSON_Number) {
|
if (!deployed || deployed->type != cJSON_Number) {
|
||||||
dError("failed to read %s since deployed not found", pMgmt->file);
|
dError("failed to read %s since deployed not found", file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
pMgmt->deployed = deployed->valueint;
|
pMgmt->deployed = deployed->valueint;
|
||||||
|
|
||||||
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
|
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
|
||||||
if (!dropped || dropped->type != cJSON_Number) {
|
if (!dropped || dropped->type != cJSON_Number) {
|
||||||
dError("failed to read %s since dropped not found", pMgmt->file);
|
dError("failed to read %s since dropped not found", file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
pMgmt->dropped = dropped->valueint;
|
pMgmt->dropped = dropped->valueint;
|
||||||
|
|
||||||
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
|
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
|
||||||
if (!mnodes || mnodes->type != cJSON_Array) {
|
if (!mnodes || mnodes->type != cJSON_Array) {
|
||||||
dError("failed to read %s since nodes not found", pMgmt->file);
|
dError("failed to read %s since nodes not found", file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMgmt->replica = cJSON_GetArraySize(mnodes);
|
pMgmt->replica = cJSON_GetArraySize(mnodes);
|
||||||
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
|
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
|
||||||
dError("failed to read %s since mnodes size %d invalid", pMgmt->file, pMgmt->replica);
|
dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,28 +121,28 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) {
|
||||||
|
|
||||||
cJSON *id = cJSON_GetObjectItem(node, "id");
|
cJSON *id = cJSON_GetObjectItem(node, "id");
|
||||||
if (!id || id->type != cJSON_Number) {
|
if (!id || id->type != cJSON_Number) {
|
||||||
dError("failed to read %s since id not found", pMgmt->file);
|
dError("failed to read %s since id not found", file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
pReplica->id = id->valueint;
|
pReplica->id = id->valueint;
|
||||||
|
|
||||||
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
|
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
|
||||||
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
|
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
|
||||||
dError("failed to read %s since fqdn not found", pMgmt->file);
|
dError("failed to read %s since fqdn not found", file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
|
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
|
||||||
|
|
||||||
cJSON *port = cJSON_GetObjectItem(node, "port");
|
cJSON *port = cJSON_GetObjectItem(node, "port");
|
||||||
if (!port || port->type != cJSON_Number) {
|
if (!port || port->type != cJSON_Number) {
|
||||||
dError("failed to read %s since port not found", pMgmt->file);
|
dError("failed to read %s since port not found", file);
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
pReplica->port = port->valueint;
|
pReplica->port = port->valueint;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
|
dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
|
||||||
|
|
||||||
PRASE_MNODE_OVER:
|
PRASE_MNODE_OVER:
|
||||||
if (content != NULL) free(content);
|
if (content != NULL) free(content);
|
||||||
|
@ -186,8 +156,8 @@ PRASE_MNODE_OVER:
|
||||||
static int32_t dndWriteMnodeFile(SDnode *pDnode) {
|
static int32_t dndWriteMnodeFile(SDnode *pDnode) {
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||||
|
|
||||||
char file[PATH_MAX + 20] = {0};
|
char file[PATH_MAX + 20];
|
||||||
snprintf(file, sizeof(file), "%s.bak", pMgmt->file);
|
snprintf(file, PATH_MAX + 20, "%s/mnode.json.bak", pDnode->dir.dnode);
|
||||||
|
|
||||||
FILE *fp = fopen(file, "w");
|
FILE *fp = fopen(file, "w");
|
||||||
if (fp == NULL) {
|
if (fp == NULL) {
|
||||||
|
@ -223,47 +193,36 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) {
|
||||||
fclose(fp);
|
fclose(fp);
|
||||||
free(content);
|
free(content);
|
||||||
|
|
||||||
if (taosRenameFile(file, pMgmt->file) != 0) {
|
char realfile[PATH_MAX + 20];
|
||||||
|
snprintf(realfile, PATH_MAX + 20, "%s/mnode.json", pDnode->dir.dnode);
|
||||||
|
|
||||||
|
if (taosRenameFile(file, realfile) != 0) {
|
||||||
terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
|
terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
|
||||||
dError("failed to rename %s since %s", pMgmt->file, terrstr());
|
dError("failed to rename %s since %s", file, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
|
dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndStartMnodeWorker(SDnode *pDnode) {
|
static int32_t dndStartMnodeWorker(SDnode *pDnode) {
|
||||||
if (dndInitMnodeReadWorker(pDnode) != 0) {
|
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||||
|
if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, dndProcessMnodeQueue) != 0) {
|
||||||
dError("failed to start mnode read worker since %s", terrstr());
|
dError("failed to start mnode read worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dndInitMnodeWriteWorker(pDnode) != 0) {
|
if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, dndProcessMnodeQueue) != 0) {
|
||||||
dError("failed to start mnode write worker since %s", terrstr());
|
dError("failed to start mnode write worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dndInitMnodeSyncWorker(pDnode) != 0) {
|
if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, dndProcessMnodeQueue) != 0) {
|
||||||
dError("failed to start mnode sync worker since %s", terrstr());
|
dError("failed to start mnode sync worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dndAllocMnodeReadQueue(pDnode) != 0) {
|
|
||||||
dError("failed to alloc mnode read queue since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dndAllocMnodeWriteQueue(pDnode) != 0) {
|
|
||||||
dError("failed to alloc mnode write queue since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dndAllocMnodeSyncQueue(pDnode) != 0) {
|
|
||||||
dError("failed to alloc mnode sync queue since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,18 +233,13 @@ static void dndStopMnodeWorker(SDnode *pDnode) {
|
||||||
pMgmt->deployed = 0;
|
pMgmt->deployed = 0;
|
||||||
taosWUnLockLatch(&pMgmt->latch);
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
while (pMgmt->refCount > 1) taosMsleep(10);
|
while (pMgmt->refCount > 1) {
|
||||||
while (!taosQueueEmpty(pMgmt->pReadQ)) taosMsleep(10);
|
taosMsleep(10);
|
||||||
while (!taosQueueEmpty(pMgmt->pWriteQ)) taosMsleep(10);
|
}
|
||||||
while (!taosQueueEmpty(pMgmt->pSyncQ)) taosMsleep(10);
|
|
||||||
|
|
||||||
dndCleanupMnodeReadWorker(pDnode);
|
dndCleanupWorker(&pMgmt->readWorker);
|
||||||
dndCleanupMnodeWriteWorker(pDnode);
|
dndCleanupWorker(&pMgmt->writeWorker);
|
||||||
dndCleanupMnodeSyncWorker(pDnode);
|
dndCleanupWorker(&pMgmt->syncWorker);
|
||||||
|
|
||||||
dndFreeMnodeReadQueue(pDnode);
|
|
||||||
dndFreeMnodeWriteQueue(pDnode);
|
|
||||||
dndFreeMnodeSyncQueue(pDnode);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool dndNeedDeployMnode(SDnode *pDnode) {
|
static bool dndNeedDeployMnode(SDnode *pDnode) {
|
||||||
|
@ -383,28 +337,21 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||||
dError("failed to open mnode since %s", terrstr());
|
dError("failed to open mnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pMgmt->deployed = 1;
|
|
||||||
|
|
||||||
int32_t code = dndWriteMnodeFile(pDnode);
|
if (dndStartMnodeWorker(pDnode) != 0) {
|
||||||
if (code != 0) {
|
dError("failed to start mnode worker since %s", terrstr());
|
||||||
dError("failed to write mnode file since %s", terrstr());
|
|
||||||
code = terrno;
|
|
||||||
pMgmt->deployed = 0;
|
|
||||||
mndClose(pMnode);
|
mndClose(pMnode);
|
||||||
mndDestroy(pDnode->dir.mnode);
|
mndDestroy(pDnode->dir.mnode);
|
||||||
terrno = code;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = dndStartMnodeWorker(pDnode);
|
pMgmt->deployed = 1;
|
||||||
if (code != 0) {
|
if (dndWriteMnodeFile(pDnode) != 0) {
|
||||||
dError("failed to start mnode worker since %s", terrstr());
|
dError("failed to write mnode file since %s", terrstr());
|
||||||
code = terrno;
|
|
||||||
pMgmt->deployed = 0;
|
pMgmt->deployed = 0;
|
||||||
dndStopMnodeWorker(pDnode);
|
dndStopMnodeWorker(pDnode);
|
||||||
mndClose(pMnode);
|
mndClose(pMnode);
|
||||||
mndDestroy(pDnode->dir.mnode);
|
mndDestroy(pDnode->dir.mnode);
|
||||||
terrno = code;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -461,6 +408,7 @@ static int32_t dndDropMnode(SDnode *pDnode) {
|
||||||
|
|
||||||
dndReleaseMnode(pDnode, pMnode);
|
dndReleaseMnode(pDnode, pMnode);
|
||||||
dndStopMnodeWorker(pDnode);
|
dndStopMnodeWorker(pDnode);
|
||||||
|
pMgmt->deployed = 0;
|
||||||
dndWriteMnodeFile(pDnode);
|
dndWriteMnodeFile(pDnode);
|
||||||
mndClose(pMnode);
|
mndClose(pMnode);
|
||||||
pMgmt->pMnode = NULL;
|
pMgmt->pMnode = NULL;
|
||||||
|
@ -528,13 +476,12 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
|
||||||
static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||||
|
|
||||||
SMnode *pMnode = dndAcquireMnode(pDnode);
|
SMnode *pMnode = dndAcquireMnode(pDnode);
|
||||||
if (pMnode != NULL) {
|
if (pMnode != NULL) {
|
||||||
mndProcessReadMsg(pMsg);
|
mndProcessMsg(pMsg);
|
||||||
dndReleaseMnode(pDnode, pMnode);
|
dndReleaseMnode(pDnode, pMnode);
|
||||||
} else {
|
} else {
|
||||||
mndSendRsp(pMsg, terrno);
|
mndSendRsp(pMsg, terrno);
|
||||||
|
@ -543,208 +490,43 @@ static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
|
||||||
mndCleanupMsg(pMsg);
|
mndCleanupMsg(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
|
static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg) {
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
int32_t code = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
|
||||||
|
|
||||||
SMnode *pMnode = dndAcquireMnode(pDnode);
|
SMnode *pMnode = dndAcquireMnode(pDnode);
|
||||||
if (pMnode != NULL) {
|
if (pMnode != NULL) {
|
||||||
mndProcessWriteMsg(pMsg);
|
SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg);
|
||||||
dndReleaseMnode(pDnode, pMnode);
|
if (pMsg == NULL) {
|
||||||
} else {
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mndSendRsp(pMsg, terrno);
|
} else {
|
||||||
|
code = dndWriteMsgToWorker(pWorker, pMsg, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
mndCleanupMsg(pMsg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
dndReleaseMnode(pDnode, pMnode);
|
||||||
|
|
||||||
mndCleanupMsg(pMsg);
|
if (code != 0) {
|
||||||
}
|
if (pRpcMsg->msgType & 1u) {
|
||||||
|
SRpcMsg rsp = {.handle = pRpcMsg->handle, .ahandle = pRpcMsg->ahandle, .code = code};
|
||||||
static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
|
rpcSendResponse(&rsp);
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
}
|
||||||
|
rpcFreeCont(pRpcMsg->pCont);
|
||||||
SMnode *pMnode = dndAcquireMnode(pDnode);
|
|
||||||
if (pMnode != NULL) {
|
|
||||||
mndProcessSyncMsg(pMsg);
|
|
||||||
dndReleaseMnode(pDnode, pMnode);
|
|
||||||
} else {
|
|
||||||
mndSendRsp(pMsg, terrno);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mndCleanupMsg(pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) {
|
|
||||||
SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg);
|
|
||||||
if (pMsg == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosWriteQitem(pQueue, pMsg) != 0) {
|
|
||||||
mndCleanupMsg(pMsg);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg);
|
||||||
SMnode *pMnode = dndAcquireMnode(pDnode);
|
|
||||||
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pWriteQ, pMsg) != 0) {
|
|
||||||
if (pMsg->msgType & 1u) {
|
|
||||||
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
|
|
||||||
rpcSendResponse(&rsp);
|
|
||||||
}
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
pMsg->pCont = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
dndReleaseMnode(pDnode, pMnode);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg);
|
||||||
SMnode *pMnode = dndAcquireMnode(pDnode);
|
|
||||||
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) {
|
|
||||||
if (pMsg->msgType & 1u) {
|
|
||||||
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
|
|
||||||
rpcSendResponse(&rsp);
|
|
||||||
}
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
pMsg->pCont = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
dndReleaseMnode(pDnode, pMnode);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg);
|
||||||
SMnode *pMnode = dndAcquireMnode(pDnode);
|
|
||||||
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pReadQ, pMsg) != 0) {
|
|
||||||
if (pMsg->msgType & 1u) {
|
|
||||||
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
|
|
||||||
rpcSendResponse(&rsp);
|
|
||||||
}
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
pMsg->pCont = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
dndReleaseMnode(pDnode, pMnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
pMgmt->pReadQ = tWorkerAllocQueue(&pMgmt->readPool, pDnode, (FProcessItem)dndProcessMnodeReadQueue);
|
|
||||||
if (pMgmt->pReadQ == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndFreeMnodeReadQueue(SDnode *pDnode) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
tWorkerFreeQueue(&pMgmt->readPool, pMgmt->pReadQ);
|
|
||||||
pMgmt->pReadQ = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndInitMnodeReadWorker(SDnode *pDnode) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
SWorkerPool *pPool = &pMgmt->readPool;
|
|
||||||
pPool->name = "mnode-read";
|
|
||||||
pPool->min = 0;
|
|
||||||
pPool->max = 1;
|
|
||||||
if (tWorkerInit(pPool) != 0) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
dDebug("mnode read worker is initialized");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndCleanupMnodeReadWorker(SDnode *pDnode) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
tWorkerCleanup(&pMgmt->readPool);
|
|
||||||
dDebug("mnode read worker is closed");
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
pMgmt->pWriteQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeWriteQueue);
|
|
||||||
if (pMgmt->pWriteQ == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndFreeMnodeWriteQueue(SDnode *pDnode) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pWriteQ);
|
|
||||||
pMgmt->pWriteQ = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
SWorkerPool *pPool = &pMgmt->writePool;
|
|
||||||
pPool->name = "mnode-write";
|
|
||||||
pPool->min = 0;
|
|
||||||
pPool->max = 1;
|
|
||||||
if (tWorkerInit(pPool) != 0) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
dDebug("mnode write worker is initialized");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndCleanupMnodeWriteWorker(SDnode *pDnode) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
tWorkerCleanup(&pMgmt->writePool);
|
|
||||||
dDebug("mnode write worker is closed");
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
pMgmt->pSyncQ = tWorkerAllocQueue(&pMgmt->syncPool, pDnode, (FProcessItem)dndProcessMnodeSyncQueue);
|
|
||||||
if (pMgmt->pSyncQ == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndFreeMnodeSyncQueue(SDnode *pDnode) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
tWorkerFreeQueue(&pMgmt->syncPool, pMgmt->pSyncQ);
|
|
||||||
pMgmt->pSyncQ = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
SWorkerPool *pPool = &pMgmt->syncPool;
|
|
||||||
pPool->name = "mnode-sync";
|
|
||||||
pPool->min = 0;
|
|
||||||
pPool->max = 1;
|
|
||||||
if (tWorkerInit(pPool) != 0) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
dDebug("mnode sync worker is initialized");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndCleanupMnodeSyncWorker(SDnode *pDnode) {
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
tWorkerCleanup(&pMgmt->syncPool);
|
|
||||||
dDebug("mnode sync worker is closed");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndInitMnode(SDnode *pDnode) {
|
int32_t dndInitMnode(SDnode *pDnode) {
|
||||||
|
@ -752,14 +534,6 @@ int32_t dndInitMnode(SDnode *pDnode) {
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||||
taosInitRWLatch(&pMgmt->latch);
|
taosInitRWLatch(&pMgmt->latch);
|
||||||
|
|
||||||
char path[PATH_MAX];
|
|
||||||
snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode);
|
|
||||||
pMgmt->file = strdup(path);
|
|
||||||
if (pMgmt->file == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dndReadMnodeFile(pDnode) != 0) {
|
if (dndReadMnodeFile(pDnode) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -790,13 +564,13 @@ int32_t dndInitMnode(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndCleanupMnode(SDnode *pDnode) {
|
void dndCleanupMnode(SDnode *pDnode) {
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
|
||||||
|
|
||||||
dInfo("dnode-mnode start to clean up");
|
dInfo("dnode-mnode start to clean up");
|
||||||
if (pMgmt->pMnode) dndStopMnodeWorker(pDnode);
|
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||||
tfree(pMgmt->file);
|
if (pMgmt->pMnode) {
|
||||||
mndClose(pMgmt->pMnode);
|
dndStopMnodeWorker(pDnode);
|
||||||
pMgmt->pMnode = NULL;
|
mndClose(pMgmt->pMnode);
|
||||||
|
pMgmt->pMnode = NULL;
|
||||||
|
}
|
||||||
dInfo("dnode-mnode is cleaned up");
|
dInfo("dnode-mnode is cleaned up");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -140,26 +140,27 @@ static int32_t dndWriteQnodeFile(SDnode *pDnode) {
|
||||||
fclose(fp);
|
fclose(fp);
|
||||||
free(content);
|
free(content);
|
||||||
|
|
||||||
if (taosRenameFile(file, file) != 0) {
|
char realfile[PATH_MAX + 20];
|
||||||
|
snprintf(realfile, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode);
|
||||||
|
|
||||||
|
if (taosRenameFile(file, realfile) != 0) {
|
||||||
terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR;
|
terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR;
|
||||||
dError("failed to rename %s since %s", file, terrstr());
|
dError("failed to rename %s since %s", file, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
|
dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndStartQnodeWorker(SDnode *pDnode) {
|
static int32_t dndStartQnodeWorker(SDnode *pDnode) {
|
||||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||||
if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1,
|
if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, dndProcessQnodeQueue) != 0) {
|
||||||
(FProcessItem)dndProcessQnodeQueue) != 0) {
|
|
||||||
dError("failed to start qnode query worker since %s", terrstr());
|
dError("failed to start qnode query worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1,
|
if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, dndProcessQnodeQueue) != 0) {
|
||||||
(FProcessItem)dndProcessQnodeQueue) != 0) {
|
|
||||||
dError("failed to start qnode fetch worker since %s", terrstr());
|
dError("failed to start qnode fetch worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -209,7 +210,9 @@ static int32_t dndOpenQnode(SDnode *pDnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pMgmt->deployed = 1;
|
||||||
if (dndWriteQnodeFile(pDnode) != 0) {
|
if (dndWriteQnodeFile(pDnode) != 0) {
|
||||||
|
pMgmt->deployed = 0;
|
||||||
dError("failed to write qnode file since %s", terrstr());
|
dError("failed to write qnode file since %s", terrstr());
|
||||||
dndStopQnodeWorker(pDnode);
|
dndStopQnodeWorker(pDnode);
|
||||||
qndClose(pQnode);
|
qndClose(pQnode);
|
||||||
|
@ -218,7 +221,6 @@ static int32_t dndOpenQnode(SDnode *pDnode) {
|
||||||
|
|
||||||
taosWLockLatch(&pMgmt->latch);
|
taosWLockLatch(&pMgmt->latch);
|
||||||
pMgmt->pQnode = pQnode;
|
pMgmt->pQnode = pQnode;
|
||||||
pMgmt->deployed = 1;
|
|
||||||
taosWUnLockLatch(&pMgmt->latch);
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
dInfo("qnode open successfully");
|
dInfo("qnode open successfully");
|
||||||
|
@ -250,9 +252,10 @@ static int32_t dndDropQnode(SDnode *pDnode) {
|
||||||
|
|
||||||
dndReleaseQnode(pDnode, pQnode);
|
dndReleaseQnode(pDnode, pQnode);
|
||||||
dndStopQnodeWorker(pDnode);
|
dndStopQnodeWorker(pDnode);
|
||||||
|
pMgmt->deployed = 0;
|
||||||
|
dndWriteQnodeFile(pDnode);
|
||||||
qndClose(pQnode);
|
qndClose(pQnode);
|
||||||
pMgmt->pQnode = NULL;
|
pMgmt->pQnode = NULL;
|
||||||
// qndDestroy(pDnode->dir.qnode);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,354 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http:www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "dndSnode.h"
|
||||||
|
#include "dndDnode.h"
|
||||||
|
#include "dndTransport.h"
|
||||||
|
#include "dndWorker.h"
|
||||||
|
|
||||||
|
static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
|
static SSnode *dndAcquireSnode(SDnode *pDnode) {
|
||||||
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
|
SSnode *pSnode = NULL;
|
||||||
|
int32_t refCount = 0;
|
||||||
|
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
if (pMgmt->deployed && !pMgmt->dropped) {
|
||||||
|
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
|
||||||
|
pSnode = pMgmt->pSnode;
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
if (pSnode != NULL) {
|
||||||
|
dTrace("acquire snode, refCount:%d", refCount);
|
||||||
|
}
|
||||||
|
return pSnode;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) {
|
||||||
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
|
int32_t refCount = 0;
|
||||||
|
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
if (pSnode != NULL) {
|
||||||
|
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
if (pSnode != NULL) {
|
||||||
|
dTrace("release snode, refCount:%d", refCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dndReadSnodeFile(SDnode *pDnode) {
|
||||||
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
|
int32_t code = TSDB_CODE_DND_SNODE_READ_FILE_ERROR;
|
||||||
|
int32_t len = 0;
|
||||||
|
int32_t maxLen = 4096;
|
||||||
|
char *content = calloc(1, maxLen + 1);
|
||||||
|
cJSON *root = NULL;
|
||||||
|
|
||||||
|
char file[PATH_MAX + 20];
|
||||||
|
snprintf(file, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode);
|
||||||
|
|
||||||
|
FILE *fp = fopen(file, "r");
|
||||||
|
if (fp == NULL) {
|
||||||
|
dDebug("file %s not exist", file);
|
||||||
|
code = 0;
|
||||||
|
goto PRASE_SNODE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
len = (int32_t)fread(content, 1, maxLen, fp);
|
||||||
|
if (len <= 0) {
|
||||||
|
dError("failed to read %s since content is null", file);
|
||||||
|
goto PRASE_SNODE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
content[len] = 0;
|
||||||
|
root = cJSON_Parse(content);
|
||||||
|
if (root == NULL) {
|
||||||
|
dError("failed to read %s since invalid json format", file);
|
||||||
|
goto PRASE_SNODE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
|
||||||
|
if (!deployed || deployed->type != cJSON_Number) {
|
||||||
|
dError("failed to read %s since deployed not found", file);
|
||||||
|
goto PRASE_SNODE_OVER;
|
||||||
|
}
|
||||||
|
pMgmt->deployed = deployed->valueint;
|
||||||
|
|
||||||
|
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
|
||||||
|
if (!dropped || dropped->type != cJSON_Number) {
|
||||||
|
dError("failed to read %s since dropped not found", file);
|
||||||
|
goto PRASE_SNODE_OVER;
|
||||||
|
}
|
||||||
|
pMgmt->dropped = dropped->valueint;
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
|
||||||
|
|
||||||
|
PRASE_SNODE_OVER:
|
||||||
|
if (content != NULL) free(content);
|
||||||
|
if (root != NULL) cJSON_Delete(root);
|
||||||
|
if (fp != NULL) fclose(fp);
|
||||||
|
|
||||||
|
terrno = code;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dndWriteSnodeFile(SDnode *pDnode) {
|
||||||
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
|
|
||||||
|
char file[PATH_MAX + 20];
|
||||||
|
snprintf(file, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode);
|
||||||
|
|
||||||
|
FILE *fp = fopen(file, "w");
|
||||||
|
if (fp == NULL) {
|
||||||
|
terrno = TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR;
|
||||||
|
dError("failed to write %s since %s", file, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = 0;
|
||||||
|
int32_t maxLen = 4096;
|
||||||
|
char *content = calloc(1, maxLen + 1);
|
||||||
|
|
||||||
|
len += snprintf(content + len, maxLen - len, "{\n");
|
||||||
|
len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed);
|
||||||
|
len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped);
|
||||||
|
len += snprintf(content + len, maxLen - len, "}\n");
|
||||||
|
|
||||||
|
fwrite(content, 1, len, fp);
|
||||||
|
taosFsyncFile(fileno(fp));
|
||||||
|
fclose(fp);
|
||||||
|
free(content);
|
||||||
|
|
||||||
|
char realfile[PATH_MAX + 20];
|
||||||
|
snprintf(realfile, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode);
|
||||||
|
|
||||||
|
if (taosRenameFile(file, realfile) != 0) {
|
||||||
|
terrno = TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR;
|
||||||
|
dError("failed to rename %s since %s", file, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dndStartSnodeWorker(SDnode *pDnode) {
|
||||||
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
|
if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, dndProcessSnodeQueue) != 0) {
|
||||||
|
dError("failed to start snode write worker since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndStopSnodeWorker(SDnode *pDnode) {
|
||||||
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
|
|
||||||
|
taosWLockLatch(&pMgmt->latch);
|
||||||
|
pMgmt->deployed = 0;
|
||||||
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
while (pMgmt->refCount > 1) {
|
||||||
|
taosMsleep(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
dndCleanupWorker(&pMgmt->writeWorker);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) {
|
||||||
|
pOption->pDnode = pDnode;
|
||||||
|
pOption->sendMsgToDnodeFp = dndSendMsgToDnode;
|
||||||
|
pOption->sendMsgToMnodeFp = dndSendMsgToMnode;
|
||||||
|
pOption->sendRedirectMsgFp = dndSendRedirectMsg;
|
||||||
|
pOption->dnodeId = dndGetDnodeId(pDnode);
|
||||||
|
pOption->clusterId = dndGetClusterId(pDnode);
|
||||||
|
pOption->cfg.sver = pDnode->opt.sver;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dndOpenSnode(SDnode *pDnode) {
|
||||||
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
|
SSnodeOpt option = {0};
|
||||||
|
dndBuildSnodeOption(pDnode, &option);
|
||||||
|
|
||||||
|
SSnode *pSnode = sndOpen(pDnode->dir.snode, &option);
|
||||||
|
if (pSnode == NULL) {
|
||||||
|
dError("failed to open snode since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dndStartSnodeWorker(pDnode) != 0) {
|
||||||
|
dError("failed to start snode worker since %s", terrstr());
|
||||||
|
sndClose(pSnode);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pMgmt->deployed = 1;
|
||||||
|
if (dndWriteSnodeFile(pDnode) != 0) {
|
||||||
|
pMgmt->deployed = 0;
|
||||||
|
dError("failed to write snode file since %s", terrstr());
|
||||||
|
dndStopSnodeWorker(pDnode);
|
||||||
|
sndClose(pSnode);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosWLockLatch(&pMgmt->latch);
|
||||||
|
pMgmt->pSnode = pSnode;
|
||||||
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
dInfo("snode open successfully");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dndDropSnode(SDnode *pDnode) {
|
||||||
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
|
|
||||||
|
SSnode *pSnode = dndAcquireSnode(pDnode);
|
||||||
|
if (pSnode == NULL) {
|
||||||
|
dError("failed to drop snode since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
pMgmt->dropped = 1;
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
if (dndWriteSnodeFile(pDnode) != 0) {
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
pMgmt->dropped = 0;
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
dndReleaseSnode(pDnode, pSnode);
|
||||||
|
dError("failed to drop snode since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dndReleaseSnode(pDnode, pSnode);
|
||||||
|
dndStopSnodeWorker(pDnode);
|
||||||
|
pMgmt->deployed = 0;
|
||||||
|
dndWriteSnodeFile(pDnode);
|
||||||
|
sndClose(pSnode);
|
||||||
|
pMgmt->pSnode = NULL;
|
||||||
|
sndDestroy(pDnode->dir.snode);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||||
|
SCreateSnodeInMsg *pMsg = pRpcMsg->pCont;
|
||||||
|
pMsg->dnodeId = htonl(pMsg->dnodeId);
|
||||||
|
|
||||||
|
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||||
|
terrno = TSDB_CODE_DND_SNODE_ID_INVALID;
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
return dndOpenSnode(pDnode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||||
|
SDropSnodeInMsg *pMsg = pRpcMsg->pCont;
|
||||||
|
pMsg->dnodeId = htonl(pMsg->dnodeId);
|
||||||
|
|
||||||
|
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||||
|
terrno = TSDB_CODE_DND_SNODE_ID_INVALID;
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
return dndDropSnode(pDnode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
|
SRpcMsg *pRsp = NULL;
|
||||||
|
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
|
||||||
|
|
||||||
|
SSnode *pSnode = dndAcquireSnode(pDnode);
|
||||||
|
if (pSnode != NULL) {
|
||||||
|
code = sndProcessMsg(pSnode, pMsg, &pRsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRsp != NULL) {
|
||||||
|
pRsp->ahandle = pMsg->ahandle;
|
||||||
|
rpcSendResponse(pRsp);
|
||||||
|
free(pRsp);
|
||||||
|
} else {
|
||||||
|
if (code != 0) code = terrno;
|
||||||
|
SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
|
||||||
|
rpcSendResponse(&rpcRsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) {
|
||||||
|
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
|
||||||
|
|
||||||
|
SSnode *pSnode = dndAcquireSnode(pDnode);
|
||||||
|
if (pSnode != NULL) {
|
||||||
|
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
|
||||||
|
}
|
||||||
|
dndReleaseSnode(pDnode, pSnode);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
if (pMsg->msgType & 1u) {
|
||||||
|
SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
|
||||||
|
rpcSendResponse(&rsp);
|
||||||
|
}
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.writeWorker, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dndInitSnode(SDnode *pDnode) {
|
||||||
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
|
taosInitRWLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
if (dndReadSnodeFile(pDnode) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMgmt->dropped) {
|
||||||
|
dInfo("snode has been deployed and needs to be deleted");
|
||||||
|
sndDestroy(pDnode->dir.snode);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pMgmt->deployed) return 0;
|
||||||
|
|
||||||
|
return dndOpenSnode(pDnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void dndCleanupSnode(SDnode *pDnode) {
|
||||||
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
|
if (pMgmt->pSnode) {
|
||||||
|
dndStopSnodeWorker(pDnode);
|
||||||
|
sndClose(pMgmt->pSnode);
|
||||||
|
pMgmt->pSnode = NULL;
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,9 +16,9 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "dndWorker.h"
|
#include "dndWorker.h"
|
||||||
|
|
||||||
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum,
|
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
|
||||||
int32_t maxNum, FProcessItem fp) {
|
int32_t maxNum, void *queueFp) {
|
||||||
if (pDnode == NULL || pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || fp == NULL) {
|
if (pDnode == NULL || pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) {
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -27,19 +27,32 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type
|
||||||
pWorker->name = name;
|
pWorker->name = name;
|
||||||
pWorker->minNum = minNum;
|
pWorker->minNum = minNum;
|
||||||
pWorker->maxNum = maxNum;
|
pWorker->maxNum = maxNum;
|
||||||
pWorker->fp = fp;
|
pWorker->queueFp = queueFp;
|
||||||
pWorker->pDnode = pDnode;
|
pWorker->pDnode = pDnode;
|
||||||
|
|
||||||
if (pWorker->type == DND_WORKER_SINGLE) {
|
if (pWorker->type == DND_WORKER_SINGLE) {
|
||||||
SWorkerPool *pPool = &pWorker->pool;
|
SWorkerPool *pPool = &pWorker->pool;
|
||||||
|
pPool->name = name;
|
||||||
pPool->min = minNum;
|
pPool->min = minNum;
|
||||||
pPool->max = maxNum;
|
pPool->max = maxNum;
|
||||||
if (tWorkerInit(pPool) != 0) {
|
if (tWorkerInit(pPool) != 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
pWorker->queue = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp);
|
||||||
pWorker->queue = tWorkerAllocQueue(&pPool, pDnode, fp);
|
if (pWorker->queue == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else if (pWorker->type == DND_WORKER_MULTI) {
|
||||||
|
SMWorkerPool *pPool = &pWorker->mpool;
|
||||||
|
pPool->name = name;
|
||||||
|
pPool->max = maxNum;
|
||||||
|
if (tMWorkerInit(pPool) != 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pWorker->queue = tMWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp);
|
||||||
if (pWorker->queue == NULL) {
|
if (pWorker->queue == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -52,12 +65,17 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndCleanupWorker(SDnodeWorker *pWorker) {
|
void dndCleanupWorker(SDnodeWorker *pWorker) {
|
||||||
|
while (!taosQueueEmpty(pWorker->queue)) {
|
||||||
|
taosMsleep(10);
|
||||||
|
}
|
||||||
|
|
||||||
if (pWorker->type == DND_WORKER_SINGLE) {
|
if (pWorker->type == DND_WORKER_SINGLE) {
|
||||||
while (!taosQueueEmpty(pWorker->queue)) {
|
|
||||||
taosMsleep(10);
|
|
||||||
}
|
|
||||||
tWorkerCleanup(&pWorker->pool);
|
tWorkerCleanup(&pWorker->pool);
|
||||||
tWorkerFreeQueue(&pWorker->pool, pWorker->queue);
|
tWorkerFreeQueue(&pWorker->pool, pWorker->queue);
|
||||||
|
} else if (pWorker->type == DND_WORKER_MULTI) {
|
||||||
|
tMWorkerCleanup(&pWorker->mpool);
|
||||||
|
tMWorkerFreeQueue(&pWorker->mpool, pWorker->queue);
|
||||||
|
} else {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,16 +85,23 @@ int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen)
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pMsg = taosAllocateQitem(contLen);
|
void *pMsg = NULL;
|
||||||
|
if (contLen != 0) {
|
||||||
|
pMsg = taosAllocateQitem(contLen);
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
memcpy(pMsg, pCont, contLen);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pMsg = pCont;
|
||||||
|
}
|
||||||
|
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pMsg, pCont, contLen);
|
if (taosWriteQitem(pWorker->queue, pMsg) != 0) {
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
if (taosWriteQitem(pWorker, pMsg) != 0) {
|
|
||||||
taosFreeItem(pMsg);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -162,7 +162,7 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
|
||||||
|
|
||||||
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
|
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
|
||||||
strcpy(pReq->fqdn, "localhost");
|
strcpy(pReq->fqdn, "localhost");
|
||||||
pReq->port = htonl(904);
|
pReq->port = htonl(9044);
|
||||||
|
|
||||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen);
|
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen);
|
||||||
ASSERT_NE(pMsg, nullptr);
|
ASSERT_NE(pMsg, nullptr);
|
||||||
|
|
|
@ -388,7 +388,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
|
||||||
dnodeObj.updateTime = dnodeObj.createdTime;
|
dnodeObj.updateTime = dnodeObj.createdTime;
|
||||||
dnodeObj.port = pCreate->port;
|
dnodeObj.port = pCreate->port;
|
||||||
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
|
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
|
||||||
snprintf(dnodeObj.ep, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
|
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
|
|
|
@ -390,7 +390,7 @@ void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
|
void mndProcessMsg(SMnodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
tmsg_t msgType = pMsg->rpcMsg.msgType;
|
tmsg_t msgType = pMsg->rpcMsg.msgType;
|
||||||
|
@ -451,12 +451,6 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndProcessReadMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
|
|
||||||
|
|
||||||
void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
|
|
||||||
|
|
||||||
void mndProcessSyncMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
|
|
||||||
|
|
||||||
uint64_t mndGenerateUid(char *name, int32_t len) {
|
uint64_t mndGenerateUid(char *name, int32_t len) {
|
||||||
int64_t us = taosGetTimestampUs();
|
int64_t us = taosGetTimestampUs();
|
||||||
int32_t hashval = MurmurHash3_32(name, len);
|
int32_t hashval = MurmurHash3_32(name, len);
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#include "sndInt.h"
|
#include "sndInt.h"
|
||||||
|
|
||||||
SSnode *sndOpen(const SSnodeOpt *pOption) {
|
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
||||||
SSnode *pSnode = calloc(1, sizeof(SSnode));
|
SSnode *pSnode = calloc(1, sizeof(SSnode));
|
||||||
return pSnode;
|
return pSnode;
|
||||||
}
|
}
|
||||||
|
@ -28,3 +28,5 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
*pRsp = NULL;
|
*pRsp = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void sndDestroy(const char *path) {}
|
|
@ -428,6 +428,7 @@ SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf
|
||||||
char* end = NULL;
|
char* end = NULL;
|
||||||
SDropDnodeMsg * pDrop = (SDropDnodeMsg *)calloc(1, sizeof(SDropDnodeMsg));
|
SDropDnodeMsg * pDrop = (SDropDnodeMsg *)calloc(1, sizeof(SDropDnodeMsg));
|
||||||
pDrop->dnodeId = strtoll(pzName->z, &end, 10);
|
pDrop->dnodeId = strtoll(pzName->z, &end, 10);
|
||||||
|
pDrop->dnodeId = htonl(pDrop->dnodeId);
|
||||||
*len = sizeof(SDropDnodeMsg);
|
*len = sizeof(SDropDnodeMsg);
|
||||||
|
|
||||||
if (end - pzName->z != pzName->n) {
|
if (end - pzName->z != pzName->n) {
|
||||||
|
|
|
@ -229,6 +229,6 @@ void qParserClearupMetaRequestInfo(SCatalogReq* pMetaReq) {
|
||||||
taosArrayDestroy(pMetaReq->pUdf);
|
taosArrayDestroy(pMetaReq->pUdf);
|
||||||
}
|
}
|
||||||
|
|
||||||
void qDestoryQuery(SQueryNode* pQuery) {
|
void qDestroyQuery(SQueryNode* pQuery) {
|
||||||
// todo
|
// todo
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,5 +94,16 @@ if $rows != 2 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print =============== drop dnode
|
||||||
|
sql drop dnode 2;
|
||||||
|
sql show dnodes;
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
Loading…
Reference in New Issue