Merge pull request #29038 from taosdata/enh/useSafySysFunc
Replace unsafe memory functions with safe versions
This commit is contained in:
commit
a1d196688e
|
@ -74,10 +74,15 @@ int64_t tsnprintf(char *dst, int64_t size, const char *format, ...);
|
|||
char *tstrdup(const char *src);
|
||||
int32_t taosUcs4len(TdUcs4 *ucs4);
|
||||
int32_t taosStr2int64(const char *str, int64_t *val);
|
||||
int32_t taosStr2int16(const char *str, int16_t *val);
|
||||
int32_t taosStr2int32(const char *str, int32_t *val);
|
||||
int32_t taosStr2int16(const char *str, int16_t *val);
|
||||
int32_t taosStr2int8(const char *str, int8_t *val);
|
||||
|
||||
int32_t taosStr2Uint64(const char *str, uint64_t *val);
|
||||
int32_t taosStr2Uint32(const char *str, uint32_t *val);
|
||||
int32_t taosStr2Uint16(const char *str, uint16_t *val);
|
||||
int32_t taosStr2Uint8(const char *str, uint8_t *val);
|
||||
|
||||
int32_t taosConvInit(void);
|
||||
void taosConvDestroy();
|
||||
iconv_t taosAcquireConv(int32_t *idx, ConvType type);
|
||||
|
|
|
@ -300,7 +300,13 @@ void* doConsumeData(void* param) {
|
|||
int main(int argc, char** argv) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
if (argc > 1) {
|
||||
numOfThreads = atoi(argv[1]);
|
||||
//numOfThreads = atoi(argv[1]);
|
||||
int32_t code = taosStr2int32(argv[1], &numOfThreads);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
numOfThreads = TMAX(numOfThreads, 1);
|
||||
|
|
|
@ -1916,6 +1916,7 @@ void s3EvictCache(const char *path, long object_size) {
|
|||
}
|
||||
|
||||
long s3Size(const char *object_name) {
|
||||
int32_t code = 0;
|
||||
long size = 0;
|
||||
|
||||
cos_pool_t *p = NULL;
|
||||
|
@ -1941,7 +1942,10 @@ long s3Size(const char *object_name) {
|
|||
if (cos_status_is_ok(s)) {
|
||||
char *content_length_str = (char *)apr_table_get(resp_headers, COS_CONTENT_LENGTH);
|
||||
if (content_length_str != NULL) {
|
||||
size = atol(content_length_str);
|
||||
code = taosStr2Int64(content_length_str, &size);
|
||||
if (code != 0) {
|
||||
cos_warn_log("parse content length failed since %s\n", tstrerror(code));
|
||||
}
|
||||
}
|
||||
cos_warn_log("head object succeeded: %ld\n", size);
|
||||
} else {
|
||||
|
|
|
@ -251,7 +251,7 @@ bool checkColumnEncode(char encode[TSDB_CL_COMPRESS_OPTION_LEN]) {
|
|||
}
|
||||
bool checkColumnEncodeOrSetDefault(uint8_t type, char encode[TSDB_CL_COMPRESS_OPTION_LEN]) {
|
||||
if (0 == strlen(encode)) {
|
||||
strncpy(encode, getDefaultEncodeStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
|
||||
tstrncpy(encode, getDefaultEncodeStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
|
||||
return true;
|
||||
}
|
||||
return checkColumnEncode(encode) && validColEncode(type, columnEncodeVal(encode));
|
||||
|
@ -268,7 +268,7 @@ bool checkColumnCompress(char compress[TSDB_CL_COMPRESS_OPTION_LEN]) {
|
|||
}
|
||||
bool checkColumnCompressOrSetDefault(uint8_t type, char compress[TSDB_CL_COMPRESS_OPTION_LEN]) {
|
||||
if (0 == strlen(compress)) {
|
||||
strncpy(compress, getDefaultCompressStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
|
||||
tstrncpy(compress, getDefaultCompressStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -290,7 +290,7 @@ bool checkColumnLevel(char level[TSDB_CL_COMPRESS_OPTION_LEN]) {
|
|||
}
|
||||
bool checkColumnLevelOrSetDefault(uint8_t type, char level[TSDB_CL_COMPRESS_OPTION_LEN]) {
|
||||
if (0 == strlen(level)) {
|
||||
strncpy(level, getDefaultLevelStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
|
||||
tstrncpy(level, getDefaultLevelStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
|
||||
return true;
|
||||
}
|
||||
return checkColumnLevel(level) && validColCompressLevel(type, columnLevelVal(level));
|
||||
|
|
|
@ -22,12 +22,15 @@
|
|||
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
|
||||
pEp->port = 0;
|
||||
memset(pEp->fqdn, 0, TSDB_FQDN_LEN);
|
||||
strncpy(pEp->fqdn, ep, TSDB_FQDN_LEN - 1);
|
||||
tstrncpy(pEp->fqdn, ep, TSDB_FQDN_LEN);
|
||||
|
||||
char* temp = strchr(pEp->fqdn, ':');
|
||||
if (temp) {
|
||||
*temp = 0;
|
||||
pEp->port = atoi(temp + 1);
|
||||
pEp->port = taosStr2UInt16(temp + 1, NULL, 10);
|
||||
if (pEp->port < 0) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
}
|
||||
|
||||
if (pEp->port == 0) {
|
||||
|
|
|
@ -251,7 +251,7 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
|
|||
printf("ERROR: Encrypt key overflow, it should be at most %d characters\n", ENCRYPT_KEY_LEN);
|
||||
return TSDB_CODE_INVALID_CFG;
|
||||
}
|
||||
tstrncpy(global.encryptKey, argv[i], ENCRYPT_KEY_LEN);
|
||||
tstrncpy(global.encryptKey, argv[i], ENCRYPT_KEY_LEN + 1);
|
||||
} else {
|
||||
printf("'-y' requires a parameter\n");
|
||||
return TSDB_CODE_INVALID_CFG;
|
||||
|
|
|
@ -114,7 +114,7 @@ static void dmMayShouldUpdateAnalFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
|
|||
.pCont = pHead,
|
||||
.contLen = contLen,
|
||||
.msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
|
||||
.info.ahandle = (void *)0x9527,
|
||||
.info.ahandle = 0,
|
||||
.info.refId = 0,
|
||||
.info.noResp = 0,
|
||||
.info.handle = 0,
|
||||
|
|
|
@ -186,7 +186,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
|||
#if defined(TD_ENTERPRISE)
|
||||
pCfg->tsdbCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
|
||||
if (pCfg->tsdbCfg.encryptAlgorithm == DND_CA_SM4) {
|
||||
strncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
}
|
||||
#else
|
||||
pCfg->tsdbCfg.encryptAlgorithm = 0;
|
||||
|
@ -202,7 +202,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
|||
#if defined(TD_ENTERPRISE)
|
||||
pCfg->walCfg.encryptAlgorithm = pCreate->encryptAlgorithm;
|
||||
if (pCfg->walCfg.encryptAlgorithm == DND_CA_SM4) {
|
||||
strncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
}
|
||||
#else
|
||||
pCfg->walCfg.encryptAlgorithm = 0;
|
||||
|
@ -898,7 +898,7 @@ int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
size_t size = taosArrayGetSize(arbHbReq.hbMembers);
|
||||
|
||||
arbHbRsp.dnodeId = pMgmt->pData->dnodeId;
|
||||
strncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
|
||||
tstrncpy(arbHbRsp.arbToken, arbHbReq.arbToken, TSDB_ARB_TOKEN_SIZE);
|
||||
arbHbRsp.hbMembers = taosArrayInit(size, sizeof(SVArbHbRspMember));
|
||||
if (arbHbRsp.hbMembers == NULL) {
|
||||
goto _OVER;
|
||||
|
|
|
@ -179,7 +179,7 @@ int32_t dmInitVars(SDnode *pDnode) {
|
|||
|
||||
//code = taosGetCryptKey(tsAuthCode, pData->machineId, tsCryptKey);
|
||||
code = 0;
|
||||
strncpy(tsEncryptKey, tsAuthCode, 16);
|
||||
tstrncpy(tsEncryptKey, tsAuthCode, 16);
|
||||
|
||||
if (code != 0) {
|
||||
if(code == -1){
|
||||
|
@ -220,6 +220,7 @@ int32_t dmInitVars(SDnode *pDnode) {
|
|||
}
|
||||
|
||||
extern SMonVloadInfo tsVinfo;
|
||||
|
||||
void dmClearVars(SDnode *pDnode) {
|
||||
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||
|
|
|
@ -230,7 +230,7 @@ static int32_t dmWriteCheckCodeFile(char *file, char *realfile, char *key, bool
|
|||
}
|
||||
|
||||
SCryptOpts opts;
|
||||
strncpy(opts.key, key, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(opts.key, key, ENCRYPT_KEY_LEN + 1);
|
||||
opts.len = len;
|
||||
opts.source = DM_KEY_INDICATOR;
|
||||
opts.result = result;
|
||||
|
@ -349,7 +349,7 @@ static int32_t dmCompareEncryptKey(char *file, char *key, bool toLogFile) {
|
|||
}
|
||||
|
||||
SCryptOpts opts = {0};
|
||||
strncpy(opts.key, key, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(opts.key, key, ENCRYPT_KEY_LEN + 1);
|
||||
opts.len = len;
|
||||
opts.source = content;
|
||||
opts.result = result;
|
||||
|
@ -551,7 +551,7 @@ int32_t dmGetEncryptKey() {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
strncpy(tsEncryptKey, encryptKey, ENCRYPT_KEY_LEN);
|
||||
strncpy(tsEncryptKey, encryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
taosMemoryFreeClear(encryptKey);
|
||||
tsEncryptionKeyChksum = taosCalcChecksum(0, tsEncryptKey, strlen(tsEncryptKey));
|
||||
tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
|
||||
|
|
|
@ -1315,7 +1315,7 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
continue;
|
||||
}
|
||||
char dbNameInGroup[TSDB_DB_FNAME_LEN];
|
||||
strncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN);
|
||||
tstrncpy(dbNameInGroup, pVgObj->dbName, TSDB_DB_FNAME_LEN);
|
||||
sdbRelease(pSdb, pVgObj);
|
||||
|
||||
char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
|
|
|
@ -244,7 +244,7 @@ int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
(void)strncpy(dbname, pCompact->dbname, len);
|
||||
tstrncpy(dbname, pCompact->dbname, len);
|
||||
mndReleaseCompact(pMnode, pCompact);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
@ -321,7 +321,7 @@ int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock,
|
|||
TAOS_CHECK_GOTO(tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
|
||||
(void)tNameGetDbName(&name, varDataVal(tmpBuf));
|
||||
} else {
|
||||
(void)strncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
|
||||
tstrncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
|
||||
}
|
||||
varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
|
||||
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pCompact, &lino, _OVER);
|
||||
|
@ -517,10 +517,13 @@ int32_t mndProcessKillCompactReq(SRpcMsg *pReq) {
|
|||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
char obj[TSDB_INT32_ID_LEN] = {0};
|
||||
(void)sprintf(obj, "%d", pCompact->compactId);
|
||||
|
||||
auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql, killCompactReq.sqlLen);
|
||||
|
||||
int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pCompact->compactId);
|
||||
if ((uint32_t)nBytes < sizeof(obj)) {
|
||||
auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql,
|
||||
killCompactReq.sqlLen);
|
||||
} else {
|
||||
mError("compact:%" PRId32 " failed to audit since %s", pCompact->compactId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
|
||||
}
|
||||
_OVER:
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("failed to kill compact %" PRId32 " since %s", killCompactReq.compactId, terrstr());
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "mndDb.h"
|
||||
#include "audit.h"
|
||||
#include "command.h"
|
||||
#include "mndArbGroup.h"
|
||||
#include "mndCluster.h"
|
||||
#include "mndDnode.h"
|
||||
|
@ -34,7 +35,6 @@
|
|||
#include "systable.h"
|
||||
#include "thttp.h"
|
||||
#include "tjson.h"
|
||||
#include "command.h"
|
||||
|
||||
#define DB_VER_NUMBER 1
|
||||
#define DB_RESERVE_SIZE 27
|
||||
|
@ -416,7 +416,12 @@ static int32_t mndCheckDbName(const char *dbName, SUserObj *pUser) {
|
|||
return TSDB_CODE_MND_INVALID_DB;
|
||||
}
|
||||
|
||||
int32_t acctId = atoi(dbName);
|
||||
int32_t acctId;
|
||||
int32_t code = taosStr2int32(dbName, &acctId);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (acctId != pUser->acctId) {
|
||||
return TSDB_CODE_MND_INVALID_DB_ACCT;
|
||||
}
|
||||
|
@ -2386,7 +2391,8 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
|
|||
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, rows, (const char *)strictVstr, false), &lino, _OVER);
|
||||
|
||||
char durationVstr[128] = {0};
|
||||
int32_t len = formatDurationOrKeep(&durationVstr[VARSTR_HEADER_SIZE], sizeof(durationVstr) - VARSTR_HEADER_SIZE, pDb->cfg.daysPerFile);
|
||||
int32_t len = formatDurationOrKeep(&durationVstr[VARSTR_HEADER_SIZE], sizeof(durationVstr) - VARSTR_HEADER_SIZE,
|
||||
pDb->cfg.daysPerFile);
|
||||
|
||||
varDataSetLen(durationVstr, len);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
|
|
|
@ -487,7 +487,8 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const S
|
|||
CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
|
||||
|
||||
if (0 != strcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
|
||||
mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id, pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
|
||||
mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id,
|
||||
pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
|
||||
terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS;
|
||||
return DND_REASON_STATUS_MONITOR_NOT_MATCH;
|
||||
}
|
||||
|
@ -1449,7 +1450,7 @@ static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) {
|
|||
}
|
||||
|
||||
size_t optLen = p - pMCfgReq->config;
|
||||
(void)strncpy(pDCfgReq->config, pMCfgReq->config, optLen);
|
||||
tstrncpy(pDCfgReq->config, pMCfgReq->config, optLen + 1);
|
||||
pDCfgReq->config[optLen] = 0;
|
||||
|
||||
if (' ' == pMCfgReq->config[optLen]) {
|
||||
|
@ -1881,11 +1882,19 @@ static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t optLen, int32
|
|||
if (' ' == pMCfgReq->config[optLen]) {
|
||||
// 'key value'
|
||||
if (strlen(pMCfgReq->value) != 0) goto _err;
|
||||
*pOutValue = atoi(pMCfgReq->config + optLen + 1);
|
||||
code = taosStr2int32(pMCfgReq->config + optLen + 1, pOutValue);
|
||||
if (code != 0) {
|
||||
mError("dnode:%d, failed to get cfg since %s", pMCfgReq->dnodeId, tstrerror(code));
|
||||
goto _err;
|
||||
}
|
||||
} else {
|
||||
// 'key' 'value'
|
||||
if (strlen(pMCfgReq->value) == 0) goto _err;
|
||||
*pOutValue = atoi(pMCfgReq->value);
|
||||
code = taosStr2int32(pMCfgReq->value, pOutValue);
|
||||
if (code != 0) {
|
||||
mError("dnode:%d, failed to get cfg since %s", pMCfgReq->dnodeId, tstrerror(code));
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
TAOS_RETURN(code);
|
||||
|
|
|
@ -15,8 +15,8 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndAcct.h"
|
||||
#include "mndArbGroup.h"
|
||||
#include "mndAnode.h"
|
||||
#include "mndArbGroup.h"
|
||||
#include "mndCluster.h"
|
||||
#include "mndCompact.h"
|
||||
#include "mndCompactDetail.h"
|
||||
|
@ -254,7 +254,7 @@ static void mndIncreaseUpTime(SMnode *pMnode) {
|
|||
.pCont = pReq,
|
||||
.contLen = contLen,
|
||||
.info.notFreeAhandle = 1,
|
||||
.info.ahandle = (void *)0x9527};
|
||||
.info.ahandle = 0};
|
||||
// TODO check return value
|
||||
if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
|
||||
mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
|
||||
|
@ -530,7 +530,7 @@ static int32_t mndInitWal(SMnode *pMnode) {
|
|||
code = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
||||
TAOS_RETURN(code);
|
||||
} else {
|
||||
(void)strncpy(cfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(cfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -14,10 +14,10 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndMnode.h"
|
||||
#include "audit.h"
|
||||
#include "mndCluster.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndSync.h"
|
||||
|
@ -723,9 +723,12 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
|
|||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
char obj[40] = {0};
|
||||
sprintf(obj, "%d", createReq.dnodeId);
|
||||
|
||||
int32_t bytes = snprintf(obj, sizeof(obj), "%d", createReq.dnodeId);
|
||||
if ((uint32_t)bytes < sizeof(obj)) {
|
||||
auditRecord(pReq, pMnode->clusterId, "createMnode", "", obj, createReq.sql, createReq.sqlLen);
|
||||
} else {
|
||||
mError("mnode:%d, failed to audit create req since %s", createReq.dnodeId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
|
||||
}
|
||||
|
||||
_OVER:
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
|
|
|
@ -14,12 +14,12 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndProfile.h"
|
||||
#include "audit.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndProfile.h"
|
||||
#include "mndQnode.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndSma.h"
|
||||
|
@ -337,9 +337,12 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
code = 0;
|
||||
|
||||
char detail[1000] = {0};
|
||||
(void)sprintf(detail, "app:%s", connReq.app);
|
||||
|
||||
int32_t nBytes = snprintf(detail, sizeof(detail), "app:%s", connReq.app);
|
||||
if ((uint32_t)nBytes < sizeof(detail)) {
|
||||
auditRecord(pReq, pMnode->clusterId, "login", "", "", detail, strlen(detail));
|
||||
} else {
|
||||
mError("failed to audit logic since %s", tstrerror(TSDB_CODE_OUT_OF_RANGE));
|
||||
}
|
||||
|
||||
_OVER:
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndSma.h"
|
||||
#include "functionMgt.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndIndex.h"
|
||||
|
@ -31,7 +32,6 @@
|
|||
#include "mndVgroup.h"
|
||||
#include "parser.h"
|
||||
#include "tname.h"
|
||||
#include "functionMgt.h"
|
||||
|
||||
#define TSDB_SMA_VER_NUMBER 1
|
||||
#define TSDB_SMA_RESERVE_SIZE 64
|
||||
|
@ -298,10 +298,7 @@ void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) {
|
|||
sdbRelease(pSdb, pSma);
|
||||
}
|
||||
|
||||
SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *db) {
|
||||
|
||||
return mndAcquireDb(pMnode, db);
|
||||
}
|
||||
SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *db) { return mndAcquireDb(pMnode, db); }
|
||||
|
||||
static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
|
||||
SEncoder encoder = {0};
|
||||
|
@ -670,7 +667,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
// check the maxDelay
|
||||
if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
|
||||
int64_t msInterval = -1;
|
||||
int32_t code = convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND, &msInterval);
|
||||
int32_t code =
|
||||
convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND, &msInterval);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
mError("sma:%s, failed to create since convert time failed: %s", smaObj.name, tstrerror(code));
|
||||
return code;
|
||||
|
@ -1023,7 +1021,6 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
|
||||
code = mndAcquireStream(pMnode, streamName, &pStream);
|
||||
if (pStream == NULL || pStream->smaId != pSma->uid || code != 0) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
|
@ -1691,8 +1688,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
|||
STransAction createStreamUndoAction = {0};
|
||||
STransAction dropStbUndoAction = {0};
|
||||
SMDropStbReq dropStbReq = {0};
|
||||
STrans *pTrans =
|
||||
mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
|
||||
STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TSMA, pCxt->pRpcReq, "create-tsma");
|
||||
if (!pTrans) {
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
|
@ -1713,7 +1709,9 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
|||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
if (createStreamRedoAction.contLen != tSerializeSCMCreateStreamReq(createStreamRedoAction.pCont, createStreamRedoAction.contLen, pCxt->pCreateStreamReq)) {
|
||||
if (createStreamRedoAction.contLen != tSerializeSCMCreateStreamReq(createStreamRedoAction.pCont,
|
||||
createStreamRedoAction.contLen,
|
||||
pCxt->pCreateStreamReq)) {
|
||||
mError("sma: %s, failed to create due to create stream req encode failure", pCxt->pCreateSmaReq->name);
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
|
@ -1728,7 +1726,8 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
|||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
if (createStreamUndoAction.contLen != tSerializeSMDropStreamReq(createStreamUndoAction.pCont, createStreamUndoAction.contLen, pCxt->pDropStreamReq)) {
|
||||
if (createStreamUndoAction.contLen !=
|
||||
tSerializeSMDropStreamReq(createStreamUndoAction.pCont, createStreamUndoAction.contLen, pCxt->pDropStreamReq)) {
|
||||
mError("sma: %s, failed to create due to drop stream req encode failure", pCxt->pCreateSmaReq->name);
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
|
@ -1746,7 +1745,8 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
|||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
if (dropStbUndoAction.contLen != tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
|
||||
if (dropStbUndoAction.contLen !=
|
||||
tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
|
||||
mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
|
@ -2021,7 +2021,8 @@ static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) {
|
|||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
if (dropStbRedoAction.contLen != tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
|
||||
if (dropStbRedoAction.contLen !=
|
||||
tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
|
||||
mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
|
@ -2297,7 +2298,8 @@ static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
|
|||
taosMemoryFree(p);
|
||||
}
|
||||
|
||||
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STableTSMAInfo* pInfo, const SSmaObj* pBaseTsma) {
|
||||
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj *pSma, const SStbObj *pDestStb, STableTSMAInfo *pInfo,
|
||||
const SSmaObj *pBaseTsma) {
|
||||
int32_t code = 0;
|
||||
pInfo->interval = pSma->interval;
|
||||
pInfo->unit = pSma->intervalUnit;
|
||||
|
@ -2407,7 +2409,6 @@ static int32_t mndGetDeepestBaseForTsma(SMnode* pMnode, SSmaObj* pSma, SSmaObj**
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
|
||||
int32_t code = -1;
|
||||
SSmaObj *pSma = NULL;
|
||||
|
|
|
@ -1706,8 +1706,8 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
|
|||
if (pCreate->isImport != 1) {
|
||||
taosEncryptPass_c((uint8_t *)pCreate->pass, strlen(pCreate->pass), userObj.pass);
|
||||
} else {
|
||||
// mInfo("pCreate->pass:%s", pCreate->pass)
|
||||
strncpy(userObj.pass, pCreate->pass, TSDB_PASSWORD_LEN);
|
||||
// mInfo("pCreate->pass:%s", pCreate->eass)
|
||||
memcpy(userObj.pass, pCreate->pass, TSDB_PASSWORD_LEN);
|
||||
}
|
||||
tstrncpy(userObj.user, pCreate->user, TSDB_USER_LEN);
|
||||
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
|
||||
|
|
|
@ -370,7 +370,7 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
|
|||
opts.source = pRaw->pData;
|
||||
opts.result = plantContent;
|
||||
opts.unitLen = 16;
|
||||
strncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
|
||||
count = CBC_Decrypt(&opts);
|
||||
|
||||
|
@ -510,7 +510,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) {
|
|||
opts.source = pRaw->pData;
|
||||
opts.result = newData;
|
||||
opts.unitLen = 16;
|
||||
strncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
|
||||
int32_t count = CBC_Encrypt(&opts);
|
||||
|
||||
|
|
|
@ -396,7 +396,7 @@ static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname)
|
|||
STFileHashEntry *entry = taosMemoryMalloc(sizeof(*entry));
|
||||
if (entry == NULL) return terrno;
|
||||
|
||||
strncpy(entry->fname, fname, TSDB_FILENAME_LEN);
|
||||
tstrncpy(entry->fname, fname, TSDB_FILENAME_LEN);
|
||||
|
||||
uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
|
||||
|
||||
|
|
|
@ -175,7 +175,7 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *e
|
|||
opts.result = PacketData;
|
||||
opts.unitLen = 128;
|
||||
// strncpy(opts.key, tsEncryptKey, 16);
|
||||
strncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
|
||||
NewLen = CBC_Encrypt(&opts);
|
||||
|
||||
|
@ -249,7 +249,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno, int32_t encryptAlgor
|
|||
opts.result = PacketData;
|
||||
opts.unitLen = 128;
|
||||
// strncpy(opts.key, tsEncryptKey, 16);
|
||||
strncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
|
||||
NewLen = CBC_Decrypt(&opts);
|
||||
|
||||
|
|
|
@ -265,7 +265,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
if (tsEncryptKey[0] == 0) {
|
||||
return terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
||||
} else {
|
||||
strncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(pCfg->tsdbCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
@ -292,7 +292,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
if (tsEncryptKey[0] == 0) {
|
||||
return terrno = TSDB_CODE_DNODE_INVALID_ENCRYPTKEY;
|
||||
} else {
|
||||
strncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(pCfg->walCfg.encryptKey, tsEncryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -190,7 +190,14 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
|
|||
char *tsdbFilePrefixPos = strstr(oldRname, tsdbFilePrefix);
|
||||
if (tsdbFilePrefixPos == NULL) continue;
|
||||
|
||||
int32_t tsdbFileVgId = atoi(tsdbFilePrefixPos + prefixLen);
|
||||
int32_t tsdbFileVgId = 0; // atoi(tsdbFilePrefixPos + prefixLen);
|
||||
ret = taosStr2int32(tsdbFilePrefixPos + prefixLen, &tsdbFileVgId);
|
||||
if (ret != 0) {
|
||||
vError("vgId:%d, failed to get tsdb file vgid since %s", dstVgId, tstrerror(ret));
|
||||
tfsClosedir(tsdbDir);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (tsdbFileVgId == srcVgId) {
|
||||
char *tsdbFileSurfixPos = tsdbFilePrefixPos + prefixLen + vnodeVgroupIdLen(srcVgId);
|
||||
|
||||
|
|
|
@ -1024,7 +1024,7 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void
|
|||
expiredTb.suid = *uid;
|
||||
terrno = metaReaderGetTableEntryByUid(&mr, *uid);
|
||||
if (terrno < 0) goto _end;
|
||||
strncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN);
|
||||
tstrncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN);
|
||||
void *p = taosArrayPush(pNames, buf);
|
||||
if (p == NULL) {
|
||||
goto _end;
|
||||
|
|
|
@ -41,10 +41,11 @@ void taos_monitor_split_str_metric(char** arr, taos_metric_t* metric, const char
|
|||
memset(name, 0, size + 1);
|
||||
memcpy(name, metric->name, size);
|
||||
|
||||
char* s = strtok(name, del);
|
||||
char* saveptr;
|
||||
char* s = strtok_r(name, del, &saveptr);
|
||||
while (s != NULL) {
|
||||
*arr++ = s;
|
||||
s = strtok(NULL, del);
|
||||
s = strtok_r(NULL, del, &saveptr);
|
||||
}
|
||||
|
||||
*buf = name;
|
||||
|
|
|
@ -2872,7 +2872,9 @@ static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange
|
|||
*slash = '\0';
|
||||
struct in_addr addr;
|
||||
if (uv_inet_pton(AF_INET, ipCopy, &addr) == 0) {
|
||||
int prefix = atoi(slash + 1);
|
||||
int32_t prefix = 0;
|
||||
code = taosStr2int32(slash + 1, &prefix);
|
||||
if (code == 0) {
|
||||
if (prefix < 0 || prefix > 32) {
|
||||
code = TSDB_CODE_PAR_INVALID_IP_RANGE;
|
||||
} else {
|
||||
|
@ -2880,6 +2882,7 @@ static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange
|
|||
pIpRange->mask = prefix;
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
code = TSDB_CODE_PAR_INVALID_IP_RANGE;
|
||||
}
|
||||
|
|
|
@ -166,10 +166,12 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key);
|
|||
int32_t streamStateClear_rocksdb(SStreamState* pState);
|
||||
void streamStateCurNext_rocksdb(SStreamStateCur* pCur);
|
||||
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
|
||||
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey,
|
||||
const void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
||||
void streamStateCurPrev_rocksdb(SStreamStateCur* pCur);
|
||||
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
|
||||
int32_t* pVLen);
|
||||
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
|
||||
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
|
||||
SStreamStateCur* streamStateSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key);
|
||||
|
@ -218,14 +220,13 @@ int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* p
|
|||
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);
|
||||
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen);
|
||||
void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t groupId, SStreamStateCur* pCur);
|
||||
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal,
|
||||
int32_t* pVLen);
|
||||
|
||||
// parname cf
|
||||
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]);
|
||||
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal);
|
||||
|
||||
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove);
|
||||
|
||||
// default cf
|
||||
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
|
||||
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
|
||||
|
|
|
@ -153,7 +153,6 @@ void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
|
|||
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId);
|
||||
int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId);
|
||||
|
||||
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
|
||||
int32_t copyFiles(const char* src, const char* dst);
|
||||
uint32_t nextPow2(uint32_t x);
|
||||
|
||||
|
@ -1156,13 +1155,17 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
|||
|
||||
taosArrayDestroy(pBackend->chkpSaved);
|
||||
pBackend->chkpSaved = chkpDup;
|
||||
chkpDup = NULL;
|
||||
|
||||
TAOS_UNUSED(taosThreadRwlockUnlock(&pBackend->chkpDirLock));
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
|
||||
int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
|
||||
char tbuf[256] = {0};
|
||||
sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id);
|
||||
if (snprintf(tbuf, sizeof(tbuf), "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id) >= sizeof(tbuf)) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
TAOS_CHECK_GOTO(code, NULL, _exception);
|
||||
}
|
||||
|
||||
stInfo("backend remove obsolete checkpoint: %s", tbuf);
|
||||
if (taosIsDir(tbuf)) {
|
||||
|
@ -1187,12 +1190,17 @@ int chkpIdComp(const void* a, const void* b) {
|
|||
}
|
||||
int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
|
||||
int32_t code = 0;
|
||||
char* pChkpDir = taosMemoryCalloc(1, 256);
|
||||
int32_t nBytes = 0;
|
||||
int32_t cap = 256;
|
||||
char* pChkpDir = taosMemoryCalloc(1, cap);
|
||||
if (pChkpDir == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
sprintf(pChkpDir, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
|
||||
nBytes = snprintf(pChkpDir, cap, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
|
||||
if (nBytes >= cap) {
|
||||
return TSDB_CODE_OUT_OF_RANGE;
|
||||
}
|
||||
if (!taosIsDir(pChkpDir)) {
|
||||
taosMemoryFree(pChkpDir);
|
||||
return 0;
|
||||
|
@ -1413,12 +1421,18 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
|
|||
if (pSnapInfo == NULL) return 0;
|
||||
SStreamMeta* pMeta = arg;
|
||||
int32_t code = 0;
|
||||
int32_t cap = 256;
|
||||
int32_t nBytes = 0;
|
||||
streamMutexLock(&pMeta->backendMutex);
|
||||
|
||||
char buf[128] = {0};
|
||||
char buf[256] = {0};
|
||||
for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) {
|
||||
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i);
|
||||
sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
|
||||
nBytes = snprintf(buf, cap, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
|
||||
if (nBytes <= 0 || nBytes >= cap) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
break;
|
||||
}
|
||||
STaskDbWrapper** pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
|
||||
if (pTaskDb == NULL || *pTaskDb == NULL) {
|
||||
stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId);
|
||||
|
@ -1430,7 +1444,7 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
|
|||
taskDbUnRefChkp(*pTaskDb, pSnap->chkpId);
|
||||
}
|
||||
streamMutexUnlock(&pMeta->backendMutex);
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
#ifdef BUILD_NO_CALL
|
||||
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
|
||||
|
@ -1685,9 +1699,6 @@ void streamBackendDelCompare(void* backend, void* arg) {
|
|||
taosMemoryFree(node);
|
||||
}
|
||||
}
|
||||
#ifdef BUILD_NO_CALL
|
||||
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); }
|
||||
#endif
|
||||
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
|
||||
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
||||
if (inst->pHandle) {
|
||||
|
@ -1712,7 +1723,7 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) {
|
|||
}
|
||||
|
||||
// |key|-----value------|
|
||||
// |key|ttl|len|userData|
|
||||
// |key|ttl|len|userData
|
||||
|
||||
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
|
||||
int len = aLen < bLen ? aLen : bLen;
|
||||
|
@ -2447,43 +2458,50 @@ void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
|
|||
|
||||
int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) {
|
||||
int32_t code = 0;
|
||||
char* statePath = taosMemoryCalloc(1, strlen(path) + 128);
|
||||
int32_t cap = strlen(path) + 128, nBytes = 0;
|
||||
char* statePath = NULL;
|
||||
char* dbPath = NULL;
|
||||
|
||||
statePath = taosMemoryCalloc(1, cap);
|
||||
if (statePath == NULL) {
|
||||
return terrno;
|
||||
TAOS_CHECK_GOTO(terrno, NULL, _err);
|
||||
}
|
||||
|
||||
nBytes = snprintf(statePath, cap, "%s%s%s", path, TD_DIRSEP, key);
|
||||
if (nBytes < 0 || nBytes >= cap) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
sprintf(statePath, "%s%s%s", path, TD_DIRSEP, key);
|
||||
if (!taosDirExist(statePath)) {
|
||||
code = taosMulMkDir(statePath);
|
||||
if (code != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
stError("failed to create dir: %s, reason:%s", statePath, tstrerror(code));
|
||||
taosMemoryFree(statePath);
|
||||
return code;
|
||||
}
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
char* dbPath = taosMemoryCalloc(1, strlen(statePath) + 128);
|
||||
dbPath = taosMemoryCalloc(1, cap);
|
||||
if (dbPath == NULL) {
|
||||
taosMemoryFree(statePath);
|
||||
return terrno;
|
||||
TAOS_CHECK_GOTO(terrno, NULL, _err);
|
||||
}
|
||||
nBytes = snprintf(dbPath, cap, "%s%s%s", statePath, TD_DIRSEP, "state");
|
||||
if (nBytes < 0 || nBytes >= cap) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
sprintf(dbPath, "%s%s%s", statePath, TD_DIRSEP, "state");
|
||||
if (!taosDirExist(dbPath)) {
|
||||
code = taosMulMkDir(dbPath);
|
||||
if (code != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
stError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code));
|
||||
taosMemoryFree(statePath);
|
||||
taosMemoryFree(dbPath);
|
||||
return code;
|
||||
}
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
*dbFullPath = dbPath;
|
||||
*stateFullPath = statePath;
|
||||
return 0;
|
||||
_err:
|
||||
stError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code));
|
||||
|
||||
taosMemoryFree(statePath);
|
||||
taosMemoryFree(dbPath);
|
||||
return code;
|
||||
}
|
||||
|
||||
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) {
|
||||
|
@ -2864,6 +2882,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
|||
int64_t streamId;
|
||||
int32_t taskId, dummy = 0;
|
||||
char suffix[64] = {0};
|
||||
int32_t code = 0;
|
||||
|
||||
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
|
||||
RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
|
||||
|
@ -2873,6 +2892,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
|||
for (int i = 0; i < nCf; i++) {
|
||||
char* cf = cfs[i];
|
||||
char funcname[64] = {0};
|
||||
|
||||
cfOpts[i] = rocksdb_options_create_copy(handle->dbOpt);
|
||||
if (i == 0) continue;
|
||||
if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
|
||||
|
@ -2909,7 +2929,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
|||
taosMemoryFree(params);
|
||||
taosMemoryFree(cfOpts);
|
||||
// fix other leak
|
||||
return -1;
|
||||
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||
} else {
|
||||
stDebug("succ to open rocksdb cf");
|
||||
}
|
||||
|
@ -2930,7 +2950,12 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
|||
char funcname[64] = {0};
|
||||
if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
|
||||
char idstr[128] = {0};
|
||||
sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId);
|
||||
int32_t nBytes = snprintf(idstr, sizeof(idstr), "0x%" PRIx64 "-%d", streamId, taskId);
|
||||
if (nBytes <= 0 || nBytes >= sizeof(idstr)) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
stError("failed to open cf since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int idx = streamStateGetCfIdx(NULL, funcname);
|
||||
|
||||
|
@ -2997,117 +3022,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
|||
taosMemoryFree(cfOpts);
|
||||
return 0;
|
||||
}
|
||||
#ifdef BUILD_NO_CALL
|
||||
int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||
taosAcquireRef(streamBackendId, pState->streamBackendRid);
|
||||
SBackendWrapper* handle = backend;
|
||||
SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper));
|
||||
|
||||
streamMutexLock(&handle->cfMutex);
|
||||
RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
|
||||
if (ppInst != NULL && *ppInst != NULL) {
|
||||
RocksdbCfInst* inst = *ppInst;
|
||||
pBackendCfWrapper->rocksdb = inst->db;
|
||||
pBackendCfWrapper->pHandle = (void**)inst->pHandle;
|
||||
pBackendCfWrapper->writeOpts = inst->wOpt;
|
||||
pBackendCfWrapper->readOpts = inst->rOpt;
|
||||
pBackendCfWrapper->cfOpts = (void**)(inst->cfOpt);
|
||||
pBackendCfWrapper->dbOpt = handle->dbOpt;
|
||||
pBackendCfWrapper->param = inst->param;
|
||||
pBackendCfWrapper->pBackend = handle;
|
||||
pBackendCfWrapper->pComparNode = inst->pCompareNode;
|
||||
streamMutexUnlock(&handle->cfMutex);
|
||||
pBackendCfWrapper->backendId = pState->streamBackendRid;
|
||||
memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));
|
||||
|
||||
int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper);
|
||||
pState->pTdbState->backendCfWrapperId = id;
|
||||
pState->pTdbState->pBackendCfWrapper = pBackendCfWrapper;
|
||||
stInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendCfWrapper, pBackendCfWrapper->idstr);
|
||||
|
||||
inst->pHandle = NULL;
|
||||
inst->cfOpt = NULL;
|
||||
inst->param = NULL;
|
||||
|
||||
inst->wOpt = NULL;
|
||||
inst->rOpt = NULL;
|
||||
return 0;
|
||||
}
|
||||
streamMutexUnlock(&handle->cfMutex);
|
||||
|
||||
char* err = NULL;
|
||||
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
||||
|
||||
RocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
|
||||
const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
|
||||
for (int i = 0; i < cfLen; i++) {
|
||||
cfOpt[i] = rocksdb_options_create_copy(handle->dbOpt);
|
||||
// refactor later
|
||||
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
|
||||
rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
|
||||
rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
|
||||
|
||||
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
|
||||
rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
|
||||
|
||||
rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt);
|
||||
|
||||
param[i].tableOpt = tableOpt;
|
||||
};
|
||||
|
||||
rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
|
||||
for (int i = 0; i < cfLen; i++) {
|
||||
SCfInit* cf = &ginitDict[i];
|
||||
|
||||
rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->destroyCmp, cf->cmpKey, cf->cmpName);
|
||||
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare);
|
||||
pCompare[i] = compare;
|
||||
}
|
||||
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
|
||||
pBackendCfWrapper->rocksdb = handle->db;
|
||||
pBackendCfWrapper->pHandle = (void**)cfHandle;
|
||||
pBackendCfWrapper->writeOpts = rocksdb_writeoptions_create();
|
||||
pBackendCfWrapper->readOpts = rocksdb_readoptions_create();
|
||||
pBackendCfWrapper->cfOpts = (void**)cfOpt;
|
||||
pBackendCfWrapper->dbOpt = handle->dbOpt;
|
||||
pBackendCfWrapper->param = param;
|
||||
pBackendCfWrapper->pBackend = handle;
|
||||
pBackendCfWrapper->backendId = pState->streamBackendRid;
|
||||
taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL);
|
||||
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
|
||||
pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare);
|
||||
rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
|
||||
memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));
|
||||
|
||||
int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper);
|
||||
pState->pTdbState->backendCfWrapperId = id;
|
||||
pState->pTdbState->pBackendCfWrapper = pBackendCfWrapper;
|
||||
stInfo("succ to open state %p on backendWrapper %p %s", pState, pBackendCfWrapper, pBackendCfWrapper->idstr);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
|
||||
SBackendWrapper* pHandle = wrapper->pBackend;
|
||||
|
||||
stInfo("start to close state on backend: %p", pHandle);
|
||||
|
||||
streamMutexLock(&pHandle->cfMutex);
|
||||
RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, wrapper->idstr, strlen(pState->pTdbState->idstr) + 1);
|
||||
if (ppInst != NULL && *ppInst != NULL) {
|
||||
RocksdbCfInst* inst = *ppInst;
|
||||
taosMemoryFree(inst);
|
||||
taosHashRemove(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
|
||||
}
|
||||
streamMutexUnlock(&pHandle->cfMutex);
|
||||
|
||||
char* status[] = {"close", "drop"};
|
||||
stInfo("start to %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper,
|
||||
wrapper->idstr);
|
||||
wrapper->remove |= remove; // update by other pState
|
||||
taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId);
|
||||
}
|
||||
#endif
|
||||
void streamStateDestroyCompar(void* arg) {
|
||||
SCfComparator* comp = (SCfComparator*)arg;
|
||||
for (int i = 0; i < comp->numOfComp; i++) {
|
||||
|
@ -3386,7 +3301,8 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
|
|||
return streamStateDel_rocksdb(pState, &tmp);
|
||||
}
|
||||
|
||||
int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||
int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
|
||||
int32_t* pVLen) {
|
||||
if (!pCur) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -4377,7 +4293,9 @@ void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t gr
|
|||
}
|
||||
}
|
||||
|
||||
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen) {
|
||||
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal,
|
||||
int32_t* pVLen) {
|
||||
stDebug("streamStateFillGetKVByCur_rocksdb");
|
||||
if (!pCur) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -4713,7 +4631,7 @@ int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
|
|||
if (fname == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
TAOS_UNUSED(strncpy(fname, name, len));
|
||||
tstrncpy(fname, name, strlen(name));
|
||||
if (taosArrayPush(diff, &fname) == NULL) {
|
||||
taosMemoryFree(fname);
|
||||
return terrno;
|
||||
|
@ -4737,17 +4655,32 @@ int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
|
|||
void hashTableToDebug(SHashObj* pTbl, char** buf) {
|
||||
size_t sz = taosHashGetSize(pTbl);
|
||||
int32_t total = 0;
|
||||
char* p = taosMemoryCalloc(1, sz * 16 + 4);
|
||||
int32_t cap = sz * 16 + 4;
|
||||
|
||||
char* p = taosMemoryCalloc(1, cap);
|
||||
if (p == NULL) {
|
||||
stError("failed to alloc memory for stream snapshot debug info");
|
||||
return;
|
||||
}
|
||||
|
||||
void* pIter = taosHashIterate(pTbl, NULL);
|
||||
while (pIter) {
|
||||
size_t len = 0;
|
||||
char* name = taosHashGetKey(pIter, &len);
|
||||
char* tname = taosMemoryCalloc(1, len + 1);
|
||||
memcpy(tname, name, len);
|
||||
total += sprintf(p + total, "%s,", tname);
|
||||
if (name == NULL || len <= 0) {
|
||||
pIter = taosHashIterate(pTbl, pIter);
|
||||
continue;
|
||||
}
|
||||
int32_t left = cap - strlen(p);
|
||||
int32_t nBytes = snprintf(p + total, left, "%s,", name);
|
||||
if (nBytes <= 0 || nBytes >= left) {
|
||||
stError("failed to debug snapshot info since %s", tstrerror(TSDB_CODE_OUT_OF_RANGE));
|
||||
taosMemoryFree(p);
|
||||
return;
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pTbl, pIter);
|
||||
taosMemoryFree(tname);
|
||||
total += nBytes;
|
||||
}
|
||||
if (total > 0) {
|
||||
p[total - 1] = 0;
|
||||
|
@ -4758,13 +4691,30 @@ void strArrayDebugInfo(SArray* pArr, char** buf) {
|
|||
int32_t sz = taosArrayGetSize(pArr);
|
||||
if (sz <= 0) return;
|
||||
|
||||
char* p = (char*)taosMemoryCalloc(1, 64 + sz * 64);
|
||||
int32_t total = 0;
|
||||
int32_t code = 0;
|
||||
int32_t total = 0, nBytes = 0;
|
||||
int32_t cap = 64 + sz * 64;
|
||||
|
||||
char* p = (char*)taosMemoryCalloc(1, cap);
|
||||
if (p == NULL) {
|
||||
stError("failed to alloc memory for stream snapshot debug info");
|
||||
return;
|
||||
}
|
||||
|
||||
for (int i = 0; i < sz; i++) {
|
||||
char* name = taosArrayGetP(pArr, i);
|
||||
total += sprintf(p + total, "%s,", name);
|
||||
int32_t left = cap - strlen(p);
|
||||
nBytes = snprintf(p + total, left, "%s,", name);
|
||||
if (nBytes <= 0 || nBytes >= left) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
stError("failed to debug snapshot info since %s", tstrerror(code));
|
||||
taosMemoryFree(p);
|
||||
return;
|
||||
}
|
||||
|
||||
total += nBytes;
|
||||
}
|
||||
|
||||
p[total - 1] = 0;
|
||||
|
||||
*buf = p;
|
||||
|
@ -4774,16 +4724,16 @@ void dbChkpDebugInfo(SDbChkp* pDb) {
|
|||
char* p[4] = {NULL};
|
||||
|
||||
hashTableToDebug(pDb->pSstTbl[pDb->idx], &p[0]);
|
||||
stTrace("chkp previous file: [%s]", p[0]);
|
||||
if (p[0]) stTrace("chkp previous file: [%s]", p[0]);
|
||||
|
||||
hashTableToDebug(pDb->pSstTbl[1 - pDb->idx], &p[1]);
|
||||
stTrace("chkp curr file: [%s]", p[1]);
|
||||
if (p[1]) stTrace("chkp curr file: [%s]", p[1]);
|
||||
|
||||
strArrayDebugInfo(pDb->pAdd, &p[2]);
|
||||
stTrace("chkp newly addded file: [%s]", p[2]);
|
||||
if (p[2]) stTrace("chkp newly addded file: [%s]", p[2]);
|
||||
|
||||
strArrayDebugInfo(pDb->pDel, &p[3]);
|
||||
stTrace("chkp newly deleted file: [%s]", p[3]);
|
||||
if (p[3]) stTrace("chkp newly deleted file: [%s]", p[3]);
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
taosMemoryFree(p[i]);
|
||||
|
@ -4875,7 +4825,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
TAOS_UNUSED(strncpy(fname, name, len));
|
||||
tstrncpy(fname, name, strlen(name));
|
||||
if (taosArrayPush(p->pAdd, &fname) == NULL) {
|
||||
taosMemoryFree(fname);
|
||||
TAOS_UNUSED(taosThreadRwlockUnlock(&p->rwLock));
|
||||
|
@ -5350,7 +5300,8 @@ SStreamStateCur* streamStateSeekKeyPrev_rocksdb(SStreamState* pState, const SWin
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey,
|
||||
const void** pVal, int32_t* pVLen) {
|
||||
if (!pCur) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -292,6 +292,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
|
|||
void* p = taskDbAddRef(*ppBackend);
|
||||
if (p == NULL) {
|
||||
stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
|
||||
streamMutexUnlock(&pMeta->backendMutex);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
@ -946,7 +947,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
pTask->info.delaySchedParam = 0;
|
||||
}
|
||||
|
||||
|
||||
int64_t refId = pTask->id.refId;
|
||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||
if (ret != 0) {
|
||||
|
|
|
@ -120,7 +120,8 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
|
|||
SStreamTask* pStreamTask = pTask;
|
||||
pState->streamId = streamId;
|
||||
pState->taskId = taskId;
|
||||
TAOS_UNUSED(tsnprintf(pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr), "0x%" PRIx64 "-0x%x", pState->streamId, pState->taskId));
|
||||
TAOS_UNUSED(tsnprintf(pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr), "0x%" PRIx64 "-0x%x",
|
||||
pState->streamId, pState->taskId));
|
||||
|
||||
code = streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -527,7 +528,6 @@ _end:
|
|||
|
||||
void streamStateDestroy(SStreamState* pState, bool remove) {
|
||||
streamFileStateDestroy(pState->pFileState);
|
||||
// streamStateDestroy_rocksdb(pState, remove);
|
||||
tSimpleHashCleanup(pState->parNameMap);
|
||||
// do nothong
|
||||
taosMemoryFreeClear(pState->pTdbState);
|
||||
|
@ -572,7 +572,8 @@ int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey
|
|||
return getCountWinResultBuff(pState->pFileState, pKey, winCount, ppVal, pVLen, pWinCode);
|
||||
}
|
||||
|
||||
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) {
|
||||
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
|
||||
int32_t* pVLen) {
|
||||
return createCountWinResultBuff(pState->pFileState, pKey, winCount, pVal, pVLen);
|
||||
}
|
||||
|
||||
|
@ -593,9 +594,7 @@ SStreamStateCur* streamStateGroupGetCur(SStreamState* pState) {
|
|||
return pCur;
|
||||
}
|
||||
|
||||
void streamStateGroupCurNext(SStreamStateCur* pCur) {
|
||||
streamFileStateGroupCurNext(pCur);
|
||||
}
|
||||
void streamStateGroupCurNext(SStreamStateCur* pCur) { streamFileStateGroupCurNext(pCur); }
|
||||
|
||||
int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
|
||||
if (pVal != NULL) {
|
||||
|
@ -604,13 +603,9 @@ int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void**
|
|||
return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen);
|
||||
}
|
||||
|
||||
void streamStateClearExpiredState(SStreamState* pState) {
|
||||
clearExpiredState(pState->pFileState);
|
||||
}
|
||||
void streamStateClearExpiredState(SStreamState* pState) { clearExpiredState(pState->pFileState); }
|
||||
|
||||
void streamStateSetFillInfo(SStreamState* pState) {
|
||||
setFillInfo(pState->pFileState);
|
||||
}
|
||||
void streamStateSetFillInfo(SStreamState* pState) { setFillInfo(pState->pFileState); }
|
||||
|
||||
int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
|
||||
int32_t* pWinCode) {
|
||||
|
|
|
@ -52,7 +52,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i
|
|||
|
||||
pDb->encryptAlgorithm = encryptAlgorithm;
|
||||
if (encryptKey != NULL) {
|
||||
strncpy(pDb->encryptKey, encryptKey, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(pDb->encryptKey, encryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
}
|
||||
|
||||
ret = tdbPCacheOpen(szPage, pages, &(pDb->pCache));
|
||||
|
|
|
@ -459,7 +459,7 @@ static char *tdbEncryptPage(SPager *pPager, char *pPageData, int32_t pageSize, c
|
|||
opts.source = pPageData + count;
|
||||
opts.result = packetData;
|
||||
opts.unitLen = 128;
|
||||
strncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
|
||||
int32_t newLen = CBC_Encrypt(&opts);
|
||||
|
||||
|
@ -927,7 +927,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
|
|||
opts.source = pPage->pData + count;
|
||||
opts.result = packetData;
|
||||
opts.unitLen = 128;
|
||||
strncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN);
|
||||
tstrncpy(opts.key, encryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
|
||||
int newLen = CBC_Decrypt(&opts);
|
||||
|
||||
|
|
|
@ -200,8 +200,8 @@ bool tfsIsSameFile(const STfsFile *pFile1, const STfsFile *pFile2) {
|
|||
if (pFile1->did.level != pFile2->did.level) return false;
|
||||
if (pFile1->did.id != pFile2->did.id) return false;
|
||||
char nameBuf1[TMPNAME_LEN], nameBuf2[TMPNAME_LEN];
|
||||
(void)strncpy(nameBuf1, pFile1->rname, TMPNAME_LEN);
|
||||
(void)strncpy(nameBuf2, pFile2->rname, TMPNAME_LEN);
|
||||
tstrncpy(nameBuf1, pFile1->rname, TMPNAME_LEN);
|
||||
tstrncpy(nameBuf2, pFile2->rname, TMPNAME_LEN);
|
||||
nameBuf1[TMPNAME_LEN - 1] = 0;
|
||||
nameBuf2[TMPNAME_LEN - 1] = 0;
|
||||
TAOS_UNUSED(taosRealPath(nameBuf1, NULL, TMPNAME_LEN));
|
||||
|
@ -573,7 +573,7 @@ static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) {
|
|||
TAOS_RETURN(TSDB_CODE_FS_INVLD_CFG);
|
||||
}
|
||||
|
||||
strncpy(pCfg->dir, dirName, TSDB_FILENAME_LEN);
|
||||
tstrncpy(pCfg->dir, dirName, TSDB_FILENAME_LEN);
|
||||
|
||||
TAOS_RETURN(0);
|
||||
}
|
||||
|
|
|
@ -638,7 +638,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
|||
opts.source = newBody;
|
||||
opts.result = newBodyEncrypted;
|
||||
opts.unitLen = 16;
|
||||
TAOS_UNUSED(strncpy((char *)opts.key, pWal->cfg.encryptKey, ENCRYPT_KEY_LEN));
|
||||
tstrncpy((char *)opts.key, pWal->cfg.encryptKey, ENCRYPT_KEY_LEN + 1);
|
||||
|
||||
int32_t count = CBC_Encrypt(&opts);
|
||||
|
||||
|
|
|
@ -120,33 +120,20 @@ int32_t taosStr2int64(const char *str, int64_t *val) {
|
|||
if (str == NULL || val == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
errno = 0;
|
||||
char *endptr = NULL;
|
||||
int64_t ret = strtoll(str, &endptr, 10);
|
||||
if (errno == ERANGE && (ret == LLONG_MAX || ret == LLONG_MIN)) {
|
||||
if (errno != 0) {
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
} else if (errno == EINVAL && ret == 0) {
|
||||
} else {
|
||||
if (endptr == str) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
} else {
|
||||
}
|
||||
*val = ret;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t taosStr2int16(const char *str, int16_t *val) {
|
||||
OS_PARAM_CHECK(str);
|
||||
OS_PARAM_CHECK(val);
|
||||
int64_t tmp = 0;
|
||||
int32_t code = taosStr2int64(str, &tmp);
|
||||
if (code) {
|
||||
return code;
|
||||
} else if (tmp > INT16_MAX || tmp < INT16_MIN) {
|
||||
return TAOS_SYSTEM_ERROR(ERANGE);
|
||||
} else {
|
||||
*val = (int16_t)tmp;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t taosStr2int32(const char *str, int32_t *val) {
|
||||
OS_PARAM_CHECK(str);
|
||||
OS_PARAM_CHECK(val);
|
||||
|
@ -161,6 +148,20 @@ int32_t taosStr2int32(const char *str, int32_t *val) {
|
|||
return 0;
|
||||
}
|
||||
}
|
||||
int32_t taosStr2int16(const char *str, int16_t *val) {
|
||||
OS_PARAM_CHECK(str);
|
||||
OS_PARAM_CHECK(val);
|
||||
int64_t tmp = 0;
|
||||
int32_t code = taosStr2int64(str, &tmp);
|
||||
if (code) {
|
||||
return code;
|
||||
} else if (tmp > INT16_MAX || tmp < INT16_MIN) {
|
||||
return TAOS_SYSTEM_ERROR(ERANGE);
|
||||
} else {
|
||||
*val = (int16_t)tmp;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t taosStr2int8(const char *str, int8_t *val) {
|
||||
OS_PARAM_CHECK(str);
|
||||
|
@ -177,6 +178,70 @@ int32_t taosStr2int8(const char *str, int8_t *val) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t taosStr2Uint64(const char *str, uint64_t *val) {
|
||||
if (str == NULL || val == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
char *endptr = NULL;
|
||||
errno = 0;
|
||||
uint64_t ret = strtoull(str, &endptr, 10);
|
||||
|
||||
if (errno != 0) {
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
} else {
|
||||
if (endptr == str) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
*val = ret;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t taosStr2Uint32(const char *str, uint32_t *val) {
|
||||
OS_PARAM_CHECK(str);
|
||||
OS_PARAM_CHECK(val);
|
||||
uint64_t tmp = 0;
|
||||
int32_t code = taosStr2Uint64(str, &tmp);
|
||||
if (code) {
|
||||
return code;
|
||||
} else if (tmp > UINT32_MAX) {
|
||||
return TAOS_SYSTEM_ERROR(ERANGE);
|
||||
} else {
|
||||
*val = (int32_t)tmp;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t taosStr2Uint16(const char *str, uint16_t *val) {
|
||||
OS_PARAM_CHECK(str);
|
||||
OS_PARAM_CHECK(val);
|
||||
uint64_t tmp = 0;
|
||||
int32_t code = taosStr2Uint64(str, &tmp);
|
||||
if (code) {
|
||||
return code;
|
||||
} else if (tmp > UINT16_MAX) {
|
||||
return TAOS_SYSTEM_ERROR(ERANGE);
|
||||
} else {
|
||||
*val = (int16_t)tmp;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t taosStr2Uint8(const char *str, uint8_t *val) {
|
||||
OS_PARAM_CHECK(str);
|
||||
OS_PARAM_CHECK(val);
|
||||
uint64_t tmp = 0;
|
||||
int32_t code = taosStr2Uint64(str, &tmp);
|
||||
if (code) {
|
||||
return code;
|
||||
} else if (tmp > UINT8_MAX) {
|
||||
return TAOS_SYSTEM_ERROR(ERANGE);
|
||||
} else {
|
||||
*val = (int8_t)tmp;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes) {
|
||||
if ((f1_ucs4 == NULL || f2_ucs4 == NULL)) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
@ -558,7 +623,8 @@ int32_t taosMbsToWchars(TdWchar *pWchars, const char *pStrs, int32_t size) {
|
|||
|
||||
int32_t taosWcharToMb(char *pStr, TdWchar wchar) {
|
||||
OS_PARAM_CHECK(pStr);
|
||||
return wctomb(pStr, wchar); }
|
||||
return wctomb(pStr, wchar);
|
||||
}
|
||||
|
||||
char *taosStrCaseStr(const char *str, const char *pattern) {
|
||||
if (str == NULL) {
|
||||
|
|
|
@ -154,3 +154,483 @@ TEST(osStringTests, ostsnprintfTests) {
|
|||
EXPECT_EQ(ret, 11);
|
||||
EXPECT_STREQ(buffer, "Float: 3.14");
|
||||
}
|
||||
TEST(osStringTests, osStr2Int64) {
|
||||
int64_t val;
|
||||
int32_t result;
|
||||
|
||||
// 测试空指针输入
|
||||
result = taosStr2int64(NULL, &val);
|
||||
assert(result == TSDB_CODE_INVALID_PARA);
|
||||
|
||||
result = taosStr2int64("123", NULL);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
// 测试无效输入
|
||||
result = taosStr2int64("abc", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
result = taosStr2int64("", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
char large_num[50];
|
||||
snprintf(large_num, sizeof(large_num), "%lld", LLONG_MAX);
|
||||
result = taosStr2int64(large_num, &val);
|
||||
assert(result == 0);
|
||||
assert(val == LLONG_MAX);
|
||||
|
||||
snprintf(large_num, sizeof(large_num), "%lld", LLONG_MIN);
|
||||
result = taosStr2int64(large_num, &val);
|
||||
assert(result == 0);
|
||||
assert(val == LLONG_MIN);
|
||||
|
||||
result = taosStr2int64("123abc", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
|
||||
result = taosStr2int64("abc123", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
// 测试有效的整数字符串
|
||||
result = taosStr2int64("12345", &val);
|
||||
assert(result == 0);
|
||||
assert(val == 12345);
|
||||
|
||||
result = taosStr2int64("-12345", &val);
|
||||
assert(result == 0);
|
||||
assert(val == -12345);
|
||||
|
||||
result = taosStr2int64("0", &val);
|
||||
assert(result == 0);
|
||||
assert(val == 0);
|
||||
|
||||
// 测试带空格的字符串
|
||||
result = taosStr2int64(" 12345", &val);
|
||||
assert(result == 0);
|
||||
assert(val == 12345);
|
||||
|
||||
result = taosStr2int64("12345 ", &val);
|
||||
assert(result == 0);
|
||||
assert(val == 12345);
|
||||
}
|
||||
TEST(osStringTests, osStr2int32) {
|
||||
int32_t val;
|
||||
int32_t result;
|
||||
|
||||
// 测试空指针输入
|
||||
result = taosStr2int32(NULL, &val);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
result = taosStr2int32("123", NULL);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
// 测试无效输入
|
||||
result = taosStr2int32("abc", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
result = taosStr2int32("", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
// 测试超出范围的值
|
||||
char large_num[50];
|
||||
snprintf(large_num, sizeof(large_num), "%d", INT_MAX);
|
||||
result = taosStr2int32(large_num, &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, INT_MAX);
|
||||
|
||||
snprintf(large_num, sizeof(large_num), "%d", INT_MIN);
|
||||
result = taosStr2int32(large_num, &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, INT_MIN);
|
||||
|
||||
// 测试大于 INT32 范围的值
|
||||
snprintf(large_num, sizeof(large_num), "%lld", (long long)INT_MAX + 1);
|
||||
result = taosStr2int32(large_num, &val);
|
||||
ASSERT_EQ(result, TAOS_SYSTEM_ERROR(ERANGE));
|
||||
|
||||
snprintf(large_num, sizeof(large_num), "%lld", (long long)INT_MIN - 1);
|
||||
result = taosStr2int32(large_num, &val);
|
||||
ASSERT_EQ(result, TAOS_SYSTEM_ERROR(ERANGE));
|
||||
|
||||
|
||||
result = taosStr2int32("123abc", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
|
||||
result = taosStr2int32("abc123", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
// 测试有效的整数字符串
|
||||
result = taosStr2int32("12345", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
|
||||
result = taosStr2int32("-12345", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, -12345);
|
||||
|
||||
result = taosStr2int32("0", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 0);
|
||||
|
||||
// 测试带空格的字符串
|
||||
result = taosStr2int32(" 12345", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
|
||||
result = taosStr2int32("12345 ", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
}
|
||||
|
||||
TEST(osStringTests, taosStr2int16) {
|
||||
int16_t val;
|
||||
int32_t result;
|
||||
|
||||
// 测试空指针输入
|
||||
result = taosStr2int16(NULL, &val);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
result = taosStr2int16("123", NULL);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
// 测试无效输入
|
||||
result = taosStr2int16("abc", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
result = taosStr2int16("", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
// 测试超出范围的值
|
||||
char large_num[50];
|
||||
snprintf(large_num, sizeof(large_num), "%d", INT16_MAX);
|
||||
result = taosStr2int16(large_num, &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, INT16_MAX);
|
||||
|
||||
snprintf(large_num, sizeof(large_num), "%d", INT16_MIN);
|
||||
result = taosStr2int16(large_num, &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, INT16_MIN);
|
||||
|
||||
// 测试大于 INT16 范围的值
|
||||
snprintf(large_num, sizeof(large_num), "%lld", (long long)INT16_MAX + 1);
|
||||
result = taosStr2int16(large_num, &val);
|
||||
ASSERT_EQ(result, TAOS_SYSTEM_ERROR(ERANGE));
|
||||
|
||||
snprintf(large_num, sizeof(large_num), "%lld", (long long)INT16_MIN - 1);
|
||||
result = taosStr2int16(large_num, &val);
|
||||
ASSERT_EQ(result, TAOS_SYSTEM_ERROR(ERANGE));
|
||||
|
||||
result = taosStr2int16("123abc", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
|
||||
result = taosStr2int16("abc123", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
// 测试有效的整数字符串
|
||||
result = taosStr2int16("12345", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
|
||||
result = taosStr2int16("-12345", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, -12345);
|
||||
|
||||
result = taosStr2int16("0", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 0);
|
||||
|
||||
// 测试带空格的字符串
|
||||
result = taosStr2int16(" 12345", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
|
||||
result = taosStr2int16("12345 ", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
}
|
||||
|
||||
|
||||
TEST(osStringTests, taosStr2int8) {
|
||||
int8_t val;
|
||||
int32_t result;
|
||||
|
||||
// 测试空指针输入
|
||||
result = taosStr2int8(NULL, &val);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
result = taosStr2int8("123", NULL);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
// 测试无效输入
|
||||
result = taosStr2int8("abc", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
result = taosStr2int8("", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
// 测试超出范围的值
|
||||
char large_num[50];
|
||||
snprintf(large_num, sizeof(large_num), "%d", INT8_MAX);
|
||||
result = taosStr2int8(large_num, &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, INT8_MAX);
|
||||
|
||||
snprintf(large_num, sizeof(large_num), "%d", INT8_MIN);
|
||||
result = taosStr2int8(large_num, &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, INT8_MIN);
|
||||
|
||||
// 测试大于 INT8 范围的值
|
||||
snprintf(large_num, sizeof(large_num), "%lld", (long long)INT8_MAX + 1);
|
||||
result = taosStr2int8(large_num, &val);
|
||||
ASSERT_EQ(result, TAOS_SYSTEM_ERROR(ERANGE));
|
||||
|
||||
snprintf(large_num, sizeof(large_num), "%lld", (long long)INT8_MIN - 1);
|
||||
result = taosStr2int8(large_num, &val);
|
||||
ASSERT_EQ(result, TAOS_SYSTEM_ERROR(ERANGE));
|
||||
|
||||
result = taosStr2int8("123abc", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
|
||||
result = taosStr2int8("abc123", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
// 测试有效的整数字符串
|
||||
result = taosStr2int8("123", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
|
||||
result = taosStr2int8("-123", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, -123);
|
||||
|
||||
result = taosStr2int8("0", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 0);
|
||||
|
||||
// 测试带空格的字符串
|
||||
result = taosStr2int8(" 123", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
|
||||
result = taosStr2int8("123 ", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
}
|
||||
|
||||
TEST(osStringTests, osStr2Uint64) {
|
||||
uint64_t val;
|
||||
int32_t result;
|
||||
|
||||
// 测试空指针输入
|
||||
result = taosStr2Uint64(NULL, &val);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
result = taosStr2Uint64("123", NULL);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
// 测试无效输入
|
||||
result = taosStr2Uint64("abc", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
result = taosStr2Uint64("", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
char large_num[50];
|
||||
snprintf(large_num, sizeof(large_num), "%llu", ULLONG_MAX);
|
||||
result = taosStr2Uint64(large_num, &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, ULLONG_MAX);
|
||||
|
||||
result = taosStr2Uint64("123abc", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
|
||||
result = taosStr2Uint64("abc123", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
// 测试有效的整数字符串
|
||||
result = taosStr2Uint64("12345", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
|
||||
result = taosStr2Uint64("0", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 0);
|
||||
|
||||
// 测试带空格的字符串
|
||||
result = taosStr2Uint64(" 12345", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
|
||||
result = taosStr2Uint64("12345 ", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
|
||||
}
|
||||
|
||||
TEST(osStringTests, taosStr2Uint32) {
|
||||
uint32_t val;
|
||||
int32_t result;
|
||||
|
||||
// 测试空指针输入
|
||||
result = taosStr2Uint32(NULL, &val);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
result = taosStr2Uint32("123", NULL);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
// 测试无效输入
|
||||
result = taosStr2Uint32("abc", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
result = taosStr2Uint32("", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
// 测试超出范围的值
|
||||
char large_num[50];
|
||||
snprintf(large_num, sizeof(large_num), "%u", UINT32_MAX);
|
||||
result = taosStr2Uint32(large_num, &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, UINT32_MAX);
|
||||
|
||||
// 测试大于 UINT32 范围的值
|
||||
snprintf(large_num, sizeof(large_num), "%llu", (unsigned long long)UINT32_MAX + 1);
|
||||
result = taosStr2Uint32(large_num, &val);
|
||||
ASSERT_EQ(result, TAOS_SYSTEM_ERROR(ERANGE));
|
||||
|
||||
result = taosStr2Uint32("123abc", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
|
||||
result = taosStr2Uint32("abc123", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
// 测试有效的整数字符串
|
||||
result = taosStr2Uint32("12345", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
|
||||
result = taosStr2Uint32("0", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 0);
|
||||
|
||||
// 测试带空格的字符串
|
||||
result = taosStr2Uint32(" 12345", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
|
||||
result = taosStr2Uint32("12345 ", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
}
|
||||
|
||||
TEST(osStringTests, taosStr2Uint16) {
|
||||
uint16_t val;
|
||||
int32_t result;
|
||||
|
||||
// 测试空指针输入
|
||||
result = taosStr2Uint16(NULL, &val);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
result = taosStr2Uint16("123", NULL);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
// 测试无效输入
|
||||
result = taosStr2Uint16("abc", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
result = taosStr2Uint16("", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
// 测试超出范围的值
|
||||
char large_num[50];
|
||||
snprintf(large_num, sizeof(large_num), "%u", UINT16_MAX);
|
||||
result = taosStr2Uint16(large_num, &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, UINT16_MAX);
|
||||
|
||||
// 测试大于 UINT16 范围的值
|
||||
snprintf(large_num, sizeof(large_num), "%llu", (unsigned long long)UINT16_MAX + 1);
|
||||
result = taosStr2Uint16(large_num, &val);
|
||||
ASSERT_EQ(result, TAOS_SYSTEM_ERROR(ERANGE));
|
||||
|
||||
result = taosStr2Uint16("123abc", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
|
||||
result = taosStr2Uint16("abc123", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
// 测试有效的整数字符串
|
||||
result = taosStr2Uint16("12345", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
|
||||
result = taosStr2Uint16("0", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 0);
|
||||
|
||||
// 测试带空格的字符串
|
||||
result = taosStr2Uint16(" 12345", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
|
||||
result = taosStr2Uint16("12345 ", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 12345);
|
||||
}
|
||||
|
||||
TEST(osStringTests, taosStr2Uint8) {
|
||||
uint8_t val;
|
||||
int32_t result;
|
||||
|
||||
// 测试空指针输入
|
||||
result = taosStr2Uint8(NULL, &val);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
result = taosStr2Uint8("123", NULL);
|
||||
ASSERT_EQ(result, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
// 测试无效输入
|
||||
result = taosStr2Uint8("abc", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
result = taosStr2Uint8("", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
|
||||
// 测试超出范围的值
|
||||
char large_num[50];
|
||||
snprintf(large_num, sizeof(large_num), "%u", UINT8_MAX);
|
||||
result = taosStr2Uint8(large_num, &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, UINT8_MAX);
|
||||
|
||||
// 测试大于 UINT8 范围的值
|
||||
snprintf(large_num, sizeof(large_num), "%llu", (unsigned long long)UINT8_MAX + 1);
|
||||
result = taosStr2Uint8(large_num, &val);
|
||||
ASSERT_EQ(result, TAOS_SYSTEM_ERROR(ERANGE));
|
||||
|
||||
result = taosStr2Uint8("123abc", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
|
||||
result = taosStr2Uint8("abc123", &val);
|
||||
ASSERT_NE(result, 0);
|
||||
// 测试有效的整数字符串
|
||||
result = taosStr2Uint8("123", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
|
||||
result = taosStr2Uint8("0", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 0);
|
||||
|
||||
// 测试带空格的字符串
|
||||
result = taosStr2Uint8(" 123", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
|
||||
result = taosStr2Uint8("123 ", &val);
|
||||
ASSERT_EQ(result, 0);
|
||||
ASSERT_EQ(val, 123);
|
||||
}
|
||||
|
||||
|
|
|
@ -151,11 +151,13 @@ static int32_t cfgCheckAndSetDir(SConfigItem *pItem, const char *inputDir) {
|
|||
}
|
||||
|
||||
static int32_t cfgSetBool(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
|
||||
int32_t code = 0;
|
||||
bool tmp = false;
|
||||
if (strcasecmp(value, "true") == 0) {
|
||||
tmp = true;
|
||||
}
|
||||
if (atoi(value) > 0) {
|
||||
int32_t val = 0;
|
||||
if ((code = taosStr2int32(value, &val)) == 0 && val > 0) {
|
||||
tmp = true;
|
||||
}
|
||||
|
||||
|
@ -258,6 +260,7 @@ static int32_t cfgSetTimezone(SConfigItem *pItem, const char *value, ECfgSrcType
|
|||
|
||||
static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value, const char *level, const char *primary,
|
||||
const char *disable, ECfgSrcType stype) {
|
||||
int32_t code = 0;
|
||||
(void)taosThreadMutexLock(&pCfg->lock);
|
||||
|
||||
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
||||
|
@ -278,20 +281,40 @@ static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value,
|
|||
|
||||
SDiskCfg cfg = {0};
|
||||
tstrncpy(cfg.dir, pItem->str, sizeof(cfg.dir));
|
||||
cfg.level = level ? atoi(level) : 0;
|
||||
cfg.primary = primary ? atoi(primary) : 1;
|
||||
cfg.disable = disable ? atoi(disable) : 0;
|
||||
|
||||
if (level == NULL || strlen(level) == 0) {
|
||||
cfg.level = 0;
|
||||
} else {
|
||||
code = taosStr2int32(level, &cfg.level);
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
if (primary == NULL || strlen(primary) == 0) {
|
||||
cfg.primary = 1;
|
||||
} else {
|
||||
code = taosStr2int32(primary, &cfg.primary);
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
if (disable == NULL || strlen(disable) == 0) {
|
||||
cfg.disable = 0;
|
||||
} else {
|
||||
code = taosStr2int8(disable, &cfg.disable);
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
void *ret = taosArrayPush(pItem->array, &cfg);
|
||||
if (ret == NULL) {
|
||||
(void)taosThreadMutexUnlock(&pCfg->lock);
|
||||
|
||||
TAOS_RETURN(terrno);
|
||||
code = terrno;
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
pItem->stype = stype;
|
||||
(void)taosThreadMutexUnlock(&pCfg->lock);
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
_err:
|
||||
(void)taosThreadMutexUnlock(&pCfg->lock);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t cfgUpdateDebugFlagItem(SConfig *pCfg, const char *name, bool resetArray) {
|
||||
|
@ -315,7 +338,7 @@ static int32_t cfgUpdateDebugFlagItem(SConfig *pCfg, const char *name, bool rese
|
|||
if (pDebugFlagItem == NULL) return -1;
|
||||
if (pDebugFlagItem->array != NULL) {
|
||||
SLogVar logVar = {0};
|
||||
(void)strncpy(logVar.name, name, TSDB_LOG_VAR_LEN - 1);
|
||||
tstrncpy(logVar.name, name, TSDB_LOG_VAR_LEN);
|
||||
if (NULL == taosArrayPush(pDebugFlagItem->array, &logVar)) {
|
||||
TAOS_RETURN(terrno);
|
||||
}
|
||||
|
@ -412,6 +435,7 @@ void cfgLock(SConfig *pCfg) {
|
|||
void cfgUnLock(SConfig *pCfg) { (void)taosThreadMutexUnlock(&pCfg->lock); }
|
||||
|
||||
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer) {
|
||||
int32_t code = 0;
|
||||
ECfgDynType dynType = isServer ? CFG_DYN_SERVER : CFG_DYN_CLIENT;
|
||||
|
||||
cfgLock(pCfg);
|
||||
|
@ -443,8 +467,9 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
|
|||
}
|
||||
} break;
|
||||
case CFG_DTYPE_BOOL: {
|
||||
int32_t ival = (int32_t)atoi(pVal);
|
||||
if (ival != 0 && ival != 1) {
|
||||
int32_t ival = 0;
|
||||
code = taosStr2int32(pVal, &ival);
|
||||
if (code != 0 || (ival != 0 && ival != 1)) {
|
||||
uError("cfg:%s, type:%s value:%d out of range[0, 1]", pItem->name, cfgDtypeStr(pItem->dtype), ival);
|
||||
cfgUnLock(pCfg);
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
|
||||
|
@ -887,7 +912,8 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) {
|
|||
for (size_t j = 0; j < sz; ++j) {
|
||||
SDiskCfg *pCfg = taosArrayGet(pItem->array, j);
|
||||
if (dump) {
|
||||
(void)printf("%s %s %s l:%d p:%d d:%"PRIi8"\n", src, name, pCfg->dir, pCfg->level, pCfg->primary, pCfg->disable);
|
||||
(void)printf("%s %s %s l:%d p:%d d:%" PRIi8 "\n", src, name, pCfg->dir, pCfg->level, pCfg->primary,
|
||||
pCfg->disable);
|
||||
} else {
|
||||
uInfo("%s %s %s l:%d p:%d d:%" PRIi8, src, name, pCfg->dir, pCfg->level, pCfg->primary, pCfg->disable);
|
||||
}
|
||||
|
@ -924,7 +950,7 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
|
|||
name = value = value2 = value3 = value4 = NULL;
|
||||
olen = vlen = vlen2 = vlen3 = vlen4 = 0;
|
||||
|
||||
strncpy(line, *pEnv, sizeof(line) - 1);
|
||||
tstrncpy(line, *pEnv, sizeof(line));
|
||||
pEnv++;
|
||||
if (taosEnvToCfg(line, line) < 0) {
|
||||
uTrace("failed to convert env to cfg:%s", line);
|
||||
|
@ -969,7 +995,7 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) {
|
|||
int32_t index = 0;
|
||||
if (envCmd == NULL) TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
while (envCmd[index] != NULL) {
|
||||
strncpy(buf, envCmd[index], sizeof(buf) - 1);
|
||||
tstrncpy(buf, envCmd[index], sizeof(buf));
|
||||
buf[sizeof(buf) - 1] = 0;
|
||||
if (taosEnvToCfg(buf, buf) < 0) {
|
||||
uTrace("failed to convert env to cfg:%s", buf);
|
||||
|
@ -1420,7 +1446,7 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char *apolloUrl
|
|||
char **pEnv = environ;
|
||||
line[1023] = 0;
|
||||
while (*pEnv != NULL) {
|
||||
strncpy(line, *pEnv, sizeof(line) - 1);
|
||||
tstrncpy(line, *pEnv, sizeof(line));
|
||||
pEnv++;
|
||||
if (strncmp(line, "TAOS_APOLLO_URL", 14) == 0) {
|
||||
char *p = strchr(line, '=');
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "taoserror.h"
|
||||
|
||||
int32_t taosVersionStrToInt(const char *vstr, int32_t *vint) {
|
||||
int32_t code = 0;
|
||||
if (vstr == NULL) {
|
||||
return terrno = TSDB_CODE_INVALID_VERSION_STRING;
|
||||
}
|
||||
|
@ -31,7 +32,10 @@ int32_t taosVersionStrToInt(const char *vstr, int32_t *vint) {
|
|||
if (vstr[spos] != '.') {
|
||||
tmp[spos - tpos] = vstr[spos];
|
||||
} else {
|
||||
vnum[vpos] = atoi(tmp);
|
||||
code = taosStr2int32(tmp, &vnum[vpos]);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
memset(tmp, 0, sizeof(tmp));
|
||||
vpos++;
|
||||
tpos = spos + 1;
|
||||
|
@ -39,7 +43,10 @@ int32_t taosVersionStrToInt(const char *vstr, int32_t *vint) {
|
|||
}
|
||||
|
||||
if ('\0' != tmp[0] && vpos < 4) {
|
||||
vnum[vpos] = atoi(tmp);
|
||||
code = taosStr2int32(tmp, &vnum[vpos]);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
if (vnum[0] <= 0) {
|
||||
|
|
Loading…
Reference in New Issue