rollup sma grammar integrate
This commit is contained in:
parent
34f5f3fbfe
commit
37245334d5
|
@ -1395,11 +1395,10 @@ typedef struct {
|
|||
} SDDropTopicReq;
|
||||
|
||||
typedef struct {
|
||||
float xFilesFactor;
|
||||
int8_t delayUnit;
|
||||
int8_t nFuncIds;
|
||||
int32_t* pFuncIds;
|
||||
int64_t delay;
|
||||
float xFilesFactor;
|
||||
int32_t delay;
|
||||
int8_t nFuncIds;
|
||||
func_id_t* pFuncIds;
|
||||
} SRSmaParam;
|
||||
|
||||
typedef struct SVCreateTbReq {
|
||||
|
|
|
@ -31,6 +31,7 @@ typedef int16_t col_id_t;
|
|||
typedef int8_t col_type_t;
|
||||
typedef int32_t col_bytes_t;
|
||||
typedef uint16_t schema_ver_t;
|
||||
typedef int32_t func_id_t;
|
||||
|
||||
#pragma pack(push, 1)
|
||||
typedef struct {
|
||||
|
|
|
@ -314,13 +314,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
|||
}
|
||||
if (pReq->rollup && pReq->stbCfg.pRSmaParam) {
|
||||
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
|
||||
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
|
||||
tlen += taosEncodeFixedI8(buf, param->delayUnit);
|
||||
tlen += taosEncodeBinary(buf, (const void *)¶m->xFilesFactor, sizeof(param->xFilesFactor));
|
||||
tlen += taosEncodeFixedI32(buf, param->delay);
|
||||
tlen += taosEncodeFixedI8(buf, param->nFuncIds);
|
||||
for (int8_t i = 0; i < param->nFuncIds; ++i) {
|
||||
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
|
||||
}
|
||||
tlen += taosEncodeFixedI64(buf, param->delay);
|
||||
}
|
||||
break;
|
||||
case TD_CHILD_TABLE:
|
||||
|
@ -339,13 +338,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
|||
}
|
||||
if (pReq->rollup && pReq->ntbCfg.pRSmaParam) {
|
||||
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
|
||||
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
|
||||
tlen += taosEncodeFixedI8(buf, param->delayUnit);
|
||||
tlen += taosEncodeBinary(buf, (const void *)¶m->xFilesFactor, sizeof(param->xFilesFactor));
|
||||
tlen += taosEncodeFixedI32(buf, param->delay);
|
||||
tlen += taosEncodeFixedI8(buf, param->nFuncIds);
|
||||
for (int8_t i = 0; i < param->nFuncIds; ++i) {
|
||||
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
|
||||
}
|
||||
tlen += taosEncodeFixedI64(buf, param->delay);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
@ -387,17 +385,17 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
|||
if (pReq->rollup) {
|
||||
pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam));
|
||||
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
|
||||
buf = taosDecodeFixedU32(buf, (uint32_t *)¶m->xFilesFactor);
|
||||
buf = taosDecodeFixedI8(buf, ¶m->delayUnit);
|
||||
buf = taosDecodeBinaryTo(buf, (void*)¶m->xFilesFactor, sizeof(param->xFilesFactor));
|
||||
buf = taosDecodeFixedI32(buf, ¶m->delay);
|
||||
buf = taosDecodeFixedI8(buf, ¶m->nFuncIds);
|
||||
if (param->nFuncIds > 0) {
|
||||
param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t));
|
||||
for (int8_t i = 0; i < param->nFuncIds; ++i) {
|
||||
buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
|
||||
}
|
||||
} else {
|
||||
param->pFuncIds = NULL;
|
||||
}
|
||||
buf = taosDecodeFixedI64(buf, ¶m->delay);
|
||||
} else {
|
||||
pReq->stbCfg.pRSmaParam = NULL;
|
||||
}
|
||||
|
@ -420,17 +418,17 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
|||
if (pReq->rollup) {
|
||||
pReq->ntbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam));
|
||||
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
|
||||
buf = taosDecodeFixedU32(buf, (uint32_t *)¶m->xFilesFactor);
|
||||
buf = taosDecodeFixedI8(buf, ¶m->delayUnit);
|
||||
buf = taosDecodeBinaryTo(buf, (void*)¶m->xFilesFactor, sizeof(param->xFilesFactor));
|
||||
buf = taosDecodeFixedI32(buf, ¶m->delay);
|
||||
buf = taosDecodeFixedI8(buf, ¶m->nFuncIds);
|
||||
if (param->nFuncIds > 0) {
|
||||
param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t));
|
||||
for (int8_t i = 0; i < param->nFuncIds; ++i) {
|
||||
buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
|
||||
}
|
||||
} else {
|
||||
param->pFuncIds = NULL;
|
||||
}
|
||||
buf = taosDecodeFixedI64(buf, ¶m->delay);
|
||||
} else {
|
||||
pReq->ntbCfg.pRSmaParam = NULL;
|
||||
}
|
||||
|
|
|
@ -354,6 +354,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
|||
req.name = (char *)tNameGetTableName(&name);
|
||||
req.ttl = 0;
|
||||
req.keep = 0;
|
||||
req.rollup = pStb->aggregationMethod > -1 ? 1 : 0;
|
||||
req.type = TD_SUPER_TABLE;
|
||||
req.stbCfg.suid = pStb->uid;
|
||||
req.stbCfg.nCols = pStb->numOfColumns;
|
||||
|
@ -365,7 +366,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
int bSmaStat = 0; // no column has bsma
|
||||
if (pStb->numOfSmas == pStb->numOfColumns) { // assume pColumns > 0
|
||||
bSmaStat = 1; // all columns have bsma
|
||||
|
@ -405,9 +406,38 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
|||
}
|
||||
}
|
||||
|
||||
SRSmaParam *pRSmaParam = NULL;
|
||||
if (req.rollup) {
|
||||
pRSmaParam = (SRSmaParam *)taosMemoryCalloc(1, sizeof(SRSmaParam));
|
||||
if (pRSmaParam == NULL) {
|
||||
taosMemoryFreeClear(req.stbCfg.pSchema);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pRSmaParam->xFilesFactor = pStb->xFilesFactor;
|
||||
pRSmaParam->delay = pStb->delay;
|
||||
pRSmaParam->nFuncIds = 1; // only 1 aggregation method supported currently
|
||||
pRSmaParam->pFuncIds = (func_id_t *)taosMemoryCalloc(pRSmaParam->nFuncIds, sizeof(func_id_t));
|
||||
if (pRSmaParam->pFuncIds == NULL) {
|
||||
taosMemoryFreeClear(req.stbCfg.pRSmaParam);
|
||||
taosMemoryFreeClear(req.stbCfg.pSchema);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t f = 0; f < pRSmaParam->nFuncIds; ++f) {
|
||||
*(pRSmaParam->pFuncIds + f) = pStb->aggregationMethod;
|
||||
}
|
||||
req.stbCfg.pRSmaParam = pRSmaParam;
|
||||
}
|
||||
|
||||
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
|
||||
SMsgHead *pHead = taosMemoryMalloc(contLen);
|
||||
if (pHead == NULL) {
|
||||
if (pRSmaParam) {
|
||||
taosMemoryFreeClear(pRSmaParam->pFuncIds);
|
||||
taosMemoryFreeClear(pRSmaParam);
|
||||
}
|
||||
taosMemoryFreeClear(req.stbCfg.pSchema);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
@ -420,6 +450,10 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
|||
tSerializeSVCreateTbReq(&pBuf, &req);
|
||||
|
||||
*pContLen = contLen;
|
||||
if (pRSmaParam) {
|
||||
taosMemoryFreeClear(pRSmaParam->pFuncIds);
|
||||
taosMemoryFreeClear(pRSmaParam);
|
||||
}
|
||||
taosMemoryFreeClear(req.stbCfg.pSchema);
|
||||
return pHead;
|
||||
}
|
||||
|
@ -632,6 +666,9 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
|
|||
stbObj.dbUid = pDb->uid;
|
||||
stbObj.version = 1;
|
||||
stbObj.nextColId = 1;
|
||||
stbObj.xFilesFactor = pCreate->xFilesFactor;
|
||||
stbObj.aggregationMethod = pCreate->aggregationMethod;
|
||||
stbObj.delay = pCreate->delay;
|
||||
stbObj.ttl = pCreate->ttl;
|
||||
stbObj.numOfColumns = pCreate->numOfColumns;
|
||||
stbObj.numOfTags = pCreate->numOfTags;
|
||||
|
|
|
@ -78,10 +78,13 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
// TODO: handle error
|
||||
}
|
||||
|
||||
// TODO: maybe need to clear the request struct
|
||||
// TODO: to encapsule a free API
|
||||
taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
|
||||
taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
|
||||
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
|
||||
if(vCreateTbReq.stbCfg.pRSmaParam) {
|
||||
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds);
|
||||
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
|
||||
}
|
||||
taosMemoryFree(vCreateTbReq.dbFName);
|
||||
taosMemoryFree(vCreateTbReq.name);
|
||||
break;
|
||||
|
@ -110,17 +113,24 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
// TODO: handle error
|
||||
vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name);
|
||||
}
|
||||
// TODO: to encapsule a free API
|
||||
taosMemoryFree(pCreateTbReq->name);
|
||||
taosMemoryFree(pCreateTbReq->dbFName);
|
||||
if (pCreateTbReq->type == TD_SUPER_TABLE) {
|
||||
taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
|
||||
taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
|
||||
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam);
|
||||
if (pCreateTbReq->stbCfg.pRSmaParam) {
|
||||
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds);
|
||||
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam);
|
||||
}
|
||||
} else if (pCreateTbReq->type == TD_CHILD_TABLE) {
|
||||
taosMemoryFree(pCreateTbReq->ctbCfg.pTag);
|
||||
} else {
|
||||
taosMemoryFree(pCreateTbReq->ntbCfg.pSchema);
|
||||
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam);
|
||||
if (pCreateTbReq->ntbCfg.pRSmaParam) {
|
||||
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam->pFuncIds);
|
||||
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -145,9 +155,13 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
SVCreateTbReq vAlterTbReq = {0};
|
||||
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
|
||||
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq);
|
||||
// TODO: to encapsule a free API
|
||||
taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
|
||||
taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema);
|
||||
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
|
||||
if (vAlterTbReq.stbCfg.pRSmaParam) {
|
||||
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds);
|
||||
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
|
||||
}
|
||||
taosMemoryFree(vAlterTbReq.dbFName);
|
||||
taosMemoryFree(vAlterTbReq.name);
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue