more code
This commit is contained in:
parent
b1c480afd1
commit
01ceb2e90b
|
@ -15,6 +15,7 @@ void metaCloneEntryFree(SMetaEntry **ppEntry);
|
||||||
void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
|
void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
|
||||||
int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
||||||
int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
||||||
|
void metaTimeSeriesNotifyCheck(SMeta *pMeta);
|
||||||
|
|
||||||
#define metaErr(VGID, ERRNO) \
|
#define metaErr(VGID, ERRNO) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -54,7 +55,7 @@ typedef struct {
|
||||||
EMetaTableOp op;
|
EMetaTableOp op;
|
||||||
} SMetaTableOp;
|
} SMetaTableOp;
|
||||||
|
|
||||||
static int32_t metaFetchEntryByUid(SMeta *pMeta, int64_t uid, SMetaEntry **ppEntry) {
|
int32_t metaFetchEntryByUid(SMeta *pMeta, int64_t uid, SMetaEntry **ppEntry) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
void *value = NULL;
|
void *value = NULL;
|
||||||
int32_t valueSize = 0;
|
int32_t valueSize = 0;
|
||||||
|
@ -129,7 +130,7 @@ static int32_t metaFetchEntryByName(SMeta *pMeta, const char *name, SMetaEntry *
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void metaFetchEntryFree(SMetaEntry **ppEntry) { metaCloneEntryFree(ppEntry); }
|
void metaFetchEntryFree(SMetaEntry **ppEntry) { metaCloneEntryFree(ppEntry); }
|
||||||
|
|
||||||
// Entry Table
|
// Entry Table
|
||||||
static int32_t metaEntryTableUpsert(SMeta *pMeta, const SMetaHandleParam *pParam, EMetaTableOp op) {
|
static int32_t metaEntryTableUpsert(SMeta *pMeta, const SMetaHandleParam *pParam, EMetaTableOp op) {
|
||||||
|
@ -262,7 +263,38 @@ static int32_t metaSchemaTableInsert(SMeta *pMeta, const SMetaHandleParam *pPara
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t metaSchemaTableUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
static int32_t metaSchemaTableUpdate(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
||||||
return metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
const SMetaEntry *pEntry = pParam->pEntry;
|
||||||
|
const SMetaEntry *pOldEntry = pParam->pOldEntry;
|
||||||
|
|
||||||
|
if (NULL == pOldEntry) {
|
||||||
|
return metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEntry->type == TSDB_NORMAL_TABLE &&
|
||||||
|
pOldEntry->ntbEntry.schemaRow.version != pEntry->ntbEntry.schemaRow.version) {
|
||||||
|
code = metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pOldEntry->ntbEntry.schemaRow.nCols != pEntry->ntbEntry.schemaRow.nCols) {
|
||||||
|
pMeta->pVnode->config.vndStats.numOfNTimeSeries +=
|
||||||
|
(pEntry->ntbEntry.schemaRow.nCols - pOldEntry->ntbEntry.schemaRow.nCols);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEntry->type == TSDB_SUPER_TABLE && pOldEntry->stbEntry.schemaRow.version != pEntry->stbEntry.schemaRow.version) {
|
||||||
|
return metaSchemaTableUpsert(pMeta, pParam, META_TABLE_OP_UPDATA);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pParam->pEntry->type == TSDB_CHILD_TABLE) {
|
||||||
|
return TSDB_CODE_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t metaSchemaTableDelete(SMeta *pMeta, const SMetaHandleParam *pEntry) {
|
static int32_t metaSchemaTableDelete(SMeta *pMeta, const SMetaHandleParam *pEntry) {
|
||||||
|
@ -1211,6 +1243,75 @@ static int32_t metaHandleSuperTableDropImpl(SMeta *pMeta, const SMetaHandleParam
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t metaHandleNormalTableUpdateImpl(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
const SMetaEntry *pEntry = pParam->pEntry;
|
||||||
|
|
||||||
|
SMetaTableOp ops[] = {
|
||||||
|
{META_ENTRY_TABLE, META_TABLE_OP_UPDATA}, //
|
||||||
|
{META_SCHEMA_TABLE, META_TABLE_OP_UPDATA}, //
|
||||||
|
{META_UID_IDX, META_TABLE_OP_UPDATA}, //
|
||||||
|
};
|
||||||
|
for (int32_t i = 0; i < sizeof(ops) / sizeof(ops[0]); i++) {
|
||||||
|
SMetaTableOp *op = &ops[i];
|
||||||
|
code = metaTableOpFn[op->table][op->op](pMeta, pParam);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#if 0
|
||||||
|
if (metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs) < 0) {
|
||||||
|
metaError("vgId:%d, failed to update change time:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t metaHandleNormalTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SMetaEntry *pOldEntry = NULL;
|
||||||
|
|
||||||
|
// fetch old entry
|
||||||
|
code = metaFetchEntryByUid(pMeta, pEntry->uid, &pOldEntry);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle update
|
||||||
|
SMetaHandleParam param = {
|
||||||
|
.pEntry = pEntry,
|
||||||
|
.pOldEntry = pOldEntry,
|
||||||
|
};
|
||||||
|
metaWLock(pMeta);
|
||||||
|
code = metaHandleNormalTableUpdateImpl(pMeta, ¶m);
|
||||||
|
metaULock(pMeta);
|
||||||
|
if (code) {
|
||||||
|
metaErr(TD_VID(pMeta->pVnode), code);
|
||||||
|
metaFetchEntryFree(&pOldEntry);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// do other stuff
|
||||||
|
if (!TSDB_CACHE_NO(pMeta->pVnode->config) &&
|
||||||
|
pEntry->ntbEntry.schemaRow.version != pOldEntry->ntbEntry.schemaRow.version) {
|
||||||
|
#if 0
|
||||||
|
int16_t cid = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId;
|
||||||
|
int8_t col_type = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type;
|
||||||
|
int32_t ret = tsdbCacheNewNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type);
|
||||||
|
if (ret < 0) {
|
||||||
|
terrno = ret;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, pEntry->uid, pEntry->ntbEntry.schemaRow.version);
|
||||||
|
}
|
||||||
|
metaTimeSeriesNotifyCheck(pMeta);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t metaHandleSuperTableDrop(SMeta *pMeta, const SMetaEntry *pEntry) {
|
static int32_t metaHandleSuperTableDrop(SMeta *pMeta, const SMetaEntry *pEntry) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SArray *childList = NULL;
|
SArray *childList = NULL;
|
||||||
|
@ -1304,7 +1405,7 @@ int32_t metaHandleEntry2(SMeta *pMeta, const SMetaEntry *pEntry) {
|
||||||
}
|
}
|
||||||
case TSDB_NORMAL_TABLE: {
|
case TSDB_NORMAL_TABLE: {
|
||||||
if (isExist) {
|
if (isExist) {
|
||||||
// code = metaHandleNormalTableUpdate(pMeta, pEntry);
|
code = metaHandleNormalTableUpdate(pMeta, pEntry);
|
||||||
} else {
|
} else {
|
||||||
code = metaHandleNormalTableCreate(pMeta, pEntry);
|
code = metaHandleNormalTableCreate(pMeta, pEntry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -286,7 +286,7 @@ _exception:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
|
void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
|
||||||
#if defined(TD_ENTERPRISE)
|
#if defined(TD_ENTERPRISE)
|
||||||
int64_t nTimeSeries = metaGetTimeSeriesNum(pMeta, 0);
|
int64_t nTimeSeries = metaGetTimeSeriesNum(pMeta, 0);
|
||||||
int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries;
|
int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries;
|
||||||
|
@ -2988,6 +2988,7 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMeta
|
||||||
pMeta->changed = true;
|
pMeta->changed = true;
|
||||||
switch (pReq->action) {
|
switch (pReq->action) {
|
||||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||||
|
return metaAddTableColumn(pMeta, version, pReq, pMetaRsp);
|
||||||
case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
|
case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
|
||||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
|
|
||||||
extern int32_t metaHandleEntry2(SMeta *pMeta, const SMetaEntry *pEntry);
|
extern int32_t metaHandleEntry2(SMeta *pMeta, const SMetaEntry *pEntry);
|
||||||
extern int32_t metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp);
|
extern int32_t metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp);
|
||||||
|
extern int32_t metaFetchEntryByUid(SMeta *pMeta, int64_t uid, SMetaEntry **ppEntry);
|
||||||
|
extern void metaFetchEntryFree(SMetaEntry **ppEntry);
|
||||||
|
|
||||||
static int32_t metaCheckCreateSuperTableReq(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
static int32_t metaCheckCreateSuperTableReq(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
int32_t vgId = TD_VID(pMeta->pVnode);
|
int32_t vgId = TD_VID(pMeta->pVnode);
|
||||||
|
@ -557,6 +559,7 @@ int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, ST
|
||||||
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check name
|
||||||
void *value = NULL;
|
void *value = NULL;
|
||||||
int32_t valueSize = 0;
|
int32_t valueSize = 0;
|
||||||
code = tdbTbGet(pMeta->pNameIdx, pReq->tbName, strlen(pReq->tbName) + 1, &value, &valueSize);
|
code = tdbTbGet(pMeta->pNameIdx, pReq->tbName, strlen(pReq->tbName) + 1, &value, &valueSize);
|
||||||
|
@ -566,312 +569,130 @@ int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, ST
|
||||||
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t uid = *(int64_t *)value;
|
int64_t uid = *(int64_t *)value;
|
||||||
tdbFreeClear(value);
|
tdbFreeClear(value);
|
||||||
|
|
||||||
// TODO
|
// check table type
|
||||||
return code;
|
SMetaInfo info;
|
||||||
|
if (metaGetInfo(pMeta, uid, &info, NULL) != 0) {
|
||||||
#if 0
|
metaError("vgId:%d, %s failed at %s:%d since table %s uid %" PRId64
|
||||||
void *pVal = NULL;
|
" not found, this is an internal error in meta, version:%" PRId64,
|
||||||
int nVal = 0;
|
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->tbName, uid, version);
|
||||||
const void *pData = NULL;
|
code = TSDB_CODE_INTERNAL_ERROR;
|
||||||
int nData = 0;
|
TAOS_RETURN(code);
|
||||||
int ret = 0;
|
}
|
||||||
tb_uid_t uid;
|
if (info.suid != 0) {
|
||||||
int64_t oversion;
|
metaError("vgId:%d, %s failed at %s:%d since table %s uid %" PRId64 " is not a normal table, version:%" PRId64,
|
||||||
SSchema *pColumn = NULL;
|
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->tbName, uid, version);
|
||||||
SMetaEntry entry = {0};
|
code = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
||||||
SSchemaWrapper *pSchema;
|
TAOS_RETURN(code);
|
||||||
int c;
|
|
||||||
bool freeColCmpr = false;
|
|
||||||
|
|
||||||
// search uid index
|
|
||||||
TBC *pUidIdxc = NULL;
|
|
||||||
|
|
||||||
TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL));
|
|
||||||
ret = tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
|
|
||||||
if (c != 0) {
|
|
||||||
tdbTbcClose(pUidIdxc);
|
|
||||||
metaError("meta/table: invalide c: %" PRId32 " alt tb column failed.", c);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
// check grant
|
||||||
oversion = ((SUidIdxVal *)pData)[0].version;
|
code = grantCheck(TSDB_GRANT_TIMESERIES);
|
||||||
|
if (code) {
|
||||||
// search table.db
|
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64 " name:%s", TD_VID(pMeta->pVnode), __func__,
|
||||||
TBC *pTbDbc = NULL;
|
__FILE__, __LINE__, tstrerror(code), version, pReq->tbName);
|
||||||
|
TAOS_RETURN(code);
|
||||||
TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL));
|
|
||||||
ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
|
|
||||||
if (c != 0) {
|
|
||||||
tdbTbcClose(pUidIdxc);
|
|
||||||
tdbTbcClose(pTbDbc);
|
|
||||||
metaError("meta/table: invalide c: %" PRId32 " alt tb column failed.", c);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
|
// fetch old entry
|
||||||
|
SMetaEntry *pEntry = NULL;
|
||||||
// get table entry
|
code = metaFetchEntryByUid(pMeta, uid, &pEntry);
|
||||||
SDecoder dc = {0};
|
if (code) {
|
||||||
if ((entry.pBuf = taosMemoryMalloc(nData)) == NULL) {
|
metaError("vgId:%d, %s failed at %s:%d since table %s uid %" PRId64 " not found, version:%" PRId64,
|
||||||
tdbTbcClose(pUidIdxc);
|
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->tbName, uid, version);
|
||||||
tdbTbcClose(pTbDbc);
|
TAOS_RETURN(code);
|
||||||
return terrno;
|
|
||||||
}
|
}
|
||||||
memcpy(entry.pBuf, pData, nData);
|
if (pEntry->version >= version) {
|
||||||
tDecoderInit(&dc, entry.pBuf, nData);
|
metaError("vgId:%d, %s failed at %s:%d since table %s uid %" PRId64 " version %" PRId64
|
||||||
ret = metaDecodeEntry(&dc, &entry);
|
" is not less than %" PRId64,
|
||||||
if (ret != 0) {
|
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->tbName, uid, pEntry->version, version);
|
||||||
tdbTbcClose(pUidIdxc);
|
metaFetchEntryFree(&pEntry);
|
||||||
tdbTbcClose(pTbDbc);
|
TAOS_RETURN(TSDB_CODE_INVALID_PARA);
|
||||||
tDecoderClear(&dc);
|
|
||||||
metaError("meta/table: invalide ret: %" PRId32 " alt tb column failed.", ret);
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (entry.type != TSDB_NORMAL_TABLE) {
|
// do add column
|
||||||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
int32_t rowSize = 0;
|
||||||
goto _err;
|
SSchemaWrapper *pSchema = &pEntry->ntbEntry.schemaRow;
|
||||||
}
|
SSchema *pColumn;
|
||||||
// search the column to add/drop/update
|
pEntry->version = version;
|
||||||
pSchema = &entry.ntbEntry.schemaRow;
|
for (int32_t i = 0; i < pSchema->nCols; i++) {
|
||||||
|
pColumn = &pSchema->pSchema[i];
|
||||||
// save old entry
|
if (strncmp(pColumn->name, pReq->colName, TSDB_COL_NAME_LEN) == 0) {
|
||||||
SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid};
|
metaError("vgId:%d, %s failed at %s:%d since column %s already exists in table %s, version:%" PRId64,
|
||||||
oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols;
|
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->colName, pReq->tbName, version);
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
int32_t rowLen = -1;
|
TAOS_RETURN(TSDB_CODE_VND_COL_ALREADY_EXISTS);
|
||||||
if (pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ||
|
|
||||||
pAlterTbReq->action == TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES) {
|
|
||||||
rowLen = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t iCol = 0, jCol = 0;
|
|
||||||
SSchema *qColumn = NULL;
|
|
||||||
for (;;) {
|
|
||||||
qColumn = NULL;
|
|
||||||
|
|
||||||
if (jCol >= pSchema->nCols) break;
|
|
||||||
qColumn = &pSchema->pSchema[jCol];
|
|
||||||
|
|
||||||
if (!pColumn && (strcmp(qColumn->name, pAlterTbReq->colName) == 0)) {
|
|
||||||
pColumn = qColumn;
|
|
||||||
iCol = jCol;
|
|
||||||
if (rowLen < 0) break;
|
|
||||||
}
|
}
|
||||||
rowLen += qColumn->bytes;
|
rowSize += pColumn->bytes;
|
||||||
++jCol;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
entry.version = version;
|
if (rowSize + pReq->bytes > TSDB_MAX_BYTES_PER_ROW) {
|
||||||
int tlen;
|
metaError("vgId:%d, %s failed at %s:%d since row size %d + %d > %d, version:%" PRId64, TD_VID(pMeta->pVnode),
|
||||||
SSchema *pNewSchema = NULL;
|
__func__, __FILE__, __LINE__, rowSize, pReq->bytes, TSDB_MAX_BYTES_PER_ROW, version);
|
||||||
SSchema tScheam;
|
metaFetchEntryFree(&pEntry);
|
||||||
switch (pAlterTbReq->action) {
|
TAOS_RETURN(TSDB_CODE_PAR_INVALID_ROW_LENGTH);
|
||||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
|
||||||
case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
|
|
||||||
if (pColumn) {
|
|
||||||
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (rowLen + pAlterTbReq->bytes > TSDB_MAX_BYTES_PER_ROW) {
|
|
||||||
terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
pSchema->version++;
|
|
||||||
pSchema->nCols++;
|
|
||||||
pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols);
|
|
||||||
if (pNewSchema == NULL) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
memcpy(pNewSchema, pSchema->pSchema, sizeof(SSchema) * (pSchema->nCols - 1));
|
|
||||||
pSchema->pSchema = pNewSchema;
|
|
||||||
pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].bytes = pAlterTbReq->bytes;
|
|
||||||
pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type = pAlterTbReq->type;
|
|
||||||
pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].flags = pAlterTbReq->flags;
|
|
||||||
pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId = entry.ntbEntry.ncid++;
|
|
||||||
strcpy(pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].name, pAlterTbReq->colName);
|
|
||||||
|
|
||||||
++pMeta->pVnode->config.vndStats.numOfNTimeSeries;
|
|
||||||
metaTimeSeriesNotifyCheck(pMeta);
|
|
||||||
|
|
||||||
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
|
||||||
int16_t cid = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId;
|
|
||||||
int8_t col_type = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type;
|
|
||||||
int32_t ret = tsdbCacheNewNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type);
|
|
||||||
if (ret < 0) {
|
|
||||||
terrno = ret;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SSchema *pCol = &pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1];
|
|
||||||
uint32_t compress = pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ? createDefaultColCmprByType(pCol->type)
|
|
||||||
: pAlterTbReq->compress;
|
|
||||||
if (updataTableColCmpr(&entry.colCmpr, pCol, 1, compress) != 0) {
|
|
||||||
metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name,
|
|
||||||
entry.uid);
|
|
||||||
}
|
|
||||||
freeColCmpr = true;
|
|
||||||
if (entry.colCmpr.nCols != pSchema->nCols) {
|
|
||||||
if (pNewSchema) taosMemoryFree(pNewSchema);
|
|
||||||
if (freeColCmpr) taosMemoryFree(entry.colCmpr.pColCmpr);
|
|
||||||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
|
||||||
if (pColumn == NULL) {
|
|
||||||
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (pColumn->colId == 0) {
|
|
||||||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
|
|
||||||
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
bool hasPrimayKey = false;
|
|
||||||
if (pSchema->nCols >= 2) {
|
|
||||||
hasPrimayKey = pSchema->pSchema[1].flags & COL_IS_KEY ? true : false;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(&tScheam, pColumn, sizeof(SSchema));
|
|
||||||
pSchema->version++;
|
|
||||||
tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
|
|
||||||
if (tlen) {
|
|
||||||
memmove(pColumn, pColumn + 1, tlen);
|
|
||||||
}
|
|
||||||
pSchema->nCols--;
|
|
||||||
|
|
||||||
--pMeta->pVnode->config.vndStats.numOfNTimeSeries;
|
|
||||||
|
|
||||||
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
|
||||||
int16_t cid = pColumn->colId;
|
|
||||||
|
|
||||||
if (tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, hasPrimayKey) != 0) {
|
|
||||||
metaError("vgId:%d, failed to drop ntable column:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name,
|
|
||||||
entry.uid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (updataTableColCmpr(&entry.colCmpr, &tScheam, 0, 0) != 0) {
|
|
||||||
metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name,
|
|
||||||
entry.uid);
|
|
||||||
}
|
|
||||||
if (entry.colCmpr.nCols != pSchema->nCols) {
|
|
||||||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
|
||||||
if (pColumn == NULL) {
|
|
||||||
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes >= pAlterTbReq->colModBytes) {
|
|
||||||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (rowLen + pAlterTbReq->colModBytes - pColumn->bytes > TSDB_MAX_BYTES_PER_ROW) {
|
|
||||||
terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
|
|
||||||
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
pSchema->version++;
|
|
||||||
pColumn->bytes = pAlterTbReq->colModBytes;
|
|
||||||
break;
|
|
||||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
|
|
||||||
if (pAlterTbReq->colNewName == NULL) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (pColumn == NULL) {
|
|
||||||
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
|
|
||||||
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
pSchema->version++;
|
|
||||||
strcpy(pColumn->name, pAlterTbReq->colNewName);
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
SSchema *pNewSchema = taosMemoryRealloc(pSchema->pSchema, sizeof(SSchema) * (pSchema->nCols + 1));
|
||||||
tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, entry.uid, pSchema->version);
|
if (NULL == pNewSchema) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64, TD_VID(pMeta->pVnode), __func__, __FILE__,
|
||||||
|
__LINE__, tstrerror(terrno), version);
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
TAOS_RETURN(terrno);
|
||||||
}
|
}
|
||||||
|
pSchema->pSchema = pNewSchema;
|
||||||
|
pSchema->version++;
|
||||||
|
pSchema->nCols++;
|
||||||
|
pColumn = &pSchema->pSchema[pSchema->nCols - 1];
|
||||||
|
pColumn->bytes = pReq->bytes;
|
||||||
|
pColumn->type = pReq->type;
|
||||||
|
pColumn->flags = pReq->flags;
|
||||||
|
pColumn->colId = pEntry->ntbEntry.ncid++;
|
||||||
|
tstrncpy(pColumn->name, pReq->colName, TSDB_COL_NAME_LEN);
|
||||||
|
|
||||||
entry.version = version;
|
// do handle entry
|
||||||
|
code = metaHandleEntry2(pMeta, pEntry);
|
||||||
// do actual write
|
if (code) {
|
||||||
metaWLock(pMeta);
|
metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode),
|
||||||
|
__func__, __FILE__, __LINE__, tstrerror(code), uid, pReq->tbName, version);
|
||||||
if (metaDeleteNcolIdx(pMeta, &oldEntry) < 0) {
|
} else {
|
||||||
metaError("vgId:%d, failed to delete ncol idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
|
metaInfo("vgId:%d, table %s uid %" PRId64 " is updated, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->tbName, uid,
|
||||||
|
version);
|
||||||
}
|
}
|
||||||
|
metaFetchEntryFree(&pEntry);
|
||||||
|
|
||||||
if (metaUpdateNcolIdx(pMeta, &entry) < 0) {
|
if (metaUpdateMetaRsp(uid, pReq->tbName, pSchema, pRsp) < 0) {
|
||||||
metaError("vgId:%d, failed to update ncol idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
|
metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode),
|
||||||
}
|
__func__, __FILE__, __LINE__, tstrerror(code), uid, pReq->tbName, version);
|
||||||
|
|
||||||
// save to table db
|
|
||||||
if (metaSaveToTbDb(pMeta, &entry) < 0) {
|
|
||||||
metaError("vgId:%d, failed to save to tb db:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (metaUpdateUidIdx(pMeta, &entry) < 0) {
|
|
||||||
metaError("vgId:%d, failed to update uid idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (metaSaveToSkmDb(pMeta, &entry) < 0) {
|
|
||||||
metaError("vgId:%d, failed to save to skm db:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs) < 0) {
|
|
||||||
metaError("vgId:%d, failed to update change time:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
|
|
||||||
}
|
|
||||||
|
|
||||||
metaULock(pMeta);
|
|
||||||
|
|
||||||
if (metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp) < 0) {
|
|
||||||
metaError("vgId:%d, failed to update meta rsp:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
|
|
||||||
}
|
}
|
||||||
|
#if 0
|
||||||
for (int32_t i = 0; i < entry.colCmpr.nCols; i++) {
|
for (int32_t i = 0; i < entry.colCmpr.nCols; i++) {
|
||||||
SColCmpr *p = &entry.colCmpr.pColCmpr[i];
|
SColCmpr *p = &entry.colCmpr.pColCmpr[i];
|
||||||
pMetaRsp->pSchemaExt[i].colId = p->id;
|
pMetaRsp->pSchemaExt[i].colId = p->id;
|
||||||
pMetaRsp->pSchemaExt[i].compress = p->alg;
|
pMetaRsp->pSchemaExt[i].compress = p->alg;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (entry.pBuf) taosMemoryFree(entry.pBuf);
|
TAOS_RETURN(code);
|
||||||
if (pNewSchema) taosMemoryFree(pNewSchema);
|
|
||||||
if (freeColCmpr) taosMemoryFree(entry.colCmpr.pColCmpr);
|
|
||||||
|
|
||||||
tdbTbcClose(pTbDbc);
|
#if 0
|
||||||
tdbTbcClose(pUidIdxc);
|
SSchema *pCol = &pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1];
|
||||||
tDecoderClear(&dc);
|
uint32_t compress = pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ? createDefaultColCmprByType(pCol->type)
|
||||||
|
: pAlterTbReq->compress;
|
||||||
|
if (updataTableColCmpr(&entry.colCmpr, pCol, 1, compress) != 0) {
|
||||||
|
metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
|
||||||
|
}
|
||||||
|
freeColCmpr = true;
|
||||||
|
if (entry.colCmpr.nCols != pSchema->nCols) {
|
||||||
|
if (pNewSchema) taosMemoryFree(pNewSchema);
|
||||||
|
if (freeColCmpr) taosMemoryFree(entry.colCmpr.pColCmpr);
|
||||||
|
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
if (entry.pBuf) taosMemoryFree(entry.pBuf);
|
|
||||||
tdbTbcClose(pTbDbc);
|
|
||||||
tdbTbcClose(pUidIdxc);
|
|
||||||
tDecoderClear(&dc);
|
|
||||||
|
|
||||||
return terrno != 0 ? terrno : TSDB_CODE_FAILED;
|
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue