Merge pull request #29122 from taosdata/enh/3.0/TD-31709

Replace unsafe memory functions with safe versions in wal&sync&mnode.
This commit is contained in:
Shengliang Guan 2024-12-16 21:37:33 +08:00 committed by GitHub
commit 4c1499ebeb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 254 additions and 532 deletions

View File

@ -36,6 +36,7 @@ extern "C" {
#define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL #define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
#define WAL_JSON_BUF_SIZE 30
typedef enum { typedef enum {
TAOS_WAL_SKIP = 0, TAOS_WAL_SKIP = 0,

View File

@ -31,6 +31,7 @@ SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId);
int32_t mndGetDnodeSize(SMnode *pMnode); int32_t mndGetDnodeSize(SMnode *pMnode);
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs); bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs);
int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo); int32_t mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo);
void getSlowLogScopeString(int32_t scope, char *result);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -25,6 +25,7 @@ extern "C" {
#define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_VER_NUMBER 5 #define MND_STREAM_VER_NUMBER 5
#define MND_STREAM_TRIGGER_NAME_SIZE 20
#define MND_STREAM_CREATE_NAME "stream-create" #define MND_STREAM_CREATE_NAME "stream-create"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" #define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
@ -60,7 +61,7 @@ typedef struct SStreamTaskResetMsg {
} SStreamTaskResetMsg; } SStreamTaskResetMsg;
typedef struct SChkptReportInfo { typedef struct SChkptReportInfo {
SArray* pTaskList; SArray *pTaskList;
int64_t reportChkpt; int64_t reportChkpt;
int64_t streamId; int64_t streamId;
} SChkptReportInfo; } SChkptReportInfo;
@ -106,7 +107,7 @@ typedef struct STaskChkptInfo {
int64_t ts; int64_t ts;
int32_t transId; int32_t transId;
int8_t dropHTask; int8_t dropHTask;
}STaskChkptInfo; } STaskChkptInfo;
int32_t mndInitStream(SMnode *pMnode); int32_t mndInitStream(SMnode *pMnode);
void mndCleanupStream(SMnode *pMnode); void mndCleanupStream(SMnode *pMnode);
@ -121,7 +122,7 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList); int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList);
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo); void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
@ -132,7 +133,7 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status)
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream); int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream);
bool mndStreamNodeIsUpdated(SMnode *pMnode); bool mndStreamNodeIsUpdated(SMnode *pMnode);
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb); int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb);
@ -146,8 +147,8 @@ int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *p
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId); int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId);
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId); int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId);
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts); int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, int64_t ts);
int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream); int32_t mndStreamSetRestartAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
int8_t mndTrigger); int8_t mndTrigger);
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
@ -174,9 +175,9 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo); int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo);
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo);
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId);
int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId); int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId);
int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId); int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId);
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows);
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t nRows, int32_t p); int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t nRows, int32_t p);

View File

@ -243,7 +243,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
int32_t code = taosGetSystemUUIDLen(clusterObj.name, TSDB_CLUSTER_ID_LEN); int32_t code = taosGetSystemUUIDLen(clusterObj.name, TSDB_CLUSTER_ID_LEN);
if (code != 0) { if (code != 0) {
(void)strcpy(clusterObj.name, "tdengine3.0"); tstrncpy(clusterObj.name, "tdengine3.0", sizeof(clusterObj.name));
mError("failed to get name from system, set to default val %s", clusterObj.name); mError("failed to get name from system, set to default val %s", clusterObj.name);
} }

View File

@ -12,8 +12,8 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "mndCompact.h"
#include "audit.h" #include "audit.h"
#include "mndCompact.h"
#include "mndCompactDetail.h" #include "mndCompactDetail.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
@ -254,7 +254,7 @@ int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompac
int32_t code = 0; int32_t code = 0;
pCompact->compactId = tGenIdPI32(); pCompact->compactId = tGenIdPI32();
(void)strcpy(pCompact->dbname, pDb->name); tstrncpy(pCompact->dbname, pDb->name, sizeof(pCompact->dbname));
pCompact->startTime = taosGetTimestampMs(); pCompact->startTime = taosGetTimestampMs();

View File

@ -418,17 +418,17 @@ static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) {
} }
size_t optLen = p - pMCfgReq->config; size_t optLen = p - pMCfgReq->config;
strncpy(pDCfgReq->config, pMCfgReq->config, optLen); tstrncpy(pDCfgReq->config, pMCfgReq->config, sizeof(pDCfgReq->config));
pDCfgReq->config[optLen] = 0; pDCfgReq->config[optLen] = 0;
if (' ' == pMCfgReq->config[optLen]) { if (' ' == pMCfgReq->config[optLen]) {
// 'key value' // 'key value'
if (strlen(pMCfgReq->value) != 0) goto _err; if (strlen(pMCfgReq->value) != 0) goto _err;
(void)strcpy(pDCfgReq->value, p + 1); tstrncpy(pDCfgReq->value, p + 1, sizeof(pDCfgReq->value));
} else { } else {
// 'key' 'value' // 'key' 'value'
if (strlen(pMCfgReq->value) == 0) goto _err; if (strlen(pMCfgReq->value) == 0) goto _err;
(void)strcpy(pDCfgReq->value, pMCfgReq->value); tstrncpy(pDCfgReq->value, pMCfgReq->value, sizeof(pDCfgReq->value));
} }
TAOS_RETURN(code); TAOS_RETURN(code);
@ -576,7 +576,7 @@ _send_req :
{ // audit { // audit
char obj[50] = {0}; char obj[50] = {0};
(void)sprintf(obj, "%d", cfgReq.dnodeId); (void)tsnprintf(obj, sizeof(obj), "%d", cfgReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "alterDnode", obj, "", cfgReq.sql, cfgReq.sqlLen); auditRecord(pReq, pMnode->clusterId, "alterDnode", obj, "", cfgReq.sql, cfgReq.sqlLen);
} }
@ -785,59 +785,59 @@ SArray *initVariablesFromItems(SArray *pItems) {
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
SConfigItem *pItem = taosArrayGet(pItems, i); SConfigItem *pItem = taosArrayGet(pItems, i);
SVariablesInfo info = {0}; SVariablesInfo info = {0};
strcpy(info.name, pItem->name); tstrncpy(info.name, pItem->name, sizeof(info.name));
// init info value // init info value
switch (pItem->dtype) { switch (pItem->dtype) {
case CFG_DTYPE_NONE: case CFG_DTYPE_NONE:
break; break;
case CFG_DTYPE_BOOL: case CFG_DTYPE_BOOL:
sprintf(info.value, "%d", pItem->bval); tsnprintf(info.value, sizeof(info.value), "%d", pItem->bval);
break; break;
case CFG_DTYPE_INT32: case CFG_DTYPE_INT32:
sprintf(info.value, "%d", pItem->i32); tsnprintf(info.value, sizeof(info.value), "%d", pItem->i32);
break; break;
case CFG_DTYPE_INT64: case CFG_DTYPE_INT64:
sprintf(info.value, "%" PRId64, pItem->i64); tsnprintf(info.value, sizeof(info.value), "%" PRId64, pItem->i64);
break; break;
case CFG_DTYPE_FLOAT: case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE: case CFG_DTYPE_DOUBLE:
sprintf(info.value, "%f", pItem->fval); tsnprintf(info.value, sizeof(info.value), "%f", pItem->fval);
break; break;
case CFG_DTYPE_STRING: case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR: case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE: case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET: case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE: case CFG_DTYPE_TIMEZONE:
sprintf(info.value, "%s", pItem->str); tsnprintf(info.value, sizeof(info.value), "%s", pItem->str);
break; break;
} }
// init info scope // init info scope
switch (pItem->scope) { switch (pItem->scope) {
case CFG_SCOPE_SERVER: case CFG_SCOPE_SERVER:
strcpy(info.scope, "server"); tstrncpy(info.scope, "server", sizeof(info.scope));
break; break;
case CFG_SCOPE_CLIENT: case CFG_SCOPE_CLIENT:
strcpy(info.scope, "client"); tstrncpy(info.scope, "client", sizeof(info.scope));
break; break;
case CFG_SCOPE_BOTH: case CFG_SCOPE_BOTH:
strcpy(info.scope, "both"); tstrncpy(info.scope, "both", sizeof(info.scope));
break; break;
default: default:
strcpy(info.scope, "unknown"); tstrncpy(info.scope, "unknown", sizeof(info.scope));
break; break;
} }
// init info category // init info category
switch (pItem->category) { switch (pItem->category) {
case CFG_CATEGORY_GLOBAL: case CFG_CATEGORY_GLOBAL:
strcpy(info.category, "global"); tstrncpy(info.category, "global", sizeof(info.category));
break; break;
case CFG_CATEGORY_LOCAL: case CFG_CATEGORY_LOCAL:
strcpy(info.category, "local"); tstrncpy(info.category, "local", sizeof(info.category));
break; break;
default: default:
strcpy(info.category, "unknown"); tstrncpy(info.category, "unknown", sizeof(info.category));
break; break;
} }
if (NULL == taosArrayPush(pInfos, &info)) { if (NULL == taosArrayPush(pInfos, &info)) {

View File

@ -889,22 +889,6 @@ _OVER:
TAOS_RETURN(code); TAOS_RETURN(code);
} }
static void mndBuildAuditDetailInt32(char *detail, char *tmp, char *format, int32_t para) {
if (para > 0) {
if (strlen(detail) > 0) (void)strcat(detail, ", ");
(void)sprintf(tmp, format, para);
(void)strcat(detail, tmp);
}
}
static void mndBuildAuditDetailInt64(char *detail, char *tmp, char *format, int64_t para) {
if (para > 0) {
if (strlen(detail) > 0) (void)strcat(detail, ", ");
(void)sprintf(tmp, format, para);
(void)strcat(detail, tmp);
}
}
static int32_t mndCheckDbEncryptKey(SMnode *pMnode, SCreateDbReq *pReq) { static int32_t mndCheckDbEncryptKey(SMnode *pMnode, SCreateDbReq *pReq) {
int32_t code = 0; int32_t code = 0;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
@ -1381,7 +1365,7 @@ _OVER:
} }
static void mndDumpDbCfgInfo(SDbCfgRsp *cfgRsp, SDbObj *pDb) { static void mndDumpDbCfgInfo(SDbCfgRsp *cfgRsp, SDbObj *pDb) {
(void)strcpy(cfgRsp->db, pDb->name); tstrncpy(cfgRsp->db, pDb->name, sizeof(cfgRsp->db));
cfgRsp->dbId = pDb->uid; cfgRsp->dbId = pDb->uid;
cfgRsp->cfgVersion = pDb->cfgVersion; cfgRsp->cfgVersion = pDb->cfgVersion;
cfgRsp->numOfVgroups = pDb->cfg.numOfVgroups; cfgRsp->numOfVgroups = pDb->cfg.numOfVgroups;
@ -2280,24 +2264,24 @@ static char *buildRetension(SArray *pRetension) {
int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq); int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep); int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit); len += tsnprintf(p1 + len, 100 - len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit);
if (size > 1) { if (size > 1) {
len += sprintf(p1 + len, ","); len += tsnprintf(p1 + len, 100 - len, ",");
p = taosArrayGet(pRetension, 1); p = taosArrayGet(pRetension, 1);
v1 = getValOfDiffPrecision(p->freqUnit, p->freq); v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
v2 = getValOfDiffPrecision(p->keepUnit, p->keep); v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit); len += tsnprintf(p1 + len, 100 - len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit);
} }
if (size > 2) { if (size > 2) {
len += sprintf(p1 + len, ","); len += tsnprintf(p1 + len, 100 - len, ",");
p = taosArrayGet(pRetension, 2); p = taosArrayGet(pRetension, 2);
v1 = getValOfDiffPrecision(p->freqUnit, p->freq); v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
v2 = getValOfDiffPrecision(p->keepUnit, p->keep); v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit); len += tsnprintf(p1 + len, 100 - len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit);
} }
varDataSetLen(p1, len); varDataSetLen(p1, len);
@ -2466,9 +2450,9 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
int32_t lenKeep2 = formatDurationOrKeep(keep2Str, sizeof(keep2Str), pDb->cfg.daysToKeep2); int32_t lenKeep2 = formatDurationOrKeep(keep2Str, sizeof(keep2Str), pDb->cfg.daysToKeep2);
if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) { if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) {
len = sprintf(&keepVstr[VARSTR_HEADER_SIZE], "%s,%s,%s", keep1Str, keep2Str, keep0Str); len = tsnprintf(&keepVstr[VARSTR_HEADER_SIZE], sizeof(keepVstr), "%s,%s,%s", keep1Str, keep2Str, keep0Str);
} else { } else {
len = sprintf(&keepVstr[VARSTR_HEADER_SIZE], "%s,%s,%s", keep0Str, keep1Str, keep2Str); len = tsnprintf(&keepVstr[VARSTR_HEADER_SIZE], sizeof(keepVstr), "%s,%s,%s", keep0Str, keep1Str, keep2Str);
} }
varDataSetLen(keepVstr, len); varDataSetLen(keepVstr, len);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -2556,7 +2540,7 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, rows, (const char *)&pDb->cfg.s3ChunkSize, false), &lino, _OVER); TAOS_CHECK_GOTO(colDataSetVal(pColInfo, rows, (const char *)&pDb->cfg.s3ChunkSize, false), &lino, _OVER);
char keeplocalVstr[128] = {0}; char keeplocalVstr[128] = {0};
len = sprintf(&keeplocalVstr[VARSTR_HEADER_SIZE], "%dm", pDb->cfg.s3KeepLocal); len = tsnprintf(&keeplocalVstr[VARSTR_HEADER_SIZE], sizeof(keeplocalVstr), "%dm", pDb->cfg.s3KeepLocal);
varDataSetLen(keeplocalVstr, len); varDataSetLen(keeplocalVstr, len);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, rows, (const char *)keeplocalVstr, false), &lino, _OVER); TAOS_CHECK_GOTO(colDataSetVal(pColInfo, rows, (const char *)keeplocalVstr, false), &lino, _OVER);

View File

@ -735,7 +735,7 @@ SConfigObj *mndInitConfigObj(SConfigItem *pItem) {
if (pObj == NULL) { if (pObj == NULL) {
return NULL; return NULL;
} }
strncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN); tstrncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
pObj->dtype = pItem->dtype; pObj->dtype = pItem->dtype;
switch (pItem->dtype) { switch (pItem->dtype) {
case CFG_DTYPE_NONE: case CFG_DTYPE_NONE:
@ -776,7 +776,7 @@ int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
if (strcasecmp(value, "true") == 0) { if (strcasecmp(value, "true") == 0) {
tmp = true; tmp = true;
} }
if (atoi(value) > 0) { if (taosStr2Int32(value, NULL, 10) > 0) {
tmp = true; tmp = true;
} }
pObjNew->bval = tmp; pObjNew->bval = tmp;

View File

@ -14,11 +14,11 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndDnode.h"
#include <stdio.h> #include <stdio.h>
#include "audit.h" #include "audit.h"
#include "mndCluster.h" #include "mndCluster.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndPrivilege.h" #include "mndPrivilege.h"
#include "mndQnode.h" #include "mndQnode.h"
@ -1051,20 +1051,20 @@ _OVER:
TAOS_RETURN(code); TAOS_RETURN(code);
} }
static void getSlowLogScopeString(int32_t scope, char *result) { void getSlowLogScopeString(int32_t scope, char *result) {
if (scope == SLOW_LOG_TYPE_NULL) { if (scope == SLOW_LOG_TYPE_NULL) {
(void)strcat(result, "NONE"); (void)strncat(result, "NONE", 64);
return; return;
} }
while (scope > 0) { while (scope > 0) {
if (scope & SLOW_LOG_TYPE_QUERY) { if (scope & SLOW_LOG_TYPE_QUERY) {
(void)strcat(result, "QUERY"); (void)strncat(result, "QUERY", 64);
scope &= ~SLOW_LOG_TYPE_QUERY; scope &= ~SLOW_LOG_TYPE_QUERY;
} else if (scope & SLOW_LOG_TYPE_INSERT) { } else if (scope & SLOW_LOG_TYPE_INSERT) {
(void)strcat(result, "INSERT"); (void)strncat(result, "INSERT", 64);
scope &= ~SLOW_LOG_TYPE_INSERT; scope &= ~SLOW_LOG_TYPE_INSERT;
} else if (scope & SLOW_LOG_TYPE_OTHERS) { } else if (scope & SLOW_LOG_TYPE_OTHERS) {
(void)strcat(result, "OTHERS"); (void)strncat(result, "OTHERS", 64);
scope &= ~SLOW_LOG_TYPE_OTHERS; scope &= ~SLOW_LOG_TYPE_OTHERS;
} else { } else {
(void)printf("invalid slow log scope:%d", scope); (void)printf("invalid slow log scope:%d", scope);
@ -1072,7 +1072,7 @@ static void getSlowLogScopeString(int32_t scope, char *result) {
} }
if (scope > 0) { if (scope > 0) {
(void)strcat(result, "|"); (void)strncat(result, "|", 64);
} }
} }
} }
@ -1112,7 +1112,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
} }
char obj[200] = {0}; char obj[200] = {0};
(void)sprintf(obj, "%s:%d", createReq.fqdn, createReq.port); (void)tsnprintf(obj, sizeof(obj), "%s:%d", createReq.fqdn, createReq.port);
auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen); auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);
@ -1296,7 +1296,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char obj1[30] = {0}; char obj1[30] = {0};
(void)sprintf(obj1, "%d", dropReq.dnodeId); (void)tsnprintf(obj1, sizeof(obj1), "%d", dropReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen); auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);
@ -1406,8 +1406,8 @@ static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
const STraceId *trace = &pReq->info.traceId; const STraceId *trace = &pReq->info.traceId;
SDCfgDnodeReq dcfgReq = {0}; SDCfgDnodeReq dcfgReq = {0};
if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) { if (strncasecmp(cfgReq.config, "encrypt_key", 12) == 0) {
strcpy(dcfgReq.config, cfgReq.config); tstrncpy(dcfgReq.config, cfgReq.config, sizeof(dcfgReq.config));
strcpy(dcfgReq.value, cfgReq.value); tstrncpy(dcfgReq.value, cfgReq.value, sizeof(dcfgReq.value));
tFreeSMCfgDnodeReq(&cfgReq); tFreeSMCfgDnodeReq(&cfgReq);
return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq); return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);
} else { } else {

View File

@ -723,7 +723,7 @@ static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
} }
char varLang[TSDB_TYPE_STR_MAX_LEN + 1] = {0}; char varLang[TSDB_TYPE_STR_MAX_LEN + 1] = {0};
varDataSetLen(varLang, strlen(language)); varDataSetLen(varLang, strlen(language));
strcpy(varDataVal(varLang), language); tstrncpy(varDataVal(varLang), language, sizeof(varLang) - VARSTR_HEADER_SIZE);
TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)varLang, false), pSdb, pFunc); TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)varLang, false), pSdb, pFunc);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

View File

@ -122,9 +122,9 @@ int32_t mndBuildInsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbN
TAOS_RETURN(code); TAOS_RETURN(code);
} }
strcpy(pRsp->tbName, pMeta->tbName); tstrncpy(pRsp->tbName, pMeta->tbName, sizeof(pRsp->tbName));
strcpy(pRsp->stbName, pMeta->stbName); tstrncpy(pRsp->stbName, pMeta->stbName, sizeof(pRsp->stbName));
strcpy(pRsp->dbFName, pMeta->dbFName); tstrncpy(pRsp->dbFName, pMeta->dbFName, sizeof(pRsp->dbFName));
pRsp->numOfTags = pMeta->numOfTags; pRsp->numOfTags = pMeta->numOfTags;
pRsp->numOfColumns = pMeta->numOfColumns; pRsp->numOfColumns = pMeta->numOfColumns;
pRsp->tableType = pMeta->tableType; pRsp->tableType = pMeta->tableType;

View File

@ -14,10 +14,10 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndMnode.h"
#include "audit.h" #include "audit.h"
#include "mndCluster.h" #include "mndCluster.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h"
#include "mndPrivilege.h" #include "mndPrivilege.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndSync.h" #include "mndSync.h"
@ -884,7 +884,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char obj[40] = {0}; char obj[40] = {0};
sprintf(obj, "%d", dropReq.dnodeId); (void)tsnprintf(obj, sizeof(obj), "%d", dropReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "dropMnode", "", obj, dropReq.sql, dropReq.sqlLen); auditRecord(pReq, pMnode->clusterId, "dropMnode", "", obj, dropReq.sql, dropReq.sqlLen);

View File

@ -107,9 +107,9 @@ int32_t mndBuildPerfsTableCfg(SMnode *pMnode, const char *dbFName, const char *t
TAOS_RETURN(code); TAOS_RETURN(code);
} }
strcpy(pRsp->tbName, pMeta->tbName); tstrncpy(pRsp->tbName, pMeta->tbName, sizeof(pRsp->tbName));
strcpy(pRsp->stbName, pMeta->stbName); tstrncpy(pRsp->stbName, pMeta->stbName, sizeof(pRsp->stbName));
strcpy(pRsp->dbFName, pMeta->dbFName); tstrncpy(pRsp->dbFName, pMeta->dbFName, sizeof(pRsp->dbFName));
pRsp->numOfTags = pMeta->numOfTags; pRsp->numOfTags = pMeta->numOfTags;
pRsp->numOfColumns = pMeta->numOfColumns; pRsp->numOfColumns = pMeta->numOfColumns;
pRsp->tableType = pMeta->tableType; pRsp->tableType = pMeta->tableType;

View File

@ -14,12 +14,12 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndProfile.h"
#include "audit.h" #include "audit.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndPrivilege.h" #include "mndPrivilege.h"
#include "mndProfile.h"
#include "mndQnode.h" #include "mndQnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndSma.h" #include "mndSma.h"
@ -137,8 +137,8 @@ void mndCleanupProfile(SMnode *pMnode) {
} }
} }
static void setUserInfo2Conn(SConnObj* connObj, char* userApp, uint32_t userIp){ static void setUserInfo2Conn(SConnObj *connObj, char *userApp, uint32_t userIp) {
if (connObj == NULL){ if (connObj == NULL) {
return; return;
} }
tstrncpy(connObj->userApp, userApp, sizeof(connObj->userApp)); tstrncpy(connObj->userApp, userApp, sizeof(connObj->userApp));
@ -384,7 +384,7 @@ static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq *pReq)
app.appId = pReq->appId; app.appId = pReq->appId;
app.ip = clientIp; app.ip = clientIp;
app.pid = pReq->pid; app.pid = pReq->pid;
(void)strcpy(app.name, pReq->name); tstrncpy(app.name, pReq->name, sizeof(app.name));
app.startTime = pReq->startTime; app.startTime = pReq->startTime;
(void)memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary)); (void)memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary));
app.lastAccessTimeMs = taosGetTimestampMs(); app.lastAccessTimeMs = taosGetTimestampMs();
@ -911,7 +911,8 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
char endpoint[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0}; char endpoint[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
taosInetNtoa(varDataVal(endpoint), pConn->ip); taosInetNtoa(varDataVal(endpoint), pConn->ip);
(void)sprintf(varDataVal(endpoint) + strlen(varDataVal(endpoint)), ":%d", pConn->port); (void)tsnprintf(varDataVal(endpoint) + strlen(varDataVal(endpoint)),
sizeof(endpoint) - VARSTR_HEADER_SIZE - strlen(varDataVal(endpoint)), ":%d", pConn->port);
varDataLen(endpoint) = strlen(varDataVal(endpoint)); varDataLen(endpoint) = strlen(varDataVal(endpoint));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char *)endpoint, false); code = colDataSetVal(pColInfo, numOfRows, (const char *)endpoint, false);
@ -944,7 +945,7 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
} }
char userIp[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0}; char userIp[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
if (pConn->userIp != 0 && pConn->userIp != INADDR_NONE){ if (pConn->userIp != 0 && pConn->userIp != INADDR_NONE) {
taosInetNtoa(varDataVal(userIp), pConn->userIp); taosInetNtoa(varDataVal(userIp), pConn->userIp);
varDataLen(userIp) = strlen(varDataVal(userIp)); varDataLen(userIp) = strlen(varDataVal(userIp));
} }
@ -987,7 +988,8 @@ static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBloc
cols = 0; cols = 0;
char queryId[26 + VARSTR_HEADER_SIZE] = {0}; char queryId[26 + VARSTR_HEADER_SIZE] = {0};
(void)sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid); (void)tsnprintf(&queryId[VARSTR_HEADER_SIZE], sizeof(queryId) - VARSTR_HEADER_SIZE, "%x:%" PRIx64, pConn->id,
pQuery->reqRid);
varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]); varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, curRowIndex, (const char *)queryId, false); code = colDataSetVal(pColInfo, curRowIndex, (const char *)queryId, false);
@ -1043,7 +1045,8 @@ static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBloc
char endpoint[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0}; char endpoint[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
taosInetNtoa(varDataVal(endpoint), pConn->ip); taosInetNtoa(varDataVal(endpoint), pConn->ip);
(void)sprintf(varDataVal(endpoint) + strlen(varDataVal(endpoint)), ":%d", pConn->port); (void)tsnprintf(varDataVal(endpoint) + strlen(varDataVal(endpoint)),
sizeof(endpoint) - VARSTR_HEADER_SIZE - strlen(varDataVal(endpoint)), ":%d", pConn->port);
varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]); varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, curRowIndex, (const char *)endpoint, false); code = colDataSetVal(pColInfo, curRowIndex, (const char *)endpoint, false);
@ -1099,11 +1102,12 @@ static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBloc
int32_t offset = VARSTR_HEADER_SIZE; int32_t offset = VARSTR_HEADER_SIZE;
for (int32_t i = 0; i < pQuery->subPlanNum && offset + reserve < strSize; ++i) { for (int32_t i = 0; i < pQuery->subPlanNum && offset + reserve < strSize; ++i) {
if (i) { if (i) {
offset += sprintf(subStatus + offset, ","); offset += tsnprintf(subStatus + offset, sizeof(subStatus) - offset, ",");
} }
if (offset + reserve < strSize) { if (offset + reserve < strSize) {
SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i); SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
offset += sprintf(subStatus + offset, "%" PRIu64 ":%s", pDesc->tid, pDesc->status); offset +=
tsnprintf(subStatus + offset, sizeof(subStatus) - offset, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
} else { } else {
break; break;
} }
@ -1138,7 +1142,7 @@ static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBloc
} }
char userIp[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0}; char userIp[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
if (pConn->userIp != 0 && pConn->userIp != INADDR_NONE){ if (pConn->userIp != 0 && pConn->userIp != INADDR_NONE) {
taosInetNtoa(varDataVal(userIp), pConn->userIp); taosInetNtoa(varDataVal(userIp), pConn->userIp);
varDataLen(userIp) = strlen(varDataVal(userIp)); varDataLen(userIp) = strlen(varDataVal(userIp));
} }
@ -1242,7 +1246,7 @@ static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
} }
char name[TSDB_APP_NAME_LEN + 6 + VARSTR_HEADER_SIZE] = {0}; char name[TSDB_APP_NAME_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
(void)sprintf(&name[VARSTR_HEADER_SIZE], "%s", pApp->name); (void)tsnprintf(&name[VARSTR_HEADER_SIZE], sizeof(name) - VARSTR_HEADER_SIZE, "%s", pApp->name);
varDataLen(name) = strlen(&name[VARSTR_HEADER_SIZE]); varDataLen(name) = strlen(&name[VARSTR_HEADER_SIZE]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char *)name, false); code = colDataSetVal(pColInfo, numOfRows, (const char *)name, false);

View File

@ -333,7 +333,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char obj[33] = {0}; char obj[33] = {0};
(void)sprintf(obj, "%d", createReq.dnodeId); (void)tsnprintf(obj, sizeof(obj), "%d", createReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "createQnode", "", obj, createReq.sql, createReq.sqlLen); auditRecord(pReq, pMnode->clusterId, "createQnode", "", obj, createReq.sql, createReq.sqlLen);
_OVER: _OVER:
@ -465,7 +465,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char obj[33] = {0}; char obj[33] = {0};
(void)sprintf(obj, "%d", dropReq.dnodeId); (void)tsnprintf(obj, sizeof(obj), "%d", dropReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "dropQnode", "", obj, dropReq.sql, dropReq.sqlLen); auditRecord(pReq, pMnode->clusterId, "dropQnode", "", obj, dropReq.sql, dropReq.sqlLen);

View File

@ -14,7 +14,6 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndStb.h"
#include "audit.h" #include "audit.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
@ -27,6 +26,7 @@
#include "mndScheduler.h" #include "mndScheduler.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndSma.h" #include "mndSma.h"
#include "mndStb.h"
#include "mndTopic.h" #include "mndTopic.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
@ -1368,7 +1368,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
if (createReq.sql == NULL && createReq.sqlLen == 0) { if (createReq.sql == NULL && createReq.sqlLen == 0) {
char detail[1000] = {0}; char detail[1000] = {0};
sprintf(detail, "dbname:%s, stable name:%s", name.dbname, name.tname); (void)tsnprintf(detail, sizeof(detail), "dbname:%s, stable name:%s", name.dbname, name.tname);
auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, detail, strlen(detail)); auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, detail, strlen(detail));
} else { } else {
@ -3148,7 +3148,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
TAOS_RETURN(code); TAOS_RETURN(code);
} }
sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName); (void)tsnprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVersion->dbFName, pStbVersion->stbName);
int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist); int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist);
if (code || !exist) { if (code || !exist) {
indexRsp.suid = pStbVersion->suid; indexRsp.suid = pStbVersion->suid;
@ -3156,8 +3156,8 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
indexRsp.pIndex = NULL; indexRsp.pIndex = NULL;
} }
strcpy(indexRsp.dbFName, pStbVersion->dbFName); tstrncpy(indexRsp.dbFName, pStbVersion->dbFName, sizeof(indexRsp.dbFName));
strcpy(indexRsp.tbName, pStbVersion->stbName); tstrncpy(indexRsp.tbName, pStbVersion->stbName, sizeof(indexRsp.tbName));
if (taosArrayPush(hbRsp.pIndexRsp, &indexRsp) == NULL) { if (taosArrayPush(hbRsp.pIndexRsp, &indexRsp) == NULL) {
code = terrno; code = terrno;
@ -3256,282 +3256,6 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
} }
} }
// static int32_t mndProcessRetrieveStbReq(SRpcMsg *pReq) {
// SMnode *pMnode = pReq->info.node;
// SShowMgmt *pMgmt = &pMnode->showMgmt;
// SShowObj *pShow = NULL;
// int32_t rowsToRead = SHOW_STEP_SIZE;
// int32_t rowsRead = 0;
//
// SRetrieveTableReq retrieveReq = {0};
// if (tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
// terrno = TSDB_CODE_INVALID_MSG;
// return -1;
// }
//
// SMnode *pMnode = pReq->info.node;
// SSdb *pSdb = pMnode->pSdb;
// int32_t numOfRows = 0;
// SDbObj *pDb = NULL;
// ESdbStatus objStatus = 0;
//
// SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
// if (pUser == NULL) return 0;
// bool sysinfo = pUser->sysInfo;
//
// // Append the information_schema database into the result.
//// if (!pShow->sysDbRsp) {
//// SDbObj infoschemaDb = {0};
//// setInformationSchemaDbCfg(pMnode, &infoschemaDb);
//// size_t numOfTables = 0;
//// getVisibleInfosTablesNum(sysinfo, &numOfTables);
//// mndDumpDbInfoData(pMnode, pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
////
//// numOfRows += 1;
////
//// SDbObj perfschemaDb = {0};
//// setPerfSchemaDbCfg(pMnode, &perfschemaDb);
//// numOfTables = 0;
//// getPerfDbMeta(NULL, &numOfTables);
//// mndDumpDbInfoData(pMnode, pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
////
//// numOfRows += 1;
//// pShow->sysDbRsp = true;
//// }
//
// SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS);
// blockDataEnsureCapacity(p, rowsToRead);
//
// size_t size = 0;
// const SSysTableMeta* pSysDbTableMeta = NULL;
//
// getInfosDbMeta(&pSysDbTableMeta, &size);
// p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);
//
// getPerfDbMeta(&pSysDbTableMeta, &size);
// p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
//
// blockDataDestroy(p);
//
//
// while (numOfRows < rowsToRead) {
// pShow->pIter = sdbFetchAll(pSdb, SDB_DB, pShow->pIter, (void **)&pDb, &objStatus, true);
// if (pShow->pIter == NULL) break;
// if (strncmp(retrieveReq.db, pDb->name, strlen(retrieveReq.db)) != 0){
// continue;
// }
// if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) {
// continue;
// }
//
// while (numOfRows < rowsToRead) {
// pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
// if (pShow->pIter == NULL) break;
//
// if (pDb != NULL && pStb->dbUid != pDb->uid) {
// sdbRelease(pSdb, pStb);
// continue;
// }
//
// cols = 0;
//
// SName name = {0};
// char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
// mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
// varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
//
// SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
//
// char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
// tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
// tNameGetDbName(&name, varDataVal(db));
// varDataSetLen(db, strlen(varDataVal(db)));
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false);
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false);
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// if (pStb->commentLen > 0) {
// char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
// STR_TO_VARSTR(comment, pStb->comment);
// colDataSetVal(pColInfo, numOfRows, comment, false);
// } else if (pStb->commentLen == 0) {
// char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
// STR_TO_VARSTR(comment, "");
// colDataSetVal(pColInfo, numOfRows, comment, false);
// } else {
// colDataSetNULL(pColInfo, numOfRows);
// }
//
// char watermark[64 + VARSTR_HEADER_SIZE] = {0};
// sprintf(varDataVal(watermark), "%" PRId64 "a,%" PRId64 "a", pStb->watermark[0], pStb->watermark[1]);
// varDataSetLen(watermark, strlen(varDataVal(watermark)));
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false);
//
// char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
// sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]);
// varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char *)maxDelay, false);
//
// char rollup[160 + VARSTR_HEADER_SIZE] = {0};
// int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
// char *sep = ", ";
// int32_t sepLen = strlen(sep);
// int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2;
// for (int32_t i = 0; i < rollupNum; ++i) {
// char *funcName = taosArrayGet(pStb->pFuncs, i);
// if (i) {
// strncat(varDataVal(rollup), sep, rollupLen);
// rollupLen -= sepLen;
// }
// strncat(varDataVal(rollup), funcName, rollupLen);
// rollupLen -= strlen(funcName);
// }
// varDataSetLen(rollup, strlen(varDataVal(rollup)));
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char *)rollup, false);
//
// numOfRows++;
// sdbRelease(pSdb, pStb);
// }
//
// if (pDb != NULL) {
// mndReleaseDb(pMnode, pDb);
// }
//
// sdbRelease(pSdb, pDb);
// }
//
// pShow->numOfRows += numOfRows;
// mndReleaseUser(pMnode, pUser);
//
//
//
//
//
//
//
//
// ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
// if (retrieveFp == NULL) {
// mndReleaseShowObj(pShow, false);
// terrno = TSDB_CODE_MSG_NOT_PROCESSED;
// mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
// return -1;
// }
//
// mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
// if (retrieveReq.user[0] != 0) {
// memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN);
// } else {
// memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
// }
// if (retrieveReq.db[0] && mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
// return -1;
// }
//
// int32_t numOfCols = pShow->pMeta->numOfColumns;
//
// SSDataBlock *pBlock = createDataBlock();
// for (int32_t i = 0; i < numOfCols; ++i) {
// SColumnInfoData idata = {0};
//
// SSchema *p = &pShow->pMeta->pSchemas[i];
//
// idata.info.bytes = p->bytes;
// idata.info.type = p->type;
// idata.info.colId = p->colId;
// blockDataAppendColInfo(pBlock, &idata);
// }
//
// blockDataEnsureCapacity(pBlock, rowsToRead);
//
// if (mndCheckRetrieveFinished(pShow)) {
// mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
// rowsRead = 0;
// } else {
// rowsRead = (*retrieveFp)(pReq, pShow, pBlock, rowsToRead);
// if (rowsRead < 0) {
// terrno = rowsRead;
// mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
// mndReleaseShowObj(pShow, true);
// blockDataDestroy(pBlock);
// return -1;
// }
//
// pBlock->info.rows = rowsRead;
// mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
// }
//
// size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
// blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock));
//
// SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
// if (pRsp == NULL) {
// mndReleaseShowObj(pShow, false);
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
// blockDataDestroy(pBlock);
// return -1;
// }
//
// pRsp->handle = htobe64(pShow->id);
//
// if (rowsRead > 0) {
// char *pStart = pRsp->data;
// SSchema *ps = pShow->pMeta->pSchemas;
//
// *(int32_t *)pStart = htonl(pShow->pMeta->numOfColumns);
// pStart += sizeof(int32_t); // number of columns
//
// for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
// SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
// pSchema->bytes = htonl(ps[i].bytes);
// pSchema->colId = htons(ps[i].colId);
// pSchema->type = ps[i].type;
//
// pStart += sizeof(SSysTableSchema);
// }
//
// int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
// }
//
// pRsp->numOfRows = htonl(rowsRead);
// pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
// pReq->info.rsp = pRsp;
// pReq->info.rspLen = size;
//
// if (rowsRead == 0 || rowsRead < rowsToRead) {
// pRsp->completed = 1;
// mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
// mndReleaseShowObj(pShow, true);
// } else {
// mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
// mndReleaseShowObj(pShow, false);
// }
//
// blockDataDestroy(pBlock);
// return TSDB_CODE_SUCCESS;
//}
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
@ -3607,14 +3331,16 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
} }
char watermark[64 + VARSTR_HEADER_SIZE] = {0}; char watermark[64 + VARSTR_HEADER_SIZE] = {0};
sprintf(varDataVal(watermark), "%" PRId64 "a,%" PRId64 "a", pStb->watermark[0], pStb->watermark[1]); (void)tsnprintf(varDataVal(watermark), sizeof(watermark) - VARSTR_HEADER_SIZE, "%" PRId64 "a,%" PRId64 "a",
pStb->watermark[0], pStb->watermark[1]);
varDataSetLen(watermark, strlen(varDataVal(watermark))); varDataSetLen(watermark, strlen(varDataVal(watermark)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false), pStb, &lino, _ERROR); RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false), pStb, &lino, _ERROR);
char maxDelay[64 + VARSTR_HEADER_SIZE] = {0}; char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]); (void)tsnprintf(varDataVal(maxDelay), sizeof(maxDelay) - VARSTR_HEADER_SIZE, "%" PRId64 "a,%" PRId64 "a",
pStb->maxdelay[0], pStb->maxdelay[1]);
varDataSetLen(maxDelay, strlen(varDataVal(maxDelay))); varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -3708,12 +3434,15 @@ static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *p
int8_t colType = pm->schema[j].type; int8_t colType = pm->schema[j].type;
pColInfoData = taosArrayGet(p->pDataBlock, 4); pColInfoData = taosArrayGet(p->pDataBlock, 4);
char colTypeStr[VARSTR_HEADER_SIZE + 32]; char colTypeStr[VARSTR_HEADER_SIZE + 32];
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name); int colTypeLen =
tsnprintf(varDataVal(colTypeStr), sizeof(colTypeStr) - VARSTR_HEADER_SIZE, "%s", tDataTypes[colType].name);
if (colType == TSDB_DATA_TYPE_VARCHAR) { if (colType == TSDB_DATA_TYPE_VARCHAR) {
colTypeLen += colTypeLen +=
sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)", (int32_t)(pm->schema[j].bytes - VARSTR_HEADER_SIZE)); tsnprintf(varDataVal(colTypeStr) + colTypeLen, sizeof(colTypeStr) - colTypeLen - VARSTR_HEADER_SIZE, "(%d)",
(int32_t)(pm->schema[j].bytes - VARSTR_HEADER_SIZE));
} else if (colType == TSDB_DATA_TYPE_NCHAR) { } else if (colType == TSDB_DATA_TYPE_NCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)", colTypeLen +=
tsnprintf(varDataVal(colTypeStr) + colTypeLen, sizeof(colTypeStr) - colTypeLen - VARSTR_HEADER_SIZE, "(%d)",
(int32_t)((pm->schema[j].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); (int32_t)((pm->schema[j].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
} }
varDataSetLen(colTypeStr, colTypeLen); varDataSetLen(colTypeStr, colTypeLen);
@ -3864,13 +3593,16 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int8_t colType = pStb->pColumns[i].type; int8_t colType = pStb->pColumns[i].type;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char colTypeStr[VARSTR_HEADER_SIZE + 32]; char colTypeStr[VARSTR_HEADER_SIZE + 32];
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name); int colTypeLen =
tsnprintf(varDataVal(colTypeStr), sizeof(colTypeStr) - VARSTR_HEADER_SIZE, "%s", tDataTypes[colType].name);
if (colType == TSDB_DATA_TYPE_VARCHAR) { if (colType == TSDB_DATA_TYPE_VARCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)", colTypeLen +=
(int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE)); tsnprintf(varDataVal(colTypeStr) + colTypeLen, sizeof(colTypeStr) - colTypeLen - VARSTR_HEADER_SIZE,
"(%d)", (int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
} else if (colType == TSDB_DATA_TYPE_NCHAR) { } else if (colType == TSDB_DATA_TYPE_NCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)", colTypeLen +=
(int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); tsnprintf(varDataVal(colTypeStr) + colTypeLen, sizeof(colTypeStr) - colTypeLen - VARSTR_HEADER_SIZE,
"(%d)", (int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
} }
varDataSetLen(colTypeStr, colTypeLen); varDataSetLen(colTypeStr, colTypeLen);
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false), pStb, &lino, _OVER); RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false), pStb, &lino, _OVER);
@ -4092,8 +3824,7 @@ typedef struct SMndDropTbsWithTsmaCtx {
SHashObj *pVgMap; // <vgId, SVDropTbVgReqs> SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>
} SMndDropTbsWithTsmaCtx; } SMndDropTbsWithTsmaCtx;
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs, static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs, int32_t vgId);
int32_t vgId);
static void destroySVDropTbBatchReqs(void *p); static void destroySVDropTbBatchReqs(void *p);
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) { static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
@ -4300,8 +4031,7 @@ static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupI
return 0; return 0;
} }
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs, static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs, int32_t vgId) {
int32_t vgId) {
int32_t code = 0; int32_t code = 0;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);

View File

@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "mndStream.h"
#include "audit.h" #include "audit.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndPrivilege.h" #include "mndPrivilege.h"
#include "mndScheduler.h" #include "mndScheduler.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndStream.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "osMemory.h" #include "osMemory.h"
#include "parser.h" #include "parser.h"
@ -159,16 +159,16 @@ void mndCleanupStream(SMnode *pMnode) {
taosHashCleanup(execInfo.pTransferStateStreams); taosHashCleanup(execInfo.pTransferStateStreams);
taosHashCleanup(execInfo.pChkptStreams); taosHashCleanup(execInfo.pChkptStreams);
taosHashCleanup(execInfo.pStreamConsensus); taosHashCleanup(execInfo.pStreamConsensus);
(void) taosThreadMutexDestroy(&execInfo.lock); (void)taosThreadMutexDestroy(&execInfo.lock);
mDebug("mnd stream exec info cleanup"); mDebug("mnd stream exec info cleanup");
} }
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SSdbRow * pRow = NULL; SSdbRow *pRow = NULL;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
void * buf = NULL; void *buf = NULL;
int8_t sver = 0; int8_t sver = 0;
int32_t tlen; int32_t tlen;
int32_t dataPos = 0; int32_t dataPos = 0;
@ -237,7 +237,7 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) { static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
mTrace("stream:%s, perform update action", pOldStream->name); mTrace("stream:%s, perform update action", pOldStream->name);
(void) atomic_exchange_32(&pOldStream->version, pNewStream->version); (void)atomic_exchange_32(&pOldStream->version, pNewStream->version);
taosWLockLatch(&pOldStream->lock); taosWLockLatch(&pOldStream->lock);
@ -301,7 +301,7 @@ static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrap
pWrapper->pSchema[index].bytes = pField->bytes; pWrapper->pSchema[index].bytes = pField->bytes;
} }
pWrapper->pSchema[index].colId = index + 1; pWrapper->pSchema[index].colId = index + 1;
strcpy(pWrapper->pSchema[index].name, pField->name); tstrncpy(pWrapper->pSchema[index].name, pField->name, sizeof(pWrapper->pSchema[index].name));
pWrapper->pSchema[index].flags = pField->flags; pWrapper->pSchema[index].flags = pField->flags;
index += 1; index += 1;
} }
@ -359,7 +359,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
if (pSourceDb == NULL) { if (pSourceDb == NULL) {
code = terrno; code = terrno;
mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, tstrerror(code)); mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb,
tstrerror(code));
goto FAIL; goto FAIL;
} }
@ -371,7 +372,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName); SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
if (pTargetDb == NULL) { if (pTargetDb == NULL) {
code = terrno; code = terrno;
mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, tstrerror(code)); mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb,
tstrerror(code));
goto FAIL; goto FAIL;
} }
@ -417,7 +419,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags; pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name); tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type; pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
dataIndex++; dataIndex++;
} else { } else {
@ -435,7 +437,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags; pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name); tstrncpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name, sizeof(pFullSchema[i].name));
pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type; pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
dataIndex++; dataIndex++;
} else { } else {
@ -457,7 +459,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.pAstRoot = pAst, .pAstRoot = pAst,
.topicQuery = false, .topicQuery = false,
.streamQuery = true, .streamQuery = true,
.triggerType = (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY)? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger, .triggerType =
(pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
.watermark = pObj->conf.watermark, .watermark = pObj->conf.watermark,
.igExpired = pObj->conf.igExpired, .igExpired = pObj->conf.igExpired,
.deleteMark = pObj->deleteMark, .deleteMark = pObj->deleteMark,
@ -540,7 +543,8 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
return code; return code;
} }
code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0,
TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code) { if (code) {
taosMemoryFree(buf); taosMemoryFree(buf);
} }
@ -637,7 +641,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
SField *pField = taosArrayGet(createReq.pTags, 0); SField *pField = taosArrayGet(createReq.pTags, 0);
TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno); TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
strcpy(pField->name, "group_id"); tstrncpy(pField->name, "group_id", sizeof(pField->name));
pField->type = TSDB_DATA_TYPE_UBIGINT; pField->type = TSDB_DATA_TYPE_UBIGINT;
pField->flags = 0; pField->flags = 0;
pField->bytes = 8; pField->bytes = 8;
@ -963,7 +967,8 @@ static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
} }
STrans *pTrans = NULL; STrans *pTrans = NULL;
code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream", &pTrans); code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream",
&pTrans);
if (pTrans == NULL || code) { if (pTrans == NULL || code) {
mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code)); mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -1289,7 +1294,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration}; SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
void* p = taosArrayPush(pList, &in); void *p = taosArrayPush(pList, &in);
if (p) { if (p) {
int32_t currentSize = taosArrayGetSize(pList); int32_t currentSize = taosArrayGetSize(pList);
mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64 mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64
@ -1366,7 +1371,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
} }
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
int32_t code = 0; int32_t code = 0;
@ -1396,7 +1401,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (pStream->smaId != 0) { if (pStream->smaId != 0) {
mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid); mDebug("stream:%s, uid:0x%" PRIx64 " try to drop sma related stream", dropReq.name, pStream->uid);
void * pIter = NULL; void *pIter = NULL;
SSmaObj *pSma = NULL; SSmaObj *pSma = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma); pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
while (pIter) { while (pIter) {
@ -1925,7 +1930,8 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
// here create only one trans // here create only one trans
if (pTrans == NULL) { if (pTrans == NULL) {
code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets", &pTrans); code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME,
"update task epsets", &pTrans);
if (pTrans == NULL || code) { if (pTrans == NULL || code) {
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
@ -2178,7 +2184,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
mndDestroyVgroupChangeInfo(&changeInfo); mndDestroyVgroupChangeInfo(&changeInfo);
_end: _end:
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pNodeSnapshot); taosArrayDestroy(pNodeSnapshot);
@ -2227,7 +2233,7 @@ void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode)
code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); code = taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
if (code == 0) { if (code == 0) {
void * px = taosArrayPush(pExecNode->pTaskList, &id); void *px = taosArrayPush(pExecNode->pTaskList, &id);
int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList); int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
if (px) { if (px) {
mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num); mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
@ -2235,7 +2241,7 @@ void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode)
mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num); mError("s-task:0x%x failed to add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
} }
} else { } else {
mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t) entry.id.taskId); mError("s-task:0x%x failed to add into task map, since out of memory", (int32_t)entry.id.taskId);
} }
// add the new vgroups if not added yet // add the new vgroups if not added yet
@ -2252,11 +2258,12 @@ void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode)
SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1}; SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
epsetAssign(&nodeEntry.epset, &pTask->info.epSet); epsetAssign(&nodeEntry.epset, &pTask->info.epSet);
void* px = taosArrayPush(pExecNode->pNodeList, &nodeEntry); void *px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
if (px) { if (px) {
mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList)); mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
} else { } else {
mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList)) mError("vgId:%d failed to add into nodeList, total:%d", nodeEntry.nodeId,
(int)taosArrayGetSize(pExecNode->pNodeList))
} }
} }
} }
@ -2470,7 +2477,7 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC
mError("failed to put into task list, taskId:0x%x", pReport->taskId); mError("failed to put into task list, taskId:0x%x", pReport->taskId);
} else { } else {
int32_t size = taosArrayGetSize(pList); int32_t size = taosArrayGetSize(pList);
mDebug("stream:0x%"PRIx64" %d tasks has send checkpoint-report", pReport->streamId, size); mDebug("stream:0x%" PRIx64 " %d tasks has send checkpoint-report", pReport->streamId, size);
} }
} }
@ -2520,7 +2527,8 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
SChkptReportInfo *pInfo = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); SChkptReportInfo *pInfo =
(SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
if (pInfo == NULL) { if (pInfo == NULL) {
SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId}; SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
if (info.pTaskList != NULL) { if (info.pTaskList != NULL) {
@ -2553,14 +2561,14 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
return code; return code;
} }
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pExistedTasks, bool *pAllSame) { static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pExistedTasks, bool *pAllSame) {
int32_t num = 0; int32_t num = 0;
int64_t chkId = INT64_MAX; int64_t chkId = INT64_MAX;
*pExistedTasks = 0; *pExistedTasks = 0;
*pAllSame = true; *pAllSame = true;
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId* p = taosArrayGet(execInfo.pTaskList, i); STaskId *p = taosArrayGet(execInfo.pTaskList, i);
if (p == NULL) { if (p == NULL) {
continue; continue;
} }
@ -2570,7 +2578,7 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pEx
} }
num += 1; num += 1;
STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (chkId > pe->checkpointInfo.latestId) { if (chkId > pe->checkpointInfo.latestId) {
if (chkId != INT64_MAX) { if (chkId != INT64_MAX) {
*pAllSame = false; *pAllSame = false;
@ -2677,7 +2685,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid); mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
} }
void* p = taosArrayPush(pList, &pe->req.taskId); void *p = taosArrayPush(pList, &pe->req.taskId);
if (p == NULL) { if (p == NULL) {
mError("failed to put into task list, taskId:0x%x", pe->req.taskId); mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
} }
@ -2717,7 +2725,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId); mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
void* p = taosArrayPush(pStreamList, &streamId); void *p = taosArrayPush(pStreamList, &streamId);
if (p == NULL) { if (p == NULL) {
mError("failed to put into stream list, stream:0x%" PRIx64, streamId); mError("failed to put into stream list, stream:0x%" PRIx64, streamId);
} }
@ -2784,7 +2792,7 @@ void mndStreamResetInitTaskListLoadFlag() {
execInfo.initTaskList = false; execInfo.initTaskList = false;
} }
void mndUpdateStreamExecInfoRole(SMnode* pMnode, int32_t role) { void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role) {
execInfo.switchFromFollower = false; execInfo.switchFromFollower = false;
if (execInfo.role == NODE_ROLE_UNINIT) { if (execInfo.role == NODE_ROLE_UNINIT) {
@ -2840,7 +2848,7 @@ int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream,
} }
code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid); code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
if (code){ if (code) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;

View File

@ -64,7 +64,7 @@ void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
} }
} }
void* p = taosArrayPush(pList, pInfo); void *p = taosArrayPush(pList, pInfo);
if (p == NULL) { if (p == NULL) {
mError("failed to push failed checkpoint info checkpointId:%" PRId64 " in list", pInfo->checkpointId); mError("failed to push failed checkpoint info checkpointId:%" PRId64 " in list", pInfo->checkpointId);
} }
@ -121,8 +121,8 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t
int32_t size = sizeof(SStreamTaskResetMsg); int32_t size = sizeof(SStreamTaskResetMsg);
int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans); int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans);
for(int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SStreamTaskResetMsg* p = taosArrayGet(execInfo.pKilledChkptTrans, i); SStreamTaskResetMsg *p = taosArrayGet(execInfo.pKilledChkptTrans, i);
if (p == NULL) { if (p == NULL) {
continue; continue;
} }
@ -219,7 +219,7 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
SStreamTaskResetMsg* pMsg = pReq->pCont; SStreamTaskResetMsg *pMsg = pReq->pCont;
mndKillTransImpl(pMnode, pMsg->transId, ""); mndKillTransImpl(pMnode, pMsg->transId, "");
streamMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
@ -291,7 +291,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info) {
if (pStream->status != STREAM_STATUS__PAUSE) { if (pStream->status != STREAM_STATUS__PAUSE) {
SMPauseStreamReq reqPause = {0}; SMPauseStreamReq reqPause = {0};
strcpy(reqPause.name, pStream->name); tstrncpy(reqPause.name, pStream->name, sizeof(reqPause.name));
reqPause.igNotExists = 1; reqPause.igNotExists = 1;
int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause); int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause);
@ -375,8 +375,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
TAOS_RETURN(TSDB_CODE_INVALID_MSG); TAOS_RETURN(TSDB_CODE_INVALID_MSG);
} }
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) {
SNodeEntry* pEntry = taosArrayGet(execInfo.pNodeList, i); SNodeEntry *pEntry = taosArrayGet(execInfo.pNodeList, i);
if (pEntry == NULL) { if (pEntry == NULL) {
continue; continue;
} }
@ -469,7 +469,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// remove failed trans from pChkptStreams // remove failed trans from pChkptStreams
code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId); code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId);
if (code) { if (code) {
mError("failed to remove stream:0x%"PRIx64" in checkpoint stream list", p->id.streamId); mError("failed to remove stream:0x%" PRIx64 " in checkpoint stream list", p->id.streamId);
} }
} }
} }
@ -576,8 +576,8 @@ void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SEpSet* pMndEpset, i
return; return;
} }
((SMStreamHbRspMsg*)buf)->head.vgId = htonl(vgId); ((SMStreamHbRspMsg *)buf)->head.vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);
@ -595,7 +595,7 @@ void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SEpSet* pMndEpset, i
pRpcInfo->handle = NULL; // disable auto rsp pRpcInfo->handle = NULL; // disable auto rsp
} }
void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks) { void checkforOrphanTask(SMnode *pMnode, STaskStatusEntry *p, SArray *pOrphanTasks) {
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
int32_t code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); int32_t code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
@ -606,7 +606,7 @@ void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTask
SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId}; SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
void *px = taosArrayPush(pOrphanTasks, &oTask); void *px = taosArrayPush(pOrphanTasks, &oTask);
if (px == NULL) { if (px == NULL) {
mError("failed to put task into orphan list, taskId:0x%" PRIx64", code:%s", p->id.taskId, tstrerror(terrno)); mError("failed to put task into orphan list, taskId:0x%" PRIx64 ", code:%s", p->id.taskId, tstrerror(terrno));
} }
} else { } else {
if (pStream != NULL) { if (pStream != NULL) {

View File

@ -31,7 +31,7 @@ struct SStreamTaskIter {
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId); int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
int32_t createStreamTaskIter(SStreamObj* pStream, SStreamTaskIter** pIter) { int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter) {
*pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter)); *pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
if (*pIter == NULL) { if (*pIter == NULL) {
return terrno; return terrno;
@ -46,7 +46,7 @@ int32_t createStreamTaskIter(SStreamObj* pStream, SStreamTaskIter** pIter) {
return 0; return 0;
} }
bool streamTaskIterNextTask(SStreamTaskIter* pIter) { bool streamTaskIterNextTask(SStreamTaskIter *pIter) {
if (pIter->level >= pIter->totalLevel) { if (pIter->level >= pIter->totalLevel) {
pIter->pTask = NULL; pIter->pTask = NULL;
return false; return false;
@ -56,7 +56,7 @@ bool streamTaskIterNextTask(SStreamTaskIter* pIter) {
pIter->level += 1; pIter->level += 1;
} }
while(pIter->level < pIter->totalLevel) { while (pIter->level < pIter->totalLevel) {
SArray *pList = taosArrayGetP(pIter->pStream->tasks, pIter->level); SArray *pList = taosArrayGetP(pIter->pStream->tasks, pIter->level);
if (pIter->ordinalIndex >= taosArrayGetSize(pList)) { if (pIter->ordinalIndex >= taosArrayGetSize(pList)) {
pIter->level += 1; pIter->level += 1;
@ -74,7 +74,7 @@ bool streamTaskIterNextTask(SStreamTaskIter* pIter) {
return false; return false;
} }
int32_t streamTaskIterGetCurrent(SStreamTaskIter* pIter, SStreamTask** pTask) { int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask) {
if (pTask) { if (pTask) {
*pTask = pIter->pTask; *pTask = pIter->pTask;
if (*pTask != NULL) { if (*pTask != NULL) {
@ -85,9 +85,7 @@ int32_t streamTaskIterGetCurrent(SStreamTaskIter* pIter, SStreamTask** pTask) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
void destroyStreamTaskIter(SStreamTaskIter* pIter) { void destroyStreamTaskIter(SStreamTaskIter *pIter) { taosMemoryFree(pIter); }
taosMemoryFree(pIter);
}
static bool checkStatusForEachReplica(SVgObj *pVgroup) { static bool checkStatusForEachReplica(SVgObj *pVgroup) {
for (int32_t i = 0; i < pVgroup->replica; ++i) { for (int32_t i = 0; i < pVgroup->replica; ++i) {
@ -396,8 +394,8 @@ int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask)
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) { int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
int32_t num = 0; int32_t num = 0;
for(int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) {
SArray* pLevel = taosArrayGetP(pStream->tasks, i); SArray *pLevel = taosArrayGetP(pStream->tasks, i);
num += taosArrayGetSize(pLevel); num += taosArrayGetSize(pLevel);
} }
@ -430,8 +428,8 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams)
return 0; return 0;
} }
static void freeTaskList(void* param) { static void freeTaskList(void *param) {
SArray** pList = (SArray **)param; SArray **pList = (SArray **)param;
taosArrayDestroy(*pList); taosArrayDestroy(*pList);
} }
@ -492,7 +490,7 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
if (pEntry->nodeId == p->nodeId) { if (pEntry->nodeId == p->nodeId) {
p->hbTimestamp = pEntry->hbTimestamp; p->hbTimestamp = pEntry->hbTimestamp;
void* px = taosArrayPush(pValidList, p); void *px = taosArrayPush(pValidList, p);
if (px == NULL) { if (px == NULL) {
mError("failed to put node into list, nodeId:%d", p->nodeId); mError("failed to put node into list, nodeId:%d", p->nodeId);
} else { } else {
@ -539,7 +537,7 @@ int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) { void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo) {
for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) {
STaskId *pId = taosArrayGet(pTaskIds, i); STaskId *pId = taosArrayGet(pTaskIds, i);
if (pId == NULL) { if (pId == NULL) {
@ -548,7 +546,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) {
int32_t code = doRemoveTasks(pExecInfo, pId); int32_t code = doRemoveTasks(pExecInfo, pId);
if (code) { if (code) {
mError("failed to remove task in buffer list, 0x%"PRIx64, pId->taskId); mError("failed to remove task in buffer list, 0x%" PRIx64, pId->taskId);
} }
} }
} }
@ -575,7 +573,7 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
code = doRemoveTasks(pExecNode, &id); code = doRemoveTasks(pExecNode, &id);
if (code) { if (code) {
mError("failed to remove task in buffer list, 0x%"PRIx64, id.taskId); mError("failed to remove task in buffer list, 0x%" PRIx64, id.taskId);
} }
} }
@ -642,7 +640,7 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) { if (!existed) {
void* p = taosArrayPush(pRemovedTasks, pId); void *p = taosArrayPush(pRemovedTasks, pId);
if (p == NULL) { if (p == NULL) {
mError("failed to put task entry into remove list, taskId:0x%" PRIx64, pId->taskId); mError("failed to put task entry into remove list, taskId:0x%" PRIx64, pId->taskId);
} }
@ -673,7 +671,7 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
streamMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
SChkptReportInfo* px = (SChkptReportInfo *)pIter; SChkptReportInfo *px = (SChkptReportInfo *)pIter;
if (taosArrayGetSize(px->pTaskList) == 0) { if (taosArrayGetSize(px->pTaskList) == 0) {
continue; continue;
} }
@ -727,14 +725,14 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
int32_t size = taosArrayGetSize(pDropped); int32_t size = taosArrayGetSize(pDropped);
if (size > 0) { if (size > 0) {
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
int64_t* pStreamId = (int64_t *)taosArrayGet(pDropped, i); int64_t *pStreamId = (int64_t *)taosArrayGet(pDropped, i);
if (pStreamId == NULL) { if (pStreamId == NULL) {
continue; continue;
} }
code = taosHashRemove(execInfo.pChkptStreams, pStreamId, sizeof(*pStreamId)); code = taosHashRemove(execInfo.pChkptStreams, pStreamId, sizeof(*pStreamId));
if (code) { if (code) {
mError("failed to remove stream in buf:0x%"PRIx64, *pStreamId); mError("failed to remove stream in buf:0x%" PRIx64, *pStreamId);
} }
} }
@ -802,10 +800,10 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
int32_t mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) { int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) {
*pInfo = NULL; *pInfo = NULL;
void* px = taosHashGet(pHash, &streamId, sizeof(streamId)); void *px = taosHashGet(pHash, &streamId, sizeof(streamId));
if (px != NULL) { if (px != NULL) {
*pInfo = px; *pInfo = px;
return 0; return 0;
@ -865,12 +863,12 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo
} }
} }
void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) { void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo) {
taosArrayDestroy(pInfo->pTaskList); taosArrayDestroy(pInfo->pTaskList);
pInfo->pTaskList = NULL; pInfo->pTaskList = NULL;
} }
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
int32_t code = 0; int32_t code = 0;
int32_t numOfStreams = taosHashGetSize(pHash); int32_t numOfStreams = taosHashGetSize(pHash);
if (numOfStreams == 0) { if (numOfStreams == 0) {
@ -881,13 +879,13 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {
if (code == 0) { if (code == 0) {
mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams); mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
} else { } else {
mError("failed to remove stream:0x%"PRIx64" in consensus-checkpointId list, remain:%d", streamId, numOfStreams); mError("failed to remove stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
} }
return code; return code;
} }
int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId) { int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) {
int32_t code = 0; int32_t code = 0;
int32_t numOfStreams = taosHashGetSize(pHash); int32_t numOfStreams = taosHashGetSize(pHash);
if (numOfStreams == 0) { if (numOfStreams == 0) {
@ -898,14 +896,14 @@ int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId) {
if (code == 0) { if (code == 0) {
mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams); mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
} else { } else {
mError("failed to remove stream:0x%"PRIx64" in chkpt-report list, remain:%d", streamId, numOfStreams); mError("failed to remove stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
} }
return code; return code;
} }
int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId) { int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId) {
SChkptReportInfo* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); SChkptReportInfo *pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
if (pInfo != NULL) { if (pInfo != NULL) {
taosArrayClear(pInfo->pTaskList); taosArrayClear(pInfo->pTaskList);
mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId, mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId,
@ -919,28 +917,28 @@ int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId) {
static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
int8_t status = atomic_load_8(&pStream->status); int8_t status = atomic_load_8(&pStream->status);
if (status == STREAM_STATUS__NORMAL) { if (status == STREAM_STATUS__NORMAL) {
strcpy(dst, "ready"); tstrncpy(dst, "ready", MND_STREAM_TRIGGER_NAME_SIZE);
} else if (status == STREAM_STATUS__STOP) { } else if (status == STREAM_STATUS__STOP) {
strcpy(dst, "stop"); tstrncpy(dst, "stop", MND_STREAM_TRIGGER_NAME_SIZE);
} else if (status == STREAM_STATUS__FAILED) { } else if (status == STREAM_STATUS__FAILED) {
strcpy(dst, "failed"); tstrncpy(dst, "failed", MND_STREAM_TRIGGER_NAME_SIZE);
} else if (status == STREAM_STATUS__RECOVER) { } else if (status == STREAM_STATUS__RECOVER) {
strcpy(dst, "recover"); tstrncpy(dst, "recover", MND_STREAM_TRIGGER_NAME_SIZE);
} else if (status == STREAM_STATUS__PAUSE) { } else if (status == STREAM_STATUS__PAUSE) {
strcpy(dst, "paused"); tstrncpy(dst, "paused", MND_STREAM_TRIGGER_NAME_SIZE);
} }
} }
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) { static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
int8_t trigger = pStream->conf.trigger; int8_t trigger = pStream->conf.trigger;
if (trigger == STREAM_TRIGGER_AT_ONCE) { if (trigger == STREAM_TRIGGER_AT_ONCE) {
strcpy(dst, "at once"); tstrncpy(dst, "at once", MND_STREAM_TRIGGER_NAME_SIZE);
} else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) { } else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) {
strcpy(dst, "window close"); tstrncpy(dst, "window close", MND_STREAM_TRIGGER_NAME_SIZE);
} else if (trigger == STREAM_TRIGGER_MAX_DELAY) { } else if (trigger == STREAM_TRIGGER_MAX_DELAY) {
strcpy(dst, "max delay"); tstrncpy(dst, "max delay", MND_STREAM_TRIGGER_NAME_SIZE);
} else if (trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { } else if (trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
strcpy(dst, "force window close"); tstrncpy(dst, "force window close", MND_STREAM_TRIGGER_NAME_SIZE);
} }
} }
@ -1000,7 +998,7 @@ int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_
TSDB_CHECK_CODE(code, lino, _end); TSDB_CHECK_CODE(code, lino, _end);
char status[20 + VARSTR_HEADER_SIZE] = {0}; char status[20 + VARSTR_HEADER_SIZE] = {0};
char status2[20] = {0}; char status2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
mndShowStreamStatus(status2, pStream); mndShowStreamStatus(status2, pStream);
STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status)); STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -1047,7 +1045,7 @@ int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_
TSDB_CHECK_CODE(code, lino, _end); TSDB_CHECK_CODE(code, lino, _end);
char trigger[20 + VARSTR_HEADER_SIZE] = {0}; char trigger[20 + VARSTR_HEADER_SIZE] = {0};
char trigger2[20] = {0}; char trigger2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
mndShowStreamTrigger(trigger2, pStream); mndShowStreamTrigger(trigger2, pStream);
STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger)); STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -1069,7 +1067,7 @@ int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_
// checkpoint interval // checkpoint interval
char tmp[20 + VARSTR_HEADER_SIZE] = {0}; char tmp[20 + VARSTR_HEADER_SIZE] = {0};
sprintf(varDataVal(tmp), "%d sec", tsStreamCheckpointInterval); (void)tsnprintf(varDataVal(tmp), sizeof(tmp) - VARSTR_HEADER_SIZE, "%d sec", tsStreamCheckpointInterval);
varDataSetLen(tmp, strlen(varDataVal(tmp))); varDataSetLen(tmp, strlen(varDataVal(tmp)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -1089,7 +1087,7 @@ int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_
// history scan idle // history scan idle
char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
strcpy(scanHistoryIdle, "100a"); tstrncpy(scanHistoryIdle, "100a", sizeof(scanHistoryIdle));
memset(dstStr, 0, tListLen(dstStr)); memset(dstStr, 0, tListLen(dstStr));
STR_TO_VARSTR(dstStr, scanHistoryIdle) STR_TO_VARSTR(dstStr, scanHistoryIdle)
@ -1105,7 +1103,8 @@ _end:
return code; return code;
} }
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows, int32_t precision) { int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows,
int32_t precision) {
SColumnInfoData *pColInfo = NULL; SColumnInfoData *pColInfo = NULL;
int32_t cols = 0; int32_t cols = 0;
int32_t code = 0; int32_t code = 0;
@ -1254,7 +1253,7 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
colDataSetNULL(pColInfo, numOfRows); colDataSetNULL(pColInfo, numOfRows);
} else { } else {
sprintf(buf, formatTotalMb, pe->outputTotal); (void)tsnprintf(buf, sizeof(buf), formatTotalMb, pe->outputTotal);
memset(vbuf, 0, tListLen(vbuf)); memset(vbuf, 0, tListLen(vbuf));
STR_TO_VARSTR(vbuf, buf); STR_TO_VARSTR(vbuf, buf);
@ -1281,14 +1280,6 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo
code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
TSDB_CHECK_CODE(code, lino, _end); TSDB_CHECK_CODE(code, lino, _end);
} }
// output queue
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
// STR_TO_VARSTR(vbuf, buf);
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
// info // info
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
const char *sinkStr = "%.2f MiB"; const char *sinkStr = "%.2f MiB";
@ -1401,7 +1392,7 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo
code = colDataSetVal(pColInfo, numOfRows, 0, true); code = colDataSetVal(pColInfo, numOfRows, 0, true);
TSDB_CHECK_CODE(code, lino, _end); TSDB_CHECK_CODE(code, lino, _end);
_end: _end:
if (code) { if (code) {
mError("error happens during build task attr result blocks, lino:%d, code:%s", lino, tstrerror(code)); mError("error happens during build task attr result blocks, lino:%d, code:%s", lino, tstrerror(code));
} }
@ -1418,7 +1409,7 @@ static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent)
return true; return true;
} }
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo* pInfo) { void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo) {
if (pInfo != NULL) { if (pInfo != NULL) {
taosArrayDestroy(pInfo->pUpdateNodeList); taosArrayDestroy(pInfo->pUpdateNodeList);
taosHashCleanup(pInfo->pDBMap); taosHashCleanup(pInfo->pDBMap);
@ -1457,7 +1448,7 @@ int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, cons
int32_t num = taosArrayGetSize(pNodeList); int32_t num = taosArrayGetSize(pNodeList);
for (int32_t j = 0; j < num; ++j) { for (int32_t j = 0; j < num; ++j) {
SNodeEntry *pCurrent = taosArrayGet(pNodeList, j); SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
if(pCurrent == NULL) { if (pCurrent == NULL) {
continue; continue;
} }
@ -1479,7 +1470,7 @@ int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, cons
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset); epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
epsetAssign(&updateInfo.newEp, &pCurrent->epset); epsetAssign(&updateInfo.newEp, &pCurrent->epset);
void* p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo); void *p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
TSDB_CHECK_NULL(p, code, lino, _err, terrno); TSDB_CHECK_NULL(p, code, lino, _err, terrno);
} }
@ -1498,11 +1489,11 @@ int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, cons
return code; return code;
_err: _err:
mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino); mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
mndDestroyVgroupChangeInfo(pInfo); mndDestroyVgroupChangeInfo(pInfo);
return code; return code;
} }
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) { static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
bool allReady = false; bool allReady = false;

View File

@ -691,7 +691,7 @@ static void ipRangeToStr(SIpV4Range *range, char *buf) {
(void)uv_inet_ntop(AF_INET, &addr, buf, 32); (void)uv_inet_ntop(AF_INET, &addr, buf, 32);
if (range->mask != 32) { if (range->mask != 32) {
(void)sprintf(buf + strlen(buf), "/%d", range->mask); (void)tsnprintf(buf + strlen(buf), 36 - strlen(buf), "/%d", range->mask);
} }
return; return;
} }
@ -699,14 +699,14 @@ static bool isDefaultRange(SIpV4Range *pRange) {
static SIpV4Range val = {.ip = 16777343, .mask = 32}; static SIpV4Range val = {.ip = 16777343, .mask = 32};
return pRange->ip == val.ip && pRange->mask == val.mask; return pRange->ip == val.ip && pRange->mask == val.mask;
} }
static int32_t ipRangeListToStr(SIpV4Range *range, int32_t num, char *buf) { static int32_t ipRangeListToStr(SIpV4Range *range, int32_t num, char *buf, int64_t bufLen) {
int32_t len = 0; int32_t len = 0;
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
char tbuf[36] = {0}; char tbuf[36] = {0};
SIpV4Range *pRange = &range[i]; SIpV4Range *pRange = &range[i];
ipRangeToStr(&range[i], tbuf); ipRangeToStr(&range[i], tbuf);
len += sprintf(buf + len, "%s,", tbuf); len += tsnprintf(buf + len, bufLen - len, "%s,", tbuf);
} }
if (len > 0) buf[len - 1] = 0; if (len > 0) buf[len - 1] = 0;
return len; return len;
@ -738,11 +738,13 @@ int32_t convertIpWhiteListToStr(SIpWhiteList *pList, char **buf) {
*buf = NULL; *buf = NULL;
return 0; return 0;
} }
*buf = taosMemoryCalloc(1, pList->num * 36); int64_t bufLen = pList->num * 36;
*buf = taosMemoryCalloc(1, bufLen);
if (*buf == NULL) { if (*buf == NULL) {
return 0; return 0;
} }
int32_t len = ipRangeListToStr(pList->pIpRange, pList->num, *buf);
int32_t len = ipRangeListToStr(pList->pIpRange, pList->num, *buf, bufLen);
if (len == 0) { if (len == 0) {
taosMemoryFreeClear(*buf); taosMemoryFreeClear(*buf);
return 0; return 0;
@ -1899,8 +1901,8 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[1000] = {0}; char detail[1000] = {0};
TAOS_UNUSED(snprintf(detail, sizeof(detail), "enable:%d, superUser:%d, sysInfo:%d, password:xxx", createReq.enable, (void)tsnprintf(detail, sizeof(detail), "enable:%d, superUser:%d, sysInfo:%d, password:xxx", createReq.enable,
createReq.superUser, createReq.sysInfo)); createReq.superUser, createReq.sysInfo);
char operation[15] = {0}; char operation[15] = {0};
if (createReq.isImport == 1) { if (createReq.isImport == 1) {
tstrncpy(operation, "importUser", sizeof(operation)); tstrncpy(operation, "importUser", sizeof(operation));
@ -2502,7 +2504,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) { if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) {
char detail[1000] = {0}; char detail[1000] = {0};
(void)snprintf(detail, sizeof(detail), (void)tsnprintf(detail, sizeof(detail),
"alterType:%s, enable:%d, superUser:%d, sysInfo:%d, createdb:%d, tabName:%s, password:xxx", "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, createdb:%d, tabName:%s, password:xxx",
mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo, mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo,
alterReq.createdb ? 1 : 0, alterReq.tabName); alterReq.createdb ? 1 : 0, alterReq.tabName);
@ -2893,12 +2895,12 @@ static int32_t mndLoopHash(SHashObj *hash, char *priType, SSDataBlock *pBlock, i
if (nodesStringToNode(value, &pAst) == 0) { if (nodesStringToNode(value, &pAst) == 0) {
if (nodesNodeToSQL(pAst, *sql, bufSz, &sqlLen) != 0) { if (nodesNodeToSQL(pAst, *sql, bufSz, &sqlLen) != 0) {
sqlLen = 5; sqlLen = 5;
(void)snprintf(*sql, bufSz, "error"); (void)tsnprintf(*sql, bufSz, "error");
} }
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
} else { } else {
sqlLen = 5; sqlLen = 5;
(void)snprintf(*sql, bufSz, "error"); (void)tsnprintf(*sql, bufSz, "error");
} }
STR_WITH_MAXSIZE_TO_VARSTR((*condition), (*sql), pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR((*condition), (*sql), pShow->pMeta->pSchemas[cols].bytes);

View File

@ -14,7 +14,6 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndVgroup.h"
#include "audit.h" #include "audit.h"
#include "mndArbGroup.h" #include "mndArbGroup.h"
#include "mndDb.h" #include "mndDb.h"
@ -27,6 +26,7 @@
#include "mndTopic.h" #include "mndTopic.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h"
#include "tmisce.h" #include "tmisce.h"
#define VGROUP_VER_NUMBER 1 #define VGROUP_VER_NUMBER 1
@ -1077,7 +1077,7 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
char buf1[20] = {0}; char buf1[20] = {0};
char role[20] = "offline"; char role[20] = "offline";
if (!exist) { if (!exist) {
strcpy(role, "dropping"); tstrncpy(role, "dropping", sizeof(role));
} else if (online) { } else if (online) {
char *star = ""; char *star = "";
if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER || if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER ||
@ -2561,7 +2561,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char obj[33] = {0}; char obj[33] = {0};
sprintf(obj, "%d", req.vgId); (void)tsnprintf(obj, sizeof(obj), "%d", req.vgId);
auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen); auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen);

View File

@ -39,11 +39,11 @@ int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVe
int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; } int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; }
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); return snprintf(buf, WAL_FILE_LEN, "%s/meta-ver%d", pWal->path, metaVer);
} }
static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) { static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) {
return sprintf(buf, "%s/meta-ver.tmp", pWal->path); return snprintf(buf, WAL_FILE_LEN, "%s/meta-ver.tmp", pWal->path);
} }
static FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t* lastVer) { static FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t* lastVer) {
@ -819,7 +819,7 @@ int32_t walRollFileInfo(SWal* pWal) {
} }
int32_t walMetaSerialize(SWal* pWal, char** serialized) { int32_t walMetaSerialize(SWal* pWal, char** serialized) {
char buf[30]; char buf[WAL_JSON_BUF_SIZE];
int sz = taosArrayGetSize(pWal->fileInfoSet); int sz = taosArrayGetSize(pWal->fileInfoSet);
cJSON* pRoot = cJSON_CreateObject(); cJSON* pRoot = cJSON_CreateObject();
cJSON* pMeta = cJSON_CreateObject(); cJSON* pMeta = cJSON_CreateObject();
@ -841,19 +841,19 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) {
if (!cJSON_AddItemToObject(pRoot, "meta", pMeta)) { if (!cJSON_AddItemToObject(pRoot, "meta", pMeta)) {
wInfo("vgId:%d, failed to add meta to root", pWal->cfg.vgId); wInfo("vgId:%d, failed to add meta to root", pWal->cfg.vgId);
} }
(void)sprintf(buf, "%" PRId64, pWal->vers.firstVer); snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.firstVer);
if (cJSON_AddStringToObject(pMeta, "firstVer", buf) == NULL) { if (cJSON_AddStringToObject(pMeta, "firstVer", buf) == NULL) {
wInfo("vgId:%d, failed to add firstVer to meta", pWal->cfg.vgId); wInfo("vgId:%d, failed to add firstVer to meta", pWal->cfg.vgId);
} }
(void)sprintf(buf, "%" PRId64, pWal->vers.snapshotVer); (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.snapshotVer);
if (cJSON_AddStringToObject(pMeta, "snapshotVer", buf) == NULL) { if (cJSON_AddStringToObject(pMeta, "snapshotVer", buf) == NULL) {
wInfo("vgId:%d, failed to add snapshotVer to meta", pWal->cfg.vgId); wInfo("vgId:%d, failed to add snapshotVer to meta", pWal->cfg.vgId);
} }
(void)sprintf(buf, "%" PRId64, pWal->vers.commitVer); (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.commitVer);
if (cJSON_AddStringToObject(pMeta, "commitVer", buf) == NULL) { if (cJSON_AddStringToObject(pMeta, "commitVer", buf) == NULL) {
wInfo("vgId:%d, failed to add commitVer to meta", pWal->cfg.vgId); wInfo("vgId:%d, failed to add commitVer to meta", pWal->cfg.vgId);
} }
(void)sprintf(buf, "%" PRId64, pWal->vers.lastVer); (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.lastVer);
if (cJSON_AddStringToObject(pMeta, "lastVer", buf) == NULL) { if (cJSON_AddStringToObject(pMeta, "lastVer", buf) == NULL) {
wInfo("vgId:%d, failed to add lastVer to meta", pWal->cfg.vgId); wInfo("vgId:%d, failed to add lastVer to meta", pWal->cfg.vgId);
} }
@ -874,23 +874,23 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) {
} }
// cjson only support int32_t or double // cjson only support int32_t or double
// string are used to prohibit the loss of precision // string are used to prohibit the loss of precision
(void)sprintf(buf, "%" PRId64, pInfo->firstVer); (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->firstVer);
if (cJSON_AddStringToObject(pField, "firstVer", buf) == NULL) { if (cJSON_AddStringToObject(pField, "firstVer", buf) == NULL) {
wInfo("vgId:%d, failed to add firstVer to field", pWal->cfg.vgId); wInfo("vgId:%d, failed to add firstVer to field", pWal->cfg.vgId);
} }
(void)sprintf(buf, "%" PRId64, pInfo->lastVer); (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->lastVer);
if (cJSON_AddStringToObject(pField, "lastVer", buf) == NULL) { if (cJSON_AddStringToObject(pField, "lastVer", buf) == NULL) {
wInfo("vgId:%d, failed to add lastVer to field", pWal->cfg.vgId); wInfo("vgId:%d, failed to add lastVer to field", pWal->cfg.vgId);
} }
(void)sprintf(buf, "%" PRId64, pInfo->createTs); (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->createTs);
if (cJSON_AddStringToObject(pField, "createTs", buf) == NULL) { if (cJSON_AddStringToObject(pField, "createTs", buf) == NULL) {
wInfo("vgId:%d, failed to add createTs to field", pWal->cfg.vgId); wInfo("vgId:%d, failed to add createTs to field", pWal->cfg.vgId);
} }
(void)sprintf(buf, "%" PRId64, pInfo->closeTs); (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->closeTs);
if (cJSON_AddStringToObject(pField, "closeTs", buf) == NULL) { if (cJSON_AddStringToObject(pField, "closeTs", buf) == NULL) {
wInfo("vgId:%d, failed to add closeTs to field", pWal->cfg.vgId); wInfo("vgId:%d, failed to add closeTs to field", pWal->cfg.vgId);
} }
(void)sprintf(buf, "%" PRId64, pInfo->fileSize); (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pInfo->fileSize);
if (cJSON_AddStringToObject(pField, "fileSize", buf) == NULL) { if (cJSON_AddStringToObject(pField, "fileSize", buf) == NULL) {
wInfo("vgId:%d, failed to add fileSize to field", pWal->cfg.vgId); wInfo("vgId:%d, failed to add fileSize to field", pWal->cfg.vgId);
} }