use safe sys func

This commit is contained in:
yihaoDeng 2024-12-05 16:35:45 +08:00
parent ffed7a3c67
commit c5c56606c4
22 changed files with 214 additions and 213 deletions

View File

@ -251,7 +251,7 @@ bool checkColumnEncode(char encode[TSDB_CL_COMPRESS_OPTION_LEN]) {
} }
bool checkColumnEncodeOrSetDefault(uint8_t type, char encode[TSDB_CL_COMPRESS_OPTION_LEN]) { bool checkColumnEncodeOrSetDefault(uint8_t type, char encode[TSDB_CL_COMPRESS_OPTION_LEN]) {
if (0 == strlen(encode)) { if (0 == strlen(encode)) {
strncpy(encode, getDefaultEncodeStr(type), TSDB_CL_COMPRESS_OPTION_LEN); tstrncpy(encode, getDefaultEncodeStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
return true; return true;
} }
return checkColumnEncode(encode) && validColEncode(type, columnEncodeVal(encode)); return checkColumnEncode(encode) && validColEncode(type, columnEncodeVal(encode));
@ -268,7 +268,7 @@ bool checkColumnCompress(char compress[TSDB_CL_COMPRESS_OPTION_LEN]) {
} }
bool checkColumnCompressOrSetDefault(uint8_t type, char compress[TSDB_CL_COMPRESS_OPTION_LEN]) { bool checkColumnCompressOrSetDefault(uint8_t type, char compress[TSDB_CL_COMPRESS_OPTION_LEN]) {
if (0 == strlen(compress)) { if (0 == strlen(compress)) {
strncpy(compress, getDefaultCompressStr(type), TSDB_CL_COMPRESS_OPTION_LEN); tstrncpy(compress, getDefaultCompressStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
return true; return true;
} }
@ -290,7 +290,7 @@ bool checkColumnLevel(char level[TSDB_CL_COMPRESS_OPTION_LEN]) {
} }
bool checkColumnLevelOrSetDefault(uint8_t type, char level[TSDB_CL_COMPRESS_OPTION_LEN]) { bool checkColumnLevelOrSetDefault(uint8_t type, char level[TSDB_CL_COMPRESS_OPTION_LEN]) {
if (0 == strlen(level)) { if (0 == strlen(level)) {
strncpy(level, getDefaultLevelStr(type), TSDB_CL_COMPRESS_OPTION_LEN); tstrncpy(level, getDefaultLevelStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
return true; return true;
} }
return checkColumnLevel(level) && validColCompressLevel(type, columnLevelVal(level)); return checkColumnLevel(level) && validColCompressLevel(type, columnLevelVal(level));
@ -314,7 +314,7 @@ void setColLevel(uint32_t* compress, uint8_t level) {
int32_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressType, uint8_t level, bool check, int32_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressType, uint8_t level, bool check,
uint32_t* compress) { uint32_t* compress) {
if(compress == NULL) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; if (compress == NULL) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
if (check && !validColEncode(type, encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR; if (check && !validColEncode(type, encode)) return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
setColEncode(compress, encode); setColEncode(compress, encode);

View File

@ -22,7 +22,7 @@
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) { int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
pEp->port = 0; pEp->port = 0;
memset(pEp->fqdn, 0, TSDB_FQDN_LEN); memset(pEp->fqdn, 0, TSDB_FQDN_LEN);
strncpy(pEp->fqdn, ep, TSDB_FQDN_LEN - 1); tstrncpy(pEp->fqdn, ep, TSDB_FQDN_LEN);
char* temp = strchr(pEp->fqdn, ':'); char* temp = strchr(pEp->fqdn, ':');
if (temp) { if (temp) {

View File

@ -186,7 +186,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
#if defined(TD_ENTERPRISE) #if defined(TD_ENTERPRISE)
pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm; pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) { if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
strncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN); tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
} }
#else #else
pCfg->tsdbCfg.encryptAlgorithm = 0; pCfg->tsdbCfg.encryptAlgorithm = 0;
@ -202,7 +202,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
#if defined(TD_ENTERPRISE) #if defined(TD_ENTERPRISE)
pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm; pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) { if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
strncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN); tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
} }
#else #else
pCfg->walCfg.encryptAlgorithm = 0; pCfg->walCfg.encryptAlgorithm = 0;
@ -211,7 +211,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
#if defined(TD_ENTERPRISE) #if defined(TD_ENTERPRISE)
pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm; pCfg->tdbEncryptAlgorithm = pCreate->encryptAlgorithm;
if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) { if (pCfg->tdbEncryptAlgorithm == DND_CA_SM4) {
strncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN); tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
} }
#else #else
pCfg->tdbEncryptAlgorithm = 0; pCfg->tdbEncryptAlgorithm = 0;
@ -898,7 +898,7 @@ int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
size_t size = taosArrayGetSize(arbHbReq.hbMembers); size_t size = taosArrayGetSize(arbHbReq.hbMembers);
arbHbRsp.dnodeId = pMgmt->pData->dnodeId; arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
strncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE); tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember)); arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
if (arbHbRsp.hbMembers == NULL) { if (arbHbRsp.hbMembers == NULL) {
goto _OVER; goto _OVER;

View File

@ -179,7 +179,7 @@ int32_t dmInitVars(SDnode *pDnode) {
//code = taosGetCryptKey(tsAuthCode, pData->machineId, tsCryptKey); //code = taosGetCryptKey(tsAuthCode, pData->machineId, tsCryptKey);
code = 0; code = 0;
strncpy(tsEncryptKey, tsAuthCode, 16); tstrncpy(tsEncryptKey, tsAuthCode, 16);
if (code != 0) { if (code != 0) {
if(code == -1){ if(code == -1){
@ -220,62 +220,62 @@ int32_t dmInitVars(SDnode *pDnode) {
} }
extern SMonVloadInfo tsVinfo; extern SMonVloadInfo tsVinfo;
void dmClearVars(SDnode *pDnode) { void dmClearVars(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
taosMemoryFreeClear(pWrapper->path); taosMemoryFreeClear(pWrapper->path);
(void)taosThreadRwlockDestroy(&pWrapper->lock); (void)taosThreadRwlockDestroy(&pWrapper->lock);
} }
if (pDnode->lockfile != NULL) { if (pDnode->lockfile != NULL) {
if (taosUnLockFile(pDnode->lockfile) != 0) { if (taosUnLockFile(pDnode->lockfile) != 0) {
dError("failed to unlock file"); dError("failed to unlock file");
} }
(void)taosCloseFile(&pDnode->lockfile); (void)taosCloseFile(&pDnode->lockfile);
pDnode->lockfile = NULL; pDnode->lockfile = NULL;
} }
SDnodeData *pData = &pDnode->data; SDnodeData *pData = &pDnode->data;
(void)taosThreadRwlockWrlock(&pData->lock); (void)taosThreadRwlockWrlock(&pData->lock);
if (pData->oldDnodeEps != NULL) { if (pData->oldDnodeEps != NULL) {
if (dmWriteEps(pData) == 0) { if (dmWriteEps(pData) == 0) {
dmRemoveDnodePairs(pData); dmRemoveDnodePairs(pData);
} }
taosArrayDestroy(pData->oldDnodeEps); taosArrayDestroy(pData->oldDnodeEps);
pData->oldDnodeEps = NULL; pData->oldDnodeEps = NULL;
} }
if (pData->dnodeEps != NULL) { if (pData->dnodeEps != NULL) {
taosArrayDestroy(pData->dnodeEps); taosArrayDestroy(pData->dnodeEps);
pData->dnodeEps = NULL; pData->dnodeEps = NULL;
} }
if (pData->dnodeHash != NULL) { if (pData->dnodeHash != NULL) {
taosHashCleanup(pData->dnodeHash); taosHashCleanup(pData->dnodeHash);
pData->dnodeHash = NULL; pData->dnodeHash = NULL;
} }
(void)taosThreadRwlockUnlock(&pData->lock); (void)taosThreadRwlockUnlock(&pData->lock);
(void)taosThreadRwlockDestroy(&pData->lock); (void)taosThreadRwlockDestroy(&pData->lock);
dDebug("begin to lock status info when thread exit"); dDebug("begin to lock status info when thread exit");
if (taosThreadMutexLock(&pData->statusInfolock) != 0) { if (taosThreadMutexLock(&pData->statusInfolock) != 0) {
dError("failed to lock status info lock"); dError("failed to lock status info lock");
return; return;
} }
if (tsVinfo.pVloads != NULL) { if (tsVinfo.pVloads != NULL) {
taosArrayDestroy(tsVinfo.pVloads); taosArrayDestroy(tsVinfo.pVloads);
tsVinfo.pVloads = NULL; tsVinfo.pVloads = NULL;
} }
if (taosThreadMutexUnlock(&pData->statusInfolock) != 0) { if (taosThreadMutexUnlock(&pData->statusInfolock) != 0) {
dError("failed to unlock status info lock"); dError("failed to unlock status info lock");
return; return;
} }
if (taosThreadMutexDestroy(&pData->statusInfolock) != 0) { if (taosThreadMutexDestroy(&pData->statusInfolock) != 0) {
dError("failed to destroy status info lock"); dError("failed to destroy status info lock");
} }
memset(&pData->statusInfolock, 0, sizeof(pData->statusInfolock)); memset(&pData->statusInfolock, 0, sizeof(pData->statusInfolock));
(void)taosThreadMutexDestroy(&pDnode->mutex); (void)taosThreadMutexDestroy(&pDnode->mutex);
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
} }
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) { void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {

View File

@ -230,7 +230,7 @@ static int32_t dmWriteCheckCodeFile(char *file, char *realfile, char *key, bool
} }
SCryptOpts opts; SCryptOpts opts;
strncpy(opts.key, key, ENCRYPT_KEY_LEN); tstrncpy(opts.key, key, ENCRYPT_KEY_LEN);
opts.len = len; opts.len = len;
opts.source = DM_KEY_INDICATOR; opts.source = DM_KEY_INDICATOR;
opts.result = result; opts.result = result;
@ -349,7 +349,7 @@ static int32_t dmCompareEncryptKey(char *file, char *key, bool toLogFile) {
} }
SCryptOpts opts = {0}; SCryptOpts opts = {0};
strncpy(opts.key, key, ENCRYPT_KEY_LEN); tstrncpy(opts.key, key, ENCRYPT_KEY_LEN);
opts.len = len; opts.len = len;
opts.source = content; opts.source = content;
opts.result = result; opts.result = result;
@ -551,7 +551,7 @@ int32_t dmGetEncryptKey() {
goto _OVER; goto _OVER;
} }
strncpy(tsEncryptKey, encryptKey, ENCRYPT_KEY_LEN); tstrncpy(tsEncryptKey, encryptKey, ENCRYPT_KEY_LEN);
taosMemoryFreeClear(encryptKey); taosMemoryFreeClear(encryptKey);
tsEncryptionKeyChksum = taosCalcChecksum(0, tsEncryptKey, strlen(tsEncryptKey)); tsEncryptionKeyChksum = taosCalcChecksum(0, tsEncryptKey, strlen(tsEncryptKey));
tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED; tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;

View File

@ -1315,7 +1315,7 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
continue; continue;
} }
char dbNameInGroup[TSDB_DB_FNAME_LEN]; char dbNameInGroup[TSDB_DB_FNAME_LEN];
strncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN); tstrncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN);
sdbRelease(pSdb, pVgObj); sdbRelease(pSdb, pVgObj);
char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};

View File

@ -244,7 +244,7 @@ int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int
TAOS_RETURN(code); TAOS_RETURN(code);
} }
(void)strncpy(dbname, pCompact->dbname, len); tstrncpy(dbname, pCompact->dbname, len);
mndReleaseCompact(pMnode, pCompact); mndReleaseCompact(pMnode, pCompact);
TAOS_RETURN(code); TAOS_RETURN(code);
} }
@ -321,7 +321,7 @@ int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock,
TAOS_CHECK_GOTO(tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER); TAOS_CHECK_GOTO(tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
(void)tNameGetDbName(&name, varDataVal(tmpBuf)); (void)tNameGetDbName(&name, varDataVal(tmpBuf));
} else { } else {
(void)strncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN); tstrncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
} }
varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf))); varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pCompact, &lino, _OVER); RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pCompact, &lino, _OVER);
@ -641,7 +641,7 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
char detail[1024] = {0}; char detail[1024] = {0};
int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d", int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS), epSet.numOfEps, epSet.inUse); TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS), epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
} }

View File

@ -1450,7 +1450,7 @@ static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) {
} }
size_t optLen = p - pMCfgReq->config; size_t optLen = p - pMCfgReq->config;
(void)strncpy(pDCfgReq->config, pMCfgReq->config, optLen); tstrncpy(pDCfgReq->config, pMCfgReq->config, optLen + 1);
pDCfgReq->config[optLen] = 0; pDCfgReq->config[optLen] = 0;
if (' ' == pMCfgReq->config[optLen]) { if (' ' == pMCfgReq->config[optLen]) {

View File

@ -15,8 +15,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndAcct.h" #include "mndAcct.h"
#include "mndArbGroup.h"
#include "mndAnode.h" #include "mndAnode.h"
#include "mndArbGroup.h"
#include "mndCluster.h" #include "mndCluster.h"
#include "mndCompact.h" #include "mndCompact.h"
#include "mndCompactDetail.h" #include "mndCompactDetail.h"
@ -530,7 +530,7 @@ static int32_t mndInitWal(SMnode *pMnode) {
code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
TAOS_RETURN(code); TAOS_RETURN(code);
} else { } else {
(void)strncpy(cfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN); tstrncpy(cfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
} }
} }
#endif #endif

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndSma.h" #include "mndSma.h"
#include "functionMgt.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndIndex.h" #include "mndIndex.h"
@ -31,7 +32,6 @@
#include "mndVgroup.h" #include "mndVgroup.h"
#include "parser.h" #include "parser.h"
#include "tname.h" #include "tname.h"
#include "functionMgt.h"
#define TSDB_SMA_VER_NUMBER 1 #define TSDB_SMA_VER_NUMBER 1
#define TSDB_SMA_RESERVE_SIZE 64 #define TSDB_SMA_RESERVE_SIZE 64
@ -48,8 +48,8 @@ static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq);
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndDestroySmaObj(SSmaObj *pSmaObj); static void mndDestroySmaObj(SSmaObj *pSmaObj);
static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq); static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq);
static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq); static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq);
// sma and tag index comm func // sma and tag index comm func
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq); static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
@ -61,11 +61,11 @@ static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter);
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq); static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq);
typedef struct SCreateTSMACxt { typedef struct SCreateTSMACxt {
SMnode * pMnode; SMnode *pMnode;
const SRpcMsg *pRpcReq; const SRpcMsg *pRpcReq;
union { union {
const SMCreateSmaReq *pCreateSmaReq; const SMCreateSmaReq *pCreateSmaReq;
const SMDropSmaReq * pDropSmaReq; const SMDropSmaReq *pDropSmaReq;
}; };
SDbObj *pDb; SDbObj *pDb;
SStbObj *pSrcStb; SStbObj *pSrcStb;
@ -298,16 +298,13 @@ void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) {
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
} }
SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *db) { SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *db) { return mndAcquireDb(pMnode, db); }
return mndAcquireDb(pMnode, db);
}
static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) { static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
SEncoder encoder = {0}; SEncoder encoder = {0};
int32_t contLen = 0; int32_t contLen = 0;
SName name = {0}; SName name = {0};
int32_t code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); int32_t code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return NULL; return NULL;
} }
@ -368,7 +365,7 @@ static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma,
SEncoder encoder = {0}; SEncoder encoder = {0};
int32_t contLen; int32_t contLen;
SName name = {0}; SName name = {0};
int32_t code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); int32_t code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
terrno = code; terrno = code;
return NULL; return NULL;
@ -424,9 +421,9 @@ static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *
TAOS_RETURN(code); TAOS_RETURN(code);
} }
static int32_t mndSetCreateSmaUndoLogs(SMnode* pMnode, STrans* pTrans, SSmaObj* pSma) { static int32_t mndSetCreateSmaUndoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
int32_t code = 0; int32_t code = 0;
SSdbRaw * pUndoRaw = mndSmaActionEncode(pSma); SSdbRaw *pUndoRaw = mndSmaActionEncode(pSma);
if (!pUndoRaw) { if (!pUndoRaw) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
@ -670,7 +667,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
// check the maxDelay // check the maxDelay
if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) { if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval = -1; int64_t msInterval = -1;
int32_t code = convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND, &msInterval); int32_t code =
convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND, &msInterval);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
mError("sma:%s, failed to create since convert time failed: %s", smaObj.name, tstrerror(code)); mError("sma:%s, failed to create since convert time failed: %s", smaObj.name, tstrerror(code));
return code; return code;
@ -792,7 +790,7 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
} }
static int32_t mndGetStreamNameFromSmaName(char *streamName, char *smaName) { static int32_t mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
SName n; SName n;
int32_t code = tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); int32_t code = tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
@ -984,10 +982,10 @@ static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SD
} }
static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) { static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) {
int32_t code = -1; int32_t code = -1;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
STrans *pTrans = NULL; STrans *pTrans = NULL;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
@ -1023,7 +1021,6 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
goto _OVER; goto _OVER;
} }
code = mndAcquireStream(pMnode, streamName, &pStream); code = mndAcquireStream(pMnode, streamName, &pStream);
if (pStream == NULL || pStream->smaId != pSma->uid || code != 0) { if (pStream == NULL || pStream->smaId != pSma->uid || code != 0) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -1124,8 +1121,8 @@ _OVER:
int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = 0; int32_t code = 0;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
SSmaObj *pSma = NULL; SSmaObj *pSma = NULL;
@ -1232,7 +1229,7 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
FOREACH(node, pList) { FOREACH(node, pList) {
SFunctionNode *pFunc = (SFunctionNode *)node; SFunctionNode *pFunc = (SFunctionNode *)node;
extOffset += tsnprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s", extOffset += tsnprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
(extOffset ? "," : ""), pFunc->functionName); (extOffset ? "," : ""), pFunc->functionName);
} }
*exist = true; *exist = true;
@ -1439,8 +1436,8 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
SName smaName = {0}; SName smaName = {0};
SName stbName = {0}; SName stbName = {0};
char n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -1448,7 +1445,7 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
STR_TO_VARSTR(n2, (char *)mndGetDbStr(pSma->db)); STR_TO_VARSTR(n2, (char *)mndGetDbStr(pSma->db));
code = tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); code = tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
} }
SColumnInfoData* pColInfo = NULL; SColumnInfoData *pColInfo = NULL;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName)); STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));
@ -1540,7 +1537,7 @@ static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
taosMemoryFree(p); taosMemoryFree(p);
} }
static void initSMAObj(SCreateTSMACxt* pCxt) { static void initSMAObj(SCreateTSMACxt *pCxt) {
memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN); memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
memcpy(pCxt->pSma->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN); memcpy(pCxt->pSma->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN);
memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN); memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
@ -1549,7 +1546,7 @@ static void initSMAObj(SCreateTSMACxt* pCxt) {
pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN); pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
memcpy(pCxt->pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN); memcpy(pCxt->pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
pCxt->pSma->dstTbUid = 0; // not used pCxt->pSma->dstTbUid = 0; // not used
pCxt->pSma->stbUid = pCxt->pSrcStb ? pCxt->pSrcStb->uid : pCxt->pCreateSmaReq->normSourceTbUid; pCxt->pSma->stbUid = pCxt->pSrcStb ? pCxt->pSrcStb->uid : pCxt->pCreateSmaReq->normSourceTbUid;
pCxt->pSma->dbUid = pCxt->pDb->uid; pCxt->pSma->dbUid = pCxt->pDb->uid;
pCxt->pSma->interval = pCxt->pCreateSmaReq->interval; pCxt->pSma->interval = pCxt->pCreateSmaReq->interval;
@ -1598,7 +1595,7 @@ static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
if (!pCxt->pCreateStreamReq->pTags) { if (!pCxt->pCreateStreamReq->pTags) {
return terrno; return terrno;
} }
SField f = {0}; SField f = {0};
int32_t code = 0; int32_t code = 0;
if (pCxt->pSrcStb) { if (pCxt->pSrcStb) {
for (int32_t idx = 0; idx < pCxt->pCreateStreamReq->numOfTags - 1; ++idx) { for (int32_t idx = 0; idx < pCxt->pCreateStreamReq->numOfTags - 1; ++idx) {
@ -1626,9 +1623,9 @@ static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
// construct output cols // construct output cols
SNode* pNode; SNode *pNode;
FOREACH(pNode, pCxt->pProjects) { FOREACH(pNode, pCxt->pProjects) {
SExprNode* pExprNode = (SExprNode*)pNode; SExprNode *pExprNode = (SExprNode *)pNode;
f.bytes = pExprNode->resType.bytes; f.bytes = pExprNode->resType.bytes;
f.type = pExprNode->resType.type; f.type = pExprNode->resType.type;
f.flags = COL_SMA_ON; f.flags = COL_SMA_ON;
@ -1642,7 +1639,7 @@ static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
return code; return code;
} }
static int32_t mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) { static int32_t mndCreateTSMABuildDropStreamReq(SCreateTSMACxt *pCxt) {
tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN); tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
pCxt->pDropStreamReq->igNotExists = false; pCxt->pDropStreamReq->igNotExists = false;
pCxt->pDropStreamReq->sql = taosStrdup(pCxt->pDropSmaReq->name); pCxt->pDropStreamReq->sql = taosStrdup(pCxt->pDropSmaReq->name);
@ -1685,14 +1682,13 @@ static int32_t mndSetUpdateDbTsmaVersionCommitLogs(SMnode *pMnode, STrans *pTran
TAOS_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)); TAOS_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
} }
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt *pCxt) {
int32_t code = -1; int32_t code = -1;
STransAction createStreamRedoAction = {0}; STransAction createStreamRedoAction = {0};
STransAction createStreamUndoAction = {0}; STransAction createStreamUndoAction = {0};
STransAction dropStbUndoAction = {0}; STransAction dropStbUndoAction = {0};
SMDropStbReq dropStbReq = {0}; SMDropStbReq dropStbReq = {0};
STrans *pTrans = STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
if (!pTrans) { if (!pTrans) {
code = terrno; code = terrno;
goto _OVER; goto _OVER;
@ -1713,7 +1709,9 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
code = terrno; code = terrno;
goto _OVER; goto _OVER;
} }
if (createStreamRedoAction.contLen != tSerializeSCMCreateStreamReq(createStreamRedoAction.pCont, createStreamRedoAction.contLen, pCxt->pCreateStreamReq)) { if (createStreamRedoAction.contLen != tSerializeSCMCreateStreamReq(createStreamRedoAction.pCont,
createStreamRedoAction.contLen,
pCxt->pCreateStreamReq)) {
mError("sma: %s, failed to create due to create stream req encode failure", pCxt->pCreateSmaReq->name); mError("sma: %s, failed to create due to create stream req encode failure", pCxt->pCreateSmaReq->name);
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _OVER; goto _OVER;
@ -1728,14 +1726,15 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
code = terrno; code = terrno;
goto _OVER; goto _OVER;
} }
if (createStreamUndoAction.contLen != tSerializeSMDropStreamReq(createStreamUndoAction.pCont, createStreamUndoAction.contLen, pCxt->pDropStreamReq)) { if (createStreamUndoAction.contLen !=
tSerializeSMDropStreamReq(createStreamUndoAction.pCont, createStreamUndoAction.contLen, pCxt->pDropStreamReq)) {
mError("sma: %s, failed to create due to drop stream req encode failure", pCxt->pCreateSmaReq->name); mError("sma: %s, failed to create due to drop stream req encode failure", pCxt->pCreateSmaReq->name);
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _OVER; goto _OVER;
} }
dropStbReq.igNotExists = true; dropStbReq.igNotExists = true;
strncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN); tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
dropStbUndoAction.epSet = createStreamRedoAction.epSet; dropStbUndoAction.epSet = createStreamRedoAction.epSet;
dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST; dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED; dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
@ -1746,7 +1745,8 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
code = terrno; code = terrno;
goto _OVER; goto _OVER;
} }
if (dropStbUndoAction.contLen != tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) { if (dropStbUndoAction.contLen !=
tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name); mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _OVER; goto _OVER;
@ -1781,7 +1781,7 @@ static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
pCxt->pSma = &sma; pCxt->pSma = &sma;
initSMAObj(pCxt); initSMAObj(pCxt);
SNodeList* pProjects = NULL; SNodeList *pProjects = NULL;
code = nodesStringToList(pCxt->pCreateSmaReq->expr, &pProjects); code = nodesStringToList(pCxt->pCreateSmaReq->expr, &pProjects);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
goto _OVER; goto _OVER;
@ -1830,28 +1830,28 @@ _OVER:
TAOS_RETURN(code); TAOS_RETURN(code);
} }
static int32_t mndTSMAGenerateOutputName(const char* tsmaName, char* streamName, char* targetStbName) { static int32_t mndTSMAGenerateOutputName(const char *tsmaName, char *streamName, char *targetStbName) {
SName smaName; SName smaName;
int32_t code = tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); int32_t code = tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname); sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s"TSMA_RES_STB_POSTFIX, tsmaName); snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s" TSMA_RES_STB_POSTFIX, tsmaName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { static int32_t mndProcessCreateTSMAReq(SRpcMsg *pReq) {
#ifdef WINDOWS #ifdef WINDOWS
TAOS_RETURN(TSDB_CODE_MND_INVALID_PLATFORM); TAOS_RETURN(TSDB_CODE_MND_INVALID_PLATFORM);
#endif #endif
SMnode * pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;
SDbObj * pDb = NULL; SDbObj *pDb = NULL;
SStbObj * pStb = NULL; SStbObj *pStb = NULL;
SSmaObj * pSma = NULL; SSmaObj *pSma = NULL;
SSmaObj * pBaseTsma = NULL; SSmaObj *pBaseTsma = NULL;
SStreamObj * pStream = NULL; SStreamObj *pStream = NULL;
int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId); int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
SMCreateSmaReq createReq = {0}; SMCreateSmaReq createReq = {0};
@ -1941,16 +1941,16 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
} }
SCreateTSMACxt cxt = { SCreateTSMACxt cxt = {
.pMnode = pMnode, .pMnode = pMnode,
.pCreateSmaReq = &createReq, .pCreateSmaReq = &createReq,
.pCreateStreamReq = NULL, .pCreateStreamReq = NULL,
.streamName = streamName, .streamName = streamName,
.targetStbFullName = streamTargetStbFullName, .targetStbFullName = streamTargetStbFullName,
.pDb = pDb, .pDb = pDb,
.pRpcReq = pReq, .pRpcReq = pReq,
.pSma = NULL, .pSma = NULL,
.pBaseSma = pBaseTsma, .pBaseSma = pBaseTsma,
.pSrcStb = pStb, .pSrcStb = pStb,
}; };
code = mndCreateTSMA(&cxt); code = mndCreateTSMA(&cxt);
@ -1971,10 +1971,10 @@ _OVER:
TAOS_RETURN(code); TAOS_RETURN(code);
} }
static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) { static int32_t mndDropTSMA(SCreateTSMACxt *pCxt) {
int32_t code = -1; int32_t code = -1;
STransAction dropStreamRedoAction = {0}; STransAction dropStreamRedoAction = {0};
STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "drop-tsma"); STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "drop-tsma");
if (!pTrans) { if (!pTrans) {
code = terrno; code = terrno;
goto _OVER; goto _OVER;
@ -2021,7 +2021,8 @@ static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) {
code = terrno; code = terrno;
goto _OVER; goto _OVER;
} }
if (dropStbRedoAction.contLen != tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) { if (dropStbRedoAction.contLen !=
tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name); mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _OVER; goto _OVER;
@ -2044,9 +2045,9 @@ _OVER:
TAOS_RETURN(code); TAOS_RETURN(code);
} }
static bool hasRecursiveTsmasBasedOnMe(SMnode* pMnode, const SSmaObj* pSma) { static bool hasRecursiveTsmasBasedOnMe(SMnode *pMnode, const SSmaObj *pSma) {
SSmaObj *pSmaObj = NULL; SSmaObj *pSmaObj = NULL;
void * pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSmaObj); pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSmaObj);
if (pIter == NULL) break; if (pIter == NULL) break;
@ -2060,25 +2061,25 @@ static bool hasRecursiveTsmasBasedOnMe(SMnode* pMnode, const SSmaObj* pSma) {
return false; return false;
} }
static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) { static int32_t mndProcessDropTSMAReq(SRpcMsg *pReq) {
int32_t code = -1; int32_t code = -1;
SMDropSmaReq dropReq = {0}; SMDropSmaReq dropReq = {0};
SSmaObj * pSma = NULL; SSmaObj *pSma = NULL;
SDbObj * pDb = NULL; SDbObj *pDb = NULL;
SMnode * pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
if (tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq) != TSDB_CODE_SUCCESS) { if (tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _OVER; goto _OVER;
} }
char streamName[TSDB_TABLE_FNAME_LEN] = {0}; char streamName[TSDB_TABLE_FNAME_LEN] = {0};
char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0}; char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
code = mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName); code = mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
goto _OVER; goto _OVER;
} }
SStbObj* pStb = mndAcquireStb(pMnode, streamTargetStbFullName); SStbObj *pStb = mndAcquireStb(pMnode, streamTargetStbFullName);
pSma = mndAcquireSma(pMnode, dropReq.name); pSma = mndAcquireSma(pMnode, dropReq.name);
if (!pSma && dropReq.igNotExists) { if (!pSma && dropReq.igNotExists) {
@ -2113,13 +2114,13 @@ static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) {
} }
SCreateTSMACxt cxt = { SCreateTSMACxt cxt = {
.pDb = pDb, .pDb = pDb,
.pMnode = pMnode, .pMnode = pMnode,
.pRpcReq = pReq, .pRpcReq = pReq,
.pSma = pSma, .pSma = pSma,
.streamName = streamName, .streamName = streamName,
.targetStbFullName = streamTargetStbFullName, .targetStbFullName = streamTargetStbFullName,
.pDropSmaReq = &dropReq, .pDropSmaReq = &dropReq,
}; };
code = mndDropTSMA(&cxt); code = mndDropTSMA(&cxt);
@ -2134,10 +2135,10 @@ _OVER:
} }
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SDbObj * pDb = NULL; SDbObj *pDb = NULL;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SSmaObj * pSma = NULL; SSmaObj *pSma = NULL;
SMnode * pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = 0; int32_t code = 0;
SColumnInfoData *pColInfo; SColumnInfoData *pColInfo;
if (pShow->pIter == NULL) { if (pShow->pIter == NULL) {
@ -2153,7 +2154,7 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
while (numOfRows < rows) { while (numOfRows < rows) {
pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma); pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
if (pIter->pSmaIter == NULL) break; if (pIter->pSmaIter == NULL) break;
SDbObj* pSrcDb = mndAcquireDb(pMnode, pSma->db); SDbObj *pSrcDb = mndAcquireDb(pMnode, pSma->db);
if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) { if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) {
sdbRelease(pMnode->pSdb, pSma); sdbRelease(pMnode->pSdb, pSma);
@ -2176,7 +2177,7 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db)); STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char*)db, false); code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -2186,12 +2187,12 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n)); STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char*)srcTb, false); code = colDataSetVal(pColInfo, numOfRows, (const char *)srcTb, false);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char*)db, false); code = colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -2200,29 +2201,29 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(targetTb, (char*)tNameGetTableName(&n)); STR_TO_VARSTR(targetTb, (char *)tNameGetTableName(&n));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char*)targetTb, false); code = colDataSetVal(pColInfo, numOfRows, (const char *)targetTb, false);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
// stream name // stream name
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char*)smaName, false); code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char*)(&pSma->createdTime), false); code = colDataSetVal(pColInfo, numOfRows, (const char *)(&pSma->createdTime), false);
} }
// interval // interval
char interval[64 + VARSTR_HEADER_SIZE] = {0}; char interval[64 + VARSTR_HEADER_SIZE] = {0};
int32_t len = 0; int32_t len = 0;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) { if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
getPrecisionUnit(pSrcDb->cfg.precision)); getPrecisionUnit(pSrcDb->cfg.precision));
} else { } else {
len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit); len = tsnprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
} }
@ -2243,17 +2244,17 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
// func list // func list
len = 0; len = 0;
SNode *pNode = NULL, *pFunc = NULL; SNode *pNode = NULL, *pFunc = NULL;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesStringToNode(pSma->ast, &pNode); code = nodesStringToNode(pSma->ast, &pNode);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
char * start = buf + VARSTR_HEADER_SIZE; char *start = buf + VARSTR_HEADER_SIZE;
FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) { FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) {
if (nodeType(pFunc) == QUERY_NODE_FUNCTION) { if (nodeType(pFunc) == QUERY_NODE_FUNCTION) {
SFunctionNode *pFuncNode = (SFunctionNode *)pFunc; SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue; if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
len += tsnprintf(start, TSDB_MAX_SAVED_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "", len += tsnprintf(start, TSDB_MAX_SAVED_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "",
((SExprNode *)pFunc)->userAlias); ((SExprNode *)pFunc)->userAlias);
if (len >= TSDB_MAX_SAVED_SQL_LEN) { if (len >= TSDB_MAX_SAVED_SQL_LEN) {
len = TSDB_MAX_SAVED_SQL_LEN; len = TSDB_MAX_SAVED_SQL_LEN;
break; break;
@ -2297,7 +2298,8 @@ static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
taosMemoryFree(p); taosMemoryFree(p);
} }
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STableTSMAInfo* pInfo, const SSmaObj* pBaseTsma) { int32_t dumpTSMAInfoFromSmaObj(const SSmaObj *pSma, const SStbObj *pDestStb, STableTSMAInfo *pInfo,
const SSmaObj *pBaseTsma) {
int32_t code = 0; int32_t code = 0;
pInfo->interval = pSma->interval; pInfo->interval = pSma->interval;
pInfo->unit = pSma->intervalUnit; pInfo->unit = pSma->intervalUnit;
@ -2336,7 +2338,7 @@ int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STa
SSelectStmt *pSelect = (SSelectStmt *)pNode; SSelectStmt *pSelect = (SSelectStmt *)pNode;
FOREACH(pFunc, pSelect->pProjectionList) { FOREACH(pFunc, pSelect->pProjectionList) {
STableTSMAFuncInfo funcInfo = {0}; STableTSMAFuncInfo funcInfo = {0};
SFunctionNode * pFuncNode = (SFunctionNode *)pFunc; SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue; if (!fmIsTSMASupportedFunc(pFuncNode->funcId)) continue;
funcInfo.funcId = pFuncNode->funcId; funcInfo.funcId = pFuncNode->funcId;
funcInfo.colId = ((SColumnNode *)pFuncNode->pParameterList->pHead->pNode)->colId; funcInfo.colId = ((SColumnNode *)pFuncNode->pParameterList->pHead->pNode)->colId;
@ -2383,9 +2385,9 @@ int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STa
} }
// @note remember to mndReleaseSma(*ppOut) // @note remember to mndReleaseSma(*ppOut)
static int32_t mndGetDeepestBaseForTsma(SMnode* pMnode, SSmaObj* pSma, SSmaObj** ppOut) { static int32_t mndGetDeepestBaseForTsma(SMnode *pMnode, SSmaObj *pSma, SSmaObj **ppOut) {
int32_t code = 0; int32_t code = 0;
SSmaObj* pRecursiveTsma = NULL; SSmaObj *pRecursiveTsma = NULL;
if (pSma->baseSmaName[0]) { if (pSma->baseSmaName[0]) {
pRecursiveTsma = mndAcquireSma(pMnode, pSma->baseSmaName); pRecursiveTsma = mndAcquireSma(pMnode, pSma->baseSmaName);
if (!pRecursiveTsma) { if (!pRecursiveTsma) {
@ -2393,7 +2395,7 @@ static int32_t mndGetDeepestBaseForTsma(SMnode* pMnode, SSmaObj* pSma, SSmaObj**
return TSDB_CODE_MND_SMA_NOT_EXIST; return TSDB_CODE_MND_SMA_NOT_EXIST;
} }
while (pRecursiveTsma->baseSmaName[0]) { while (pRecursiveTsma->baseSmaName[0]) {
SSmaObj* pTmpSma = pRecursiveTsma; SSmaObj *pTmpSma = pRecursiveTsma;
pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName); pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName);
if (!pRecursiveTsma) { if (!pRecursiveTsma) {
mError("base tsma: %s for tsma: %s not found", pTmpSma->baseSmaName, pTmpSma->name); mError("base tsma: %s for tsma: %s not found", pTmpSma->baseSmaName, pTmpSma->name);
@ -2407,7 +2409,6 @@ static int32_t mndGetDeepestBaseForTsma(SMnode* pMnode, SSmaObj* pSma, SSmaObj**
return code; return code;
} }
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) { static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
int32_t code = -1; int32_t code = -1;
SSmaObj *pSma = NULL; SSmaObj *pSma = NULL;
@ -2450,17 +2451,17 @@ static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rs
TAOS_RETURN(code); TAOS_RETURN(code);
} }
typedef bool (*tsmaFilter)(const SSmaObj* pSma, void* param); typedef bool (*tsmaFilter)(const SSmaObj *pSma, void *param);
static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilter filtered, void* param, bool* exist) { static int32_t mndGetSomeTsmas(SMnode *pMnode, STableTSMAInfoRsp *pRsp, tsmaFilter filtered, void *param, bool *exist) {
int32_t code = 0; int32_t code = 0;
SSmaObj * pSma = NULL; SSmaObj *pSma = NULL;
SSmaObj * pBaseTsma = NULL; SSmaObj *pBaseTsma = NULL;
SSdb * pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void * pIter = NULL; void *pIter = NULL;
SStreamObj * pStream = NULL; SStreamObj *pStream = NULL;
SStbObj * pStb = NULL; SStbObj *pStb = NULL;
bool shouldRetry = false; bool shouldRetry = false;
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
@ -2479,7 +2480,7 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt
} }
SName smaName; SName smaName;
char streamName[TSDB_TABLE_FNAME_LEN] = {0}; char streamName[TSDB_TABLE_FNAME_LEN] = {0};
code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
@ -2541,8 +2542,8 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool tsmaTbFilter(const SSmaObj* pSma, void* param) { static bool tsmaTbFilter(const SSmaObj *pSma, void *param) {
const char* tbFName = param; const char *tbFName = param;
return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0; return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0;
} }
@ -2550,7 +2551,7 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
return mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, tbFName, exist); return mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, tbFName, exist);
} }
static bool tsmaDbFilter(const SSmaObj* pSma, void* param) { static bool tsmaDbFilter(const SSmaObj *pSma, void *param) {
uint64_t *dbUid = param; uint64_t *dbUid = param;
return pSma->dbUid != *dbUid; return pSma->dbUid != *dbUid;
} }
@ -2564,7 +2565,7 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
int32_t code = -1; int32_t code = -1;
STableTSMAInfoReq tsmaReq = {0}; STableTSMAInfoReq tsmaReq = {0};
bool exist = false; bool exist = false;
SMnode * pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
TAOS_CHECK_GOTO(tDeserializeTableTSMAInfoReq(pReq->pCont, pReq->contLen, &tsmaReq), NULL, _OVER); TAOS_CHECK_GOTO(tDeserializeTableTSMAInfoReq(pReq->pCont, pReq->contLen, &tsmaReq), NULL, _OVER);
@ -2635,12 +2636,12 @@ static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo *
int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp, int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp,
int32_t *pRspLen) { int32_t *pRspLen) {
int32_t code = -1; int32_t code = -1;
STSMAHbRsp hbRsp = {0}; STSMAHbRsp hbRsp = {0};
int32_t rspLen = 0; int32_t rspLen = 0;
void * pRsp = NULL; void *pRsp = NULL;
char tsmaFName[TSDB_TABLE_FNAME_LEN] = {0}; char tsmaFName[TSDB_TABLE_FNAME_LEN] = {0};
STableTSMAInfo * pTsmaInfo = NULL; STableTSMAInfo *pTsmaInfo = NULL;
hbRsp.pTsmas = taosArrayInit(numOfTsmas, POINTER_BYTES); hbRsp.pTsmas = taosArrayInit(numOfTsmas, POINTER_BYTES);
if (!hbRsp.pTsmas) { if (!hbRsp.pTsmas) {
@ -2649,13 +2650,13 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
} }
for (int32_t i = 0; i < numOfTsmas; ++i) { for (int32_t i = 0; i < numOfTsmas; ++i) {
STSMAVersion* pTsmaVer = &pTsmaVersions[i]; STSMAVersion *pTsmaVer = &pTsmaVersions[i];
pTsmaVer->dbId = be64toh(pTsmaVer->dbId); pTsmaVer->dbId = be64toh(pTsmaVer->dbId);
pTsmaVer->tsmaId = be64toh(pTsmaVer->tsmaId); pTsmaVer->tsmaId = be64toh(pTsmaVer->tsmaId);
pTsmaVer->version = ntohl(pTsmaVer->version); pTsmaVer->version = ntohl(pTsmaVer->version);
snprintf(tsmaFName, sizeof(tsmaFName), "%s.%s", pTsmaVer->dbFName, pTsmaVer->name); snprintf(tsmaFName, sizeof(tsmaFName), "%s.%s", pTsmaVer->dbFName, pTsmaVer->name);
SSmaObj* pSma = mndAcquireSma(pMnode, tsmaFName); SSmaObj *pSma = mndAcquireSma(pMnode, tsmaFName);
if (!pSma) { if (!pSma) {
code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo); code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
if (code) goto _OVER; if (code) goto _OVER;
@ -2683,7 +2684,7 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
continue; continue;
} }
SStbObj* pDestStb = mndAcquireStb(pMnode, pSma->dstTbName); SStbObj *pDestStb = mndAcquireStb(pMnode, pSma->dstTbName);
if (!pDestStb) { if (!pDestStb) {
mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName); mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName);
code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo); code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
@ -2698,7 +2699,7 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
} }
// dump smaObj into rsp // dump smaObj into rsp
STableTSMAInfo * pInfo = NULL; STableTSMAInfo *pInfo = NULL;
pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
if (!pInfo) { if (!pInfo) {
code = terrno; code = terrno;
@ -2707,7 +2708,7 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
goto _OVER; goto _OVER;
} }
SSmaObj* pBaseSma = NULL; SSmaObj *pBaseSma = NULL;
code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma); code = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma);
if (code == 0) code = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma); if (code == 0) code = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma);

View File

@ -1707,7 +1707,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
taosEncryptPass_c((uint8_t *)pCreate->pass, strlen(pCreate->pass), userObj.pass); taosEncryptPass_c((uint8_t *)pCreate->pass, strlen(pCreate->pass), userObj.pass);
} else { } else {
// mInfo("pCreate->pass:%s", pCreate->pass) // mInfo("pCreate->pass:%s", pCreate->pass)
strncpy(userObj.pass, pCreate->pass, TSDB_PASSWORD_LEN); tstrncpy(userObj.pass, pCreate->pass, TSDB_PASSWORD_LEN);
} }
tstrncpy(userObj.user, pCreate->user, TSDB_USER_LEN); tstrncpy(userObj.user, pCreate->user, TSDB_USER_LEN);
tstrncpy(userObj.acct, acct, TSDB_USER_LEN); tstrncpy(userObj.acct, acct, TSDB_USER_LEN);

View File

@ -370,7 +370,7 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
opts.source = pRaw->pData; opts.source = pRaw->pData;
opts.result = plantContent; opts.result = plantContent;
opts.unitLen = 16; opts.unitLen = 16;
strncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN); tstrncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN);
count = CBC_Decrypt(&opts); count = CBC_Decrypt(&opts);
@ -510,7 +510,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) {
opts.source = pRaw->pData; opts.source = pRaw->pData;
opts.result = newData; opts.result = newData;
opts.unitLen = 16; opts.unitLen = 16;
strncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN); tstrncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN);
int32_t count = CBC_Encrypt(&opts); int32_t count = CBC_Encrypt(&opts);

View File

@ -396,7 +396,7 @@ static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname)
STFileHashEntry *entry = taosMemoryMalloc(sizeof(*entry)); STFileHashEntry *entry = taosMemoryMalloc(sizeof(*entry));
if (entry == NULL) return terrno; if (entry == NULL) return terrno;
strncpy(entry->fname, fname, TSDB_FILENAME_LEN); tstrncpy(entry->fname, fname, TSDB_FILENAME_LEN);
uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket; uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;

View File

@ -175,7 +175,7 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *e
opts.result = PacketData; opts.result = PacketData;
opts.unitLen = 128; opts.unitLen = 128;
// strncpy(opts.key, tsEncryptKey, 16); // strncpy(opts.key, tsEncryptKey, 16);
strncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN); tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN);
NewLen = CBC_Encrypt(&opts); NewLen = CBC_Encrypt(&opts);
@ -249,7 +249,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno, int32_t encryptAlgor
opts.result = PacketData; opts.result = PacketData;
opts.unitLen = 128; opts.unitLen = 128;
// strncpy(opts.key, tsEncryptKey, 16); // strncpy(opts.key, tsEncryptKey, 16);
strncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN); tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN);
NewLen = CBC_Decrypt(&opts); NewLen = CBC_Decrypt(&opts);

View File

@ -265,7 +265,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (tsEncryptKey[0] == 0) { if (tsEncryptKey[0] == 0) {
return terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; return terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
} else { } else {
strncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN); tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
} }
} }
#endif #endif
@ -292,7 +292,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (tsEncryptKey[0] == 0) { if (tsEncryptKey[0] == 0) {
return terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; return terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
} else { } else {
strncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN); tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
} }
} }
#endif #endif
@ -303,7 +303,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (tsEncryptKey[0] == 0) { if (tsEncryptKey[0] == 0) {
return terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY; return terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
} else { } else {
strncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN); tstrncpy(pCfg->tdbEncryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
} }
} }
#endif #endif

View File

@ -1024,7 +1024,7 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void
expiredTb.suid = *uid; expiredTb.suid = *uid;
terrno = metaReaderGetTableEntryByUid(&mr, *uid); terrno = metaReaderGetTableEntryByUid(&mr, *uid);
if (terrno < 0) goto _end; if (terrno < 0) goto _end;
strncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN); tstrncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN);
void *p = taosArrayPush(pNames, buf); void *p = taosArrayPush(pNames, buf);
if (p == NULL) { if (p == NULL) {
goto _end; goto _end;

View File

@ -691,7 +691,7 @@ int32_t syncGetArbToken(int64_t rid, char* outToken) {
memset(outToken, 0, TSDB_ARB_TOKEN_SIZE); memset(outToken, 0, TSDB_ARB_TOKEN_SIZE);
(void)taosThreadMutexLock(&pSyncNode->arbTokenMutex); (void)taosThreadMutexLock(&pSyncNode->arbTokenMutex);
strncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE); tstrncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE);
(void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex); (void)taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
@ -2901,7 +2901,7 @@ void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
n += tsnprintf(buf + n, len - n, "%s", "{"); n += tsnprintf(buf + n, len - n, "%s", "{");
for (int i = 0; i < ths->peersEpset->numOfEps; i++) { for (int i = 0; i < ths->peersEpset->numOfEps; i++) {
n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset->eps[i].fqdn, ths->peersEpset->eps[i].port, n += tsnprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset->eps[i].fqdn, ths->peersEpset->eps[i].port,
(i + 1 < ths->peersEpset->numOfEps ? ", " : "")); (i + 1 < ths->peersEpset->numOfEps ? ", " : ""));
} }
n += tsnprintf(buf + n, len - n, "%s", "}"); n += tsnprintf(buf + n, len - n, "%s", "}");

View File

@ -52,7 +52,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i
pDb->encryptAlgorithm = encryptAlgorithm; pDb->encryptAlgorithm = encryptAlgorithm;
if (encryptKey != NULL) { if (encryptKey != NULL) {
strncpy(pDb->encryptKey, encryptKey, ENCRYPT_KEY_LEN); tstrncpy(pDb->encryptKey, encryptKey, ENCRYPT_KEY_LEN);
} }
ret = tdbPCacheOpen(szPage, pages, &(pDb->pCache)); ret = tdbPCacheOpen(szPage, pages, &(pDb->pCache));

View File

@ -459,7 +459,7 @@ static char *tdbEncryptPage(SPager *pPager, char *pPageData, int32_t pageSize, c
opts.source = pPageData + count; opts.source = pPageData + count;
opts.result = packetData; opts.result = packetData;
opts.unitLen = 128; opts.unitLen = 128;
strncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN); tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN);
int32_t newLen = CBC_Encrypt(&opts); int32_t newLen = CBC_Encrypt(&opts);
@ -927,7 +927,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
opts.source = pPage->pData + count; opts.source = pPage->pData + count;
opts.result = packetData; opts.result = packetData;
opts.unitLen = 128; opts.unitLen = 128;
strncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN); tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN);
int newLen = CBC_Decrypt(&opts); int newLen = CBC_Decrypt(&opts);

View File

@ -200,8 +200,8 @@ bool tfsIsSameFile(const STfsFile *pFile1, const STfsFile *pFile2) {
if (pFile1->did.level != pFile2->did.level) return false; if (pFile1->did.level != pFile2->did.level) return false;
if (pFile1->did.id != pFile2->did.id) return false; if (pFile1->did.id != pFile2->did.id) return false;
char nameBuf1[TMPNAME_LEN], nameBuf2[TMPNAME_LEN]; char nameBuf1[TMPNAME_LEN], nameBuf2[TMPNAME_LEN];
(void)strncpy(nameBuf1, pFile1->rname, TMPNAME_LEN); tstrncpy(nameBuf1, pFile1->rname, TMPNAME_LEN);
(void)strncpy(nameBuf2, pFile2->rname, TMPNAME_LEN); tstrncpy(nameBuf2, pFile2->rname, TMPNAME_LEN);
nameBuf1[TMPNAME_LEN - 1] = 0; nameBuf1[TMPNAME_LEN - 1] = 0;
nameBuf2[TMPNAME_LEN - 1] = 0; nameBuf2[TMPNAME_LEN - 1] = 0;
TAOS_UNUSED(taosRealPath(nameBuf1, NULL, TMPNAME_LEN)); TAOS_UNUSED(taosRealPath(nameBuf1, NULL, TMPNAME_LEN));
@ -573,7 +573,7 @@ static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) {
TAOS_RETURN(TSDB_CODE_FS_INVLD_CFG); TAOS_RETURN(TSDB_CODE_FS_INVLD_CFG);
} }
strncpy(pCfg->dir, dirName, TSDB_FILENAME_LEN); tstrncpy(pCfg->dir, dirName, TSDB_FILENAME_LEN);
TAOS_RETURN(0); TAOS_RETURN(0);
} }

View File

@ -638,7 +638,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
opts.source = newBody; opts.source = newBody;
opts.result = newBodyEncrypted; opts.result = newBodyEncrypted;
opts.unitLen = 16; opts.unitLen = 16;
TAOS_UNUSED(strncpy((char *)opts.key, pWal->cfg.encryptKey, ENCRYPT_KEY_LEN)); tstrncpy((char *)opts.key, pWal->cfg.encryptKey, ENCRYPT_KEY_LEN);
int32_t count = CBC_Encrypt(&opts); int32_t count = CBC_Encrypt(&opts);

View File

@ -330,7 +330,7 @@ static int32_t cfgUpdateDebugFlagItem(SConfig *pCfg, const char *name, bool rese
if (pDebugFlagItem == NULL) return -1; if (pDebugFlagItem == NULL) return -1;
if (pDebugFlagItem->array != NULL) { if (pDebugFlagItem->array != NULL) {
SLogVar logVar = {0}; SLogVar logVar = {0};
(void)strncpy(logVar.name, name, TSDB_LOG_VAR_LEN - 1); tstrncpy(logVar.name, name, TSDB_LOG_VAR_LEN);
if (NULL == taosArrayPush(pDebugFlagItem->array, &logVar)) { if (NULL == taosArrayPush(pDebugFlagItem->array, &logVar)) {
TAOS_RETURN(terrno); TAOS_RETURN(terrno);
} }
@ -942,7 +942,7 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
name = value = value2 = value3 = value4 = NULL; name = value = value2 = value3 = value4 = NULL;
olen = vlen = vlen2 = vlen3 = vlen4 = 0; olen = vlen = vlen2 = vlen3 = vlen4 = 0;
strncpy(line, *pEnv, sizeof(line) - 1); tstrncpy(line, *pEnv, sizeof(line));
pEnv++; pEnv++;
if (taosEnvToCfg(line, line) < 0) { if (taosEnvToCfg(line, line) < 0) {
uTrace("failed to convert env to cfg:%s", line); uTrace("failed to convert env to cfg:%s", line);
@ -987,7 +987,7 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) {
int32_t index = 0; int32_t index = 0;
if (envCmd == NULL) TAOS_RETURN(TSDB_CODE_SUCCESS); if (envCmd == NULL) TAOS_RETURN(TSDB_CODE_SUCCESS);
while (envCmd[index] != NULL) { while (envCmd[index] != NULL) {
strncpy(buf, envCmd[index], sizeof(buf) - 1); tstrncpy(buf, envCmd[index], sizeof(buf));
buf[sizeof(buf) - 1] = 0; buf[sizeof(buf) - 1] = 0;
if (taosEnvToCfg(buf, buf) < 0) { if (taosEnvToCfg(buf, buf) < 0) {
uTrace("failed to convert env to cfg:%s", buf); uTrace("failed to convert env to cfg:%s", buf);
@ -1438,7 +1438,7 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char *apolloUrl
char **pEnv = environ; char **pEnv = environ;
line[1023] = 0; line[1023] = 0;
while (*pEnv != NULL) { while (*pEnv != NULL) {
strncpy(line, *pEnv, sizeof(line) - 1); tstrncpy(line, *pEnv, sizeof(line));
pEnv++; pEnv++;
if (strncmp(line, "TAOS_APOLLO_URL", 14) == 0) { if (strncmp(line, "TAOS_APOLLO_URL", 14) == 0) {
char *p = strchr(line, '='); char *p = strchr(line, '=');