add dynamic change json tag index
This commit is contained in:
parent
345d92d073
commit
2a0543f713
|
@ -1196,6 +1196,7 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
|
||||||
int64_t oversion;
|
int64_t oversion;
|
||||||
const void *pData = NULL;
|
const void *pData = NULL;
|
||||||
int nData = 0;
|
int nData = 0;
|
||||||
|
SDecoder dc = {0};
|
||||||
|
|
||||||
if (pAlterTbReq->tagName == NULL) {
|
if (pAlterTbReq->tagName == NULL) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -1208,82 +1209,147 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
|
||||||
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
uid = *(tb_uid_t *)pVal;
|
uid = *(tb_uid_t *)pVal;
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
pVal = NULL;
|
pVal = NULL;
|
||||||
|
|
||||||
// search uid index
|
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(tb_uid_t), &pVal, &nVal) == -1) {
|
||||||
TBC *pUidIdxc = NULL;
|
ret = -1;
|
||||||
|
|
||||||
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
|
|
||||||
tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
|
|
||||||
ASSERT(c == 0);
|
|
||||||
|
|
||||||
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
|
||||||
oversion = ((SUidIdxVal *)pData)[0].version;
|
|
||||||
|
|
||||||
// search table.db
|
|
||||||
TBC *pTbDbc = NULL;
|
|
||||||
SDecoder dc1 = {0};
|
|
||||||
SDecoder dc2 = {0};
|
|
||||||
|
|
||||||
/* get ctbEntry */
|
|
||||||
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
|
|
||||||
tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
|
|
||||||
ASSERT(c == 0);
|
|
||||||
tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
|
|
||||||
|
|
||||||
ctbEntry.pBuf = taosMemoryMalloc(nData);
|
|
||||||
memcpy(ctbEntry.pBuf, pData, nData);
|
|
||||||
tDecoderInit(&dc1, ctbEntry.pBuf, nData);
|
|
||||||
metaDecodeEntry(&dc1, &ctbEntry);
|
|
||||||
|
|
||||||
/* get stbEntry*/
|
|
||||||
tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
|
|
||||||
if (!pVal) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = ((SUidIdxVal *)pVal)[0].version}),
|
STbDbKey tbDbKey = {0};
|
||||||
sizeof(STbDbKey), (void **)&stbEntry.pBuf, &nVal);
|
tbDbKey.uid = uid;
|
||||||
tdbFree(pVal);
|
tbDbKey.version = ((SUidIdxVal *)pVal)[0].version;
|
||||||
tDecoderInit(&dc2, stbEntry.pBuf, nVal);
|
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pVal, &nVal);
|
||||||
metaDecodeEntry(&dc2, &stbEntry);
|
|
||||||
|
tDecoderInit(&dc, pVal, nVal);
|
||||||
|
ret = metaDecodeEntry(&dc, &stbEntry);
|
||||||
|
if (ret < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
|
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
|
||||||
SSchema *pColumn = NULL;
|
|
||||||
int32_t iCol = 0;
|
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchema *pCol = NULL;
|
||||||
|
int32_t iCol = 0;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
pColumn = NULL;
|
pCol = NULL;
|
||||||
|
|
||||||
if (iCol >= pTagSchema->nCols) break;
|
if (iCol >= pTagSchema->nCols) break;
|
||||||
pColumn = &pTagSchema->pSchema[iCol];
|
pCol = &pTagSchema->pSchema[iCol];
|
||||||
|
|
||||||
if (strcmp(pColumn->name, pAlterTbReq->tagName) == 0) break;
|
if (strcmp(pCol->name, pAlterTbReq->tagName) == 0) break;
|
||||||
iCol++;
|
iCol++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pColumn == NULL) {
|
if (iCol == 0) {
|
||||||
|
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
if (pCol == NULL) {
|
||||||
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
|
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
ctbEntry.version = version;
|
* iterator all pTdDbc by uid and version
|
||||||
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
|
*/
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
// drop all child tables
|
||||||
|
TBC *pCtbIdxc = NULL;
|
||||||
|
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL);
|
||||||
|
int rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = uid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
|
||||||
|
if (rc < 0) {
|
||||||
|
tdbTbcClose(pCtbIdxc);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
for (;;) {
|
||||||
const STag *pOldTag = (const STag *)ctbEntry.ctbEntry.pTags;
|
void *pKey, *pVal;
|
||||||
STag *pNewTag = NULL;
|
int nKey, nVal;
|
||||||
SArray *pTagArray = taosArrayInit(pTagSchema->nCols, sizeof(STagVal));
|
rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, &pVal, &nVal);
|
||||||
if (!pTagArray) {
|
if (rc < 0) break;
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
if (((SCtbIdxKey *)pKey)->suid != uid) {
|
||||||
goto _err;
|
tdbFree(pVal);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
STag *pTag = pVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// search uid index
|
||||||
|
// TBC *pUidIdxc = NULL;
|
||||||
|
|
||||||
|
// tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, pMeta->txn);
|
||||||
|
// tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
|
||||||
|
// ASSERT(c == 0);
|
||||||
|
|
||||||
|
// tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
||||||
|
// oversion = ((SUidIdxVal *)pData)[0].version;
|
||||||
|
|
||||||
|
// search table.db
|
||||||
|
// TBC *pTbDbc = NULL;
|
||||||
|
// SDecoder dc1 = {0};
|
||||||
|
// SDecoder dc2 = {0};
|
||||||
|
|
||||||
|
///* get ctbEntry */
|
||||||
|
// tdbTbcOpen(pMeta->pTbDb, &pTbDbc, pMeta->txn);
|
||||||
|
// tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
|
||||||
|
// ASSERT(c == 0);
|
||||||
|
// tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
|
||||||
|
|
||||||
|
// ctbEntry.pBuf = taosMemoryMalloc(nData);
|
||||||
|
// memcpy(ctbEntry.pBuf, pData, nData);
|
||||||
|
// tDecoderInit(&dc1, ctbEntry.pBuf, nData);
|
||||||
|
// metaDecodeEntry(&dc1, &ctbEntry);
|
||||||
|
|
||||||
|
/* get stbEntry*/
|
||||||
|
// tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
|
||||||
|
// if (!pVal) {
|
||||||
|
// terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
// goto _err;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = ((SUidIdxVal *)pVal)[0].version}),
|
||||||
|
// sizeof(STbDbKey), (void **)&stbEntry.pBuf, &nVal);
|
||||||
|
// tdbFree(pVal);
|
||||||
|
// tDecoderInit(&dc2, stbEntry.pBuf, nVal);
|
||||||
|
// metaDecodeEntry(&dc2, &stbEntry);
|
||||||
|
|
||||||
|
// SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
|
||||||
|
// SSchema *pColumn = NULL;
|
||||||
|
// int32_t iCol = 0;
|
||||||
|
// for (;;) {
|
||||||
|
// pColumn = NULL;
|
||||||
|
|
||||||
|
// if (iCol >= pTagSchema->nCols) break;
|
||||||
|
// pColumn = &pTagSchema->pSchema[iCol];
|
||||||
|
|
||||||
|
// if (strcmp(pColumn->name, pAlterTbReq->tagName) == 0) break;
|
||||||
|
// iCol++;
|
||||||
|
//}
|
||||||
|
|
||||||
|
// if (pColumn == NULL) {
|
||||||
|
// terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
|
||||||
|
// goto _err;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// ctbEntry.version = version;
|
||||||
|
// if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
// terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
// goto _err;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// const STag *pOldTag = (const STag *)ctbEntry.ctbEntry.pTags;
|
||||||
|
// STag *pNewTag = NULL;
|
||||||
|
// SArray *pTagArray = taosArrayInit(pTagSchema->nCols, sizeof(STagVal));
|
||||||
|
// if (!pTagArray) {
|
||||||
|
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
// goto _err;
|
||||||
|
// }
|
||||||
|
|
||||||
// for (int32_t i = 0; i < pTagSchema->nCols; i++) {
|
// for (int32_t i = 0; i < pTagSchema->nCols; i++) {
|
||||||
// SSchema *pCol = &pTagSchema->pSchema[i];
|
// SSchema *pCol = &pTagSchema->pSchema[i];
|
||||||
// if (iCol == i) {
|
// if (iCol == i) {
|
||||||
|
@ -1315,43 +1381,43 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
|
||||||
// ctbEntry.ctbEntry.pTags = (uint8_t *)pNewTag;
|
// ctbEntry.ctbEntry.pTags = (uint8_t *)pNewTag;
|
||||||
// taosArrayDestroy(pTagArray);
|
// taosArrayDestroy(pTagArray);
|
||||||
|
|
||||||
metaWLock(pMeta);
|
// metaWLock(pMeta);
|
||||||
|
|
||||||
// save to table.db
|
// save to table.db
|
||||||
metaSaveToTbDb(pMeta, &ctbEntry);
|
// metaSaveToTbDb(pMeta, &ctbEntry);
|
||||||
|
|
||||||
// save to uid.idx
|
// save to uid.idx
|
||||||
metaUpdateUidIdx(pMeta, &ctbEntry);
|
// metaUpdateUidIdx(pMeta, &ctbEntry);
|
||||||
|
|
||||||
if (iCol == 0) {
|
// if (iCol == 0) {
|
||||||
metaUpdateTagIdx(pMeta, &ctbEntry);
|
// metaUpdateTagIdx(pMeta, &ctbEntry);
|
||||||
}
|
// }
|
||||||
|
|
||||||
ASSERT(ctbEntry.ctbEntry.pTags);
|
// ASSERT(ctbEntry.ctbEntry.pTags);
|
||||||
SCtbIdxKey ctbIdxKey = {.suid = ctbEntry.ctbEntry.suid, .uid = uid};
|
// SCtbIdxKey ctbIdxKey = {.suid = ctbEntry.ctbEntry.suid, .uid = uid};
|
||||||
tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags,
|
// tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags,
|
||||||
((STag *)(ctbEntry.ctbEntry.pTags))->len, &pMeta->txn);
|
// ((STag *)(ctbEntry.ctbEntry.pTags))->len, pMeta->txn);
|
||||||
|
|
||||||
metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid);
|
// metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid);
|
||||||
|
|
||||||
metaULock(pMeta);
|
// metaULock(pMeta);
|
||||||
|
|
||||||
tDecoderClear(&dc1);
|
// tDecoderClear(&dc1);
|
||||||
tDecoderClear(&dc2);
|
// tDecoderClear(&dc2);
|
||||||
taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
|
// taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
|
||||||
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
|
// if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
|
||||||
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
|
// if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
|
||||||
tdbTbcClose(pTbDbc);
|
// tdbTbcClose(pTbDbc);
|
||||||
tdbTbcClose(pUidIdxc);
|
// tdbTbcClose(pUidIdxc);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tDecoderClear(&dc1);
|
// tDecoderClear(&dc1);
|
||||||
tDecoderClear(&dc2);
|
// tDecoderClear(&dc2);
|
||||||
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
|
// if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
|
||||||
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
|
// if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
|
||||||
tdbTbcClose(pTbDbc);
|
// tdbTbcClose(pTbDbc);
|
||||||
tdbTbcClose(pUidIdxc);
|
// tdbTbcClose(pUidIdxc);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue