support rollup sma
This commit is contained in:
parent
a64549f052
commit
0bee6614fe
|
@ -1473,10 +1473,12 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
float xFilesFactor;
|
float xFilesFactor;
|
||||||
int32_t delay;
|
int32_t delay;
|
||||||
int8_t nFuncIds;
|
int32_t qmsg1Len;
|
||||||
|
int32_t qmsg2Len;
|
||||||
func_id_t* pFuncIds;
|
func_id_t* pFuncIds;
|
||||||
char* qmsg1; // not null: pAst1:qmsg1:SRetention1 => trigger aggr task1
|
char* qmsg1; // not null: pAst1:qmsg1:SRetention1 => trigger aggr task1
|
||||||
char* qmsg2; // not null: pAst2:qmsg2:SRetention2 => trigger aggr task2
|
char* qmsg2; // not null: pAst2:qmsg2:SRetention2 => trigger aggr task2
|
||||||
|
int8_t nFuncIds;
|
||||||
} SRSmaParam;
|
} SRSmaParam;
|
||||||
|
|
||||||
typedef struct SVCreateTbReq {
|
typedef struct SVCreateTbReq {
|
||||||
|
|
|
@ -434,9 +434,16 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
for (int8_t i = 0; i < param->nFuncIds; ++i) {
|
for (int8_t i = 0; i < param->nFuncIds; ++i) {
|
||||||
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
|
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
|
||||||
}
|
}
|
||||||
|
tlen += taosEncodeFixedI32(buf, param->qmsg1Len);
|
||||||
|
if (param->qmsg1Len > 0) {
|
||||||
tlen += taosEncodeString(buf, param->qmsg1);
|
tlen += taosEncodeString(buf, param->qmsg1);
|
||||||
|
}
|
||||||
|
|
||||||
|
tlen += taosEncodeFixedI32(buf, param->qmsg2Len);
|
||||||
|
if (param->qmsg2Len > 0) {
|
||||||
tlen += taosEncodeString(buf, param->qmsg2);
|
tlen += taosEncodeString(buf, param->qmsg2);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case TD_CHILD_TABLE:
|
case TD_CHILD_TABLE:
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->ctbCfg.suid);
|
tlen += taosEncodeFixedI64(buf, pReq->ctbCfg.suid);
|
||||||
|
@ -509,8 +516,15 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||||
buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
|
buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
buf = taosDecodeFixedI32(buf, ¶m->qmsg1Len);
|
||||||
|
if (param->qmsg1Len > 0) {
|
||||||
buf = taosDecodeString(buf, ¶m->qmsg1);
|
buf = taosDecodeString(buf, ¶m->qmsg1);
|
||||||
|
}
|
||||||
|
|
||||||
|
buf = taosDecodeFixedI32(buf, ¶m->qmsg2Len);
|
||||||
|
if (param->qmsg2Len > 0) {
|
||||||
buf = taosDecodeString(buf, ¶m->qmsg2);
|
buf = taosDecodeString(buf, ¶m->qmsg2);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
pReq->stbCfg.pRSmaParam = NULL;
|
pReq->stbCfg.pRSmaParam = NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "mndInfoSchema.h"
|
#include "mndInfoSchema.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
#include "mndPerfSchema.h"
|
#include "mndPerfSchema.h"
|
||||||
|
#include "mndScheduler.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
|
@ -444,13 +445,23 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
||||||
*(pRSmaParam->pFuncIds + f) = pStb->aggregationMethod;
|
*(pRSmaParam->pFuncIds + f) = pStb->aggregationMethod;
|
||||||
}
|
}
|
||||||
if (pStb->ast1Len > 0) {
|
if (pStb->ast1Len > 0) {
|
||||||
pRSmaParam->qmsg1 = strdup(pStb->pAst1);
|
if (mndConvertRSmaTask(pStb->pAst1, 0, 0, &pRSmaParam->qmsg1, &pRSmaParam->qmsg1Len) != TSDB_CODE_SUCCESS) {
|
||||||
|
taosMemoryFreeClear(pRSmaParam->pFuncIds);
|
||||||
|
taosMemoryFreeClear(req.stbCfg.pRSmaParam);
|
||||||
|
taosMemoryFreeClear(req.stbCfg.pSchema);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (pStb->ast2Len > 0) {
|
if (pStb->ast2Len > 0) {
|
||||||
pRSmaParam->qmsg2 = strdup(pStb->pAst2);
|
int32_t qmsgLen2 = 0;
|
||||||
|
if (mndConvertRSmaTask(pStb->pAst2, 0, 0, &pRSmaParam->qmsg2, &pRSmaParam->qmsg2Len) != TSDB_CODE_SUCCESS) {
|
||||||
|
taosMemoryFreeClear(pRSmaParam->pFuncIds);
|
||||||
|
taosMemoryFreeClear(pRSmaParam->qmsg1);
|
||||||
|
taosMemoryFreeClear(req.stbCfg.pRSmaParam);
|
||||||
|
taosMemoryFreeClear(req.stbCfg.pSchema);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TASSERT(pRSmaParam->qmsg1 && pRSmaParam->qmsg2);
|
|
||||||
|
|
||||||
req.stbCfg.pRSmaParam = pRSmaParam;
|
req.stbCfg.pRSmaParam = pRSmaParam;
|
||||||
}
|
}
|
||||||
|
|
|
@ -215,12 +215,13 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: remove the log
|
// TODO: remove the debug log
|
||||||
if (vCreateTbReq.stbCfg.pRSmaParam) {
|
SRSmaParam *param = vCreateTbReq.stbCfg.pRSmaParam;
|
||||||
printf("qmsg1 len = %d, body = %s\n", (int32_t)strlen(vCreateTbReq.stbCfg.pRSmaParam->qmsg1),
|
if (param) {
|
||||||
vCreateTbReq.stbCfg.pRSmaParam->qmsg1);
|
printf("qmsg1 len = %d, body = %s\n", param->qmsg1 ? (int32_t)strlen(param->qmsg1) : 0,
|
||||||
printf("qmsg2 len = %d, body = %s\n", (int32_t)strlen(vCreateTbReq.stbCfg.pRSmaParam->qmsg2),
|
param->qmsg1 ? param->qmsg1 : "");
|
||||||
vCreateTbReq.stbCfg.pRSmaParam->qmsg2);
|
printf("qmsg1 len = %d, body = %s\n", param->qmsg2 ? (int32_t)strlen(param->qmsg2) : 0,
|
||||||
|
param->qmsg2 ? param->qmsg2 : "");
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
|
taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
|
||||||
|
|
Loading…
Reference in New Issue