Merge branch '3.0' into fix/TS-3247
This commit is contained in:
commit
c4418046e1
|
@ -148,6 +148,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p
|
||||||
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
|
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
|
||||||
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
|
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
|
||||||
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
|
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
|
||||||
|
int32_t metaTrimTables(SMeta* pMeta);
|
||||||
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
|
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
|
||||||
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
||||||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
|
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
|
||||||
|
|
|
@ -838,22 +838,96 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void metaDropTables(SMeta *pMeta, SArray *tbUids) {
|
||||||
|
metaWLock(pMeta);
|
||||||
|
for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) {
|
||||||
|
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i);
|
||||||
|
metaDropTableByUid(pMeta, uid, NULL);
|
||||||
|
metaDebug("batch drop table:%" PRId64, uid);
|
||||||
|
}
|
||||||
|
metaULock(pMeta);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// 1, tranverse table's
|
||||||
|
// 2, validate table name using vnodeValidateTableHash
|
||||||
|
// 3, push invalidated table's uid into uidList
|
||||||
|
|
||||||
|
TBC *pCur;
|
||||||
|
code = tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
|
||||||
|
if (code < 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tdbTbcMoveToFirst(pCur);
|
||||||
|
if (code) {
|
||||||
|
tdbTbcClose(pCur);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *pData = NULL, *pKey = NULL;
|
||||||
|
int nData = 0, nKey = 0;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
int32_t ret = tdbTbcNext(pCur, &pKey, &nKey, &pData, &nData);
|
||||||
|
if (ret < 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaEntry me = {0};
|
||||||
|
SDecoder dc = {0};
|
||||||
|
tDecoderInit(&dc, pData, nData);
|
||||||
|
metaDecodeEntry(&dc, &me);
|
||||||
|
if (me.type != TSDB_SUPER_TABLE) {
|
||||||
|
int32_t ret = vnodeValidateTableHash(pMeta->pVnode, me.name);
|
||||||
|
if (TSDB_CODE_VND_HASH_MISMATCH == ret) {
|
||||||
|
taosArrayPush(uidList, &me.uid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
}
|
||||||
|
tdbFree(pData);
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbTbcClose(pCur);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t metaTrimTables(SMeta *pMeta) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
|
||||||
|
if (tbUids == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = metaFilterTableByHash(pMeta, tbUids);
|
||||||
|
if (code != 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
if (TARRAY_SIZE(tbUids) == 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
metaDropTables(pMeta, tbUids);
|
||||||
|
|
||||||
|
end:
|
||||||
|
taosArrayDestroy(tbUids);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) {
|
int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) {
|
||||||
int ret = metaTtlSmaller(pMeta, ttl, tbUids);
|
int ret = metaTtlSmaller(pMeta, ttl, tbUids);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
if (taosArrayGetSize(tbUids) == 0) {
|
if (TARRAY_SIZE(tbUids) == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
metaWLock(pMeta);
|
metaDropTables(pMeta, tbUids);
|
||||||
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
|
|
||||||
tb_uid_t *uid = (tb_uid_t *)taosArrayGet(tbUids, i);
|
|
||||||
metaDropTableByUid(pMeta, *uid, NULL);
|
|
||||||
metaDebug("ttl drop table:%" PRId64, *uid);
|
|
||||||
}
|
|
||||||
metaULock(pMeta);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -999,7 +1073,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
||||||
|
|
||||||
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1);
|
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1);
|
||||||
metaUidCacheClear(pMeta, e.ctbEntry.suid);
|
metaUidCacheClear(pMeta, e.ctbEntry.suid);
|
||||||
metaTbGroupCacheClear(pMeta, e.ctbEntry.suid);
|
metaTbGroupCacheClear(pMeta, e.ctbEntry.suid);
|
||||||
} else if (e.type == TSDB_NORMAL_TABLE) {
|
} else if (e.type == TSDB_NORMAL_TABLE) {
|
||||||
// drop schema.db (todo)
|
// drop schema.db (todo)
|
||||||
|
|
||||||
|
@ -1011,7 +1085,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
||||||
|
|
||||||
metaStatsCacheDrop(pMeta, uid);
|
metaStatsCacheDrop(pMeta, uid);
|
||||||
metaUidCacheClear(pMeta, uid);
|
metaUidCacheClear(pMeta, uid);
|
||||||
metaTbGroupCacheClear(pMeta, uid);
|
metaTbGroupCacheClear(pMeta, uid);
|
||||||
--pMeta->pVnode->config.vndStats.numOfSTables;
|
--pMeta->pVnode->config.vndStats.numOfSTables;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1432,7 +1506,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
||||||
((STag *)(ctbEntry.ctbEntry.pTags))->len, pMeta->txn);
|
((STag *)(ctbEntry.ctbEntry.pTags))->len, pMeta->txn);
|
||||||
|
|
||||||
metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid);
|
metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid);
|
||||||
metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid);
|
metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid);
|
||||||
|
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
|
|
||||||
|
|
|
@ -245,11 +245,11 @@ _exit:
|
||||||
static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
int32_t size;
|
int32_t size;
|
||||||
int32_t ret;
|
int32_t ret;
|
||||||
uint8_t *pCont;
|
uint8_t *pCont;
|
||||||
SEncoder *pCoder = &(SEncoder){0};
|
SEncoder *pCoder = &(SEncoder){0};
|
||||||
SDeleteRes res = {0};
|
SDeleteRes res = {0};
|
||||||
|
|
||||||
SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||||
initStorageAPI(&handle.api);
|
initStorageAPI(&handle.api);
|
||||||
|
@ -316,8 +316,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
|
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver);
|
||||||
ver);
|
|
||||||
|
|
||||||
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
|
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
|
||||||
ASSERT(pVnode->state.applied + 1 == ver);
|
ASSERT(pVnode->state.applied + 1 == ver);
|
||||||
|
@ -1479,6 +1478,7 @@ static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
|
||||||
pVnode->config.hashBegin, pVnode->config.hashEnd, ver);
|
pVnode->config.hashBegin, pVnode->config.hashEnd, ver);
|
||||||
|
|
||||||
// TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
|
// TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
|
||||||
|
code = metaTrimTables(pVnode->pMeta);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1492,8 +1492,7 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pR
|
||||||
|
|
||||||
code = vnodeConsolidateAlterHashRange(pVnode, ver);
|
code = vnodeConsolidateAlterHashRange(pVnode, ver);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(),
|
vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(), ver);
|
||||||
ver);
|
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
pVnode->config.hashChange = false;
|
pVnode->config.hashChange = false;
|
||||||
|
|
Loading…
Reference in New Issue