Merge pull request #20309 from taosdata/feat/compact_with_time_range
feat: compact with time range
This commit is contained in:
commit
93dbe454d8
|
@ -1291,9 +1291,10 @@ int32_t tSerializeSDropIdxReq(void* buf, int32_t bufLen, SDropIndexReq* pReq);
|
||||||
int32_t tDeserializeSDropIdxReq(void* buf, int32_t bufLen, SDropIndexReq* pReq);
|
int32_t tDeserializeSDropIdxReq(void* buf, int32_t bufLen, SDropIndexReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int64_t compactStartTime;
|
int64_t compactStartTime;
|
||||||
|
STimeWindow tw;
|
||||||
} SCompactVnodeReq;
|
} SCompactVnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);
|
int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);
|
||||||
|
|
|
@ -4108,6 +4108,11 @@ int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq *
|
||||||
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->compactStartTime) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->compactStartTime) < 0) return -1;
|
||||||
|
|
||||||
|
// 1.1 add tw.skey and tw.ekey
|
||||||
|
if (tEncodeI64(&encoder, pReq->tw.skey) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->tw.ekey) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -4120,11 +4125,21 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq
|
||||||
tDecoderInit(&decoder, buf, bufLen);
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->compactStartTime) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->compactStartTime) < 0) return -1;
|
||||||
tEndDecode(&decoder);
|
|
||||||
|
|
||||||
|
// 1.1
|
||||||
|
if (tDecodeIsEnd(&decoder)) {
|
||||||
|
pReq->tw.skey = TSKEY_MIN;
|
||||||
|
pReq->tw.ekey = TSKEY_MAX;
|
||||||
|
} else {
|
||||||
|
if (tDecodeI64(&decoder, &pReq->tw.skey) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pReq->tw.ekey) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(&decoder);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,8 @@ int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVg
|
||||||
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId, bool force);
|
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId, bool force);
|
||||||
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
|
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
|
||||||
SArray *pArray);
|
SArray *pArray);
|
||||||
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs);
|
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
|
||||||
|
STimeWindow tw);
|
||||||
|
|
||||||
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||||
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||||
|
|
|
@ -2209,11 +2209,12 @@ _OVER:
|
||||||
|
|
||||||
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
|
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
|
||||||
|
|
||||||
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen,
|
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
|
||||||
int64_t compactTs) {
|
STimeWindow tw) {
|
||||||
SCompactVnodeReq compactReq = {0};
|
SCompactVnodeReq compactReq = {0};
|
||||||
compactReq.dbUid = pDb->uid;
|
compactReq.dbUid = pDb->uid;
|
||||||
compactReq.compactStartTime = compactTs;
|
compactReq.compactStartTime = compactTs;
|
||||||
|
compactReq.tw = tw;
|
||||||
tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
|
||||||
mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
|
mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
|
||||||
|
@ -2239,13 +2240,13 @@ static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgrou
|
||||||
return pReq;
|
return pReq;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
|
||||||
int64_t compactTs) {
|
STimeWindow tw) {
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs);
|
void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw);
|
||||||
if (pReq == NULL) return -1;
|
if (pReq == NULL) return -1;
|
||||||
|
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
|
@ -2260,7 +2261,8 @@ static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs) {
|
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
|
||||||
if (mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs) != 0) return -1;
|
STimeWindow tw) {
|
||||||
|
if (mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw) != 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
|
@ -457,9 +457,10 @@ struct SCommitInfo {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SCompactInfo {
|
struct SCompactInfo {
|
||||||
SVnode* pVnode;
|
SVnode* pVnode;
|
||||||
int32_t flag;
|
int32_t flag;
|
||||||
int64_t commitID;
|
int64_t commitID;
|
||||||
|
STimeWindow tw;
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
Loading…
Reference in New Issue