more code

This commit is contained in:
Hongze Cheng 2024-12-09 18:28:29 +08:00
parent f549bbdeb2
commit 57f16a6f24
4 changed files with 484 additions and 126 deletions

View File

@ -65,6 +65,23 @@ static FORCE_INLINE int32_t metatInitDefaultSColCmprWrapper(SDecoder *pDecoder,
return 0;
}
static int32_t metaCloneColCmpr(const SColCmprWrapper *pSrc, SColCmprWrapper *pDst) {
pDst->nCols = pSrc->nCols;
pDst->version = pSrc->version;
pDst->pColCmpr = (SColCmpr *)taosMemoryCalloc(pSrc->nCols, sizeof(SColCmpr));
if (NULL == pDst->pColCmpr) {
return terrno;
}
memcpy(pDst->pColCmpr, pSrc->pColCmpr, pSrc->nCols * sizeof(SColCmpr));
return 0;
}
static void metaCloneColCmprFree(SColCmprWrapper *pCmpr) {
if (pCmpr == NULL) {
taosMemoryFreeClear(pCmpr->pColCmpr);
}
}
int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
TAOS_CHECK_RETURN(tStartEncode(pCoder));
TAOS_CHECK_RETURN(tEncodeI64(pCoder, pME->version));
@ -189,3 +206,159 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
tEndDecode(pCoder);
return 0;
}
static int32_t metaCloneSchema(const SSchemaWrapper *pSrc, SSchemaWrapper *pDst) {
if (pSrc == NULL || pDst == NULL) {
return TSDB_CODE_INVALID_PARA;
}
pDst->nCols = pSrc->nCols;
pDst->version = pSrc->version;
pDst->pSchema = (SSchema *)taosMemoryMalloc(pSrc->nCols * sizeof(SSchema));
if (pDst->pSchema == NULL) {
return terrno;
}
memcpy(pDst->pSchema, pSrc->pSchema, pSrc->nCols * sizeof(SSchema));
return TSDB_CODE_SUCCESS;
}
static void metaCloneSchemaFree(SSchemaWrapper *pSchema) {
if (pSchema) {
taosMemoryFreeClear(pSchema->pSchema);
}
}
int32_t metaCloneEntryFree(SMetaEntry **ppEntry) {
if (ppEntry == NULL || *ppEntry == NULL) {
return TSDB_CODE_SUCCESS;
}
taosMemoryFreeClear((*ppEntry)->name);
if ((*ppEntry)->type < 0) {
taosMemoryFreeClear(*ppEntry);
return TSDB_CODE_SUCCESS;
}
if (TSDB_SUPER_TABLE == (*ppEntry)->type) {
metaCloneSchemaFree(&(*ppEntry)->stbEntry.schemaRow);
metaCloneSchemaFree(&(*ppEntry)->stbEntry.schemaTag);
} else if (TSDB_CHILD_TABLE == (*ppEntry)->type) {
taosMemoryFreeClear((*ppEntry)->ctbEntry.comment);
taosMemoryFreeClear((*ppEntry)->ctbEntry.pTags);
} else if (TSDB_CHILD_TABLE == (*ppEntry)->type) {
metaCloneSchemaFree(&(*ppEntry)->ntbEntry.schemaRow);
taosMemoryFreeClear((*ppEntry)->ntbEntry.comment);
} else {
return TSDB_CODE_INVALID_PARA;
}
metaCloneColCmprFree(&(*ppEntry)->colCmpr);
taosMemoryFreeClear(*ppEntry);
return TSDB_CODE_SUCCESS;
}
int32_t metaCloneEntry(const SMetaEntry *pEntry, SMetaEntry **ppEntry) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pEntry || NULL == ppEntry) {
return TSDB_CODE_INVALID_PARA;
}
*ppEntry = (SMetaEntry *)taosMemoryCalloc(1, sizeof(SMetaEntry));
if (NULL == *ppEntry) {
return terrno;
}
(*ppEntry)->version = pEntry->version;
(*ppEntry)->type = pEntry->type;
(*ppEntry)->uid = pEntry->uid;
if (pEntry->type < 0) {
return TSDB_CODE_SUCCESS;
}
if (pEntry->name) {
(*ppEntry)->name = tstrdup(pEntry->name);
if (NULL == (*ppEntry)->name) {
code = terrno;
metaCloneEntryFree(ppEntry);
return code;
}
}
if (pEntry->type == TSDB_SUPER_TABLE) {
(*ppEntry)->flags = pEntry->flags;
code = metaCloneSchema(&pEntry->stbEntry.schemaRow, &(*ppEntry)->stbEntry.schemaRow);
if (code) {
metaCloneEntryFree(ppEntry);
return code;
}
code = metaCloneSchema(&pEntry->stbEntry.schemaTag, &(*ppEntry)->stbEntry.schemaTag);
if (code) {
metaCloneEntryFree(ppEntry);
return code;
}
} else if (pEntry->type == TSDB_CHILD_TABLE) {
(*ppEntry)->ctbEntry.btime = pEntry->ctbEntry.btime;
(*ppEntry)->ctbEntry.ttlDays = pEntry->ctbEntry.ttlDays;
(*ppEntry)->ctbEntry.suid = pEntry->ctbEntry.suid;
// comment
(*ppEntry)->ctbEntry.commentLen = pEntry->ctbEntry.commentLen;
if (pEntry->ctbEntry.commentLen > 0) {
(*ppEntry)->ctbEntry.comment = taosMemoryMalloc(pEntry->ctbEntry.commentLen + 1);
if (NULL == (*ppEntry)->ctbEntry.comment) {
code = terrno;
metaCloneEntryFree(ppEntry);
return code;
}
memcpy((*ppEntry)->ctbEntry.comment, pEntry->ctbEntry.comment, pEntry->ctbEntry.commentLen);
}
// tags
STag *pTags = (STag *)pEntry->ctbEntry.pTags;
(*ppEntry)->ctbEntry.pTags = taosMemoryCalloc(1, pTags->len);
if (NULL == (*ppEntry)->ctbEntry.pTags) {
code = terrno;
metaCloneEntryFree(ppEntry);
return code;
}
memcpy((*ppEntry)->ctbEntry.pTags, pEntry->ctbEntry.pTags, pTags->len);
} else if (pEntry->type == TSDB_NORMAL_TABLE) {
(*ppEntry)->ntbEntry.btime = pEntry->ntbEntry.btime;
(*ppEntry)->ntbEntry.ttlDays = pEntry->ntbEntry.ttlDays;
(*ppEntry)->ntbEntry.ncid = pEntry->ntbEntry.ncid;
// schema
code = metaCloneSchema(&pEntry->ntbEntry.schemaRow, &(*ppEntry)->ntbEntry.schemaRow);
if (code) {
metaCloneEntryFree(ppEntry);
return code;
}
// comment
(*ppEntry)->ntbEntry.commentLen = pEntry->ntbEntry.commentLen;
if (pEntry->ntbEntry.commentLen > 0) {
(*ppEntry)->ntbEntry.comment = taosMemoryMalloc(pEntry->ntbEntry.commentLen + 1);
if (NULL == (*ppEntry)->ntbEntry.comment) {
code = terrno;
metaCloneEntryFree(ppEntry);
return code;
}
memcpy((*ppEntry)->ntbEntry.comment, pEntry->ntbEntry.comment, pEntry->ntbEntry.commentLen);
}
} else {
return TSDB_CODE_INVALID_PARA;
}
code = metaCloneColCmpr(&pEntry->colCmpr, &(*ppEntry)->colCmpr);
if (code) {
metaCloneEntryFree(ppEntry);
return code;
}
return code;
}

View File

@ -10,6 +10,11 @@
#include "meta.h"
int32_t metaCloneEntry(const SMetaEntry *pEntry, SMetaEntry **ppEntry);
int32_t metaCloneEntryFree(SMetaEntry **ppEntry);
void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
#define metaErr(VGID, ERRNO) \
do { \
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64 " type:%d uid:%" PRId64 " name:%s", VGID, \
@ -37,13 +42,98 @@ typedef enum {
META_TABLE_OP_MAX,
} EMetaTableOp;
typedef struct {
const SMetaEntry *pEntry;
const SMetaEntry *pSuperEntry;
const SMetaEntry *pOldEntry;
} SMetaHandleParam;
typedef struct {
EMetaTable table;
EMetaTableOp op;
} SMetaTableOp;
static int32_t metaFetchEntryByUid(SMeta *pMeta, int64_t uid, SMetaEntry **ppEntry) {
int32_t code = TSDB_CODE_SUCCESS;
void *value = NULL;
int32_t valueSize = 0;
// search uid index
code = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &value, &valueSize);
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, failed to get entry by uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), uid, tstrerror(code));
return code;
}
// search entry table
STbDbKey key = {
.version = ((SUidIdxVal *)value)->version,
.uid = uid,
};
tdbFreeClear(value);
code = tdbTbGet(pMeta->pTbDb, &key, sizeof(key), &value, &valueSize);
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, failed to get entry by uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), uid, tstrerror(code));
code = TSDB_CODE_INTERNAL_ERROR;
return code;
}
// decode entry
SDecoder decoder = {0};
SMetaEntry entry = {0};
tDecoderInit(&decoder, value, valueSize);
code = metaDecodeEntry(&decoder, &entry);
if (code) {
metaError("vgId:%d, failed to decode entry by uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), uid,
tstrerror(code));
tDecoderClear(&decoder);
tdbFreeClear(value);
return code;
}
code = metaCloneEntry(&entry, ppEntry);
if (code) {
metaError("vgId:%d, failed to clone entry by uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), uid,
tstrerror(code));
tDecoderClear(&decoder);
tdbFreeClear(value);
return code;
}
tdbFreeClear(value);
tDecoderClear(&decoder);
return code;
}
static int32_t metaFetchEntryByName(SMeta *pMeta, const char *name, SMetaEntry **ppEntry) {
int32_t code = TSDB_CODE_SUCCESS;
void *value = NULL;
int32_t valueSize = 0;
code = tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &value, &valueSize);
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, failed to get entry by name:%s since %s", TD_VID(pMeta->pVnode), name, tstrerror(code));
return code;
}
int64_t uid = *(int64_t *)value;
tdbFreeClear(value);
code = metaFetchEntryByUid(pMeta, uid, ppEntry);
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, failed to get entry by uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), uid, tstrerror(code));
code = TSDB_CODE_INTERNAL_ERROR;
}
return code;
}
static int32_t metaFetchEntryFree(SMetaEntry **ppEntry) { return metaCloneEntryFree(ppEntry); }
// Entry Table
static int32_t metaEntryTableUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMetaTableOp op) {
static int32_t metaEntryTableUpsert(SMeta *pMeta, const SMetaHandleParam *pParam, EMetaTableOp op) {
const SMetaEntry *pEntry = pParam->pEntry;
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pMeta->pVnode);
void *value = NULL;
@ -92,15 +182,15 @@ static int32_t metaEntryTableUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMet
return code;
}
static int32_t metaEntryTableInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
return metaEntryTableUpsert(pMeta, pEntry, META_TABLE_OP_INSERT);
static int32_t metaEntryTableInsert(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaEntryTableUpsert(pMeta, pParam, META_TABLE_OP_INSERT);
}
static int32_t metaEntryTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
return metaEntryTableUpsert(pMeta, pEntry, META_TABLE_OP_UPDATA);
static int32_t metaEntryTableUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaEntryTableUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
}
static int32_t metaEntryTableDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
static int32_t metaEntryTableDelete(SMeta *pMeta, const SMetaHandleParam *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
// SMetaEntry entry { .version = ; };
@ -111,13 +201,14 @@ static int32_t metaEntryTableDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
}
// Schema Table
static int32_t metaSchemaTableUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMetaTableOp op) {
static int32_t metaSchemaTableUpsert(SMeta *pMeta, const SMetaHandleParam *pParam, EMetaTableOp op) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pMeta->pVnode);
SEncoder encoder = {0};
void *value = NULL;
int32_t valueSize = 0;
const SMetaEntry *pEntry = pParam->pEntry;
const SSchemaWrapper *pSchema = NULL;
if (pEntry->type == TSDB_SUPER_TABLE) {
pSchema = &pEntry->stbEntry.schemaRow;
@ -169,15 +260,15 @@ static int32_t metaSchemaTableUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMe
return code;
}
static int32_t metaSchemaTableInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
return metaSchemaTableUpsert(pMeta, pEntry, META_TABLE_OP_INSERT);
static int32_t metaSchemaTableInsert(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_INSERT);
}
static int32_t metaSchemaTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
return metaSchemaTableUpsert(pMeta, pEntry, META_TABLE_OP_UPDATA);
static int32_t metaSchemaTableUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
}
static int32_t metaSchemaTableDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
static int32_t metaSchemaTableDelete(SMeta *pMeta, const SMetaHandleParam *pEntry) {
// TODO
return TSDB_CODE_SUCCESS;
}
@ -198,10 +289,12 @@ static void metaBuildEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
}
}
static int32_t metaUidIdxUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMetaTableOp op) {
static int32_t metaUidIdxUpsert(SMeta *pMeta, const SMetaHandleParam *pParam, EMetaTableOp op) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pMeta->pVnode);
const SMetaEntry *pEntry = pParam->pEntry;
// update cache
SMetaInfo info = {0};
metaBuildEntryInfo(pEntry, &info);
@ -224,17 +317,19 @@ static int32_t metaUidIdxUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMetaTab
return code;
}
static int32_t metaUidIdxInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
return metaUidIdxUpsert(pMeta, pEntry, META_TABLE_OP_INSERT);
static int32_t metaUidIdxInsert(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaUidIdxUpsert(pMeta, pParam, META_TABLE_OP_INSERT);
}
static int32_t metaUidIdxUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
return metaUidIdxUpsert(pMeta, pEntry, META_TABLE_OP_UPDATA);
static int32_t metaUidIdxUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaUidIdxUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
}
static int32_t metaUidIdxDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
static int32_t metaUidIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
int32_t code = 0;
const SMetaEntry *pEntry = pParam->pEntry;
// delete tdb
code = tdbTbDelete(pMeta->pUidIdx, &pEntry->uid, sizeof(pEntry->uid), pMeta->txn);
if (code) {
@ -247,8 +342,11 @@ static int32_t metaUidIdxDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
}
// Name Index
static int32_t metaNameIdxUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMetaTableOp op) {
static int32_t metaNameIdxUpsert(SMeta *pMeta, const SMetaHandleParam *pParam, EMetaTableOp op) {
int32_t code = TSDB_CODE_SUCCESS;
const SMetaEntry *pEntry = pParam->pEntry;
if (META_TABLE_OP_INSERT == op) {
code = tdbTbInsert(pMeta->pNameIdx, pEntry->name, strlen(pEntry->name) + 1, &pEntry->uid, sizeof(pEntry->uid),
pMeta->txn);
@ -261,26 +359,18 @@ static int32_t metaNameIdxUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMetaTa
return code;
}
static int32_t metaNameIdxInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
static int32_t metaNameIdxInsert(SMeta *pMeta, const SMetaHandleParam *pParam) {
int32_t code = TSDB_CODE_SUCCESS;
code = metaNameIdxUpsert(pMeta, pEntry, META_TABLE_OP_INSERT);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
}
return code;
return metaNameIdxUpsert(pMeta, pParam, META_TABLE_OP_INSERT);
}
static int32_t metaNameIdxUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
code = metaNameIdxUpsert(pMeta, pEntry, META_TABLE_OP_UPDATA);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
}
return code;
static int32_t metaNameIdxUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaNameIdxUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
}
static int32_t metaNameIdxDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
static int32_t metaNameIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
int32_t code = TSDB_CODE_SUCCESS;
const SMetaEntry *pEntry = pParam->pEntry;
code = tdbTbDelete(pMeta->pNameIdx, pEntry->name, strlen(pEntry->name) + 1, pMeta->txn);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
@ -289,7 +379,9 @@ static int32_t metaNameIdxDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
}
// Suid Index
static int32_t metaSUidIdxInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
static int32_t metaSUidIdxInsert(SMeta *pMeta, const SMetaHandleParam *pParam) {
const SMetaEntry *pEntry = pParam->pEntry;
int32_t code = tdbTbInsert(pMeta->pSuidIdx, &pEntry->uid, sizeof(pEntry->uid), NULL, 0, pMeta->txn);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
@ -297,7 +389,8 @@ static int32_t metaSUidIdxInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
return code;
}
static int32_t metaSUidIdxDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
static int32_t metaSUidIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
const SMetaEntry *pEntry = pParam->pEntry;
int32_t code = tdbTbDelete(pMeta->pSuidIdx, &pEntry->uid, sizeof(pEntry->uid), pMeta->txn);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
@ -306,9 +399,11 @@ static int32_t metaSUidIdxDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
}
// Child Index
static int32_t metaChildIdxUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMetaTableOp op) {
static int32_t metaChildIdxUpsert(SMeta *pMeta, const SMetaHandleParam *pParam, EMetaTableOp op) {
int32_t code = TSDB_CODE_SUCCESS;
const SMetaEntry *pEntry = pParam->pEntry;
SCtbIdxKey key = {
.suid = pEntry->ctbEntry.suid,
.uid = pEntry->uid,
@ -326,15 +421,17 @@ static int32_t metaChildIdxUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMetaT
return code;
}
static int32_t metaChildIdxInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
return metaChildIdxUpsert(pMeta, pEntry, META_TABLE_OP_INSERT);
static int32_t metaChildIdxInsert(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaChildIdxUpsert(pMeta, pParam, META_TABLE_OP_INSERT);
}
static int32_t metaChildIdxUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
return metaChildIdxUpsert(pMeta, pEntry, META_TABLE_OP_UPDATA);
static int32_t metaChildIdxUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaChildIdxUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
}
static int32_t metaChildIdxDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
static int32_t metaChildIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
const SMetaEntry *pEntry = pParam->pEntry;
SCtbIdxKey key = {
.suid = pEntry->ctbEntry.suid,
.uid = pEntry->uid,
@ -343,23 +440,89 @@ static int32_t metaChildIdxDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
}
// Tag Index
static int32_t metaTagIdxInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
static int32_t metaTagIdxInsert(SMeta *pMeta, const SMetaHandleParam *pParam) {
int32_t code = TSDB_CODE_SUCCESS;
const SMetaEntry *pEntry = pParam->pEntry;
const SMetaEntry *pSuperEntry = pParam->pSuperEntry;
const SSchemaWrapper *pTagSchema = &pSuperEntry->stbEntry.schemaTag;
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
const SSchema *pTagColumn = &pTagSchema->pSchema[0];
STagVal tagVal = {
.cid = pTagColumn->colId,
};
const void *pTagData = pEntry->ctbEntry.pTags;
int32_t nTagData = ((const STag *)pEntry->ctbEntry.pTags)->len;
code = metaSaveJsonVarToIdx(pMeta, pEntry, pTagColumn);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
}
} else {
STagIdxKey *pTagIdxKey = NULL;
int32_t nTagIdxKey;
for (int32_t i = 0; i < pTagSchema->nCols; i++) {
const void *pTagData = NULL;
int32_t nTagData = 0;
const SSchema *pTagColumn = &pTagSchema->pSchema[i];
if (!IS_IDX_ON(pTagColumn)) {
continue;
}
STagVal tagVal = {
.cid = pTagColumn->colId,
};
if (tTagGet((const STag *)pEntry->ctbEntry.pTags, &tagVal)) {
if (IS_VAR_DATA_TYPE(pTagColumn->type)) {
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
} else {
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pTagColumn->type].bytes;
}
} else {
if (!IS_VAR_DATA_TYPE(pTagColumn->type)) {
nTagData = tDataTypes[pTagColumn->type].bytes;
}
}
code = metaCreateTagIdxKey(pSuperEntry->uid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, pEntry->uid,
&pTagIdxKey, &nTagIdxKey);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
return code;
}
code = tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
}
metaDestroyTagIdxKey(pTagIdxKey);
pTagIdxKey = NULL;
}
}
return code;
}
static int32_t metaTagIdxUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
// TODO
return 0;
}
static int32_t metaTagIdxUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
// TODO
return 0;
}
static int32_t metaTagIdxDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
static int32_t metaTagIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
// TODO
return 0;
}
// Btime Index
static int32_t metaBtimeIdxUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMetaTableOp op) {
static int32_t metaBtimeIdxUpsert(SMeta *pMeta, const SMetaHandleParam *pParam, EMetaTableOp op) {
const SMetaEntry *pEntry = pParam->pEntry;
SBtimeIdxKey key = {
.uid = pEntry->uid,
};
@ -381,32 +544,35 @@ static int32_t metaBtimeIdxUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMetaT
}
}
static int32_t metaBtimeIdxInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
return metaBtimeIdxUpsert(pMeta, pEntry, META_TABLE_OP_INSERT);
static int32_t metaBtimeIdxInsert(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaBtimeIdxUpsert(pMeta, pParam, META_TABLE_OP_INSERT);
}
static int32_t metaBtimeIdxUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
return metaBtimeIdxUpsert(pMeta, pEntry, META_TABLE_OP_UPDATA);
static int32_t metaBtimeIdxUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaBtimeIdxUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
}
static int32_t metaBtimeIdxDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
SBtimeIdxKey key = {
.uid = pEntry->uid,
};
static int32_t metaBtimeIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
// int32_t code = TSDB_CODE_SUCCESS;
// SBtimeIdxKey key = {
// .uid = pEntry->uid,
// };
if (TSDB_CHILD_TABLE == pEntry->type) {
key.btime = pEntry->ctbEntry.btime;
} else if (TSDB_NORMAL_TABLE == pEntry->type) {
key.btime = pEntry->ntbEntry.btime;
} else {
return TSDB_CODE_INVALID_PARA;
}
return tdbTbDelete(pMeta->pBtimeIdx, &key, sizeof(key), pMeta->txn);
// if (TSDB_CHILD_TABLE == pEntry->type) {
// key.btime = pEntry->ctbEntry.btime;
// } else if (TSDB_NORMAL_TABLE == pEntry->type) {
// key.btime = pEntry->ntbEntry.btime;
// } else {
// return TSDB_CODE_INVALID_PARA;
// }
// return tdbTbDelete(pMeta->pBtimeIdx, &key, sizeof(key), pMeta->txn);
return TSDB_CODE_SUCCESS;
}
// TTL Index
static int32_t metaTtlIdxUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMetaTableOp op) {
static int32_t metaTtlIdxUpsert(SMeta *pMeta, const SMetaHandleParam *pParam, EMetaTableOp op) {
const SMetaEntry *pEntry = pParam->pEntry;
STtlUpdTtlCtx ctx = {
.uid = pEntry->uid,
.pTxn = pMeta->txn,
@ -426,20 +592,20 @@ static int32_t metaTtlIdxUpsert(SMeta *pMeta, const SMetaEntry *pEntry, EMetaTab
return TSDB_CODE_SUCCESS;
}
static int32_t metaTtlIdxInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
return metaTtlIdxUpsert(pMeta, pEntry, META_TABLE_OP_INSERT);
static int32_t metaTtlIdxInsert(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaTtlIdxUpsert(pMeta, pParam, META_TABLE_OP_INSERT);
}
static int32_t metaTtlIdxUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
return metaTtlIdxUpsert(pMeta, pEntry, META_TABLE_OP_UPDATA);
static int32_t metaTtlIdxUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
return metaTtlIdxUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
}
static int32_t metaTtlIdxDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
static int32_t metaTtlIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
// TODO
return 0;
}
static int32_t (*metaTableOpFn[META_TABLE_MAX][META_TABLE_OP_MAX])(SMeta *pMeta, const SMetaEntry *pEntry) =
static int32_t (*metaTableOpFn[META_TABLE_MAX][META_TABLE_OP_MAX])(SMeta *pMeta, const SMetaHandleParam *pParam) =
{
[META_ENTRY_TABLE] =
{
@ -510,8 +676,11 @@ static int32_t metaHandleSuperTableCreateImpl(SMeta *pMeta, const SMetaEntry *pE
for (int i = 0; i < sizeof(ops) / sizeof(ops[0]); i++) {
SMetaTableOp *op = &ops[i];
const SMetaHandleParam param = {
.pEntry = pEntry,
};
code = metaTableOpFn[op->table][op->op](pMeta, pEntry);
code = metaTableOpFn[op->table][op->op](pMeta, &param);
if (TSDB_CODE_SUCCESS != code) {
metaErr(TD_VID(pMeta->pVnode), code);
return code;
@ -554,7 +723,11 @@ static int32_t metaHandleNormalTableCreateImpl(SMeta *pMeta, const SMetaEntry *p
for (int i = 0; i < sizeof(ops) / sizeof(ops[0]); i++) {
SMetaTableOp *op = &ops[i];
code = metaTableOpFn[op->table][op->op](pMeta, pEntry);
SMetaHandleParam param = {
.pEntry = pEntry,
};
code = metaTableOpFn[op->table][op->op](pMeta, &param);
if (TSDB_CODE_SUCCESS != code) {
metaErr(TD_VID(pMeta->pVnode), code);
return code;
@ -589,7 +762,7 @@ static int32_t metaHandleNormalTableCreate(SMeta *pMeta, const SMetaEntry *pEntr
return code;
}
static int32_t metaHandleChildTableCreateImpl(SMeta *pMeta, const SMetaEntry *pEntry) {
static int32_t metaHandleChildTableCreateImpl(SMeta *pMeta, const SMetaEntry *pEntry, const SMetaEntry *pSuperEntry) {
int32_t code = TSDB_CODE_SUCCESS;
SMetaTableOp ops[] = {
@ -605,53 +778,61 @@ static int32_t metaHandleChildTableCreateImpl(SMeta *pMeta, const SMetaEntry *pE
for (int i = 0; i < sizeof(ops) / sizeof(ops[0]); i++) {
SMetaTableOp *op = &ops[i];
code = metaTableOpFn[op->table][op->op](pMeta, pEntry);
SMetaHandleParam param = {
.pEntry = pEntry,
.pSuperEntry = pSuperEntry,
};
code = metaTableOpFn[op->table][op->op](pMeta, &param);
if (TSDB_CODE_SUCCESS != code) {
metaErr(TD_VID(pMeta->pVnode), code);
return code;
}
}
if (TSDB_CODE_SUCCESS == code) {
metaUpdateStbStats(pMeta, pSuperEntry->uid, 1, 0);
int32_t ret = metaUidCacheClear(pMeta, pSuperEntry->uid);
if (ret < 0) {
metaErr(TD_VID(pMeta->pVnode), ret);
}
ret = metaTbGroupCacheClear(pMeta, pSuperEntry->uid);
if (ret < 0) {
metaErr(TD_VID(pMeta->pVnode), ret);
}
}
return code;
}
static int32_t metaHandleChildTableCreate(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
SMetaEntry *pSuperEntry = NULL;
// get the super table entry
code = metaFetchEntryByUid(pMeta, pEntry->ctbEntry.suid, &pSuperEntry);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
return code;
}
// update TDB
metaWLock(pMeta);
code = metaHandleChildTableCreateImpl(pMeta, pEntry);
if (TSDB_CODE_SUCCESS == code) {
// metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1, 0);
// ret = metaUidCacheClear(pMeta, me.ctbEntry.suid);
// if (ret < 0) {
// metaError("vgId:%d, failed to clear uid cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name,
// pReq->ctb.suid, tstrerror(ret));
// }
// ret = metaTbGroupCacheClear(pMeta, me.ctbEntry.suid);
// if (ret < 0) {
// metaError("vgId:%d, failed to clear group cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode),
// pReq->name,
// pReq->ctb.suid, tstrerror(ret));
// }
}
code = metaHandleChildTableCreateImpl(pMeta, pEntry, pSuperEntry);
metaULock(pMeta);
// update other stuff
if (TSDB_CODE_SUCCESS != code) {
pMeta->pVnode->config.vndStats.numOfCTables++;
// if (!metaTbInFilterCache(pMeta, pReq->ctb.stbName, 1)) {
// int32_t nCols = 0;
// ret = metaGetStbStats(pMeta->pVnode, me.ctbEntry.suid, 0, &nCols);
// if (ret < 0) {
// metaError("vgId:%d, failed to get stb stats:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode),
// pReq->name,
// pReq->ctb.suid, tstrerror(ret));
// }
// pStats->numOfTimeSeries += nCols - 1;
// }
if (!metaTbInFilterCache(pMeta, pSuperEntry->name, 1)) {
int32_t nCols = 0;
int32_t ret = metaGetStbStats(pMeta->pVnode, pSuperEntry->uid, 0, &nCols);
if (ret < 0) {
metaErr(TD_VID(pMeta->pVnode), ret);
}
pMeta->pVnode->config.vndStats.numOfNTimeSeries += (nCols - 1);
}
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
int32_t rc = tsdbCacheNewTable(pMeta->pVnode->pTsdb, pEntry->uid, pEntry->ctbEntry.suid, NULL);
@ -660,10 +841,13 @@ static int32_t metaHandleChildTableCreate(SMeta *pMeta, const SMetaEntry *pEntry
tstrerror(rc));
}
}
pMeta->changed = true;
} else {
metaErr(TD_VID(pMeta->pVnode), code);
}
metaFetchEntryFree(&pSuperEntry);
return code;
}
@ -680,13 +864,13 @@ static int32_t metaHandleNormalTableDropImpl(SMeta *pMeta, const SMetaEntry *pEn
// {META_TTL_IDX, META_TABLE_OP_INSERT}, //
};
for (int32_t i = 0; i < sizeof(ops) / sizeof(ops[0]); i++) {
SMetaTableOp *op = &ops[i];
code = metaTableOpFn[op->table][op->op](pMeta, pEntry);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
}
}
// for (int32_t i = 0; i < sizeof(ops) / sizeof(ops[0]); i++) {
// SMetaTableOp *op = &ops[i];
// code = metaTableOpFn[op->table][op->op](pMeta, pEntry);
// if (code) {
// metaErr(TD_VID(pMeta->pVnode), code);
// }
// }
return code;
}
@ -698,13 +882,13 @@ static int32_t metaHandleNormalTableDrop(SMeta *pMeta, const SMetaEntry *pEntry)
// TODO: get entry
metaWLock(pMeta);
code = metaHandleNormalTableDropImpl(pMeta, &entry);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
} else {
code = metaTableOpFn[META_ENTRY_TABLE][META_TABLE_OP_INSERT](pMeta, pEntry);
}
metaULock(pMeta);
// code = metaHandleNormalTableDropImpl(pMeta, &entry);
// if (code) {
// metaErr(TD_VID(pMeta->pVnode), code);
// } else {
// code = metaTableOpFn[META_ENTRY_TABLE][META_TABLE_OP_INSERT](pMeta, pEntry);
// }
// metaULock(pMeta);
if (TSDB_CODE_SUCCESS == code) {
pMeta->pVnode->config.vndStats.numOfNTables--;

View File

@ -17,7 +17,8 @@
extern SDmNotifyHandle dmNotifyHdl;
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
@ -29,7 +30,7 @@ static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t *pSysTbl);
static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
// opt ins_tables query
static int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
@ -110,7 +111,7 @@ int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STabl
return 0;
}
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema) {
int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema) {
int32_t code = 0;
#ifdef USE_INVERTED_INDEX
@ -3160,7 +3161,7 @@ int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_
return 0;
}
static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
if (pTagIdxKey) taosMemoryFree(pTagIdxKey);
}

View File

@ -465,7 +465,7 @@ int32_t metaDropTable2(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray
int32_t metaCreateTable2(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMetaRsp **ppRsp) {
int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CHILD_TABLE == pReq->type) {
code = metaCreateTable(pMeta, version, pReq, ppRsp);
code = metaCreateChildTable(pMeta, version, pReq, ppRsp);
} else if (TSDB_NORMAL_TABLE == pReq->type) {
code = metaCreateNormalTable(pMeta, version, pReq, ppRsp);
} else {