feat: replace add index
This commit is contained in:
parent
bdaa3228a2
commit
582a40874f
|
@ -180,8 +180,8 @@ int metaAlterCache(SMeta* pMeta, int32_t nPage);
|
||||||
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid);
|
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid);
|
||||||
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid);
|
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid);
|
||||||
|
|
||||||
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
int32_t metaAddIndexToSuperTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||||
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
||||||
|
|
||||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta, int type);
|
int64_t metaGetTimeSeriesNum(SMeta* pMeta, int type);
|
||||||
void metaUpdTimeSeriesNum(SMeta* pMeta);
|
void metaUpdTimeSeriesNum(SMeta* pMeta);
|
||||||
|
|
|
@ -18,6 +18,11 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const S
|
||||||
void metaTimeSeriesNotifyCheck(SMeta *pMeta);
|
void metaTimeSeriesNotifyCheck(SMeta *pMeta);
|
||||||
int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||||
|
|
||||||
|
static int32_t metaGetChildUidsOfSuperTable(SMeta *pMeta, tb_uid_t suid, SArray **childList);
|
||||||
|
static int32_t metaFetchTagIdxKey(SMeta *pMeta, const SMetaEntry *pEntry, const SSchema *pTagColumn,
|
||||||
|
STagIdxKey **ppTagIdxKey, int32_t *pTagIdxKeySize);
|
||||||
|
static void metaFetchTagIdxKeyFree(STagIdxKey **ppTagIdxKey);
|
||||||
|
|
||||||
#define metaErr(VGID, ERRNO) \
|
#define metaErr(VGID, ERRNO) \
|
||||||
do { \
|
do { \
|
||||||
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64 " type:%d uid:%" PRId64 " name:%s", VGID, \
|
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64 " type:%d uid:%" PRId64 " name:%s", VGID, \
|
||||||
|
@ -263,6 +268,168 @@ static int32_t metaSchemaTableInsert(SMeta *pMeta, const SMetaHandleParam *pPara
|
||||||
return metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_INSERT);
|
return metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_INSERT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t metaAddOrDropTagIndexOfSuperTable(SMeta *pMeta, const SMetaHandleParam *pParam,
|
||||||
|
const SSchema *pOldColumn, const SSchema *pNewColumn) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
const SMetaEntry *pEntry = pParam->pEntry;
|
||||||
|
const SMetaEntry *pOldEntry = pParam->pOldEntry;
|
||||||
|
enum { ADD_INDEX, DROP_INDEX } action;
|
||||||
|
|
||||||
|
if (pOldColumn && pNewColumn) {
|
||||||
|
if (IS_IDX_ON(pOldColumn) && IS_IDX_ON(pNewColumn)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else if (IS_IDX_ON(pOldColumn) && !IS_IDX_ON(pNewColumn)) {
|
||||||
|
action = DROP_INDEX;
|
||||||
|
} else if (!IS_IDX_ON(pOldColumn) && IS_IDX_ON(pNewColumn)) {
|
||||||
|
action = ADD_INDEX;
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
} else if (pOldColumn) {
|
||||||
|
if (IS_IDX_ON(pOldColumn)) {
|
||||||
|
action = DROP_INDEX;
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (IS_IDX_ON(pNewColumn)) {
|
||||||
|
action = ADD_INDEX;
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch all child tables
|
||||||
|
SArray *childTables = 0;
|
||||||
|
code = metaGetChildUidsOfSuperTable(pMeta, pEntry->uid, &childTables);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// do drop or add index
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(childTables); i++) {
|
||||||
|
int64_t uid = *(int64_t *)taosArrayGet(childTables, i);
|
||||||
|
|
||||||
|
// fetch child entry
|
||||||
|
SMetaEntry *pChildEntry = NULL;
|
||||||
|
code = metaFetchEntryByUid(pMeta, uid, &pChildEntry);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
taosArrayDestroy(childTables);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
STagIdxKey *pTagIdxKey = NULL;
|
||||||
|
int32_t tagIdxKeySize = 0;
|
||||||
|
|
||||||
|
if (action == ADD_INDEX) {
|
||||||
|
code = metaFetchTagIdxKey(pMeta, pChildEntry, pNewColumn, &pTagIdxKey, &tagIdxKeySize);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
taosArrayDestroy(childTables);
|
||||||
|
metaFetchEntryFree(&pChildEntry);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, tagIdxKeySize, &pChildEntry->uid, sizeof(pChildEntry->uid),
|
||||||
|
pMeta->txn);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
taosArrayDestroy(childTables);
|
||||||
|
metaFetchEntryFree(&pChildEntry);
|
||||||
|
metaFetchTagIdxKeyFree(&pTagIdxKey);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
code = metaFetchTagIdxKey(pMeta, pChildEntry, pOldColumn, &pTagIdxKey, &tagIdxKeySize);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
taosArrayDestroy(childTables);
|
||||||
|
metaFetchEntryFree(&pChildEntry);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, tagIdxKeySize, pMeta->txn);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
taosArrayDestroy(childTables);
|
||||||
|
metaFetchEntryFree(&pChildEntry);
|
||||||
|
metaFetchTagIdxKeyFree(&pTagIdxKey);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
metaFetchTagIdxKeyFree(&pTagIdxKey);
|
||||||
|
metaFetchEntryFree(&pChildEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(childTables);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t metaUpdateSuperTableTagSchema(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
const SMetaEntry *pEntry = pParam->pEntry;
|
||||||
|
const SMetaEntry *pOldEntry = pParam->pOldEntry;
|
||||||
|
const SSchemaWrapper *pNewTagSchema = &pEntry->stbEntry.schemaTag;
|
||||||
|
const SSchemaWrapper *pOldTagSchema = &pOldEntry->stbEntry.schemaTag;
|
||||||
|
|
||||||
|
int32_t iOld = 0, iNew = 0;
|
||||||
|
for (; iOld < pOldTagSchema->nCols && iNew < pNewTagSchema->nCols;) {
|
||||||
|
SSchema *pOldColumn = pOldTagSchema->pSchema + iOld;
|
||||||
|
SSchema *pNewColumn = pNewTagSchema->pSchema + iNew;
|
||||||
|
|
||||||
|
if (pOldColumn->colId == pNewColumn->colId) {
|
||||||
|
code = metaAddOrDropTagIndexOfSuperTable(pMeta, pParam, pOldColumn, pNewColumn);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
iOld++;
|
||||||
|
iNew++;
|
||||||
|
} else if (pOldColumn->colId < pNewColumn->colId) {
|
||||||
|
code = metaAddOrDropTagIndexOfSuperTable(pMeta, pParam, pOldColumn, NULL);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
iOld++;
|
||||||
|
} else {
|
||||||
|
code = metaAddOrDropTagIndexOfSuperTable(pMeta, pParam, NULL, pNewColumn);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
iNew++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (; iOld < pOldTagSchema->nCols; iOld++) {
|
||||||
|
SSchema *pOldColumn = pOldTagSchema->pSchema + iOld;
|
||||||
|
code = metaAddOrDropTagIndexOfSuperTable(pMeta, pParam, pOldColumn, NULL);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (; iNew < pNewTagSchema->nCols; iNew++) {
|
||||||
|
SSchema *pNewColumn = pNewTagSchema->pSchema + iNew;
|
||||||
|
code = metaAddOrDropTagIndexOfSuperTable(pMeta, pParam, NULL, pNewColumn);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t metaSchemaTableUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
static int32_t metaSchemaTableUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -273,29 +440,30 @@ static int32_t metaSchemaTableUpdate(SMeta *pMeta, const SMetaHandleParam *pPara
|
||||||
return metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
|
return metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pEntry->type == TSDB_NORMAL_TABLE &&
|
if (pEntry->type == TSDB_NORMAL_TABLE) {
|
||||||
pOldEntry->ntbEntry.schemaRow.version != pEntry->ntbEntry.schemaRow.version) {
|
// check row schema
|
||||||
code = metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
|
if (pOldEntry->ntbEntry.schemaRow.version != pEntry->ntbEntry.schemaRow.version) {
|
||||||
if (code) {
|
return metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
|
||||||
metaErr(TD_VID(pMeta->pVnode), code);
|
}
|
||||||
return code;
|
} else if (pEntry->type == TSDB_SUPER_TABLE) {
|
||||||
|
// check row schema
|
||||||
|
if (pOldEntry->stbEntry.schemaRow.version != pEntry->stbEntry.schemaRow.version) {
|
||||||
|
return metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOldEntry->ntbEntry.schemaRow.nCols != pEntry->ntbEntry.schemaRow.nCols) {
|
// check tag schema
|
||||||
pMeta->pVnode->config.vndStats.numOfNTimeSeries +=
|
if (pOldEntry->stbEntry.schemaTag.version != pEntry->stbEntry.schemaTag.version) {
|
||||||
(pEntry->ntbEntry.schemaRow.nCols - pOldEntry->ntbEntry.schemaRow.nCols);
|
code = metaUpdateSuperTableTagSchema(pMeta, pParam);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pEntry->type == TSDB_SUPER_TABLE && pOldEntry->stbEntry.schemaRow.version != pEntry->stbEntry.schemaRow.version) {
|
return TSDB_CODE_SUCCESS;
|
||||||
return metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pParam->pEntry->type == TSDB_CHILD_TABLE) {
|
|
||||||
return TSDB_CODE_INVALID_MSG;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t metaSchemaTableDelete(SMeta *pMeta, const SMetaHandleParam *pEntry) {
|
static int32_t metaSchemaTableDelete(SMeta *pMeta, const SMetaHandleParam *pEntry) {
|
||||||
|
@ -1413,6 +1581,58 @@ static int32_t metaHandleChildTableUpdateImpl(SMeta *pMeta, const SMetaHandlePar
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t metaHandleSuperTableUpdateImpl(SMeta *pMeta, SMetaHandleParam *pParam) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
const SMetaEntry *pEntry = pParam->pEntry;
|
||||||
|
const SMetaEntry *pOldEntry = pParam->pOldEntry;
|
||||||
|
|
||||||
|
SMetaTableOp ops[] = {
|
||||||
|
{META_ENTRY_TABLE, META_TABLE_OP_UPDATA}, //
|
||||||
|
{META_UID_IDX, META_TABLE_OP_UPDATA}, //
|
||||||
|
{META_SCHEMA_TABLE, META_TABLE_OP_UPDATA}, //
|
||||||
|
};
|
||||||
|
|
||||||
|
for (int i = 0; i < sizeof(ops) / sizeof(ops[0]); i++) {
|
||||||
|
SMetaTableOp *op = &ops[i];
|
||||||
|
code = metaTableOpFn[op->table][op->op](pMeta, pParam);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t metaHandleSuperTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
SMetaEntry *pOldEntry = NULL;
|
||||||
|
|
||||||
|
code = metaFetchEntryByUid(pMeta, pEntry->uid, &pOldEntry);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaHandleParam param = {
|
||||||
|
.pEntry = pEntry,
|
||||||
|
.pOldEntry = pOldEntry,
|
||||||
|
};
|
||||||
|
metaWLock(pMeta);
|
||||||
|
code = metaHandleSuperTableUpdateImpl(pMeta, ¶m);
|
||||||
|
metaULock(pMeta);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
metaFetchEntryFree(&pOldEntry);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
metaFetchEntryFree(&pOldEntry);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t metaHandleChildTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
|
static int32_t metaHandleChildTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -1589,7 +1809,7 @@ int32_t metaHandleEntry2(SMeta *pMeta, const SMetaEntry *pEntry) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case TSDB_SUPER_TABLE: {
|
case TSDB_SUPER_TABLE: {
|
||||||
if (isExist) {
|
if (isExist) {
|
||||||
// code = metaHandleSuperTableUpdate(pMeta, pEntry);
|
code = metaHandleSuperTableUpdate(pMeta, pEntry);
|
||||||
} else {
|
} else {
|
||||||
code = metaHandleSuperTableCreate(pMeta, pEntry);
|
code = metaHandleSuperTableCreate(pMeta, pEntry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1541,3 +1541,315 @@ int32_t metaUpdateTableColCompress2(SMeta *pMeta, int64_t version, SVAlterTbReq
|
||||||
metaFetchEntryFree(&pEntry);
|
metaFetchEntryFree(&pEntry);
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t metaAddIndexToSuperTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
if (NULL == pReq->name || strlen(pReq->name) == 0) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since invalid table name, version:%" PRId64, TD_VID(pMeta->pVnode), __func__,
|
||||||
|
__FILE__, __LINE__, version);
|
||||||
|
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaEntry *pEntry = NULL;
|
||||||
|
code = metaFetchEntryByName(pMeta, pReq->name, &pEntry);
|
||||||
|
if (code) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since table %s not found, version:%" PRId64, TD_VID(pMeta->pVnode), __func__,
|
||||||
|
__FILE__, __LINE__, pReq->name, version);
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEntry->type != TSDB_SUPER_TABLE) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since table %s type %d is invalid, version:%" PRId64, TD_VID(pMeta->pVnode),
|
||||||
|
__func__, __FILE__, __LINE__, pReq->name, pEntry->type, version);
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
TAOS_RETURN(TSDB_CODE_VND_INVALID_TABLE_ACTION);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEntry->uid != pReq->suid) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since table %s uid %" PRId64 " is not equal to %" PRId64
|
||||||
|
", version:%" PRId64,
|
||||||
|
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->name, pEntry->uid, pReq->suid, version);
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEntry->stbEntry.schemaTag.version >= pReq->schemaTag.version) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since table %s tag schema version %" PRId64 " is not less than %" PRId64
|
||||||
|
", version:%" PRId64,
|
||||||
|
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->name, pEntry->stbEntry.schemaTag.version,
|
||||||
|
pReq->schemaTag.version, version);
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||||
|
}
|
||||||
|
|
||||||
|
// do change the entry
|
||||||
|
SSchemaWrapper *pOldTagSchema = &pEntry->stbEntry.schemaTag;
|
||||||
|
SSchemaWrapper *pNewTagSchema = &pReq->schemaTag;
|
||||||
|
if (pOldTagSchema->nCols == 1 && pOldTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since table %s has no tag, version:%" PRId64, TD_VID(pMeta->pVnode),
|
||||||
|
__func__, __FILE__, __LINE__, pReq->name, version);
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
TAOS_RETURN(TSDB_CODE_VND_COL_NOT_EXISTS);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pOldTagSchema->nCols != pNewTagSchema->nCols) {
|
||||||
|
metaError(
|
||||||
|
"vgId:%d, %s failed at %s:%d since table %s tag schema column count %d is not equal to %d, version:%" PRId64,
|
||||||
|
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->name, pOldTagSchema->nCols, pNewTagSchema->nCols,
|
||||||
|
version);
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pOldTagSchema->version >= pNewTagSchema->version) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since table %s tag schema version %" PRId64 " is not less than %" PRId64
|
||||||
|
", version:%" PRId64,
|
||||||
|
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->name, pOldTagSchema->version,
|
||||||
|
pNewTagSchema->version, version);
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfChangedTags = 0;
|
||||||
|
for (int32_t i = 0; i < pOldTagSchema->nCols; i++) {
|
||||||
|
SSchema *pOldColumn = pOldTagSchema->pSchema + i;
|
||||||
|
SSchema *pNewColumn = pNewTagSchema->pSchema + i;
|
||||||
|
|
||||||
|
if (pOldColumn->type != pNewColumn->type || pOldColumn->colId != pNewColumn->colId ||
|
||||||
|
strncmp(pOldColumn->name, pNewColumn->name, sizeof(pNewColumn->name))) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since table %s tag schema column %d is not equal, version:%" PRId64,
|
||||||
|
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->name, i, version);
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IS_IDX_ON(pNewColumn) && !IS_IDX_ON(pOldColumn)) {
|
||||||
|
numOfChangedTags++;
|
||||||
|
SSCHMEA_SET_IDX_ON(pOldColumn);
|
||||||
|
} else if (!IS_IDX_ON(pNewColumn) && IS_IDX_ON(pOldColumn)) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since table %s tag schema column %d is not equal, version:%" PRId64,
|
||||||
|
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->name, i, version);
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numOfChangedTags != 1) {
|
||||||
|
metaError(
|
||||||
|
"vgId:%d, %s failed at %s:%d since table %s tag schema column count %d is not equal to 1, version:%" PRId64,
|
||||||
|
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->name, numOfChangedTags, version);
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||||
|
}
|
||||||
|
|
||||||
|
pEntry->version = version;
|
||||||
|
pEntry->stbEntry.schemaTag.version = pNewTagSchema->version;
|
||||||
|
|
||||||
|
// do handle the entry
|
||||||
|
code = metaHandleEntry2(pMeta, pEntry);
|
||||||
|
if (code) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode),
|
||||||
|
__func__, __FILE__, __LINE__, tstrerror(code), pEntry->uid, pReq->name, version);
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||||
|
} else {
|
||||||
|
metaInfo("vgId:%d, table %s uid %" PRId64 " is updated, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->name,
|
||||||
|
pEntry->uid, version);
|
||||||
|
}
|
||||||
|
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t metaDropIndexFromSuperTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
// TODO
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
#if 0
|
||||||
|
int32_t code = 0;
|
||||||
|
SMetaEntry oStbEntry = {0};
|
||||||
|
SMetaEntry nStbEntry = {0};
|
||||||
|
|
||||||
|
STbDbKey tbDbKey = {0};
|
||||||
|
TBC *pUidIdxc = NULL;
|
||||||
|
TBC *pTbDbc = NULL;
|
||||||
|
int ret = 0;
|
||||||
|
int c = -2;
|
||||||
|
void *pData = NULL;
|
||||||
|
int nData = 0;
|
||||||
|
int64_t oversion;
|
||||||
|
SDecoder dc = {0};
|
||||||
|
|
||||||
|
tb_uid_t suid = pReq->stbUid;
|
||||||
|
|
||||||
|
if ((code = tdbTbGet(pMeta->pUidIdx, &suid, sizeof(tb_uid_t), &pData, &nData)) != 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
tbDbKey.uid = suid;
|
||||||
|
tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
|
||||||
|
if ((code = tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData)) != 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
tDecoderInit(&dc, pData, nData);
|
||||||
|
code = metaDecodeEntry(&dc, &oStbEntry);
|
||||||
|
if (code != 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchema *pCol = NULL;
|
||||||
|
int32_t colId = -1;
|
||||||
|
for (int i = 0; i < oStbEntry.stbEntry.schemaTag.nCols; i++) {
|
||||||
|
SSchema *schema = oStbEntry.stbEntry.schemaTag.pSchema + i;
|
||||||
|
if (0 == strncmp(schema->name, pReq->colName, sizeof(pReq->colName))) {
|
||||||
|
if (IS_IDX_ON(schema)) {
|
||||||
|
pCol = schema;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCol == NULL) {
|
||||||
|
metaError("vgId:%d, failed to drop index on %s.%s,since %s", TD_VID(pMeta->pVnode), pReq->stb, pReq->colName,
|
||||||
|
tstrerror(TSDB_CODE_VND_COL_NOT_EXISTS));
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* iterator all pTdDbc by uid and version
|
||||||
|
*/
|
||||||
|
TBC *pCtbIdxc = NULL;
|
||||||
|
code = tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL);
|
||||||
|
if (code != 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
|
||||||
|
if (code < 0) {
|
||||||
|
tdbTbcClose(pCtbIdxc);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
for (;;) {
|
||||||
|
void *pKey = NULL, *pVal = NULL;
|
||||||
|
int nKey = 0, nVal = 0;
|
||||||
|
|
||||||
|
code = tdbTbcNext(pCtbIdxc, &pKey, &nKey, &pVal, &nVal);
|
||||||
|
if (code < 0) {
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
|
tdbTbcClose(pCtbIdxc);
|
||||||
|
pCtbIdxc = NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (((SCtbIdxKey *)pKey)->suid != suid) {
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
STagIdxKey *pTagIdxKey = NULL;
|
||||||
|
int32_t nTagIdxKey;
|
||||||
|
|
||||||
|
const void *pTagData = NULL;
|
||||||
|
int32_t nTagData = 0;
|
||||||
|
|
||||||
|
SCtbIdxKey *table = (SCtbIdxKey *)pKey;
|
||||||
|
STagVal tagVal = {.cid = pCol->colId};
|
||||||
|
if (tTagGet((const STag *)pVal, &tagVal)) {
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
|
pTagData = tagVal.pData;
|
||||||
|
nTagData = (int32_t)tagVal.nData;
|
||||||
|
} else {
|
||||||
|
pTagData = &(tagVal.i64);
|
||||||
|
nTagData = tDataTypes[pCol->type].bytes;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (!IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
|
nTagData = tDataTypes[pCol->type].bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, table->uid, &pTagIdxKey, &nTagIdxKey);
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
|
if (code < 0) {
|
||||||
|
metaDestroyTagIdxKey(pTagIdxKey);
|
||||||
|
tdbTbcClose(pCtbIdxc);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
metaWLock(pMeta);
|
||||||
|
ret = tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn);
|
||||||
|
if (ret < 0) {
|
||||||
|
metaError("vgId:%d, failed to delete tag idx key:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->stb,
|
||||||
|
pReq->stbUid, tstrerror(ret));
|
||||||
|
}
|
||||||
|
metaULock(pMeta);
|
||||||
|
metaDestroyTagIdxKey(pTagIdxKey);
|
||||||
|
pTagIdxKey = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// clear idx flag
|
||||||
|
SSCHMEA_SET_IDX_OFF(pCol);
|
||||||
|
|
||||||
|
nStbEntry.version = version;
|
||||||
|
nStbEntry.type = TSDB_SUPER_TABLE;
|
||||||
|
nStbEntry.uid = oStbEntry.uid;
|
||||||
|
nStbEntry.name = oStbEntry.name;
|
||||||
|
|
||||||
|
SSchemaWrapper *row = tCloneSSchemaWrapper(&oStbEntry.stbEntry.schemaRow);
|
||||||
|
SSchemaWrapper *tag = tCloneSSchemaWrapper(&oStbEntry.stbEntry.schemaTag);
|
||||||
|
SColCmprWrapper *cmpr = tCloneSColCmprWrapper(&oStbEntry.colCmpr);
|
||||||
|
if (row == NULL || tag == NULL || cmpr == NULL) {
|
||||||
|
tDeleteSchemaWrapper(row);
|
||||||
|
tDeleteSchemaWrapper(tag);
|
||||||
|
tDeleteSColCmprWrapper(cmpr);
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
|
tdbTbcClose(pCtbIdxc);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
nStbEntry.stbEntry.schemaRow = *row;
|
||||||
|
nStbEntry.stbEntry.schemaTag = *tag;
|
||||||
|
nStbEntry.stbEntry.rsmaParam = oStbEntry.stbEntry.rsmaParam;
|
||||||
|
nStbEntry.colCmpr = *cmpr;
|
||||||
|
|
||||||
|
nStbEntry.colCmpr = oStbEntry.colCmpr;
|
||||||
|
|
||||||
|
metaWLock(pMeta);
|
||||||
|
// update table.db
|
||||||
|
ret = metaSaveToTbDb(pMeta, &nStbEntry);
|
||||||
|
if (ret < 0) {
|
||||||
|
metaError("vgId:%d, failed to save tb db:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->stb,
|
||||||
|
pReq->stbUid, tstrerror(ret));
|
||||||
|
}
|
||||||
|
// update uid index
|
||||||
|
ret = metaUpdateUidIdx(pMeta, &nStbEntry);
|
||||||
|
if (ret < 0) {
|
||||||
|
metaError("vgId:%d, failed to update uid idx:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->stb,
|
||||||
|
pReq->stbUid, tstrerror(ret));
|
||||||
|
}
|
||||||
|
metaULock(pMeta);
|
||||||
|
|
||||||
|
tDeleteSchemaWrapper(tag);
|
||||||
|
tDeleteSchemaWrapper(row);
|
||||||
|
tDeleteSColCmprWrapper(cmpr);
|
||||||
|
|
||||||
|
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
tdbFree(pData);
|
||||||
|
|
||||||
|
tdbTbcClose(pCtbIdxc);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
_err:
|
||||||
|
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
tdbFree(pData);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
#endif
|
||||||
|
}
|
|
@ -2433,7 +2433,7 @@ static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pRe
|
||||||
return terrno = TSDB_CODE_INVALID_MSG;
|
return terrno = TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = metaAddIndexToSTable(pVnode->pMeta, ver, &req);
|
code = metaAddIndexToSuperTable(pVnode->pMeta, ver, &req);
|
||||||
if (code) {
|
if (code) {
|
||||||
pRsp->code = code;
|
pRsp->code = code;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
Loading…
Reference in New Issue