From 982c0665f53acf60a0ace08924749efbda0e218e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 11 Dec 2024 18:47:47 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/meta/metaTable.c | 10 +- source/dnode/vnode/src/meta/metaTable2.c | 352 +++++++++++++++++++++++ 2 files changed, 360 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 9e8e02ff39..0b4c19a687 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -17,9 +17,15 @@ extern SDmNotifyHandle dmNotifyHdl; -int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema); +int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp); +int32_t metaDropTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp); +int32_t metaAlterTableColumnName(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp); +int32_t metaAlterTableColumnBytes(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp); +int32_t metaAlterTableColumnCompressOption(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp); -int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema); +int32_t metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema); + +int32_t metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema); static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME); diff --git a/source/dnode/vnode/src/meta/metaTable2.c b/source/dnode/vnode/src/meta/metaTable2.c index 1798a26a1d..8c936d1b44 100644 --- a/source/dnode/vnode/src/meta/metaTable2.c +++ b/source/dnode/vnode/src/meta/metaTable2.c @@ -546,3 +546,355 @@ int32_t metaDropTable2(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { } TAOS_RETURN(code); } + +int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) { + int32_t code = TSDB_CODE_SUCCESS; + + // check request + if (NULL == pReq->colName || strlen(pReq->colName) == 0) { + metaError("vgId:%d, %s failed at %s:%d since invalid column name:%s, version:%" PRId64, TD_VID(pMeta->pVnode), + __func__, __FILE__, __LINE__, pReq->colName, version); + TAOS_RETURN(TSDB_CODE_INVALID_MSG); + } + + void *value = NULL; + int32_t valueSize = 0; + code = tdbTbGet(pMeta->pNameIdx, pReq->tbName, strlen(pReq->tbName) + 1, &value, &valueSize); + if (code) { + metaError("vgId:%d, %s failed at %s:%d since table %s not found, version:%" PRId64, TD_VID(pMeta->pVnode), __func__, + __FILE__, __LINE__, pReq->tbName, version); + code = TSDB_CODE_TDB_TABLE_NOT_EXIST; + TAOS_RETURN(code); + } + + int64_t uid = *(int64_t *)value; + tdbFreeClear(value); + + // TODO + return code; + +#if 0 + void *pVal = NULL; + int nVal = 0; + const void *pData = NULL; + int nData = 0; + int ret = 0; + tb_uid_t uid; + int64_t oversion; + SSchema *pColumn = NULL; + SMetaEntry entry = {0}; + SSchemaWrapper *pSchema; + int c; + bool freeColCmpr = false; + + // search uid index + TBC *pUidIdxc = NULL; + + TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL)); + ret = tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c); + if (c != 0) { + tdbTbcClose(pUidIdxc); + metaError("meta/table: invalide c: %" PRId32 " alt tb column failed.", c); + return TSDB_CODE_FAILED; + } + + ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData); + oversion = ((SUidIdxVal *)pData)[0].version; + + // search table.db + TBC *pTbDbc = NULL; + + TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL)); + ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c); + if (c != 0) { + tdbTbcClose(pUidIdxc); + tdbTbcClose(pTbDbc); + metaError("meta/table: invalide c: %" PRId32 " alt tb column failed.", c); + return TSDB_CODE_FAILED; + } + + ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData); + + // get table entry + SDecoder dc = {0}; + if ((entry.pBuf = taosMemoryMalloc(nData)) == NULL) { + tdbTbcClose(pUidIdxc); + tdbTbcClose(pTbDbc); + return terrno; + } + memcpy(entry.pBuf, pData, nData); + tDecoderInit(&dc, entry.pBuf, nData); + ret = metaDecodeEntry(&dc, &entry); + if (ret != 0) { + tdbTbcClose(pUidIdxc); + tdbTbcClose(pTbDbc); + tDecoderClear(&dc); + metaError("meta/table: invalide ret: %" PRId32 " alt tb column failed.", ret); + return ret; + } + + if (entry.type != TSDB_NORMAL_TABLE) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + // search the column to add/drop/update + pSchema = &entry.ntbEntry.schemaRow; + + // save old entry + SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid}; + oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols; + + int32_t rowLen = -1; + if (pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN || + pAlterTbReq->action == TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES) { + rowLen = 0; + } + + int32_t iCol = 0, jCol = 0; + SSchema *qColumn = NULL; + for (;;) { + qColumn = NULL; + + if (jCol >= pSchema->nCols) break; + qColumn = &pSchema->pSchema[jCol]; + + if (!pColumn && (strcmp(qColumn->name, pAlterTbReq->colName) == 0)) { + pColumn = qColumn; + iCol = jCol; + if (rowLen < 0) break; + } + rowLen += qColumn->bytes; + ++jCol; + } + + entry.version = version; + int tlen; + SSchema *pNewSchema = NULL; + SSchema tScheam; + switch (pAlterTbReq->action) { + case TSDB_ALTER_TABLE_ADD_COLUMN: + case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: + if (pColumn) { + terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS; + goto _err; + } + if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) { + goto _err; + } + if (rowLen + pAlterTbReq->bytes > TSDB_MAX_BYTES_PER_ROW) { + terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH; + goto _err; + } + pSchema->version++; + pSchema->nCols++; + pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols); + if (pNewSchema == NULL) { + goto _err; + } + memcpy(pNewSchema, pSchema->pSchema, sizeof(SSchema) * (pSchema->nCols - 1)); + pSchema->pSchema = pNewSchema; + pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].bytes = pAlterTbReq->bytes; + pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type = pAlterTbReq->type; + pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].flags = pAlterTbReq->flags; + pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId = entry.ntbEntry.ncid++; + strcpy(pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].name, pAlterTbReq->colName); + + ++pMeta->pVnode->config.vndStats.numOfNTimeSeries; + metaTimeSeriesNotifyCheck(pMeta); + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + int16_t cid = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId; + int8_t col_type = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type; + int32_t ret = tsdbCacheNewNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type); + if (ret < 0) { + terrno = ret; + goto _err; + } + } + SSchema *pCol = &pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1]; + uint32_t compress = pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ? createDefaultColCmprByType(pCol->type) + : pAlterTbReq->compress; + if (updataTableColCmpr(&entry.colCmpr, pCol, 1, compress) != 0) { + metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, + entry.uid); + } + freeColCmpr = true; + if (entry.colCmpr.nCols != pSchema->nCols) { + if (pNewSchema) taosMemoryFree(pNewSchema); + if (freeColCmpr) taosMemoryFree(entry.colCmpr.pColCmpr); + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + break; + case TSDB_ALTER_TABLE_DROP_COLUMN: + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_COL_NOT_EXISTS; + goto _err; + } + if (pColumn->colId == 0) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) { + terrno = TSDB_CODE_VND_COL_SUBSCRIBED; + goto _err; + } + bool hasPrimayKey = false; + if (pSchema->nCols >= 2) { + hasPrimayKey = pSchema->pSchema[1].flags & COL_IS_KEY ? true : false; + } + + memcpy(&tScheam, pColumn, sizeof(SSchema)); + pSchema->version++; + tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema); + if (tlen) { + memmove(pColumn, pColumn + 1, tlen); + } + pSchema->nCols--; + + --pMeta->pVnode->config.vndStats.numOfNTimeSeries; + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + int16_t cid = pColumn->colId; + + if (tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, hasPrimayKey) != 0) { + metaError("vgId:%d, failed to drop ntable column:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, + entry.uid); + } + } + + if (updataTableColCmpr(&entry.colCmpr, &tScheam, 0, 0) != 0) { + metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, + entry.uid); + } + if (entry.colCmpr.nCols != pSchema->nCols) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_COL_NOT_EXISTS; + goto _err; + } + if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes >= pAlterTbReq->colModBytes) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + if (rowLen + pAlterTbReq->colModBytes - pColumn->bytes > TSDB_MAX_BYTES_PER_ROW) { + terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH; + goto _err; + } + if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) { + terrno = TSDB_CODE_VND_COL_SUBSCRIBED; + goto _err; + } + pSchema->version++; + pColumn->bytes = pAlterTbReq->colModBytes; + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: + if (pAlterTbReq->colNewName == NULL) { + terrno = TSDB_CODE_INVALID_MSG; + goto _err; + } + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_COL_NOT_EXISTS; + goto _err; + } + if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) { + terrno = TSDB_CODE_VND_COL_SUBSCRIBED; + goto _err; + } + pSchema->version++; + strcpy(pColumn->name, pAlterTbReq->colNewName); + break; + } + + if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { + tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, entry.uid, pSchema->version); + } + + entry.version = version; + + // do actual write + metaWLock(pMeta); + + if (metaDeleteNcolIdx(pMeta, &oldEntry) < 0) { + metaError("vgId:%d, failed to delete ncol idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + + if (metaUpdateNcolIdx(pMeta, &entry) < 0) { + metaError("vgId:%d, failed to update ncol idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + + // save to table db + if (metaSaveToTbDb(pMeta, &entry) < 0) { + metaError("vgId:%d, failed to save to tb db:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + + if (metaUpdateUidIdx(pMeta, &entry) < 0) { + metaError("vgId:%d, failed to update uid idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + + if (metaSaveToSkmDb(pMeta, &entry) < 0) { + metaError("vgId:%d, failed to save to skm db:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + + if (metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs) < 0) { + metaError("vgId:%d, failed to update change time:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + + metaULock(pMeta); + + if (metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp) < 0) { + metaError("vgId:%d, failed to update meta rsp:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid); + } + for (int32_t i = 0; i < entry.colCmpr.nCols; i++) { + SColCmpr *p = &entry.colCmpr.pColCmpr[i]; + pMetaRsp->pSchemaExt[i].colId = p->id; + pMetaRsp->pSchemaExt[i].compress = p->alg; + } + + if (entry.pBuf) taosMemoryFree(entry.pBuf); + if (pNewSchema) taosMemoryFree(pNewSchema); + if (freeColCmpr) taosMemoryFree(entry.colCmpr.pColCmpr); + + tdbTbcClose(pTbDbc); + tdbTbcClose(pUidIdxc); + tDecoderClear(&dc); + + return 0; + +_err: + if (entry.pBuf) taosMemoryFree(entry.pBuf); + tdbTbcClose(pTbDbc); + tdbTbcClose(pUidIdxc); + tDecoderClear(&dc); + + return terrno != 0 ? terrno : TSDB_CODE_FAILED; +#endif +} + +int32_t metaDropTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) { + int32_t code = TSDB_CODE_SUCCESS; + // TODO + return code; +} + +int32_t metaAlterTableColumnName(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) { + int32_t code = TSDB_CODE_SUCCESS; + // TODO + return code; +} + +int32_t metaAlterTableColumnBytes(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) { + int32_t code = TSDB_CODE_SUCCESS; + // TODO + return code; +} + +int32_t metaAlterTableColumnCompressOption(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) { + int32_t code = TSDB_CODE_SUCCESS; + // TODO + return code; +} \ No newline at end of file