Merge pull request #27914 from taosdata/fix/TD-31899
fix:[TD-31899] check return value by malloc/strdup
This commit is contained in:
commit
9e115de97f
|
@ -736,6 +736,16 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
|
|||
continue;
|
||||
}
|
||||
char* tmp = taosStrdup(filename);
|
||||
if (tmp == NULL) {
|
||||
tscError("failed to dup string:%s since %s", filename, terrstr());
|
||||
if (taosUnLockFile(pFile) != 0) {
|
||||
tscError("failed to unlock file:%s, terrno:%d", filename, terrno);
|
||||
}
|
||||
if (taosCloseFile(&(pFile)) != 0) {
|
||||
tscError("failed to close file:%s, terrno:%d", filename, terrno);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0);
|
||||
taosMemoryFree(tmp);
|
||||
}
|
||||
|
|
|
@ -1939,6 +1939,10 @@ int32_t smlClearForRerun(SSmlHandle *info) {
|
|||
return TSDB_CODE_SML_INVALID_DATA;
|
||||
}
|
||||
info->lines = (SSmlLineInfo *)taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
|
||||
if (unlikely(info->lines == NULL)) {
|
||||
uError("SML:0x%" PRIx64 " info->lines == NULL", info->id);
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
(void)memset(&info->preLine, 0, sizeof(SSmlLineInfo));
|
||||
|
@ -1971,10 +1975,14 @@ static bool getLine(SSmlHandle *info, char *lines[], char **rawLine, char *rawLi
|
|||
|
||||
if (*rawLine != NULL && (uDebugFlag & DEBUG_DEBUG)) {
|
||||
char *print = taosMemoryCalloc(*len + 1, 1);
|
||||
(void)memcpy(print, *tmp, *len);
|
||||
uDebug("SML:0x%" PRIx64 " smlParseLine is raw, numLines:%d, protocol:%d, len:%d, data:%s", info->id, numLines,
|
||||
info->protocol, *len, print);
|
||||
taosMemoryFree(print);
|
||||
if (print != NULL){
|
||||
(void)memcpy(print, *tmp, *len);
|
||||
uDebug("SML:0x%" PRIx64 " smlParseLine is raw, numLines:%d, protocol:%d, len:%d, data:%s", info->id, numLines,
|
||||
info->protocol, *len, print);
|
||||
taosMemoryFree(print);
|
||||
} else{
|
||||
uError("SML:0x%" PRIx64 " smlParseLine taosMemoryCalloc failed", info->id);
|
||||
}
|
||||
} else {
|
||||
uDebug("SML:0x%" PRIx64 " smlParseLine is not numLines:%d, protocol:%d, len:%d, data:%s", info->id, numLines,
|
||||
info->protocol, *len, *tmp);
|
||||
|
|
|
@ -300,6 +300,7 @@ void tmq_conf_destroy(tmq_conf_t* conf) {
|
|||
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
|
||||
int32_t code = 0;
|
||||
if (conf == NULL || key == NULL || value == NULL) {
|
||||
tscError("tmq_conf_set null, conf:%p key:%p value:%p", conf, key, value);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
if (strcasecmp(key, "group.id") == 0) {
|
||||
|
@ -320,6 +321,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
conf->autoCommit = false;
|
||||
return TMQ_CONF_OK;
|
||||
} else {
|
||||
tscError("invalid value for enable.auto.commit: %s", value);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
}
|
||||
|
@ -328,6 +330,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
int64_t tmp;
|
||||
code = taosStr2int64(value, &tmp);
|
||||
if (tmp < 0 || code != 0) {
|
||||
tscError("invalid value for auto.commit.interval.ms: %s", value);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
|
||||
|
@ -338,6 +341,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
int64_t tmp;
|
||||
code = taosStr2int64(value, &tmp);
|
||||
if (tmp < 6000 || tmp > 1800000 || code != 0) {
|
||||
tscError("invalid value for session.timeout.ms: %s", value);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
conf->sessionTimeoutMs = tmp;
|
||||
|
@ -348,6 +352,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
int64_t tmp;
|
||||
code = taosStr2int64(value, &tmp);
|
||||
if (tmp < 1000 || tmp >= conf->sessionTimeoutMs || code != 0) {
|
||||
tscError("invalid value for heartbeat.interval.ms: %s", value);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
conf->heartBeatIntervalMs = tmp;
|
||||
|
@ -358,6 +363,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
int32_t tmp;
|
||||
code = taosStr2int32(value, &tmp);
|
||||
if (tmp < 1000 || code != 0) {
|
||||
tscError("invalid value for max.poll.interval.ms: %s", value);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
conf->maxPollIntervalMs = tmp;
|
||||
|
@ -375,6 +381,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
|
||||
return TMQ_CONF_OK;
|
||||
} else {
|
||||
tscError("invalid value for auto.offset.reset: %s", value);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
}
|
||||
|
@ -387,6 +394,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
conf->withTbName = false;
|
||||
return TMQ_CONF_OK;
|
||||
} else {
|
||||
tscError("invalid value for msg.with.table.name: %s", value);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
}
|
||||
|
@ -399,22 +407,38 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
conf->snapEnable = false;
|
||||
return TMQ_CONF_OK;
|
||||
} else {
|
||||
tscError("invalid value for experimental.snapshot.enable: %s", value);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
}
|
||||
|
||||
if (strcasecmp(key, "td.connect.ip") == 0) {
|
||||
conf->ip = taosStrdup(value);
|
||||
void *tmp = taosStrdup(value);
|
||||
if (tmp == NULL) {
|
||||
tscError("tmq_conf_set out of memory:%d", terrno);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
conf->ip = tmp;
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
if (strcasecmp(key, "td.connect.user") == 0) {
|
||||
conf->user = taosStrdup(value);
|
||||
void *tmp = taosStrdup(value);
|
||||
if (tmp == NULL) {
|
||||
tscError("tmq_conf_set out of memory:%d", terrno);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
conf->user = tmp;
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
if (strcasecmp(key, "td.connect.pass") == 0) {
|
||||
conf->pass = taosStrdup(value);
|
||||
void *tmp = taosStrdup(value);
|
||||
if (tmp == NULL) {
|
||||
tscError("tmq_conf_set out of memory:%d", terrno);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
conf->pass = tmp;
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
|
@ -422,6 +446,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
int64_t tmp;
|
||||
code = taosStr2int64(value, &tmp);
|
||||
if (tmp <= 0 || tmp > 65535 || code != 0) {
|
||||
tscError("invalid value for td.connect.port: %s", value);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
|
||||
|
@ -437,6 +462,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
conf->replayEnable = false;
|
||||
return TMQ_CONF_OK;
|
||||
} else {
|
||||
tscError("invalid value for enable.replay: %s", value);
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
}
|
||||
|
@ -458,6 +484,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
tscError("unknown key: %s", key);
|
||||
return TMQ_CONF_UNKNOWN;
|
||||
}
|
||||
|
||||
|
@ -468,7 +495,11 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
|
|||
SArray* container = &list->container;
|
||||
if (src == NULL || src[0] == 0) return TSDB_CODE_INVALID_PARA;
|
||||
char* topic = taosStrdup(src);
|
||||
if (taosArrayPush(container, &topic) == NULL) return TSDB_CODE_INVALID_PARA;
|
||||
if (topic == NULL) return terrno;
|
||||
if (taosArrayPush(container, &topic) == NULL) {
|
||||
taosMemoryFree(topic);
|
||||
return terrno;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -947,13 +978,13 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
|||
|
||||
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
|
||||
if (tlen < 0) {
|
||||
tscError("tSerializeSMqHbReq failed");
|
||||
tscError("tSerializeSMqHbReq failed, size:%d", tlen);
|
||||
goto OVER;
|
||||
}
|
||||
|
||||
void* pReq = taosMemoryCalloc(1, tlen);
|
||||
if (tlen < 0) {
|
||||
tscError("failed to malloc MqHbReq msg, size:%d", tlen);
|
||||
if (pReq == NULL) {
|
||||
tscError("failed to malloc MqHbReq msg, code:%d", terrno);
|
||||
goto OVER;
|
||||
}
|
||||
|
||||
|
@ -3514,6 +3545,10 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
}
|
||||
(void)taosThreadMutexInit(&pCommon->mutex, 0);
|
||||
pCommon->pTopicName = taosStrdup(pTopic->topicName);
|
||||
if (pCommon->pTopicName == NULL) {
|
||||
code = terrno;
|
||||
goto end;
|
||||
}
|
||||
pCommon->consumerId = tmq->consumerId;
|
||||
|
||||
for (int32_t i = 0; i < (*numOfAssignment); ++i) {
|
||||
|
|
|
@ -1092,6 +1092,9 @@ int32_t taosSetSlowLogScope(char *pScopeStr, int32_t *pScope) {
|
|||
while((scope = strsep(&pScopeStr, "|")) != NULL){
|
||||
taosMemoryFreeClear(tmp);
|
||||
tmp = taosStrdup(scope);
|
||||
if (tmp == NULL) {
|
||||
TAOS_RETURN(terrno);
|
||||
}
|
||||
(void)strtrim(tmp);
|
||||
if (0 == strcasecmp(tmp, "all")) {
|
||||
slowScope |= SLOW_LOG_TYPE_ALL;
|
||||
|
|
|
@ -463,6 +463,15 @@ static void freeItem(void *param) {
|
|||
}
|
||||
}
|
||||
|
||||
#define ADD_TOPIC_TO_ARRAY(element, array) \
|
||||
char *newTopicCopy = taosStrdup(element); \
|
||||
MND_TMQ_NULL_CHECK(newTopicCopy);\
|
||||
if (taosArrayPush(pConsumerNew->array, &newTopicCopy) == NULL){\
|
||||
taosMemoryFree(newTopicCopy);\
|
||||
code = terrno;\
|
||||
goto END;\
|
||||
}
|
||||
|
||||
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){
|
||||
int32_t code = 0;
|
||||
pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
|
||||
|
@ -477,15 +486,13 @@ static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerOb
|
|||
if (i >= oldTopicNum) {
|
||||
void* tmp = taosArrayGetP(pConsumerNew->assignedTopics, j);
|
||||
MND_TMQ_NULL_CHECK(tmp);
|
||||
char *newTopicCopy = taosStrdup(tmp);
|
||||
MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy));
|
||||
ADD_TOPIC_TO_ARRAY(tmp, rebNewTopics);
|
||||
j++;
|
||||
continue;
|
||||
} else if (j >= newTopicNum) {
|
||||
void* tmp = taosArrayGetP(pExistedConsumer->currentTopics, i);
|
||||
MND_TMQ_NULL_CHECK(tmp);
|
||||
char *oldTopicCopy = taosStrdup(tmp);
|
||||
MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy));
|
||||
ADD_TOPIC_TO_ARRAY(tmp, rebRemovedTopics);
|
||||
i++;
|
||||
continue;
|
||||
} else {
|
||||
|
@ -499,13 +506,11 @@ static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerOb
|
|||
j++;
|
||||
continue;
|
||||
} else if (comp < 0) {
|
||||
char *oldTopicCopy = taosStrdup(oldTopic);
|
||||
MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy));
|
||||
ADD_TOPIC_TO_ARRAY(oldTopic, rebRemovedTopics);
|
||||
i++;
|
||||
continue;
|
||||
} else {
|
||||
char *newTopicCopy = taosStrdup(newTopic);
|
||||
MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy));
|
||||
ADD_TOPIC_TO_ARRAY(newTopic, rebNewTopics);
|
||||
j++;
|
||||
continue;
|
||||
}
|
||||
|
@ -789,6 +794,9 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
}
|
||||
char *pNewTopic = taosStrdup(tmp);
|
||||
if (pNewTopic == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
|
||||
bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
|
||||
if (existing) {
|
||||
|
|
|
@ -113,6 +113,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj
|
|||
MND_TMQ_RETURN_CHECK(qSubPlanToString(pPlan, &req.qmsg, &msgLen));
|
||||
} else {
|
||||
req.qmsg = taosStrdup("");
|
||||
MND_TMQ_NULL_CHECK(req.qmsg);
|
||||
}
|
||||
req.subType = pSub->subType;
|
||||
req.withMeta = pSub->withMeta;
|
||||
|
|
|
@ -451,12 +451,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
topicObj.dbUid = pDb->uid;
|
||||
topicObj.version = 1;
|
||||
topicObj.sql = taosStrdup(pCreate->sql);
|
||||
MND_TMQ_NULL_CHECK(topicObj.sql);
|
||||
topicObj.sqlLen = strlen(pCreate->sql) + 1;
|
||||
topicObj.subType = pCreate->subType;
|
||||
topicObj.withMeta = pCreate->withMeta;
|
||||
|
||||
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
topicObj.ast = taosStrdup(pCreate->ast);
|
||||
MND_TMQ_NULL_CHECK(topicObj.ast);
|
||||
topicObj.astLen = strlen(pCreate->ast) + 1;
|
||||
qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast);
|
||||
MND_TMQ_RETURN_CHECK(nodesStringToNode(pCreate->ast, &pAst));
|
||||
|
@ -482,6 +484,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
if(pCreate->ast != NULL){
|
||||
qDebugL("topic:%s ast %s", topicObj.name, pCreate->ast);
|
||||
topicObj.ast = taosStrdup(pCreate->ast);
|
||||
MND_TMQ_NULL_CHECK(topicObj.ast);
|
||||
topicObj.astLen = strlen(pCreate->ast) + 1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,6 +70,9 @@ int32_t tqOpen(const char* path, SVnode* pVnode) {
|
|||
}
|
||||
pVnode->pTq = pTq;
|
||||
pTq->path = taosStrdup(path);
|
||||
if (pTq->path == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
pTq->pVnode = pVnode;
|
||||
|
||||
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||
|
|
|
@ -341,7 +341,11 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
|
|||
handle->execHandle.subType = req->subType;
|
||||
handle->fetchMeta = req->withMeta;
|
||||
if (req->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
handle->execHandle.execCol.qmsg = taosStrdup(req->qmsg);
|
||||
void *tmp = taosStrdup(req->qmsg);
|
||||
if (tmp == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
handle->execHandle.execCol.qmsg = tmp;
|
||||
} else if (req->subType == TOPIC_SUB_TYPE__DB) {
|
||||
handle->execHandle.execDb.pFilterOutTbUid =
|
||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||
|
@ -350,7 +354,11 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
|
|||
}
|
||||
}else if(req->subType == TOPIC_SUB_TYPE__TABLE){
|
||||
handle->execHandle.execTb.suid = req->suid;
|
||||
handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg);
|
||||
void *tmp = taosStrdup(req->qmsg);
|
||||
if (tmp == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
handle->execHandle.execTb.qmsg = tmp;
|
||||
}
|
||||
|
||||
handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);
|
||||
|
|
|
@ -69,6 +69,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, void* pRsp, int32_t
|
|||
|
||||
for (int32_t i = 0; i < n; i++) {
|
||||
char* tbName = taosStrdup(mr.me.name);
|
||||
if (tbName == NULL) {
|
||||
metaReaderClear(&mr);
|
||||
return terrno;
|
||||
}
|
||||
if(taosArrayPush(((SMqDataRspCommon*)pRsp)->blockTbName, &tbName) == NULL){
|
||||
continue;
|
||||
}
|
||||
|
@ -213,6 +217,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatc
|
|||
}
|
||||
} else {
|
||||
char* tbName = taosStrdup(qExtractTbnameFromTask(task));
|
||||
if (tbName == NULL) {
|
||||
tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
|
||||
return terrno;
|
||||
}
|
||||
if (taosArrayPush(pRsp->common.blockTbName, &tbName) == NULL){
|
||||
tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
|
||||
continue;
|
||||
|
|
Loading…
Reference in New Issue