compress impl
This commit is contained in:
parent
9a6a533076
commit
e084ee9d22
|
@ -438,33 +438,38 @@ typedef struct {
|
||||||
} SIdxObj;
|
} SIdxObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
col_id_t colId;
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
int32_t cmprAlg;
|
||||||
int64_t createdTime;
|
} SCmprObj;
|
||||||
int64_t updateTime;
|
typedef struct {
|
||||||
int64_t uid;
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
int64_t dbUid;
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int32_t tagVer;
|
int64_t createdTime;
|
||||||
int32_t colVer;
|
int64_t updateTime;
|
||||||
int32_t smaVer;
|
int64_t uid;
|
||||||
int32_t nextColId;
|
int64_t dbUid;
|
||||||
int64_t maxdelay[2];
|
int32_t tagVer;
|
||||||
int64_t watermark[2];
|
int32_t colVer;
|
||||||
int32_t ttl;
|
int32_t smaVer;
|
||||||
int32_t numOfColumns;
|
int32_t nextColId;
|
||||||
int32_t numOfTags;
|
int64_t maxdelay[2];
|
||||||
int32_t numOfFuncs;
|
int64_t watermark[2];
|
||||||
int32_t commentLen;
|
int32_t ttl;
|
||||||
int32_t ast1Len;
|
int32_t numOfColumns;
|
||||||
int32_t ast2Len;
|
int32_t numOfTags;
|
||||||
SArray* pFuncs;
|
int32_t numOfFuncs;
|
||||||
SSchema* pColumns;
|
int32_t commentLen;
|
||||||
SSchema* pTags;
|
int32_t ast1Len;
|
||||||
char* comment;
|
int32_t ast2Len;
|
||||||
char* pAst1;
|
SArray* pFuncs;
|
||||||
char* pAst2;
|
SSchema* pColumns;
|
||||||
SRWLatch lock;
|
SSchema* pTags;
|
||||||
int8_t source;
|
char* comment;
|
||||||
|
char* pAst1;
|
||||||
|
char* pAst2;
|
||||||
|
SRWLatch lock;
|
||||||
|
int8_t source;
|
||||||
|
SCmprObj* pCmpr;
|
||||||
} SStbObj;
|
} SStbObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -105,7 +105,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + pStb->commentLen +
|
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + pStb->commentLen +
|
||||||
pStb->ast1Len + pStb->ast2Len + STB_RESERVE_SIZE + taosArrayGetSize(pStb->pFuncs) * TSDB_FUNC_NAME_LEN;
|
pStb->ast1Len + pStb->ast2Len + pStb->numOfColumns * sizeof(SCmprObj) + STB_RESERVE_SIZE +
|
||||||
|
taosArrayGetSize(pStb->pFuncs) * TSDB_FUNC_NAME_LEN;
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size);
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size);
|
||||||
if (pRaw == NULL) goto _OVER;
|
if (pRaw == NULL) goto _OVER;
|
||||||
|
|
||||||
|
@ -167,6 +168,13 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pStb->pCmpr != NULL) {
|
||||||
|
for (int i = 0; i < pStb->numOfColumns; i++) {
|
||||||
|
SCmprObj *p = &pStb->pCmpr[i];
|
||||||
|
SDB_SET_INT16(pRaw, dataPos, p->colId, _OVER)
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, p->cmprAlg, _OVER)
|
||||||
|
}
|
||||||
|
}
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
||||||
|
|
||||||
|
@ -273,6 +281,19 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
||||||
if (pStb->pAst2 == NULL) goto _OVER;
|
if (pStb->pAst2 == NULL) goto _OVER;
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pStb->pCmpr = taosMemoryCalloc(pStb->numOfColumns, sizeof(SCmprObj));
|
||||||
|
if (sver < STB_VER_NUMBER) {
|
||||||
|
// compatible with old data, setup default compress value
|
||||||
|
// impl later
|
||||||
|
} else {
|
||||||
|
for (int i = 0; i < pStb->numOfColumns; i++) {
|
||||||
|
SCmprObj *pCmpr = &pStb->pCmpr[i];
|
||||||
|
SDB_GET_INT16(pRaw, dataPos, &pCmpr->colId, _OVER)
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &pCmpr->cmprAlg, _OVER) // compatiable
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
|
SDB_GET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
@ -300,6 +321,7 @@ void mndFreeStb(SStbObj *pStb) {
|
||||||
taosMemoryFreeClear(pStb->comment);
|
taosMemoryFreeClear(pStb->comment);
|
||||||
taosMemoryFreeClear(pStb->pAst1);
|
taosMemoryFreeClear(pStb->pAst1);
|
||||||
taosMemoryFreeClear(pStb->pAst2);
|
taosMemoryFreeClear(pStb->pAst2);
|
||||||
|
taosMemoryFreeClear(pStb->pCmpr);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
|
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
|
||||||
|
@ -762,9 +784,9 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
||||||
memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
|
memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
pDst->createdTime = taosGetTimestampMs();
|
pDst->createdTime = taosGetTimestampMs();
|
||||||
pDst->updateTime = pDst->createdTime;
|
pDst->updateTime = pDst->createdTime;
|
||||||
pDst->uid =
|
pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX)
|
||||||
(pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX)
|
? pCreate->suid
|
||||||
? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
: mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
pDst->dbUid = pDb->uid;
|
pDst->dbUid = pDb->uid;
|
||||||
pDst->tagVer = 1;
|
pDst->tagVer = 1;
|
||||||
pDst->colVer = 1;
|
pDst->colVer = 1;
|
||||||
|
@ -847,6 +869,12 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
||||||
pSchema->colId = pDst->nextColId;
|
pSchema->colId = pDst->nextColId;
|
||||||
pDst->nextColId++;
|
pDst->nextColId++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pDst->pCmpr = taosMemoryCalloc(1, sizeof(SCmprObj));
|
||||||
|
for (int32_t i = 0; i < pDst->numOfColumns; i++) {
|
||||||
|
SCmprObj *p = &pDst->pCmpr[i];
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
static int32_t mndGenIdxNameForFirstTag(char *fullname, char *dbname, char *stbname, char *tagname) {
|
static int32_t mndGenIdxNameForFirstTag(char *fullname, char *dbname, char *stbname, char *tagname) {
|
||||||
|
@ -1131,7 +1159,8 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
|
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) && (createReq.tagVer != 1 || createReq.colVer != 1)) {
|
} else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) &&
|
||||||
|
(createReq.tagVer != 1 || createReq.colVer != 1)) {
|
||||||
mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
|
mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -1182,14 +1211,13 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
|
||||||
if(createReq.sql == NULL && createReq.sqlLen == 0){
|
if (createReq.sql == NULL && createReq.sqlLen == 0) {
|
||||||
char detail[1000] = {0};
|
char detail[1000] = {0};
|
||||||
|
|
||||||
sprintf(detail, "dbname:%s, stable name:%s", name.dbname, name.tname);
|
sprintf(detail, "dbname:%s, stable name:%s", name.dbname, name.tname);
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, detail, strlen(detail));
|
auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, detail, strlen(detail));
|
||||||
}
|
} else {
|
||||||
else{
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, createReq.sql, createReq.sqlLen);
|
auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, createReq.sql, createReq.sqlLen);
|
||||||
}
|
}
|
||||||
_OVER:
|
_OVER:
|
||||||
|
@ -1571,7 +1599,7 @@ static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj
|
||||||
for (int32_t i = 0; i < pOld->numOfTags; ++i) {
|
for (int32_t i = 0; i < pOld->numOfTags; ++i) {
|
||||||
nLen += (pOld->pTags[i].colId == colId) ? pField->bytes : pOld->pTags[i].bytes;
|
nLen += (pOld->pTags[i].colId == colId) ? pField->bytes : pOld->pTags[i].bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nLen > TSDB_MAX_TAGS_LEN) {
|
if (nLen > TSDB_MAX_TAGS_LEN) {
|
||||||
terrno = TSDB_CODE_PAR_INVALID_TAGS_LENGTH;
|
terrno = TSDB_CODE_PAR_INVALID_TAGS_LENGTH;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1934,7 +1962,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion* pStbVer, bool* schema, bool* sma) {
|
static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bool *schema, bool *sma) {
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName);
|
snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName);
|
||||||
|
|
||||||
|
@ -1964,7 +1992,7 @@ static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion* pStbVer, bo
|
||||||
} else {
|
} else {
|
||||||
*schema = false;
|
*schema = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStbVer->smaVer && pStbVer->smaVer != pStb->smaVer) {
|
if (pStbVer->smaVer && pStbVer->smaVer != pStb->smaVer) {
|
||||||
*sma = true;
|
*sma = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -2748,8 +2776,8 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
|
||||||
pStbVersion->tversion = ntohl(pStbVersion->tversion);
|
pStbVersion->tversion = ntohl(pStbVersion->tversion);
|
||||||
pStbVersion->smaVer = ntohl(pStbVersion->smaVer);
|
pStbVersion->smaVer = ntohl(pStbVersion->smaVer);
|
||||||
|
|
||||||
bool schema = false;
|
bool schema = false;
|
||||||
bool sma = false;
|
bool sma = false;
|
||||||
int32_t code = mndValidateStbVersion(pMnode, pStbVersion, &schema, &sma);
|
int32_t code = mndValidateStbVersion(pMnode, pStbVersion, &schema, &sma);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
STableMetaRsp metaRsp = {0};
|
STableMetaRsp metaRsp = {0};
|
||||||
|
|
Loading…
Reference in New Issue