refactor code
This commit is contained in:
parent
7aaef32304
commit
0bb9e1195d
|
@ -439,7 +439,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
col_id_t colId;
|
col_id_t colId;
|
||||||
int32_t cmprAlg;
|
int32_t cmprAlg;
|
||||||
} SCmprObj;
|
} SCmprObj;
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
|
@ -469,7 +469,7 @@ typedef struct {
|
||||||
char* pAst2;
|
char* pAst2;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
int8_t source;
|
int8_t source;
|
||||||
SCmprObj* pCmpr;
|
SColCmpr* pCmpr;
|
||||||
} SStbObj;
|
} SStbObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -105,7 +105,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + pStb->commentLen +
|
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + pStb->commentLen +
|
||||||
pStb->ast1Len + pStb->ast2Len + pStb->numOfColumns * sizeof(SCmprObj) + STB_RESERVE_SIZE +
|
pStb->ast1Len + pStb->ast2Len + pStb->numOfColumns * sizeof(SColCmpr) + STB_RESERVE_SIZE +
|
||||||
taosArrayGetSize(pStb->pFuncs) * TSDB_FUNC_NAME_LEN;
|
taosArrayGetSize(pStb->pFuncs) * TSDB_FUNC_NAME_LEN;
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size);
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size);
|
||||||
if (pRaw == NULL) goto _OVER;
|
if (pRaw == NULL) goto _OVER;
|
||||||
|
@ -170,9 +170,9 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
|
|
||||||
if (pStb->pCmpr != NULL) {
|
if (pStb->pCmpr != NULL) {
|
||||||
for (int i = 0; i < pStb->numOfColumns; i++) {
|
for (int i = 0; i < pStb->numOfColumns; i++) {
|
||||||
SCmprObj *p = &pStb->pCmpr[i];
|
SColCmpr *p = &pStb->pCmpr[i];
|
||||||
SDB_SET_INT16(pRaw, dataPos, p->colId, _OVER)
|
SDB_SET_INT16(pRaw, dataPos, p->id, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, p->cmprAlg, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, p->alg, _OVER)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
|
||||||
|
@ -282,20 +282,20 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
|
||||||
}
|
}
|
||||||
|
|
||||||
pStb->pCmpr = taosMemoryCalloc(pStb->numOfColumns, sizeof(SCmprObj));
|
pStb->pCmpr = taosMemoryCalloc(pStb->numOfColumns, sizeof(SColCmpr));
|
||||||
if (sver < STB_VER_NUMBER) {
|
if (sver < STB_VER_NUMBER) {
|
||||||
// compatible with old data, setup default compress value
|
// compatible with old data, setup default compress value
|
||||||
// impl later
|
// impl later
|
||||||
for (int i = 0; i < pStb->numOfColumns; i++) {
|
for (int i = 0; i < pStb->numOfColumns; i++) {
|
||||||
SCmprObj *pCmpr = &pStb->pCmpr[i];
|
SColCmpr *pCmpr = &pStb->pCmpr[i];
|
||||||
pCmpr->colId = 0;
|
pCmpr->id = 0;
|
||||||
pCmpr->cmprAlg = 0;
|
pCmpr->alg = 0;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int i = 0; i < pStb->numOfColumns; i++) {
|
for (int i = 0; i < pStb->numOfColumns; i++) {
|
||||||
SCmprObj *pCmpr = &pStb->pCmpr[i];
|
SColCmpr *pCmpr = &pStb->pCmpr[i];
|
||||||
SDB_GET_INT16(pRaw, dataPos, &pCmpr->colId, _OVER)
|
SDB_GET_INT16(pRaw, dataPos, &pCmpr->id, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pCmpr->cmprAlg, _OVER) // compatiable
|
SDB_GET_INT32(pRaw, dataPos, (int32_t *)&pCmpr->alg, _OVER) // compatiable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -433,7 +433,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
||||||
memcpy(pOld->pAst2, pNew->pAst2, pNew->ast2Len);
|
memcpy(pOld->pAst2, pNew->pAst2, pNew->ast2Len);
|
||||||
pOld->ast2Len = pNew->ast2Len;
|
pOld->ast2Len = pNew->ast2Len;
|
||||||
}
|
}
|
||||||
memcpy(pOld->pCmpr, pNew->pCmpr, pOld->numOfColumns * sizeof(SCmprObj));
|
memcpy(pOld->pCmpr, pNew->pCmpr, pOld->numOfColumns * sizeof(SColCmpr));
|
||||||
|
|
||||||
taosWUnLockLatch(&pOld->lock);
|
taosWUnLockLatch(&pOld->lock);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -505,8 +505,8 @@ void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int3
|
||||||
req.colCmpr.pColCmpr = taosMemoryCalloc(pCmpr->nCols, sizeof(SColCmpr));
|
req.colCmpr.pColCmpr = taosMemoryCalloc(pCmpr->nCols, sizeof(SColCmpr));
|
||||||
for (int32_t i = 0; i < pStb->numOfColumns; i++) {
|
for (int32_t i = 0; i < pStb->numOfColumns; i++) {
|
||||||
SColCmpr *p = &pCmpr->pColCmpr[i];
|
SColCmpr *p = &pCmpr->pColCmpr[i];
|
||||||
p->alg = pStb->pCmpr[i].cmprAlg;
|
p->alg = pStb->pCmpr[i].alg;
|
||||||
p->id = pStb->pCmpr[i].colId;
|
p->id = pStb->pCmpr[i].id;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (req.rollup) {
|
if (req.rollup) {
|
||||||
|
@ -897,9 +897,9 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
||||||
SFieldWithOptions *pField = taosArrayGet(pCreate->pColumns, i);
|
SFieldWithOptions *pField = taosArrayGet(pCreate->pColumns, i);
|
||||||
SSchema *pSchema = &pDst->pColumns[i];
|
SSchema *pSchema = &pDst->pColumns[i];
|
||||||
|
|
||||||
SCmprObj *pColCmpr = &pDst->pCmpr[i];
|
SColCmpr *pColCmpr = &pDst->pCmpr[i];
|
||||||
pColCmpr->colId = pSchema->colId;
|
pColCmpr->id = pSchema->colId;
|
||||||
pColCmpr->cmprAlg = pField->compress;
|
pColCmpr->alg = pField->compress;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1282,7 +1282,7 @@ int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
|
||||||
pNew->pTags = taosMemoryCalloc(pNew->numOfTags, sizeof(SSchema));
|
pNew->pTags = taosMemoryCalloc(pNew->numOfTags, sizeof(SSchema));
|
||||||
pNew->pColumns = taosMemoryCalloc(pNew->numOfColumns, sizeof(SSchema));
|
pNew->pColumns = taosMemoryCalloc(pNew->numOfColumns, sizeof(SSchema));
|
||||||
pNew->pCmpr = taosMemoryCalloc(pNew->numOfColumns, sizeof(SColCmpr));
|
pNew->pCmpr = taosMemoryCalloc(pNew->numOfColumns, sizeof(SColCmpr));
|
||||||
if (pNew->pTags == NULL || pNew->pColumns == NULL || pNew->pCmpr) {
|
if (pNew->pTags == NULL || pNew->pColumns == NULL || pNew->pCmpr == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1658,11 +1658,15 @@ static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, char *colName,
|
static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pField,
|
||||||
SColCmpr *pColCmpr) {
|
int32_t nCols) {
|
||||||
if (pColCmpr == NULL || colName == NULL) return -1;
|
// if (pColCmpr == NULL || colName == NULL) return -1;
|
||||||
|
|
||||||
|
ASSERT(taosArrayGetSize(pField) == nCols);
|
||||||
|
TAOS_FIELD *p = taosArrayGet(pField, 0);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t idx = mndFindSuperTableColumnIndex(pOld, colName);
|
int32_t idx = mndFindSuperTableColumnIndex(pOld, p->name);
|
||||||
if (idx == -1) {
|
if (idx == -1) {
|
||||||
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1675,15 +1679,27 @@ static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *
|
||||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
SSchema *pCols = pNew->pColumns + idx;
|
DEFINE_VAR((uint32_t)(p->bytes));
|
||||||
|
|
||||||
int8_t updated = 0;
|
int8_t updated = 0;
|
||||||
for (int i = 0; i < pNew->numOfColumns; i++) {
|
for (int i = 0; i < pNew->numOfColumns; i++) {
|
||||||
SCmprObj *p = &pNew->pCmpr[i];
|
SColCmpr *pCmpr = &pNew->pCmpr[i];
|
||||||
if (p->colId == colId) {
|
if (pCmpr->id == colId) {
|
||||||
if (p->cmprAlg != pColCmpr->alg) {
|
uint32_t cmprAlg = pCmpr->alg;
|
||||||
p->cmprAlg = pColCmpr->alg;
|
uint8_t tl1 = COMPRESS_L1_TYPE_U32(cmprAlg);
|
||||||
|
uint8_t tl2 = COMPRESS_L2_TYPE_U32(cmprAlg);
|
||||||
|
uint8_t tlvl = COMPRESS_L2_TYPE_LEVEL_U32(cmprAlg);
|
||||||
|
if (l1 != 0) {
|
||||||
|
updated = 1;
|
||||||
|
setColCompressByOption((uint32_t *)&cmprAlg, l1, tl2, tlvl);
|
||||||
|
} else if (l2 != 0) {
|
||||||
|
updated = 1;
|
||||||
|
setColCompressByOption((uint32_t *)&cmprAlg, tl1, l2, tlvl);
|
||||||
|
} else if (lvl != 0) {
|
||||||
|
updated = 1;
|
||||||
|
setColCompressByOption((uint32_t *)&cmprAlg, tl1, tl2, lvl);
|
||||||
}
|
}
|
||||||
|
if (updated == 1) pCmpr->alg = cmprAlg;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1976,10 +1992,10 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
|
||||||
pSchema->bytes = pSrcSchema->bytes;
|
pSchema->bytes = pSrcSchema->bytes;
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < pStb->numOfColumns; i++) {
|
for (int32_t i = 0; i < pStb->numOfColumns; i++) {
|
||||||
SCmprObj *pCmpr = &pStb->pCmpr[i];
|
SColCmpr *pCmpr = &pStb->pCmpr[i];
|
||||||
SSchemaExt *pSchEx = &pRsp->pSchemaExt[i];
|
SSchemaExt *pSchEx = &pRsp->pSchemaExt[i];
|
||||||
pSchEx->colId = pCmpr->colId;
|
pSchEx->colId = pCmpr->id;
|
||||||
pSchEx->compress = pCmpr->cmprAlg;
|
pSchEx->compress = pCmpr->alg;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRUnLockLatch(&pStb->lock);
|
taosRUnLockLatch(&pStb->lock);
|
||||||
|
@ -2039,11 +2055,11 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
|
||||||
|
|
||||||
pRsp->pSchemaExt = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaExt));
|
pRsp->pSchemaExt = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchemaExt));
|
||||||
for (int32_t i = 0; i < pStb->numOfColumns; i++) {
|
for (int32_t i = 0; i < pStb->numOfColumns; i++) {
|
||||||
SCmprObj *pCmpr = &pStb->pCmpr[i];
|
SColCmpr *pCmpr = &pStb->pCmpr[i];
|
||||||
|
|
||||||
SSchemaExt *pSchExt = &pRsp->pSchemaExt[i];
|
SSchemaExt *pSchExt = &pRsp->pSchemaExt[i];
|
||||||
pSchExt->colId = pCmpr->colId;
|
pSchExt->colId = pCmpr->id;
|
||||||
pSchExt->compress = pCmpr->cmprAlg;
|
pSchExt->compress = pCmpr->alg;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRUnLockLatch(&pStb->lock);
|
taosRUnLockLatch(&pStb->lock);
|
||||||
|
@ -2384,7 +2400,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
|
||||||
code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl);
|
code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl);
|
||||||
break;
|
break;
|
||||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
|
||||||
code = mndUpdateSuperTableColumnCompress(pMnode, pOld, &stbObj, 0, NULL);
|
code = mndUpdateSuperTableColumnCompress(pMnode, pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
needRsp = false;
|
needRsp = false;
|
||||||
|
@ -2402,6 +2418,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
|
||||||
_OVER:
|
_OVER:
|
||||||
taosMemoryFreeClear(stbObj.pTags);
|
taosMemoryFreeClear(stbObj.pTags);
|
||||||
taosMemoryFreeClear(stbObj.pColumns);
|
taosMemoryFreeClear(stbObj.pColumns);
|
||||||
|
taosMemoryFreeClear(stbObj.pCmpr);
|
||||||
if (pAlter->commentLen > 0) {
|
if (pAlter->commentLen > 0) {
|
||||||
taosMemoryFreeClear(stbObj.comment);
|
taosMemoryFreeClear(stbObj.comment);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue