diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 09dd4a3f2d..20de27e59d 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -64,8 +64,7 @@ void mnodeStop(); int32_t mnodeGetLoad(SMnodeLoad *pLoad); int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); -SMnodeMsg *mnodeInitMsg(int32_t msgNum); -int32_t mnodeAppendMsg(SMnodeMsg *pMsg, SRpcMsg *pRpcMsg); +SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg); void mnodeCleanupMsg(SMnodeMsg *pMsg); void mnodeProcessMsg(SMnodeMsg *pMsg, EMnMsgType msgType); diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 344dc6922d..eef79fbc09 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -77,18 +77,17 @@ extern "C" { typedef enum { SDB_START = 0, SDB_TRANS = 1, - SDB_VERSION = 2, - SDB_CLUSTER = 3, - SDB_DNODE = 4, - SDB_MNODE = 5, - SDB_ACCT = 6, - SDB_AUTH = 7, - SDB_USER = 8, - SDB_DB = 9, - SDB_VGROUP = 10, - SDB_STABLE = 11, - SDB_FUNC = 12, - SDB_MAX = 13 + SDB_CLUSTER = 2, + SDB_DNODE = 3, + SDB_MNODE = 4, + SDB_ACCT = 5, + SDB_AUTH = 6, + SDB_USER = 7, + SDB_DB = 8, + SDB_VGROUP = 9, + SDB_STABLE = 10, + SDB_FUNC = 11, + SDB_MAX = 12 } ESdbType; typedef enum { SDB_ACTION_INSERT = 1, SDB_ACTION_UPDATE = 2, SDB_ACTION_DELETE = 3 } ESdbAction; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 8c4a726bf6..9669ccb0bf 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -60,17 +60,19 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016) //"Invalid app version") //common & util -#define TSDB_CODE_COM_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //"Operation not supported") -#define TSDB_CODE_COM_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0101) //"Memory corrupted") -#define TSDB_CODE_COM_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0102) //"Out of memory") -#define TSDB_CODE_COM_INVALID_CFG_MSG TAOS_DEF_ERROR_CODE(0, 0x0103) //"Invalid config message") -#define TSDB_CODE_COM_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0104) //"Data file corrupted") -#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0105) //"Ref out of memory") -#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0106) //"too many Ref Objs") -#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0107) //"Ref ID is removed") -#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x0108) //"Invalid Ref ID") -#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0109) //"Ref is already there") -#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010A) //"Ref is not there") +#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) +#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0101) +#define TSDB_CODE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0102) +#define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0103) +#define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0104) +#define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0106) +#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0107) +#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0108) +#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0109) +#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x010A) +#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x010B) +#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x010C) +#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010D) //client #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation") @@ -121,7 +123,6 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0307) //"Invalid message length") #define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0308) //"Invalid message type") #define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x0309) //"Too many connections") -#define TSDB_CODE_MND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x030A) //"Out of memory in mnode") #define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x030B) //"Data expired") #define TSDB_CODE_MND_INVALID_QUERY_ID TAOS_DEF_ERROR_CODE(0, 0x030C) //"Invalid query id") #define TSDB_CODE_MND_INVALID_STREAM_ID TAOS_DEF_ERROR_CODE(0, 0x030D) //"Invalid stream id") @@ -132,6 +133,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir") #define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) //"failed to init components") + #define TSDB_CODE_SDB_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320) #define TSDB_CODE_SDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0321) #define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0322) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 4f30327c06..a19f6ec04b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -40,7 +40,7 @@ uint16_t tsArbitratorPort = 6042; int32_t tsStatusInterval = 1; // second int32_t tsNumOfMnodes = 1; int8_t tsEnableVnodeBak = 1; -int8_t tsEnableTelemetryReporting = 1; +int8_t tsEnableTelemetryReporting = 0; int8_t tsArbOnline = 0; int64_t tsArbOnlineTimestamp = TSDB_ARB_DUMMY_TIME; char tsEmail[TSDB_FQDN_LEN] = {0}; diff --git a/source/dnode/mgmt/src/dnodeDnode.c b/source/dnode/mgmt/src/dnodeDnode.c index 63de2b940d..7843075b30 100644 --- a/source/dnode/mgmt/src/dnodeDnode.c +++ b/source/dnode/mgmt/src/dnodeDnode.c @@ -302,6 +302,13 @@ PRASE_DNODE_OVER: return -1; } + if (tsDnode.dnodeEps == NULL) { + tsDnode.dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); + tsDnode.dnodeEps->dnodeNum = 1; + tsDnode.dnodeEps->dnodeEps[0].dnodePort = tsServerPort; + tstrncpy(tsDnode.dnodeEps->dnodeEps[0].dnodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN); + } + dnodeResetDnodes(tsDnode.dnodeEps); terrno = 0; diff --git a/source/dnode/mgmt/src/dnodeInt.c b/source/dnode/mgmt/src/dnodeInt.c index e7018f4265..2674e107fd 100644 --- a/source/dnode/mgmt/src/dnodeInt.c +++ b/source/dnode/mgmt/src/dnodeInt.c @@ -135,7 +135,7 @@ static int32_t dnodeInitMain() { dnodeInitDir(); - return -1; + return 0; } static void dnodeCleanupMain() { @@ -145,14 +145,14 @@ static void dnodeCleanupMain() { } int32_t dnodeInit() { - SSteps *steps = taosStepInit(24, dnodeReportStartup); + SSteps *steps = taosStepInit(10, dnodeReportStartup); if (steps == NULL) return -1; taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain); taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup); taosStepAdd(steps, "dnode-tfs", NULL, NULL); taosStepAdd(steps, "dnode-wal", walInit, walCleanUp); - taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp); + //taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp); taosStepAdd(steps, "dnode-dnode", dnodeInitDnode, dnodeCleanupDnode); taosStepAdd(steps, "dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes); taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, dnodeCleanupMnode); diff --git a/source/dnode/mgmt/src/dnodeMnode.c b/source/dnode/mgmt/src/dnodeMnode.c index e5a758899e..f6726bf981 100644 --- a/source/dnode/mgmt/src/dnodeMnode.c +++ b/source/dnode/mgmt/src/dnodeMnode.c @@ -335,12 +335,9 @@ static int32_t dnodeWriteMnodeMsgToQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { if (pQueue == NULL) { code = TSDB_CODE_DND_MSG_NOT_PROCESSED; } else { - SMnodeMsg *pMsg = mnodeInitMsg(1); + SMnodeMsg *pMsg = mnodeInitMsg(pRpcMsg); if (pMsg == NULL) { - code = TSDB_CODE_DND_OUT_OF_MEMORY; - } else { - mnodeAppendMsg(pMsg, pRpcMsg); - code = taosWriteQitem(pQueue, pMsg); + code = terrno; } } diff --git a/source/dnode/mnode/impl/inc/mnodeDef.h b/source/dnode/mnode/impl/inc/mnodeDef.h index 49c74283ae..b6449ecfe7 100644 --- a/source/dnode/mnode/impl/inc/mnodeDef.h +++ b/source/dnode/mnode/impl/inc/mnodeDef.h @@ -57,7 +57,6 @@ typedef struct SVgObj SVgObj; typedef struct SSTableObj SSTableObj; typedef struct SFuncObj SFuncObj; typedef struct SOperObj SOperObj; -typedef struct SMnMsg SMnMsg; typedef enum { MN_AUTH_ACCT_START = 0, @@ -265,9 +264,9 @@ typedef struct { void *rsp; } SMnRsp; -typedef struct SMnMsg { - void (*fp)(SMnMsg *pMsg, int32_t code); - char user[TSDB_USER_LEN]; +typedef struct SMnodeMsg { + void (*fp)(SMnodeMsg *pMsg, int32_t code); + SRpcConnInfo conn; SUserObj *pUser; int16_t received; int16_t successed; @@ -278,7 +277,7 @@ typedef struct SMnMsg { SMnRsp rpcRsp; SRpcMsg rpcMsg; char pCont[]; -} SMnReq; +} SMnodeMsg; #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mnodeInt.h b/source/dnode/mnode/impl/inc/mnodeInt.h index 11f8f461ba..95f0acf52f 100644 --- a/source/dnode/mnode/impl/inc/mnodeInt.h +++ b/source/dnode/mnode/impl/inc/mnodeInt.h @@ -24,17 +24,18 @@ extern "C" { #endif -typedef enum { MN_STATUS_UNINIT = 0, MN_STATUS_INIT = 1, MN_STATUS_READY = 2, MN_STATUS_CLOSING = 3 } EMnStatus; +typedef void (*MnodeRpcFp[TSDB_MSG_TYPE_MAX])(SMnodeMsg *pMsg); -tmr_h mnodeGetTimer(); -int32_t mnodeGetDnodeId(); -int64_t mnodeGetClusterId(); -EMnStatus mnodeGetStatus(); +tmr_h mnodeGetTimer(); +int32_t mnodeGetDnodeId(); +int64_t mnodeGetClusterId(); void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg); void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg); void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell); +void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mnodeWorker.h b/source/dnode/mnode/impl/inc/mnodeWorker.h deleted file mode 100644 index 8477af6b72..0000000000 --- a/source/dnode/mnode/impl/inc/mnodeWorker.h +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_MNODE_WORKER_H_ -#define _TD_MNODE_WORKER_H_ - -#include "mnodeInt.h" - -#ifdef __cplusplus -extern "C" { -#endif - -int32_t mnodeInitWorker(); -void mnodeCleanupWorker(); -void mnodeSendRsp(SMnMsg *pMsg, int32_t code); -void mnodeReDispatchToWriteQueue(SMnMsg *pMsg); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_MNODE_WORKER_H_*/ diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c new file mode 100644 index 0000000000..93cd85a888 --- /dev/null +++ b/source/dnode/mnode/impl/src/mnode.c @@ -0,0 +1,392 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "tglobal.h" +#include "tstep.h" +#include "tqueue.h" +#include "mnodeAcct.h" +#include "mnodeAuth.h" +#include "mnodeBalance.h" +#include "mnodeCluster.h" +#include "mnodeDb.h" +#include "mnodeDnode.h" +#include "mnodeFunc.h" +#include "mnodeMnode.h" +#include "mnodeOper.h" +#include "mnodeProfile.h" +#include "mnodeShow.h" +#include "mnodeStable.h" +#include "mnodeSync.h" +#include "mnodeTelem.h" +#include "mnodeUser.h" +#include "mnodeVgroup.h" + +static struct { + int32_t dnodeId; + int64_t clusterId; + tmr_h timer; + SSteps *pInitSteps; + SSteps *pStartSteps; + SMnodePara para; + MnodeRpcFp msgFp; +} tsMint; + +int32_t mnodeGetDnodeId() { return tsMint.para.dnodeId; } + +int64_t mnodeGetClusterId() { return tsMint.para.clusterId; } + +void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg) { (*tsMint.para.SendMsgToDnode)(epSet, rpcMsg); } + +void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsMint.para.SendMsgToMnode)(rpcMsg); } + +void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell) { (*tsMint.para.SendRedirectMsg)(rpcMsg, forShell); } + +static int32_t mnodeInitTimer() { + if (tsMint.timer == NULL) { + tsMint.timer = taosTmrInit(tsMaxShellConns, 200, 3600000, "MND"); + } + + if (tsMint.timer == NULL) { + return -1; + } + + return 0; +} + +static void mnodeCleanupTimer() { + if (tsMint.timer != NULL) { + taosTmrCleanUp(tsMint.timer); + tsMint.timer = NULL; + } +} + +tmr_h mnodeGetTimer() { return tsMint.timer; } + +static int32_t mnodeSetPara(SMnodePara para) { + tsMint.para = para; + + if (tsMint.para.SendMsgToDnode == NULL) { + terrno = TSDB_CODE_MND_APP_ERROR; + return -1; + } + + if (tsMint.para.SendMsgToMnode == NULL) { + terrno = TSDB_CODE_MND_APP_ERROR; + return -1; + } + + if (tsMint.para.SendRedirectMsg == NULL) { + terrno = TSDB_CODE_MND_APP_ERROR; + return -1; + } + + if (tsMint.para.PutMsgIntoApplyQueue == NULL) { + terrno = TSDB_CODE_MND_APP_ERROR; + return -1; + } + + if (tsMint.para.dnodeId < 0) { + terrno = TSDB_CODE_MND_APP_ERROR; + return -1; + } + + if (tsMint.para.clusterId < 0) { + terrno = TSDB_CODE_MND_APP_ERROR; + return -1; + } + + return 0; +} + +static int32_t mnodeAllocInitSteps() { + struct SSteps *steps = taosStepInit(16, NULL); + if (steps == NULL) return -1; + + if (taosStepAdd(steps, "mnode-trans", trnInit, trnCleanup) != 0) return -1; + if (taosStepAdd(steps, "mnode-cluster", mnodeInitCluster, mnodeCleanupCluster) != 0) return -1; + if (taosStepAdd(steps, "mnode-dnode", mnodeInitDnode, mnodeCleanupDnode) != 0) return -1; + if (taosStepAdd(steps, "mnode-mnode", mnodeInitMnode, mnodeCleanupMnode) != 0) return -1; + if (taosStepAdd(steps, "mnode-acct", mnodeInitAcct, mnodeCleanupAcct) != 0) return -1; + if (taosStepAdd(steps, "mnode-auth", mnodeInitAuth, mnodeCleanupAuth) != 0) return -1; + if (taosStepAdd(steps, "mnode-user", mnodeInitUser, mnodeCleanupUser) != 0) return -1; + if (taosStepAdd(steps, "mnode-db", mnodeInitDb, mnodeCleanupDb) != 0) return -1; + if (taosStepAdd(steps, "mnode-vgroup", mnodeInitVgroup, mnodeCleanupVgroup) != 0) return -1; + if (taosStepAdd(steps, "mnode-stable", mnodeInitStable, mnodeCleanupStable) != 0) return -1; + if (taosStepAdd(steps, "mnode-func", mnodeInitFunc, mnodeCleanupFunc) != 0) return -1; + if (taosStepAdd(steps, "mnode-sdb", sdbInit, sdbCleanup) != 0) return -1; + + tsMint.pInitSteps = steps; + return 0; +} + +static int32_t mnodeAllocStartSteps() { + struct SSteps *steps = taosStepInit(7, NULL); + if (steps == NULL) return -1; + + taosStepAdd(steps, "mnode-timer", mnodeInitTimer, NULL); + taosStepAdd(steps, "mnode-balance", mnodeInitBalance, mnodeCleanupBalance); + taosStepAdd(steps, "mnode-profile", mnodeInitProfile, mnodeCleanupProfile); + taosStepAdd(steps, "mnode-show", mnodeInitShow, mnodeCleanUpShow); + taosStepAdd(steps, "mnode-sync", mnodeInitSync, mnodeCleanUpSync); + taosStepAdd(steps, "mnode-telem", mnodeInitTelem, mnodeCleanupTelem); + taosStepAdd(steps, "mnode-timer", NULL, mnodeCleanupTimer); + + tsMint.pStartSteps = steps; + return 0; +} + +int32_t mnodeInit(SMnodePara para) { + if (mnodeSetPara(para) != 0) { + mError("failed to init mnode para since %s", terrstr()); + return -1; + } + + if (mnodeAllocInitSteps() != 0) { + mError("failed to alloc init steps since %s", terrstr()); + return -1; + } + + if (mnodeAllocStartSteps() != 0) { + mError("failed to alloc start steps since %s", terrstr()); + return -1; + } + + return taosStepExec(tsMint.pInitSteps); +} + +void mnodeCleanup() { taosStepCleanup(tsMint.pInitSteps); } + +int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) { + if (tsMint.para.dnodeId <= 0 && tsMint.para.clusterId <= 0) { + if (sdbDeploy() != 0) { + mError("failed to deploy sdb since %s", terrstr()); + return -1; + } + } + + mDebug("mnode is deployed"); + return 0; +} + +void mnodeUnDeploy(char *path) { sdbUnDeploy(); } + +int32_t mnodeStart(char *path, SMnodeCfg *pCfg) { return taosStepExec(tsMint.pStartSteps); } + +int32_t mnodeAlter(SMnodeCfg *pCfg) { return 0; } + +void mnodeStop() { taosStepCleanup(tsMint.pStartSteps); } + +int32_t mnodeGetLoad(SMnodeLoad *pLoad) { return 0; } + +SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) { + SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg)); + if (pMsg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) { + mnodeCleanupMsg(pMsg); + mError("can not get user from conn:%p", pMsg->rpcMsg.handle); + terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; + return NULL; + } + + pMsg->rpcMsg = *pRpcMsg; + pMsg->createdTime = taosGetTimestampSec(); + + return pMsg; +} + +void mnodeCleanupMsg(SMnodeMsg *pMsg) { + if (pMsg->pUser != NULL) { + sdbRelease(pMsg->pUser); + } + + taosFreeQitem(pMsg); +} + +static void mnodeProcessRpcMsg(SMnodeMsg *pMsg) { + int32_t msgType = pMsg->rpcMsg.msgType; + + if (tsMint.msgFp[msgType] == NULL) { + } + + (*tsMint.msgFp[msgType])(pMsg); +} + +void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) { + if (msgType > 0 || msgType < TSDB_MSG_TYPE_MAX) { + tsMint.msgFp[msgType] = fp; + } +} + +void mnodeProcessMsg(SMnodeMsg *pMsg, EMnMsgType msgType) { + if (!mnodeIsMaster()) { + mnodeSendRedirectMsg(&pMsg->rpcMsg, true); + mnodeCleanupMsg(pMsg); + return; + } + + switch (msgType) { + case MN_MSG_TYPE_READ: + case MN_MSG_TYPE_WRITE: + case MN_MSG_TYPE_SYNC: + mnodeProcessRpcMsg(pMsg); + break; + case MN_MSG_TYPE_APPLY: + break; + default: + break; + } +} + +#if 0 + +static void mnodeProcessWriteReq(SMnodeMsg *pMsg, void *unused) { + int32_t msgType = pMsg->rpcMsg.msgType; + void *ahandle = pMsg->rpcMsg.ahandle; + int32_t code = 0; + + if (pMsg->rpcMsg.pCont == NULL) { + mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]); + code = TSDB_CODE_MND_INVALID_MSG_LEN; + goto PROCESS_WRITE_REQ_END; + } + + if (!mnodeIsMaster()) { + SMnRsp *rpcRsp = &pMsg->rpcRsp; + SEpSet *epSet = rpcMallocCont(sizeof(SEpSet)); + mnodeGetMnodeEpSetForShell(epSet, true); + rpcRsp->rsp = epSet; + rpcRsp->len = sizeof(SEpSet); + + mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, + taosMsg[msgType], epSet->numOfEps, epSet->inUse); + + code = TSDB_CODE_RPC_REDIRECT; + goto PROCESS_WRITE_REQ_END; + } + + if (tsMworker.writeMsgFp[msgType] == NULL) { + mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]); + code = TSDB_CODE_MND_MSG_NOT_PROCESSED; + goto PROCESS_WRITE_REQ_END; + } + + code = (*tsMworker.writeMsgFp[msgType])(pMsg); + +PROCESS_WRITE_REQ_END: + mnodeSendRsp(pMsg, code); +} + +static void mnodeProcessReadReq(SMnodeMsg *pMsg, void *unused) { + int32_t msgType = pMsg->rpcMsg.msgType; + void *ahandle = pMsg->rpcMsg.ahandle; + int32_t code = 0; + + if (pMsg->rpcMsg.pCont == NULL) { + mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]); + code = TSDB_CODE_MND_INVALID_MSG_LEN; + goto PROCESS_READ_REQ_END; + } + + if (!mnodeIsMaster()) { + SMnRsp *rpcRsp = &pMsg->rpcRsp; + SEpSet *epSet = rpcMallocCont(sizeof(SEpSet)); + if (!epSet) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto PROCESS_READ_REQ_END; + } + mnodeGetMnodeEpSetForShell(epSet, true); + rpcRsp->rsp = epSet; + rpcRsp->len = sizeof(SEpSet); + + mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType], + epSet->numOfEps, epSet->inUse); + code = TSDB_CODE_RPC_REDIRECT; + goto PROCESS_READ_REQ_END; + } + + if (tsMworker.readMsgFp[msgType] == NULL) { + mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]); + code = TSDB_CODE_MND_MSG_NOT_PROCESSED; + goto PROCESS_READ_REQ_END; + } + + mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]); + code = (*tsMworker.readMsgFp[msgType])(pMsg); + +PROCESS_READ_REQ_END: + mnodeSendRsp(pMsg, code); +} + +static void mnodeProcessPeerReq(SMnodeMsg *pMsg, void *unused) { + int32_t msgType = pMsg->rpcMsg.msgType; + void *ahandle = pMsg->rpcMsg.ahandle; + int32_t code = 0; + + if (pMsg->rpcMsg.pCont == NULL) { + mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]); + code = TSDB_CODE_MND_INVALID_MSG_LEN; + goto PROCESS_PEER_REQ_END; + } + + if (!mnodeIsMaster()) { + SMnRsp *rpcRsp = &pMsg->rpcRsp; + SEpSet *epSet = rpcMallocCont(sizeof(SEpSet)); + mnodeGetMnodeEpSetForPeer(epSet, true); + rpcRsp->rsp = epSet; + rpcRsp->len = sizeof(SEpSet); + + mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, + taosMsg[msgType], epSet->numOfEps, epSet->inUse); + + code = TSDB_CODE_RPC_REDIRECT; + goto PROCESS_PEER_REQ_END; + } + + if (tsMworker.peerReqFp[msgType] == NULL) { + mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]); + code = TSDB_CODE_MND_MSG_NOT_PROCESSED; + goto PROCESS_PEER_REQ_END; + } + + code = (*tsMworker.peerReqFp[msgType])(pMsg); + +PROCESS_PEER_REQ_END: + mnodeSendRsp(pMsg, code); +} + +static void mnodeProcessPeerRsp(SMnodeMsg *pMsg, void *unused) { + int32_t msgType = pMsg->rpcMsg.msgType; + SRpcMsg *pRpcMsg = &pMsg->rpcMsg; + + if (!mnodeIsMaster()) { + mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]); + mnodeCleanupMsg2(pMsg); + } + + if (tsMworker.peerRspFp[msgType]) { + (*tsMworker.peerRspFp[msgType])(pRpcMsg); + } else { + mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]); + } + + mnodeCleanupMsg2(pMsg); +} +#endif \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mnodeAcct.c b/source/dnode/mnode/impl/src/mnodeAcct.c index e4538fa391..251fcf3edc 100644 --- a/source/dnode/mnode/impl/src/mnodeAcct.c +++ b/source/dnode/mnode/impl/src/mnodeAcct.c @@ -21,7 +21,7 @@ static SSdbRaw *mnodeAcctActionEncode(SAcctObj *pAcct) { SSdbRaw *pRaw = calloc(1, sizeof(SAcctObj) + sizeof(SSdbRaw)); if (pRaw == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -53,7 +53,7 @@ static SAcctObj *mnodeAcctActionDecode(SSdbRaw *pRaw) { SAcctObj *pAcct = calloc(1, sizeof(SAcctObj)); if (pAcct == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } diff --git a/source/dnode/mnode/impl/src/mnodeTelem.c b/source/dnode/mnode/impl/src/mnodeTelem.c index a3977f5b17..ef1ac10eb6 100644 --- a/source/dnode/mnode/impl/src/mnodeTelem.c +++ b/source/dnode/mnode/impl/src/mnodeTelem.c @@ -17,6 +17,7 @@ #include "mnodeTelem.h" #include "tbuffer.h" #include "tglobal.h" +#include "mnodeSync.h" #define TELEMETRY_SERVER "telemetry.taosdata.com" #define TELEMETRY_PORT 80 @@ -255,7 +256,7 @@ static void* mnodeTelemThreadFp(void* param) { if (r == 0) break; if (r != ETIMEDOUT) continue; - if (mnodeGetStatus() == MN_STATUS_READY) { + if (mnodeIsMaster()) { mnodeSendTelemetryReport(); } end.tv_sec += REPORT_INTERVAL; diff --git a/source/dnode/mnode/impl/src/mnodeUser.c b/source/dnode/mnode/impl/src/mnodeUser.c index 3b5d40d807..e7b03b8372 100644 --- a/source/dnode/mnode/impl/src/mnodeUser.c +++ b/source/dnode/mnode/impl/src/mnodeUser.c @@ -24,7 +24,7 @@ static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) { SSdbRaw *pRaw = calloc(1, sizeof(SUserObj) + sizeof(SSdbRaw)); if (pRaw == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -51,7 +51,7 @@ static SUserObj *mnodeUserActionDecode(SSdbRaw *pRaw) { SUserObj *pUser = calloc(1, sizeof(SUserObj)); if (pUser == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -77,7 +77,7 @@ static SUserObj *mnodeUserActionDecode(SSdbRaw *pRaw) { static int32_t mnodeUserActionInsert(SUserObj *pUser) { pUser->prohibitDbHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (pUser->prohibitDbHash == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -145,7 +145,7 @@ static int32_t mnodeCreateDefaultUsers() { return 0; } -static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnMsg *pMsg) { +static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pMsg) { SUserObj userObj = {0}; tstrncpy(userObj.user, user, TSDB_USER_LEN); tstrncpy(userObj.acct, acct, TSDB_USER_LEN); @@ -192,7 +192,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnMsg *pMsg) return 0; } -static int32_t mnodeProcessCreateUserMsg(SMnMsg *pMsg) { +static int32_t mnodeProcessCreateUserMsg(SMnodeMsg *pMsg) { SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont; if (pCreate->user[0] == 0) { @@ -215,7 +215,7 @@ static int32_t mnodeProcessCreateUserMsg(SMnMsg *pMsg) { return -1; } - SUserObj *pOperUser = sdbAcquire(SDB_USER, pMsg->user); + SUserObj *pOperUser = sdbAcquire(SDB_USER, pMsg->conn.user); if (pOperUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; mError("user:%s, failed to create since %s", pCreate->user, terrstr()); diff --git a/source/dnode/mnode/impl/src/mnodeWorker.c b/source/dnode/mnode/impl/src/mnodeWorker.c deleted file mode 100644 index cf2f415b4d..0000000000 --- a/source/dnode/mnode/impl/src/mnodeWorker.c +++ /dev/null @@ -1,494 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "tworker.h" -#include "tglobal.h" -#include "mnodeMnode.h" -#include "mnodeInt.h" -#include "mnodeShow.h" -#include "mnodeSync.h" -#include "mnodeWorker.h" - -static struct { - SWorkerPool read; - SWorkerPool write; - SWorkerPool peerReq; - SWorkerPool peerRsp; - taos_queue readQ; - taos_queue writeQ; - taos_queue peerReqQ; - taos_queue peerRspQ; - int32_t (*writeMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *); - int32_t (*readMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *); - int32_t (*peerReqFp[TSDB_MSG_TYPE_MAX])(SMnMsg *); - void (*peerRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); - void (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); -} tsMworker = {0}; - -static SMnMsg *mnodeInitMsg2(SRpcMsg *pRpcMsg) { - int32_t size = sizeof(SMnMsg) + pRpcMsg->contLen; - SMnMsg *pMsg = taosAllocateQitem(size); - - pMsg->rpcMsg = *pRpcMsg; - pMsg->rpcMsg.pCont = pMsg->pCont; - pMsg->createdTime = taosGetTimestampSec(); - memcpy(pMsg->pCont, pRpcMsg->pCont, pRpcMsg->contLen); - - SRpcConnInfo connInfo = {0}; - if (rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo) == 0) { - pMsg->pUser = sdbAcquire(SDB_USER, connInfo.user); - } - - if (pMsg->pUser == NULL) { - mError("can not get user from conn:%p", pMsg->rpcMsg.handle); - taosFreeQitem(pMsg); - return NULL; - } - - return pMsg; -} - -static void mnodeCleanupMsg2(SMnMsg *pMsg) { - if (pMsg == NULL) return; - if (pMsg->rpcMsg.pCont != pMsg->pCont) { - tfree(pMsg->rpcMsg.pCont); - } - - taosFreeQitem(pMsg); -} - -static void mnodeDispatchToWriteQueue(SRpcMsg *pRpcMsg) { - if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.writeQ == NULL) { - mnodeSendRedirectMsg(pRpcMsg, true); - } else { - SMnMsg *pMsg = mnodeInitMsg2(pRpcMsg); - if (pMsg == NULL) { - SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_USER_NOT_EXIST}; - rpcSendResponse(&rpcRsp); - } else { - mTrace("msg:%p, app:%p type:%s is put into wqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); - taosWriteQitem(tsMworker.writeQ, pMsg); - } - } - - rpcFreeCont(pRpcMsg->pCont); -} - -void mnodeReDispatchToWriteQueue(SMnMsg *pMsg) { - if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.writeQ == NULL) { - mnodeSendRedirectMsg(&pMsg->rpcMsg, true); - mnodeCleanupMsg2(pMsg); - } else { - taosWriteQitem(tsMworker.writeQ, pMsg); - } -} - -static void mnodeDispatchToReadQueue(SRpcMsg *pRpcMsg) { - if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.readQ == NULL) { - mnodeSendRedirectMsg(pRpcMsg, true); - } else { - SMnMsg *pMsg = mnodeInitMsg2(pRpcMsg); - if (pMsg == NULL) { - SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_USER_NOT_EXIST}; - rpcSendResponse(&rpcRsp); - } else { - mTrace("msg:%p, app:%p type:%s is put into rqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); - taosWriteQitem(tsMworker.readQ, pMsg); - } - } - - rpcFreeCont(pRpcMsg->pCont); -} - -static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) { - if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.peerReqQ == NULL) { - mnodeSendRedirectMsg(pRpcMsg, false); - } else { - SMnMsg *pMsg = mnodeInitMsg2(pRpcMsg); - if (pMsg == NULL) { - SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_USER_NOT_EXIST}; - rpcSendResponse(&rpcRsp); - } else { - mTrace("msg:%p, app:%p type:%s is put into peer req queue", pMsg, pMsg->rpcMsg.ahandle, - taosMsg[pMsg->rpcMsg.msgType]); - taosWriteQitem(tsMworker.peerReqQ, pMsg); - } - } - - rpcFreeCont(pRpcMsg->pCont); -} - -void mnodeDispatchToPeerRspQueue(SRpcMsg *pRpcMsg) { - SMnMsg *pMsg = mnodeInitMsg2(pRpcMsg); - if (pMsg == NULL) { - SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_USER_NOT_EXIST}; - rpcSendResponse(&rpcRsp); - } else { - mTrace("msg:%p, app:%p type:%s is put into peer rsp queue", pMsg, pMsg->rpcMsg.ahandle, - taosMsg[pMsg->rpcMsg.msgType]); - taosWriteQitem(tsMworker.peerRspQ, pMsg); - } - - // rpcFreeCont(pRpcMsg->pCont); -} - -void mnodeSendRsp(SMnMsg *pMsg, int32_t code) { - if (pMsg == NULL) return; - if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; - if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) { - mnodeReDispatchToWriteQueue(pMsg); - return; - } - - SRpcMsg rpcRsp = { - .handle = pMsg->rpcMsg.handle, - .pCont = pMsg->rpcRsp.rsp, - .contLen = pMsg->rpcRsp.len, - .code = code, - }; - - rpcSendResponse(&rpcRsp); - mnodeCleanupMsg2(pMsg); -} - -static void mnodeInitMsgFp() { -// // peer req -// tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeDispatchToPeerQueue; -// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessTableCfgMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeDispatchToPeerQueue; -// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessVnodeCfgMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_AUTH] = mnodeDispatchToPeerQueue; -// tsMworker.peerReqFp[TSDB_MSG_TYPE_AUTH] = mnodeProcessAuthMsg; -// // tsMworker.msgFp[TSDB_MSG_TYPE_GRANT] = mnodeDispatchToPeerQueue; -// // tsMworker.peerReqFp[TSDB_MSG_TYPE_GRANT] = grantProcessMsgInMgmt; -// tsMworker.msgFp[TSDB_MSG_TYPE_STATUS] = mnodeDispatchToPeerQueue; -// tsMworker.peerReqFp[TSDB_MSG_TYPE_STATUS] = mnodeProcessDnodeStatusMsg; - -// // peer rsp -// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeDispatchToPeerRspQueue; -// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessCfgDnodeMsgRsp; - -// tsMworker.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeDispatchToPeerRspQueue; -// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessDropSuperTableRsp; -// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeDispatchToPeerRspQueue; -// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessCreateChildTableRsp; -// tsMworker.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeDispatchToPeerRspQueue; -// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessDropChildTableRsp; -// tsMworker.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeDispatchToPeerRspQueue; -// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessAlterTableRsp; - -// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeDispatchToPeerRspQueue; -// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessCreateVnodeRsp; -// tsMworker.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeDispatchToPeerRspQueue; -// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessAlterVnodeRsp; -// tsMworker.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeDispatchToPeerRspQueue; -// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessCompactVnodeRsp; -// tsMworker.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeDispatchToPeerRspQueue; -// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessDropVnodeRsp; - -// // read msg -// tsMworker.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = mnodeDispatchToReadQueue; -// tsMworker.readMsgFp[TSDB_MSG_TYPE_HEARTBEAT] = mnodeProcessHeartBeatMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_CONNECT] = mnodeDispatchToReadQueue; -// tsMworker.readMsgFp[TSDB_MSG_TYPE_CONNECT] = mnodeProcessConnectMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_USE_DB] = mnodeDispatchToReadQueue; -// tsMworker.readMsgFp[TSDB_MSG_TYPE_USE_DB] = mnodeProcessUseMsg; - -// tsMworker.msgFp[TSDB_MSG_TYPE_TABLE_META] = mnodeDispatchToReadQueue; -// tsMworker.readMsgFp[TSDB_MSG_TYPE_TABLE_META] = mnodeProcessTableMetaMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_TABLES_META] = mnodeDispatchToReadQueue; -// tsMworker.readMsgFp[TSDB_MSG_TYPE_TABLES_META] = mnodeProcessMultiTableMetaMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = mnodeDispatchToReadQueue; -// tsMworker.readMsgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = mnodeProcessSuperTableVgroupMsg; - -// tsMworker.msgFp[TSDB_MSG_TYPE_SHOW] = mnodeDispatchToReadQueue; -// tsMworker.readMsgFp[TSDB_MSG_TYPE_SHOW] = mnodeProcessShowMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = mnodeDispatchToReadQueue; -// tsMworker.readMsgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = mnodeProcessRetrieveMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_RETRIEVE_FUNC] = mnodeDispatchToReadQueue; -// tsMworker.readMsgFp[TSDB_MSG_TYPE_RETRIEVE_FUNC] = mnodeProcessRetrieveFuncReq; - -// // tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = mnodeDispatchToWriteQueue; -// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CREATE_ACCT] = acctProcessCreateAcctMsg; -// // tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = mnodeDispatchToWriteQueue; -// // tsMworker.readMsgFp[TSDB_MSG_TYPE_ALTER_ACCT] = acctProcessDropAcctMsg; -// // tsMworker.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = mnodeDispatchToWriteQueue; -// // tsMworker.readMsgFp[TSDB_MSG_TYPE_DROP_ACCT] = acctProcessAlterAcctMsg; - -// // write msg -// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_USER] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_USER] = mnodeProcessCreateUserMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_USER] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_ALTER_USER] = mnodeProcessAlterUserMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_USER] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_USER] = mnodeProcessDropUserMsg; - -// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_DNODE] = mnodeProcessCreateDnodeMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_DNODE] = mnodeProcessDropDnodeMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = mnodeProcessCfgDnodeMsg; - -// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_DB] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_DB] = mnodeProcessCreateDbMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_DB] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_ALTER_DB] = mnodeProcessAlterDbMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_DB] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_DB] = mnodeProcessDropDbMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_SYNC_DB] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_SYNC_DB] = mnodeProcessSyncDbMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = mnodeProcessCompactMsg; - -// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = mnodeProcessCreateFuncMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = mnodeProcessDropFuncMsg; - -// // tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_TP] = mnodeDispatchToWriteQueue; -// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CREATE_TP] = tpProcessCreateTpMsg; -// // tsMworker.msgFp[TSDB_MSG_TYPE_DROP_TP] = mnodeDispatchToWriteQueue; -// // tsMworker.readMsgFp[TSDB_MSG_TYPE_DROP_TP] = tpProcessAlterTpMsg; -// // tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_TP] = mnodeDispatchToWriteQueue; -// // tsMworker.readMsgFp[TSDB_MSG_TYPE_ALTER_TP] = tpProcessDropTpMsg; - -// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_TABLE] = mnodeProcessCreateTableMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_TABLE] = mnodeProcessDropTableMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_ALTER_TABLE] = mnodeProcessAlterTableMsg; - -// tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_STREAM] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = NULL; -// tsMworker.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_KILL_QUERY] = mnodeProcessKillQueryMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_KILL_STREAM] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_KILL_STREAM] = mnodeProcessKillStreamMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_KILL_CONN] = mnodeDispatchToWriteQueue; -// tsMworker.writeMsgFp[TSDB_MSG_TYPE_KILL_CONN] = mnodeProcessKillConnectionMsg; -} - -static void mnodeProcessWriteReq(SMnMsg *pMsg, void *unused) { - int32_t msgType = pMsg->rpcMsg.msgType; - void *ahandle = pMsg->rpcMsg.ahandle; - int32_t code = 0; - - if (pMsg->rpcMsg.pCont == NULL) { - mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]); - code = TSDB_CODE_MND_INVALID_MSG_LEN; - goto PROCESS_WRITE_REQ_END; - } - - if (!mnodeIsMaster()) { - SMnRsp *rpcRsp = &pMsg->rpcRsp; - SEpSet *epSet = rpcMallocCont(sizeof(SEpSet)); - mnodeGetMnodeEpSetForShell(epSet, true); - rpcRsp->rsp = epSet; - rpcRsp->len = sizeof(SEpSet); - - mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, - taosMsg[msgType], epSet->numOfEps, epSet->inUse); - - code = TSDB_CODE_RPC_REDIRECT; - goto PROCESS_WRITE_REQ_END; - } - - if (tsMworker.writeMsgFp[msgType] == NULL) { - mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]); - code = TSDB_CODE_MND_MSG_NOT_PROCESSED; - goto PROCESS_WRITE_REQ_END; - } - - code = (*tsMworker.writeMsgFp[msgType])(pMsg); - -PROCESS_WRITE_REQ_END: - mnodeSendRsp(pMsg, code); -} - -static void mnodeProcessReadReq(SMnMsg *pMsg, void *unused) { - int32_t msgType = pMsg->rpcMsg.msgType; - void *ahandle = pMsg->rpcMsg.ahandle; - int32_t code = 0; - - if (pMsg->rpcMsg.pCont == NULL) { - mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]); - code = TSDB_CODE_MND_INVALID_MSG_LEN; - goto PROCESS_READ_REQ_END; - } - - if (!mnodeIsMaster()) { - SMnRsp *rpcRsp = &pMsg->rpcRsp; - SEpSet *epSet = rpcMallocCont(sizeof(SEpSet)); - if (!epSet) { - code = TSDB_CODE_MND_OUT_OF_MEMORY; - goto PROCESS_READ_REQ_END; - } - mnodeGetMnodeEpSetForShell(epSet, true); - rpcRsp->rsp = epSet; - rpcRsp->len = sizeof(SEpSet); - - mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType], - epSet->numOfEps, epSet->inUse); - code = TSDB_CODE_RPC_REDIRECT; - goto PROCESS_READ_REQ_END; - } - - if (tsMworker.readMsgFp[msgType] == NULL) { - mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]); - code = TSDB_CODE_MND_MSG_NOT_PROCESSED; - goto PROCESS_READ_REQ_END; - } - - mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]); - code = (*tsMworker.readMsgFp[msgType])(pMsg); - -PROCESS_READ_REQ_END: - mnodeSendRsp(pMsg, code); -} - -static void mnodeProcessPeerReq(SMnMsg *pMsg, void *unused) { - int32_t msgType = pMsg->rpcMsg.msgType; - void *ahandle = pMsg->rpcMsg.ahandle; - int32_t code = 0; - - if (pMsg->rpcMsg.pCont == NULL) { - mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]); - code = TSDB_CODE_MND_INVALID_MSG_LEN; - goto PROCESS_PEER_REQ_END; - } - - if (!mnodeIsMaster()) { - SMnRsp *rpcRsp = &pMsg->rpcRsp; - SEpSet *epSet = rpcMallocCont(sizeof(SEpSet)); - mnodeGetMnodeEpSetForPeer(epSet, true); - rpcRsp->rsp = epSet; - rpcRsp->len = sizeof(SEpSet); - - mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, - taosMsg[msgType], epSet->numOfEps, epSet->inUse); - - code = TSDB_CODE_RPC_REDIRECT; - goto PROCESS_PEER_REQ_END; - } - - if (tsMworker.peerReqFp[msgType] == NULL) { - mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]); - code = TSDB_CODE_MND_MSG_NOT_PROCESSED; - goto PROCESS_PEER_REQ_END; - } - - code = (*tsMworker.peerReqFp[msgType])(pMsg); - -PROCESS_PEER_REQ_END: - mnodeSendRsp(pMsg, code); -} - -static void mnodeProcessPeerRsp(SMnMsg *pMsg, void *unused) { - int32_t msgType = pMsg->rpcMsg.msgType; - SRpcMsg *pRpcMsg = &pMsg->rpcMsg; - - if (!mnodeIsMaster()) { - mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]); - mnodeCleanupMsg2(pMsg); - } - - if (tsMworker.peerRspFp[msgType]) { - (*tsMworker.peerRspFp[msgType])(pRpcMsg); - } else { - mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]); - } - - mnodeCleanupMsg2(pMsg); -} - -int32_t mnodeInitWorker() { - mnodeInitMsgFp(); - - SWorkerPool *pPool = &tsMworker.write; - pPool->name = "mnode-write"; - pPool->min = 1; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - return TSDB_CODE_MND_OUT_OF_MEMORY; - } else { - tsMworker.writeQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)mnodeProcessWriteReq); - } - - pPool = &tsMworker.read; - pPool->name = "mnode-read"; - pPool->min = 2; - pPool->max = (int32_t)(tsNumOfCores * tsNumOfThreadsPerCore / 2); - pPool->max = MAX(2, pPool->max); - pPool->max = MIN(4, pPool->max); - if (tWorkerInit(pPool) != 0) { - return TSDB_CODE_MND_OUT_OF_MEMORY; - } else { - tsMworker.readQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)mnodeProcessReadReq); - } - - pPool = &tsMworker.peerReq; - pPool->name = "mnode-peer-req"; - pPool->min = 1; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - return TSDB_CODE_MND_OUT_OF_MEMORY; - } else { - tsMworker.peerReqQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)mnodeProcessPeerReq); - } - - pPool = &tsMworker.peerRsp; - pPool->name = "mnode-peer-rsp"; - pPool->min = 1; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - return TSDB_CODE_MND_OUT_OF_MEMORY; - } else { - tsMworker.peerRspQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)mnodeProcessPeerRsp); - } - - mInfo("mnode worker is initialized"); - return 0; -} - -void mnodeCleanupWorker() { - tWorkerFreeQueue(&tsMworker.write, tsMworker.writeQ); - tWorkerCleanup(&tsMworker.write); - tsMworker.writeQ = NULL; - - tWorkerFreeQueue(&tsMworker.read, tsMworker.readQ); - tWorkerCleanup(&tsMworker.read); - tsMworker.readQ = NULL; - - tWorkerFreeQueue(&tsMworker.peerReq, tsMworker.peerReqQ); - tWorkerCleanup(&tsMworker.peerReq); - tsMworker.peerReqQ = NULL; - - tWorkerFreeQueue(&tsMworker.peerRsp, tsMworker.peerRspQ); - tWorkerCleanup(&tsMworker.peerRsp); - tsMworker.peerRspQ = NULL; - - mInfo("mnode worker is closed"); -} - -SMnodeMsg *mnodeInitMsg(int32_t msgNum) { return NULL; } - -int32_t mnodeAppendMsg(SMnodeMsg *pMsg, SRpcMsg *pRpcMsg) { return 0; } - -void mnodeCleanupMsg(SMnodeMsg *pMsg) {} -void mnodeProcessMsg(SMnodeMsg *pMsg, EMnMsgType msgType) {} diff --git a/source/dnode/mnode/impl/src/mondeInt.c b/source/dnode/mnode/impl/src/mondeInt.c deleted file mode 100644 index a7c76360e2..0000000000 --- a/source/dnode/mnode/impl/src/mondeInt.c +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "tglobal.h" -#include "tstep.h" -#include "mnodeAcct.h" -#include "mnodeAuth.h" -#include "mnodeBalance.h" -#include "mnodeCluster.h" -#include "mnodeDb.h" -#include "mnodeDnode.h" -#include "mnodeFunc.h" -#include "mnodeMnode.h" -#include "mnodeOper.h" -#include "mnodeProfile.h" -#include "mnodeInt.h" -#include "mnodeShow.h" -#include "mnodeStable.h" -#include "mnodeSync.h" -#include "mnodeUser.h" -#include "mnodeVgroup.h" -#include "mnodeWorker.h" -#include "mnodeTelem.h" - -static struct { - int32_t state; - int32_t dnodeId; - int64_t clusterId; - tmr_h timer; - SSteps *steps1; - SSteps *steps2; - SMnodePara para; -} tsMint; - -tmr_h mnodeGetTimer() { return tsMint.timer; } - -int32_t mnodeGetDnodeId() { return tsMint.para.dnodeId; } - -int64_t mnodeGetClusterId() { return tsMint.para.clusterId; } - -EMnStatus mnodeGetStatus() { return tsMint.state; } - -void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg) { - (*tsMint.para.SendMsgToDnode)(epSet, rpcMsg); -} - -void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsMint.para.SendMsgToMnode)(rpcMsg); } - -void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell) { (*tsMint.para.SendRedirectMsg)(rpcMsg, forShell); } - -int32_t mnodeGetLoad(SMnodeLoad *pLoad) { return 0; } - -static int32_t mnodeSetPara(SMnodePara para) { - tsMint.para = para; - - if (tsMint.para.SendMsgToDnode == NULL) return -1; - if (tsMint.para.SendMsgToMnode == NULL) return -1; - if (tsMint.para.SendRedirectMsg == NULL) return -1; - if (tsMint.para.PutMsgIntoApplyQueue == NULL) return -1; - if (tsMint.para.dnodeId < 0) return -1; - if (tsMint.para.clusterId < 0) return -1; - - return 0; -} - -static int32_t mnodeInitTimer() { - if (tsMint.timer == NULL) { - tsMint.timer = taosTmrInit(tsMaxShellConns, 200, 3600000, "MND"); - } - - return 0; -} - -static void mnodeCleanupTimer() { - if (tsMint.timer != NULL) { - taosTmrCleanUp(tsMint.timer); - tsMint.timer = NULL; - } -} - -static int32_t mnodeInitStep1() { - struct SSteps *steps = taosStepInit(16, NULL); - if (steps == NULL) return -1; - - taosStepAdd(steps, "mnode-sdb", sdbInit, sdbCleanup); - taosStepAdd(steps, "mnode-cluster", mnodeInitCluster, mnodeCleanupCluster); - taosStepAdd(steps, "mnode-dnode", mnodeInitDnode, mnodeCleanupDnode); - taosStepAdd(steps, "mnode-mnode", mnodeInitMnode, mnodeCleanupMnode); - taosStepAdd(steps, "mnode-acct", mnodeInitAcct, mnodeCleanupAcct); - taosStepAdd(steps, "mnode-auth", mnodeInitAuth, mnodeCleanupAuth); - taosStepAdd(steps, "mnode-user", mnodeInitUser, mnodeCleanupUser); - taosStepAdd(steps, "mnode-db", mnodeInitDb, mnodeCleanupDb); - taosStepAdd(steps, "mnode-vgroup", mnodeInitVgroup, mnodeCleanupVgroup); - taosStepAdd(steps, "mnode-stable", mnodeInitStable, mnodeCleanupStable); - taosStepAdd(steps, "mnode-func", mnodeInitFunc, mnodeCleanupFunc); - taosStepAdd(steps, "mnode-oper", mnodeInitOper, mnodeCleanupOper); - - tsMint.steps1 = steps; - return taosStepExec(tsMint.steps1); -} - -static int32_t mnodeInitStep2() { - struct SSteps *steps = taosStepInit(12, NULL); - if (steps == NULL) return -1; - - taosStepAdd(steps, "mnode-timer", mnodeInitTimer, NULL); - taosStepAdd(steps, "mnode-worker", mnodeInitWorker, NULL); - taosStepAdd(steps, "mnode-balance", mnodeInitBalance, mnodeCleanupBalance); - taosStepAdd(steps, "mnode-profile", mnodeInitProfile, mnodeCleanupProfile); - taosStepAdd(steps, "mnode-show", mnodeInitShow, mnodeCleanUpShow); - taosStepAdd(steps, "mnode-sync", mnodeInitSync, mnodeCleanUpSync); - taosStepAdd(steps, "mnode-worker", NULL, mnodeCleanupWorker); - taosStepAdd(steps, "mnode-telem", mnodeInitTelem, mnodeCleanupTelem); - taosStepAdd(steps, "mnode-timer", NULL, mnodeCleanupTimer); - - tsMint.steps2 = steps; - return taosStepExec(tsMint.steps2); -} - -static void mnodeCleanupStep1() { taosStepCleanup(tsMint.steps1); } - -static void mnodeCleanupStep2() { taosStepCleanup(tsMint.steps2); } - -static bool mnodeNeedDeploy() { - if (tsMint.para.dnodeId > 0) return false; - if (tsMint.para.clusterId > 0) return false; - if (strcmp(tsFirst, tsLocalEp) != 0) return false; - return true; -} - -int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) { - if (tsMint.state != MN_STATUS_UNINIT) { - mError("failed to deploy mnode since its deployed"); - return 0; - } else { - tsMint.state = MN_STATUS_INIT; - } - - if (tsMint.para.dnodeId <= 0 || tsMint.para.clusterId <= 0) { - mError("failed to deploy mnode since cluster not ready"); - return TSDB_CODE_MND_NOT_READY; - } - - mInfo("starting to deploy mnode"); - - int32_t code = mnodeInitStep1(); - if (code != 0) { - mError("failed to deploy mnode since init step1 error"); - tsMint.state = MN_STATUS_UNINIT; - return TSDB_CODE_MND_APP_ERROR; - } - - code = mnodeInitStep2(); - if (code != 0) { - mnodeCleanupStep1(); - mError("failed to deploy mnode since init step2 error"); - tsMint.state = MN_STATUS_UNINIT; - return TSDB_CODE_MND_APP_ERROR; - } - - mDebug("mnode is deployed and waiting for raft to confirm"); - tsMint.state = MN_STATUS_READY; - return 0; -} - -void mnodeUnDeploy(char *path) { - sdbUnDeploy(); - mnodeCleanup(); -} - -int32_t mnodeInit(SMnodePara para) { - mDebugFlag = 207; - if (tsMint.state != MN_STATUS_UNINIT) { - return 0; - } else { - tsMint.state = MN_STATUS_INIT; - } - - mInfo("starting to initialize mnode ..."); - - int32_t code = mnodeSetPara(para); - if (code != 0) { - tsMint.state = MN_STATUS_UNINIT; - return code; - } - - code = mnodeInitStep1(); - if (code != 0) { - tsMint.state = MN_STATUS_UNINIT; - return -1; - } - - code = sdbRead(); - if (code != 0) { - if (mnodeNeedDeploy()) { - code = sdbDeploy(); - if (code != 0) { - mnodeCleanupStep1(); - tsMint.state = MN_STATUS_UNINIT; - return -1; - } - } else { - mnodeCleanupStep1(); - tsMint.state = MN_STATUS_UNINIT; - return -1; - } - } - - code = mnodeInitStep2(); - if (code != 0) { - mnodeCleanupStep1(); - tsMint.state = MN_STATUS_UNINIT; - return -1; - } - - tsMint.state = MN_STATUS_READY; - mInfo("mnode is initialized successfully"); - return 0; -} - -void mnodeCleanup() { - if (tsMint.state != MN_STATUS_UNINIT && tsMint.state != MN_STATUS_CLOSING) { - mInfo("starting to clean up mnode"); - tsMint.state = MN_STATUS_CLOSING; - - mnodeCleanupStep2(); - mnodeCleanupStep1(); - - tsMint.state = MN_STATUS_UNINIT; - mInfo("mnode is cleaned up"); - } -} - -int32_t mnodeStart(char *path, SMnodeCfg *pCfg) { return 0; } -int32_t mnodeAlter(SMnodeCfg *pCfg) { return 0; } -void mnodeStop() {} \ No newline at end of file diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index d3b825e190..119f614db5 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -87,7 +87,7 @@ static int32_t sdbReadDataFile() { SSdbRaw *pRaw = malloc(SDB_MAX_SIZE); if (pRaw == NULL) { - return TSDB_CODE_MND_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; } char file[PATH_MAX] = {0}; @@ -241,7 +241,7 @@ int32_t sdbInit() { tsSdb.tmpDir = strdup(path); if (tsSdb.currDir == NULL || tsSdb.currDir == NULL || tsSdb.currDir == NULL) { - return TSDB_CODE_MND_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; } for (int32_t i = 0; i < SDB_MAX; ++i) { @@ -256,7 +256,7 @@ int32_t sdbInit() { SHashObj *hash = taosHashInit(128, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK); if (hash == NULL) { - return TSDB_CODE_MND_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; } tsSdb.hashObjs[i] = hash; diff --git a/source/dnode/mnode/transaction/src/trnInt.c b/source/dnode/mnode/transaction/src/trnInt.c index f7463ec369..48f6175c62 100644 --- a/source/dnode/mnode/transaction/src/trnInt.c +++ b/source/dnode/mnode/transaction/src/trnInt.c @@ -41,7 +41,7 @@ SSdbRaw *trnActionEncode(STrans *pTrans) { SSdbRaw *pRaw = calloc(1, rawDataLen + sizeof(SSdbRaw)); if (pRaw == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -67,7 +67,7 @@ STrans *trnActionDecode(SSdbRaw *pRaw) { STrans *pTrans = NULL; if (pTrans == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -92,14 +92,14 @@ STrans *trnActionDecode(SSdbRaw *pRaw) { if (code == 0 && pTmp->dataLen > 0) { SSdbRaw *pRead = malloc(sizeof(SSdbRaw) + pTmp->dataLen); if (pRead == NULL) { - code = TSDB_CODE_MND_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } memcpy(pRead, pTmp, sizeof(SSdbRaw)); SDB_GET_BINARY_VAL(pData, dataLen, pRead->data, pRead->dataLen, code); void *ret = taosArrayPush(pTrans->redoLogs, &pRead); if (ret == NULL) { - code = TSDB_CODE_MND_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } } @@ -151,7 +151,7 @@ int32_t trnGenerateTransId() { return 1; } STrans *trnCreate(ETrnPolicy policy) { STrans *pTrans = calloc(1, sizeof(STrans)); if (pTrans == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -166,7 +166,7 @@ STrans *trnCreate(ETrnPolicy policy) { if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || pTrans->redoActions == NULL || pTrans->undoActions == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -195,13 +195,13 @@ void trnSetRpcHandle(STrans *pTrans, void *rpcHandle) { pTrans->rpcHandle = rpcH static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) { if (pArray == NULL || pRaw == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } void *ptr = taosArrayPush(pArray, &pRaw); if (ptr == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 4151178e2e..7f396f4a53 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -70,11 +70,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQD TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version") //common & util -TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, "Operation not supported") -TAOS_DEFINE_ERROR(TSDB_CODE_COM_MEMORY_CORRUPTED, "Memory corrupted") -TAOS_DEFINE_ERROR(TSDB_CODE_COM_OUT_OF_MEMORY, "Out of memory") -TAOS_DEFINE_ERROR(TSDB_CODE_COM_INVALID_CFG_MSG, "Invalid config message") -TAOS_DEFINE_ERROR(TSDB_CODE_COM_FILE_CORRUPTED, "Data file corrupted") +TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported") +TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "Out of Memory") +TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RANGE, "Out of range") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PTR, "Invalid pointer") +TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, "Memory corrupted") +TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, "Data file corrupted") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, "Invalid config message") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed") @@ -82,6 +84,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, "Invalid Ref ID") TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, "Ref is already there") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there") + //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_QHANDLE, "Invalid qhandle") @@ -131,7 +134,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_VERSION, "Incompatible protocol TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_LEN, "Invalid message length") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_TYPE, "Invalid message type") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_SHELL_CONNS, "Too many connections") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_OUT_OF_MEMORY, "Out of memory in mnode") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SHOWOBJ, "Data expired") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_QUERY_ID, "Invalid query id") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_ID, "Invalid stream id") diff --git a/source/util/src/tpagedfile.c b/source/util/src/tpagedfile.c index fcd4f2b155..3373d09876 100644 --- a/source/util/src/tpagedfile.c +++ b/source/util/src/tpagedfile.c @@ -12,7 +12,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa SDiskbasedResultBuf* pResBuf = *pResultBuf; if (pResBuf == NULL) { - return TSDB_CODE_COM_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; } pResBuf->pageSize = pagesize; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 5d6a507172..93008f7114 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -55,7 +55,7 @@ typedef struct STaosQall { taos_queue taosOpenQueue() { STaosQueue *queue = (STaosQueue *)calloc(sizeof(STaosQueue), 1); if (queue == NULL) { - terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -244,7 +244,7 @@ void taosResetQitems(taos_qall param) { taos_qset taosOpenQset() { STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1); if (qset == NULL) { - terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } diff --git a/source/util/src/tstep.c b/source/util/src/tstep.c index 8162fb76ed..d840b119fb 100644 --- a/source/util/src/tstep.c +++ b/source/util/src/tstep.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "ulog.h" +#include "taoserror.h" #include "tstep.h" typedef struct { @@ -33,7 +34,10 @@ typedef struct SSteps { SSteps *taosStepInit(int32_t maxsize, ReportFp fp) { SSteps *steps = calloc(1, sizeof(SSteps)); - if (steps == NULL) return NULL; + if (steps == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } steps->maxsize = maxsize; steps->cursize = 0; @@ -44,9 +48,14 @@ SSteps *taosStepInit(int32_t maxsize, ReportFp fp) { } int32_t taosStepAdd(struct SSteps *steps, char *name, InitFp initFp, CleanupFp cleanupFp) { - if (steps == NULL) return -1; + if (steps == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + if (steps->cursize >= steps->maxsize) { uError("failed to add step since up to the maxsize"); + terrno = TSDB_CODE_OUT_OF_RANGE; return -1; } @@ -66,7 +75,10 @@ static void taosStepCleanupImp(SSteps *steps, int32_t pos) { } int32_t taosStepExec(SSteps *steps) { - if (steps == NULL) return -1; + if (steps == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } for (int32_t s = 0; s < steps->cursize; s++) { SStep *step = steps->steps + s; diff --git a/source/util/src/ttimer.c b/source/util/src/ttimer.c index 4aac7b7abf..56186d9b24 100644 --- a/source/util/src/ttimer.c +++ b/source/util/src/ttimer.c @@ -18,6 +18,7 @@ #include "tsched.h" #include "ttimer.h" #include "tutil.h" +#include "taoserror.h" extern int8_t tscEmbedded; @@ -547,6 +548,7 @@ void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* lab if (ctrl == NULL) { tmrError("%s too many timer controllers, failed to create timer controller.", label); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; }