diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 12e89277f4..5f1c4d93ce 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -383,9 +383,10 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.suid = pStb->uid; req.rollup = pStb->ast1Len > 0 ? 1 : 0; req.schema.nCols = pStb->numOfColumns; - req.schema.sver = 0; + req.schema.sver = pStb->version; req.schema.pSchema = pStb->pColumns; req.schemaTag.nCols = pStb->numOfTags; + req.schemaTag.nCols = 0; req.schemaTag.pSchema = pStb->pTags; if (req.rollup) { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 9a36fc6eae..cb99a96923 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -77,6 +77,7 @@ int metaClose(SMeta* pMeta); int metaBegin(SMeta* pMeta); int metaCommit(SMeta* pMeta); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); +int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index d666bd22c1..e5377d543a 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -131,6 +131,75 @@ _err: return -1; } +int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { + SMetaEntry oStbEntry = {0}; + SMetaEntry nStbEntry = {0}; + TDBC *pUidIdxc = NULL; + TDBC *pTbDbc = NULL; + const void *pData; + int nData; + int64_t oversion; + SDecoder dc = {0}; + int32_t ret; + int32_t c; + + tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); + ret = tdbDbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c); + if (ret < 0 || c) { + ASSERT(0); + return -1; + } + + ret = tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData); + if (ret < 0) { + ASSERT(0); + return -1; + } + + oversion = *(int64_t *)pData; + + tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); + ret = tdbDbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c); + ASSERT(ret == 0 && c == 0); + + ret = tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData); + ASSERT(ret == 0); + + tDecoderInit(&dc, pData, nData); + metaDecodeEntry(&dc, &oStbEntry); + + nStbEntry.version = version; + nStbEntry.type = TSDB_SUPER_TABLE; + nStbEntry.uid = pReq->suid; + nStbEntry.name = pReq->name; + nStbEntry.stbEntry.schema = pReq->schema; + nStbEntry.stbEntry.schemaTag = pReq->schemaTag; + + metaWLock(pMeta); + // compare two entry + if (oStbEntry.stbEntry.schema.sver != pReq->schema.sver) { + if (oStbEntry.stbEntry.schema.nCols != pReq->schema.nCols) { + metaSaveToSkmDb(pMeta, &nStbEntry); + } + } + + // if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) { + // // change tag schema + // } + + // update table.db + metaSaveToTbDb(pMeta, &nStbEntry); + + // update uid index + tdbDbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0); + + metaULock(pMeta); + tDecoderClear(&dc); + tdbDbcClose(pTbDbc); + tdbDbcClose(pUidIdxc); + return 0; +} + int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { SMetaEntry me = {0}; SMetaReader mr = {0}; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 996d789e24..8fbd1e24e1 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -91,7 +91,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p // TODO set to real sversion *pUid = 0; - int32_t sversion = 0; + int32_t sversion = 1; if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) { pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index d180799e58..76d5c3cb3a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -465,7 +465,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) { pTbData = (STbData *)pNode->pData; pCommitIter = pCommith->iters + i; - pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0); // TODO: schema version + pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 1); // TODO: schema version if (pTSchema) { pCommitIter->pIter = tSkipListCreateIter(pTbData->pData); @@ -912,7 +912,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { while (bidx < nBlocks) { if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) { // Set commit table - pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 0); // TODO: schema version + pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 1); // TODO: schema version if (!pTSchema) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b293f1399d..4e66477629 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -490,7 +490,7 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0); - pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 0); + pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 1); int32_t numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn); int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData; @@ -1618,7 +1618,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa if (pSchema1 == NULL) { // pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1)); // TODO: use the real schemaVersion - pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 0); + pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 1); } #ifdef TD_DEBUG_PRINT_ROW @@ -1637,7 +1637,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa if (pSchema2 == NULL) { // pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2)); // TODO: use the real schemaVersion - pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 0); + pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 1); } if (isRow2DataRow) { numOfColsOfRow2 = schemaNCols(pSchema2); @@ -2755,7 +2755,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int win->ekey = key; if (rv != TD_ROW_SVER(row)) { - pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0); + pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 1); rv = TD_ROW_SVER(row); } numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, @@ -3889,7 +3889,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch // NOTE: not add ref count for super table SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); - SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true); + SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 1, true); // no tags and tbname condition, all child tables of this stable are involved if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index e878668654..1589513110 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -2084,7 +2084,7 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { // TODO: use the proper schema instead of 0, and cache STSchema in cache - STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0); + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 1); if (!pTSchema) { terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4f76bc5386..fe496d137f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -16,7 +16,7 @@ #include "vnd.h" static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); -static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); +static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -72,7 +72,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_ALTER_STB: - if (vnodeProcessAlterStbReq(pVnode, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_DROP_STB: if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; @@ -398,20 +398,32 @@ _exit: return rcode; } -static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { - // ASSERT(0); -#if 0 - SVCreateTbReq vAlterTbReq = {0}; - vTrace("vgId:%d, process alter stb req", TD_VID(pVnode)); - tDeserializeSVCreateTbReq(pReq, &vAlterTbReq); - // TODO: to encapsule a free API - taosMemoryFree(vAlterTbReq.stbCfg.pSchema); - taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema); - if (vAlterTbReq.stbCfg.pRSmaParam) { - taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam); +static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + SVCreateStbReq req = {0}; + SDecoder dc = {0}; + + pRsp->msgType = TDMT_VND_ALTER_STB_RSP; + pRsp->code = TSDB_CODE_SUCCESS; + pRsp->pCont = NULL; + pRsp->contLen = 0; + + tDecoderInit(&dc, pReq, len); + + // decode req + if (tDecodeSVCreateStbReq(&dc, &req) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + tDecoderClear(&dc); + return -1; } - taosMemoryFree(vAlterTbReq.name); -#endif + + if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) { + pRsp->code = terrno; + tDecoderClear(&dc); + return -1; + } + + tDecoderClear(&dc); + return 0; } @@ -514,7 +526,7 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub if (pSchema) { taosMemoryFreeClear(pSchema); } - pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 0); // TODO: use the real schema + pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 1); // TODO: use the real schema if (pSchema) { suid = msgIter->suid; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 95ab1500cc..d848fb05e8 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3688,7 +3688,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* req.type = TD_NORMAL_TABLE; req.name = strdup(pStmt->tableName); req.ntb.schema.nCols = LIST_LENGTH(pStmt->pCols); - req.ntb.schema.sver = 0; + req.ntb.schema.sver = 1; req.ntb.schema.pSchema = taosMemoryCalloc(req.ntb.schema.nCols, sizeof(SSchema)); if (NULL == req.name || NULL == req.ntb.schema.pSchema) { destroyCreateTbReq(&req);