TD-3130 for topic module
This commit is contained in:
parent
c690ba1282
commit
32510fa452
|
@ -16,6 +16,7 @@ SET(TD_GRANT FALSE)
|
|||
SET(TD_MQTT FALSE)
|
||||
SET(TD_TSDB_PLUGINS FALSE)
|
||||
SET(TD_STORAGE FALSE)
|
||||
SET(TD_TOPIC FALSE)
|
||||
|
||||
SET(TD_COVER FALSE)
|
||||
SET(TD_MEM_CHECK FALSE)
|
||||
|
|
|
@ -25,6 +25,10 @@ IF (TD_STORAGE)
|
|||
ADD_DEFINITIONS(-D_STORAGE)
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_TOPIC)
|
||||
ADD_DEFINITIONS(-D_TOPIC)
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_GODLL)
|
||||
ADD_DEFINITIONS(-D_TD_GO_DLL_)
|
||||
ENDIF ()
|
||||
|
|
|
@ -9,6 +9,14 @@ ELSEIF (${ACCOUNT} MATCHES "false")
|
|||
MESSAGE(STATUS "Build without account plugins")
|
||||
ENDIF ()
|
||||
|
||||
IF (${TOPIC} MATCHES "true")
|
||||
SET(TD_TOPIC TRUE)
|
||||
MESSAGE(STATUS "Build with topic plugins")
|
||||
ELSEIF (${TOPIC} MATCHES "false")
|
||||
SET(TD_TOPIC FALSE)
|
||||
MESSAGE(STATUS "Build without topic plugins")
|
||||
ENDIF ()
|
||||
|
||||
IF (${COVER} MATCHES "true")
|
||||
SET(TD_COVER TRUE)
|
||||
MESSAGE(STATUS "Build with test coverage")
|
||||
|
|
|
@ -95,6 +95,7 @@ extern int8_t tsCompression;
|
|||
extern int8_t tsWAL;
|
||||
extern int32_t tsFsyncPeriod;
|
||||
extern int32_t tsReplications;
|
||||
extern int16_t tsPartitons;
|
||||
extern int32_t tsQuorum;
|
||||
extern int8_t tsUpdate;
|
||||
extern int8_t tsCacheLastRow;
|
||||
|
|
|
@ -125,6 +125,7 @@ int8_t tsCompression = TSDB_DEFAULT_COMP_LEVEL;
|
|||
int8_t tsWAL = TSDB_DEFAULT_WAL_LEVEL;
|
||||
int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
|
||||
int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
|
||||
int16_t tsPartitons = TSDB_DEFAULT_DB_PARTITON_OPTION;
|
||||
int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION;
|
||||
int8_t tsUpdate = TSDB_DEFAULT_DB_UPDATE_OPTION;
|
||||
int8_t tsCacheLastRow = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
|
||||
|
@ -853,6 +854,16 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "partitions";
|
||||
cfg.ptr = &tsPartitons;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT16;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
|
||||
cfg.minValue = TSDB_MIN_DB_PARTITON_OPTION;
|
||||
cfg.maxValue = TSDB_MAX_DB_PARTITON_OPTION;
|
||||
cfg.ptrLength = 0;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "quorum";
|
||||
cfg.ptr = &tsQuorum;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||
|
|
|
@ -31,6 +31,10 @@ IF (TD_MQTT)
|
|||
TARGET_LINK_LIBRARIES(taosd mqtt)
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_TOPIC)
|
||||
TARGET_LINK_LIBRARIES(taosd topic)
|
||||
ENDIF ()
|
||||
|
||||
SET(PREPARE_ENV_CMD "prepare_env_cmd")
|
||||
SET(PREPARE_ENV_TARGET "prepare_env_target")
|
||||
ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
|
||||
|
|
|
@ -222,6 +222,9 @@ do { \
|
|||
#define TSDB_MQTT_TOPIC_LEN 64
|
||||
#define TSDB_MQTT_CLIENT_ID_LEN 32
|
||||
|
||||
#define TSDB_DB_TYPE_DEFAULT 0
|
||||
#define TSDB_DB_TYPE_TOPIC 1
|
||||
|
||||
#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE
|
||||
|
||||
#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
|
||||
|
@ -306,6 +309,10 @@ do { \
|
|||
#define TSDB_MAX_DB_REPLICA_OPTION 3
|
||||
#define TSDB_DEFAULT_DB_REPLICA_OPTION 1
|
||||
|
||||
#define TSDB_MIN_DB_PARTITON_OPTION 1
|
||||
#define TSDB_MAX_DB_PARTITON_OPTION 50000
|
||||
#define TSDB_DEFAULT_DB_PARTITON_OPTION 4
|
||||
|
||||
#define TSDB_MIN_DB_QUORUM_OPTION 1
|
||||
#define TSDB_MAX_DB_QUORUM_OPTION 2
|
||||
#define TSDB_DEFAULT_DB_QUORUM_OPTION 1
|
||||
|
|
|
@ -107,6 +107,12 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
|
|||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
|
||||
|
||||
// message for topic
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
|
||||
|
||||
#ifndef TAOS_MESSAGE_C
|
||||
TSDB_MSG_TYPE_MAX // 105
|
||||
#endif
|
||||
|
@ -141,6 +147,7 @@ enum _mgmt_table {
|
|||
TSDB_MGMT_TABLE_VNODES,
|
||||
TSDB_MGMT_TABLE_STREAMTABLES,
|
||||
TSDB_MGMT_TABLE_CLUSTER,
|
||||
TSDB_MGMT_TABLE_TP,
|
||||
TSDB_MGMT_TABLE_MAX,
|
||||
};
|
||||
|
||||
|
@ -555,7 +562,9 @@ typedef struct {
|
|||
int8_t ignoreExist;
|
||||
int8_t update;
|
||||
int8_t cacheLastRow;
|
||||
int8_t reserve[8];
|
||||
int8_t dbType;
|
||||
int16_t partitions;
|
||||
int8_t reserve[5];
|
||||
} SCreateDbMsg, SAlterDbMsg;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_TP
|
||||
#define TDENGINE_TP
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t tpInit();
|
||||
void tpCleanUp();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -175,7 +175,9 @@ typedef struct {
|
|||
int8_t quorum;
|
||||
int8_t update;
|
||||
int8_t cacheLastRow;
|
||||
int8_t reserved[10];
|
||||
int8_t dbType;
|
||||
int16_t partitions;
|
||||
int8_t reserved[7];
|
||||
} SDbCfg;
|
||||
|
||||
typedef struct SDbObj {
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include "tname.h"
|
||||
#include "tbn.h"
|
||||
#include "tdataformat.h"
|
||||
#include "tp.h"
|
||||
#include "mnode.h"
|
||||
#include "mnodeDef.h"
|
||||
#include "mnodeInt.h"
|
||||
|
@ -38,8 +39,8 @@
|
|||
#include "mnodeVgroup.h"
|
||||
|
||||
#define VG_LIST_SIZE 8
|
||||
int64_t tsDbRid = -1;
|
||||
static void * tsDbSdb = NULL;
|
||||
int64_t tsDbRid = -1;
|
||||
void * tsDbSdb = NULL;
|
||||
static int32_t tsDbUpdateSize;
|
||||
|
||||
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *pMsg);
|
||||
|
@ -51,6 +52,11 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg);
|
|||
static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg);
|
||||
|
||||
#ifndef _TOPIC
|
||||
int32_t tpInit() {}
|
||||
void tpCleanUp() {}
|
||||
#endif
|
||||
|
||||
static void mnodeDestroyDb(SDbObj *pDb) {
|
||||
pthread_mutex_destroy(&pDb->mutex);
|
||||
tfree(pDb->vgList);
|
||||
|
@ -174,7 +180,14 @@ int32_t mnodeInitDbs() {
|
|||
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta);
|
||||
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs);
|
||||
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb);
|
||||
|
||||
|
||||
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_CREATE_TP, mnodeProcessCreateDbMsg);
|
||||
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_TP, mnodeProcessAlterDbMsg);
|
||||
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_TP, mnodeProcessDropDbMsg);
|
||||
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_TP, mnodeGetDbMeta);
|
||||
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TP, mnodeRetrieveDbs);
|
||||
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_TP, mnodeCancelGetNextDb);
|
||||
|
||||
mDebug("table:dbs table is created");
|
||||
return 0;
|
||||
}
|
||||
|
@ -354,6 +367,8 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
|
|||
if (pCfg->quorum < 0) pCfg->quorum = tsQuorum;
|
||||
if (pCfg->update < 0) pCfg->update = tsUpdate;
|
||||
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = tsCacheLastRow;
|
||||
if (pCfg->dbType < 0) pCfg->dbType = 0;
|
||||
if (pCfg->partitions < 0) pCfg->partitions = tsPartitons;
|
||||
}
|
||||
|
||||
static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
|
@ -408,7 +423,9 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
|
|||
.replications = pCreate->replications,
|
||||
.quorum = pCreate->quorum,
|
||||
.update = pCreate->update,
|
||||
.cacheLastRow = pCreate->cacheLastRow
|
||||
.cacheLastRow = pCreate->cacheLastRow,
|
||||
.dbType = pCreate->dbType,
|
||||
.partitions = pCreate->partitions
|
||||
};
|
||||
|
||||
mnodeSetDefaultDbCfg(&pDb->cfg);
|
||||
|
@ -501,6 +518,7 @@ void mnodeRemoveVgroupFromDb(SVgObj *pVgroup) {
|
|||
}
|
||||
|
||||
void mnodeCleanupDbs() {
|
||||
tpCleanUp();
|
||||
sdbCloseTable(tsDbRid);
|
||||
tsDbSdb = NULL;
|
||||
}
|
||||
|
@ -660,7 +678,7 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
|
|||
return 0;
|
||||
}
|
||||
|
||||
static char *mnodeGetDbStr(char *src) {
|
||||
char *mnodeGetDbStr(char *src) {
|
||||
char *pos = strstr(src, TS_PATH_DELIMITER);
|
||||
if (pos != NULL) ++pos;
|
||||
|
||||
|
@ -679,7 +697,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
|
|||
pShow->pIter = mnodeGetNextDb(pShow->pIter, &pDb);
|
||||
|
||||
if (pDb == NULL) break;
|
||||
if (pDb->pAcct != pUser->pAcct || pDb->status != TSDB_DB_STATUS_READY) {
|
||||
if (pDb->pAcct != pUser->pAcct || pDb->status != TSDB_DB_STATUS_READY /*|| pDb->cfg.dbType != TSDB_DB_TYPE_DEFAULT*/) {
|
||||
mnodeDecDbRef(pDb);
|
||||
continue;
|
||||
}
|
||||
|
@ -852,6 +870,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
|
|||
pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
|
||||
pCreate->commitTime = htonl(pCreate->commitTime);
|
||||
pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod);
|
||||
pCreate->partitions = htons(pCreate->partitions);
|
||||
pCreate->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
|
||||
pCreate->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
|
||||
|
||||
|
@ -887,6 +906,8 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
|
|||
int8_t precision = pAlter->precision;
|
||||
int8_t update = pAlter->update;
|
||||
int8_t cacheLastRow = pAlter->cacheLastRow;
|
||||
int8_t dbType = pAlter->dbType;
|
||||
int16_t partitions = pAlter->partitions;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -1004,6 +1025,16 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
|
|||
newCfg.cacheLastRow = cacheLastRow;
|
||||
}
|
||||
|
||||
if (dbType >= 0 && dbType != pDb->cfg.dbType) {
|
||||
mError("db:%s, can't alter dbType option", pDb->name);
|
||||
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||
}
|
||||
|
||||
if (partitions >= 0 && partitions != pDb->cfg.partitions) {
|
||||
mDebug("db:%s, partitions:%d change to %d", pDb->name, pDb->cfg.partitions, partitions);
|
||||
newCfg.partitions = partitions;
|
||||
}
|
||||
|
||||
return newCfg;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue