Merge pull request #25531 from taosdata/fix/3_liaohj
fix(stream):add ref for task in check rsp monitor timer.
This commit is contained in:
commit
384efdc185
|
@ -106,12 +106,15 @@ int32_t cfgLoad(SConfig *pCfg, ECfgSrcType cfgType, const void *sourceStr);
|
||||||
int32_t cfgLoadFromArray(SConfig *pCfg, SArray *pArgs); // SConfigPair
|
int32_t cfgLoadFromArray(SConfig *pCfg, SArray *pArgs); // SConfigPair
|
||||||
void cfgCleanup(SConfig *pCfg);
|
void cfgCleanup(SConfig *pCfg);
|
||||||
int32_t cfgGetSize(SConfig *pCfg);
|
int32_t cfgGetSize(SConfig *pCfg);
|
||||||
SConfigItem *cfgGetItem(SConfig *pCfg, const char *name);
|
SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName);
|
||||||
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype);
|
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype, bool lock);
|
||||||
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer);
|
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer);
|
||||||
|
|
||||||
SConfigIter *cfgCreateIter(SConfig *pConf);
|
SConfigIter *cfgCreateIter(SConfig *pConf);
|
||||||
SConfigItem *cfgNextIter(SConfigIter *pIter);
|
SConfigItem *cfgNextIter(SConfigIter *pIter);
|
||||||
void cfgDestroyIter(SConfigIter *pIter);
|
void cfgDestroyIter(SConfigIter *pIter);
|
||||||
|
void cfgLock(SConfig *pCfg);
|
||||||
|
void cfgUnLock(SConfig *pCfg);
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope, int8_t dynScope);
|
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope, int8_t dynScope);
|
||||||
|
|
|
@ -838,7 +838,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS);
|
int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS, true);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tscError("failed to set cfg:%s to %s since %s", name, str, terrstr());
|
tscError("failed to set cfg:%s to %s since %s", name, str, terrstr());
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1081,13 +1081,13 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
SEp firstEp = {0};
|
SEp firstEp = {0};
|
||||||
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||||
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
||||||
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
|
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype, true);
|
||||||
|
|
||||||
SConfigItem *pSecondpItem = cfgGetItem(pCfg, "secondEp");
|
SConfigItem *pSecondpItem = cfgGetItem(pCfg, "secondEp");
|
||||||
SEp secondEp = {0};
|
SEp secondEp = {0};
|
||||||
taosGetFqdnPortFromEp(strlen(pSecondpItem->str) == 0 ? defaultFirstEp : pSecondpItem->str, &secondEp);
|
taosGetFqdnPortFromEp(strlen(pSecondpItem->str) == 0 ? defaultFirstEp : pSecondpItem->str, &secondEp);
|
||||||
snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port);
|
snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port);
|
||||||
cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype);
|
cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype, true);
|
||||||
|
|
||||||
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
|
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
|
||||||
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
|
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
|
||||||
|
@ -1149,9 +1149,10 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
static void taosSetSystemCfg(SConfig *pCfg) {
|
static void taosSetSystemCfg(SConfig *pCfg) {
|
||||||
SConfigItem *pItem = cfgGetItem(pCfg, "timezone");
|
SConfigItem *pItem = cfgGetItem(pCfg, "timezone");
|
||||||
|
|
||||||
osSetTimezone(pItem->str);
|
osSetTimezone(pItem->str);
|
||||||
uDebug("timezone format changed from %s to %s", pItem->str, tsTimezoneStr);
|
uDebug("timezone format changed from %s to %s", pItem->str, tsTimezoneStr);
|
||||||
cfgSetItem(pCfg, "timezone", tsTimezoneStr, pItem->stype);
|
cfgSetItem(pCfg, "timezone", tsTimezoneStr, pItem->stype, true);
|
||||||
|
|
||||||
const char *locale = cfgGetItem(pCfg, "locale")->str;
|
const char *locale = cfgGetItem(pCfg, "locale")->str;
|
||||||
const char *charset = cfgGetItem(pCfg, "charset")->str;
|
const char *charset = cfgGetItem(pCfg, "charset")->str;
|
||||||
|
@ -1515,15 +1516,20 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cfgLock(pCfg);
|
||||||
|
|
||||||
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
||||||
if (!pItem || (pItem->dynScope & CFG_DYN_SERVER) == 0) {
|
if (!pItem || (pItem->dynScope & CFG_DYN_SERVER) == 0) {
|
||||||
uError("failed to config:%s, not support", name);
|
uError("failed to config:%s, not support", name);
|
||||||
terrno = TSDB_CODE_INVALID_CFG;
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
|
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strncasecmp(name, "debugFlag", 9) == 0) {
|
if (strncasecmp(name, "debugFlag", 9) == 0) {
|
||||||
taosSetAllDebugFlag(pCfg, pItem->i32);
|
taosSetAllDebugFlag(pCfg, pItem->i32);
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1580,17 +1586,21 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return terrno == TSDB_CODE_SUCCESS ? 0 : -1;
|
return terrno == TSDB_CODE_SUCCESS ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo fix race condition caused by update of config, pItem->str may be removed
|
|
||||||
static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
|
static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
cfgLock(pCfg);
|
||||||
|
|
||||||
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
||||||
if ((pItem == NULL) || (pItem->dynScope & CFG_DYN_CLIENT) == 0) {
|
if ((pItem == NULL) || (pItem->dynScope & CFG_DYN_CLIENT) == 0) {
|
||||||
uError("failed to config:%s, not support", name);
|
uError("failed to config:%s, not support", name);
|
||||||
terrno = TSDB_CODE_INVALID_CFG;
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
|
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1630,7 +1640,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
|
||||||
SEp firstEp = {0};
|
SEp firstEp = {0};
|
||||||
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||||
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
||||||
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
|
|
||||||
|
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype, false);
|
||||||
uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst);
|
uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst);
|
||||||
matched = true;
|
matched = true;
|
||||||
} else if (strcasecmp("firstEp", name) == 0) {
|
} else if (strcasecmp("firstEp", name) == 0) {
|
||||||
|
@ -1645,7 +1656,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
|
||||||
SEp firstEp = {0};
|
SEp firstEp = {0};
|
||||||
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||||
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
||||||
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
|
|
||||||
|
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype, false);
|
||||||
uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst);
|
uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst);
|
||||||
matched = true;
|
matched = true;
|
||||||
}
|
}
|
||||||
|
@ -1692,7 +1704,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
|
||||||
SEp secondEp = {0};
|
SEp secondEp = {0};
|
||||||
taosGetFqdnPortFromEp(strlen(pItem->str) == 0 ? tsFirst : pItem->str, &secondEp);
|
taosGetFqdnPortFromEp(strlen(pItem->str) == 0 ? tsFirst : pItem->str, &secondEp);
|
||||||
snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port);
|
snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port);
|
||||||
cfgSetItem(pCfg, "secondEp", tsSecond, pItem->stype);
|
cfgSetItem(pCfg, "secondEp", tsSecond, pItem->stype, false);
|
||||||
uInfo("%s set to %s", name, tsSecond);
|
uInfo("%s set to %s", name, tsSecond);
|
||||||
matched = true;
|
matched = true;
|
||||||
} else if (strcasecmp("smlChildTableName", name) == 0) {
|
} else if (strcasecmp("smlChildTableName", name) == 0) {
|
||||||
|
@ -1723,11 +1735,13 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
|
||||||
SEp firstEp = {0};
|
SEp firstEp = {0};
|
||||||
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||||
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
||||||
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
|
|
||||||
|
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype, false);
|
||||||
uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst);
|
uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst);
|
||||||
matched = true;
|
matched = true;
|
||||||
} else if (strcasecmp("slowLogScope", name) == 0) {
|
} else if (strcasecmp("slowLogScope", name) == 0) {
|
||||||
if (taosSetSlowLogScope(pItem->str)) {
|
if (taosSetSlowLogScope(pItem->str)) {
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
uInfo("%s set to %s", name, pItem->str);
|
uInfo("%s set to %s", name, pItem->str);
|
||||||
|
@ -1739,7 +1753,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
|
||||||
if (strcasecmp("timezone", name) == 0) {
|
if (strcasecmp("timezone", name) == 0) {
|
||||||
osSetTimezone(pItem->str);
|
osSetTimezone(pItem->str);
|
||||||
uInfo("%s set from %s to %s", name, tsTimezoneStr, pItem->str);
|
uInfo("%s set from %s to %s", name, tsTimezoneStr, pItem->str);
|
||||||
cfgSetItem(pCfg, "timezone", tsTimezoneStr, pItem->stype);
|
|
||||||
|
cfgSetItem(pCfg, "timezone", tsTimezoneStr, pItem->stype, false);
|
||||||
matched = true;
|
matched = true;
|
||||||
} else if (strcasecmp("tempDir", name) == 0) {
|
} else if (strcasecmp("tempDir", name) == 0) {
|
||||||
uInfo("%s set from %s to %s", name, tsTempDir, pItem->str);
|
uInfo("%s set from %s to %s", name, tsTempDir, pItem->str);
|
||||||
|
@ -1747,6 +1762,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
|
||||||
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
|
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
|
||||||
if (taosMulMkDir(tsTempDir) != 0) {
|
if (taosMulMkDir(tsTempDir) != 0) {
|
||||||
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
|
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
matched = true;
|
matched = true;
|
||||||
|
@ -1802,6 +1818,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return terrno == TSDB_CODE_SUCCESS ? 0 : -1;
|
return terrno == TSDB_CODE_SUCCESS ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1878,7 +1895,9 @@ static void taosSetAllDebugFlag(SConfig *pCfg, int32_t flag) {
|
||||||
taosArrayClear(noNeedToSetVars); // reset array
|
taosArrayClear(noNeedToSetVars); // reset array
|
||||||
|
|
||||||
uInfo("all debug flag are set to %d", flag);
|
uInfo("all debug flag are set to %d", flag);
|
||||||
if (terrno == TSDB_CODE_CFG_NOT_FOUND) terrno = TSDB_CODE_SUCCESS; // ignore not exist
|
if (terrno == TSDB_CODE_CFG_NOT_FOUND) {
|
||||||
|
terrno = TSDB_CODE_SUCCESS; // ignore not exist
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t taosGranted(int8_t type) {
|
int8_t taosGranted(int8_t type) {
|
||||||
|
|
|
@ -219,7 +219,7 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
|
dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
|
||||||
|
|
||||||
SConfig *pCfg = taosGetCfg();
|
SConfig *pCfg = taosGetCfg();
|
||||||
cfgSetItem(pCfg, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_CMD);
|
cfgSetItem(pCfg, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_CMD, true);
|
||||||
taosCfgDynamicOptions(pCfg, cfgReq.config, true);
|
taosCfgDynamicOptions(pCfg, cfgReq.config, true);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -254,7 +254,6 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
|
||||||
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
|
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
|
||||||
pStatus->details[0] = 0;
|
pStatus->details[0] = 0;
|
||||||
|
|
||||||
SServerStatusRsp statusRsp = {0};
|
|
||||||
SMonMloadInfo minfo = {0};
|
SMonMloadInfo minfo = {0};
|
||||||
(*pMgmt->getMnodeLoadsFp)(&minfo);
|
(*pMgmt->getMnodeLoadsFp)(&minfo);
|
||||||
if (minfo.isMnode &&
|
if (minfo.isMnode &&
|
||||||
|
|
|
@ -926,7 +926,7 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD)) {
|
if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD, true)) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define CHECK_RSP_INTERVAL 300
|
#define CHECK_RSP_CHECK_INTERVAL 300
|
||||||
#define LAUNCH_HTASK_INTERVAL 100
|
#define LAUNCH_HTASK_INTERVAL 100
|
||||||
#define WAIT_FOR_MINIMAL_INTERVAL 100.00
|
#define WAIT_FOR_MINIMAL_INTERVAL 100.00
|
||||||
#define MAX_RETRY_LAUNCH_HISTORY_TASK 40
|
#define MAX_RETRY_LAUNCH_HISTORY_TASK 40
|
||||||
|
|
|
@ -163,23 +163,24 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
|
|
||||||
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
||||||
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
|
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*SStreamTask* p = */ streamMetaAcquireOneTask(pTask); // add task ref here
|
||||||
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
|
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
|
||||||
|
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
|
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
|
||||||
|
|
||||||
if (pInfo->checkRspTmr == NULL) {
|
if (pInfo->checkRspTmr == NULL) {
|
||||||
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
|
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer);
|
||||||
} else {
|
} else {
|
||||||
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
@ -329,7 +330,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("s-task:%s set the in-check-procedure flag", id);
|
stDebug("s-task:%s set the in-check-procedure flag", id);
|
||||||
return 0;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) {
|
int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) {
|
||||||
|
@ -363,7 +364,6 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char*
|
||||||
|
|
||||||
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) {
|
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) {
|
||||||
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0};
|
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0};
|
||||||
|
|
||||||
taosThreadMutexLock(&pInfo->checkInfoLock);
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
|
||||||
|
@ -519,6 +519,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
|
||||||
|
|
||||||
void rspMonitorFn(void* param, void* tmrId) {
|
void rspMonitorFn(void* param, void* tmrId) {
|
||||||
SStreamTask* pTask = param;
|
SStreamTask* pTask = param;
|
||||||
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
|
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
|
||||||
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
|
@ -546,6 +547,8 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
STaskId* pHId = &pTask->hTaskInfo.id;
|
STaskId* pHId = &pTask->hTaskInfo.id;
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -554,6 +557,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
||||||
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, true, id);
|
streamTaskCompleteCheckRsp(pInfo, true, id);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -565,6 +569,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, false, id);
|
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -591,6 +596,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
|
|
||||||
streamTaskCompleteCheckRsp(pInfo, false, id);
|
streamTaskCompleteCheckRsp(pInfo, false, id);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
taosArrayDestroy(pNotReadyList);
|
taosArrayDestroy(pNotReadyList);
|
||||||
taosArrayDestroy(pTimeoutList);
|
taosArrayDestroy(pTimeoutList);
|
||||||
|
@ -609,12 +615,14 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
|
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
STaskId* pHId = &pTask->hTaskInfo.id;
|
STaskId* pHId = &pTask->hTaskInfo.id;
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
streamMetaAddTaskLaunchResult(pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
taosArrayDestroy(pNotReadyList);
|
taosArrayDestroy(pNotReadyList);
|
||||||
taosArrayDestroy(pTimeoutList);
|
taosArrayDestroy(pTimeoutList);
|
||||||
return;
|
return;
|
||||||
|
@ -628,7 +636,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
handleTimeoutDownstreamTasks(pTask, pTimeoutList);
|
handleTimeoutDownstreamTasks(pTask, pTimeoutList);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||||
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
|
stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
|
||||||
|
|
|
@ -85,9 +85,10 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) {
|
||||||
|
|
||||||
SStreamTaskState* p = streamTaskGetStatus(pTask);
|
SStreamTaskState* p = streamTaskGetStatus(pTask);
|
||||||
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
|
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
|
||||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref);
|
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref);
|
||||||
|
|
||||||
|
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -80,7 +80,7 @@ int32_t cfgLoadFromArray(SConfig *pCfg, SArray *pArgs) {
|
||||||
int32_t size = taosArrayGetSize(pArgs);
|
int32_t size = taosArrayGetSize(pArgs);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SConfigPair *pPair = taosArrayGet(pArgs, i);
|
SConfigPair *pPair = taosArrayGet(pArgs, i);
|
||||||
if (cfgSetItem(pCfg, pPair->name, pPair->value, CFG_STYPE_ARG_LIST) != 0) {
|
if (cfgSetItem(pCfg, pPair->name, pPair->value, CFG_STYPE_ARG_LIST, true) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -88,7 +88,7 @@ int32_t cfgLoadFromArray(SConfig *pCfg, SArray *pArgs) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cfgFreeItem(SConfigItem *pItem) {
|
void cfgItemFreeVal(SConfigItem *pItem) {
|
||||||
if (pItem->dtype == CFG_DTYPE_STRING || pItem->dtype == CFG_DTYPE_DIR || pItem->dtype == CFG_DTYPE_LOCALE ||
|
if (pItem->dtype == CFG_DTYPE_STRING || pItem->dtype == CFG_DTYPE_DIR || pItem->dtype == CFG_DTYPE_LOCALE ||
|
||||||
pItem->dtype == CFG_DTYPE_CHARSET || pItem->dtype == CFG_DTYPE_TIMEZONE) {
|
pItem->dtype == CFG_DTYPE_CHARSET || pItem->dtype == CFG_DTYPE_TIMEZONE) {
|
||||||
taosMemoryFreeClear(pItem->str);
|
taosMemoryFreeClear(pItem->str);
|
||||||
|
@ -100,23 +100,26 @@ static void cfgFreeItem(SConfigItem *pItem) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void cfgCleanup(SConfig *pCfg) {
|
void cfgCleanup(SConfig *pCfg) {
|
||||||
if (pCfg != NULL) {
|
if (pCfg == NULL) {
|
||||||
int32_t size = taosArrayGetSize(pCfg->array);
|
return;
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
|
||||||
SConfigItem *pItem = taosArrayGet(pCfg->array, i);
|
|
||||||
cfgFreeItem(pItem);
|
|
||||||
taosMemoryFreeClear(pItem->name);
|
|
||||||
}
|
|
||||||
taosArrayDestroy(pCfg->array);
|
|
||||||
taosThreadMutexDestroy(&pCfg->lock);
|
|
||||||
taosMemoryFree(pCfg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t size = taosArrayGetSize(pCfg->array);
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
SConfigItem *pItem = taosArrayGet(pCfg->array, i);
|
||||||
|
cfgItemFreeVal(pItem);
|
||||||
|
taosMemoryFreeClear(pItem->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pCfg->array);
|
||||||
|
taosThreadMutexDestroy(&pCfg->lock);
|
||||||
|
taosMemoryFree(pCfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cfgGetSize(SConfig *pCfg) { return taosArrayGetSize(pCfg->array); }
|
int32_t cfgGetSize(SConfig *pCfg) { return taosArrayGetSize(pCfg->array); }
|
||||||
|
|
||||||
static int32_t cfgCheckAndSetConf(SConfigItem *pItem, const char *conf) {
|
static int32_t cfgCheckAndSetConf(SConfigItem *pItem, const char *conf) {
|
||||||
cfgFreeItem(pItem);
|
cfgItemFreeVal(pItem);
|
||||||
ASSERT(pItem->str == NULL);
|
ASSERT(pItem->str == NULL);
|
||||||
|
|
||||||
pItem->str = taosStrdup(conf);
|
pItem->str = taosStrdup(conf);
|
||||||
|
@ -257,13 +260,21 @@ static int32_t cfgSetTimezone(SConfigItem *pItem, const char *value, ECfgSrcType
|
||||||
|
|
||||||
static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value, const char *level, const char *primary,
|
static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value, const char *level, const char *primary,
|
||||||
ECfgSrcType stype) {
|
ECfgSrcType stype) {
|
||||||
|
taosThreadMutexLock(&pCfg->lock);
|
||||||
|
|
||||||
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
||||||
if (pItem == NULL) return -1;
|
if (pItem == NULL) {
|
||||||
|
taosThreadMutexUnlock(&pCfg->lock);
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (pItem->array == NULL) {
|
if (pItem->array == NULL) {
|
||||||
pItem->array = taosArrayInit(16, sizeof(SDiskCfg));
|
pItem->array = taosArrayInit(16, sizeof(SDiskCfg));
|
||||||
if (pItem->array == NULL) {
|
if (pItem->array == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosThreadMutexUnlock(&pCfg->lock);
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -275,10 +286,14 @@ static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value,
|
||||||
void *ret = taosArrayPush(pItem->array, &cfg);
|
void *ret = taosArrayPush(pItem->array, &cfg);
|
||||||
if (ret == NULL) {
|
if (ret == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosThreadMutexUnlock(&pCfg->lock);
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pItem->stype = stype;
|
pItem->stype = stype;
|
||||||
|
taosThreadMutexUnlock(&pCfg->lock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -310,17 +325,21 @@ static int32_t cfgUpdateDebugFlagItem(SConfig *pCfg, const char *name, bool rese
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype) {
|
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype, bool lock) {
|
||||||
// GRANT_CFG_SET;
|
// GRANT_CFG_SET;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (lock) {
|
||||||
|
taosThreadMutexLock(&pCfg->lock);
|
||||||
|
}
|
||||||
|
|
||||||
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
||||||
if (pItem == NULL) {
|
if (pItem == NULL) {
|
||||||
terrno = TSDB_CODE_CFG_NOT_FOUND;
|
terrno = TSDB_CODE_CFG_NOT_FOUND;
|
||||||
|
taosThreadMutexUnlock(&pCfg->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&pCfg->lock);
|
|
||||||
|
|
||||||
switch (pItem->dtype) {
|
switch (pItem->dtype) {
|
||||||
case CFG_DTYPE_BOOL: {
|
case CFG_DTYPE_BOOL: {
|
||||||
code = cfgSetBool(pItem, value, stype);
|
code = cfgSetBool(pItem, value, stype);
|
||||||
|
@ -365,16 +384,19 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pCfg->lock);
|
if (lock) {
|
||||||
|
taosThreadMutexUnlock(&pCfg->lock);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) {
|
SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName) {
|
||||||
if (pCfg == NULL) return NULL;
|
if (pCfg == NULL) return NULL;
|
||||||
int32_t size = taosArrayGetSize(pCfg->array);
|
int32_t size = taosArrayGetSize(pCfg->array);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SConfigItem *pItem = taosArrayGet(pCfg->array, i);
|
SConfigItem *pItem = taosArrayGet(pCfg->array, i);
|
||||||
if (strcasecmp(pItem->name, name) == 0) {
|
if (strcasecmp(pItem->name, pName) == 0) {
|
||||||
return pItem;
|
return pItem;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -383,13 +405,28 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void cfgLock(SConfig *pCfg) {
|
||||||
|
if (pCfg == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pCfg->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
void cfgUnLock(SConfig *pCfg) {
|
||||||
|
taosThreadMutexUnlock(&pCfg->lock);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer) {
|
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer) {
|
||||||
ECfgDynType dynType = isServer ? CFG_DYN_SERVER : CFG_DYN_CLIENT;
|
ECfgDynType dynType = isServer ? CFG_DYN_SERVER : CFG_DYN_CLIENT;
|
||||||
|
|
||||||
|
cfgLock(pCfg);
|
||||||
|
|
||||||
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
||||||
if (!pItem || (pItem->dynScope & dynType) == 0) {
|
if (!pItem || (pItem->dynScope & dynType) == 0) {
|
||||||
uError("failed to config:%s, not support update this config", name);
|
uError("failed to config:%s, not support update this config", name);
|
||||||
terrno = TSDB_CODE_INVALID_CFG;
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -399,28 +436,37 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
|
||||||
if (ival != 0 && ival != 1) {
|
if (ival != 0 && ival != 1) {
|
||||||
uError("cfg:%s, type:%s value:%d out of range[0, 1]", pItem->name, cfgDtypeStr(pItem->dtype), ival);
|
uError("cfg:%s, type:%s value:%d out of range[0, 1]", pItem->name, cfgDtypeStr(pItem->dtype), ival);
|
||||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case CFG_DTYPE_INT32: {
|
case CFG_DTYPE_INT32: {
|
||||||
int32_t ival;
|
int32_t ival;
|
||||||
int32_t code = (int32_t)taosStrHumanToInt32(pVal, &ival);
|
int32_t code = (int32_t)taosStrHumanToInt32(pVal, &ival);
|
||||||
if (code != TSDB_CODE_SUCCESS) return code;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
cfgUnLock(pCfg);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
if (ival < pItem->imin || ival > pItem->imax) {
|
if (ival < pItem->imin || ival > pItem->imax) {
|
||||||
uError("cfg:%s, type:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
uError("cfg:%s, type:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
||||||
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
|
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
|
||||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case CFG_DTYPE_INT64: {
|
case CFG_DTYPE_INT64: {
|
||||||
int64_t ival;
|
int64_t ival;
|
||||||
int32_t code = taosStrHumanToInt64(pVal, &ival);
|
int32_t code = taosStrHumanToInt64(pVal, &ival);
|
||||||
if (code != TSDB_CODE_SUCCESS) return code;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
cfgUnLock(pCfg);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
if (ival < pItem->imin || ival > pItem->imax) {
|
if (ival < pItem->imin || ival > pItem->imax) {
|
||||||
uError("cfg:%s, type:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
uError("cfg:%s, type:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
||||||
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
|
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
|
||||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
|
@ -428,17 +474,23 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
|
||||||
case CFG_DTYPE_DOUBLE: {
|
case CFG_DTYPE_DOUBLE: {
|
||||||
double dval;
|
double dval;
|
||||||
int32_t code = parseCfgReal(pVal, &dval);
|
int32_t code = parseCfgReal(pVal, &dval);
|
||||||
if (code != TSDB_CODE_SUCCESS) return code;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
cfgUnLock(pCfg);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
if (dval < pItem->fmin || dval > pItem->fmax) {
|
if (dval < pItem->fmin || dval > pItem->fmax) {
|
||||||
uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), dval,
|
uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), dval,
|
||||||
pItem->fmin, pItem->fmax);
|
pItem->fmin, pItem->fmax);
|
||||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cfgUnLock(pCfg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -877,7 +929,7 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
|
||||||
if (vlen3 != 0) value3[vlen3] = 0;
|
if (vlen3 != 0) value3[vlen3] = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_VAR);
|
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_VAR, true);
|
||||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||||
|
|
||||||
if (strcasecmp(name, "dataDir") == 0) {
|
if (strcasecmp(name, "dataDir") == 0) {
|
||||||
|
@ -920,7 +972,7 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) {
|
||||||
if (vlen3 != 0) value3[vlen3] = 0;
|
if (vlen3 != 0) value3[vlen3] = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_CMD);
|
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_CMD, true);
|
||||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||||
|
|
||||||
if (strcasecmp(name, "dataDir") == 0) {
|
if (strcasecmp(name, "dataDir") == 0) {
|
||||||
|
@ -985,7 +1037,7 @@ int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile) {
|
||||||
if (vlen3 != 0) value3[vlen3] = 0;
|
if (vlen3 != 0) value3[vlen3] = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_FILE);
|
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_FILE, true);
|
||||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||||
|
|
||||||
if (strcasecmp(name, "dataDir") == 0) {
|
if (strcasecmp(name, "dataDir") == 0) {
|
||||||
|
@ -1037,28 +1089,27 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
|
||||||
paGetToken(name + olen + 1, &value, &vlen);
|
paGetToken(name + olen + 1, &value, &vlen);
|
||||||
if (vlen == 0) continue;
|
if (vlen == 0) continue;
|
||||||
value[vlen] = 0;
|
value[vlen] = 0;
|
||||||
|
|
||||||
if (strcasecmp(name, "encryptScope") == 0) {
|
if (strcasecmp(name, "encryptScope") == 0) {
|
||||||
char* tmp = NULL;
|
char *tmp = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
char newValue[1024] = {0};
|
char newValue[1024] = {0};
|
||||||
|
|
||||||
strcpy(newValue, value);
|
strcpy(newValue, value);
|
||||||
|
|
||||||
int32_t count = 1;
|
int32_t count = 1;
|
||||||
while(vlen < 1024){
|
while (vlen < 1024) {
|
||||||
paGetToken(value + vlen + 1 * count, &tmp, &len);
|
paGetToken(value + vlen + 1 * count, &tmp, &len);
|
||||||
if(len == 0) break;
|
if (len == 0) break;
|
||||||
tmp[len] = 0;
|
tmp[len] = 0;
|
||||||
strcpy(newValue + vlen, tmp);
|
strcpy(newValue + vlen, tmp);
|
||||||
vlen += len;
|
vlen += len;
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = cfgSetItem(pConfig, name, newValue, CFG_STYPE_CFG_FILE);
|
code = cfgSetItem(pConfig, name, newValue, CFG_STYPE_CFG_FILE, true);
|
||||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||||
}
|
} else {
|
||||||
else{
|
|
||||||
paGetToken(value + vlen + 1, &value2, &vlen2);
|
paGetToken(value + vlen + 1, &value2, &vlen2);
|
||||||
if (vlen2 != 0) {
|
if (vlen2 != 0) {
|
||||||
value2[vlen2] = 0;
|
value2[vlen2] = 0;
|
||||||
|
@ -1066,7 +1117,7 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
|
||||||
if (vlen3 != 0) value3[vlen3] = 0;
|
if (vlen3 != 0) value3[vlen3] = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE);
|
code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE, true);
|
||||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1241,7 +1292,7 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
|
||||||
if (vlen3 != 0) value3[vlen3] = 0;
|
if (vlen3 != 0) value3[vlen3] = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_APOLLO_URL);
|
code = cfgSetItem(pConfig, name, value, CFG_STYPE_APOLLO_URL, true);
|
||||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||||
|
|
||||||
if (strcasecmp(name, "dataDir") == 0) {
|
if (strcasecmp(name, "dataDir") == 0) {
|
||||||
|
|
Loading…
Reference in New Issue