support rm ttl tb

This commit is contained in:
wangjiaming0909 2024-03-15 01:09:16 +00:00
parent 1ef153de63
commit 3d77e8432d
8 changed files with 275 additions and 4 deletions

View File

@ -4291,6 +4291,25 @@ typedef struct SDropCtbWithTsmaReq {
int32_t tSerializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, const SMDropTbWithTsmaReq* pReq);
int32_t tDeserializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, SMDropTbWithTsmaReq* pReq);
typedef struct SVTtlExpiredTb {
tb_uid_t uid;
char name[TSDB_TABLE_NAME_LEN];
tb_uid_t suid;
} SVTtlExpiredTb;
typedef struct SVFetchTtlExpiredTbsRsp {
SArray* pExpiredTbs;
int32_t vgId;
} SVFetchTtlExpiredTbsRsp;
int32_t tEncodeTtlExpiredTb(SEncoder* pEncoder, const SVTtlExpiredTb* pTb);
int32_t tDecodeTtlExpiredTb(SDecoder* pDecoder, SVTtlExpiredTb* pTb);
int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder* pCoder, const SVFetchTtlExpiredTbsRsp* pRsp);
int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder* pCoder, SVFetchTtlExpiredTbsRsp* pRsp);
void tFreeFetchTtlExpiredTbsRsp(void* p);
#pragma pack(pop)
#ifdef __cplusplus

View File

@ -273,6 +273,7 @@
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "vnode-compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "vnode-drop-ttl-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_TTL_EXPIRED_TBS, "vnode-fetch-ttl-expired-tbs", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TRIM, "vnode-trim", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_INDEX, "vnode-create-index", NULL, NULL)

View File

@ -10302,3 +10302,51 @@ int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgres
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, const SMDropTbWithTsmaReq* pReq){ return 0;}
int32_t tDeserializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, SMDropTbWithTsmaReq* pReq) { return 0;}
int32_t tEncodeTtlExpiredTb(SEncoder* pEncoder, const SVTtlExpiredTb* pTb) {
if (tEncodeI64(pEncoder, pTb->uid) < 0) return -1;
if (tEncodeCStr(pEncoder, pTb->name) < 0) return -1;
if (tEncodeI64(pEncoder, pTb->suid) < 0) return -1;
return 0;
}
int32_t tDecodeTtlExpiredTb(SDecoder* pDecoder, SVTtlExpiredTb* pTb) {
if (tDecodeI64(pDecoder, &pTb->uid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTb->name) < 0) return -1;
if (tDecodeI64(pDecoder, &pTb->suid) < 0) return -1;
return 0;
}
int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder* pCoder, const SVFetchTtlExpiredTbsRsp* pRsp) {
if (tEncodeI32(pCoder, pRsp->vgId) < 0) return -1;
int32_t size = pRsp->pExpiredTbs ? pRsp->pExpiredTbs->size : 0;
if (tEncodeI32(pCoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
if (tEncodeTtlExpiredTb(pCoder, taosArrayGet(pRsp->pExpiredTbs, i)) < 0) return -1;
}
return 0;
}
int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder* pCoder, SVFetchTtlExpiredTbsRsp* pRsp) {
if (tDecodeI32(pclose, &pRsp->vgId) < 0) return -1;
int32_t size = 0;
if (tDecodeI32(pCoder, &size) < 0) return -1;
if (size > 0) {
pRsp->pExpiredTbs = taosArrayInit(size, sizeof(SVTtlExpiredTb));
if (!pRsp->pExpiredTbs) return TSDB_CODE_OUT_OF_MEMORY;
SVTtlExpiredTb tb;
for (int32_t i = 0; i < size; ++i) {
if (tDecodeTtlExpiredTb(pCoder, &tb) < 0) return -1;
taosArrayPush(pRsp->pExpiredTbs, &tb);
}
}
return 0;
}
void tFreeFetchTtlExpiredTbsRsp(void* p) {
SVFetchTtlExpiredTbsRsp* pRsp = p;
taosArrayDestroy(pRsp->pExpiredTbs);
}

View File

@ -169,6 +169,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STB_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STB_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -884,6 +884,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -64,6 +64,7 @@ static int32_t mndProcessDropIndexReq(SRpcMsg *pReq);
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq);
static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq);
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pReq);
int32_t mndInitStb(SMnode *pMnode) {
SSdbTable table = {
@ -93,7 +94,7 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP, mndProcessDropStbReqFromMNode);
mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs);
// mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
// mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq);
@ -943,7 +944,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), reqLen, &ttlReq);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen, .info = pReq->info};
SRpcMsg rpcMsg = {.msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) {
@ -3707,6 +3708,27 @@ static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) {
return code;
}
static int32_t mndSetDropTbsRedoActions(SMnode* pMnode, int32_t vgId, SArray* pTbs) {
return 0;
}
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg* pRsp) {
int32_t code = -1;
SMnode *pMnode = pRsp->info.node;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs");
if (pTrans == NULL) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
//if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
mndTransDrop(pTrans);
return code;
}
static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) {
int32_t code = -1;
SMnode *pMnode = pReq->info.node;
@ -3744,7 +3766,7 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) {
if (taosHashGet(pSourceTbHashSet, &pSma->stbUid, sizeof(pSma->stbUid))) {
if (!taosHashGet(pTsmaHashSet, &pSma->uid, sizeof(pSma->uid))) {
// TODO should retry
terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
}
}
sdbRelease(pMnode->pSdb, pSma);
@ -3759,3 +3781,117 @@ _OVER:
taosHashCleanup(pSourceTbHashSet);
return code;
}
typedef struct SDropTbVgReqs {
SVDropTbBatchReq req;
SVgroupInfo info;
} SDropTbVgReqs;
static int32_t mndDropTbAdd(SMnode* pMnode, SHashObj* pVgHashMap, const SVgObj* pVgObj, SVDropTbReq* pReq, char* name, tb_uid_t suid) {
SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = true};
SVgroupInfo info = {.hashBegin = pVgObj->hashBegin, .hashEnd = pVgObj->hashEnd, .vgId = pVgObj->vgId};
info.epSet = mndGetVgroupEpset(pMnode, pVgObj);
SDropTbVgReqs * pReqs = taosHashGet(pVgHashMap, &pVgObj->vgId, sizeof(pVgObj->vgId));
SDropTbVgReqs reqs = {0};
if (pReqs == NULL) {
reqs.info = info;
reqs.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
taosArrayPush(reqs.req.pArray, pReq);
taosHashPut(pVgHashMap, &pVgObj->vgId, sizeof(pVgObj->vgId), &reqs, sizeof(reqs));
} else {
taosArrayPush(pReqs->req.pArray, pReq);
}
return 0;
}
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
int32_t code = -1;
SDecoder decoder = {0};
SMnode *pMnode = pRsp->info.node;
bool locked = false;
SHashObj* pTsmaMap = NULL;
SHashObj* pVgroupMap = NULL;
SHashObj* pDbMap = NULL;
SVFetchTtlExpiredTbsRsp rsp;
if (pRsp->code != TSDB_CODE_SUCCESS) goto _end;
tDecoderInit(&decoder, pRsp->pCont, pRsp->contLen);
terrno = tDecodeVFetchTtlExpiredTbsRsp(&decoder, &rsp);
if (terrno) goto _end;
pTsmaMap = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (!pTsmaMap) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
// get all stb uids
for (int32_t i = 0; i < rsp.pExpiredTbs->size; ++i) {
const SVTtlExpiredTb* pTb = taosArrayGet(rsp.pExpiredTbs, i);
if (taosHashGet(pTsmaMap, &pTb->suid, sizeof(pTb->suid))) {
} else {
SArray* pTsmas = taosArrayInit(2, TSDB_TABLE_NAME_LEN);
if (!pTsmas) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
taosHashPut(pTsmaMap, &pTb->suid, sizeof(pTb->suid), &pTsmas, sizeof(pTsmas));
}
}
sdbReadLock(pMnode->pSdb, SDB_SMA);
locked = true;
void *pIter = NULL;
SSmaObj *pSma = NULL;
char buf[TSDB_TABLE_NAME_LEN + TSDB_DB_FNAME_LEN + 1] = {0};
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
if (!pIter) break;
SArray* pTsmas = taosHashGet(pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid));
if (pTsmas) {
int32_t len = sprintf(buf, "%s.%s", pSma->db, pSma->name);
len = taosCreateMD5Hash(buf, len);
taosArrayPush(pTsmas, buf);
}
sdbRelease(pMnode->pSdb, pSma);
}
pVgroupMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (!pVgroupMap) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
SVgObj* pVgObj = mndAcquireVgroup(pMnode, rsp.vgId);
if (!pVgObj) {
code = 0;
goto _end;
}
SVDropTbReq req = {.igNotExists = true};
for (int32_t i = 0; i < rsp.pExpiredTbs->size; ++i) {
SVTtlExpiredTb* pTb = taosArrayGet(rsp.pExpiredTbs, i);
req.name = pTb->name;
req.suid = pTb->suid;
mndDropTbAdd(pMnode, pVgroupMap, pVgObj, &req, pTb->name, pTb->suid);
SArray* pTsmas = taosHashGet(pTsmaMap, &pTb->suid, sizeof(pTb->suid));
for (int32_t j = 0; j > pTsmas->size; ++j) {
char* name = taosArrayGet(pTsmas, j);
sprintf(name + strlen(name), "_%s", pTb->name);
}
}
mndReleaseVgroup(pMnode, pVgObj);
if (terrno == 0) code = 0;
_end:
tDecoderClear(&decoder);
tFreeFetchTtlExpiredTbsRsp(&rsp);
if (locked) sdbUnLock(pMnode->pSdb, SDB_SMA);
//pVgroupHashmap
//pTsmaMap
return code;
}

View File

@ -50,6 +50,7 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l
static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* pReq, int32_t len, SRpcMsg* pRsp);
extern int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
extern int32_t vnodeQueryCompactProgress(SVnode *pVnode, SRpcMsg *pMsg);
@ -484,6 +485,7 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
case TDMT_VND_ALTER_TABLE: {
code = vnodePreProcessAlterTableMsg(pVnode, pMsg);
} break;
case TDMT_VND_FETCH_TTL_EXPIRED_TBS:
case TDMT_VND_DROP_TTL_TABLE: {
code = vnodePreProcessDropTtlMsg(pVnode, pMsg);
} break;
@ -563,6 +565,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
case TDMT_VND_DROP_TTL_TABLE:
if (vnodeProcessDropTtlTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_FETCH_TTL_EXPIRED_TBS:
if (vnodeProcessFetchTtlExpiredTbs(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_TRIM:
if (vnodeProcessTrimReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
break;
@ -926,6 +931,65 @@ end:
return ret;
}
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* pReq, int32_t len, SRpcMsg* pRsp) {
int32_t code = -1;
SMetaReader mr = {0};
SVDropTtlTableReq ttlReq = {0};
SVFetchTtlExpiredTbsRsp rsp = {0};
SEncoder encoder = {0};
pRsp->msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _end;
}
ASSERT(ttlReq.nUids == taosArrayGetSize(ttlReq.pTbUids));
tb_uid_t suid;
char ctbName[TSDB_TABLE_NAME_LEN];
SVTtlExpiredTb expiredTb = {0};
metaReaderDoInit(&mr, pVnode->pMeta, 0);
rsp.vgId = TD_VID(pVnode);
rsp.pExpiredTbs = taosArrayInit(ttlReq.nUids, sizeof(SVTtlExpiredTb));
if (!rsp.pExpiredTbs) goto _end;
for (int32_t i = 0; i < ttlReq.nUids; ++i) {
tb_uid_t* uid = taosArrayGet(ttlReq.pTbUids, i);
expiredTb.uid = *uid;
expiredTb.suid = *uid;
terrno = metaReaderGetTableEntryByUid(&mr, *uid);
if (terrno < 0) goto _end;
strncpy(expiredTb.name, mr.me.name, TSDB_TABLE_NAME_LEN);
if (mr.me.type == TSDB_CHILD_TABLE) {
expiredTb.suid = mr.me.ctbEntry.suid;
}
taosArrayPush(rsp.pExpiredTbs, &expiredTb);
}
int32_t ret = 0;
tEncodeSize(tEncodeVFetchTtlExpiredTbsRsp, &rsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen);
if (pRsp->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _end;
}
tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
terrno = tEncodeVFetchTtlExpiredTbsRsp(&encoder, &rsp);
tEncoderClear(&encoder);
if (terrno == 0) code = 0;
_end:
metaReaderClear(&mr);
tFreeFetchTtlExpiredTbsRsp(&rsp);
pRsp->code = terrno;
return code;
}
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0};
SDecoder coder;

View File

@ -566,7 +566,7 @@ class TSMATestSQLGenerator:
class TDTestCase:
updatecfgDict = {'debugFlag': 143, 'asynclog': 0}
updatecfgDict = {'debugFlag': 143, 'asynclog': 0, 'ttlUnit': 10, 'ttlPushInterval': 5}
def __init__(self):
self.vgroups = 4