chore: merge main

This commit is contained in:
Haojun Liao 2025-03-03 14:29:10 +08:00
commit dfd1beaead
11 changed files with 85 additions and 71 deletions

View File

@ -2232,6 +2232,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
int32_t blockVersion = *(int32_t*)p;
int32_t dataLen = estimateJsonLen(pResultInfo);
if (dataLen <= 0) {
tscError("doConvertJson error: estimateJsonLen failed");
return TSDB_CODE_TSC_INTERNAL_ERROR;
}

View File

@ -888,6 +888,7 @@ int taos_select_db(TAOS *taos, const char *db) {
if (db == NULL || strlen(db) == 0) {
releaseTscObj(*(int64_t *)taos);
tscError("invalid parameter for %s", db == NULL ? "db is NULL" : "db is empty");
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return terrno;
}

View File

@ -372,11 +372,13 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg == NULL) {
tscError("processCreateSTableRsp: invalid input param, pMsg is NULL");
return TSDB_CODE_TSC_INVALID_INPUT;
}
if (param == NULL) {
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData);
tscError("processCreateSTableRsp: invalid input param, param is NULL");
return TSDB_CODE_TSC_INVALID_INPUT;
}

View File

@ -170,6 +170,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
int32_t code = 0;
SStatusReq req = {0};
req.timestamp = taosGetTimestampMs();
pMgmt->statusSeq++;
dDebug("send status req to mnode, statusSeq:%d, begin to mgnt statusInfolock", pMgmt->statusSeq);
if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
@ -230,7 +231,6 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
dDebug("send status req to mnode, statusSeq:%d, begin to get qnode loads", pMgmt->statusSeq);
(*pMgmt->getQnodeLoadsFp)(&req.qload);
pMgmt->statusSeq++;
req.statusSeq = pMgmt->statusSeq;
req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
req.analVer = taosAnalGetVersion();

View File

@ -91,10 +91,8 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3
if (NULL == pBuf) {
return terrno;
}
if(tSerializeSTableInfoReq(pBuf, bufLen, &infoReq) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeSTableInfoReq(pBuf, bufLen, &infoReq);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -121,10 +119,8 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
if (NULL == pBuf) {
return terrno;
}
if(tSerializeSUseDbReq(pBuf, bufLen, &usedbReq) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeSUseDbReq(pBuf, bufLen, &usedbReq);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -144,10 +140,9 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
if (NULL == pBuf) {
return terrno;
}
if(tSerializeSQnodeListReq(pBuf, bufLen, &qnodeListReq) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeSQnodeListReq(pBuf, bufLen, &qnodeListReq);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -167,10 +162,8 @@ int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
if (NULL == pBuf) {
return terrno;
}
if(tSerializeSDnodeListReq(pBuf, bufLen, &dnodeListReq) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeSDnodeListReq(pBuf, bufLen, &dnodeListReq);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -189,10 +182,8 @@ int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t
if (NULL == pBuf) {
return terrno;
}
if(tSerializeSServerVerReq(pBuf, bufLen, &req) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeSServerVerReq(pBuf, bufLen, &req);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -213,10 +204,8 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
if (NULL == pBuf) {
return terrno;
}
if(tSerializeSDbCfgReq(pBuf, bufLen, &dbCfgReq) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeSDbCfgReq(pBuf, bufLen, &dbCfgReq);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -237,10 +226,8 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
if (NULL == pBuf) {
return terrno;
}
if(tSerializeSUserIndexReq(pBuf, bufLen, &indexReq) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeSUserIndexReq(pBuf, bufLen, &indexReq);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -272,10 +259,10 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3
taosArrayDestroy(funcReq.pFuncNames);
return terrno;
}
if(tSerializeSRetrieveFuncReq(pBuf, bufLen, &funcReq) < 0)
{
int32_t ret = tSerializeSRetrieveFuncReq(pBuf, bufLen, &funcReq);
if (ret < 0) {
taosArrayDestroy(funcReq.pFuncNames);
return TSDB_CODE_TSC_INVALID_INPUT;
return ret;
}
taosArrayDestroy(funcReq.pFuncNames);
@ -299,9 +286,8 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
if (NULL == pBuf) {
return terrno;
}
if (tSerializeSGetUserAuthReq(pBuf, bufLen, &req) < 0) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeSGetUserAuthReq(pBuf, bufLen, &req);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -322,10 +308,8 @@ int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_
if (NULL == pBuf) {
return terrno;
}
if(tSerializeSTableIndexReq(pBuf, bufLen, &indexReq) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeSTableIndexReq(pBuf, bufLen, &indexReq);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -349,10 +333,8 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
if (NULL == pBuf) {
return terrno;
}
if(tSerializeSTableCfgReq(pBuf, bufLen, &cfgReq) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeSTableCfgReq(pBuf, bufLen, &cfgReq);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -373,10 +355,8 @@ int32_t queryBuildGetViewMetaMsg(void *input, char **msg, int32_t msgSize, int32
if (NULL == pBuf) {
return terrno;
}
if(tSerializeSViewMetaReq(pBuf, bufLen, &req) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeSViewMetaReq(pBuf, bufLen, &req);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -398,10 +378,8 @@ int32_t queryBuildGetTableTSMAMsg(void *input, char **msg, int32_t msgSize, int3
if (NULL == pBuf) {
return terrno;
}
if(tSerializeTableTSMAInfoReq(pBuf, bufLen, &req) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeTableTSMAInfoReq(pBuf, bufLen, &req);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -424,10 +402,8 @@ int32_t queryBuildGetTSMAMsg(void *input, char **msg, int32_t msgSize, int32_t *
{
return terrno;
}
if(tSerializeTableTSMAInfoReq(pBuf, bufLen, &req) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeTableTSMAInfoReq(pBuf, bufLen, &req);
if(ret < 0) return ret;
*msg = pBuf;
*msgLen = bufLen;
@ -445,10 +421,8 @@ int32_t queryBuildGetStreamProgressMsg(void* input, char** msg, int32_t msgSize,
return terrno;
}
if(tSerializeStreamProgressReq(pBuf, len, input) < 0)
{
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t ret = tSerializeStreamProgressReq(pBuf, len, input);
if (ret < 0) return ret;
*msg = pBuf;
*msgLen = len;
@ -462,6 +436,7 @@ int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) {
code = TSDB_CODE_TSC_INVALID_INPUT;
qError("invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
goto PROCESS_USEDB_OVER;
}
@ -673,6 +648,7 @@ int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) {
STableMetaRsp metaRsp = {0};
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessTableMetaRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
code = TSDB_CODE_TSC_INVALID_INPUT;
goto PROCESS_META_OVER;
}
@ -729,6 +705,7 @@ static int32_t queryProcessTableNameRsp(void *output, char *msg, int32_t msgSize
STableMetaRsp metaRsp = {0};
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessTableNameRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
code = TSDB_CODE_TSC_INVALID_INPUT;
goto PROCESS_NAME_OVER;
}
@ -785,6 +762,7 @@ int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
int32_t code = 0;
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessQnodeListRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
code = TSDB_CODE_TSC_INVALID_INPUT;
return code;
}
@ -804,6 +782,7 @@ int32_t queryProcessDnodeListRsp(void *output, char *msg, int32_t msgSize) {
int32_t code = 0;
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessDnodeListRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
code = TSDB_CODE_TSC_INVALID_INPUT;
return code;
}
@ -824,6 +803,7 @@ int32_t queryProcessGetSerVerRsp(void *output, char *msg, int32_t msgSize) {
int32_t code = 0;
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessGetSerVerRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
code = TSDB_CODE_TSC_INVALID_INPUT;
return code;
}
@ -846,6 +826,7 @@ int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
SDbCfgRsp out = {0};
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessGetDbCfgRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
return TSDB_CODE_TSC_INVALID_INPUT;
}
@ -863,6 +844,7 @@ int32_t queryProcessGetIndexRsp(void *output, char *msg, int32_t msgSize) {
SUserIndexRsp out = {0};
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessGetIndexRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
return TSDB_CODE_TSC_INVALID_INPUT;
}
@ -880,6 +862,7 @@ int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) {
SRetrieveFuncRsp out = {0};
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessRetrieveFuncRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
return TSDB_CODE_TSC_INVALID_INPUT;
}
@ -904,6 +887,7 @@ int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) {
int32_t queryProcessGetUserAuthRsp(void *output, char *msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessGetUserAuthRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
return TSDB_CODE_TSC_INVALID_INPUT;
}
@ -917,6 +901,7 @@ int32_t queryProcessGetUserAuthRsp(void *output, char *msg, int32_t msgSize) {
int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessGetTbIndexRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
return TSDB_CODE_TSC_INVALID_INPUT;
}
@ -931,6 +916,7 @@ int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessGetTbCfgRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
return TSDB_CODE_TSC_INVALID_INPUT;
}
@ -952,6 +938,7 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) {
int32_t queryProcessGetViewMetaRsp(void *output, char *msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessGetViewMetaRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
return TSDB_CODE_TSC_INVALID_INPUT;
}
@ -975,6 +962,7 @@ int32_t queryProcessGetViewMetaRsp(void *output, char *msg, int32_t msgSize) {
int32_t queryProcessGetTbTSMARsp(void* output, char* msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) {
qError("queryProcessGetTbTSMARsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
return TSDB_CODE_TSC_INVALID_INPUT;
}
@ -988,6 +976,7 @@ int32_t queryProcessGetTbTSMARsp(void* output, char* msg, int32_t msgSize) {
int32_t queryProcessStreamProgressRsp(void* output, char* msg, int32_t msgSize) {
if (!output || !msg || msgSize <= 0) {
qError("queryProcessStreamProgressRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
return TSDB_CODE_TSC_INVALID_INPUT;
}

View File

@ -148,6 +148,7 @@ int32_t schedulerUpdatePolicy(int32_t policy) {
qDebug("schedule policy updated to %d", schMgmt.cfg.schPolicy);
break;
default:
qError("invalid schedule policy %d", policy);
SCH_RET(TSDB_CODE_TSC_INVALID_INPUT);
}

View File

@ -35,6 +35,7 @@ void raftStoreClearVote(SSyncNode *pNode);
void raftStoreNextTerm(SSyncNode *pNode);
void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term);
SyncTerm raftStoreGetTerm(SSyncNode *pNode);
SyncTerm raftStoreTryGetTerm(SSyncNode *pNode);
#ifdef __cplusplus
}

View File

@ -213,3 +213,13 @@ SyncTerm raftStoreGetTerm(SSyncNode *pNode) {
(void)taosThreadMutexUnlock(&pNode->raftStore.mutex);
return term;
}
SyncTerm raftStoreTryGetTerm(SSyncNode *pNode) {
SyncTerm term = 0;
if (taosThreadMutexTryLock(&pNode->raftStore.mutex) == 0) {
term = pNode->raftStore.currentTerm;
(void)taosThreadMutexUnlock(&pNode->raftStore.mutex);
}
return term;
}

View File

@ -286,7 +286,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo
void syncPrintHbLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
const char* format, ...) {
if (pNode == NULL || pNode->pLogStore == NULL) return;
int64_t currentTerm = raftStoreGetTerm(pNode);
int64_t currentTerm = raftStoreTryGetTerm(pNode);
// save error code, otherwise it will be overwritten
int32_t errCode = terrno;
@ -467,17 +467,24 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed,
int64_t execTime) {
if (printX) {
sHTrace(pSyncNode,
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, x",
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
} else {
sHTrace(pSyncNode,
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed,
execTime);
if (sDebugFlag & DEBUG_TRACE) {
char pBuf[TD_TIME_STR_LEN] = {0};
if (pMsg->timeStamp > 0) {
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
pBuf[0] = '\0';
}
}
if (printX) {
sHTrace(pSyncNode,
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%s}, x",
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf);
} else {
sHTrace(pSyncNode,
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%s}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, timerElapsed, execTime);
}
}
}

View File

@ -48,6 +48,7 @@ class TDTestCase:
tdSql.checkData(4,1,'%s:6430'%self.host)
tdSql.checkData(0,4,'ready')
tdSql.checkData(4,4,'ready')
time.sleep(1)
tdSql.query("select * from information_schema.ins_mnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(0,2,'leader')

View File

@ -42,6 +42,7 @@ class TDTestCase:
return buildPath
def check_setup_cluster_status(self):
time.sleep(1)
tdSql.query("select * from information_schema.ins_mnodes")
for mnode in tdSql.queryResult:
name = mnode[1]