feat: analysis msg
This commit is contained in:
parent
a5b5bd61f6
commit
c36fe0af46
|
@ -398,6 +398,21 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = {
|
||||||
{.name = "finished", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
{.name = "finished", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
||||||
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static const SSysDbTableSchema anodesSchema[] = {
|
||||||
|
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
||||||
|
{.name = "url", .bytes = TSDB_ANAL_ANODE_URL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||||
|
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||||
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||||
|
{.name = "update_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||||
|
};
|
||||||
|
|
||||||
|
static const SSysDbTableSchema anodesFullSchema[] = {
|
||||||
|
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
||||||
|
{.name = "type", .bytes = TSDB_ANAL_ALGO_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||||
|
{.name = "algo", .bytes = TSDB_ANAL_ALGO_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||||
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema tsmaSchema[] = {
|
static const SSysDbTableSchema tsmaSchema[] = {
|
||||||
{.name = "tsma_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "tsma_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
@ -472,6 +487,8 @@ static const SSysTableMeta infosMeta[] = {
|
||||||
{TSDB_INS_TABLE_ARBGROUPS, arbGroupsSchema, tListLen(arbGroupsSchema), true},
|
{TSDB_INS_TABLE_ARBGROUPS, arbGroupsSchema, tListLen(arbGroupsSchema), true},
|
||||||
{TSDB_INS_TABLE_ENCRYPTIONS, encryptionsSchema, tListLen(encryptionsSchema), true},
|
{TSDB_INS_TABLE_ENCRYPTIONS, encryptionsSchema, tListLen(encryptionsSchema), true},
|
||||||
{TSDB_INS_TABLE_TSMAS, tsmaSchema, tListLen(tsmaSchema), false},
|
{TSDB_INS_TABLE_TSMAS, tsmaSchema, tListLen(tsmaSchema), false},
|
||||||
|
{TSDB_INS_TABLE_ANODES, anodesSchema, tListLen(anodesSchema), true},
|
||||||
|
{TSDB_INS_TABLE_ANODES_FULL, anodesFullSchema, tListLen(anodesFullSchema), true},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema connectionsSchema[] = {
|
static const SSysDbTableSchema connectionsSchema[] = {
|
||||||
|
|
|
@ -40,6 +40,7 @@
|
||||||
#define TD_MSG_RANGE_CODE_
|
#define TD_MSG_RANGE_CODE_
|
||||||
#include "tmsgdef.h"
|
#include "tmsgdef.h"
|
||||||
|
|
||||||
|
#include "tanal.h"
|
||||||
#include "tcol.h"
|
#include "tcol.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
|
||||||
|
@ -1453,6 +1454,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->ipWhiteVer));
|
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->ipWhiteVer));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->analVer));
|
||||||
TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pReq->clusterCfg.monitorParas));
|
TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pReq->clusterCfg.monitorParas));
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
@ -1576,6 +1578,10 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
||||||
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->ipWhiteVer));
|
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->ipWhiteVer));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!tDecodeIsEnd(&decoder)) {
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->analVer));
|
||||||
|
}
|
||||||
|
|
||||||
if (!tDecodeIsEnd(&decoder)) {
|
if (!tDecodeIsEnd(&decoder)) {
|
||||||
TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pReq->clusterCfg.monitorParas));
|
TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pReq->clusterCfg.monitorParas));
|
||||||
}
|
}
|
||||||
|
@ -1652,6 +1658,7 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
|
||||||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->statusSeq));
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->statusSeq));
|
||||||
|
|
||||||
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ipWhiteVer));
|
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ipWhiteVer));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->analVer));
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -1703,6 +1710,11 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
|
||||||
if (!tDecodeIsEnd(&decoder)) {
|
if (!tDecodeIsEnd(&decoder)) {
|
||||||
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ipWhiteVer));
|
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ipWhiteVer));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!tDecodeIsEnd(&decoder)) {
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->analVer));
|
||||||
|
}
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
_exit:
|
_exit:
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
@ -2044,6 +2056,156 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeRetrieveAnalAlgoReq(void *buf, int32_t bufLen, SRetrieveAnalAlgoReq *pReq) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
int32_t tlen;
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tStartEncode(&encoder));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->dnodeId));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->analVer));
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tlen = code;
|
||||||
|
} else {
|
||||||
|
tlen = encoder.pos;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeRetrieveAnalAlgoReq(void *buf, int32_t bufLen, SRetrieveAnalAlgoReq *pReq) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tStartDecode(&decoder));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->dnodeId));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->analVer));
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAlgoRsp *pRsp) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
int32_t tlen;
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
int32_t numOfAlgos = 0;
|
||||||
|
void *pIter = taosHashIterate(pRsp->hash, NULL);
|
||||||
|
while (pIter != NULL) {
|
||||||
|
SAnalUrl *pUrl = pIter;
|
||||||
|
size_t nameLen = 0;
|
||||||
|
const char *name = taosHashGetKey(pIter, &nameLen);
|
||||||
|
if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_KEY_LEN && pUrl->urlLen > 0) {
|
||||||
|
numOfAlgos++;
|
||||||
|
}
|
||||||
|
pIter = taosHashIterate(pRsp->hash, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tStartEncode(&encoder));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, numOfAlgos));
|
||||||
|
|
||||||
|
pIter = taosHashIterate(pRsp->hash, NULL);
|
||||||
|
while (pIter != NULL) {
|
||||||
|
SAnalUrl *pUrl = pIter;
|
||||||
|
size_t nameLen = 0;
|
||||||
|
const char *name = taosHashGetKey(pIter, &nameLen);
|
||||||
|
if (nameLen > 0 && pUrl->urlLen > 0) {
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, nameLen));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeBinary(&encoder, (const uint8_t *)name, nameLen));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pUrl->anode));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pUrl->type));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pUrl->urlLen));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeBinary(&encoder, (const uint8_t *)pUrl->url, pUrl->urlLen));
|
||||||
|
}
|
||||||
|
pIter = taosHashIterate(pRsp->hash, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tlen = code;
|
||||||
|
} else {
|
||||||
|
tlen = encoder.pos;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAlgoRsp *pRsp) {
|
||||||
|
if (pRsp->hash == NULL) {
|
||||||
|
pRsp->hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
|
if (pRsp->hash == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_BUFFER;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
int32_t numOfAlgos = 0;
|
||||||
|
int32_t nameLen;
|
||||||
|
int32_t type;
|
||||||
|
char name[TSDB_ANAL_ALGO_KEY_LEN];
|
||||||
|
SAnalUrl url = {0};
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tStartDecode(&decoder));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ver));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfAlgos));
|
||||||
|
|
||||||
|
for (int32_t f = 0; f < numOfAlgos; ++f) {
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nameLen));
|
||||||
|
if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_NAME_LEN) {
|
||||||
|
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, name));
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &url.anode));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
|
||||||
|
url.type = (EAnalAlgoType)type;
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &url.urlLen));
|
||||||
|
if (url.urlLen > 0) {
|
||||||
|
TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&url.url, NULL) < 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(taosHashPut(pRsp->hash, name, nameLen, &url, sizeof(SAnalUrl)));
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tFreeRetrieveAnalAlgoRsp(SRetrieveAnalAlgoRsp *pRsp) {
|
||||||
|
void *pIter = taosHashIterate(pRsp->hash, NULL);
|
||||||
|
while (pIter != NULL) {
|
||||||
|
SAnalUrl *pUrl = (SAnalUrl *)pIter;
|
||||||
|
taosMemoryFree(pUrl->url);
|
||||||
|
pIter = taosHashIterate(pRsp->hash, pIter);
|
||||||
|
}
|
||||||
|
taosHashCleanup(pRsp->hash);
|
||||||
|
|
||||||
|
pRsp->hash = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
void tFreeSCreateUserReq(SCreateUserReq *pReq) {
|
void tFreeSCreateUserReq(SCreateUserReq *pReq) {
|
||||||
FREESQL();
|
FREESQL();
|
||||||
taosMemoryFreeClear(pReq->pIpRanges);
|
taosMemoryFreeClear(pReq->pIpRanges);
|
||||||
|
@ -2961,6 +3123,108 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSMCreateAnodeReq(void *buf, int32_t bufLen, SMCreateAnodeReq *pReq) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
int32_t tlen;
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tStartEncode(&encoder));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->urlLen));
|
||||||
|
if (pReq->urlLen > 0) {
|
||||||
|
TAOS_CHECK_EXIT(tEncodeBinary(&encoder, (const uint8_t *)pReq->url, pReq->urlLen));
|
||||||
|
}
|
||||||
|
ENCODESQL();
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tlen = code;
|
||||||
|
} else {
|
||||||
|
tlen = encoder.pos;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSMCreateAnodeReq(void *buf, int32_t bufLen, SMCreateAnodeReq *pReq) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tStartDecode(&decoder));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->urlLen));
|
||||||
|
if (pReq->urlLen > 0) {
|
||||||
|
TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&pReq->url, NULL));
|
||||||
|
}
|
||||||
|
|
||||||
|
DECODESQL();
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tFreeSMCreateAnodeReq(SMCreateAnodeReq *pReq) {
|
||||||
|
taosMemoryFreeClear(pReq->url);
|
||||||
|
FREESQL();
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSMDropAnodeReq(void *buf, int32_t bufLen, SMDropAnodeReq *pReq) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
int32_t tlen;
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tStartEncode(&encoder));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->anodeId));
|
||||||
|
ENCODESQL();
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tlen = code;
|
||||||
|
} else {
|
||||||
|
tlen = encoder.pos;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSMDropAnodeReq(void *buf, int32_t bufLen, SMDropAnodeReq *pReq) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tStartDecode(&decoder));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->anodeId));
|
||||||
|
DECODESQL();
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tFreeSMDropAnodeReq(SMDropAnodeReq *pReq) { FREESQL(); }
|
||||||
|
|
||||||
|
int32_t tSerializeSMUpdateAnodeReq(void *buf, int32_t bufLen, SMUpdateAnodeReq *pReq) {
|
||||||
|
return tSerializeSMDropAnodeReq(buf, bufLen, pReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSMUpdateAnodeReq(void *buf, int32_t bufLen, SMUpdateAnodeReq *pReq) {
|
||||||
|
return tDeserializeSMDropAnodeReq(buf, bufLen, pReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
void tFreeSMUpdateAnodeReq(SMUpdateAnodeReq *pReq) { tFreeSMDropAnodeReq(pReq); }
|
||||||
|
|
||||||
int32_t tSerializeSCreateDnodeReq(void *buf, int32_t bufLen, SCreateDnodeReq *pReq) {
|
int32_t tSerializeSCreateDnodeReq(void *buf, int32_t bufLen, SCreateDnodeReq *pReq) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "dmInt.h"
|
#include "dmInt.h"
|
||||||
#include "monitor.h"
|
#include "monitor.h"
|
||||||
#include "systable.h"
|
#include "systable.h"
|
||||||
|
#include "tanal.h"
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
|
|
||||||
extern SConfig *tsCfg;
|
extern SConfig *tsCfg;
|
||||||
|
@ -39,6 +40,7 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
|
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
|
dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
|
||||||
|
@ -84,6 +86,47 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
|
||||||
dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
|
dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void dmMayShouldUpdateAnalFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int64_t oldVer = taosAnalGetVersion();
|
||||||
|
if (oldVer == newVer) return;
|
||||||
|
dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
|
||||||
|
|
||||||
|
SRetrieveAnalAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
|
||||||
|
int32_t contLen = tSerializeRetrieveAnalAlgoReq(NULL, 0, &req);
|
||||||
|
if (contLen < 0) {
|
||||||
|
dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *pHead = rpcMallocCont(contLen);
|
||||||
|
contLen = tSerializeRetrieveAnalAlgoReq(pHead, contLen, &req);
|
||||||
|
if (contLen < 0) {
|
||||||
|
rpcFreeCont(pHead);
|
||||||
|
dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.pCont = pHead,
|
||||||
|
.contLen = contLen,
|
||||||
|
.msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
|
||||||
|
.info.ahandle = (void *)0x9527,
|
||||||
|
.info.refId = 0,
|
||||||
|
.info.noResp = 0,
|
||||||
|
.info.handle = 0,
|
||||||
|
};
|
||||||
|
SEpSet epset = {0};
|
||||||
|
|
||||||
|
(void)dmGetMnodeEpSet(pMgmt->pData, &epset);
|
||||||
|
|
||||||
|
code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
|
||||||
|
if (code != 0) {
|
||||||
|
dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
|
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
|
||||||
const STraceId *trace = &pRsp->info.traceId;
|
const STraceId *trace = &pRsp->info.traceId;
|
||||||
dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
|
dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
|
||||||
|
@ -111,6 +154,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
|
||||||
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
|
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
|
||||||
}
|
}
|
||||||
dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
|
dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
|
||||||
|
dmMayShouldUpdateAnalFunc(pMgmt, statusRsp.analVer);
|
||||||
}
|
}
|
||||||
tFreeSStatusRsp(&statusRsp);
|
tFreeSStatusRsp(&statusRsp);
|
||||||
}
|
}
|
||||||
|
@ -172,6 +216,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||||
pMgmt->statusSeq++;
|
pMgmt->statusSeq++;
|
||||||
req.statusSeq = pMgmt->statusSeq;
|
req.statusSeq = pMgmt->statusSeq;
|
||||||
req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
|
req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
|
||||||
|
req.analVer = taosAnalGetVersion();
|
||||||
|
|
||||||
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
|
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
|
||||||
if (contLen < 0) {
|
if (contLen < 0) {
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "dmMgmt.h"
|
#include "dmMgmt.h"
|
||||||
#include "qworker.h"
|
#include "qworker.h"
|
||||||
#include "tversion.h"
|
#include "tversion.h"
|
||||||
|
#include "tanal.h"
|
||||||
|
|
||||||
static inline void dmSendRsp(SRpcMsg *pMsg) {
|
static inline void dmSendRsp(SRpcMsg *pMsg) {
|
||||||
if (rpcSendResponse(pMsg) != 0) {
|
if (rpcSendResponse(pMsg) != 0) {
|
||||||
|
@ -105,6 +106,17 @@ static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void dmUpdateAnalFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
|
||||||
|
SRetrieveAnalAlgoRsp rsp = {0};
|
||||||
|
if (tDeserializeRetrieveAnalAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0) {
|
||||||
|
taosAnalUpdate(rsp.ver, rsp.hash);
|
||||||
|
rsp.hash = NULL;
|
||||||
|
}
|
||||||
|
tFreeRetrieveAnalAlgoRsp(&rsp);
|
||||||
|
rpcFreeCont(pRpc->pCont);
|
||||||
|
}
|
||||||
|
|
||||||
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
@ -150,10 +162,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
dmSetMnodeEpSet(&pDnode->data, pEpSet);
|
dmSetMnodeEpSet(&pDnode->data, pEpSet);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_MND_RETRIEVE_IP_WHITE_RSP: {
|
case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
|
||||||
dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
|
dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
|
||||||
return;
|
return;
|
||||||
} break;
|
case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
|
||||||
|
dmUpdateAnalFunc(&pDnode->data, pTrans->serverRpc, pRpc);
|
||||||
|
return;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue