diff --git a/source/common/src/systable.c b/source/common/src/systable.c index be841d9682..eef38bf18e 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -398,6 +398,21 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = { {.name = "finished", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, }; + +static const SSysDbTableSchema anodesSchema[] = { + {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "url", .bytes = TSDB_ANAL_ANODE_URL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, + {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, + {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, + {.name = "update_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, +}; + +static const SSysDbTableSchema anodesFullSchema[] = { + {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "type", .bytes = TSDB_ANAL_ALGO_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, + {.name = "algo", .bytes = TSDB_ANAL_ALGO_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, +}; + static const SSysDbTableSchema tsmaSchema[] = { {.name = "tsma_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, @@ -472,6 +487,8 @@ static const SSysTableMeta infosMeta[] = { {TSDB_INS_TABLE_ARBGROUPS, arbGroupsSchema, tListLen(arbGroupsSchema), true}, {TSDB_INS_TABLE_ENCRYPTIONS, encryptionsSchema, tListLen(encryptionsSchema), true}, {TSDB_INS_TABLE_TSMAS, tsmaSchema, tListLen(tsmaSchema), false}, + {TSDB_INS_TABLE_ANODES, anodesSchema, tListLen(anodesSchema), true}, + {TSDB_INS_TABLE_ANODES_FULL, anodesFullSchema, tListLen(anodesFullSchema), true}, }; static const SSysDbTableSchema connectionsSchema[] = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 986747fe58..63fcf900bf 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -40,6 +40,7 @@ #define TD_MSG_RANGE_CODE_ #include "tmsgdef.h" +#include "tanal.h" #include "tcol.h" #include "tlog.h" @@ -1453,6 +1454,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { } TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->ipWhiteVer)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->analVer)); TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pReq->clusterCfg.monitorParas)); tEndEncode(&encoder); @@ -1576,6 +1578,10 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->ipWhiteVer)); } + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->analVer)); + } + if (!tDecodeIsEnd(&decoder)) { TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pReq->clusterCfg.monitorParas)); } @@ -1652,6 +1658,7 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->statusSeq)); TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ipWhiteVer)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->analVer)); tEndEncode(&encoder); _exit: @@ -1703,6 +1710,11 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { if (!tDecodeIsEnd(&decoder)) { TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ipWhiteVer)); } + + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->analVer)); + } + tEndDecode(&decoder); _exit: tDecoderClear(&decoder); @@ -2044,6 +2056,156 @@ _exit: return code; } +int32_t tSerializeRetrieveAnalAlgoReq(void *buf, int32_t bufLen, SRetrieveAnalAlgoReq *pReq) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->dnodeId)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->analVer)); + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeRetrieveAnalAlgoReq(void *buf, int32_t bufLen, SRetrieveAnalAlgoReq *pReq) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + + tDecoderInit(&decoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->dnodeId)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->analVer)); + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + return code; +} + +int32_t tSerializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAlgoRsp *pRsp) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + + int32_t numOfAlgos = 0; + void *pIter = taosHashIterate(pRsp->hash, NULL); + while (pIter != NULL) { + SAnalUrl *pUrl = pIter; + size_t nameLen = 0; + const char *name = taosHashGetKey(pIter, &nameLen); + if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_KEY_LEN && pUrl->urlLen > 0) { + numOfAlgos++; + } + pIter = taosHashIterate(pRsp->hash, pIter); + } + + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, numOfAlgos)); + + pIter = taosHashIterate(pRsp->hash, NULL); + while (pIter != NULL) { + SAnalUrl *pUrl = pIter; + size_t nameLen = 0; + const char *name = taosHashGetKey(pIter, &nameLen); + if (nameLen > 0 && pUrl->urlLen > 0) { + TAOS_CHECK_EXIT(tEncodeI32(&encoder, nameLen)); + TAOS_CHECK_EXIT(tEncodeBinary(&encoder, (const uint8_t *)name, nameLen)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pUrl->anode)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pUrl->type)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pUrl->urlLen)); + TAOS_CHECK_EXIT(tEncodeBinary(&encoder, (const uint8_t *)pUrl->url, pUrl->urlLen)); + } + pIter = taosHashIterate(pRsp->hash, pIter); + } + + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAlgoRsp *pRsp) { + if (pRsp->hash == NULL) { + pRsp->hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + if (pRsp->hash == NULL) { + terrno = TSDB_CODE_OUT_OF_BUFFER; + return terrno; + } + } + + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); + + int32_t numOfAlgos = 0; + int32_t nameLen; + int32_t type; + char name[TSDB_ANAL_ALGO_KEY_LEN]; + SAnalUrl url = {0}; + + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ver)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfAlgos)); + + for (int32_t f = 0; f < numOfAlgos; ++f) { + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nameLen)); + if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_NAME_LEN) { + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, name)); + } + + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &url.anode)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type)); + url.type = (EAnalAlgoType)type; + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &url.urlLen)); + if (url.urlLen > 0) { + TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&url.url, NULL) < 0); + } + + TAOS_CHECK_EXIT(taosHashPut(pRsp->hash, name, nameLen, &url, sizeof(SAnalUrl))); + } + + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + return code; +} + +void tFreeRetrieveAnalAlgoRsp(SRetrieveAnalAlgoRsp *pRsp) { + void *pIter = taosHashIterate(pRsp->hash, NULL); + while (pIter != NULL) { + SAnalUrl *pUrl = (SAnalUrl *)pIter; + taosMemoryFree(pUrl->url); + pIter = taosHashIterate(pRsp->hash, pIter); + } + taosHashCleanup(pRsp->hash); + + pRsp->hash = NULL; +} + void tFreeSCreateUserReq(SCreateUserReq *pReq) { FREESQL(); taosMemoryFreeClear(pReq->pIpRanges); @@ -2961,6 +3123,108 @@ _exit: return code; } +int32_t tSerializeSMCreateAnodeReq(void *buf, int32_t bufLen, SMCreateAnodeReq *pReq) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->urlLen)); + if (pReq->urlLen > 0) { + TAOS_CHECK_EXIT(tEncodeBinary(&encoder, (const uint8_t *)pReq->url, pReq->urlLen)); + } + ENCODESQL(); + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMCreateAnodeReq(void *buf, int32_t bufLen, SMCreateAnodeReq *pReq) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + + tDecoderInit(&decoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->urlLen)); + if (pReq->urlLen > 0) { + TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&pReq->url, NULL)); + } + + DECODESQL(); + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + return code; +} + +void tFreeSMCreateAnodeReq(SMCreateAnodeReq *pReq) { + taosMemoryFreeClear(pReq->url); + FREESQL(); +} + +int32_t tSerializeSMDropAnodeReq(void *buf, int32_t bufLen, SMDropAnodeReq *pReq) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->anodeId)); + ENCODESQL(); + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMDropAnodeReq(void *buf, int32_t bufLen, SMDropAnodeReq *pReq) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + + tDecoderInit(&decoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->anodeId)); + DECODESQL(); + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + return code; +} + +void tFreeSMDropAnodeReq(SMDropAnodeReq *pReq) { FREESQL(); } + +int32_t tSerializeSMUpdateAnodeReq(void *buf, int32_t bufLen, SMUpdateAnodeReq *pReq) { + return tSerializeSMDropAnodeReq(buf, bufLen, pReq); +} + +int32_t tDeserializeSMUpdateAnodeReq(void *buf, int32_t bufLen, SMUpdateAnodeReq *pReq) { + return tDeserializeSMDropAnodeReq(buf, bufLen, pReq); +} + +void tFreeSMUpdateAnodeReq(SMUpdateAnodeReq *pReq) { tFreeSMDropAnodeReq(pReq); } + int32_t tSerializeSCreateDnodeReq(void *buf, int32_t bufLen, SCreateDnodeReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 419c669103..bc33fc43dc 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -18,6 +18,7 @@ #include "dmInt.h" #include "monitor.h" #include "systable.h" +#include "tanal.h" #include "tchecksum.h" extern SConfig *tsCfg; @@ -39,6 +40,7 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { (void)taosThreadRwlockUnlock(&pMgmt->pData->lock); } } + static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) { int32_t code = 0; dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver); @@ -84,6 +86,47 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) { dError("failed to send retrieve ip white list request since:%s", tstrerror(code)); } } + +static void dmMayShouldUpdateAnalFunc(SDnodeMgmt *pMgmt, int64_t newVer) { + int32_t code = 0; + int64_t oldVer = taosAnalGetVersion(); + if (oldVer == newVer) return; + dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer); + + SRetrieveAnalAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer}; + int32_t contLen = tSerializeRetrieveAnalAlgoReq(NULL, 0, &req); + if (contLen < 0) { + dError("failed to serialize analysis function ver request since %s", tstrerror(contLen)); + return; + } + + void *pHead = rpcMallocCont(contLen); + contLen = tSerializeRetrieveAnalAlgoReq(pHead, contLen, &req); + if (contLen < 0) { + rpcFreeCont(pHead); + dError("failed to serialize analysis function ver request since %s", tstrerror(contLen)); + return; + } + + SRpcMsg rpcMsg = { + .pCont = pHead, + .contLen = contLen, + .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO, + .info.ahandle = (void *)0x9527, + .info.refId = 0, + .info.noResp = 0, + .info.handle = 0, + }; + SEpSet epset = {0}; + + (void)dmGetMnodeEpSet(pMgmt->pData, &epset); + + code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL); + if (code != 0) { + dError("failed to send retrieve analysis func ver request since %s", tstrerror(code)); + } +} + static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { const STraceId *trace = &pRsp->info.traceId; dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code); @@ -111,6 +154,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps); } dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer); + dmMayShouldUpdateAnalFunc(pMgmt, statusRsp.analVer); } tFreeSStatusRsp(&statusRsp); } @@ -172,6 +216,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { pMgmt->statusSeq++; req.statusSeq = pMgmt->statusSeq; req.ipWhiteVer = pMgmt->pData->ipWhiteVer; + req.analVer = taosAnalGetVersion(); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); if (contLen < 0) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index b9f4ab54f4..e84d756e0a 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -17,6 +17,7 @@ #include "dmMgmt.h" #include "qworker.h" #include "tversion.h" +#include "tanal.h" static inline void dmSendRsp(SRpcMsg *pMsg) { if (rpcSendResponse(pMsg) != 0) { @@ -105,6 +106,17 @@ static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) { return false; } } + +static void dmUpdateAnalFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) { + SRetrieveAnalAlgoRsp rsp = {0}; + if (tDeserializeRetrieveAnalAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0) { + taosAnalUpdate(rsp.ver, rsp.hash); + rsp.hash = NULL; + } + tFreeRetrieveAnalAlgoRsp(&rsp); + rpcFreeCont(pRpc->pCont); +} + static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { SDnodeTrans *pTrans = &pDnode->trans; int32_t code = -1; @@ -150,10 +162,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dmSetMnodeEpSet(&pDnode->data, pEpSet); } break; - case TDMT_MND_RETRIEVE_IP_WHITE_RSP: { + case TDMT_MND_RETRIEVE_IP_WHITE_RSP: dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc); return; - } break; + case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP: + dmUpdateAnalFunc(&pDnode->data, pTrans->serverRpc, pRpc); + return; default: break; }