feat: add vgroup epset for tsma
This commit is contained in:
parent
e75f14b98b
commit
79769dd2ad
|
@ -1122,13 +1122,13 @@ typedef struct {
|
||||||
SSchema* pSchemas;
|
SSchema* pSchemas;
|
||||||
} STableMetaRsp;
|
} STableMetaRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STableMetaRsp* pMeta;
|
STableMetaRsp* pMeta;
|
||||||
} SMAlterStbRsp;
|
} SMAlterStbRsp;
|
||||||
|
|
||||||
int32_t tEncodeSMAlterStbRsp(SEncoder *pEncoder, const SMAlterStbRsp *pRsp);
|
int32_t tEncodeSMAlterStbRsp(SEncoder* pEncoder, const SMAlterStbRsp* pRsp);
|
||||||
int32_t tDecodeSMAlterStbRsp(SDecoder *pDecoder, SMAlterStbRsp *pRsp);
|
int32_t tDecodeSMAlterStbRsp(SDecoder* pDecoder, SMAlterStbRsp* pRsp);
|
||||||
void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
|
void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
|
||||||
|
|
||||||
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
|
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
|
||||||
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
|
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
|
||||||
|
@ -2303,23 +2303,23 @@ typedef struct {
|
||||||
} SVgEpSet;
|
} SVgEpSet;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t version; // for compatibility(default 0)
|
int8_t version; // for compatibility(default 0)
|
||||||
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
||||||
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
|
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
|
||||||
int8_t timezoneInt; // sma data expired if timezone changes.
|
int8_t timezoneInt; // sma data expired if timezone changes.
|
||||||
int32_t dstVgId;
|
int32_t dstVgId;
|
||||||
char indexName[TSDB_INDEX_NAME_LEN];
|
char indexName[TSDB_INDEX_NAME_LEN];
|
||||||
int32_t exprLen;
|
int32_t exprLen;
|
||||||
int32_t tagsFilterLen;
|
int32_t tagsFilterLen;
|
||||||
int32_t numOfVgroups;
|
int32_t numOfVgroups;
|
||||||
int64_t indexUid;
|
int64_t indexUid;
|
||||||
tb_uid_t tableUid; // super/child/common table uid
|
tb_uid_t tableUid; // super/child/common table uid
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int64_t offset; // use unit by precision of DB
|
int64_t offset; // use unit by precision of DB
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
char* expr; // sma expression
|
char* expr; // sma expression
|
||||||
char* tagsFilter;
|
char* tagsFilter;
|
||||||
SVgEpSet vgEpSet[];
|
SVgEpSet* pVgEpSet;
|
||||||
} STSma; // Time-range-wise SMA
|
} STSma; // Time-range-wise SMA
|
||||||
|
|
||||||
typedef STSma SVCreateTSmaReq;
|
typedef STSma SVCreateTSmaReq;
|
||||||
|
@ -2405,7 +2405,7 @@ static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t indexUid;
|
int64_t indexUid;
|
||||||
STimeWindow queryWindow;
|
STimeWindow queryWindow;
|
||||||
} SVGetTsmaExpWndsReq;
|
} SVGetTsmaExpWndsReq;
|
||||||
|
|
||||||
|
|
|
@ -694,7 +694,6 @@ void tFreeSMAltertbReq(SMAlterStbReq *pReq) {
|
||||||
pReq->pFields = NULL;
|
pReq->pFields = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) {
|
int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
@ -3674,12 +3673,12 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
|
||||||
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
|
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
|
||||||
}
|
}
|
||||||
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
|
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
|
||||||
if (tEncodeI32(pCoder, pSma->vgEpSet[v].vgId) < 0) return -1;
|
if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1;
|
||||||
if (tEncodeI8(pCoder, pSma->vgEpSet[v].epSet.inUse) < 0) return -1;
|
if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
|
||||||
int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps;
|
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
|
||||||
if (tEncodeI8(pCoder, numOfEps) < 0) return -1;
|
if (tEncodeI8(pCoder, numOfEps) < 0) return -1;
|
||||||
for (int32_t n = 0; n < numOfEps; ++n) {
|
for (int32_t n = 0; n < numOfEps; ++n) {
|
||||||
const SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n];
|
const SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
|
||||||
if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1;
|
if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1;
|
||||||
if (tEncodeU16(pCoder, pEp->port) < 0) return -1;
|
if (tEncodeU16(pCoder, pEp->port) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
@ -3712,15 +3711,25 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
|
||||||
} else {
|
} else {
|
||||||
pSma->tagsFilter = NULL;
|
pSma->tagsFilter = NULL;
|
||||||
}
|
}
|
||||||
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
|
if (pSma->numOfVgroups > 0) {
|
||||||
if (tDecodeI32(pCoder, &pSma->vgEpSet[v].vgId) < 0) return -1;
|
pSma->pVgEpSet = (SVgEpSet *)tDecoderMalloc(pCoder, pSma->numOfVgroups * sizeof(SVgEpSet));
|
||||||
if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.inUse) < 0) return -1;
|
if (!pSma->pVgEpSet) {
|
||||||
if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.numOfEps) < 0) return -1;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps;
|
return -1;
|
||||||
for (int32_t n = 0; n < numOfEps; ++n) {
|
}
|
||||||
SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n];
|
|
||||||
if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1;
|
memset(pSma->pVgEpSet, 0, pSma->numOfVgroups * sizeof(SVgEpSet));
|
||||||
if (tDecodeU16(pCoder, &pEp->port) < 0) return -1;
|
|
||||||
|
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
|
||||||
|
if (tDecodeI32(pCoder, &pSma->pVgEpSet[v].vgId) < 0) return -1;
|
||||||
|
if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
|
||||||
|
if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.numOfEps) < 0) return -1;
|
||||||
|
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
|
||||||
|
for (int32_t n = 0; n < numOfEps; ++n) {
|
||||||
|
SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
|
||||||
|
if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1;
|
||||||
|
if (tDecodeU16(pCoder, &pEp->port) < 0) return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3765,7 +3774,7 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder* pCoder, const SVGetTsmaExpWndsReq* pReq) {
|
int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder *pCoder, const SVGetTsmaExpWndsReq *pReq) {
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
if (tStartEncode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
|
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
|
||||||
|
@ -3773,10 +3782,10 @@ int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder* pCoder, const SVGetTsmaExpWndsReq*
|
||||||
if (tEncodeI64(pCoder, pReq->queryWindow.ekey) < 0) return -1;
|
if (tEncodeI64(pCoder, pReq->queryWindow.ekey) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(pCoder);
|
tEndEncode(pCoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq) {
|
int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder *pCoder, SVGetTsmaExpWndsReq *pReq) {
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
if (tStartDecode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
|
||||||
|
@ -3787,7 +3796,7 @@ int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder* pCoder, const SVGetTsmaExpWndsRsp* pReq) {
|
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder *pCoder, const SVGetTsmaExpWndsRsp *pReq) {
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
if (tStartEncode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
|
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
|
||||||
|
@ -3814,7 +3823,7 @@ int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq) {
|
int32_t tEncodeSVDeleteReq(SEncoder *pCoder, const SVDeleteReq *pReq) {
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
if (tStartEncode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeI64(pCoder, pReq->delUid) < 0) return -1;
|
if (tEncodeI64(pCoder, pReq->delUid) < 0) return -1;
|
||||||
|
@ -3832,7 +3841,7 @@ int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq) {
|
int32_t tDecodeSVDeleteReq(SDecoder *pCoder, SVDeleteReq *pReq) {
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
if (tStartDecode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI64(pCoder, &pReq->delUid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pReq->delUid) < 0) return -1;
|
||||||
|
@ -3850,7 +3859,7 @@ int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq) {
|
int32_t tEncodeSVDeleteRsp(SEncoder *pCoder, const SVDeleteRsp *pReq) {
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
if (tStartEncode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeI32(pCoder, pReq->code) < 0) return -1;
|
if (tEncodeI32(pCoder, pReq->code) < 0) return -1;
|
||||||
|
@ -3860,7 +3869,7 @@ int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSVDeleteRsp(SDecoder* pCoder, SVDeleteRsp* pReq) {
|
int32_t tDecodeSVDeleteRsp(SDecoder *pCoder, SVDeleteRsp *pReq) {
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
if (tStartDecode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI32(pCoder, &pReq->code) < 0) return -1;
|
if (tDecodeI32(pCoder, &pReq->code) < 0) return -1;
|
||||||
|
@ -4502,7 +4511,7 @@ int32_t tDecodeSVAlterTbRsp(SDecoder *pDecoder, SVAlterTbRsp *pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDeserializeSVAlterTbRsp(void *buf, int32_t bufLen, SVAlterTbRsp *pRsp) {
|
int32_t tDeserializeSVAlterTbRsp(void *buf, int32_t bufLen, SVAlterTbRsp *pRsp) {
|
||||||
int32_t meta = 0;
|
int32_t meta = 0;
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
tDecoderInit(&decoder, buf, bufLen);
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
@ -4543,7 +4552,7 @@ int32_t tDecodeSMAlterStbRsp(SDecoder *pDecoder, SMAlterStbRsp *pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDeserializeSMAlterStbRsp(void *buf, int32_t bufLen, SMAlterStbRsp *pRsp) {
|
int32_t tDeserializeSMAlterStbRsp(void *buf, int32_t bufLen, SMAlterStbRsp *pRsp) {
|
||||||
int32_t meta = 0;
|
int32_t meta = 0;
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
tDecoderInit(&decoder, buf, bufLen);
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
@ -4559,7 +4568,7 @@ int32_t tDeserializeSMAlterStbRsp(void *buf, int32_t bufLen, SMAlterStbRsp *pRsp
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp) {
|
void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) {
|
||||||
if (NULL == pRsp) {
|
if (NULL == pRsp) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -4569,6 +4578,3 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp) {
|
||||||
taosMemoryFree(pRsp->pMeta);
|
taosMemoryFree(pRsp->pMeta);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -298,28 +298,30 @@ typedef struct {
|
||||||
} SVgObj;
|
} SVgObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
char stb[TSDB_TABLE_FNAME_LEN];
|
char stb[TSDB_TABLE_FNAME_LEN];
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int64_t createdTime;
|
int64_t createdTime;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
int64_t stbUid;
|
int64_t stbUid;
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
int8_t intervalUnit;
|
int8_t intervalUnit;
|
||||||
int8_t slidingUnit;
|
int8_t slidingUnit;
|
||||||
int8_t timezone;
|
int8_t timezone;
|
||||||
int32_t dstVgId; // for stream
|
int32_t dstVgId; // for stream
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
int32_t exprLen; // strlen + 1
|
int32_t exprLen; // strlen + 1
|
||||||
int32_t tagsFilterLen;
|
int32_t tagsFilterLen;
|
||||||
int32_t sqlLen;
|
int32_t sqlLen;
|
||||||
int32_t astLen;
|
int32_t astLen;
|
||||||
char* expr;
|
int32_t numOfVgroups;
|
||||||
char* tagsFilter;
|
char* expr;
|
||||||
char* sql;
|
char* tagsFilter;
|
||||||
char* ast;
|
char* sql;
|
||||||
|
char* ast;
|
||||||
|
SVgEpSet* pVgEpSet;
|
||||||
} SSmaObj;
|
} SSmaObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -36,6 +36,7 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
|
||||||
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
|
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
|
||||||
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
|
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
|
||||||
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
|
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
|
||||||
|
static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups);
|
||||||
static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq);
|
static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq);
|
static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
|
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
|
||||||
|
@ -262,7 +263,9 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
|
||||||
req.sliding = pSma->sliding;
|
req.sliding = pSma->sliding;
|
||||||
req.expr = pSma->expr;
|
req.expr = pSma->expr;
|
||||||
req.tagsFilter = pSma->tagsFilter;
|
req.tagsFilter = pSma->tagsFilter;
|
||||||
|
req.numOfVgroups = pSma->numOfVgroups;
|
||||||
|
req.pVgEpSet = pSma->pVgEpSet;
|
||||||
|
|
||||||
// get length
|
// get length
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret);
|
tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret);
|
||||||
|
@ -420,6 +423,15 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
// todo add sma info here
|
// todo add sma info here
|
||||||
|
SVgEpSet *pVgEpSet = NULL;
|
||||||
|
int32_t numOfVgroups = 0;
|
||||||
|
if (mndSmaGetVgEpSet(pMnode, pDb, &pVgEpSet, &numOfVgroups) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pSma->pVgEpSet = pVgEpSet;
|
||||||
|
pSma->numOfVgroups = numOfVgroups;
|
||||||
|
|
||||||
int32_t smaContLen = 0;
|
int32_t smaContLen = 0;
|
||||||
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
|
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
|
||||||
if (pSmaReq == NULL) return -1;
|
if (pSmaReq == NULL) return -1;
|
||||||
|
@ -964,3 +976,52 @@ static void mndCancelGetNextSma(SMnode *pMnode, void *pIter) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
void *pIter = NULL;
|
||||||
|
SVgEpSet *pVgEpSet = NULL;
|
||||||
|
int32_t nAllocVgs = 16;
|
||||||
|
int32_t nVgs = 0;
|
||||||
|
|
||||||
|
pVgEpSet = taosMemoryCalloc(nAllocVgs, sizeof(SVgEpSet));
|
||||||
|
if (!pVgEpSet) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
if (pVgroup->dbUid != pDb->uid) {
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nVgs >= nAllocVgs) {
|
||||||
|
void *p = taosMemoryRealloc(pVgEpSet, nAllocVgs * 2 * sizeof(SVgEpSet));
|
||||||
|
if (!p) {
|
||||||
|
taosMemoryFree(pVgEpSet);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pVgEpSet = (SVgEpSet *)p;
|
||||||
|
nAllocVgs *= 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
(pVgEpSet + nVgs)->vgId = pVgroup->vgId;
|
||||||
|
(pVgEpSet + nVgs)->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
|
||||||
|
++nVgs;
|
||||||
|
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppVgEpSet = pVgEpSet;
|
||||||
|
*numOfVgroups = nVgs;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue