Merge pull request #16664 from taosdata/feature/3.0_interval_hash_optimize
enh: drop child tables support for rsma
This commit is contained in:
commit
9a7774817d
|
@ -104,7 +104,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq*
|
||||||
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||||
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
|
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
|
||||||
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
|
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
|
||||||
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
|
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t *tbUid);
|
||||||
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
|
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
|
||||||
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
||||||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||||
|
@ -208,7 +208,7 @@ int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
|
||||||
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
|
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
|
||||||
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
|
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
|
||||||
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
|
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
|
||||||
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore);
|
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
|
||||||
void tdUidStoreDestory(STbUidStore* pStore);
|
void tdUidStoreDestory(STbUidStore* pStore);
|
||||||
void* tdUidStoreFree(STbUidStore* pStore);
|
void* tdUidStoreFree(STbUidStore* pStore);
|
||||||
|
|
||||||
|
|
|
@ -474,7 +474,7 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
|
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids, tb_uid_t *tbUid) {
|
||||||
void *pData = NULL;
|
void *pData = NULL;
|
||||||
int nData = 0;
|
int nData = 0;
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
|
@ -496,6 +496,10 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
|
||||||
taosArrayPush(tbUids, &uid);
|
taosArrayPush(tbUids, &uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((type == TSDB_CHILD_TABLE) && tbUid) {
|
||||||
|
*tbUid = uid;
|
||||||
|
}
|
||||||
|
|
||||||
tdbFree(pData);
|
tdbFree(pData);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
|
||||||
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
|
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
|
||||||
|
|
||||||
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
|
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
|
||||||
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
|
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
|
||||||
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
||||||
int8_t idx);
|
int8_t idx);
|
||||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
||||||
|
@ -175,7 +175,7 @@ static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) {
|
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd) {
|
||||||
SRSmaInfo *pRSmaInfo = NULL;
|
SRSmaInfo *pRSmaInfo = NULL;
|
||||||
|
|
||||||
if (!suid || !tbUids) {
|
if (!suid || !tbUids) {
|
||||||
|
@ -199,7 +199,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
|
||||||
|
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
if (pRSmaInfo->taskInfo[i]) {
|
if (pRSmaInfo->taskInfo[i]) {
|
||||||
if (((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true)) < 0)) {
|
if (((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0)) {
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||||
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
|
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
|
||||||
terrstr());
|
terrstr());
|
||||||
|
@ -215,12 +215,12 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore) {
|
int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
|
||||||
if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) {
|
if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids) != TSDB_CODE_SUCCESS) {
|
if (tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd) != TSDB_CODE_SUCCESS) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,7 +229,7 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore) {
|
||||||
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
||||||
SArray *pTbUids = *(SArray **)pIter;
|
SArray *pTbUids = *(SArray **)pIter;
|
||||||
|
|
||||||
if (tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids) != TSDB_CODE_SUCCESS) {
|
if (tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids, isAdd) != TSDB_CODE_SUCCESS) {
|
||||||
taosHashCancelIterate(pStore->uidHash, pIter);
|
taosHashCancelIterate(pStore->uidHash, pIter);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -1118,7 +1118,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdUpdateTbUidList(pVnode->pSma, &uidStore) < 0) {
|
if (tdUpdateTbUidList(pVnode->pSma, &uidStore, true) < 0) {
|
||||||
smaError("vgId:%d, rsma restore, update tb uid list failed for %" PRIi64 " since %s", TD_VID(pVnode), suid,
|
smaError("vgId:%d, rsma restore, update tb uid list failed for %" PRIi64 " since %s", TD_VID(pVnode), suid,
|
||||||
terrstr());
|
terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
|
@ -39,7 +39,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
|
||||||
SSubmitBlkRsp r = {0};
|
SSubmitBlkRsp r = {0};
|
||||||
tGetSubmitMsgNext(&msgIter, &pBlock);
|
tGetSubmitMsgNext(&msgIter, &pBlock);
|
||||||
if (pBlock == NULL) break;
|
if (pBlock == NULL) break;
|
||||||
if (tsdbInsertTableData(pTsdb, version, &msgIter, pBlock, &r) < 0) {
|
if ((terrno = tsdbInsertTableData(pTsdb, version, &msgIter, pBlock, &r)) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -534,7 +534,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
|
||||||
}
|
}
|
||||||
|
|
||||||
tqUpdateTbUidList(pVnode->pTq, tbUids, true);
|
tqUpdateTbUidList(pVnode->pTq, tbUids, true);
|
||||||
if (tdUpdateTbUidList(pVnode->pSma, pStore) < 0) {
|
if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
tdUidStoreFree(pStore);
|
tdUidStoreFree(pStore);
|
||||||
|
@ -692,6 +692,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
int32_t ret;
|
int32_t ret;
|
||||||
SArray *tbUids = NULL;
|
SArray *tbUids = NULL;
|
||||||
|
STbUidStore *pStore = NULL;
|
||||||
|
|
||||||
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
|
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
|
||||||
pRsp->pCont = NULL;
|
pRsp->pCont = NULL;
|
||||||
|
@ -715,9 +716,10 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||||
SVDropTbReq *pDropTbReq = req.pReqs + iReq;
|
SVDropTbReq *pDropTbReq = req.pReqs + iReq;
|
||||||
SVDropTbRsp dropTbRsp = {0};
|
SVDropTbRsp dropTbRsp = {0};
|
||||||
|
tb_uid_t tbUid = 0;
|
||||||
|
|
||||||
/* code */
|
/* code */
|
||||||
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
|
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids, &tbUid);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
|
if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
|
||||||
dropTbRsp.code = TSDB_CODE_SUCCESS;
|
dropTbRsp.code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -726,15 +728,18 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
dropTbRsp.code = TSDB_CODE_SUCCESS;
|
dropTbRsp.code = TSDB_CODE_SUCCESS;
|
||||||
|
if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(rsp.pArray, &dropTbRsp);
|
taosArrayPush(rsp.pArray, &dropTbRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
|
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
|
||||||
|
tdUpdateTbUidList(pVnode->pSma, pStore, false);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
taosArrayDestroy(tbUids);
|
taosArrayDestroy(tbUids);
|
||||||
|
tdUidStoreFree(pStore);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
|
tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
|
||||||
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
||||||
|
|
Loading…
Reference in New Issue