From 79341c73d2993c38199c708fc4e7fa0b29adecd4 Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 20 Feb 2020 21:10:42 +0800 Subject: [PATCH] reorgnize mgmtDnode.c --- src/dnode/inc/dnodeModule.h | 28 ----- src/dnode/src/dnodeModule.c | 4 +- src/dnode/src/dnodeSystem.c | 1 + src/mnode/inc/mgmtDb.h | 33 +++--- src/mnode/inc/mgmtDnode.h | 16 +-- src/mnode/inc/mgmtVgroup.h | 11 +- src/mnode/src/mgmtAcct.c | 6 +- src/mnode/src/mgmtBalance.c | 33 ++++-- src/mnode/src/mgmtDb.c | 192 ++++++++++++++------------------ src/mnode/src/mgmtDnode.c | 169 +++++++++++++++------------- src/mnode/src/mgmtDnodeInt.c | 2 +- src/mnode/src/mgmtMnode.c | 10 +- src/mnode/src/mgmtNormalTable.c | 4 +- src/mnode/src/mgmtShell.c | 4 +- src/mnode/src/mgmtSuperTable.c | 6 +- src/mnode/src/mgmtVgroup.c | 141 +++++++++++------------ src/util/inc/tmodule.h | 70 +++++------- src/util/src/tmodule.c | 168 +--------------------------- 18 files changed, 344 insertions(+), 554 deletions(-) diff --git a/src/dnode/inc/dnodeModule.h b/src/dnode/inc/dnodeModule.h index 57cfe9621c..8f1a3cce78 100644 --- a/src/dnode/inc/dnodeModule.h +++ b/src/dnode/inc/dnodeModule.h @@ -24,34 +24,6 @@ extern "C" { #include #include -#define tsetModuleStatus(mod) \ - { tsModuleStatus |= (1 << mod); } -#define tclearModuleStatus(mod) \ - { tsModuleStatus &= ~(1 << mod); } - -enum _module { - TSDB_MOD_MGMT, - TSDB_MOD_HTTP, - TSDB_MOD_MONITOR, - TSDB_MOD_DCLUSTER, - TSDB_MOD_MSTORAGE, - TSDB_MOD_MAX -}; - -typedef struct { - char *name; - int (*initFp)(); - void (*cleanUpFp)(); - int (*startFp)(); - void (*stopFp)(); - int num; - int curNum; - int equalVnodeNum; -} SModule; - -extern uint32_t tsModuleStatus; -extern SModule tsModule[]; - void dnodeAllocModules(); int32_t dnodeInitModules(); void dnodeCleanUpModules(); diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 7166d8c8a2..70754cc6eb 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "tlog.h" +#include "tmodule.h" #include "tglobalcfg.h" #include "mnode.h" #include "http.h" @@ -23,9 +24,6 @@ #include "dnodeModule.h" #include "dnodeSystem.h" -SModule tsModule[TSDB_MOD_MAX] = {0}; -uint32_t tsModuleStatus = 0; - void dnodeAllocModules() { tsModule[TSDB_MOD_MGMT].name = "mgmt"; tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem; diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 1937e7130f..24eeddb1d1 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -19,6 +19,7 @@ #include "taoserror.h" #include "tcrc32c.h" #include "tlog.h" +#include "tmodule.h" #include "tsched.h" #include "ttime.h" #include "ttimer.h" diff --git a/src/mnode/inc/mgmtDb.h b/src/mnode/inc/mgmtDb.h index e4843d01ef..820079961a 100644 --- a/src/mnode/inc/mgmtDb.h +++ b/src/mnode/inc/mgmtDb.h @@ -22,26 +22,25 @@ extern "C" { #include "mnode.h" -void mgmtMonitorDbDrop(void *unused, void *unusedt); -int mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter); -int mgmtUseDb(SConnObj *pConn, char *name); -int mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup); -int mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup); -int mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup); -int mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup); -int mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup); -int mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); -int mgmtRetrieveDbs(SShowObj *pShow, char *data, int rows, SConnObj *pConn); -void mgmtCleanUpDbs(); - +void mgmtMonitorDbDrop(void *unused, void *unusedt); +int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter); +int32_t mgmtUseDb(SConnObj *pConn, char *name); +int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup); +int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup); +int32_t mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup); +int32_t mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup); +int32_t mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup); +int32_t mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); +int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); +void mgmtCleanUpDbs(); int32_t mgmtInitDbs(); -int mgmtUpdateDb(SDbObj *pDb); +int32_t mgmtUpdateDb(SDbObj *pDb); SDbObj *mgmtGetDb(char *db); -SDbObj *mgmtGetDbByMeterId(char *db); -int mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate); -int mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists); -int mgmtDropDb(SDbObj *pDb); +SDbObj *mgmtGetDbByTableId(char *db); +int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate); +int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists); +int32_t mgmtDropDb(SDbObj *pDb); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index a881601364..325ef5bb71 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -28,21 +28,21 @@ int32_t mgmtCreateDnode(uint32_t ip); int32_t mgmtDropDnode(SDnodeObj *pDnode); int32_t mgmtDropDnodeByIp(uint32_t ip); int32_t mgmtGetNextVnode(SVnodeGid *pVnodeGid); -void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int numOfVnodes, int vgId); -void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int numOfVnodes); +void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId); +void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes); int32_t mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); -int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn); +int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); int32_t mgmtSendCfgDnodeMsg(char *cont); void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode); int32_t mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); -int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int rows, SConnObj *pConn); +int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); int32_t mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); -int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int rows, SConnObj *pConn); +int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); -int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn); +int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); extern int32_t (*mgmtInitDnodes)(); extern void (*mgmtCleanUpDnodes)(); @@ -52,10 +52,10 @@ extern void* (*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode); extern int32_t (*mgmtUpdateDnode)(SDnodeObj *pDnode); extern void (*mgmtSetDnodeUnRemove)(SDnodeObj *pDnode); extern int32_t (*mgmtGetScoresMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); -extern int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int rows, SConnObj *pConn); +extern int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); extern bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg); -extern SDnodeObj dnodeObj; +extern SDnodeObj tsDnodeObj; #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 714080ae7c..845da4e159 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -24,16 +24,15 @@ extern "C" { #include #include "mnode.h" -int mgmtInitVgroups(); -SVgObj *mgmtGetVgroup(int vgId); +int32_t mgmtInitVgroups(); +SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtCreateVgroup(SDbObj *pDb); -int mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup); +int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup); void mgmtSetVgroupIdPool(); -int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); -int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn); +int32_t mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); +int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); void mgmtCleanUpVgroups(); - SVgObj *mgmtGetAvailVgroup(SDbObj *pDb); int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup); diff --git a/src/mnode/src/mgmtAcct.c b/src/mnode/src/mgmtAcct.c index 37e6412b30..17d20b67cc 100644 --- a/src/mnode/src/mgmtAcct.c +++ b/src/mnode/src/mgmtAcct.c @@ -24,7 +24,7 @@ extern void *tsUserSdb; extern void *tsDbSdb; -SAcctObj acctObj; +static SAcctObj tsAcctObj; int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb) { pthread_mutex_lock(&pAcct->mutex); @@ -103,7 +103,7 @@ int32_t mgmtInitAcctsImp() { int32_t (*mgmtInitAccts)() = mgmtInitAcctsImp; SAcctObj *mgmtGetAcctImp(char *acctName) { - return &acctObj; + return &tsAcctObj; } SAcctObj *(*mgmtGetAcct)(char *acctName) = mgmtGetAcctImp; @@ -137,7 +137,7 @@ int32_t mgmtCheckTableLimitImp(SAcctObj *pAcct, SCreateTableMsg *pCreate) { int32_t (*mgmtCheckTableLimit)(SAcctObj *pAcct, SCreateTableMsg *pCreate) = mgmtCheckTableLimitImp; void mgmtCheckAcctImp() { - SAcctObj *pAcct = &acctObj; + SAcctObj *pAcct = &tsAcctObj; pAcct->acctId = 0; strcpy(pAcct->user, "root"); diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index aa7448865f..cf1c51ad90 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -14,12 +14,12 @@ */ #define _DEFAULT_SOURCE +#include "tglobalcfg.h" +#include "tmodule.h" +#include "tstatus.h" +#include "ttime.h" #include "mgmtBalance.h" #include "mgmtDnode.h" -#include "dnodeModule.h" -#include "tstatus.h" -#include "tglobalcfg.h" -#include "ttime.h" void mgmtStartBalanceTimerImp(int64_t mseconds) {} void (*mgmtStartBalanceTimer)(int64_t mseconds) = mgmtStartBalanceTimerImp; @@ -32,7 +32,7 @@ void (*mgmtCleanupBalance)() = mgmtCleanupBalanceImp; int32_t mgmtAllocVnodesImp(SVgObj *pVgroup) { int selectedVnode = -1; - SDnodeObj *pDnode = &dnodeObj; + SDnodeObj *pDnode = &tsDnodeObj; int lastAllocVode = pDnode->lastAllocVnode; for (int i = 0; i < pDnode->numOfVnodes; i++) { @@ -59,21 +59,34 @@ int32_t (*mgmtAllocVnodes)(SVgObj *pVgroup) = mgmtAllocVnodesImp; bool mgmtCheckModuleInDnodeImp(SDnodeObj *pDnode, int moduleType) { return tsModule[moduleType].num != 0; } + bool (*mgmtCheckModuleInDnode)(SDnodeObj *pDnode, int moduleType) = mgmtCheckModuleInDnodeImp; -char *mgmtGetVnodeStatusImp(SVgObj *pVgroup, SVnodeGid *pVnode) { return "master"; } +char *mgmtGetVnodeStatusImp(SVgObj *pVgroup, SVnodeGid *pVnode) { + return "master"; +} + char *(*mgmtGetVnodeStatus)(SVgObj *pVgroup, SVnodeGid *pVnode) = mgmtGetVnodeStatusImp; -bool mgmtCheckVnodeReadyImp(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) { return true; } +bool mgmtCheckVnodeReadyImp(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) { + return true; +} + bool (*mgmtCheckVnodeReady)(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) = mgmtCheckVnodeReadyImp; +void mgmtUpdateDnodeStateImp(SDnodeObj *pDnode, int lbStatus) { +} -void mgmtUpdateDnodeStateImp(SDnodeObj *pDnode, int lbStatus) {} void (*mgmtUpdateDnodeState)(SDnodeObj *pDnode, int lbStatus) = mgmtUpdateDnodeStateImp; -void mgmtUpdateVgroupStateImp(SVgObj *pVgroup, int lbStatus, int srcIp) {} +void mgmtUpdateVgroupStateImp(SVgObj *pVgroup, int lbStatus, int srcIp) { +} + void (*mgmtUpdateVgroupState)(SVgObj *pVgroup, int lbStatus, int srcIp) = mgmtUpdateVgroupStateImp; -bool mgmtAddVnodeImp(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { return false; } +bool mgmtAddVnodeImp(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { + return false; +} + bool (*mgmtAddVnode)(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) = mgmtAddVnodeImp; diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index e2a1a2c4a2..0133cef9dd 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -15,35 +15,33 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "mgmtDnode.h" -#include "mgmtDb.h" -#include "mgmtAcct.h" -#include "mgmtGrant.h" -#include "mgmtBalance.h" -#include "mgmtDnodeInt.h" -#include "mgmtUtil.h" -#include "mgmtVgroup.h" -#include "mgmtTable.h" +#include "taoserror.h" #include "tschemautil.h" #include "tstatus.h" #include "mnode.h" -#include "taoserror.h" +#include "mgmtAcct.h" +#include "mgmtBalance.h" +#include "mgmtDb.h" +#include "mgmtDnode.h" +#include "mgmtDnodeInt.h" +#include "mgmtGrant.h" +#include "mgmtTable.h" +#include "mgmtUtil.h" +#include "mgmtVgroup.h" + +extern void *tsVgroupSdb; void *tsDbSdb = NULL; -extern void *vgSdb; -int tsDbUpdateSize; +int32_t tsDbUpdateSize; -void *(*mgmtDbActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); -void *mgmtDbActionInsert(void *row, char *str, int size, int *ssize); -void *mgmtDbActionDelete(void *row, char *str, int size, int *ssize); -void *mgmtDbActionUpdate(void *row, char *str, int size, int *ssize); -void *mgmtDbActionEncode(void *row, char *str, int size, int *ssize); -void *mgmtDbActionDecode(void *row, char *str, int size, int *ssize); -void *mgmtDbActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize); -void *mgmtDbActionBatchUpdate(void *row, char *str, int size, int *ssize); -void *mgmtDbActionAfterBatchUpdate(void *row, char *str, int size, int *ssize); -void *mgmtDbActionReset(void *row, char *str, int size, int *ssize); -void *mgmtDbActionDestroy(void *row, char *str, int size, int *ssize); +void *(*mgmtDbActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtDbActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtDbActionEncode(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtDbActionDecode(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtDbActionReset(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtDbActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); void mgmtDbActionInit() { mgmtDbActionFp[SDB_TYPE_INSERT] = mgmtDbActionInsert; @@ -51,31 +49,17 @@ void mgmtDbActionInit() { mgmtDbActionFp[SDB_TYPE_UPDATE] = mgmtDbActionUpdate; mgmtDbActionFp[SDB_TYPE_ENCODE] = mgmtDbActionEncode; mgmtDbActionFp[SDB_TYPE_DECODE] = mgmtDbActionDecode; - mgmtDbActionFp[SDB_TYPE_BEFORE_BATCH_UPDATE] = mgmtDbActionBeforeBatchUpdate; - mgmtDbActionFp[SDB_TYPE_BATCH_UPDATE] = mgmtDbActionBatchUpdate; - mgmtDbActionFp[SDB_TYPE_AFTER_BATCH_UPDATE] = mgmtDbActionAfterBatchUpdate; mgmtDbActionFp[SDB_TYPE_RESET] = mgmtDbActionReset; mgmtDbActionFp[SDB_TYPE_DESTROY] = mgmtDbActionDestroy; } -void *mgmtDbAction(char action, void *row, char *str, int size, int *ssize) { +void *mgmtDbAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { if (mgmtDbActionFp[(uint8_t)action] != NULL) { return (*(mgmtDbActionFp[(uint8_t)action]))(row, str, size, ssize); } return NULL; } -void mgmtGetAcctStr(char *src, char *dest) { - char *pos = strstr(src, TS_PATH_DELIMITER); - while ((pos != NULL) && (*src != *pos)) { - *dest = *src; - src++; - dest++; - } - - *dest = 0; -} - int32_t mgmtInitDbs() { void * pNode = NULL; SDbObj * pDb = NULL; @@ -118,9 +102,11 @@ int32_t mgmtInitDbs() { return 0; } -SDbObj *mgmtGetDb(char *db) { return (SDbObj *)sdbGetRow(tsDbSdb, db); } +SDbObj *mgmtGetDb(char *db) { + return (SDbObj *)sdbGetRow(tsDbSdb, db); +} -SDbObj *mgmtGetDbByMeterId(char *meterId) { +SDbObj *mgmtGetDbByTableId(char *meterId) { char db[TSDB_TABLE_ID_LEN], *pos; pos = strstr(meterId, TS_PATH_DELIMITER); @@ -131,7 +117,7 @@ SDbObj *mgmtGetDbByMeterId(char *meterId) { return (SDbObj *)sdbGetRow(tsDbSdb, db); } -int mgmtCheckDbParams(SCreateDbMsg *pCreate) { +int32_t mgmtCheckDbParams(SCreateDbMsg *pCreate) { // assign default parameters if (pCreate->maxSessions < 0) pCreate->maxSessions = tsSessionsPerVnode; // if (pCreate->cacheBlockSize < 0) pCreate->cacheBlockSize = tsCacheBlockSize; // @@ -176,16 +162,13 @@ int mgmtCheckDbParams(SCreateDbMsg *pCreate) { return TSDB_CODE_SUCCESS; } -int mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate) { - SDbObj *pDb; - int code; - - code = mgmtCheckDbLimit(pAcct); +int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate) { + int32_t code = mgmtCheckDbLimit(pAcct); if (code != 0) { return code; } - pDb = (SDbObj *)sdbGetRow(tsDbSdb, pCreate->db); + SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, pCreate->db); if (pDb != NULL) { return TSDB_CODE_DB_ALREADY_EXIST; } @@ -215,14 +198,16 @@ int mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate) { return code; } -int mgmtUpdateDb(SDbObj *pDb) { return sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1); } +int32_t mgmtUpdateDb(SDbObj *pDb) { + return sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1); +} -int mgmtSetDbDropping(SDbObj *pDb) { +int32_t mgmtSetDbDropping(SDbObj *pDb) { if (pDb->dropStatus == TSDB_DB_STATUS_DROP_FROM_SDB) return 0; SVgObj *pVgroup = pDb->pHead; while (pVgroup != NULL) { - for (int i = 0; i < pVgroup->numOfVnodes; i++) { + for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) { SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i; SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip); if (pDnode == NULL) continue; @@ -257,7 +242,7 @@ int mgmtSetDbDropping(SDbObj *pDb) { bool mgmtCheckDropDbFinished(SDbObj *pDb) { SVgObj *pVgroup = pDb->pHead; while (pVgroup) { - for (int i = 0; i < pVgroup->numOfVnodes; i++) { + for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) { SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i; SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip); @@ -292,7 +277,7 @@ void mgmtDropDbFromSdb(SDbObj *pDb) { mPrint("db:%s database drop finished", pDb->name); } -int mgmtDropDb(SDbObj *pDb) { +int32_t mgmtDropDb(SDbObj *pDb) { if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) { bool finished = mgmtCheckDropDbFinished(pDb); if (!finished) { @@ -309,15 +294,14 @@ int mgmtDropDb(SDbObj *pDb) { mgmtDropDbFromSdb(pDb); return 0; } else { - int code = mgmtSetDbDropping(pDb); + int32_t code = mgmtSetDbDropping(pDb); if (code != 0) return code; return TSDB_CODE_ACTION_IN_PROGRESS; } } -int mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) { - SDbObj *pDb; - pDb = (SDbObj *)sdbGetRow(tsDbSdb, name); +int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) { + SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, name); if (pDb == NULL) { if (ignoreNotExists) return TSDB_CODE_SUCCESS; mWarn("db:%s is not there", name); @@ -344,17 +328,16 @@ void mgmtMonitorDbDrop(void *unused, void *unusedt) { } } -int mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { - SDbObj *pDb; - int code = TSDB_CODE_SUCCESS; +int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { + int32_t code = TSDB_CODE_SUCCESS; - pDb = (SDbObj *)sdbGetRow(tsDbSdb, pAlter->db); + SDbObj *pDb = (SDbObj *) sdbGetRow(tsDbSdb, pAlter->db); if (pDb == NULL) { mTrace("db:%s is not exist", pAlter->db); return TSDB_CODE_INVALID_DB; } - int oldReplicaNum = pDb->cfg.replications; + int32_t oldReplicaNum = pDb->cfg.replications; if (pAlter->daysToKeep > 0) { mTrace("db:%s daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, pAlter->daysToKeep); pDb->cfg.daysToKeep = pAlter->daysToKeep; @@ -400,7 +383,7 @@ int mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { } if (pAlter->maxSessions > 0) { //rebuild meterList in mgmtVgroup.c - sdbUpdateRow(vgSdb, pVgroup, tsVgUpdateSize, 0); + sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 0); } mgmtSendVPeersMsg(pVgroup); pVgroup = pVgroup->next; @@ -410,9 +393,9 @@ int mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { return code; } -int mgmtUseDb(SConnObj *pConn, char *name) { - SDbObj *pDb; - int code = TSDB_CODE_INVALID_DB; +int32_t mgmtUseDb(SConnObj *pConn, char *name) { + SDbObj *pDb; + int32_t code = TSDB_CODE_INVALID_DB; // here change the default db for connect. pDb = mgmtGetDb(name); @@ -424,12 +407,11 @@ int mgmtUseDb(SConnObj *pConn, char *name) { return code; } -int mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) { +int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) { pVgroup->next = pDb->pHead; pVgroup->prev = NULL; if (pDb->pHead) pDb->pHead->prev = pVgroup; - if (pDb->pTail == NULL) pDb->pTail = pVgroup; pDb->pHead = pVgroup; @@ -438,12 +420,11 @@ int mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) { return 0; } -int mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup) { +int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup) { pVgroup->next = NULL; pVgroup->prev = pDb->pTail; if (pDb->pTail) pDb->pTail->next = pVgroup; - if (pDb->pHead == NULL) pDb->pHead = pVgroup; pDb->pTail = pVgroup; @@ -452,46 +433,36 @@ int mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup) { return 0; } -int mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup) { +int32_t mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup) { if (pVgroup->prev) pVgroup->prev->next = pVgroup->next; - if (pVgroup->next) pVgroup->next->prev = pVgroup->prev; - if (pVgroup->prev == NULL) pDb->pHead = pVgroup->next; - if (pVgroup->next == NULL) pDb->pTail = pVgroup->prev; - pDb->numOfVgroups--; return 0; } -int mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup) { +int32_t mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup) { mgmtRemoveVgroupFromDb(pDb, pVgroup); mgmtAddVgroupIntoDbTail(pDb, pVgroup); return 0; } -int mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup) { +int32_t mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup) { mgmtRemoveVgroupFromDb(pDb, pVgroup); mgmtAddVgroupIntoDb(pDb, pVgroup); return 0; } -int mgmtShowTables(SAcctObj *pAcct, char *db) { - int code; - - code = 0; - - return code; +void mgmtCleanUpDbs() { + sdbCloseTable(tsDbSdb); } -void mgmtCleanUpDbs() { sdbCloseTable(tsDbSdb); } - -int mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { - int cols = 0; +int32_t mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { + int32_t cols = 0; SSchema *pSchema = tsGetSchema(pMeta); @@ -619,11 +590,12 @@ int mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - // pShow->numOfRows = sdbGetNumOfRows (tsDbSdb); pShow->numOfRows = pConn->pAcct->acctInfo.numOfDbs; pShow->pNode = pConn->pAcct->pHead; @@ -636,11 +608,11 @@ char *mgmtGetDbStr(char *src) { return ++pos; } -int mgmtRetrieveDbs(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { - int numOfRows = 0; +int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { + int32_t numOfRows = 0; SDbObj *pDb = NULL; char * pWrite; - int cols = 0; + int32_t cols = 0; while (numOfRows < rows) { pDb = (SDbObj *)pShow->pNode; @@ -753,8 +725,8 @@ int mgmtRetrieveDbs(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { return numOfRows; } -void *mgmtDbActionInsert(void *row, char *str, int size, int *ssize) { - SDbObj * pDb = (SDbObj *)row; +void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { + SDbObj *pDb = (SDbObj *) row; SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); pDb->pHead = NULL; @@ -767,19 +739,22 @@ void *mgmtDbActionInsert(void *row, char *str, int size, int *ssize) { return NULL; } -void *mgmtDbActionDelete(void *row, char *str, int size, int *ssize) { - SDbObj * pDb = (SDbObj *)row; + +void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { + SDbObj *pDb = (SDbObj *) row; SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); mgmtRemoveDbFromAcct(pAcct, pDb); return NULL; } -void *mgmtDbActionUpdate(void *row, char *str, int size, int *ssize) { + +void *mgmtDbActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return mgmtDbActionReset(row, str, size, ssize); } -void *mgmtDbActionEncode(void *row, char *str, int size, int *ssize) { - SDbObj *pDb = (SDbObj *)row; - int tsize = pDb->updateEnd - (char *)pDb; + +void *mgmtDbActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { + SDbObj *pDb = (SDbObj *) row; + int32_t tsize = pDb->updateEnd - (char *) pDb; if (size < tsize) { *ssize = -1; } else { @@ -789,27 +764,26 @@ void *mgmtDbActionEncode(void *row, char *str, int size, int *ssize) { return NULL; } -void *mgmtDbActionDecode(void *row, char *str, int size, int *ssize) { - SDbObj *pDb = (SDbObj *)malloc(sizeof(SDbObj)); +void *mgmtDbActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { + SDbObj *pDb = (SDbObj *) malloc(sizeof(SDbObj)); if (pDb == NULL) return NULL; memset(pDb, 0, sizeof(SDbObj)); - int tsize = pDb->updateEnd - (char *)pDb; + int32_t tsize = pDb->updateEnd - (char *)pDb; memcpy(pDb, str, tsize); return (void *)pDb; } -void *mgmtDbActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; } -void *mgmtDbActionBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; } -void *mgmtDbActionAfterBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; } -void *mgmtDbActionReset(void *row, char *str, int size, int *ssize) { - SDbObj *pDb = (SDbObj *)row; - int tsize = pDb->updateEnd - (char *)pDb; + +void *mgmtDbActionReset(void *row, char *str, int32_t size, int32_t *ssize) { + SDbObj *pDb = (SDbObj *) row; + int32_t tsize = pDb->updateEnd - (char *) pDb; memcpy(pDb, str, tsize); return NULL; } -void *mgmtDbActionDestroy(void *row, char *str, int size, int *ssize) { + +void *mgmtDbActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { tfree(row); return NULL; } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index c3791ab8d1..ba5c294527 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -14,19 +14,18 @@ */ #define _DEFAULT_SOURCE - #include "os.h" - -#include "dnodeSystem.h" +#include "tmodule.h" +#include "tschemautil.h" +#include "tstatus.h" #include "mnode.h" #include "mgmtDnode.h" #include "mgmtBalance.h" -#include "tschemautil.h" -#include "tstatus.h" -#include "dnodeModule.h" + +SDnodeObj tsDnodeObj; void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { - int maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore; + int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore; maxVnodes = maxVnodes > TSDB_MAX_VNODES ? TSDB_MAX_VNODES : maxVnodes; maxVnodes = maxVnodes < TSDB_MIN_VNODES ? TSDB_MIN_VNODES : maxVnodes; if (pDnode->numOfTotalVnodes != 0) { @@ -39,19 +38,14 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { pDnode->numOfVnodes = maxVnodes; pDnode->numOfFreeVnodes = maxVnodes; pDnode->openVnodes = 0; - -#ifdef CLUSTER pDnode->status = TSDB_DN_STATUS_OFFLINE; -#else - pDnode->status = TSDB_DN_STATUS_READY; -#endif } void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) { - int totalVnodes = 0; + int32_t totalVnodes = 0; mTrace("dnode:%s, begin calc free vnodes", taosIpStr(pDnode->privateIp)); - for (int i = 0; i < pDnode->numOfVnodes; ++i) { + for (int32_t i = 0; i < pDnode->numOfVnodes; ++i) { SVnodeLoad *pVload = pDnode->vload + i; if (pVload->vgId != 0) { mTrace("%d-dnode:%s, calc free vnodes, exist vnode:%d, vgroup:%d, state:%d %s, dropstate:%d %s, syncstatus:%d %s", @@ -68,10 +62,10 @@ void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) { taosIpStr(pDnode->privateIp), pDnode->numOfVnodes, pDnode->numOfFreeVnodes, totalVnodes); } -void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int numOfVnodes, int vgId) { +void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId) { SDnodeObj *pDnode; - for (int i = 0; i < numOfVnodes; ++i) { + for (int32_t i = 0; i < numOfVnodes; ++i) { pDnode = mgmtGetDnode(vnodeGid[i].ip); if (pDnode) { SVnodeLoad *pVload = pDnode->vload + vnodeGid[i].vnode; @@ -86,10 +80,10 @@ void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int numOfVnodes, int vgId) { } } -void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int numOfVnodes) { +void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes) { SDnodeObj *pDnode; - for (int i = 0; i < numOfVnodes; ++i) { + for (int32_t i = 0; i < numOfVnodes; ++i) { pDnode = mgmtGetDnode(vnodeGid[i].ip); if (pDnode) { SVnodeLoad *pVload = pDnode->vload + vnodeGid[i].vnode; @@ -102,8 +96,8 @@ void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int numOfVnodes) { } } -int mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { - int cols = 0; +int32_t mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { + int32_t cols = 0; if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; @@ -155,7 +149,7 @@ int mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; pShow->numOfRows = mgmtGetDnodesNum(); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; @@ -164,12 +158,12 @@ int mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return 0; } -int mgmtRetrieveDnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { - int numOfRows = 0; - SDnodeObj *pDnode = NULL; - char * pWrite; - int cols = 0; - char ipstr[20]; +int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { + int32_t numOfRows = 0; + SDnodeObj *pDnode = NULL; + char *pWrite; + int32_t cols = 0; + char ipstr[20]; while (numOfRows < rows) { pShow->pNode = mgmtGetNextDnode(pShow, (SDnodeObj **)&pDnode); @@ -214,8 +208,8 @@ int mgmtRetrieveDnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { return numOfRows; } -int mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { - int cols = 0; +int32_t mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { + int32_t cols = 0; if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; @@ -243,14 +237,16 @@ int mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } pShow->numOfRows = 0; SDnodeObj *pDnode = NULL; while (1) { pShow->pNode = mgmtGetNextDnode(pShow, (SDnodeObj **)&pDnode); if (pDnode == NULL) break; - for (int moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) { + for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) { if (mgmtCheckModuleInDnode(pDnode, moduleType)) { pShow->numOfRows++; } @@ -263,18 +259,18 @@ int mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return 0; } -int mgmtRetrieveModules(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { - int numOfRows = 0; +int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { + int32_t numOfRows = 0; SDnodeObj *pDnode = NULL; char * pWrite; - int cols = 0; + int32_t cols = 0; char ipstr[20]; while (numOfRows < rows) { pShow->pNode = mgmtGetNextDnode(pShow, (SDnodeObj **)&pDnode); if (pDnode == NULL) break; - for (int moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) { + for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) { if (!mgmtCheckModuleInDnode(pDnode, moduleType)) { continue; } @@ -302,8 +298,8 @@ int mgmtRetrieveModules(SShowObj *pShow, char *data, int rows, SConnObj *pConn) return numOfRows; } -int mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { - int cols = 0; +int32_t mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { + int32_t cols = 0; if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; @@ -325,10 +321,10 @@ int mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; pShow->numOfRows = 0; - for (int i = tsGlobalConfigNum - 1; i >= 0; --i) { + for (int32_t i = tsGlobalConfigNum - 1; i >= 0; --i) { SGlobalConfig *cfg = tsGlobalConfig + i; if (!mgmtCheckConfigShow(cfg)) continue; pShow->numOfRows++; @@ -340,15 +336,15 @@ int mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return 0; } -int mgmtRetrieveConfigs(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { - int numOfRows = 0; +int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { + int32_t numOfRows = 0; - for (int i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) { + for (int32_t i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) { SGlobalConfig *cfg = tsGlobalConfig + i; if (!mgmtCheckConfigShow(cfg)) continue; char *pWrite; - int cols = 0; + int32_t cols = 0; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; snprintf(pWrite, TSDB_CFG_OPTION_LEN, "%s", cfg->option); @@ -388,7 +384,7 @@ int mgmtRetrieveConfigs(SShowObj *pShow, char *data, int rows, SConnObj *pConn) } int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { - int cols = 0; + int32_t cols = 0; if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; @@ -422,7 +418,7 @@ int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; // TODO: if other thread drop dnode ???? SDnodeObj *pDnode = NULL; @@ -435,7 +431,7 @@ int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { SVnodeLoad* pVnode; pShow->numOfRows = 0; - for (int i = 0 ; i < TSDB_MAX_VNODES; i++) { + for (int32_t i = 0 ; i < TSDB_MAX_VNODES; i++) { pVnode = &pDnode->vload[i]; if (0 != pVnode->vgId) { pShow->numOfRows++; @@ -460,11 +456,11 @@ int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return 0; } -int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { - int numOfRows = 0; +int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { + int32_t numOfRows = 0; SDnodeObj *pDnode = NULL; char * pWrite; - int cols = 0; + int32_t cols = 0; if (0 == rows) return 0; @@ -473,7 +469,7 @@ int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pCon pDnode = (SDnodeObj *)(pShow->pNode); if (pDnode != NULL) { SVnodeLoad* pVnode; - for (int i = 0 ; i < TSDB_MAX_VNODES; i++) { + for (int32_t i = 0 ; i < TSDB_MAX_VNODES; i++) { pVnode = &pDnode->vload[i]; if (0 == pVnode->vgId) { continue; @@ -509,56 +505,76 @@ int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pCon return numOfRows; } -SDnodeObj dnodeObj; -extern uint32_t tsRebootTime; +SDnodeObj *mgmtGetDnodeImp(uint32_t ip) { + return &tsDnodeObj; +} -SDnodeObj* mgmtGetDnodeImp(uint32_t ip) { return &dnodeObj; } -SDnodeObj* (*mgmtGetDnode)(uint32_t ip) = mgmtGetDnodeImp; +SDnodeObj *(*mgmtGetDnode)(uint32_t ip) = mgmtGetDnodeImp; + +int32_t mgmtUpdateDnodeImp(SDnodeObj *pDnode) { + return 0; +} -int32_t mgmtUpdateDnodeImp(SDnodeObj *pDnode) { return 0; } int32_t (*mgmtUpdateDnode)(SDnodeObj *pDnode) = mgmtUpdateDnodeImp; -void mgmtCleanUpDnodesImp() {} +void mgmtCleanUpDnodesImp() { +} + void (*mgmtCleanUpDnodes)() = mgmtCleanUpDnodesImp; int32_t mgmtInitDnodesImp() { - dnodeObj.privateIp = inet_addr(tsPrivateIp);; - dnodeObj.createdTime = (int64_t)tsRebootTime * 1000; - dnodeObj.lastReboot = tsRebootTime; - dnodeObj.numOfCores = (uint16_t)tsNumOfCores; - dnodeObj.status = TSDB_DN_STATUS_READY; - dnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY; - dnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes; - dnodeObj.thandle = (void*)(1); //hack way - if (dnodeObj.numOfVnodes == TSDB_INVALID_VNODE_NUM) { - mgmtSetDnodeMaxVnodes(&dnodeObj); - mPrint("dnode first access, set total vnodes:%d", dnodeObj.numOfVnodes); + tsDnodeObj.privateIp = inet_addr(tsPrivateIp);; + tsDnodeObj.createdTime = taosGetTimestampMs(); + tsDnodeObj.lastReboot = taosGetTimestampSec(); + tsDnodeObj.numOfCores = (uint16_t) tsNumOfCores; + tsDnodeObj.status = TSDB_DN_STATUS_READY; + tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY; + tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes; + tsDnodeObj.thandle = (void *) (1); //hack way + if (tsDnodeObj.numOfVnodes == TSDB_INVALID_VNODE_NUM) { + mgmtSetDnodeMaxVnodes(&tsDnodeObj); + mPrint("dnode first access, set total vnodes:%d", tsDnodeObj.numOfVnodes); } - return 0; + + tsDnodeObj.status = TSDB_DN_STATUS_READY; + return 0; } + int32_t (*mgmtInitDnodes)() = mgmtInitDnodesImp; -int32_t mgmtGetDnodesNumImp() { return 1; } +int32_t mgmtGetDnodesNumImp() { + return 1; +} + int32_t (*mgmtGetDnodesNum)() = mgmtGetDnodesNumImp; -void* mgmtGetNextDnodeImp(SShowObj *pShow, SDnodeObj **pDnode) { +void *mgmtGetNextDnodeImp(SShowObj *pShow, SDnodeObj **pDnode) { if (*pDnode == NULL) { - *pDnode = &dnodeObj; + *pDnode = &tsDnodeObj; } else { *pDnode = NULL; } return *pDnode; } -void* (*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode) = mgmtGetNextDnodeImp; -int32_t mgmtGetScoresMetaImp(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return TSDB_CODE_OPS_NOT_SUPPORT; } +void *(*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode) = mgmtGetNextDnodeImp; + +int32_t mgmtGetScoresMetaImp(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { + return TSDB_CODE_OPS_NOT_SUPPORT; +} + int32_t (*mgmtGetScoresMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) = mgmtGetScoresMetaImp; -int32_t mgmtRetrieveScoresImp(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { return 0; } -int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int rows, SConnObj *pConn) = mgmtRetrieveScoresImp; +int32_t mgmtRetrieveScoresImp(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { + return 0; +} + +int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) = mgmtRetrieveScoresImp; + +void mgmtSetDnodeUnRemoveImp(SDnodeObj *pDnode) { +} -void mgmtSetDnodeUnRemoveImp(SDnodeObj *pDnode) {} void (*mgmtSetDnodeUnRemove)(SDnodeObj *pDnode) = mgmtSetDnodeUnRemoveImp; bool mgmtCheckConfigShowImp(SGlobalConfig *cfg) { @@ -568,4 +584,5 @@ bool mgmtCheckConfigShowImp(SGlobalConfig *cfg) { return false; return true; } + bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg) = mgmtCheckConfigShowImp; \ No newline at end of file diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 04545fd41f..0b8cbdbba1 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -538,7 +538,7 @@ void (*mgmtCleanUpDnodeInt)() = mgmtCleanUpDnodeIntImp; void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) { /* - SDnodeObj *pObj = &dnodeObj; + SDnodeObj *pObj = &tsDnodeObj; pObj->openVnodes = tsOpenVnodes; pObj->status = TSDB_DN_STATUS_READY; diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 83171fc149..eda22b09eb 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -16,8 +16,14 @@ #define _DEFAULT_SOURCE #include "mgmtMnode.h" -int32_t mgmtGetMnodeMetaImp(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return TSDB_CODE_OPS_NOT_SUPPORT; } +int32_t mgmtGetMnodeMetaImp(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { + return TSDB_CODE_OPS_NOT_SUPPORT; +} + int32_t (*mgmtGetMnodeMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) = mgmtGetMnodeMetaImp; -int32_t mgmtRetrieveMnodesImp(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { return 0; } +int32_t mgmtRetrieveMnodesImp(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { + return 0; +} + int32_t (*mgmtRetrieveMnodes)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) = mgmtRetrieveMnodesImp; diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 5e37c380cd..b0a12a9b53 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -352,7 +352,7 @@ int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int } } - SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId); + SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb == NULL) { mError("table: %s not belongs to any database", pTable->tableId); return TSDB_CODE_APP_ERROR; @@ -388,7 +388,7 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName) return TSDB_CODE_APP_ERROR; } - SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId); + SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb == NULL) { mError("table: %s not belongs to any database", pTable->tableId); return TSDB_CODE_APP_ERROR; diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index d2cbc67335..940737dd3f 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -229,7 +229,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { // memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData)); // strcpy(pCreateMsg->meterId, pInfo->meterId); // -// SDbObj* pMeterDb = mgmtGetDbByMeterId(pCreateMsg->meterId); +// SDbObj* pMeterDb = mgmtGetDbByTableId(pCreateMsg->meterId); // mTrace("meter:%s, pConnDb:%p, pConnDbName:%s, pMeterDb:%p, pMeterDbName:%s", // pCreateMsg->meterId, pDb, pDb->name, pMeterDb, pMeterDb->name); // assert(pDb == pMeterDb); @@ -411,7 +411,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { // // // get meter schema, and fill into resp payload // pMeterObj = mgmtGetTable(tblName); -// pDbObj = mgmtGetDbByMeterId(tblName); +// pDbObj = mgmtGetDbByTableId(tblName); // // if (pMeterObj == NULL || (pDbObj == NULL)) { // continue; diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 1f4af91c2f..49d04ad051 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -188,7 +188,7 @@ int32_t mgmtInitSuperTables() { break; } - SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId); + SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb == NULL) { mError("super table:%s, failed to get db, discard it", pTable->tableId); sdbDeleteRow(tsSuperTableSdb, pTable); @@ -429,7 +429,7 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pMetric, SSchema schema[], int n } } - SDbObj *pDb = mgmtGetDbByMeterId(pMetric->tableId); + SDbObj *pDb = mgmtGetDbByTableId(pMetric->tableId); if (pDb == NULL) { mError("meter: %s not belongs to any database", pMetric->tableId); return TSDB_CODE_APP_ERROR; @@ -468,7 +468,7 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pMetric, char *colName) { return TSDB_CODE_APP_ERROR; } - SDbObj *pDb = mgmtGetDbByMeterId(pMetric->tableId); + SDbObj *pDb = mgmtGetDbByTableId(pMetric->tableId); if (pDb == NULL) { mError("table: %s not belongs to any database", pMetric->tableId); return TSDB_CODE_APP_ERROR; diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index bc4f6f27b0..9b0b78319d 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -15,37 +15,34 @@ #define _DEFAULT_SOURCE #include "os.h" - +#include "taoserror.h" +#include "tlog.h" +#include "tschemautil.h" +#include "tstatus.h" #include "mnode.h" #include "mgmtBalance.h" #include "mgmtDb.h" #include "mgmtDnode.h" #include "mgmtDnodeInt.h" -#include "mgmtVgroup.h" #include "mgmtTable.h" -#include "tschemautil.h" -#include "tlog.h" -#include "tstatus.h" -#include "taoserror.h" +#include "mgmtVgroup.h" -void * vgSdb = NULL; -int tsVgUpdateSize; +void * tsVgroupSdb = NULL; +int32_t tsVgUpdateSize; extern void *tsDbSdb; -extern void *acctSdb; extern void *tsUserSdb; -extern void *dnodeSdb; -void *(*mgmtVgroupActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); -void *mgmtVgroupActionInsert(void *row, char *str, int size, int *ssize); -void *mgmtVgroupActionDelete(void *row, char *str, int size, int *ssize); -void *mgmtVgroupActionUpdate(void *row, char *str, int size, int *ssize); -void *mgmtVgroupActionEncode(void *row, char *str, int size, int *ssize); -void *mgmtVgroupActionDecode(void *row, char *str, int size, int *ssize); -void *mgmtVgroupActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize); -void *mgmtVgroupActionBatchUpdate(void *row, char *str, int size, int *ssize); -void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int size, int *ssize); -void *mgmtVgroupActionReset(void *row, char *str, int size, int *ssize); -void *mgmtVgroupActionDestroy(void *row, char *str, int size, int *ssize); +void *(*mgmtVgroupActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtVgroupActionBeforeBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtVgroupActionBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); void mgmtVgroupActionInit() { mgmtVgroupActionFp[SDB_TYPE_INSERT] = mgmtVgroupActionInsert; @@ -53,21 +50,18 @@ void mgmtVgroupActionInit() { mgmtVgroupActionFp[SDB_TYPE_UPDATE] = mgmtVgroupActionUpdate; mgmtVgroupActionFp[SDB_TYPE_ENCODE] = mgmtVgroupActionEncode; mgmtVgroupActionFp[SDB_TYPE_DECODE] = mgmtVgroupActionDecode; - mgmtVgroupActionFp[SDB_TYPE_BEFORE_BATCH_UPDATE] = mgmtVgroupActionBeforeBatchUpdate; - mgmtVgroupActionFp[SDB_TYPE_BATCH_UPDATE] = mgmtVgroupActionBatchUpdate; - mgmtVgroupActionFp[SDB_TYPE_AFTER_BATCH_UPDATE] = mgmtVgroupActionAfterBatchUpdate; mgmtVgroupActionFp[SDB_TYPE_RESET] = mgmtVgroupActionReset; mgmtVgroupActionFp[SDB_TYPE_DESTROY] = mgmtVgroupActionDestroy; } -void *mgmtVgroupAction(char action, void *row, char *str, int size, int *ssize) { +void *mgmtVgroupAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { if (mgmtVgroupActionFp[(uint8_t)action] != NULL) { return (*(mgmtVgroupActionFp[(uint8_t)action]))(row, str, size, ssize); } return NULL; } -int mgmtInitVgroups() { +int32_t mgmtInitVgroups() { void * pNode = NULL; SVgObj *pVgroup = NULL; @@ -76,14 +70,14 @@ int mgmtInitVgroups() { SVgObj tObj; tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj; - vgSdb = sdbOpenTable(tsMaxVGroups, sizeof(SVgObj), "vgroups", SDB_KEYTYPE_AUTO, tsMgmtDirectory, mgmtVgroupAction); - if (vgSdb == NULL) { + tsVgroupSdb = sdbOpenTable(tsMaxVGroups, sizeof(SVgObj), "vgroups", SDB_KEYTYPE_AUTO, tsMgmtDirectory, mgmtVgroupAction); + if (tsVgroupSdb == NULL) { mError("failed to init vgroup data"); return -1; } while (1) { - pNode = sdbFetchRow(vgSdb, pNode, (void **)&pVgroup); + pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup); if (pVgroup == NULL) break; SDbObj *pDb = mgmtGetDb(pVgroup->dbName); @@ -91,7 +85,7 @@ int mgmtInitVgroups() { pVgroup->prev = NULL; pVgroup->next = NULL; - int size = sizeof(STabObj *) * pDb->cfg.maxSessions; + int32_t size = sizeof(STabObj *) * pDb->cfg.maxSessions; pVgroup->meterList = (STabObj **)malloc(size); if (pVgroup->meterList == NULL) { mError("failed to malloc(size:%d) for the meterList of vgroups", size); @@ -109,15 +103,10 @@ int mgmtInitVgroups() { taosIdPoolReinit(pVgroup->idPool); - if (tsIsCluster) { - /* - * Upgrade from open source version to cluster version for the first time - */ - if (pVgroup->vnodeGid[0].publicIp == 0) { - pVgroup->vnodeGid[0].publicIp = inet_addr(tsPublicIp); - pVgroup->vnodeGid[0].ip = inet_addr(tsPrivateIp); - sdbUpdateRow(vgSdb, pVgroup, tsVgUpdateSize, 1); - } + if (pVgroup->vnodeGid[0].publicIp == 0) { + pVgroup->vnodeGid[0].publicIp = inet_addr(tsPublicIp); + pVgroup->vnodeGid[0].ip = inet_addr(tsPrivateIp); + sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 1); } mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId); @@ -127,7 +116,9 @@ int mgmtInitVgroups() { return 0; } -SVgObj *mgmtGetVgroup(int vgId) { return (SVgObj *)sdbGetRow(vgSdb, &vgId); } +SVgObj *mgmtGetVgroup(int32_t vgId) { + return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId); +} SVgObj *mgmtGetAvailVgroup(SDbObj *pDb) { SVgObj *pVgroup = pDb->pHead; @@ -191,7 +182,7 @@ void mgmtProcessVgTimer(void *handle, void *tmrId) { SVgObj *mgmtCreateVgroup(SDbObj *pDb) { SVgObj *pVgroup; - int size; + int32_t size; size = sizeof(SVgObj); pVgroup = (SVgObj *)malloc(size); @@ -209,10 +200,10 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) { return NULL; } - sdbInsertRow(vgSdb, pVgroup, 0); + sdbInsertRow(tsVgroupSdb, pVgroup, 0); mTrace("vgroup:%d, vgroup is created, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); - for (int i = 0; i < pVgroup->numOfVnodes; ++i) + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) mTrace("vgroup:%d, dnode:%s vnode:%d is created", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode); mgmtSendVPeersMsg(pVgroup); @@ -220,11 +211,11 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) { return pVgroup; } -int mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { +int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { STabObj *pTable; if (pVgroup->numOfMeters > 0) { - for (int i = 0; i < pDb->cfg.maxSessions; ++i) { + for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) { if (pVgroup->meterList != NULL) { pTable = pVgroup->meterList[i]; if (pTable) mgmtDropTable(pDb, pTable->meterId, 0); @@ -234,7 +225,7 @@ int mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); mgmtSendFreeVnodeMsg(pVgroup); - sdbDeleteRow(vgSdb, pVgroup); + sdbDeleteRow(tsVgroupSdb, pVgroup); return 0; } @@ -245,7 +236,7 @@ void mgmtSetVgroupIdPool() { SDbObj *pDb; while (1) { - pNode = sdbFetchRow(vgSdb, pNode, (void **)&pVgroup); + pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup); if (pVgroup == NULL || pVgroup->idPool == 0) break; taosIdPoolSetFreeList(pVgroup->idPool); @@ -260,10 +251,10 @@ void mgmtSetVgroupIdPool() { } } -void mgmtCleanUpVgroups() { sdbCloseTable(vgSdb); } +void mgmtCleanUpVgroups() { sdbCloseTable(tsVgroupSdb); } -int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { - int cols = 0; +int32_t mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { + int32_t cols = 0; SDbObj *pDb = NULL; if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); @@ -290,7 +281,7 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - int maxReplica = 0; + int32_t maxReplica = 0; SVgObj *pVgroup = NULL; STabObj *pTable = NULL; if (pShow->payloadLen > 0 ) { @@ -311,7 +302,7 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { } } - for (int i = 0; i < maxReplica; ++i) { + for (int32_t i = 0; i < maxReplica; ++i) { pShow->bytes[cols] = 16; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "ip"); @@ -341,7 +332,7 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; @@ -356,14 +347,14 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return 0; } -int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { - int numOfRows = 0; +int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { + int32_t numOfRows = 0; SVgObj *pVgroup = NULL; char * pWrite; - int cols = 0; + int32_t cols = 0; char ipstr[20]; - int maxReplica = 0; + int32_t maxReplica = 0; SDbObj *pDb = NULL; if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); @@ -376,7 +367,7 @@ int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn) } while (numOfRows < rows) { - // pShow->pNode = sdbFetchRow(vgSdb, pShow->pNode, (void **)&pVgroup); + // pShow->pNode = sdbFetchRow(tsVgroupSdb, pShow->pNode, (void **)&pVgroup); pVgroup = (SVgObj *)pShow->pNode; if (pVgroup == NULL) break; pShow->pNode = (void *)pVgroup->next; @@ -395,7 +386,7 @@ int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn) strcpy(pWrite, taosGetVgroupLbStatusStr(pVgroup->lbStatus)); cols++; - for (int i = 0; i < maxReplica; ++i) { + for (int32_t i = 0; i < maxReplica; ++i) { tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; strcpy(pWrite, ipstr); @@ -427,13 +418,13 @@ int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn) return numOfRows; } -void *mgmtVgroupActionInsert(void *row, char *str, int size, int *ssize) { +void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { SVgObj *pVgroup = (SVgObj *)row; SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) return NULL; - int tsize = sizeof(STabObj *) * pDb->cfg.maxSessions; + int32_t tsize = sizeof(STabObj *) * pDb->cfg.maxSessions; pVgroup->meterList = (STabObj **)malloc(tsize); memset(pVgroup->meterList, 0, tsize); pVgroup->numOfMeters = 0; @@ -444,7 +435,7 @@ void *mgmtVgroupActionInsert(void *row, char *str, int size, int *ssize) { return NULL; } -void *mgmtVgroupActionDelete(void *row, char *str, int size, int *ssize) { +void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { SVgObj *pVgroup = (SVgObj *)row; SDbObj *pDb = mgmtGetDb(pVgroup->dbName); @@ -455,17 +446,17 @@ void *mgmtVgroupActionDelete(void *row, char *str, int size, int *ssize) { return NULL; } -void *mgmtVgroupActionUpdate(void *row, char *str, int size, int *ssize) { +void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { mgmtVgroupActionReset(row, str, size, ssize); SVgObj *pVgroup = (SVgObj *)row; - int oldTables = taosIdPoolMaxSize(pVgroup->idPool); + int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool); SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb != NULL) { if (pDb->cfg.maxSessions != oldTables) { mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions); taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxSessions); - int size = sizeof(STabObj *) * pDb->cfg.maxSessions; + int32_t size = sizeof(STabObj *) * pDb->cfg.maxSessions; pVgroup->meterList = (STabObj **)realloc(pVgroup->meterList, size); } } @@ -474,9 +465,9 @@ void *mgmtVgroupActionUpdate(void *row, char *str, int size, int *ssize) { return NULL; } -void *mgmtVgroupActionEncode(void *row, char *str, int size, int *ssize) { +void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { SVgObj *pVgroup = (SVgObj *)row; - int tsize = pVgroup->updateEnd - (int8_t *)pVgroup; + int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup; if (size < tsize) { *ssize = -1; } else { @@ -486,28 +477,28 @@ void *mgmtVgroupActionEncode(void *row, char *str, int size, int *ssize) { return NULL; } -void *mgmtVgroupActionDecode(void *row, char *str, int size, int *ssize) { +void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { SVgObj *pVgroup = (SVgObj *)malloc(sizeof(SVgObj)); if (pVgroup == NULL) return NULL; memset(pVgroup, 0, sizeof(SVgObj)); - int tsize = pVgroup->updateEnd - (int8_t *)pVgroup; + int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup; memcpy(pVgroup, str, tsize); return (void *)pVgroup; } -void *mgmtVgroupActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; } -void *mgmtVgroupActionBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; } -void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; } -void *mgmtVgroupActionReset(void *row, char *str, int size, int *ssize) { +void *mgmtVgroupActionBeforeBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return NULL; } +void *mgmtVgroupActionBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return NULL; } +void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return NULL; } +void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize) { SVgObj *pVgroup = (SVgObj *)row; - int tsize = pVgroup->updateEnd - (int8_t *)pVgroup; + int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup; memcpy(pVgroup, str, tsize); return NULL; } -void *mgmtVgroupActionDestroy(void *row, char *str, int size, int *ssize) { +void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { SVgObj *pVgroup = (SVgObj *)row; if (pVgroup->idPool) { taosIdPoolCleanUp(pVgroup->idPool); diff --git a/src/util/inc/tmodule.h b/src/util/inc/tmodule.h index cfce5b38ef..9f8ef147f8 100644 --- a/src/util/inc/tmodule.h +++ b/src/util/inc/tmodule.h @@ -20,55 +20,37 @@ extern "C" { #endif +#include +#include #include -#include -#include "os.h" -typedef struct _msg_header { - int mid; /* message ID */ - int cid; /* call ID */ - int tid; /* transaction ID */ - // int len; /* length of msg */ - char *msg; /* content holder */ -} msg_header_t, msg_t; +enum _module { + TSDB_MOD_MGMT, + TSDB_MOD_HTTP, + TSDB_MOD_MONITOR, + TSDB_MOD_DCLUSTER, + TSDB_MOD_MSTORAGE, + TSDB_MOD_MAX +}; + +#define tsetModuleStatus(mod) \ + { tsModuleStatus |= (1 << mod); } +#define tclearModuleStatus(mod) \ + { tsModuleStatus &= ~(1 << mod); } typedef struct { - char * name; /* module name */ - pthread_t thread; /* thread ID */ - tsem_t emptySem; - tsem_t fullSem; - int fullSlot; - int emptySlot; - int debugFlag; - int queueSize; - int msgSize; - pthread_mutex_t queueMutex; - pthread_mutex_t stmMutex; - msg_t * queue; + char *name; + int (*initFp)(); + void (*cleanUpFp)(); + int (*startFp)(); + void (*stopFp)(); + int num; + int curNum; + int equalVnodeNum; +} SModule; - int (*processMsg)(msg_t *); - - int (*init)(); - - void (*cleanUp)(); -} module_t; - -typedef struct { - short len; - unsigned char data[0]; -} sim_data_t; - -extern int maxCid; -extern module_t moduleObj[]; -extern char * msgName[]; - -extern int taosSendMsgToModule(module_t *mod_p, int cid, int mid, int tid, char *msg); - -extern char *taosDisplayModuleStatus(int moduleNum); - -extern int taosInitModule(module_t *); - -extern void taosCleanUpModule(module_t *); +extern uint32_t tsModuleStatus; +extern SModule tsModule[]; #ifdef __cplusplus } diff --git a/src/util/src/tmodule.c b/src/util/src/tmodule.c index 54669a20be..8faff15cfa 100644 --- a/src/util/src/tmodule.c +++ b/src/util/src/tmodule.c @@ -13,170 +13,8 @@ * along with this program. If not, see . */ -#include "os.h" +#define _DEFAULT_SOURCE #include "tmodule.h" -#include "tutil.h" -void *taosProcessQueue(void *param); - -char *taosDisplayModuleStatus(int moduleNum) { - static char status[256]; - int i; - - status[0] = 0; - - for (i = 1; i < moduleNum; ++i) - if (taosCheckPthreadValid(moduleObj[i].thread)) sprintf(status + strlen(status), "%s ", moduleObj[i].name); - - if (status[0] == 0) - sprintf(status, "all module is down"); - else - sprintf(status, " is(are) up"); - - return status; -} - -int taosInitModule(module_t *pMod) { - pthread_attr_t attr; - - if (pthread_mutex_init(&pMod->queueMutex, NULL) < 0) { - printf("ERROR: init %s queueMutex failed, reason:%s\n", pMod->name, strerror(errno)); - taosCleanUpModule(pMod); - return -1; - } - - if (pthread_mutex_init(&pMod->stmMutex, NULL) < 0) { - printf("ERROR: init %s stmMutex failed, reason:%s\n", pMod->name, strerror(errno)); - taosCleanUpModule(pMod); - return -1; - } - - if (tsem_init(&pMod->emptySem, 0, (unsigned int)pMod->queueSize) != 0) { - printf("ERROR: init %s empty semaphore failed, reason:%s\n", pMod->name, strerror(errno)); - taosCleanUpModule(pMod); - return -1; - } - - if (tsem_init(&pMod->fullSem, 0, 0) != 0) { - printf("ERROR: init %s full semaphore failed, reason:%s\n", pMod->name, strerror(errno)); - taosCleanUpModule(pMod); - return -1; - } - - if ((pMod->queue = (msg_t *)malloc((size_t)pMod->queueSize * sizeof(msg_t))) == NULL) { - printf("ERROR: %s no enough memory, reason:%s\n", pMod->name, strerror(errno)); - taosCleanUpModule(pMod); - return -1; - } - - memset(pMod->queue, 0, (size_t)pMod->queueSize * sizeof(msg_t)); - pMod->fullSlot = 0; - pMod->emptySlot = 0; - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - - if (pthread_create(&pMod->thread, &attr, taosProcessQueue, (void *)pMod) != 0) { - printf("ERROR: %s failed to create thread, reason:%s\n", pMod->name, strerror(errno)); - taosCleanUpModule(pMod); - return -1; - } - - if (pMod->init) return (*(pMod->init))(); - - return 0; -} - -void *taosProcessQueue(void *param) { - msg_t msg; - module_t *pMod = (module_t *)param; - int oldType; - - pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldType); - - signal(SIGINT, SIG_IGN); - - while (1) { - if (tsem_wait(&pMod->fullSem) != 0) - printf("ERROR: wait %s fullSem failed, reason:%s\n", pMod->name, strerror(errno)); - - if (pthread_mutex_lock(&pMod->queueMutex) != 0) - printf("ERROR: lock %s queueMutex failed, reason:%s\n", pMod->name, strerror(errno)); - - msg = pMod->queue[pMod->fullSlot]; - memset(&(pMod->queue[pMod->fullSlot]), 0, sizeof(msg_t)); - pMod->fullSlot = (pMod->fullSlot + 1) % pMod->queueSize; - - if (pthread_mutex_unlock(&pMod->queueMutex) != 0) - printf("ERROR: unlock %s queueMutex failed, reason:%s\n", pMod->name, strerror(errno)); - - if (tsem_post(&pMod->emptySem) != 0) - printf("ERROR: post %s emptySem failed, reason:%s\n", pMod->name, strerror(errno)); - - /* process the message */ - if (msg.cid < 0 || msg.cid >= maxCid) { - /*printf("ERROR: cid:%d is out of range, msg is discarded\n", msg.cid);*/ - continue; - } - - /* - if ( pthread_mutex_lock ( &(pMod->stmMutex)) != 0 ) - printf("ERROR: lock %s stmMutex failed, reason:%s\n", pMod->name, - strerror(errno)); - */ - (*(pMod->processMsg))(&msg); - - tfree(msg.msg); - /* - if ( pthread_mutex_unlock ( &(pMod->stmMutex)) != 0 ) - printf("ERROR: unlock %s stmMutex failed, reason:%s\n", pMod->name, - strerror(errno)); - */ - } -} - -int taosSendMsgToModule(module_t *pMod, int cid, int mid, int tid, char *msg) { - if (tsem_wait(&pMod->emptySem) != 0) - printf("ERROR: wait %s emptySem failed, reason:%s\n", pMod->name, strerror(errno)); - - if (pthread_mutex_lock(&pMod->queueMutex) != 0) - printf("ERROR: lock %s queueMutex failed, reason:%s\n", pMod->name, strerror(errno)); - - pMod->queue[pMod->emptySlot].cid = cid; - pMod->queue[pMod->emptySlot].mid = mid; - pMod->queue[pMod->emptySlot].tid = tid; - pMod->queue[pMod->emptySlot].msg = msg; - pMod->emptySlot = (pMod->emptySlot + 1) % pMod->queueSize; - - if (pthread_mutex_unlock(&pMod->queueMutex) != 0) - printf("ERROR: unlock %s queueMutex failed, reason:%s\n", pMod->name, strerror(errno)); - - if (tsem_post(&pMod->fullSem) != 0) printf("ERROR: post %s fullSem failed, reason:%s\n", pMod->name, strerror(errno)); - - return 0; -} - -void taosCleanUpModule(module_t *pMod) { - int i; - - if (pMod->cleanUp) pMod->cleanUp(); - - if (taosCheckPthreadValid(pMod->thread)) { - pthread_cancel(pMod->thread); - pthread_join(pMod->thread, NULL); - } - - taosResetPthread(&pMod->thread); - tsem_destroy(&pMod->emptySem); - tsem_destroy(&pMod->fullSem); - pthread_mutex_destroy(&pMod->queueMutex); - pthread_mutex_destroy(&pMod->stmMutex); - - for (i = 0; i < pMod->queueSize; ++i) { - tfree(pMod->queue[i].msg); - } - - tfree(pMod->queue); - - memset(pMod, 0, sizeof(module_t)); -} +SModule tsModule[TSDB_MOD_MAX] = {0}; +uint32_t tsModuleStatus = 0; \ No newline at end of file