support update multi tag
This commit is contained in:
parent
3027e377fa
commit
97e08abe8c
|
@ -2011,7 +2011,257 @@ _err:
|
||||||
return terrno != 0 ? terrno : TSDB_CODE_FAILED;
|
return terrno != 0 ? terrno : TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaUpdateTableMultiTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { return 0; }
|
static int metaUpdateTableMultiTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
|
||||||
|
SMetaEntry ctbEntry = {0};
|
||||||
|
SMetaEntry stbEntry = {0};
|
||||||
|
void *pVal = NULL;
|
||||||
|
int nVal = 0;
|
||||||
|
int ret;
|
||||||
|
int c;
|
||||||
|
tb_uid_t uid;
|
||||||
|
int64_t oversion;
|
||||||
|
const void *pData = NULL;
|
||||||
|
int nData = 0;
|
||||||
|
SHashObj *pTagTable = NULL;
|
||||||
|
SArray *updateTagColumnIds = NULL;
|
||||||
|
|
||||||
|
// search name index
|
||||||
|
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
|
||||||
|
if (ret < 0) {
|
||||||
|
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||||
|
}
|
||||||
|
|
||||||
|
uid = *(tb_uid_t *)pVal;
|
||||||
|
tdbFree(pVal);
|
||||||
|
pVal = NULL;
|
||||||
|
|
||||||
|
// search uid index
|
||||||
|
TBC *pUidIdxc = NULL;
|
||||||
|
|
||||||
|
TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL));
|
||||||
|
if (tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c) < 0) {
|
||||||
|
metaTrace("meta/table: failed to move to uid index, uid:%" PRId64, uid);
|
||||||
|
}
|
||||||
|
if (c != 0) {
|
||||||
|
tdbTbcClose(pUidIdxc);
|
||||||
|
metaError("meta/table: invalide c: %" PRId32 " update tb tag val failed.", c);
|
||||||
|
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData) != 0) {
|
||||||
|
metaError("meta/table: failed to get uid index, uid:%" PRId64, uid);
|
||||||
|
}
|
||||||
|
oversion = ((SUidIdxVal *)pData)[0].version;
|
||||||
|
|
||||||
|
// search table.db
|
||||||
|
TBC *pTbDbc = NULL;
|
||||||
|
SDecoder dc1 = {0};
|
||||||
|
SDecoder dc2 = {0};
|
||||||
|
|
||||||
|
/* get ctbEntry */
|
||||||
|
TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL));
|
||||||
|
if (tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c) != 0) {
|
||||||
|
metaError("meta/table: failed to move to tb db, uid:%" PRId64, uid);
|
||||||
|
}
|
||||||
|
if (c != 0) {
|
||||||
|
tdbTbcClose(pUidIdxc);
|
||||||
|
tdbTbcClose(pTbDbc);
|
||||||
|
metaError("meta/table: invalide c: %" PRId32 " update tb tag val failed.", c);
|
||||||
|
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData) != 0) {
|
||||||
|
metaError("meta/table: failed to get tb db, uid:%" PRId64, uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((ctbEntry.pBuf = taosMemoryMalloc(nData)) == NULL) {
|
||||||
|
tdbTbcClose(pUidIdxc);
|
||||||
|
tdbTbcClose(pTbDbc);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
memcpy(ctbEntry.pBuf, pData, nData);
|
||||||
|
tDecoderInit(&dc1, ctbEntry.pBuf, nData);
|
||||||
|
ret = metaDecodeEntry(&dc1, &ctbEntry);
|
||||||
|
if (ret < 0) {
|
||||||
|
terrno = ret;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* get stbEntry*/
|
||||||
|
if (tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal) != 0) {
|
||||||
|
metaError("meta/table: failed to get uid index, uid:%" PRId64, ctbEntry.ctbEntry.suid);
|
||||||
|
}
|
||||||
|
if (!pVal) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = ((SUidIdxVal *)pVal)[0].version}),
|
||||||
|
sizeof(STbDbKey), (void **)&stbEntry.pBuf, &nVal) != 0) {
|
||||||
|
metaError("meta/table: failed to get tb db, uid:%" PRId64, ctbEntry.ctbEntry.suid);
|
||||||
|
}
|
||||||
|
tdbFree(pVal);
|
||||||
|
tDecoderInit(&dc2, stbEntry.pBuf, nVal);
|
||||||
|
ret = metaDecodeEntry(&dc2, &stbEntry);
|
||||||
|
if (ret < 0) {
|
||||||
|
terrno = ret;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t nTagVals = taosArrayGetSize(pAlterTbReq->pMultiTag);
|
||||||
|
pTagTable = taosHashInit(nTagVals, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
if (pTagTable == NULL) {
|
||||||
|
ret = terrno;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove duplicate tag name
|
||||||
|
for (int i = 0; i < nTagVals; i++) {
|
||||||
|
SMultiTagUpateVal *pTagVal = taosArrayGet(pAlterTbReq->pMultiTag, i);
|
||||||
|
ret = taosHashPut(pTagTable, pTagVal->tagName, strlen(pTagVal->tagName), pTagVal, sizeof(*pTagVal));
|
||||||
|
if (ret != 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int32_t nUpdateTagVal = taosHashGetSize(pTagTable);
|
||||||
|
updateTagColumnIds = taosArrayInit(nUpdateTagVal, sizeof(int32_t));
|
||||||
|
|
||||||
|
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
|
||||||
|
SSchema *pColumn = NULL;
|
||||||
|
int32_t iCol = 0;
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
pColumn = NULL;
|
||||||
|
|
||||||
|
if (iCol >= pTagSchema->nCols) break;
|
||||||
|
pColumn = &pTagSchema->pSchema[iCol];
|
||||||
|
if (taosHashGet(pTagTable, pColumn->name, strlen(pColumn->name)) != NULL) {
|
||||||
|
if (taosArrayPush(updateTagColumnIds, &iCol) == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
iCol++;
|
||||||
|
}
|
||||||
|
if (taosArrayGetSize(updateTagColumnIds) == nUpdateTagVal) {
|
||||||
|
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
ctbEntry.version = version;
|
||||||
|
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
|
||||||
|
goto _err;
|
||||||
|
} else {
|
||||||
|
const STag *pOldTag = (const STag *)ctbEntry.ctbEntry.pTags;
|
||||||
|
STag *pNewTag = NULL;
|
||||||
|
SArray *pTagArray = taosArrayInit(pTagSchema->nCols, sizeof(STagVal));
|
||||||
|
if (!pTagArray) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < pTagSchema->nCols; i++) {
|
||||||
|
SSchema *pCol = &pTagSchema->pSchema[i];
|
||||||
|
SMultiTagUpateVal *pTagVal = taosHashGet(pTagTable, pCol->name, strlen(pCol->name));
|
||||||
|
if (pTagVal == NULL) {
|
||||||
|
STagVal val = {.cid = pCol->colId};
|
||||||
|
if (tTagGet(pOldTag, &val)) {
|
||||||
|
if (taosArrayPush(pTagArray, &val) == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosArrayDestroy(pTagArray);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
STagVal val = {0};
|
||||||
|
val.type = pCol->type;
|
||||||
|
val.cid = pCol->colId;
|
||||||
|
if (pTagVal->isNull) continue;
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
|
val.pData = pTagVal->pTagVal;
|
||||||
|
val.nData = pTagVal->nTagVal;
|
||||||
|
} else {
|
||||||
|
memcpy(&val.i64, pTagVal->pTagVal, pTagVal->nTagVal);
|
||||||
|
}
|
||||||
|
if (taosArrayPush(pTagArray, &val) == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosArrayDestroy(pTagArray);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ((terrno = tTagNew(pTagArray, pTagSchema->version, false, &pNewTag)) < 0) {
|
||||||
|
taosArrayDestroy(pTagArray);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
ctbEntry.ctbEntry.pTags = (uint8_t *)pNewTag;
|
||||||
|
taosArrayDestroy(pTagArray);
|
||||||
|
}
|
||||||
|
|
||||||
|
metaWLock(pMeta);
|
||||||
|
|
||||||
|
// save to table.db
|
||||||
|
if (metaSaveToTbDb(pMeta, &ctbEntry) < 0) {
|
||||||
|
metaError("meta/table: failed to save to tb db:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
// save to uid.idx
|
||||||
|
if (metaUpdateUidIdx(pMeta, &ctbEntry) < 0) {
|
||||||
|
metaError("meta/table: failed to update uid idx:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (metaUpdateTagIdx(pMeta, &ctbEntry) < 0) {
|
||||||
|
metaError("meta/table: failed to update tag idx:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == ctbEntry.ctbEntry.pTags) {
|
||||||
|
metaError("meta/table: null tags, update tag val failed.");
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCtbIdxKey ctbIdxKey = {.suid = ctbEntry.ctbEntry.suid, .uid = uid};
|
||||||
|
if (tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags,
|
||||||
|
((STag *)(ctbEntry.ctbEntry.pTags))->len, pMeta->txn) < 0) {
|
||||||
|
metaError("meta/table: failed to upsert ctb idx:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid) < 0) {
|
||||||
|
metaError("meta/table: failed to clear uid cache:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid) < 0) {
|
||||||
|
metaError("meta/table: failed to clear group cache:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (metaUpdateChangeTime(pMeta, ctbEntry.uid, pAlterTbReq->ctimeMs) < 0) {
|
||||||
|
metaError("meta/table: failed to update change time:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
metaULock(pMeta);
|
||||||
|
|
||||||
|
tDecoderClear(&dc1);
|
||||||
|
tDecoderClear(&dc2);
|
||||||
|
taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
|
||||||
|
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
|
||||||
|
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
|
||||||
|
tdbTbcClose(pTbDbc);
|
||||||
|
tdbTbcClose(pUidIdxc);
|
||||||
|
taosHashCleanup(pTagTable);
|
||||||
|
taosArrayDestroy(updateTagColumnIds);
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tDecoderClear(&dc1);
|
||||||
|
tDecoderClear(&dc2);
|
||||||
|
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
|
||||||
|
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
|
||||||
|
tdbTbcClose(pTbDbc);
|
||||||
|
tdbTbcClose(pUidIdxc);
|
||||||
|
taosHashCleanup(pTagTable);
|
||||||
|
taosArrayDestroy(updateTagColumnIds);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
|
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
|
||||||
SMetaEntry ctbEntry = {0};
|
SMetaEntry ctbEntry = {0};
|
||||||
SMetaEntry stbEntry = {0};
|
SMetaEntry stbEntry = {0};
|
||||||
|
@ -2114,6 +2364,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
||||||
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
|
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
|
||||||
SSchema *pColumn = NULL;
|
SSchema *pColumn = NULL;
|
||||||
int32_t iCol = 0;
|
int32_t iCol = 0;
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
pColumn = NULL;
|
pColumn = NULL;
|
||||||
|
|
||||||
|
|
|
@ -15256,9 +15256,6 @@ static int32_t buildUpdateTagValReqImpl2(STranslateContext* pCxt, SAlterTableStm
|
||||||
static int32_t buildUpdateTagValReqImpl(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
|
static int32_t buildUpdateTagValReqImpl(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
|
||||||
char* colName, SVAlterTbReq* pReq) {
|
char* colName, SVAlterTbReq* pReq) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
// if (NULL == pReq->tagName) {
|
|
||||||
// return terrno;
|
|
||||||
// }
|
|
||||||
|
|
||||||
SSchema* pSchema = getTagSchema(pTableMeta, colName);
|
SSchema* pSchema = getTagSchema(pTableMeta, colName);
|
||||||
if (NULL == pSchema) {
|
if (NULL == pSchema) {
|
||||||
|
|
Loading…
Reference in New Issue