Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0

This commit is contained in:
Hongze Cheng 2024-03-06 08:57:51 +08:00
commit dd155a52fa
6 changed files with 26 additions and 35 deletions

View File

@ -586,7 +586,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfSupportVnodes = tsNumOfCores * 2; tsNumOfSupportVnodes = tsNumOfCores * 2;
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2); tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)

View File

@ -111,7 +111,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);

View File

@ -610,7 +610,7 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
for(int32_t j = 0; j < tagSize; j++){ for(int32_t j = 0; j < tagSize; j++){
SJson* item = tjsonGetArrayItem(arrayTag, j); SJson* item = tjsonGetArrayItem(arrayTag, j);
*(labels + j) = taosMemoryMalloc(MONITOR_TAG_NAME_LEN); *(labels + j) = taosMemoryMalloc(MONITOR_TAG_NAME_LEN);
tjsonGetStringValue(item, "name", *(labels + j)); tjsonGetStringValue(item, "name", *(labels + j));
*(sample_labels + j) = taosMemoryMalloc(MONITOR_TAG_VALUE_LEN); *(sample_labels + j) = taosMemoryMalloc(MONITOR_TAG_VALUE_LEN);
@ -626,7 +626,7 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
for(int32_t j = 0; j < metricLen; j++){ for(int32_t j = 0; j < metricLen; j++){
SJson *item = tjsonGetArrayItem(metrics, j); SJson *item = tjsonGetArrayItem(metrics, j);
char name[MONITOR_METRIC_NAME_LEN] = {0}; char name[MONITOR_METRIC_NAME_LEN] = {0};
tjsonGetStringValue(item, "name", name); tjsonGetStringValue(item, "name", name);
double value = 0; double value = 0;
@ -636,7 +636,7 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
tjsonGetDoubleValue(item, "type", &type); tjsonGetDoubleValue(item, "type", &type);
int32_t metricNameLen = strlen(name) + strlen(tableName) + 2; int32_t metricNameLen = strlen(name) + strlen(tableName) + 2;
char* metricName = taosMemoryMalloc(metricNameLen); char* metricName = taosMemoryMalloc(metricNameLen);
memset(metricName, 0, metricNameLen); memset(metricName, 0, metricNameLen);
sprintf(metricName, "%s:%s", tableName, name); sprintf(metricName, "%s:%s", tableName, name);
@ -669,7 +669,7 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
else{ else{
mTrace("get metric from registry:%p", metric); mTrace("get metric from registry:%p", metric);
} }
if(type == 0){ if(type == 0){
taos_counter_add(metric, value, (const char**)sample_labels); taos_counter_add(metric, value, (const char**)sample_labels);
} }
@ -689,7 +689,7 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
taosMemoryFreeClear(labels); taosMemoryFreeClear(labels);
} }
} }
} }
code = 0; code = 0;
@ -1409,24 +1409,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (strcasecmp(cfgReq.config, "resetlog") == 0) { if (strcasecmp(cfgReq.config, "resetlog") == 0) {
strcpy(dcfgReq.config, "resetlog"); strcpy(dcfgReq.config, "resetlog");
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
} else if (strncasecmp(cfgReq.config, "supportvnodes", 13) == 0) {
int32_t optLen = strlen("supportvnodes");
int32_t flag = -1;
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
if (code < 0) return code;
if (flag < 0 || flag > 4096) {
mError("dnode:%d, failed to config supportVnodes since value:%d. Valid range: [0, 4096]", cfgReq.dnodeId, flag);
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _err_out;
}
if (flag == 0) {
flag = tsNumOfCores * 2;
}
flag = TMAX(flag, 2);
strcpy(dcfgReq.config, "supportvnodes");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) { } else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
int32_t optLen = strlen("s3blocksize"); int32_t optLen = strlen("s3blocksize");
int32_t flag = -1; int32_t flag = -1;

View File

@ -1813,7 +1813,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
pStream->name, pTrans->id); pStream->name, pTrans->id);
int32_t code = mndStreamSetUpdateEpsetAction(pStream, pChangeInfo, pTrans); int32_t code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);
// todo: not continue, drop all and retry again // todo: not continue, drop all and retry again
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -462,14 +462,22 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
void *pBuf = NULL; void *pBuf = NULL;
int32_t len = 0; int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0); SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
terrno = code;
return code;
}
code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
} }
@ -478,14 +486,14 @@ static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroup
} }
// build trans to update the epset // build trans to update the epset
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid); mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
taosWLockLatch(&pStream->lock); taosWLockLatch(&pStream->lock);
SStreamTaskIter *pIter = createStreamTaskIter(pStream); SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) { while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
int32_t code = doSetUpdateTaskAction(pTrans, pTask, pInfo); int32_t code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);

View File

@ -39,7 +39,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
} }
// check for the dispath info and the upstream task info // check for the dispatch info and the upstream task info
int32_t level = pTask->info.taskLevel; int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) { if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
@ -622,6 +622,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
char buf[512] = {0}; char buf[512] = {0};
EPSET_TO_STR(pEpSet, buf); EPSET_TO_STR(pEpSet, buf);
int32_t id = pTask->id.taskId;
int8_t type = pTask->outputInfo.type; int8_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
@ -633,8 +634,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
if (pVgInfo->vgId == nodeId) { if (pVgInfo->vgId == nodeId) {
epsetAssign(&pVgInfo->epSet, pEpSet); epsetAssign(&pVgInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pVgInfo->taskId, nodeId,
pVgInfo->taskId, nodeId, buf); buf);
break; break;
} }
} }
@ -642,8 +643,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher; STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
if (pDispatcher->nodeId == nodeId) { if (pDispatcher->nodeId == nodeId) {
epsetAssign(&pDispatcher->epSet, pEpSet); epsetAssign(&pDispatcher->epSet, pEpSet);
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId, stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pDispatcher->taskId, nodeId,
pDispatcher->taskId, nodeId, buf); buf);
} }
} }
} }