From 97e08abe8ca6cd09293cdba12a0432da5a23c89d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 25 Nov 2024 10:55:28 +0800 Subject: [PATCH] support update multi tag --- source/dnode/vnode/src/meta/metaTable.c | 253 +++++++++++++++++++++++- source/libs/parser/src/parTranslater.c | 3 - 2 files changed, 252 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 03b8bdd93f..f51b641640 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -2011,7 +2011,257 @@ _err: return terrno != 0 ? terrno : TSDB_CODE_FAILED; } -static int metaUpdateTableMultiTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { return 0; } +static int metaUpdateTableMultiTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { + SMetaEntry ctbEntry = {0}; + SMetaEntry stbEntry = {0}; + void *pVal = NULL; + int nVal = 0; + int ret; + int c; + tb_uid_t uid; + int64_t oversion; + const void *pData = NULL; + int nData = 0; + SHashObj *pTagTable = NULL; + SArray *updateTagColumnIds = NULL; + + // search name index + ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal); + if (ret < 0) { + return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST; + } + + uid = *(tb_uid_t *)pVal; + tdbFree(pVal); + pVal = NULL; + + // search uid index + TBC *pUidIdxc = NULL; + + TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL)); + if (tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c) < 0) { + metaTrace("meta/table: failed to move to uid index, uid:%" PRId64, uid); + } + if (c != 0) { + tdbTbcClose(pUidIdxc); + metaError("meta/table: invalide c: %" PRId32 " update tb tag val failed.", c); + return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST; + } + + if (tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData) != 0) { + metaError("meta/table: failed to get uid index, uid:%" PRId64, uid); + } + oversion = ((SUidIdxVal *)pData)[0].version; + + // search table.db + TBC *pTbDbc = NULL; + SDecoder dc1 = {0}; + SDecoder dc2 = {0}; + + /* get ctbEntry */ + TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL)); + if (tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c) != 0) { + metaError("meta/table: failed to move to tb db, uid:%" PRId64, uid); + } + if (c != 0) { + tdbTbcClose(pUidIdxc); + tdbTbcClose(pTbDbc); + metaError("meta/table: invalide c: %" PRId32 " update tb tag val failed.", c); + return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST; + } + + if (tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData) != 0) { + metaError("meta/table: failed to get tb db, uid:%" PRId64, uid); + } + + if ((ctbEntry.pBuf = taosMemoryMalloc(nData)) == NULL) { + tdbTbcClose(pUidIdxc); + tdbTbcClose(pTbDbc); + return terrno; + } + memcpy(ctbEntry.pBuf, pData, nData); + tDecoderInit(&dc1, ctbEntry.pBuf, nData); + ret = metaDecodeEntry(&dc1, &ctbEntry); + if (ret < 0) { + terrno = ret; + goto _err; + } + + /* get stbEntry*/ + if (tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal) != 0) { + metaError("meta/table: failed to get uid index, uid:%" PRId64, ctbEntry.ctbEntry.suid); + } + if (!pVal) { + terrno = TSDB_CODE_INVALID_MSG; + goto _err; + } + + if (tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = ((SUidIdxVal *)pVal)[0].version}), + sizeof(STbDbKey), (void **)&stbEntry.pBuf, &nVal) != 0) { + metaError("meta/table: failed to get tb db, uid:%" PRId64, ctbEntry.ctbEntry.suid); + } + tdbFree(pVal); + tDecoderInit(&dc2, stbEntry.pBuf, nVal); + ret = metaDecodeEntry(&dc2, &stbEntry); + if (ret < 0) { + terrno = ret; + goto _err; + } + + int32_t nTagVals = taosArrayGetSize(pAlterTbReq->pMultiTag); + pTagTable = taosHashInit(nTagVals, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (pTagTable == NULL) { + ret = terrno; + goto _err; + } + + // remove duplicate tag name + for (int i = 0; i < nTagVals; i++) { + SMultiTagUpateVal *pTagVal = taosArrayGet(pAlterTbReq->pMultiTag, i); + ret = taosHashPut(pTagTable, pTagVal->tagName, strlen(pTagVal->tagName), pTagVal, sizeof(*pTagVal)); + if (ret != 0) { + goto _err; + } + } + int32_t nUpdateTagVal = taosHashGetSize(pTagTable); + updateTagColumnIds = taosArrayInit(nUpdateTagVal, sizeof(int32_t)); + + SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag; + SSchema *pColumn = NULL; + int32_t iCol = 0; + + for (;;) { + pColumn = NULL; + + if (iCol >= pTagSchema->nCols) break; + pColumn = &pTagSchema->pSchema[iCol]; + if (taosHashGet(pTagTable, pColumn->name, strlen(pColumn->name)) != NULL) { + if (taosArrayPush(updateTagColumnIds, &iCol) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + iCol++; + } + if (taosArrayGetSize(updateTagColumnIds) == nUpdateTagVal) { + terrno = TSDB_CODE_VND_COL_NOT_EXISTS; + goto _err; + } + + ctbEntry.version = version; + if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) { + terrno = TSDB_CODE_VND_COL_NOT_EXISTS; + goto _err; + } else { + const STag *pOldTag = (const STag *)ctbEntry.ctbEntry.pTags; + STag *pNewTag = NULL; + SArray *pTagArray = taosArrayInit(pTagSchema->nCols, sizeof(STagVal)); + if (!pTagArray) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + for (int32_t i = 0; i < pTagSchema->nCols; i++) { + SSchema *pCol = &pTagSchema->pSchema[i]; + SMultiTagUpateVal *pTagVal = taosHashGet(pTagTable, pCol->name, strlen(pCol->name)); + if (pTagVal == NULL) { + STagVal val = {.cid = pCol->colId}; + if (tTagGet(pOldTag, &val)) { + if (taosArrayPush(pTagArray, &val) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(pTagArray); + goto _err; + } + } + } else { + STagVal val = {0}; + val.type = pCol->type; + val.cid = pCol->colId; + if (pTagVal->isNull) continue; + + if (IS_VAR_DATA_TYPE(pCol->type)) { + val.pData = pTagVal->pTagVal; + val.nData = pTagVal->nTagVal; + } else { + memcpy(&val.i64, pTagVal->pTagVal, pTagVal->nTagVal); + } + if (taosArrayPush(pTagArray, &val) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(pTagArray); + goto _err; + } + } + } + if ((terrno = tTagNew(pTagArray, pTagSchema->version, false, &pNewTag)) < 0) { + taosArrayDestroy(pTagArray); + goto _err; + } + ctbEntry.ctbEntry.pTags = (uint8_t *)pNewTag; + taosArrayDestroy(pTagArray); + } + + metaWLock(pMeta); + + // save to table.db + if (metaSaveToTbDb(pMeta, &ctbEntry) < 0) { + metaError("meta/table: failed to save to tb db:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid); + } + + // save to uid.idx + if (metaUpdateUidIdx(pMeta, &ctbEntry) < 0) { + metaError("meta/table: failed to update uid idx:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid); + } + + if (metaUpdateTagIdx(pMeta, &ctbEntry) < 0) { + metaError("meta/table: failed to update tag idx:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid); + } + + if (NULL == ctbEntry.ctbEntry.pTags) { + metaError("meta/table: null tags, update tag val failed."); + goto _err; + } + + SCtbIdxKey ctbIdxKey = {.suid = ctbEntry.ctbEntry.suid, .uid = uid}; + if (tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags, + ((STag *)(ctbEntry.ctbEntry.pTags))->len, pMeta->txn) < 0) { + metaError("meta/table: failed to upsert ctb idx:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid); + } + + if (metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid) < 0) { + metaError("meta/table: failed to clear uid cache:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid); + } + + if (metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid) < 0) { + metaError("meta/table: failed to clear group cache:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid); + } + + if (metaUpdateChangeTime(pMeta, ctbEntry.uid, pAlterTbReq->ctimeMs) < 0) { + metaError("meta/table: failed to update change time:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid); + } + + metaULock(pMeta); + + tDecoderClear(&dc1); + tDecoderClear(&dc2); + taosMemoryFree((void *)ctbEntry.ctbEntry.pTags); + if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf); + if (stbEntry.pBuf) tdbFree(stbEntry.pBuf); + tdbTbcClose(pTbDbc); + tdbTbcClose(pUidIdxc); + taosHashCleanup(pTagTable); + taosArrayDestroy(updateTagColumnIds); + return 0; + +_err: + tDecoderClear(&dc1); + tDecoderClear(&dc2); + if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf); + if (stbEntry.pBuf) tdbFree(stbEntry.pBuf); + tdbTbcClose(pTbDbc); + tdbTbcClose(pUidIdxc); + taosHashCleanup(pTagTable); + taosArrayDestroy(updateTagColumnIds); + return -1; +} static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { SMetaEntry ctbEntry = {0}; SMetaEntry stbEntry = {0}; @@ -2114,6 +2364,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag; SSchema *pColumn = NULL; int32_t iCol = 0; + for (;;) { pColumn = NULL; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3d0c3d7c00..3543ee1a53 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -15256,9 +15256,6 @@ static int32_t buildUpdateTagValReqImpl2(STranslateContext* pCxt, SAlterTableStm static int32_t buildUpdateTagValReqImpl(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta, char* colName, SVAlterTbReq* pReq) { int32_t code = TSDB_CODE_SUCCESS; - // if (NULL == pReq->tagName) { - // return terrno; - // } SSchema* pSchema = getTagSchema(pTableMeta, colName); if (NULL == pSchema) {