feature/qnode

This commit is contained in:
dapan1121 2022-04-22 19:28:27 +08:00
parent 8cfcd6d4f4
commit 11cb0afa30
5 changed files with 37 additions and 22 deletions

View File

@ -639,13 +639,19 @@ int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
typedef struct {
SArray* epSetList; // SArray<SEpSet>
SArray* addrsList; // SArray<SQueryNodeAddr>
} SQnodeListRsp;
int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
void tFreeSQnodeListRsp(SQnodeListRsp* pRsp);
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
SEpSet epSet;
} SQueryNodeAddr;
typedef struct {
SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp;

View File

@ -129,12 +129,6 @@ typedef struct SMsgSendInfo {
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
SEpSet epSet;
} SQueryNodeAddr;
typedef struct SQueryNodeStat {
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
} SQueryNodeStat;

View File

@ -114,6 +114,19 @@ int32_t tDecodeSEpSet(SCoder *pDecoder, SEpSet *pEp) {
return 0;
}
int32_t tEncodeSQueryNodeAddr(SCoder *pEncoder, SQueryNodeAddr *pAddr) {
if (tEncodeI32(pEncoder, pAddr->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pAddr->epSet) < 0) return -1;
return 0;
}
int32_t tDecodeSQueryNodeAddr(SCoder *pDecoder, SQueryNodeAddr *pAddr) {
if (tDecodeI32(pDecoder, &pAddr->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pAddr->epSet) < 0) return -1;
return 0;
}
int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pEp->inUse);
@ -2058,11 +2071,11 @@ int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp)
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->epSetList);
int32_t num = taosArrayGetSize(pRsp->addrsList);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SEpSet *epSet = taosArrayGet(pRsp->epSetList, i);
if (tEncodeSEpSet(&encoder, epSet) < 0) return -1;
SQueryNodeAddr *addr = taosArrayGet(pRsp->addrsList, i);
if (tEncodeSQueryNodeAddr(&encoder, addr) < 0) return -1;
}
tEndEncode(&encoder);
@ -2078,10 +2091,10 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
pRsp->epSetList = taosArrayInit(num, sizeof(SEpSet));
if (NULL == pRsp->epSetList) return -1;
pRsp->addrsList = taosArrayInit(num, sizeof(SQueryNodeAddr));
if (NULL == pRsp->addrsList) return -1;
for (int32_t i = 0; i < num; ++i) {
if (tDecodeSEpSet(&decoder, TARRAY_GET_ELEM(pRsp->epSetList, i)) < 0) return -1;
if (tDecodeSQueryNodeAddr(&decoder, TARRAY_GET_ELEM(pRsp->addrsList, i)) < 0) return -1;
}
tEndDecode(&decoder);
@ -2089,7 +2102,7 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
return 0;
}
void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->epSetList); }
void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->addrsList); }
int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) {
SCoder encoder = {0};

View File

@ -444,8 +444,8 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) {
goto _OVER;
}
qlistRsp.epSetList = taosArrayInit(5, sizeof(SEpSet));
if (NULL == qlistRsp.epSetList) {
qlistRsp.addrsList = taosArrayInit(5, sizeof(SQueryNodeAddr));
if (NULL == qlistRsp.addrsList) {
mError("failed to alloc epSet while process qnode list req");
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
@ -455,11 +455,13 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) {
void *pIter = sdbFetch(pSdb, SDB_QNODE, NULL, (void **)&pObj);
if (pIter == NULL) break;
SEpSet epSet = {.numOfEps = 1};
tstrncpy(epSet.eps[0].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
epSet.eps[0].port = pObj->pDnode->port;
SQueryNodeAddr nodeAddr = {0};
nodeAddr.nodeId = QNODE_HANDLE;
nodeAddr.epSet.numOfEps = 1;
tstrncpy(nodeAddr.epSet.eps[0].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
nodeAddr.epSet.eps[0].port = pObj->pDnode->port;
(void)taosArrayPush(qlistRsp.epSetList, &epSet);
(void)taosArrayPush(qlistRsp.addrsList, &nodeAddr);
numOfRows++;
sdbRelease(pSdb, pObj);

View File

@ -359,10 +359,10 @@ PROCESS_QLIST_OVER:
if (code != 0) {
tFreeSQnodeListRsp(&out);
out.epSetList = NULL;
out.addrsList = NULL;
}
*(SArray **)output = out.epSetList;
*(SArray **)output = out.addrsList;
return code;
}