feat: add drop tag index of super table feature
This commit is contained in:
parent
582a40874f
commit
161081a0a2
|
@ -181,7 +181,7 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid);
|
|||
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid);
|
||||
|
||||
int32_t metaAddIndexToSuperTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
||||
int32_t metaDropIndexFromSuperTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
||||
|
||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta, int type);
|
||||
void metaUpdTimeSeriesNum(SMeta* pMeta);
|
||||
|
|
|
@ -1664,192 +1664,82 @@ int32_t metaAddIndexToSuperTable(SMeta *pMeta, int64_t version, SVCreateStbReq *
|
|||
|
||||
int32_t metaDropIndexFromSuperTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
// TODO
|
||||
|
||||
if (strlen(pReq->colName) == 0 || strlen(pReq->stb) == 0) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since invalid table name or column name, version:%" PRId64,
|
||||
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, version);
|
||||
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||
}
|
||||
|
||||
SMetaEntry *pEntry = NULL;
|
||||
code = metaFetchEntryByName(pMeta, pReq->stb, &pEntry);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since table %s not found, version:%" PRId64, TD_VID(pMeta->pVnode), __func__,
|
||||
__FILE__, __LINE__, pReq->stb, version);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (TSDB_SUPER_TABLE != pEntry->type) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since table %s type %d is invalid, version:%" PRId64, TD_VID(pMeta->pVnode),
|
||||
__func__, __FILE__, __LINE__, pReq->stb, pEntry->type, version);
|
||||
metaFetchEntryFree(&pEntry);
|
||||
TAOS_RETURN(TSDB_CODE_VND_INVALID_TABLE_ACTION);
|
||||
}
|
||||
|
||||
if (pReq->stbUid != pEntry->uid) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since table %s uid %" PRId64 " is not equal to %" PRId64
|
||||
", version:%" PRId64,
|
||||
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->stb, pEntry->uid, pReq->stbUid, version);
|
||||
metaFetchEntryFree(&pEntry);
|
||||
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||
}
|
||||
|
||||
SSchemaWrapper *pTagSchema = &pEntry->stbEntry.schemaTag;
|
||||
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since table %s has no tag, version:%" PRId64, TD_VID(pMeta->pVnode),
|
||||
__func__, __FILE__, __LINE__, pReq->stb, version);
|
||||
metaFetchEntryFree(&pEntry);
|
||||
TAOS_RETURN(TSDB_CODE_VND_INVALID_TABLE_ACTION);
|
||||
}
|
||||
|
||||
// search and set the tag index off
|
||||
int32_t numOfChangedTags = 0;
|
||||
for (int32_t i = 0; i < pTagSchema->nCols; i++) {
|
||||
SSchema *pCol = pTagSchema->pSchema + i;
|
||||
if (0 == strncmp(pCol->name, pReq->colName, sizeof(pReq->colName))) {
|
||||
if (!IS_IDX_ON(pCol)) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since table %s column %s is not indexed, version:%" PRId64,
|
||||
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->stb, pReq->colName, version);
|
||||
metaFetchEntryFree(&pEntry);
|
||||
TAOS_RETURN(TSDB_CODE_VND_INVALID_TABLE_ACTION);
|
||||
}
|
||||
numOfChangedTags++;
|
||||
SSCHMEA_SET_IDX_OFF(pCol);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (numOfChangedTags != 1) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since table %s column %s is not found, version:%" PRId64,
|
||||
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->stb, pReq->colName, version);
|
||||
metaFetchEntryFree(&pEntry);
|
||||
TAOS_RETURN(TSDB_CODE_VND_COL_NOT_EXISTS);
|
||||
}
|
||||
|
||||
// do handle the entry
|
||||
pEntry->version = version;
|
||||
pTagSchema->version++;
|
||||
code = metaHandleEntry2(pMeta, pEntry);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode),
|
||||
__func__, __FILE__, __LINE__, tstrerror(code), pEntry->uid, pReq->stb, version);
|
||||
metaFetchEntryFree(&pEntry);
|
||||
TAOS_RETURN(code);
|
||||
} else {
|
||||
metaInfo("vgId:%d, table %s uid %" PRId64 " is updated, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->stb,
|
||||
pEntry->uid, version);
|
||||
}
|
||||
|
||||
metaFetchEntryFree(&pEntry);
|
||||
TAOS_RETURN(code);
|
||||
#if 0
|
||||
int32_t code = 0;
|
||||
SMetaEntry oStbEntry = {0};
|
||||
SMetaEntry nStbEntry = {0};
|
||||
|
||||
STbDbKey tbDbKey = {0};
|
||||
TBC *pUidIdxc = NULL;
|
||||
TBC *pTbDbc = NULL;
|
||||
int ret = 0;
|
||||
int c = -2;
|
||||
void *pData = NULL;
|
||||
int nData = 0;
|
||||
int64_t oversion;
|
||||
SDecoder dc = {0};
|
||||
|
||||
tb_uid_t suid = pReq->stbUid;
|
||||
|
||||
if ((code = tdbTbGet(pMeta->pUidIdx, &suid, sizeof(tb_uid_t), &pData, &nData)) != 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tbDbKey.uid = suid;
|
||||
tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
|
||||
if ((code = tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData)) != 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tDecoderInit(&dc, pData, nData);
|
||||
code = metaDecodeEntry(&dc, &oStbEntry);
|
||||
if (code != 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SSchema *pCol = NULL;
|
||||
int32_t colId = -1;
|
||||
for (int i = 0; i < oStbEntry.stbEntry.schemaTag.nCols; i++) {
|
||||
SSchema *schema = oStbEntry.stbEntry.schemaTag.pSchema + i;
|
||||
if (0 == strncmp(schema->name, pReq->colName, sizeof(pReq->colName))) {
|
||||
if (IS_IDX_ON(schema)) {
|
||||
pCol = schema;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pCol == NULL) {
|
||||
metaError("vgId:%d, failed to drop index on %s.%s,since %s", TD_VID(pMeta->pVnode), pReq->stb, pReq->colName,
|
||||
tstrerror(TSDB_CODE_VND_COL_NOT_EXISTS));
|
||||
code = 0;
|
||||
|
||||
goto _err;
|
||||
}
|
||||
|
||||
/*
|
||||
* iterator all pTdDbc by uid and version
|
||||
*/
|
||||
TBC *pCtbIdxc = NULL;
|
||||
code = tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL);
|
||||
if (code != 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
|
||||
if (code < 0) {
|
||||
tdbTbcClose(pCtbIdxc);
|
||||
goto _err;
|
||||
}
|
||||
for (;;) {
|
||||
void *pKey = NULL, *pVal = NULL;
|
||||
int nKey = 0, nVal = 0;
|
||||
|
||||
code = tdbTbcNext(pCtbIdxc, &pKey, &nKey, &pVal, &nVal);
|
||||
if (code < 0) {
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tdbTbcClose(pCtbIdxc);
|
||||
pCtbIdxc = NULL;
|
||||
break;
|
||||
}
|
||||
if (((SCtbIdxKey *)pKey)->suid != suid) {
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
continue;
|
||||
}
|
||||
STagIdxKey *pTagIdxKey = NULL;
|
||||
int32_t nTagIdxKey;
|
||||
|
||||
const void *pTagData = NULL;
|
||||
int32_t nTagData = 0;
|
||||
|
||||
SCtbIdxKey *table = (SCtbIdxKey *)pKey;
|
||||
STagVal tagVal = {.cid = pCol->colId};
|
||||
if (tTagGet((const STag *)pVal, &tagVal)) {
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
pTagData = tagVal.pData;
|
||||
nTagData = (int32_t)tagVal.nData;
|
||||
} else {
|
||||
pTagData = &(tagVal.i64);
|
||||
nTagData = tDataTypes[pCol->type].bytes;
|
||||
}
|
||||
} else {
|
||||
if (!IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
nTagData = tDataTypes[pCol->type].bytes;
|
||||
}
|
||||
}
|
||||
|
||||
code = metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, table->uid, &pTagIdxKey, &nTagIdxKey);
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
if (code < 0) {
|
||||
metaDestroyTagIdxKey(pTagIdxKey);
|
||||
tdbTbcClose(pCtbIdxc);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
metaWLock(pMeta);
|
||||
ret = tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to delete tag idx key:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->stb,
|
||||
pReq->stbUid, tstrerror(ret));
|
||||
}
|
||||
metaULock(pMeta);
|
||||
metaDestroyTagIdxKey(pTagIdxKey);
|
||||
pTagIdxKey = NULL;
|
||||
}
|
||||
|
||||
// clear idx flag
|
||||
SSCHMEA_SET_IDX_OFF(pCol);
|
||||
|
||||
nStbEntry.version = version;
|
||||
nStbEntry.type = TSDB_SUPER_TABLE;
|
||||
nStbEntry.uid = oStbEntry.uid;
|
||||
nStbEntry.name = oStbEntry.name;
|
||||
|
||||
SSchemaWrapper *row = tCloneSSchemaWrapper(&oStbEntry.stbEntry.schemaRow);
|
||||
SSchemaWrapper *tag = tCloneSSchemaWrapper(&oStbEntry.stbEntry.schemaTag);
|
||||
SColCmprWrapper *cmpr = tCloneSColCmprWrapper(&oStbEntry.colCmpr);
|
||||
if (row == NULL || tag == NULL || cmpr == NULL) {
|
||||
tDeleteSchemaWrapper(row);
|
||||
tDeleteSchemaWrapper(tag);
|
||||
tDeleteSColCmprWrapper(cmpr);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
tdbTbcClose(pCtbIdxc);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
nStbEntry.stbEntry.schemaRow = *row;
|
||||
nStbEntry.stbEntry.schemaTag = *tag;
|
||||
nStbEntry.stbEntry.rsmaParam = oStbEntry.stbEntry.rsmaParam;
|
||||
nStbEntry.colCmpr = *cmpr;
|
||||
|
||||
nStbEntry.colCmpr = oStbEntry.colCmpr;
|
||||
|
||||
metaWLock(pMeta);
|
||||
// update table.db
|
||||
ret = metaSaveToTbDb(pMeta, &nStbEntry);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to save tb db:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->stb,
|
||||
pReq->stbUid, tstrerror(ret));
|
||||
}
|
||||
// update uid index
|
||||
ret = metaUpdateUidIdx(pMeta, &nStbEntry);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to update uid idx:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->stb,
|
||||
pReq->stbUid, tstrerror(ret));
|
||||
}
|
||||
metaULock(pMeta);
|
||||
|
||||
tDeleteSchemaWrapper(tag);
|
||||
tDeleteSchemaWrapper(row);
|
||||
tDeleteSColCmprWrapper(cmpr);
|
||||
|
||||
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
|
||||
tDecoderClear(&dc);
|
||||
tdbFree(pData);
|
||||
|
||||
tdbTbcClose(pCtbIdxc);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
|
||||
tDecoderClear(&dc);
|
||||
tdbFree(pData);
|
||||
|
||||
return code;
|
||||
#endif
|
||||
}
|
|
@ -2458,7 +2458,7 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq,
|
|||
return code;
|
||||
}
|
||||
|
||||
code = metaDropIndexFromSTable(pVnode->pMeta, ver, &req);
|
||||
code = metaDropIndexFromSuperTable(pVnode->pMeta, ver, &req);
|
||||
if (code) {
|
||||
pRsp->code = code;
|
||||
return code;
|
||||
|
|
Loading…
Reference in New Issue