From e084ee9d22b64f8d02e020bd0511445d700367cf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 1 Mar 2024 06:19:37 +0000 Subject: [PATCH] compress impl --- source/dnode/mnode/impl/inc/mndDef.h | 59 +++++++++++++++------------- source/dnode/mnode/impl/src/mndStb.c | 54 +++++++++++++++++++------ 2 files changed, 73 insertions(+), 40 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 4a092057ce..15c0fd5bca 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -438,33 +438,38 @@ typedef struct { } SIdxObj; typedef struct { - char name[TSDB_TABLE_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int64_t createdTime; - int64_t updateTime; - int64_t uid; - int64_t dbUid; - int32_t tagVer; - int32_t colVer; - int32_t smaVer; - int32_t nextColId; - int64_t maxdelay[2]; - int64_t watermark[2]; - int32_t ttl; - int32_t numOfColumns; - int32_t numOfTags; - int32_t numOfFuncs; - int32_t commentLen; - int32_t ast1Len; - int32_t ast2Len; - SArray* pFuncs; - SSchema* pColumns; - SSchema* pTags; - char* comment; - char* pAst1; - char* pAst2; - SRWLatch lock; - int8_t source; + col_id_t colId; + int32_t cmprAlg; +} SCmprObj; +typedef struct { + char name[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createdTime; + int64_t updateTime; + int64_t uid; + int64_t dbUid; + int32_t tagVer; + int32_t colVer; + int32_t smaVer; + int32_t nextColId; + int64_t maxdelay[2]; + int64_t watermark[2]; + int32_t ttl; + int32_t numOfColumns; + int32_t numOfTags; + int32_t numOfFuncs; + int32_t commentLen; + int32_t ast1Len; + int32_t ast2Len; + SArray* pFuncs; + SSchema* pColumns; + SSchema* pTags; + char* comment; + char* pAst1; + char* pAst2; + SRWLatch lock; + int8_t source; + SCmprObj* pCmpr; } SStbObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 7ee1b36916..eeda3ef548 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -105,7 +105,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + pStb->commentLen + - pStb->ast1Len + pStb->ast2Len + STB_RESERVE_SIZE + taosArrayGetSize(pStb->pFuncs) * TSDB_FUNC_NAME_LEN; + pStb->ast1Len + pStb->ast2Len + pStb->numOfColumns * sizeof(SCmprObj) + STB_RESERVE_SIZE + + taosArrayGetSize(pStb->pFuncs) * TSDB_FUNC_NAME_LEN; SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size); if (pRaw == NULL) goto _OVER; @@ -167,6 +168,13 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER) } + if (pStb->pCmpr != NULL) { + for (int i = 0; i < pStb->numOfColumns; i++) { + SCmprObj *p = &pStb->pCmpr[i]; + SDB_SET_INT16(pRaw, dataPos, p->colId, _OVER) + SDB_SET_INT32(pRaw, dataPos, p->cmprAlg, _OVER) + } + } SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) @@ -273,6 +281,19 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { if (pStb->pAst2 == NULL) goto _OVER; SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER) } + + pStb->pCmpr = taosMemoryCalloc(pStb->numOfColumns, sizeof(SCmprObj)); + if (sver < STB_VER_NUMBER) { + // compatible with old data, setup default compress value + // impl later + } else { + for (int i = 0; i < pStb->numOfColumns; i++) { + SCmprObj *pCmpr = &pStb->pCmpr[i]; + SDB_GET_INT16(pRaw, dataPos, &pCmpr->colId, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pCmpr->cmprAlg, _OVER) // compatiable + } + } + SDB_GET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER) terrno = 0; @@ -300,6 +321,7 @@ void mndFreeStb(SStbObj *pStb) { taosMemoryFreeClear(pStb->comment); taosMemoryFreeClear(pStb->pAst1); taosMemoryFreeClear(pStb->pAst2); + taosMemoryFreeClear(pStb->pCmpr); } static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) { @@ -762,9 +784,9 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN); pDst->createdTime = taosGetTimestampMs(); pDst->updateTime = pDst->createdTime; - pDst->uid = - (pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX) - ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); + pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX) + ? pCreate->suid + : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); pDst->dbUid = pDb->uid; pDst->tagVer = 1; pDst->colVer = 1; @@ -847,6 +869,12 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pSchema->colId = pDst->nextColId; pDst->nextColId++; } + + + pDst->pCmpr = taosMemoryCalloc(1, sizeof(SCmprObj)); + for (int32_t i = 0; i < pDst->numOfColumns; i++) { + SCmprObj *p = &pDst->pCmpr[i]; + } return 0; } static int32_t mndGenIdxNameForFirstTag(char *fullname, char *dbname, char *stbname, char *tagname) { @@ -1131,7 +1159,8 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { } } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) { goto _OVER; - } else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) && (createReq.tagVer != 1 || createReq.colVer != 1)) { + } else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) && + (createReq.tagVer != 1 || createReq.colVer != 1)) { mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name); code = 0; goto _OVER; @@ -1182,14 +1211,13 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { SName name = {0}; tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - if(createReq.sql == NULL && createReq.sqlLen == 0){ + if (createReq.sql == NULL && createReq.sqlLen == 0) { char detail[1000] = {0}; sprintf(detail, "dbname:%s, stable name:%s", name.dbname, name.tname); auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, detail, strlen(detail)); - } - else{ + } else { auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, createReq.sql, createReq.sqlLen); } _OVER: @@ -1571,7 +1599,7 @@ static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj for (int32_t i = 0; i < pOld->numOfTags; ++i) { nLen += (pOld->pTags[i].colId == colId) ? pField->bytes : pOld->pTags[i].bytes; } - + if (nLen > TSDB_MAX_TAGS_LEN) { terrno = TSDB_CODE_PAR_INVALID_TAGS_LENGTH; return -1; @@ -1934,7 +1962,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, return 0; } -static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion* pStbVer, bool* schema, bool* sma) { +static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bool *schema, bool *sma) { char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName); @@ -1964,7 +1992,7 @@ static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion* pStbVer, bo } else { *schema = false; } - + if (pStbVer->smaVer && pStbVer->smaVer != pStb->smaVer) { *sma = true; } else { @@ -2748,8 +2776,8 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t pStbVersion->tversion = ntohl(pStbVersion->tversion); pStbVersion->smaVer = ntohl(pStbVersion->smaVer); - bool schema = false; - bool sma = false; + bool schema = false; + bool sma = false; int32_t code = mndValidateStbVersion(pMnode, pStbVersion, &schema, &sma); if (TSDB_CODE_SUCCESS != code) { STableMetaRsp metaRsp = {0};