add/delete tag index
This commit is contained in:
parent
7e720269e2
commit
3e738363aa
|
@ -2678,7 +2678,7 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt
|
|||
|
||||
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||
if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen) != 0) goto _OVER;
|
||||
if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, sql, len) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
return code;
|
||||
|
|
|
@ -129,7 +129,6 @@ int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
|||
int metaAlterCache(SMeta* pMeta, int32_t nPage);
|
||||
|
||||
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||
}
|
||||
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||
|
||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
||||
|
|
|
@ -390,8 +390,166 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
|||
return 0;
|
||||
}
|
||||
int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||
// impl later
|
||||
SMetaEntry oStbEntry = {0};
|
||||
SMetaEntry nStbEntry = {0};
|
||||
|
||||
TBC *pUidIdxc = NULL;
|
||||
TBC *pTbDbc = NULL;
|
||||
void *pData;
|
||||
int nData;
|
||||
int64_t oversion;
|
||||
SDecoder dc = {0};
|
||||
int32_t ret;
|
||||
int32_t c = -2;
|
||||
tb_uid_t suid = pReq->suid;
|
||||
|
||||
// get super table
|
||||
if (tdbTbGet(pMeta->pUidIdx, &suid, sizeof(tb_uid_t), &pData, &nData) != 0) {
|
||||
ret = -1;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL);
|
||||
// ret = tdbTbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
|
||||
// if (ret < 0 || c) {
|
||||
// tdbTbcClose(pUidIdxc);
|
||||
|
||||
// terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
|
||||
// return -1;
|
||||
//}
|
||||
|
||||
// ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
||||
// if (ret < 0) {
|
||||
// tdbTbcClose(pUidIdxc);
|
||||
|
||||
// terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
|
||||
// return -1;
|
||||
//}
|
||||
|
||||
// oversion = ((SUidIdxVal *)pData)[0].version;
|
||||
|
||||
// tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL);
|
||||
// ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c);
|
||||
// if (!(ret == 0 && c == 0)) {
|
||||
// metaError("meta/table: invalide ret: %" PRId32 " or c: %" PRId32 "alter stb failed.", ret, c);
|
||||
// return -1;
|
||||
// }
|
||||
// ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
|
||||
// if (ret < 0) {
|
||||
// tdbTbcClose(pTbDbc);
|
||||
|
||||
// terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
|
||||
// return -1;
|
||||
//}
|
||||
|
||||
oStbEntry.pBuf = taosMemoryMalloc(nData);
|
||||
memcpy(oStbEntry.pBuf, pData, nData);
|
||||
tDecoderInit(&dc, oStbEntry.pBuf, nData);
|
||||
metaDecodeEntry(&dc, &oStbEntry);
|
||||
|
||||
nStbEntry.version = version;
|
||||
nStbEntry.type = TSDB_SUPER_TABLE;
|
||||
nStbEntry.uid = pReq->suid;
|
||||
nStbEntry.name = pReq->name;
|
||||
nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
|
||||
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
|
||||
|
||||
metaWLock(pMeta);
|
||||
// compare two entry
|
||||
if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) {
|
||||
metaSaveToSkmDb(pMeta, &nStbEntry);
|
||||
}
|
||||
|
||||
// update table.db
|
||||
metaSaveToTbDb(pMeta, &nStbEntry);
|
||||
|
||||
// update uid index
|
||||
metaUpdateUidIdx(pMeta, &nStbEntry);
|
||||
|
||||
// metaStatsCacheDrop(pMeta, nStbEntry.uid);
|
||||
|
||||
metaULock(pMeta);
|
||||
|
||||
// Get target schema info
|
||||
SSchemaWrapper *pTagSchema = &nStbEntry.stbEntry.schemaTag;
|
||||
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
|
||||
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
char *tagName = "tag_col";
|
||||
SSchema *pCol = NULL;
|
||||
int32_t iCol = 0;
|
||||
for (;;) {
|
||||
pCol = NULL;
|
||||
if (iCol >= pTagSchema->nCols) break;
|
||||
pCol = &pTagSchema->pSchema[iCol];
|
||||
if (strcmp(pCol->name, tagName) == 0) break;
|
||||
iCol++;
|
||||
}
|
||||
if (iCol == 0) {
|
||||
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
|
||||
goto _err;
|
||||
}
|
||||
if (pCol == NULL) {
|
||||
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
/*
|
||||
* iterator all pTdDbc by uid and version
|
||||
*/
|
||||
TBC *pCtbIdxc = NULL;
|
||||
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL);
|
||||
int rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
|
||||
if (rc < 0) {
|
||||
tdbTbcClose(pCtbIdxc);
|
||||
goto _err;
|
||||
}
|
||||
for (;;) {
|
||||
void *pKey, *pVal;
|
||||
int nKey, nVal;
|
||||
rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, &pVal, &nVal);
|
||||
if (rc < 0) break;
|
||||
if (((SCtbIdxKey *)pKey)->suid != suid) {
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
continue;
|
||||
}
|
||||
STagIdxKey *pTagIdxKey = NULL;
|
||||
int32_t nTagIdxKey;
|
||||
|
||||
const void *pTagData = NULL;
|
||||
int32_t nTagData = 0;
|
||||
|
||||
SCtbIdxKey *table = (SCtbIdxKey *)pKey;
|
||||
STagVal tagVal = {.cid = pCol->colId};
|
||||
tTagGet((const STag *)pVal, &tagVal);
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
pTagData = tagVal.pData;
|
||||
nTagData = (int32_t)tagVal.nData;
|
||||
} else {
|
||||
pTagData = &(tagVal.i64);
|
||||
nTagData = tDataTypes[pCol->type].bytes;
|
||||
}
|
||||
|
||||
if (metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, table->uid, &pTagIdxKey, &nTagIdxKey) <
|
||||
0) {
|
||||
metaDestroyTagIdxKey(pTagIdxKey);
|
||||
goto _err;
|
||||
}
|
||||
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
|
||||
metaDestroyTagIdxKey(pTagIdxKey);
|
||||
}
|
||||
|
||||
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
|
||||
tDecoderClear(&dc);
|
||||
tdbTbcClose(pTbDbc);
|
||||
tdbTbcClose(pUidIdxc);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
return TSDB_CODE_VND_COL_ALREADY_EXISTS;
|
||||
}
|
||||
int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||
// impl later
|
||||
|
@ -1292,7 +1450,7 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
|
|||
goto _err;
|
||||
}
|
||||
|
||||
// Get targe schema info
|
||||
// Get target schema info
|
||||
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
|
||||
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
|
||||
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
|
||||
|
@ -1352,7 +1510,7 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
|
|||
pTagData = &(tagVal.i64);
|
||||
nTagData = tDataTypes[pCol->type].bytes;
|
||||
}
|
||||
if (metaCreateTagIdxKey(uid, pCol->colId, pTagData, nTagData, pCol->type, uid, &pTagIdxKey, &nTagIdxKey) < 0) {
|
||||
if (metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, uid, &pTagIdxKey, &nTagIdxKey) < 0) {
|
||||
metaDestroyTagIdxKey(pTagIdxKey);
|
||||
goto _err;
|
||||
}
|
||||
|
|
|
@ -22,8 +22,8 @@
|
|||
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
||||
|
||||
#define MEM_TERM_LIMIT 10 * 10000
|
||||
#define MEM_THRESHOLD 8 * 512 * 1024 // 8M
|
||||
#define MEM_SIGNAL_QUIT MEM_THRESHOLD * 20
|
||||
#define MEM_THRESHOLD 128 * 1024 * 1024 // 8M
|
||||
#define MEM_SIGNAL_QUIT MEM_THRESHOLD * 5
|
||||
#define MEM_ESTIMATE_RADIO 1.5
|
||||
|
||||
static void idxMemRef(MemTable* tbl);
|
||||
|
|
Loading…
Reference in New Issue