Add config req and config thread.
This commit is contained in:
parent
95f8330637
commit
df2331d5e7
|
@ -49,6 +49,8 @@ extern char tsLocalEp[];
|
||||||
extern char tsVersionName[];
|
extern char tsVersionName[];
|
||||||
extern uint16_t tsServerPort;
|
extern uint16_t tsServerPort;
|
||||||
extern int32_t tsVersion;
|
extern int32_t tsVersion;
|
||||||
|
int32_t configVersion;
|
||||||
|
int32_t configInited;
|
||||||
extern int32_t tsStatusInterval;
|
extern int32_t tsStatusInterval;
|
||||||
extern int32_t tsNumOfSupportVnodes;
|
extern int32_t tsNumOfSupportVnodes;
|
||||||
extern char tsEncryptAlgorithm[];
|
extern char tsEncryptAlgorithm[];
|
||||||
|
@ -154,7 +156,7 @@ extern bool tsEnableCrashReport;
|
||||||
extern char *tsTelemUri;
|
extern char *tsTelemUri;
|
||||||
extern char *tsClientCrashReportUri;
|
extern char *tsClientCrashReportUri;
|
||||||
extern char *tsSvrCrashReportUri;
|
extern char *tsSvrCrashReportUri;
|
||||||
extern int8_t tsSafetyCheckLevel;
|
extern int8_t tsSafetyCheckLevel;
|
||||||
enum {
|
enum {
|
||||||
TSDB_SAFETY_CHECK_LEVELL_NEVER = 0,
|
TSDB_SAFETY_CHECK_LEVELL_NEVER = 0,
|
||||||
TSDB_SAFETY_CHECK_LEVELL_NORMAL = 1,
|
TSDB_SAFETY_CHECK_LEVELL_NORMAL = 1,
|
||||||
|
|
|
@ -1806,6 +1806,14 @@ int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
|
||||||
int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
|
int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
|
||||||
void tFreeSStatusReq(SStatusReq* pReq);
|
void tFreeSStatusReq(SStatusReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t cver;
|
||||||
|
} SConfigReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSConfigReq(void* buf, int32_t bufLen, SConfigReq* pReq);
|
||||||
|
int32_t tDeserializeSConfigReq(void* buf, int32_t bufLen, SConfigReq* pReq);
|
||||||
|
// void tFreeSStatusReq(SStatusReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
char machineId[TSDB_MACHINE_ID_LEN + 1];
|
char machineId[TSDB_MACHINE_ID_LEN + 1];
|
||||||
|
|
|
@ -260,6 +260,7 @@
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_TASK_RESET, "stream-reset-tasks", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_TASK_RESET, "stream-reset-tasks", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_AUDIT, "audit", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_AUDIT, "audit", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_CONFIG, "init-config", NULL, NULL)
|
||||||
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
|
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8
|
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8
|
||||||
|
|
|
@ -43,6 +43,8 @@ char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port
|
||||||
char tsVersionName[16] = "community";
|
char tsVersionName[16] = "community";
|
||||||
uint16_t tsServerPort = 6030;
|
uint16_t tsServerPort = 6030;
|
||||||
int32_t tsVersion = 30000000;
|
int32_t tsVersion = 30000000;
|
||||||
|
int32_t configVersion = 0;
|
||||||
|
int32_t configInited = 0;
|
||||||
int32_t tsStatusInterval = 1; // second
|
int32_t tsStatusInterval = 1; // second
|
||||||
int32_t tsNumOfSupportVnodes = 256;
|
int32_t tsNumOfSupportVnodes = 256;
|
||||||
char tsEncryptAlgorithm[16] = {0};
|
char tsEncryptAlgorithm[16] = {0};
|
||||||
|
|
|
@ -1600,6 +1600,38 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSConfigReq(void *buf, int32_t bufLen, SConfigReq *pReq) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
int32_t tlen;
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
TAOS_CHECK_EXIT(tStartEncode(&encoder));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->cver));
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tlen = code;
|
||||||
|
} else {
|
||||||
|
tlen = encoder.pos;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSConfigReq(void *buf, int32_t bufLen, SConfigReq *pReq) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
TAOS_CHECK_EXIT(tStartDecode(&decoder));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->cver));
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
_exit:
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
void tFreeSStatusReq(SStatusReq *pReq) { taosArrayDestroy(pReq->pVloads); }
|
void tFreeSStatusReq(SStatusReq *pReq) { taosArrayDestroy(pReq->pVloads); }
|
||||||
|
|
||||||
int32_t tSerializeSDnodeInfoReq(void *buf, int32_t bufLen, SDnodeInfoReq *pReq) {
|
int32_t tSerializeSDnodeInfoReq(void *buf, int32_t bufLen, SDnodeInfoReq *pReq) {
|
||||||
|
@ -2221,10 +2253,10 @@ int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnal
|
||||||
int32_t lino;
|
int32_t lino;
|
||||||
tDecoderInit(&decoder, buf, bufLen);
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
int32_t numOfAlgos = 0;
|
int32_t numOfAlgos = 0;
|
||||||
int32_t nameLen;
|
int32_t nameLen;
|
||||||
int32_t type;
|
int32_t type;
|
||||||
char name[TSDB_ANAL_ALGO_KEY_LEN];
|
char name[TSDB_ANAL_ALGO_KEY_LEN];
|
||||||
SAnalUrl url = {0};
|
SAnalUrl url = {0};
|
||||||
|
|
||||||
TAOS_CHECK_EXIT(tStartDecode(&decoder));
|
TAOS_CHECK_EXIT(tStartDecode(&decoder));
|
||||||
|
|
|
@ -28,6 +28,7 @@ typedef struct SDnodeMgmt {
|
||||||
const char *path;
|
const char *path;
|
||||||
const char *name;
|
const char *name;
|
||||||
TdThread statusThread;
|
TdThread statusThread;
|
||||||
|
TdThread configThread;
|
||||||
TdThread statusInfoThread;
|
TdThread statusInfoThread;
|
||||||
TdThread notifyThread;
|
TdThread notifyThread;
|
||||||
TdThread monitorThread;
|
TdThread monitorThread;
|
||||||
|
@ -50,6 +51,7 @@ typedef struct SDnodeMgmt {
|
||||||
// dmHandle.c
|
// dmHandle.c
|
||||||
SArray *dmGetMsgHandles();
|
SArray *dmGetMsgHandles();
|
||||||
void dmSendStatusReq(SDnodeMgmt *pMgmt);
|
void dmSendStatusReq(SDnodeMgmt *pMgmt);
|
||||||
|
void dmSendConfigReq(SDnodeMgmt *pMgmt);
|
||||||
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt);
|
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt);
|
||||||
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq);
|
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq);
|
||||||
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
|
@ -283,6 +283,54 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||||
dmProcessStatusRsp(pMgmt, &rpcRsp);
|
dmProcessStatusRsp(pMgmt, &rpcRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dmSendConfigReq(SDnodeMgmt *pMgmt) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SConfigReq req = {0};
|
||||||
|
|
||||||
|
req.cver = configVersion;
|
||||||
|
dDebug("send config req to mnode, configVersion:%d", req.cver);
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSConfigReq(NULL, 0, &req);
|
||||||
|
if (contLen < 0) {
|
||||||
|
dError("failed to serialize status req since %s", tstrerror(contLen));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *pHead = rpcMallocCont(contLen);
|
||||||
|
contLen = tSerializeSConfigReq(pHead, contLen, &req);
|
||||||
|
if (contLen < 0) {
|
||||||
|
rpcFreeCont(pHead);
|
||||||
|
dError("failed to serialize status req since %s", tstrerror(contLen));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {.pCont = pHead,
|
||||||
|
.contLen = contLen,
|
||||||
|
.msgType = TDMT_MND_CONFIG,
|
||||||
|
.info.ahandle = 0,
|
||||||
|
.info.notFreeAhandle = 1,
|
||||||
|
.info.refId = 0,
|
||||||
|
.info.noResp = 0,
|
||||||
|
.info.handle = 0};
|
||||||
|
SRpcMsg rpcRsp = {0};
|
||||||
|
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
int8_t epUpdated = 0;
|
||||||
|
(void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
|
||||||
|
|
||||||
|
dDebug("send status req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
|
||||||
|
code =
|
||||||
|
rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
|
||||||
|
if (code != 0) {
|
||||||
|
dError("failed to send status req since %s", tstrerror(code));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rpcRsp.code != 0) {
|
||||||
|
} else {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
|
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
|
||||||
SMonVloadInfo vinfo = {0};
|
SMonVloadInfo vinfo = {0};
|
||||||
dDebug("begin to get vnode loads");
|
dDebug("begin to get vnode loads");
|
||||||
|
|
|
@ -47,6 +47,35 @@ static void *dmStatusThreadFp(void *param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *dmConfigThreadFp(void *param) {
|
||||||
|
SDnodeMgmt *pMgmt = param;
|
||||||
|
int64_t lastTime = taosGetTimestampMs();
|
||||||
|
setThreadName("dnode-config");
|
||||||
|
|
||||||
|
int32_t upTimeCount = 0;
|
||||||
|
int64_t upTime = 0;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
taosMsleep(200);
|
||||||
|
if (pMgmt->pData->dropped || pMgmt->pData->stopped || configInited) break;
|
||||||
|
|
||||||
|
int64_t curTime = taosGetTimestampMs();
|
||||||
|
if (curTime < lastTime) lastTime = curTime;
|
||||||
|
float interval = (curTime - lastTime) / 1000.0f;
|
||||||
|
if (interval >= tsStatusInterval) {
|
||||||
|
dmSendConfigReq(pMgmt);
|
||||||
|
lastTime = curTime;
|
||||||
|
|
||||||
|
if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
|
||||||
|
upTime = taosGetOsUptime() - tsDndStartOsUptime;
|
||||||
|
tsDndUpTime = TMAX(tsDndUpTime, upTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static void *dmStatusInfoThreadFp(void *param) {
|
static void *dmStatusInfoThreadFp(void *param) {
|
||||||
SDnodeMgmt *pMgmt = param;
|
SDnodeMgmt *pMgmt = param;
|
||||||
int64_t lastTime = taosGetTimestampMs();
|
int64_t lastTime = taosGetTimestampMs();
|
||||||
|
@ -309,6 +338,22 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t dmStartConfigThread(SDnodeMgmt *pMgmt) {
|
||||||
|
int32_t code = 0;
|
||||||
|
TdThreadAttr thAttr;
|
||||||
|
(void)taosThreadAttrInit(&thAttr);
|
||||||
|
(void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
if (taosThreadCreate(&pMgmt->configThread, &thAttr, dmConfigThreadFp, pMgmt) != 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
dError("failed to create config thread since %s", tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
(void)taosThreadAttrDestroy(&thAttr);
|
||||||
|
tmsgReportStartup("config-status", "initialized");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
|
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
TdThreadAttr thAttr;
|
TdThreadAttr thAttr;
|
||||||
|
@ -332,6 +377,13 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dmStopConfigThread(SDnodeMgmt *pMgmt) {
|
||||||
|
if (taosCheckPthreadValid(pMgmt->configThread)) {
|
||||||
|
(void)taosThreadJoin(pMgmt->configThread, NULL);
|
||||||
|
taosThreadClear(&pMgmt->configThread);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
|
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
|
||||||
if (taosCheckPthreadValid(pMgmt->statusInfoThread)) {
|
if (taosCheckPthreadValid(pMgmt->statusInfoThread)) {
|
||||||
(void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
|
(void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
|
||||||
|
|
Loading…
Reference in New Issue