feat: refact alter child table tag
This commit is contained in:
parent
3aebca5404
commit
8c7b8a8afa
|
@ -16,6 +16,7 @@ void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
|
|||
int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
||||
int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
||||
void metaTimeSeriesNotifyCheck(SMeta *pMeta);
|
||||
int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
|
||||
#define metaErr(VGID, ERRNO) \
|
||||
do { \
|
||||
|
@ -457,7 +458,16 @@ static int32_t metaChildIdxInsert(SMeta *pMeta, const SMetaHandleParam *pParam)
|
|||
}
|
||||
|
||||
static int32_t metaChildIdxUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
||||
return metaChildIdxUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
|
||||
const SMetaEntry *pEntry = pParam->pEntry;
|
||||
const SMetaEntry *pOldEntry = pParam->pOldEntry;
|
||||
const SMetaEntry *pSuperEntry = pParam->pSuperEntry;
|
||||
|
||||
const STag *pNewTags = (const STag *)pEntry->ctbEntry.pTags;
|
||||
const STag *pOldTags = (const STag *)pOldEntry->ctbEntry.pTags;
|
||||
if (pNewTags->len != pOldTags->len || memcmp(pNewTags, pOldTags, pNewTags->len)) {
|
||||
return metaChildIdxUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t metaChildIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
||||
|
@ -471,6 +481,50 @@ static int32_t metaChildIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam)
|
|||
}
|
||||
|
||||
// Tag Index
|
||||
static int32_t metaFetchTagIdxKey(SMeta *pMeta, const SMetaEntry *pEntry, const SSchema *pTagColumn,
|
||||
STagIdxKey **ppTagIdxKey, int32_t *pTagIdxKeySize) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
STagIdxKey *pTagIdxKey = NULL;
|
||||
int32_t nTagIdxKey;
|
||||
const void *pTagData = NULL;
|
||||
int32_t nTagData = 0;
|
||||
|
||||
STagVal tagVal = {
|
||||
.cid = pTagColumn->colId,
|
||||
};
|
||||
|
||||
if (tTagGet((const STag *)pEntry->ctbEntry.pTags, &tagVal)) {
|
||||
if (IS_VAR_DATA_TYPE(pTagColumn->type)) {
|
||||
pTagData = tagVal.pData;
|
||||
nTagData = (int32_t)tagVal.nData;
|
||||
} else {
|
||||
pTagData = &(tagVal.i64);
|
||||
nTagData = tDataTypes[pTagColumn->type].bytes;
|
||||
}
|
||||
} else {
|
||||
if (!IS_VAR_DATA_TYPE(pTagColumn->type)) {
|
||||
nTagData = tDataTypes[pTagColumn->type].bytes;
|
||||
}
|
||||
}
|
||||
|
||||
code = metaCreateTagIdxKey(pEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
|
||||
pEntry->uid, &pTagIdxKey, &nTagIdxKey);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
return code;
|
||||
}
|
||||
|
||||
*ppTagIdxKey = pTagIdxKey;
|
||||
*pTagIdxKeySize = nTagIdxKey;
|
||||
return code;
|
||||
}
|
||||
|
||||
static void metaFetchTagIdxKeyFree(STagIdxKey **ppTagIdxKey) {
|
||||
metaDestroyTagIdxKey(*ppTagIdxKey);
|
||||
*ppTagIdxKey = NULL;
|
||||
}
|
||||
|
||||
static int32_t metaTagIdxInsert(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -495,34 +549,13 @@ static int32_t metaTagIdxInsert(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
|||
for (int32_t i = 0; i < pTagSchema->nCols; i++) {
|
||||
STagIdxKey *pTagIdxKey = NULL;
|
||||
int32_t nTagIdxKey;
|
||||
const void *pTagData = NULL;
|
||||
int32_t nTagData = 0;
|
||||
const SSchema *pTagColumn = &pTagSchema->pSchema[i];
|
||||
|
||||
if (!IS_IDX_ON(pTagColumn)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STagVal tagVal = {
|
||||
.cid = pTagColumn->colId,
|
||||
};
|
||||
|
||||
if (tTagGet((const STag *)pEntry->ctbEntry.pTags, &tagVal)) {
|
||||
if (IS_VAR_DATA_TYPE(pTagColumn->type)) {
|
||||
pTagData = tagVal.pData;
|
||||
nTagData = (int32_t)tagVal.nData;
|
||||
} else {
|
||||
pTagData = &(tagVal.i64);
|
||||
nTagData = tDataTypes[pTagColumn->type].bytes;
|
||||
}
|
||||
} else {
|
||||
if (!IS_VAR_DATA_TYPE(pTagColumn->type)) {
|
||||
nTagData = tDataTypes[pTagColumn->type].bytes;
|
||||
}
|
||||
}
|
||||
|
||||
code = metaCreateTagIdxKey(pSuperEntry->uid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, pEntry->uid,
|
||||
&pTagIdxKey, &nTagIdxKey);
|
||||
code = metaFetchTagIdxKey(pMeta, pEntry, pTagColumn, &pTagIdxKey, &nTagIdxKey);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
return code;
|
||||
|
@ -531,19 +564,90 @@ static int32_t metaTagIdxInsert(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
|||
code = tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
metaDestroyTagIdxKey(pTagIdxKey);
|
||||
metaFetchTagIdxKeyFree(&pTagIdxKey);
|
||||
return code;
|
||||
}
|
||||
metaDestroyTagIdxKey(pTagIdxKey);
|
||||
pTagIdxKey = NULL;
|
||||
metaFetchTagIdxKeyFree(&pTagIdxKey);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t metaTagIdxUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
||||
// TODO
|
||||
return 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
const SMetaEntry *pEntry = pParam->pEntry;
|
||||
const SMetaEntry *pOldEntry = pParam->pOldEntry;
|
||||
const SMetaEntry *pSuperEntry = pParam->pSuperEntry;
|
||||
const SSchemaWrapper *pTagSchema = &pSuperEntry->stbEntry.schemaTag;
|
||||
const STag *pNewTags = (const STag *)pEntry->ctbEntry.pTags;
|
||||
const STag *pOldTags = (const STag *)pOldEntry->ctbEntry.pTags;
|
||||
|
||||
if (pNewTags->len == pOldTags->len && !memcmp(pNewTags, pOldTags, pNewTags->len)) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
|
||||
code = metaDelJsonVarFromIdx(pMeta, pOldEntry, &pTagSchema->pSchema[0]);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = metaSaveJsonVarToIdx(pMeta, pEntry, &pTagSchema->pSchema[0]);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < pTagSchema->nCols; i++) {
|
||||
const SSchema *pTagColumn = &pTagSchema->pSchema[i];
|
||||
|
||||
if (!IS_IDX_ON(pTagColumn)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STagIdxKey *pOldTagIdxKey = NULL;
|
||||
int32_t oldTagIdxKeySize = 0;
|
||||
STagIdxKey *pNewTagIdxKey = NULL;
|
||||
int32_t newTagIdxKeySize = 0;
|
||||
|
||||
code = metaFetchTagIdxKey(pMeta, pOldEntry, pTagColumn, &pOldTagIdxKey, &oldTagIdxKeySize);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = metaFetchTagIdxKey(pMeta, pEntry, pTagColumn, &pNewTagIdxKey, &newTagIdxKeySize);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
metaFetchTagIdxKeyFree(&pOldTagIdxKey);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (tagIdxKeyCmpr(pOldTagIdxKey, oldTagIdxKeySize, pNewTagIdxKey, newTagIdxKeySize)) {
|
||||
code = tdbTbDelete(pMeta->pTagIdx, pOldTagIdxKey, oldTagIdxKeySize, pMeta->txn);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
metaFetchTagIdxKeyFree(&pOldTagIdxKey);
|
||||
metaFetchTagIdxKeyFree(&pNewTagIdxKey);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tdbTbInsert(pMeta->pTagIdx, pNewTagIdxKey, newTagIdxKeySize, NULL, 0, pMeta->txn);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
metaFetchTagIdxKeyFree(&pOldTagIdxKey);
|
||||
metaFetchTagIdxKeyFree(&pNewTagIdxKey);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
metaFetchTagIdxKeyFree(&pOldTagIdxKey);
|
||||
metaFetchTagIdxKeyFree(&pNewTagIdxKey);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t metaTagIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
||||
|
@ -565,7 +669,6 @@ static int32_t metaTagIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
|||
} else {
|
||||
for (int32_t i = 0; i < pTagSchema->nCols; i++) {
|
||||
pTagColumn = &pTagSchema->pSchema[i];
|
||||
|
||||
if (!IS_IDX_ON(pTagColumn)) {
|
||||
continue;
|
||||
}
|
||||
|
@ -573,25 +676,7 @@ static int32_t metaTagIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
|||
STagIdxKey *pTagIdxKey = NULL;
|
||||
int32_t nTagIdxKey;
|
||||
|
||||
const void *pTagData = NULL;
|
||||
int32_t nTagData = 0;
|
||||
|
||||
STagVal tagVal = {.cid = pTagColumn->colId};
|
||||
if (tTagGet(pTags, &tagVal)) {
|
||||
if (IS_VAR_DATA_TYPE(pTagColumn->type)) {
|
||||
pTagData = tagVal.pData;
|
||||
nTagData = (int32_t)tagVal.nData;
|
||||
} else {
|
||||
pTagData = &(tagVal.i64);
|
||||
nTagData = tDataTypes[pTagColumn->type].bytes;
|
||||
}
|
||||
} else {
|
||||
if (!IS_VAR_DATA_TYPE(pTagColumn->type)) {
|
||||
nTagData = tDataTypes[pTagColumn->type].bytes;
|
||||
}
|
||||
}
|
||||
code = metaCreateTagIdxKey(pSuper->uid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, pChild->uid,
|
||||
&pTagIdxKey, &nTagIdxKey);
|
||||
code = metaFetchTagIdxKey(pMeta, pChild, pTagColumn, &pTagIdxKey, &nTagIdxKey);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
return code;
|
||||
|
@ -600,11 +685,10 @@ static int32_t metaTagIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
|||
code = tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
metaDestroyTagIdxKey(pTagIdxKey);
|
||||
metaFetchTagIdxKeyFree(&pTagIdxKey);
|
||||
return code;
|
||||
}
|
||||
metaDestroyTagIdxKey(pTagIdxKey);
|
||||
pTagIdxKey = NULL;
|
||||
metaFetchTagIdxKeyFree(&pTagIdxKey);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
|
@ -1269,6 +1353,84 @@ static int32_t metaHandleNormalTableUpdateImpl(SMeta *pMeta, const SMetaHandlePa
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t metaHandleChildTableUpdateImpl(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
const SMetaEntry *pEntry = pParam->pEntry;
|
||||
const SMetaEntry *pOldEntry = pParam->pOldEntry;
|
||||
const SMetaEntry *pSuperEntry = pParam->pSuperEntry;
|
||||
|
||||
SMetaTableOp ops[] = {
|
||||
{META_ENTRY_TABLE, META_TABLE_OP_UPDATA}, //
|
||||
{META_UID_IDX, META_TABLE_OP_UPDATA}, //
|
||||
{META_TAG_IDX, META_TABLE_OP_UPDATA}, //
|
||||
{META_CHILD_IDX, META_TABLE_OP_UPDATA}, //
|
||||
};
|
||||
|
||||
for (int i = 0; i < sizeof(ops) / sizeof(ops[0]); i++) {
|
||||
SMetaTableOp *op = &ops[i];
|
||||
code = metaTableOpFn[op->table][op->op](pMeta, pParam);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
if (metaUidCacheClear(pMeta, pSuperEntry->uid) < 0) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
}
|
||||
|
||||
if (metaTbGroupCacheClear(pMeta, pSuperEntry->uid) < 0) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
}
|
||||
return code;
|
||||
#if 0
|
||||
if (metaUpdateChangeTime(pMeta, ctbEntry.uid, pReq->ctimeMs) < 0) {
|
||||
metaError("meta/table: failed to update change time:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static int32_t metaHandleChildTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SMetaEntry *pOldEntry = NULL;
|
||||
SMetaEntry *pSuperEntry = NULL;
|
||||
|
||||
code = metaFetchEntryByUid(pMeta, pEntry->uid, &pOldEntry);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = metaFetchEntryByUid(pMeta, pEntry->ctbEntry.suid, &pSuperEntry);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
metaFetchEntryFree(&pOldEntry);
|
||||
return code;
|
||||
}
|
||||
|
||||
SMetaHandleParam param = {
|
||||
.pEntry = pEntry,
|
||||
.pOldEntry = pOldEntry,
|
||||
.pSuperEntry = pSuperEntry,
|
||||
};
|
||||
|
||||
metaWLock(pMeta);
|
||||
code = metaHandleChildTableUpdateImpl(pMeta, ¶m);
|
||||
metaULock(pMeta);
|
||||
if (code) {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
metaFetchEntryFree(&pOldEntry);
|
||||
metaFetchEntryFree(&pSuperEntry);
|
||||
return code;
|
||||
}
|
||||
|
||||
metaFetchEntryFree(&pOldEntry);
|
||||
metaFetchEntryFree(&pSuperEntry);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t metaHandleNormalTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SMetaEntry *pOldEntry = NULL;
|
||||
|
@ -1413,7 +1575,7 @@ int32_t metaHandleEntry2(SMeta *pMeta, const SMetaEntry *pEntry) {
|
|||
}
|
||||
case TSDB_CHILD_TABLE: {
|
||||
if (isExist) {
|
||||
// code = metaHandleChildTableUpdate(pMeta, pEntry);
|
||||
code = metaHandleChildTableUpdate(pMeta, pEntry);
|
||||
} else {
|
||||
code = metaHandleChildTableCreate(pMeta, pEntry);
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
|
@ -598,7 +598,7 @@ static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
||||
int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
||||
STagIdxKey *pTagIdxKey1 = (STagIdxKey *)pKey1;
|
||||
STagIdxKey *pTagIdxKey2 = (STagIdxKey *)pKey2;
|
||||
tb_uid_t uid1 = 0, uid2 = 0;
|
||||
|
|
|
@ -21,6 +21,7 @@ int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, ST
|
|||
int32_t metaDropTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
|
||||
int32_t metaAlterTableColumnName(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
|
||||
int32_t metaAlterTableColumnBytes(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
|
||||
int32_t metaUpdateTableTagValue(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq);
|
||||
|
||||
int32_t metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
||||
|
||||
|
@ -2994,7 +2995,7 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMeta
|
|||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
|
||||
return metaAlterTableColumnName(pMeta, version, pReq, pMetaRsp);
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:
|
||||
return metaUpdateTableTagVal(pMeta, version, pReq);
|
||||
return metaUpdateTableTagValue(pMeta, version, pReq);
|
||||
case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL:
|
||||
return metaUpdateTableMultiTagVal(pMeta, version, pReq);
|
||||
return terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
||||
|
|
|
@ -1000,3 +1000,171 @@ int32_t metaAlterTableColumnBytes(SMeta *pMeta, int64_t version, SVAlterTbReq *p
|
|||
metaFetchEntryFree(&pEntry);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t metaCheckUpdateTableTagValReq(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
|
||||
int32_t code = 0;
|
||||
|
||||
// check tag name
|
||||
if (NULL == pReq->tagName || strlen(pReq->tagName) == 0) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since invalid tag name:%s, version:%" PRId64, TD_VID(pMeta->pVnode),
|
||||
__func__, __FILE__, __LINE__, pReq->tagName, version);
|
||||
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||
}
|
||||
|
||||
// check name
|
||||
void *value = NULL;
|
||||
int32_t valueSize = 0;
|
||||
code = tdbTbGet(pMeta->pNameIdx, pReq->tbName, strlen(pReq->tbName) + 1, &value, &valueSize);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since table %s not found, version:%" PRId64, TD_VID(pMeta->pVnode), __func__,
|
||||
__FILE__, __LINE__, pReq->tbName, version);
|
||||
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t metaUpdateTableTagValue(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
// check request
|
||||
code = metaCheckUpdateTableTagValReq(pMeta, version, pReq);
|
||||
if (code) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
// fetch child entry
|
||||
SMetaEntry *pChild = NULL;
|
||||
code = metaFetchEntryByName(pMeta, pReq->tbName, &pChild);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since table %s not found, version:%" PRId64, TD_VID(pMeta->pVnode), __func__,
|
||||
__FILE__, __LINE__, pReq->tbName, version);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (pChild->type != TSDB_CHILD_TABLE) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since table %s is not a child table, version:%" PRId64,
|
||||
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->tbName, version);
|
||||
metaFetchEntryFree(&pChild);
|
||||
TAOS_RETURN(TSDB_CODE_VND_INVALID_TABLE_ACTION);
|
||||
}
|
||||
|
||||
// fetch super entry
|
||||
SMetaEntry *pSuper = NULL;
|
||||
code = metaFetchEntryByUid(pMeta, pChild->ctbEntry.suid, &pSuper);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since super table uid %" PRId64 " not found, version:%" PRId64,
|
||||
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pChild->ctbEntry.suid, version);
|
||||
metaFetchEntryFree(&pChild);
|
||||
TAOS_RETURN(TSDB_CODE_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
// do change tag value
|
||||
SSchemaWrapper *pTagSchema = &pSuper->stbEntry.schemaTag;
|
||||
SSchema *pColumn = NULL;
|
||||
int32_t iColumn = 0;
|
||||
for (int32_t i = 0; i < pTagSchema->nCols; i++) {
|
||||
if (strncmp(pTagSchema->pSchema[i].name, pReq->tagName, TSDB_COL_NAME_LEN) == 0) {
|
||||
pColumn = &pTagSchema->pSchema[i];
|
||||
iColumn = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == pColumn) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since tag %s not found in table %s, version:%" PRId64, TD_VID(pMeta->pVnode),
|
||||
__func__, __FILE__, __LINE__, pReq->tagName, pReq->tbName, version);
|
||||
metaFetchEntryFree(&pChild);
|
||||
metaFetchEntryFree(&pSuper);
|
||||
TAOS_RETURN(TSDB_CODE_VND_COL_NOT_EXISTS);
|
||||
}
|
||||
|
||||
// do change tag value
|
||||
pChild->version = version;
|
||||
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
|
||||
void *pNewTag = taosMemoryRealloc(pChild->ctbEntry.pTags, pReq->nTagVal);
|
||||
if (NULL == pNewTag) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64, TD_VID(pMeta->pVnode), __func__, __FILE__,
|
||||
__LINE__, tstrerror(terrno), version);
|
||||
metaFetchEntryFree(&pChild);
|
||||
metaFetchEntryFree(&pSuper);
|
||||
TAOS_RETURN(terrno);
|
||||
}
|
||||
pChild->ctbEntry.pTags = pNewTag;
|
||||
memcpy(pChild->ctbEntry.pTags, pReq->pTagVal, pReq->nTagVal);
|
||||
} else {
|
||||
STag *pOldTag = (STag *)pChild->ctbEntry.pTags;
|
||||
|
||||
SArray *pTagArray = taosArrayInit(pTagSchema->nCols, sizeof(STagVal));
|
||||
if (NULL == pTagArray) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64, TD_VID(pMeta->pVnode), __func__, __FILE__,
|
||||
__LINE__, tstrerror(terrno), version);
|
||||
metaFetchEntryFree(&pChild);
|
||||
metaFetchEntryFree(&pSuper);
|
||||
TAOS_RETURN(terrno);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pTagSchema->nCols; i++) {
|
||||
STagVal value = {
|
||||
.type = pTagSchema->pSchema[i].type,
|
||||
.cid = pTagSchema->pSchema[i].colId,
|
||||
};
|
||||
|
||||
if (iColumn == i) {
|
||||
if (pReq->isNull) {
|
||||
continue;
|
||||
}
|
||||
if (IS_VAR_DATA_TYPE(value.type)) {
|
||||
value.pData = pReq->pTagVal;
|
||||
value.nData = pReq->nTagVal;
|
||||
} else {
|
||||
memcpy(&value.i64, pReq->pTagVal, pReq->nTagVal);
|
||||
}
|
||||
} else if (!tTagGet(pOldTag, &value)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (NULL == taosArrayPush(pTagArray, &value)) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64, TD_VID(pMeta->pVnode), __func__, __FILE__,
|
||||
__LINE__, tstrerror(terrno), version);
|
||||
taosArrayDestroy(pTagArray);
|
||||
metaFetchEntryFree(&pChild);
|
||||
metaFetchEntryFree(&pSuper);
|
||||
TAOS_RETURN(terrno);
|
||||
}
|
||||
}
|
||||
|
||||
STag *pNewTag = NULL;
|
||||
code = tTagNew(pTagArray, pTagSchema->version, false, &pNewTag);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64, TD_VID(pMeta->pVnode), __func__, __FILE__,
|
||||
__LINE__, tstrerror(code), version);
|
||||
taosArrayDestroy(pTagArray);
|
||||
metaFetchEntryFree(&pChild);
|
||||
metaFetchEntryFree(&pSuper);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
taosArrayDestroy(pTagArray);
|
||||
taosMemoryFree(pChild->ctbEntry.pTags);
|
||||
pChild->ctbEntry.pTags = (uint8_t *)pNewTag;
|
||||
}
|
||||
|
||||
// do handle entry
|
||||
code = metaHandleEntry2(pMeta, pChild);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode),
|
||||
__func__, __FILE__, __LINE__, tstrerror(code), pChild->uid, pReq->tbName, version);
|
||||
metaFetchEntryFree(&pChild);
|
||||
metaFetchEntryFree(&pSuper);
|
||||
TAOS_RETURN(code);
|
||||
} else {
|
||||
metaInfo("vgId:%d, table %s uid %" PRId64 " is updated, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->tbName,
|
||||
pChild->uid, version);
|
||||
}
|
||||
|
||||
// free resource and return
|
||||
metaFetchEntryFree(&pChild);
|
||||
metaFetchEntryFree(&pSuper);
|
||||
TAOS_RETURN(code);
|
||||
}
|
Loading…
Reference in New Issue