diff --git a/source/dnode/vnode/src/meta/metaEntry2.c b/source/dnode/vnode/src/meta/metaEntry2.c index 384dd5abbd..943cb05fb6 100644 --- a/source/dnode/vnode/src/meta/metaEntry2.c +++ b/source/dnode/vnode/src/meta/metaEntry2.c @@ -109,7 +109,7 @@ int32_t metaFetchEntryByUid(SMeta *pMeta, int64_t uid, SMetaEntry **ppEntry) { return code; } -static int32_t metaFetchEntryByName(SMeta *pMeta, const char *name, SMetaEntry **ppEntry) { +int32_t metaFetchEntryByName(SMeta *pMeta, const char *name, SMetaEntry **ppEntry) { int32_t code = TSDB_CODE_SUCCESS; void *value = NULL; int32_t valueSize = 0; @@ -1309,6 +1309,7 @@ static int32_t metaHandleNormalTableUpdate(SMeta *pMeta, const SMetaEntry *pEntr tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, pEntry->uid, pEntry->ntbEntry.schemaRow.version); } metaTimeSeriesNotifyCheck(pMeta); + metaFetchEntryFree(&pOldEntry); return code; } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 81936ed2a4..7ecfa0fc12 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -43,23 +43,21 @@ static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME); -static int32_t updataTableColCmpr(SColCmprWrapper *pWp, SSchema *pSchema, int8_t add, uint32_t compress) { +int32_t updataTableColCmpr(SColCmprWrapper *pWp, SSchema *pSchema, int8_t add, uint32_t compress) { int32_t nCols = pWp->nCols; int32_t ver = pWp->version; if (add) { - SColCmpr *p = taosMemoryCalloc(1, sizeof(SColCmpr) * (nCols + 1)); + SColCmpr *p = taosMemoryRealloc(pWp->pColCmpr, sizeof(SColCmpr) * (nCols + 1)); if (p == NULL) { return terrno; } - - memcpy(p, pWp->pColCmpr, sizeof(SColCmpr) * nCols); + pWp->pColCmpr = p; SColCmpr *pCol = p + nCols; pCol->id = pSchema->colId; pCol->alg = compress; pWp->nCols = nCols + 1; pWp->version = ver; - pWp->pColCmpr = p; } else { for (int32_t i = 0; i < nCols; i++) { SColCmpr *pOCmpr = &pWp->pColCmpr[i]; @@ -2989,8 +2987,9 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMeta switch (pReq->action) { case TSDB_ALTER_TABLE_ADD_COLUMN: return metaAddTableColumn(pMeta, version, pReq, pMetaRsp); - case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: case TSDB_ALTER_TABLE_DROP_COLUMN: + // return metaDropTableColumn(pMeta, version, pReq, pMetaRsp); + case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: return metaAlterTableColumn(pMeta, version, pReq, pMetaRsp); diff --git a/source/dnode/vnode/src/meta/metaTable2.c b/source/dnode/vnode/src/meta/metaTable2.c index 83f9a4b3d2..d31d7b688a 100644 --- a/source/dnode/vnode/src/meta/metaTable2.c +++ b/source/dnode/vnode/src/meta/metaTable2.c @@ -18,7 +18,9 @@ extern int32_t metaHandleEntry2(SMeta *pMeta, const SMetaEntry *pEntry); extern int32_t metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp); extern int32_t metaFetchEntryByUid(SMeta *pMeta, int64_t uid, SMetaEntry **ppEntry); +extern int32_t metaFetchEntryByName(SMeta *pMeta, const char *name, SMetaEntry **ppEntry); extern void metaFetchEntryFree(SMetaEntry **ppEntry); +extern int32_t updataTableColCmpr(SColCmprWrapper *pWp, SSchema *pSchema, int8_t add, uint32_t compress); static int32_t metaCheckCreateSuperTableReq(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { int32_t vgId = TD_VID(pMeta->pVnode); @@ -549,10 +551,9 @@ int32_t metaDropTable2(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { TAOS_RETURN(code); } -int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) { - int32_t code = TSDB_CODE_SUCCESS; +static int32_t metaCheckAlterTableColumnReq(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) { + int32_t code = 0; - // check request if (NULL == pReq->colName || strlen(pReq->colName) == 0) { metaError("vgId:%d, %s failed at %s:%d since invalid column name:%s, version:%" PRId64, TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->colName, version); @@ -595,19 +596,29 @@ int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, ST __FILE__, __LINE__, tstrerror(code), version, pReq->tbName); TAOS_RETURN(code); } + TAOS_RETURN(code); +} + +int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) { + int32_t code = TSDB_CODE_SUCCESS; + + // check request + code = metaCheckAlterTableColumnReq(pMeta, version, pReq); + if (code) { + TAOS_RETURN(code); + } // fetch old entry SMetaEntry *pEntry = NULL; - code = metaFetchEntryByUid(pMeta, uid, &pEntry); + code = metaFetchEntryByName(pMeta, pReq->tbName, &pEntry); if (code) { - metaError("vgId:%d, %s failed at %s:%d since table %s uid %" PRId64 " not found, version:%" PRId64, - TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->tbName, uid, version); + metaError("vgId:%d, %s failed at %s:%d since table %s not found, version:%" PRId64, TD_VID(pMeta->pVnode), __func__, + __FILE__, __LINE__, pReq->tbName, version); TAOS_RETURN(code); } if (pEntry->version >= version) { - metaError("vgId:%d, %s failed at %s:%d since table %s uid %" PRId64 " version %" PRId64 - " is not less than %" PRId64, - TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->tbName, uid, pEntry->version, version); + metaError("vgId:%d, %s failed at %s:%d since table %s version %" PRId64 " is not less than %" PRId64, + TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->tbName, pEntry->version, version); metaFetchEntryFree(&pEntry); TAOS_RETURN(TSDB_CODE_INVALID_PARA); } @@ -651,55 +662,301 @@ int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, ST pColumn->flags = pReq->flags; pColumn->colId = pEntry->ntbEntry.ncid++; tstrncpy(pColumn->name, pReq->colName, TSDB_COL_NAME_LEN); + uint32_t compress = createDefaultColCmprByType(pColumn->type); + code = updataTableColCmpr(&pEntry->colCmpr, pColumn, 1, compress); + if (code) { + metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64, TD_VID(pMeta->pVnode), __func__, __FILE__, + __LINE__, tstrerror(code), version); + metaFetchEntryFree(&pEntry); + TAOS_RETURN(code); + } // do handle entry code = metaHandleEntry2(pMeta, pEntry); if (code) { metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode), - __func__, __FILE__, __LINE__, tstrerror(code), uid, pReq->tbName, version); + __func__, __FILE__, __LINE__, tstrerror(code), pEntry->uid, pReq->tbName, version); } else { - metaInfo("vgId:%d, table %s uid %" PRId64 " is updated, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->tbName, uid, - version); + metaInfo("vgId:%d, table %s uid %" PRId64 " is updated, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->tbName, + pEntry->uid, version); } - metaFetchEntryFree(&pEntry); - if (metaUpdateMetaRsp(uid, pReq->tbName, pSchema, pRsp) < 0) { + if (metaUpdateMetaRsp(pEntry->uid, pReq->tbName, pSchema, pRsp) < 0) { metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode), - __func__, __FILE__, __LINE__, tstrerror(code), uid, pReq->tbName, version); + __func__, __FILE__, __LINE__, tstrerror(code), pEntry->uid, pReq->tbName, version); + } else { + for (int32_t i = 0; i < pEntry->colCmpr.nCols; i++) { + SColCmpr *p = &pEntry->colCmpr.pColCmpr[i]; + pRsp->pSchemaExt[i].colId = p->id; + pRsp->pSchemaExt[i].compress = p->alg; + } } + + metaFetchEntryFree(&pEntry); + TAOS_RETURN(code); +} + +int32_t metaDropTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) { + int32_t code = TSDB_CODE_SUCCESS; + + code = metaCheckAlterTableColumnReq(pMeta, version, pReq); + if (code) { + TAOS_RETURN(code); + } + + // TODO + TAOS_RETURN(code); #if 0 + void *pVal = NULL; + int nVal = 0; + const void *pData = NULL; + int nData = 0; + int ret = 0; + tb_uid_t uid; + int64_t oversion; + SSchema *pColumn = NULL; + SMetaEntry entry = {0}; + SSchemaWrapper *pSchema; + int c; + bool freeColCmpr = false; + + // search the column to add/drop/update + pSchema = &entry.ntbEntry.schemaRow; + + // save old entry + SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid}; + oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols; + + int32_t rowLen = -1; + if (pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN || + pAlterTbReq->action == TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES) { + rowLen = 0; + } + + int32_t iCol = 0, jCol = 0; + SSchema *qColumn = NULL; + for (;;) { + qColumn = NULL; + + if (jCol >= pSchema->nCols) break; + qColumn = &pSchema->pSchema[jCol]; + + if (!pColumn && (strcmp(qColumn->name, pAlterTbReq->colName) == 0)) { + pColumn = qColumn; + iCol = jCol; + if (rowLen < 0) break; + } + rowLen += qColumn->bytes; + ++jCol; + } + + entry.version = version; + int tlen; + SSchema *pNewSchema = NULL; + SSchema tScheam; + switch (pAlterTbReq->action) { + case TSDB_ALTER_TABLE_ADD_COLUMN: + case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: + if (pColumn) { + terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS; + goto _err; + } + if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) { + goto _err; + } + if (rowLen + pAlterTbReq->bytes > TSDB_MAX_BYTES_PER_ROW) { + terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH; + goto _err; + } + pSchema->version++; + pSchema->nCols++; + pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols); + if (pNewSchema == NULL) { + goto _err; + } + memcpy(pNewSchema, pSchema->pSchema, sizeof(SSchema) * (pSchema->nCols - 1)); + pSchema->pSchema = pNewSchema; + pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].bytes = pAlterTbReq->bytes; + pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type = pAlterTbReq->type; + pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].flags = pAlterTbReq->flags; + pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId = entry.ntbEntry.ncid++; + tstrncpy(pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].name, pAlterTbReq->colName, + strlen(pAlterTbReq->colName) + 1); + + ++pMeta->pVnode->config.vndStats.numOfNTimeSeries; + metaTimeSeriesNotifyCheck(pMeta); + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + int16_t cid = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId; + int8_t col_type = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type; + int32_t ret = tsdbCacheNewNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type); + if (ret < 0) { + terrno = ret; + goto _err; + } + } + SSchema *pCol = &pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1]; + uint32_t compress = pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ? createDefaultColCmprByType(pCol->type) + : pAlterTbReq->compress; + if (updataTableColCmpr(&entry.colCmpr, pCol, 1, compress) != 0) { + metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, + entry.uid); + } + freeColCmpr = true; + if (entry.colCmpr.nCols != pSchema->nCols) { + if (pNewSchema) taosMemoryFree(pNewSchema); + if (freeColCmpr) taosMemoryFree(entry.colCmpr.pColCmpr); + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + break; + case TSDB_ALTER_TABLE_DROP_COLUMN: +if (pColumn == NULL) { + terrno = TSDB_CODE_VND_COL_NOT_EXISTS; + goto _err; + } + if (pColumn->colId == 0) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) { + terrno = TSDB_CODE_VND_COL_SUBSCRIBED; + goto _err; + } + bool hasPrimayKey = false; + if (pSchema->nCols >= 2) { + hasPrimayKey = pSchema->pSchema[1].flags & COL_IS_KEY ? true : false; + } + + memcpy(&tScheam, pColumn, sizeof(SSchema)); + pSchema->version++; + tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema); + if (tlen) { + memmove(pColumn, pColumn + 1, tlen); + } + pSchema->nCols--; + + --pMeta->pVnode->config.vndStats.numOfNTimeSeries; + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + int16_t cid = pColumn->colId; + + if (tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, hasPrimayKey) != 0) { + metaError("vgId:%d, failed to drop ntable column:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, + entry.uid); + } + } + + if (updataTableColCmpr(&entry.colCmpr, &tScheam, 0, 0) != 0) { + metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, + entry.uid); + } + if (entry.colCmpr.nCols != pSchema->nCols) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_COL_NOT_EXISTS; + goto _err; + } + if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes >= pAlterTbReq->colModBytes) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + if (rowLen + pAlterTbReq->colModBytes - pColumn->bytes > TSDB_MAX_BYTES_PER_ROW) { + terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH; + goto _err; + } + if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) { + terrno = TSDB_CODE_VND_COL_SUBSCRIBED; + goto _err; + } + pSchema->version++; + pColumn->bytes = pAlterTbReq->colModBytes; + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: + if (pAlterTbReq->colNewName == NULL) { + terrno = TSDB_CODE_INVALID_MSG; + goto _err; + } + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_COL_NOT_EXISTS; + goto _err; + } + if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) { + terrno = TSDB_CODE_VND_COL_SUBSCRIBED; + goto _err; + } + pSchema->version++; + tstrncpy(pColumn->name, pAlterTbReq->colNewName, strlen(pAlterTbReq->colNewName) + 1); + break; + } + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, entry.uid, pSchema->version); + } + + entry.version = version; + + // do actual write + metaWLock(pMeta); + + if (metaDeleteNcolIdx(pMeta, &oldEntry) < 0) { + metaError("vgId:%d, failed to delete ncol idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + + if (metaUpdateNcolIdx(pMeta, &entry) < 0) { + metaError("vgId:%d, failed to update ncol idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + + // save to table db + if (metaSaveToTbDb(pMeta, &entry) < 0) { + metaError("vgId:%d, failed to save to tb db:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + + if (metaUpdateUidIdx(pMeta, &entry) < 0) { + metaError("vgId:%d, failed to update uid idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + + if (metaSaveToSkmDb(pMeta, &entry) < 0) { + metaError("vgId:%d, failed to save to skm db:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + + if (metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs) < 0) { + metaError("vgId:%d, failed to update change time:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + + metaULock(pMeta); + + if (metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp) < 0) { + metaError("vgId:%d, failed to update meta rsp:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } for (int32_t i = 0; i < entry.colCmpr.nCols; i++) { SColCmpr *p = &entry.colCmpr.pColCmpr[i]; pMetaRsp->pSchemaExt[i].colId = p->id; pMetaRsp->pSchemaExt[i].compress = p->alg; } + + if (entry.pBuf) taosMemoryFree(entry.pBuf); + if (pNewSchema) taosMemoryFree(pNewSchema); + if (freeColCmpr) taosMemoryFree(entry.colCmpr.pColCmpr); + + tdbTbcClose(pTbDbc); + tdbTbcClose(pUidIdxc); + tDecoderClear(&dc); + + return 0; + +_err: + if (entry.pBuf) taosMemoryFree(entry.pBuf); + tdbTbcClose(pTbDbc); + tdbTbcClose(pUidIdxc); + tDecoderClear(&dc); + + return terrno != 0 ? terrno : TSDB_CODE_FAILED; #endif - - TAOS_RETURN(code); - -#if 0 - SSchema *pCol = &pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1]; - uint32_t compress = pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ? createDefaultColCmprByType(pCol->type) - : pAlterTbReq->compress; - if (updataTableColCmpr(&entry.colCmpr, pCol, 1, compress) != 0) { - metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); - } - freeColCmpr = true; - if (entry.colCmpr.nCols != pSchema->nCols) { - if (pNewSchema) taosMemoryFree(pNewSchema); - if (freeColCmpr) taosMemoryFree(entry.colCmpr.pColCmpr); - terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; - goto _err; - } - break; - -#endif -} - -int32_t metaDropTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) { - int32_t code = TSDB_CODE_SUCCESS; - // TODO - return code; } int32_t metaAlterTableColumnName(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) {