minor changes in mnode
This commit is contained in:
parent
419323274c
commit
c7f4f9d4e7
|
@ -64,8 +64,7 @@ void mnodeStop();
|
|||
int32_t mnodeGetLoad(SMnodeLoad *pLoad);
|
||||
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
|
||||
|
||||
SMnodeMsg *mnodeInitMsg(int32_t msgNum);
|
||||
int32_t mnodeAppendMsg(SMnodeMsg *pMsg, SRpcMsg *pRpcMsg);
|
||||
SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg);
|
||||
void mnodeCleanupMsg(SMnodeMsg *pMsg);
|
||||
void mnodeProcessMsg(SMnodeMsg *pMsg, EMnMsgType msgType);
|
||||
|
||||
|
|
|
@ -77,18 +77,17 @@ extern "C" {
|
|||
typedef enum {
|
||||
SDB_START = 0,
|
||||
SDB_TRANS = 1,
|
||||
SDB_VERSION = 2,
|
||||
SDB_CLUSTER = 3,
|
||||
SDB_DNODE = 4,
|
||||
SDB_MNODE = 5,
|
||||
SDB_ACCT = 6,
|
||||
SDB_AUTH = 7,
|
||||
SDB_USER = 8,
|
||||
SDB_DB = 9,
|
||||
SDB_VGROUP = 10,
|
||||
SDB_STABLE = 11,
|
||||
SDB_FUNC = 12,
|
||||
SDB_MAX = 13
|
||||
SDB_CLUSTER = 2,
|
||||
SDB_DNODE = 3,
|
||||
SDB_MNODE = 4,
|
||||
SDB_ACCT = 5,
|
||||
SDB_AUTH = 6,
|
||||
SDB_USER = 7,
|
||||
SDB_DB = 8,
|
||||
SDB_VGROUP = 9,
|
||||
SDB_STABLE = 10,
|
||||
SDB_FUNC = 11,
|
||||
SDB_MAX = 12
|
||||
} ESdbType;
|
||||
|
||||
typedef enum { SDB_ACTION_INSERT = 1, SDB_ACTION_UPDATE = 2, SDB_ACTION_DELETE = 3 } ESdbAction;
|
||||
|
|
|
@ -60,17 +60,19 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016) //"Invalid app version")
|
||||
|
||||
//common & util
|
||||
#define TSDB_CODE_COM_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //"Operation not supported")
|
||||
#define TSDB_CODE_COM_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0101) //"Memory corrupted")
|
||||
#define TSDB_CODE_COM_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0102) //"Out of memory")
|
||||
#define TSDB_CODE_COM_INVALID_CFG_MSG TAOS_DEF_ERROR_CODE(0, 0x0103) //"Invalid config message")
|
||||
#define TSDB_CODE_COM_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0104) //"Data file corrupted")
|
||||
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0105) //"Ref out of memory")
|
||||
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0106) //"too many Ref Objs")
|
||||
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0107) //"Ref ID is removed")
|
||||
#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x0108) //"Invalid Ref ID")
|
||||
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0109) //"Ref is already there")
|
||||
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010A) //"Ref is not there")
|
||||
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100)
|
||||
#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0101)
|
||||
#define TSDB_CODE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0102)
|
||||
#define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0103)
|
||||
#define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0104)
|
||||
#define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0106)
|
||||
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0107)
|
||||
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0108)
|
||||
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0109)
|
||||
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x010A)
|
||||
#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x010B)
|
||||
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x010C)
|
||||
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010D)
|
||||
|
||||
//client
|
||||
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation")
|
||||
|
@ -121,7 +123,6 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0307) //"Invalid message length")
|
||||
#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0308) //"Invalid message type")
|
||||
#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x0309) //"Too many connections")
|
||||
#define TSDB_CODE_MND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x030A) //"Out of memory in mnode")
|
||||
#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x030B) //"Data expired")
|
||||
#define TSDB_CODE_MND_INVALID_QUERY_ID TAOS_DEF_ERROR_CODE(0, 0x030C) //"Invalid query id")
|
||||
#define TSDB_CODE_MND_INVALID_STREAM_ID TAOS_DEF_ERROR_CODE(0, 0x030D) //"Invalid stream id")
|
||||
|
@ -132,6 +133,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir")
|
||||
#define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) //"failed to init components")
|
||||
|
||||
|
||||
#define TSDB_CODE_SDB_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320)
|
||||
#define TSDB_CODE_SDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0321)
|
||||
#define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0322)
|
||||
|
|
|
@ -40,7 +40,7 @@ uint16_t tsArbitratorPort = 6042;
|
|||
int32_t tsStatusInterval = 1; // second
|
||||
int32_t tsNumOfMnodes = 1;
|
||||
int8_t tsEnableVnodeBak = 1;
|
||||
int8_t tsEnableTelemetryReporting = 1;
|
||||
int8_t tsEnableTelemetryReporting = 0;
|
||||
int8_t tsArbOnline = 0;
|
||||
int64_t tsArbOnlineTimestamp = TSDB_ARB_DUMMY_TIME;
|
||||
char tsEmail[TSDB_FQDN_LEN] = {0};
|
||||
|
|
|
@ -302,6 +302,13 @@ PRASE_DNODE_OVER:
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (tsDnode.dnodeEps == NULL) {
|
||||
tsDnode.dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp));
|
||||
tsDnode.dnodeEps->dnodeNum = 1;
|
||||
tsDnode.dnodeEps->dnodeEps[0].dnodePort = tsServerPort;
|
||||
tstrncpy(tsDnode.dnodeEps->dnodeEps[0].dnodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
|
||||
}
|
||||
|
||||
dnodeResetDnodes(tsDnode.dnodeEps);
|
||||
|
||||
terrno = 0;
|
||||
|
|
|
@ -135,7 +135,7 @@ static int32_t dnodeInitMain() {
|
|||
|
||||
dnodeInitDir();
|
||||
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dnodeCleanupMain() {
|
||||
|
@ -145,14 +145,14 @@ static void dnodeCleanupMain() {
|
|||
}
|
||||
|
||||
int32_t dnodeInit() {
|
||||
SSteps *steps = taosStepInit(24, dnodeReportStartup);
|
||||
SSteps *steps = taosStepInit(10, dnodeReportStartup);
|
||||
if (steps == NULL) return -1;
|
||||
|
||||
taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain);
|
||||
taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup);
|
||||
taosStepAdd(steps, "dnode-tfs", NULL, NULL);
|
||||
taosStepAdd(steps, "dnode-wal", walInit, walCleanUp);
|
||||
taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp);
|
||||
//taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp);
|
||||
taosStepAdd(steps, "dnode-dnode", dnodeInitDnode, dnodeCleanupDnode);
|
||||
taosStepAdd(steps, "dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes);
|
||||
taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, dnodeCleanupMnode);
|
||||
|
|
|
@ -335,12 +335,9 @@ static int32_t dnodeWriteMnodeMsgToQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
|
|||
if (pQueue == NULL) {
|
||||
code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
|
||||
} else {
|
||||
SMnodeMsg *pMsg = mnodeInitMsg(1);
|
||||
SMnodeMsg *pMsg = mnodeInitMsg(pRpcMsg);
|
||||
if (pMsg == NULL) {
|
||||
code = TSDB_CODE_DND_OUT_OF_MEMORY;
|
||||
} else {
|
||||
mnodeAppendMsg(pMsg, pRpcMsg);
|
||||
code = taosWriteQitem(pQueue, pMsg);
|
||||
code = terrno;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -57,7 +57,6 @@ typedef struct SVgObj SVgObj;
|
|||
typedef struct SSTableObj SSTableObj;
|
||||
typedef struct SFuncObj SFuncObj;
|
||||
typedef struct SOperObj SOperObj;
|
||||
typedef struct SMnMsg SMnMsg;
|
||||
|
||||
typedef enum {
|
||||
MN_AUTH_ACCT_START = 0,
|
||||
|
@ -265,9 +264,9 @@ typedef struct {
|
|||
void *rsp;
|
||||
} SMnRsp;
|
||||
|
||||
typedef struct SMnMsg {
|
||||
void (*fp)(SMnMsg *pMsg, int32_t code);
|
||||
char user[TSDB_USER_LEN];
|
||||
typedef struct SMnodeMsg {
|
||||
void (*fp)(SMnodeMsg *pMsg, int32_t code);
|
||||
SRpcConnInfo conn;
|
||||
SUserObj *pUser;
|
||||
int16_t received;
|
||||
int16_t successed;
|
||||
|
@ -278,7 +277,7 @@ typedef struct SMnMsg {
|
|||
SMnRsp rpcRsp;
|
||||
SRpcMsg rpcMsg;
|
||||
char pCont[];
|
||||
} SMnReq;
|
||||
} SMnodeMsg;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -24,17 +24,18 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef enum { MN_STATUS_UNINIT = 0, MN_STATUS_INIT = 1, MN_STATUS_READY = 2, MN_STATUS_CLOSING = 3 } EMnStatus;
|
||||
typedef void (*MnodeRpcFp[TSDB_MSG_TYPE_MAX])(SMnodeMsg *pMsg);
|
||||
|
||||
tmr_h mnodeGetTimer();
|
||||
int32_t mnodeGetDnodeId();
|
||||
int64_t mnodeGetClusterId();
|
||||
EMnStatus mnodeGetStatus();
|
||||
tmr_h mnodeGetTimer();
|
||||
int32_t mnodeGetDnodeId();
|
||||
int64_t mnodeGetClusterId();
|
||||
|
||||
void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
|
||||
void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
|
||||
void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
|
||||
|
||||
void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1,34 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_MNODE_WORKER_H_
|
||||
#define _TD_MNODE_WORKER_H_
|
||||
|
||||
#include "mnodeInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t mnodeInitWorker();
|
||||
void mnodeCleanupWorker();
|
||||
void mnodeSendRsp(SMnMsg *pMsg, int32_t code);
|
||||
void mnodeReDispatchToWriteQueue(SMnMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_MNODE_WORKER_H_*/
|
|
@ -0,0 +1,392 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "tglobal.h"
|
||||
#include "tstep.h"
|
||||
#include "tqueue.h"
|
||||
#include "mnodeAcct.h"
|
||||
#include "mnodeAuth.h"
|
||||
#include "mnodeBalance.h"
|
||||
#include "mnodeCluster.h"
|
||||
#include "mnodeDb.h"
|
||||
#include "mnodeDnode.h"
|
||||
#include "mnodeFunc.h"
|
||||
#include "mnodeMnode.h"
|
||||
#include "mnodeOper.h"
|
||||
#include "mnodeProfile.h"
|
||||
#include "mnodeShow.h"
|
||||
#include "mnodeStable.h"
|
||||
#include "mnodeSync.h"
|
||||
#include "mnodeTelem.h"
|
||||
#include "mnodeUser.h"
|
||||
#include "mnodeVgroup.h"
|
||||
|
||||
static struct {
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
tmr_h timer;
|
||||
SSteps *pInitSteps;
|
||||
SSteps *pStartSteps;
|
||||
SMnodePara para;
|
||||
MnodeRpcFp msgFp;
|
||||
} tsMint;
|
||||
|
||||
int32_t mnodeGetDnodeId() { return tsMint.para.dnodeId; }
|
||||
|
||||
int64_t mnodeGetClusterId() { return tsMint.para.clusterId; }
|
||||
|
||||
void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg) { (*tsMint.para.SendMsgToDnode)(epSet, rpcMsg); }
|
||||
|
||||
void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsMint.para.SendMsgToMnode)(rpcMsg); }
|
||||
|
||||
void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell) { (*tsMint.para.SendRedirectMsg)(rpcMsg, forShell); }
|
||||
|
||||
static int32_t mnodeInitTimer() {
|
||||
if (tsMint.timer == NULL) {
|
||||
tsMint.timer = taosTmrInit(tsMaxShellConns, 200, 3600000, "MND");
|
||||
}
|
||||
|
||||
if (tsMint.timer == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void mnodeCleanupTimer() {
|
||||
if (tsMint.timer != NULL) {
|
||||
taosTmrCleanUp(tsMint.timer);
|
||||
tsMint.timer = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
tmr_h mnodeGetTimer() { return tsMint.timer; }
|
||||
|
||||
static int32_t mnodeSetPara(SMnodePara para) {
|
||||
tsMint.para = para;
|
||||
|
||||
if (tsMint.para.SendMsgToDnode == NULL) {
|
||||
terrno = TSDB_CODE_MND_APP_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsMint.para.SendMsgToMnode == NULL) {
|
||||
terrno = TSDB_CODE_MND_APP_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsMint.para.SendRedirectMsg == NULL) {
|
||||
terrno = TSDB_CODE_MND_APP_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsMint.para.PutMsgIntoApplyQueue == NULL) {
|
||||
terrno = TSDB_CODE_MND_APP_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsMint.para.dnodeId < 0) {
|
||||
terrno = TSDB_CODE_MND_APP_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsMint.para.clusterId < 0) {
|
||||
terrno = TSDB_CODE_MND_APP_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mnodeAllocInitSteps() {
|
||||
struct SSteps *steps = taosStepInit(16, NULL);
|
||||
if (steps == NULL) return -1;
|
||||
|
||||
if (taosStepAdd(steps, "mnode-trans", trnInit, trnCleanup) != 0) return -1;
|
||||
if (taosStepAdd(steps, "mnode-cluster", mnodeInitCluster, mnodeCleanupCluster) != 0) return -1;
|
||||
if (taosStepAdd(steps, "mnode-dnode", mnodeInitDnode, mnodeCleanupDnode) != 0) return -1;
|
||||
if (taosStepAdd(steps, "mnode-mnode", mnodeInitMnode, mnodeCleanupMnode) != 0) return -1;
|
||||
if (taosStepAdd(steps, "mnode-acct", mnodeInitAcct, mnodeCleanupAcct) != 0) return -1;
|
||||
if (taosStepAdd(steps, "mnode-auth", mnodeInitAuth, mnodeCleanupAuth) != 0) return -1;
|
||||
if (taosStepAdd(steps, "mnode-user", mnodeInitUser, mnodeCleanupUser) != 0) return -1;
|
||||
if (taosStepAdd(steps, "mnode-db", mnodeInitDb, mnodeCleanupDb) != 0) return -1;
|
||||
if (taosStepAdd(steps, "mnode-vgroup", mnodeInitVgroup, mnodeCleanupVgroup) != 0) return -1;
|
||||
if (taosStepAdd(steps, "mnode-stable", mnodeInitStable, mnodeCleanupStable) != 0) return -1;
|
||||
if (taosStepAdd(steps, "mnode-func", mnodeInitFunc, mnodeCleanupFunc) != 0) return -1;
|
||||
if (taosStepAdd(steps, "mnode-sdb", sdbInit, sdbCleanup) != 0) return -1;
|
||||
|
||||
tsMint.pInitSteps = steps;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mnodeAllocStartSteps() {
|
||||
struct SSteps *steps = taosStepInit(7, NULL);
|
||||
if (steps == NULL) return -1;
|
||||
|
||||
taosStepAdd(steps, "mnode-timer", mnodeInitTimer, NULL);
|
||||
taosStepAdd(steps, "mnode-balance", mnodeInitBalance, mnodeCleanupBalance);
|
||||
taosStepAdd(steps, "mnode-profile", mnodeInitProfile, mnodeCleanupProfile);
|
||||
taosStepAdd(steps, "mnode-show", mnodeInitShow, mnodeCleanUpShow);
|
||||
taosStepAdd(steps, "mnode-sync", mnodeInitSync, mnodeCleanUpSync);
|
||||
taosStepAdd(steps, "mnode-telem", mnodeInitTelem, mnodeCleanupTelem);
|
||||
taosStepAdd(steps, "mnode-timer", NULL, mnodeCleanupTimer);
|
||||
|
||||
tsMint.pStartSteps = steps;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mnodeInit(SMnodePara para) {
|
||||
if (mnodeSetPara(para) != 0) {
|
||||
mError("failed to init mnode para since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mnodeAllocInitSteps() != 0) {
|
||||
mError("failed to alloc init steps since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mnodeAllocStartSteps() != 0) {
|
||||
mError("failed to alloc start steps since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
return taosStepExec(tsMint.pInitSteps);
|
||||
}
|
||||
|
||||
void mnodeCleanup() { taosStepCleanup(tsMint.pInitSteps); }
|
||||
|
||||
int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) {
|
||||
if (tsMint.para.dnodeId <= 0 && tsMint.para.clusterId <= 0) {
|
||||
if (sdbDeploy() != 0) {
|
||||
mError("failed to deploy sdb since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
mDebug("mnode is deployed");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mnodeUnDeploy(char *path) { sdbUnDeploy(); }
|
||||
|
||||
int32_t mnodeStart(char *path, SMnodeCfg *pCfg) { return taosStepExec(tsMint.pStartSteps); }
|
||||
|
||||
int32_t mnodeAlter(SMnodeCfg *pCfg) { return 0; }
|
||||
|
||||
void mnodeStop() { taosStepCleanup(tsMint.pStartSteps); }
|
||||
|
||||
int32_t mnodeGetLoad(SMnodeLoad *pLoad) { return 0; }
|
||||
|
||||
SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) {
|
||||
SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) {
|
||||
mnodeCleanupMsg(pMsg);
|
||||
mError("can not get user from conn:%p", pMsg->rpcMsg.handle);
|
||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pMsg->rpcMsg = *pRpcMsg;
|
||||
pMsg->createdTime = taosGetTimestampSec();
|
||||
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void mnodeCleanupMsg(SMnodeMsg *pMsg) {
|
||||
if (pMsg->pUser != NULL) {
|
||||
sdbRelease(pMsg->pUser);
|
||||
}
|
||||
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void mnodeProcessRpcMsg(SMnodeMsg *pMsg) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
|
||||
if (tsMint.msgFp[msgType] == NULL) {
|
||||
}
|
||||
|
||||
(*tsMint.msgFp[msgType])(pMsg);
|
||||
}
|
||||
|
||||
void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) {
|
||||
if (msgType > 0 || msgType < TSDB_MSG_TYPE_MAX) {
|
||||
tsMint.msgFp[msgType] = fp;
|
||||
}
|
||||
}
|
||||
|
||||
void mnodeProcessMsg(SMnodeMsg *pMsg, EMnMsgType msgType) {
|
||||
if (!mnodeIsMaster()) {
|
||||
mnodeSendRedirectMsg(&pMsg->rpcMsg, true);
|
||||
mnodeCleanupMsg(pMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
switch (msgType) {
|
||||
case MN_MSG_TYPE_READ:
|
||||
case MN_MSG_TYPE_WRITE:
|
||||
case MN_MSG_TYPE_SYNC:
|
||||
mnodeProcessRpcMsg(pMsg);
|
||||
break;
|
||||
case MN_MSG_TYPE_APPLY:
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
static void mnodeProcessWriteReq(SMnodeMsg *pMsg, void *unused) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pMsg->rpcMsg.pCont == NULL) {
|
||||
mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
goto PROCESS_WRITE_REQ_END;
|
||||
}
|
||||
|
||||
if (!mnodeIsMaster()) {
|
||||
SMnRsp *rpcRsp = &pMsg->rpcRsp;
|
||||
SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
|
||||
mnodeGetMnodeEpSetForShell(epSet, true);
|
||||
rpcRsp->rsp = epSet;
|
||||
rpcRsp->len = sizeof(SEpSet);
|
||||
|
||||
mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
|
||||
taosMsg[msgType], epSet->numOfEps, epSet->inUse);
|
||||
|
||||
code = TSDB_CODE_RPC_REDIRECT;
|
||||
goto PROCESS_WRITE_REQ_END;
|
||||
}
|
||||
|
||||
if (tsMworker.writeMsgFp[msgType] == NULL) {
|
||||
mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
goto PROCESS_WRITE_REQ_END;
|
||||
}
|
||||
|
||||
code = (*tsMworker.writeMsgFp[msgType])(pMsg);
|
||||
|
||||
PROCESS_WRITE_REQ_END:
|
||||
mnodeSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
static void mnodeProcessReadReq(SMnodeMsg *pMsg, void *unused) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pMsg->rpcMsg.pCont == NULL) {
|
||||
mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
goto PROCESS_READ_REQ_END;
|
||||
}
|
||||
|
||||
if (!mnodeIsMaster()) {
|
||||
SMnRsp *rpcRsp = &pMsg->rpcRsp;
|
||||
SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
|
||||
if (!epSet) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto PROCESS_READ_REQ_END;
|
||||
}
|
||||
mnodeGetMnodeEpSetForShell(epSet, true);
|
||||
rpcRsp->rsp = epSet;
|
||||
rpcRsp->len = sizeof(SEpSet);
|
||||
|
||||
mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType],
|
||||
epSet->numOfEps, epSet->inUse);
|
||||
code = TSDB_CODE_RPC_REDIRECT;
|
||||
goto PROCESS_READ_REQ_END;
|
||||
}
|
||||
|
||||
if (tsMworker.readMsgFp[msgType] == NULL) {
|
||||
mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
goto PROCESS_READ_REQ_END;
|
||||
}
|
||||
|
||||
mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = (*tsMworker.readMsgFp[msgType])(pMsg);
|
||||
|
||||
PROCESS_READ_REQ_END:
|
||||
mnodeSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
static void mnodeProcessPeerReq(SMnodeMsg *pMsg, void *unused) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pMsg->rpcMsg.pCont == NULL) {
|
||||
mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
goto PROCESS_PEER_REQ_END;
|
||||
}
|
||||
|
||||
if (!mnodeIsMaster()) {
|
||||
SMnRsp *rpcRsp = &pMsg->rpcRsp;
|
||||
SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
|
||||
mnodeGetMnodeEpSetForPeer(epSet, true);
|
||||
rpcRsp->rsp = epSet;
|
||||
rpcRsp->len = sizeof(SEpSet);
|
||||
|
||||
mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
|
||||
taosMsg[msgType], epSet->numOfEps, epSet->inUse);
|
||||
|
||||
code = TSDB_CODE_RPC_REDIRECT;
|
||||
goto PROCESS_PEER_REQ_END;
|
||||
}
|
||||
|
||||
if (tsMworker.peerReqFp[msgType] == NULL) {
|
||||
mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
goto PROCESS_PEER_REQ_END;
|
||||
}
|
||||
|
||||
code = (*tsMworker.peerReqFp[msgType])(pMsg);
|
||||
|
||||
PROCESS_PEER_REQ_END:
|
||||
mnodeSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
static void mnodeProcessPeerRsp(SMnodeMsg *pMsg, void *unused) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
SRpcMsg *pRpcMsg = &pMsg->rpcMsg;
|
||||
|
||||
if (!mnodeIsMaster()) {
|
||||
mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
|
||||
mnodeCleanupMsg2(pMsg);
|
||||
}
|
||||
|
||||
if (tsMworker.peerRspFp[msgType]) {
|
||||
(*tsMworker.peerRspFp[msgType])(pRpcMsg);
|
||||
} else {
|
||||
mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
|
||||
}
|
||||
|
||||
mnodeCleanupMsg2(pMsg);
|
||||
}
|
||||
#endif
|
|
@ -21,7 +21,7 @@
|
|||
static SSdbRaw *mnodeAcctActionEncode(SAcctObj *pAcct) {
|
||||
SSdbRaw *pRaw = calloc(1, sizeof(SAcctObj) + sizeof(SSdbRaw));
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ static SAcctObj *mnodeAcctActionDecode(SSdbRaw *pRaw) {
|
|||
|
||||
SAcctObj *pAcct = calloc(1, sizeof(SAcctObj));
|
||||
if (pAcct == NULL) {
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "mnodeTelem.h"
|
||||
#include "tbuffer.h"
|
||||
#include "tglobal.h"
|
||||
#include "mnodeSync.h"
|
||||
|
||||
#define TELEMETRY_SERVER "telemetry.taosdata.com"
|
||||
#define TELEMETRY_PORT 80
|
||||
|
@ -255,7 +256,7 @@ static void* mnodeTelemThreadFp(void* param) {
|
|||
if (r == 0) break;
|
||||
if (r != ETIMEDOUT) continue;
|
||||
|
||||
if (mnodeGetStatus() == MN_STATUS_READY) {
|
||||
if (mnodeIsMaster()) {
|
||||
mnodeSendTelemetryReport();
|
||||
}
|
||||
end.tv_sec += REPORT_INTERVAL;
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) {
|
||||
SSdbRaw *pRaw = calloc(1, sizeof(SUserObj) + sizeof(SSdbRaw));
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -51,7 +51,7 @@ static SUserObj *mnodeUserActionDecode(SSdbRaw *pRaw) {
|
|||
|
||||
SUserObj *pUser = calloc(1, sizeof(SUserObj));
|
||||
if (pUser == NULL) {
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -77,7 +77,7 @@ static SUserObj *mnodeUserActionDecode(SSdbRaw *pRaw) {
|
|||
static int32_t mnodeUserActionInsert(SUserObj *pUser) {
|
||||
pUser->prohibitDbHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
if (pUser->prohibitDbHash == NULL) {
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -145,7 +145,7 @@ static int32_t mnodeCreateDefaultUsers() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnMsg *pMsg) {
|
||||
static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pMsg) {
|
||||
SUserObj userObj = {0};
|
||||
tstrncpy(userObj.user, user, TSDB_USER_LEN);
|
||||
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
|
||||
|
@ -192,7 +192,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnMsg *pMsg)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessCreateUserMsg(SMnMsg *pMsg) {
|
||||
static int32_t mnodeProcessCreateUserMsg(SMnodeMsg *pMsg) {
|
||||
SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
|
||||
if (pCreate->user[0] == 0) {
|
||||
|
@ -215,7 +215,7 @@ static int32_t mnodeProcessCreateUserMsg(SMnMsg *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
SUserObj *pOperUser = sdbAcquire(SDB_USER, pMsg->user);
|
||||
SUserObj *pOperUser = sdbAcquire(SDB_USER, pMsg->conn.user);
|
||||
if (pOperUser == NULL) {
|
||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
|
||||
|
|
|
@ -1,494 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "tworker.h"
|
||||
#include "tglobal.h"
|
||||
#include "mnodeMnode.h"
|
||||
#include "mnodeInt.h"
|
||||
#include "mnodeShow.h"
|
||||
#include "mnodeSync.h"
|
||||
#include "mnodeWorker.h"
|
||||
|
||||
static struct {
|
||||
SWorkerPool read;
|
||||
SWorkerPool write;
|
||||
SWorkerPool peerReq;
|
||||
SWorkerPool peerRsp;
|
||||
taos_queue readQ;
|
||||
taos_queue writeQ;
|
||||
taos_queue peerReqQ;
|
||||
taos_queue peerRspQ;
|
||||
int32_t (*writeMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
|
||||
int32_t (*readMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
|
||||
int32_t (*peerReqFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
|
||||
void (*peerRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
|
||||
void (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
|
||||
} tsMworker = {0};
|
||||
|
||||
static SMnMsg *mnodeInitMsg2(SRpcMsg *pRpcMsg) {
|
||||
int32_t size = sizeof(SMnMsg) + pRpcMsg->contLen;
|
||||
SMnMsg *pMsg = taosAllocateQitem(size);
|
||||
|
||||
pMsg->rpcMsg = *pRpcMsg;
|
||||
pMsg->rpcMsg.pCont = pMsg->pCont;
|
||||
pMsg->createdTime = taosGetTimestampSec();
|
||||
memcpy(pMsg->pCont, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
|
||||
SRpcConnInfo connInfo = {0};
|
||||
if (rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo) == 0) {
|
||||
pMsg->pUser = sdbAcquire(SDB_USER, connInfo.user);
|
||||
}
|
||||
|
||||
if (pMsg->pUser == NULL) {
|
||||
mError("can not get user from conn:%p", pMsg->rpcMsg.handle);
|
||||
taosFreeQitem(pMsg);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
static void mnodeCleanupMsg2(SMnMsg *pMsg) {
|
||||
if (pMsg == NULL) return;
|
||||
if (pMsg->rpcMsg.pCont != pMsg->pCont) {
|
||||
tfree(pMsg->rpcMsg.pCont);
|
||||
}
|
||||
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void mnodeDispatchToWriteQueue(SRpcMsg *pRpcMsg) {
|
||||
if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.writeQ == NULL) {
|
||||
mnodeSendRedirectMsg(pRpcMsg, true);
|
||||
} else {
|
||||
SMnMsg *pMsg = mnodeInitMsg2(pRpcMsg);
|
||||
if (pMsg == NULL) {
|
||||
SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_USER_NOT_EXIST};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
} else {
|
||||
mTrace("msg:%p, app:%p type:%s is put into wqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
|
||||
taosWriteQitem(tsMworker.writeQ, pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
rpcFreeCont(pRpcMsg->pCont);
|
||||
}
|
||||
|
||||
void mnodeReDispatchToWriteQueue(SMnMsg *pMsg) {
|
||||
if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.writeQ == NULL) {
|
||||
mnodeSendRedirectMsg(&pMsg->rpcMsg, true);
|
||||
mnodeCleanupMsg2(pMsg);
|
||||
} else {
|
||||
taosWriteQitem(tsMworker.writeQ, pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static void mnodeDispatchToReadQueue(SRpcMsg *pRpcMsg) {
|
||||
if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.readQ == NULL) {
|
||||
mnodeSendRedirectMsg(pRpcMsg, true);
|
||||
} else {
|
||||
SMnMsg *pMsg = mnodeInitMsg2(pRpcMsg);
|
||||
if (pMsg == NULL) {
|
||||
SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_USER_NOT_EXIST};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
} else {
|
||||
mTrace("msg:%p, app:%p type:%s is put into rqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
|
||||
taosWriteQitem(tsMworker.readQ, pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
rpcFreeCont(pRpcMsg->pCont);
|
||||
}
|
||||
|
||||
static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) {
|
||||
if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.peerReqQ == NULL) {
|
||||
mnodeSendRedirectMsg(pRpcMsg, false);
|
||||
} else {
|
||||
SMnMsg *pMsg = mnodeInitMsg2(pRpcMsg);
|
||||
if (pMsg == NULL) {
|
||||
SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_USER_NOT_EXIST};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
} else {
|
||||
mTrace("msg:%p, app:%p type:%s is put into peer req queue", pMsg, pMsg->rpcMsg.ahandle,
|
||||
taosMsg[pMsg->rpcMsg.msgType]);
|
||||
taosWriteQitem(tsMworker.peerReqQ, pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
rpcFreeCont(pRpcMsg->pCont);
|
||||
}
|
||||
|
||||
void mnodeDispatchToPeerRspQueue(SRpcMsg *pRpcMsg) {
|
||||
SMnMsg *pMsg = mnodeInitMsg2(pRpcMsg);
|
||||
if (pMsg == NULL) {
|
||||
SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_USER_NOT_EXIST};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
} else {
|
||||
mTrace("msg:%p, app:%p type:%s is put into peer rsp queue", pMsg, pMsg->rpcMsg.ahandle,
|
||||
taosMsg[pMsg->rpcMsg.msgType]);
|
||||
taosWriteQitem(tsMworker.peerRspQ, pMsg);
|
||||
}
|
||||
|
||||
// rpcFreeCont(pRpcMsg->pCont);
|
||||
}
|
||||
|
||||
void mnodeSendRsp(SMnMsg *pMsg, int32_t code) {
|
||||
if (pMsg == NULL) return;
|
||||
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
|
||||
if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
|
||||
mnodeReDispatchToWriteQueue(pMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->rpcMsg.handle,
|
||||
.pCont = pMsg->rpcRsp.rsp,
|
||||
.contLen = pMsg->rpcRsp.len,
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
mnodeCleanupMsg2(pMsg);
|
||||
}
|
||||
|
||||
static void mnodeInitMsgFp() {
|
||||
// // peer req
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeDispatchToPeerQueue;
|
||||
// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessTableCfgMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeDispatchToPeerQueue;
|
||||
// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessVnodeCfgMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_AUTH] = mnodeDispatchToPeerQueue;
|
||||
// tsMworker.peerReqFp[TSDB_MSG_TYPE_AUTH] = mnodeProcessAuthMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_GRANT] = mnodeDispatchToPeerQueue;
|
||||
// // tsMworker.peerReqFp[TSDB_MSG_TYPE_GRANT] = grantProcessMsgInMgmt;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_STATUS] = mnodeDispatchToPeerQueue;
|
||||
// tsMworker.peerReqFp[TSDB_MSG_TYPE_STATUS] = mnodeProcessDnodeStatusMsg;
|
||||
|
||||
// // peer rsp
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeDispatchToPeerRspQueue;
|
||||
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessCfgDnodeMsgRsp;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeDispatchToPeerRspQueue;
|
||||
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessDropSuperTableRsp;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeDispatchToPeerRspQueue;
|
||||
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessCreateChildTableRsp;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeDispatchToPeerRspQueue;
|
||||
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessDropChildTableRsp;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeDispatchToPeerRspQueue;
|
||||
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessAlterTableRsp;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeDispatchToPeerRspQueue;
|
||||
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessCreateVnodeRsp;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeDispatchToPeerRspQueue;
|
||||
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessAlterVnodeRsp;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeDispatchToPeerRspQueue;
|
||||
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessCompactVnodeRsp;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeDispatchToPeerRspQueue;
|
||||
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessDropVnodeRsp;
|
||||
|
||||
// // read msg
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_HEARTBEAT] = mnodeProcessHeartBeatMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CONNECT] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CONNECT] = mnodeProcessConnectMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_USE_DB] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_USE_DB] = mnodeProcessUseMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_TABLE_META] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_TABLE_META] = mnodeProcessTableMetaMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_TABLES_META] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_TABLES_META] = mnodeProcessMultiTableMetaMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = mnodeProcessSuperTableVgroupMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_SHOW] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_SHOW] = mnodeProcessShowMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = mnodeProcessRetrieveMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_RETRIEVE_FUNC] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_RETRIEVE_FUNC] = mnodeProcessRetrieveFuncReq;
|
||||
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CREATE_ACCT] = acctProcessCreateAcctMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_ALTER_ACCT] = acctProcessDropAcctMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_DROP_ACCT] = acctProcessAlterAcctMsg;
|
||||
|
||||
// // write msg
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_USER] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_USER] = mnodeProcessCreateUserMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_USER] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_ALTER_USER] = mnodeProcessAlterUserMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_USER] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_USER] = mnodeProcessDropUserMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_DNODE] = mnodeProcessCreateDnodeMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_DNODE] = mnodeProcessDropDnodeMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = mnodeProcessCfgDnodeMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_DB] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_DB] = mnodeProcessCreateDbMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_DB] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_ALTER_DB] = mnodeProcessAlterDbMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_DB] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_DB] = mnodeProcessDropDbMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_SYNC_DB] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_SYNC_DB] = mnodeProcessSyncDbMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = mnodeProcessCompactMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = mnodeProcessCreateFuncMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = mnodeProcessDropFuncMsg;
|
||||
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_TP] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CREATE_TP] = tpProcessCreateTpMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_DROP_TP] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_DROP_TP] = tpProcessAlterTpMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_TP] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_ALTER_TP] = tpProcessDropTpMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_TABLE] = mnodeProcessCreateTableMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_TABLE] = mnodeProcessDropTableMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_ALTER_TABLE] = mnodeProcessAlterTableMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_STREAM] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = NULL;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_KILL_QUERY] = mnodeProcessKillQueryMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_KILL_STREAM] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_KILL_STREAM] = mnodeProcessKillStreamMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_KILL_CONN] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_KILL_CONN] = mnodeProcessKillConnectionMsg;
|
||||
}
|
||||
|
||||
static void mnodeProcessWriteReq(SMnMsg *pMsg, void *unused) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pMsg->rpcMsg.pCont == NULL) {
|
||||
mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
goto PROCESS_WRITE_REQ_END;
|
||||
}
|
||||
|
||||
if (!mnodeIsMaster()) {
|
||||
SMnRsp *rpcRsp = &pMsg->rpcRsp;
|
||||
SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
|
||||
mnodeGetMnodeEpSetForShell(epSet, true);
|
||||
rpcRsp->rsp = epSet;
|
||||
rpcRsp->len = sizeof(SEpSet);
|
||||
|
||||
mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
|
||||
taosMsg[msgType], epSet->numOfEps, epSet->inUse);
|
||||
|
||||
code = TSDB_CODE_RPC_REDIRECT;
|
||||
goto PROCESS_WRITE_REQ_END;
|
||||
}
|
||||
|
||||
if (tsMworker.writeMsgFp[msgType] == NULL) {
|
||||
mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
goto PROCESS_WRITE_REQ_END;
|
||||
}
|
||||
|
||||
code = (*tsMworker.writeMsgFp[msgType])(pMsg);
|
||||
|
||||
PROCESS_WRITE_REQ_END:
|
||||
mnodeSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
static void mnodeProcessReadReq(SMnMsg *pMsg, void *unused) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pMsg->rpcMsg.pCont == NULL) {
|
||||
mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
goto PROCESS_READ_REQ_END;
|
||||
}
|
||||
|
||||
if (!mnodeIsMaster()) {
|
||||
SMnRsp *rpcRsp = &pMsg->rpcRsp;
|
||||
SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
|
||||
if (!epSet) {
|
||||
code = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
goto PROCESS_READ_REQ_END;
|
||||
}
|
||||
mnodeGetMnodeEpSetForShell(epSet, true);
|
||||
rpcRsp->rsp = epSet;
|
||||
rpcRsp->len = sizeof(SEpSet);
|
||||
|
||||
mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType],
|
||||
epSet->numOfEps, epSet->inUse);
|
||||
code = TSDB_CODE_RPC_REDIRECT;
|
||||
goto PROCESS_READ_REQ_END;
|
||||
}
|
||||
|
||||
if (tsMworker.readMsgFp[msgType] == NULL) {
|
||||
mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
goto PROCESS_READ_REQ_END;
|
||||
}
|
||||
|
||||
mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = (*tsMworker.readMsgFp[msgType])(pMsg);
|
||||
|
||||
PROCESS_READ_REQ_END:
|
||||
mnodeSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
static void mnodeProcessPeerReq(SMnMsg *pMsg, void *unused) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pMsg->rpcMsg.pCont == NULL) {
|
||||
mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
goto PROCESS_PEER_REQ_END;
|
||||
}
|
||||
|
||||
if (!mnodeIsMaster()) {
|
||||
SMnRsp *rpcRsp = &pMsg->rpcRsp;
|
||||
SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
|
||||
mnodeGetMnodeEpSetForPeer(epSet, true);
|
||||
rpcRsp->rsp = epSet;
|
||||
rpcRsp->len = sizeof(SEpSet);
|
||||
|
||||
mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
|
||||
taosMsg[msgType], epSet->numOfEps, epSet->inUse);
|
||||
|
||||
code = TSDB_CODE_RPC_REDIRECT;
|
||||
goto PROCESS_PEER_REQ_END;
|
||||
}
|
||||
|
||||
if (tsMworker.peerReqFp[msgType] == NULL) {
|
||||
mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]);
|
||||
code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
goto PROCESS_PEER_REQ_END;
|
||||
}
|
||||
|
||||
code = (*tsMworker.peerReqFp[msgType])(pMsg);
|
||||
|
||||
PROCESS_PEER_REQ_END:
|
||||
mnodeSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
static void mnodeProcessPeerRsp(SMnMsg *pMsg, void *unused) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
SRpcMsg *pRpcMsg = &pMsg->rpcMsg;
|
||||
|
||||
if (!mnodeIsMaster()) {
|
||||
mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
|
||||
mnodeCleanupMsg2(pMsg);
|
||||
}
|
||||
|
||||
if (tsMworker.peerRspFp[msgType]) {
|
||||
(*tsMworker.peerRspFp[msgType])(pRpcMsg);
|
||||
} else {
|
||||
mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
|
||||
}
|
||||
|
||||
mnodeCleanupMsg2(pMsg);
|
||||
}
|
||||
|
||||
int32_t mnodeInitWorker() {
|
||||
mnodeInitMsgFp();
|
||||
|
||||
SWorkerPool *pPool = &tsMworker.write;
|
||||
pPool->name = "mnode-write";
|
||||
pPool->min = 1;
|
||||
pPool->max = 1;
|
||||
if (tWorkerInit(pPool) != 0) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
} else {
|
||||
tsMworker.writeQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)mnodeProcessWriteReq);
|
||||
}
|
||||
|
||||
pPool = &tsMworker.read;
|
||||
pPool->name = "mnode-read";
|
||||
pPool->min = 2;
|
||||
pPool->max = (int32_t)(tsNumOfCores * tsNumOfThreadsPerCore / 2);
|
||||
pPool->max = MAX(2, pPool->max);
|
||||
pPool->max = MIN(4, pPool->max);
|
||||
if (tWorkerInit(pPool) != 0) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
} else {
|
||||
tsMworker.readQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)mnodeProcessReadReq);
|
||||
}
|
||||
|
||||
pPool = &tsMworker.peerReq;
|
||||
pPool->name = "mnode-peer-req";
|
||||
pPool->min = 1;
|
||||
pPool->max = 1;
|
||||
if (tWorkerInit(pPool) != 0) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
} else {
|
||||
tsMworker.peerReqQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)mnodeProcessPeerReq);
|
||||
}
|
||||
|
||||
pPool = &tsMworker.peerRsp;
|
||||
pPool->name = "mnode-peer-rsp";
|
||||
pPool->min = 1;
|
||||
pPool->max = 1;
|
||||
if (tWorkerInit(pPool) != 0) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
} else {
|
||||
tsMworker.peerRspQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)mnodeProcessPeerRsp);
|
||||
}
|
||||
|
||||
mInfo("mnode worker is initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mnodeCleanupWorker() {
|
||||
tWorkerFreeQueue(&tsMworker.write, tsMworker.writeQ);
|
||||
tWorkerCleanup(&tsMworker.write);
|
||||
tsMworker.writeQ = NULL;
|
||||
|
||||
tWorkerFreeQueue(&tsMworker.read, tsMworker.readQ);
|
||||
tWorkerCleanup(&tsMworker.read);
|
||||
tsMworker.readQ = NULL;
|
||||
|
||||
tWorkerFreeQueue(&tsMworker.peerReq, tsMworker.peerReqQ);
|
||||
tWorkerCleanup(&tsMworker.peerReq);
|
||||
tsMworker.peerReqQ = NULL;
|
||||
|
||||
tWorkerFreeQueue(&tsMworker.peerRsp, tsMworker.peerRspQ);
|
||||
tWorkerCleanup(&tsMworker.peerRsp);
|
||||
tsMworker.peerRspQ = NULL;
|
||||
|
||||
mInfo("mnode worker is closed");
|
||||
}
|
||||
|
||||
SMnodeMsg *mnodeInitMsg(int32_t msgNum) { return NULL; }
|
||||
|
||||
int32_t mnodeAppendMsg(SMnodeMsg *pMsg, SRpcMsg *pRpcMsg) { return 0; }
|
||||
|
||||
void mnodeCleanupMsg(SMnodeMsg *pMsg) {}
|
||||
void mnodeProcessMsg(SMnodeMsg *pMsg, EMnMsgType msgType) {}
|
|
@ -1,250 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "tglobal.h"
|
||||
#include "tstep.h"
|
||||
#include "mnodeAcct.h"
|
||||
#include "mnodeAuth.h"
|
||||
#include "mnodeBalance.h"
|
||||
#include "mnodeCluster.h"
|
||||
#include "mnodeDb.h"
|
||||
#include "mnodeDnode.h"
|
||||
#include "mnodeFunc.h"
|
||||
#include "mnodeMnode.h"
|
||||
#include "mnodeOper.h"
|
||||
#include "mnodeProfile.h"
|
||||
#include "mnodeInt.h"
|
||||
#include "mnodeShow.h"
|
||||
#include "mnodeStable.h"
|
||||
#include "mnodeSync.h"
|
||||
#include "mnodeUser.h"
|
||||
#include "mnodeVgroup.h"
|
||||
#include "mnodeWorker.h"
|
||||
#include "mnodeTelem.h"
|
||||
|
||||
static struct {
|
||||
int32_t state;
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
tmr_h timer;
|
||||
SSteps *steps1;
|
||||
SSteps *steps2;
|
||||
SMnodePara para;
|
||||
} tsMint;
|
||||
|
||||
tmr_h mnodeGetTimer() { return tsMint.timer; }
|
||||
|
||||
int32_t mnodeGetDnodeId() { return tsMint.para.dnodeId; }
|
||||
|
||||
int64_t mnodeGetClusterId() { return tsMint.para.clusterId; }
|
||||
|
||||
EMnStatus mnodeGetStatus() { return tsMint.state; }
|
||||
|
||||
void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg) {
|
||||
(*tsMint.para.SendMsgToDnode)(epSet, rpcMsg);
|
||||
}
|
||||
|
||||
void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsMint.para.SendMsgToMnode)(rpcMsg); }
|
||||
|
||||
void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell) { (*tsMint.para.SendRedirectMsg)(rpcMsg, forShell); }
|
||||
|
||||
int32_t mnodeGetLoad(SMnodeLoad *pLoad) { return 0; }
|
||||
|
||||
static int32_t mnodeSetPara(SMnodePara para) {
|
||||
tsMint.para = para;
|
||||
|
||||
if (tsMint.para.SendMsgToDnode == NULL) return -1;
|
||||
if (tsMint.para.SendMsgToMnode == NULL) return -1;
|
||||
if (tsMint.para.SendRedirectMsg == NULL) return -1;
|
||||
if (tsMint.para.PutMsgIntoApplyQueue == NULL) return -1;
|
||||
if (tsMint.para.dnodeId < 0) return -1;
|
||||
if (tsMint.para.clusterId < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mnodeInitTimer() {
|
||||
if (tsMint.timer == NULL) {
|
||||
tsMint.timer = taosTmrInit(tsMaxShellConns, 200, 3600000, "MND");
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void mnodeCleanupTimer() {
|
||||
if (tsMint.timer != NULL) {
|
||||
taosTmrCleanUp(tsMint.timer);
|
||||
tsMint.timer = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mnodeInitStep1() {
|
||||
struct SSteps *steps = taosStepInit(16, NULL);
|
||||
if (steps == NULL) return -1;
|
||||
|
||||
taosStepAdd(steps, "mnode-sdb", sdbInit, sdbCleanup);
|
||||
taosStepAdd(steps, "mnode-cluster", mnodeInitCluster, mnodeCleanupCluster);
|
||||
taosStepAdd(steps, "mnode-dnode", mnodeInitDnode, mnodeCleanupDnode);
|
||||
taosStepAdd(steps, "mnode-mnode", mnodeInitMnode, mnodeCleanupMnode);
|
||||
taosStepAdd(steps, "mnode-acct", mnodeInitAcct, mnodeCleanupAcct);
|
||||
taosStepAdd(steps, "mnode-auth", mnodeInitAuth, mnodeCleanupAuth);
|
||||
taosStepAdd(steps, "mnode-user", mnodeInitUser, mnodeCleanupUser);
|
||||
taosStepAdd(steps, "mnode-db", mnodeInitDb, mnodeCleanupDb);
|
||||
taosStepAdd(steps, "mnode-vgroup", mnodeInitVgroup, mnodeCleanupVgroup);
|
||||
taosStepAdd(steps, "mnode-stable", mnodeInitStable, mnodeCleanupStable);
|
||||
taosStepAdd(steps, "mnode-func", mnodeInitFunc, mnodeCleanupFunc);
|
||||
taosStepAdd(steps, "mnode-oper", mnodeInitOper, mnodeCleanupOper);
|
||||
|
||||
tsMint.steps1 = steps;
|
||||
return taosStepExec(tsMint.steps1);
|
||||
}
|
||||
|
||||
static int32_t mnodeInitStep2() {
|
||||
struct SSteps *steps = taosStepInit(12, NULL);
|
||||
if (steps == NULL) return -1;
|
||||
|
||||
taosStepAdd(steps, "mnode-timer", mnodeInitTimer, NULL);
|
||||
taosStepAdd(steps, "mnode-worker", mnodeInitWorker, NULL);
|
||||
taosStepAdd(steps, "mnode-balance", mnodeInitBalance, mnodeCleanupBalance);
|
||||
taosStepAdd(steps, "mnode-profile", mnodeInitProfile, mnodeCleanupProfile);
|
||||
taosStepAdd(steps, "mnode-show", mnodeInitShow, mnodeCleanUpShow);
|
||||
taosStepAdd(steps, "mnode-sync", mnodeInitSync, mnodeCleanUpSync);
|
||||
taosStepAdd(steps, "mnode-worker", NULL, mnodeCleanupWorker);
|
||||
taosStepAdd(steps, "mnode-telem", mnodeInitTelem, mnodeCleanupTelem);
|
||||
taosStepAdd(steps, "mnode-timer", NULL, mnodeCleanupTimer);
|
||||
|
||||
tsMint.steps2 = steps;
|
||||
return taosStepExec(tsMint.steps2);
|
||||
}
|
||||
|
||||
static void mnodeCleanupStep1() { taosStepCleanup(tsMint.steps1); }
|
||||
|
||||
static void mnodeCleanupStep2() { taosStepCleanup(tsMint.steps2); }
|
||||
|
||||
static bool mnodeNeedDeploy() {
|
||||
if (tsMint.para.dnodeId > 0) return false;
|
||||
if (tsMint.para.clusterId > 0) return false;
|
||||
if (strcmp(tsFirst, tsLocalEp) != 0) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) {
|
||||
if (tsMint.state != MN_STATUS_UNINIT) {
|
||||
mError("failed to deploy mnode since its deployed");
|
||||
return 0;
|
||||
} else {
|
||||
tsMint.state = MN_STATUS_INIT;
|
||||
}
|
||||
|
||||
if (tsMint.para.dnodeId <= 0 || tsMint.para.clusterId <= 0) {
|
||||
mError("failed to deploy mnode since cluster not ready");
|
||||
return TSDB_CODE_MND_NOT_READY;
|
||||
}
|
||||
|
||||
mInfo("starting to deploy mnode");
|
||||
|
||||
int32_t code = mnodeInitStep1();
|
||||
if (code != 0) {
|
||||
mError("failed to deploy mnode since init step1 error");
|
||||
tsMint.state = MN_STATUS_UNINIT;
|
||||
return TSDB_CODE_MND_APP_ERROR;
|
||||
}
|
||||
|
||||
code = mnodeInitStep2();
|
||||
if (code != 0) {
|
||||
mnodeCleanupStep1();
|
||||
mError("failed to deploy mnode since init step2 error");
|
||||
tsMint.state = MN_STATUS_UNINIT;
|
||||
return TSDB_CODE_MND_APP_ERROR;
|
||||
}
|
||||
|
||||
mDebug("mnode is deployed and waiting for raft to confirm");
|
||||
tsMint.state = MN_STATUS_READY;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mnodeUnDeploy(char *path) {
|
||||
sdbUnDeploy();
|
||||
mnodeCleanup();
|
||||
}
|
||||
|
||||
int32_t mnodeInit(SMnodePara para) {
|
||||
mDebugFlag = 207;
|
||||
if (tsMint.state != MN_STATUS_UNINIT) {
|
||||
return 0;
|
||||
} else {
|
||||
tsMint.state = MN_STATUS_INIT;
|
||||
}
|
||||
|
||||
mInfo("starting to initialize mnode ...");
|
||||
|
||||
int32_t code = mnodeSetPara(para);
|
||||
if (code != 0) {
|
||||
tsMint.state = MN_STATUS_UNINIT;
|
||||
return code;
|
||||
}
|
||||
|
||||
code = mnodeInitStep1();
|
||||
if (code != 0) {
|
||||
tsMint.state = MN_STATUS_UNINIT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
code = sdbRead();
|
||||
if (code != 0) {
|
||||
if (mnodeNeedDeploy()) {
|
||||
code = sdbDeploy();
|
||||
if (code != 0) {
|
||||
mnodeCleanupStep1();
|
||||
tsMint.state = MN_STATUS_UNINIT;
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
mnodeCleanupStep1();
|
||||
tsMint.state = MN_STATUS_UNINIT;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
code = mnodeInitStep2();
|
||||
if (code != 0) {
|
||||
mnodeCleanupStep1();
|
||||
tsMint.state = MN_STATUS_UNINIT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
tsMint.state = MN_STATUS_READY;
|
||||
mInfo("mnode is initialized successfully");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mnodeCleanup() {
|
||||
if (tsMint.state != MN_STATUS_UNINIT && tsMint.state != MN_STATUS_CLOSING) {
|
||||
mInfo("starting to clean up mnode");
|
||||
tsMint.state = MN_STATUS_CLOSING;
|
||||
|
||||
mnodeCleanupStep2();
|
||||
mnodeCleanupStep1();
|
||||
|
||||
tsMint.state = MN_STATUS_UNINIT;
|
||||
mInfo("mnode is cleaned up");
|
||||
}
|
||||
}
|
||||
|
||||
int32_t mnodeStart(char *path, SMnodeCfg *pCfg) { return 0; }
|
||||
int32_t mnodeAlter(SMnodeCfg *pCfg) { return 0; }
|
||||
void mnodeStop() {}
|
|
@ -87,7 +87,7 @@ static int32_t sdbReadDataFile() {
|
|||
|
||||
SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
|
||||
if (pRaw == NULL) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
char file[PATH_MAX] = {0};
|
||||
|
@ -241,7 +241,7 @@ int32_t sdbInit() {
|
|||
tsSdb.tmpDir = strdup(path);
|
||||
|
||||
if (tsSdb.currDir == NULL || tsSdb.currDir == NULL || tsSdb.currDir == NULL) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < SDB_MAX; ++i) {
|
||||
|
@ -256,7 +256,7 @@ int32_t sdbInit() {
|
|||
|
||||
SHashObj *hash = taosHashInit(128, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK);
|
||||
if (hash == NULL) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
tsSdb.hashObjs[i] = hash;
|
||||
|
|
|
@ -41,7 +41,7 @@ SSdbRaw *trnActionEncode(STrans *pTrans) {
|
|||
|
||||
SSdbRaw *pRaw = calloc(1, rawDataLen + sizeof(SSdbRaw));
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -67,7 +67,7 @@ STrans *trnActionDecode(SSdbRaw *pRaw) {
|
|||
|
||||
STrans *pTrans = NULL;
|
||||
if (pTrans == NULL) {
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -92,14 +92,14 @@ STrans *trnActionDecode(SSdbRaw *pRaw) {
|
|||
if (code == 0 && pTmp->dataLen > 0) {
|
||||
SSdbRaw *pRead = malloc(sizeof(SSdbRaw) + pTmp->dataLen);
|
||||
if (pRead == NULL) {
|
||||
code = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
break;
|
||||
}
|
||||
memcpy(pRead, pTmp, sizeof(SSdbRaw));
|
||||
SDB_GET_BINARY_VAL(pData, dataLen, pRead->data, pRead->dataLen, code);
|
||||
void *ret = taosArrayPush(pTrans->redoLogs, &pRead);
|
||||
if (ret == NULL) {
|
||||
code = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -151,7 +151,7 @@ int32_t trnGenerateTransId() { return 1; }
|
|||
STrans *trnCreate(ETrnPolicy policy) {
|
||||
STrans *pTrans = calloc(1, sizeof(STrans));
|
||||
if (pTrans == NULL) {
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -166,7 +166,7 @@ STrans *trnCreate(ETrnPolicy policy) {
|
|||
|
||||
if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
|
||||
pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -195,13 +195,13 @@ void trnSetRpcHandle(STrans *pTrans, void *rpcHandle) { pTrans->rpcHandle = rpcH
|
|||
|
||||
static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) {
|
||||
if (pArray == NULL || pRaw == NULL) {
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
void *ptr = taosArrayPush(pArray, &pRaw);
|
||||
if (ptr == NULL) {
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -70,11 +70,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQD
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
|
||||
|
||||
//common & util
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, "Operation not supported")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_COM_MEMORY_CORRUPTED, "Memory corrupted")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_COM_OUT_OF_MEMORY, "Out of memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_COM_INVALID_CFG_MSG, "Invalid config message")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_COM_FILE_CORRUPTED, "Data file corrupted")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "Out of Memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RANGE, "Out of range")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PTR, "Invalid pointer")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, "Memory corrupted")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, "Data file corrupted")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, "Invalid config message")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed")
|
||||
|
@ -82,6 +84,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, "Invalid Ref ID")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, "Ref is already there")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there")
|
||||
|
||||
|
||||
//client
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_QHANDLE, "Invalid qhandle")
|
||||
|
@ -131,7 +134,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_VERSION, "Incompatible protocol
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_LEN, "Invalid message length")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_TYPE, "Invalid message type")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_SHELL_CONNS, "Too many connections")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_OUT_OF_MEMORY, "Out of memory in mnode")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SHOWOBJ, "Data expired")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_QUERY_ID, "Invalid query id")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_ID, "Invalid stream id")
|
||||
|
|
|
@ -12,7 +12,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa
|
|||
|
||||
SDiskbasedResultBuf* pResBuf = *pResultBuf;
|
||||
if (pResBuf == NULL) {
|
||||
return TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pResBuf->pageSize = pagesize;
|
||||
|
|
|
@ -55,7 +55,7 @@ typedef struct STaosQall {
|
|||
taos_queue taosOpenQueue() {
|
||||
STaosQueue *queue = (STaosQueue *)calloc(sizeof(STaosQueue), 1);
|
||||
if (queue == NULL) {
|
||||
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -244,7 +244,7 @@ void taosResetQitems(taos_qall param) {
|
|||
taos_qset taosOpenQset() {
|
||||
STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1);
|
||||
if (qset == NULL) {
|
||||
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "ulog.h"
|
||||
#include "taoserror.h"
|
||||
#include "tstep.h"
|
||||
|
||||
typedef struct {
|
||||
|
@ -33,7 +34,10 @@ typedef struct SSteps {
|
|||
|
||||
SSteps *taosStepInit(int32_t maxsize, ReportFp fp) {
|
||||
SSteps *steps = calloc(1, sizeof(SSteps));
|
||||
if (steps == NULL) return NULL;
|
||||
if (steps == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
steps->maxsize = maxsize;
|
||||
steps->cursize = 0;
|
||||
|
@ -44,9 +48,14 @@ SSteps *taosStepInit(int32_t maxsize, ReportFp fp) {
|
|||
}
|
||||
|
||||
int32_t taosStepAdd(struct SSteps *steps, char *name, InitFp initFp, CleanupFp cleanupFp) {
|
||||
if (steps == NULL) return -1;
|
||||
if (steps == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (steps->cursize >= steps->maxsize) {
|
||||
uError("failed to add step since up to the maxsize");
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -66,7 +75,10 @@ static void taosStepCleanupImp(SSteps *steps, int32_t pos) {
|
|||
}
|
||||
|
||||
int32_t taosStepExec(SSteps *steps) {
|
||||
if (steps == NULL) return -1;
|
||||
if (steps == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t s = 0; s < steps->cursize; s++) {
|
||||
SStep *step = steps->steps + s;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "tsched.h"
|
||||
#include "ttimer.h"
|
||||
#include "tutil.h"
|
||||
#include "taoserror.h"
|
||||
|
||||
extern int8_t tscEmbedded;
|
||||
|
||||
|
@ -547,6 +548,7 @@ void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* lab
|
|||
|
||||
if (ctrl == NULL) {
|
||||
tmrError("%s too many timer controllers, failed to create timer controller.", label);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue