refactor: rsma support max_delay/watermark params

This commit is contained in:
Cary Xu 2022-06-20 19:27:26 +08:00
parent 4ef0a66453
commit ba94a44bec
9 changed files with 98 additions and 99 deletions

View File

@ -1776,12 +1776,10 @@ typedef struct {
} SDDropTopicReq;
typedef struct {
float xFilesFactor;
int32_t delay;
int32_t qmsg1Len;
int32_t qmsg2Len;
char* qmsg1; // pAst1:qmsg1:SRetention1 => trigger aggr task1
char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2
int64_t maxdelay[2];
int64_t watermark[2];
int32_t qmsgLen[2];
char* qmsg[2]; // pAst:qmsg:SRetention => trigger aggr task1/2
} SRSmaParam;
int32_t tEncodeSRSmaParam(SEncoder* pCoder, const SRSmaParam* pRSmaParam);

View File

@ -4273,39 +4273,34 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
}
int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) {
if (tEncodeFloat(pCoder, pRSmaParam->xFilesFactor) < 0) return -1;
if (tEncodeI32v(pCoder, pRSmaParam->delay) < 0) return -1;
if (tEncodeI32v(pCoder, pRSmaParam->qmsg1Len) < 0) return -1;
if (tEncodeI32v(pCoder, pRSmaParam->qmsg2Len) < 0) return -1;
if (pRSmaParam->qmsg1Len > 0) {
if (tEncodeBinary(pCoder, pRSmaParam->qmsg1, (uint64_t)pRSmaParam->qmsg1Len) < 0) // qmsg1Len contains len of '\0'
return -1;
}
if (pRSmaParam->qmsg2Len > 0) {
if (tEncodeBinary(pCoder, pRSmaParam->qmsg2, (uint64_t)pRSmaParam->qmsg2Len) < 0) // qmsg2Len contains len of '\0'
return -1;
for (int32_t i = 0; i < 2; ++i) {
if (tEncodeI64v(pCoder, pRSmaParam->maxdelay[i]) < 0) return -1;
if (tEncodeI64v(pCoder, pRSmaParam->watermark[i]) < 0) return -1;
if (tEncodeI32v(pCoder, pRSmaParam->qmsgLen[i]) < 0) return -1;
if (pRSmaParam->qmsgLen[i] > 0) {
if (tEncodeBinary(pCoder, pRSmaParam->qmsg[i], (uint64_t)pRSmaParam->qmsgLen[i]) <
0) // qmsgLen contains len of '\0'
return -1;
}
}
return 0;
}
int32_t tDecodeSRSmaParam(SDecoder *pCoder, SRSmaParam *pRSmaParam) {
if (tDecodeFloat(pCoder, &pRSmaParam->xFilesFactor) < 0) return -1;
if (tDecodeI32v(pCoder, &pRSmaParam->delay) < 0) return -1;
if (tDecodeI32v(pCoder, &pRSmaParam->qmsg1Len) < 0) return -1;
if (tDecodeI32v(pCoder, &pRSmaParam->qmsg2Len) < 0) return -1;
if (pRSmaParam->qmsg1Len > 0) {
uint64_t len;
if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg1, &len) < 0) return -1; // qmsg1Len contains len of '\0'
} else {
pRSmaParam->qmsg1 = NULL;
}
if (pRSmaParam->qmsg2Len > 0) {
uint64_t len;
if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg2, &len) < 0) return -1; // qmsg2Len contains len of '\0'
} else {
pRSmaParam->qmsg2 = NULL;
for (int32_t i = 0; i < 2; ++i) {
if (tDecodeI64v(pCoder, &pRSmaParam->maxdelay[i]) < 0) return -1;
if (tDecodeI64v(pCoder, &pRSmaParam->watermark[i]) < 0) return -1;
if (tDecodeI32v(pCoder, &pRSmaParam->qmsgLen[i]) < 0) return -1;
if (pRSmaParam->qmsgLen[i] > 0) {
uint64_t len;
if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg[i], &len) < 0)
return -1; // qmsgLen contains len of '\0'
} else {
pRSmaParam->qmsg[i] = NULL;
}
}
return 0;
}

View File

@ -341,8 +341,8 @@ typedef struct {
int32_t colVer;
int32_t smaVer;
int32_t nextColId;
float xFilesFactor;
int32_t delay;
int64_t watermark[2];
int64_t maxdelay[2];
int32_t ttl;
int32_t numOfColumns;
int32_t numOfTags;

View File

@ -30,7 +30,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, double filesFactor);
int64_t watermark);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);

View File

@ -43,7 +43,7 @@ static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
}
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, double filesFactor) {
int64_t watermark) {
SNode* pAst = NULL;
SQueryPlan* pPlan = NULL;
terrno = TSDB_CODE_SUCCESS;

View File

@ -89,8 +89,10 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT32(pRaw, dataPos, pStb->tagVer, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->colVer, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER)
SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->delay, _OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[0], _OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[1], _OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->watermark[0], _OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->watermark[1], _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->ttl, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER)
@ -168,10 +170,10 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pStb->tagVer, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->colVer, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER)
int32_t xFilesFactor = 0;
SDB_GET_INT32(pRaw, dataPos, &xFilesFactor, _OVER)
pStb->xFilesFactor = xFilesFactor / 10000.0f;
SDB_GET_INT32(pRaw, dataPos, &pStb->delay, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[0], _OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[1], _OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[0], _OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[1], _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER)
@ -399,18 +401,18 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.schemaTag.pSchema = pStb->pTags;
if (req.rollup) {
req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
req.pRSmaParam.delay = pStb->delay;
req.pRSmaParam.maxdelay[0] = pStb->maxdelay[0];
req.pRSmaParam.maxdelay[1] = pStb->maxdelay[1];
if (pStb->ast1Len > 0) {
if (mndConvertRsmaTask(&req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, pStb->pAst1, pStb->uid,
STREAM_TRIGGER_WINDOW_CLOSE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
return NULL;
if (mndConvertRsmaTask(&req.pRSmaParam.qmsg[0], &req.pRSmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
STREAM_TRIGGER_WINDOW_CLOSE, req.pRSmaParam.watermark[0]) < 0) {
goto _err;
}
}
if (pStb->ast2Len > 0) {
if (mndConvertRsmaTask(&req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, pStb->pAst2, pStb->uid,
STREAM_TRIGGER_WINDOW_CLOSE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
return NULL;
if (mndConvertRsmaTask(&req.pRSmaParam.qmsg[1], &req.pRSmaParam.qmsgLen[1], pStb->pAst2, pStb->uid,
STREAM_TRIGGER_WINDOW_CLOSE, req.pRSmaParam.watermark[1]) < 0) {
goto _err;
}
}
}
@ -418,17 +420,15 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateStbReq, &req, contLen, ret);
if (ret < 0) {
return NULL;
goto _err;
}
contLen += sizeof(SMsgHead);
SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) {
taosMemoryFreeClear(req.pRSmaParam.qmsg1);
taosMemoryFreeClear(req.pRSmaParam.qmsg2);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
goto _err;
}
pHead->contLen = htonl(contLen);
@ -438,17 +438,19 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
if (tEncodeSVCreateStbReq(&encoder, &req) < 0) {
taosMemoryFreeClear(pHead);
taosMemoryFreeClear(req.pRSmaParam.qmsg1);
taosMemoryFreeClear(req.pRSmaParam.qmsg2);
tEncoderClear(&encoder);
return NULL;
goto _err;
}
tEncoderClear(&encoder);
*pContLen = contLen;
taosMemoryFreeClear(req.pRSmaParam.qmsg1);
taosMemoryFreeClear(req.pRSmaParam.qmsg2);
taosMemoryFreeClear(req.pRSmaParam.qmsg[0]);
taosMemoryFreeClear(req.pRSmaParam.qmsg[1]);
return pHead;
_err:
taosMemoryFreeClear(req.pRSmaParam.qmsg[0]);
taosMemoryFreeClear(req.pRSmaParam.qmsg[1]);
return NULL;
}
static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
@ -670,8 +672,10 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pDst->tagVer = 1;
pDst->colVer = 1;
pDst->nextColId = 1;
// pDst->xFilesFactor = pCreate->xFilesFactor;
// pDst->delay = pCreate->delay;
pDst->maxdelay[0] = pCreate->delay1;
pDst->maxdelay[1] = pCreate->delay2;
pDst->watermark[0] = pCreate->watermark1;
pDst->watermark[1] = pCreate->watermark2;
pDst->ttl = pCreate->ttl;
pDst->numOfColumns = pCreate->numOfColumns;
pDst->numOfTags = pCreate->numOfTags;
@ -897,7 +901,7 @@ static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, cha
return -1;
}
memcpy(pNew->comment, pComment, commentLen + 1);
} else if(commentLen == 0){
} else if (commentLen == 0) {
pNew->commentLen = 0;
}
@ -1849,7 +1853,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, pStb->comment);
colDataAppend(pColInfo, numOfRows, comment, false);
} else if(pStb->commentLen == 0) {
} else if (pStb->commentLen == 0) {
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, "");
colDataAppend(pColInfo, numOfRows, comment, false);

View File

@ -21,6 +21,31 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
tb_uid_t suid, int8_t level);
#define SET_RSMA_INFO_ITEM_PARAMS(__idx, __level) \
if (param->qmsg[__idx]) { \
pRSmaInfo->items[__idx].pRsmaInfo = pRSmaInfo; \
pRSmaInfo->items[__idx].taskInfo = qCreateStreamExecTaskInfo(param->qmsg[0], &handle); \
if (!pRSmaInfo->items[__idx].taskInfo) { \
goto _err; \
} \
pRSmaInfo->items[__idx].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; \
if (param->maxdelay[__idx] < 1) { \
int64_t msInterval = \
convertTimeFromPrecisionToUnit(pRetention[__level].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); \
pRSmaInfo->items[__idx].maxDelay = msInterval; \
} else { \
pRSmaInfo->items[__idx].maxDelay = param->maxdelay[__idx]; \
} \
if (pRSmaInfo->items[__idx].maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { \
pRSmaInfo->items[__idx].maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; \
} \
pRSmaInfo->items[__idx].level = TSDB_RETENTION_L##__level; \
pRSmaInfo->items[__idx].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA"); \
if (!pRSmaInfo->items[__idx].tmrHandle) { \
goto _err; \
} \
}
struct SRSmaInfoItem {
SRSmaInfo *pRsmaInfo;
void *taskInfo; // qTaskInfo_t
@ -207,7 +232,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
SMsgCb *pMsgCb = &pVnode->msgCb;
SRSmaParam *param = &pReq->pRSmaParam;
if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) {
if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
smaWarn("vgId:%d, no qmsg1/qmsg2 for rollup stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
return TSDB_CODE_SUCCESS;
}
@ -257,36 +282,11 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
pRSmaInfo->pSma = pSma;
pRSmaInfo->suid = pReq->suid;
if (param->qmsg1) {
pRSmaInfo->items[0].pRsmaInfo = pRSmaInfo;
pRSmaInfo->items[0].taskInfo = qCreateStreamExecTaskInfo(param->qmsg1, &handle);
if (!pRSmaInfo->items[0].taskInfo) {
goto _err;
}
pRSmaInfo->items[0].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
pRSmaInfo->items[0].maxDelay = 5000;
pRSmaInfo->items[0].level = TSDB_RETENTION_L1;
pRSmaInfo->items[0].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA_L1");
SRetention *pRetention = SMA_RETENTION(pSma);
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
if (!pRSmaInfo->items[0].tmrHandle) {
goto _err;
}
}
if (param->qmsg2) {
pRSmaInfo->items[1].pRsmaInfo = pRSmaInfo;
pRSmaInfo->items[1].taskInfo = qCreateStreamExecTaskInfo(param->qmsg2, &handle);
if (!pRSmaInfo->items[1].taskInfo) {
goto _err;
}
pRSmaInfo->items[1].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
pRSmaInfo->items[1].maxDelay = 5000;
pRSmaInfo->items[1].level = TSDB_RETENTION_L2;
pRSmaInfo->items[1].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA_L2");
if (!pRSmaInfo->items[1].tmrHandle) {
goto _err;
}
}
SET_RSMA_INFO_ITEM_PARAMS(0, 1);
SET_RSMA_INFO_ITEM_PARAMS(1, 2);
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
TSDB_CODE_SUCCESS) {
@ -451,7 +451,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
}
if (taosArrayGetSize(pResult) > 0) {
#if 1
#if 0
char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level);
blockDebugShowData(pResult, flag);
@ -494,7 +494,7 @@ static void rsmaTriggerByTimer(void *param, void *tmrId) {
SRSmaInfoItem *pItem = param;
if (atomic_load_8(&pItem->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
printf("%s:%d THREAD:%" PRIi64 " status = active\n", __func__, __LINE__, taosGetSelfPthreadId());
smaTrace("level %" PRIi8 " status is active for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
@ -502,7 +502,7 @@ static void rsmaTriggerByTimer(void *param, void *tmrId) {
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK);
} else {
printf("%s:%d THREAD:%" PRIi64 " status = in active\n", __func__, __LINE__, taosGetSelfPthreadId());
smaTrace("level %" PRIi8 " status is inactive for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid);
}
// taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);

View File

@ -106,7 +106,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
int32_t len;
int32_t ret;
vError("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
version);
pVnode->state.applied = version;

View File

@ -114,8 +114,10 @@ void dumpStb(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject(item, "tagVer", pObj->tagVer);
tjsonAddIntegerToObject(item, "colVer", pObj->colVer);
tjsonAddIntegerToObject(item, "nextColId", pObj->nextColId);
tjsonAddIntegerToObject(item, "xFilesFactor", pObj->xFilesFactor * 10000);
tjsonAddIntegerToObject(item, "delay", pObj->delay);
tjsonAddIntegerToObject(item, "watermark1", pObj->watermark[0]);
tjsonAddIntegerToObject(item, "watermark2", pObj->watermark[1]);
tjsonAddIntegerToObject(item, "maxdelay1", pObj->maxdelay[0]);
tjsonAddIntegerToObject(item, "maxdelay2", pObj->maxdelay[1]);
tjsonAddIntegerToObject(item, "ttl", pObj->ttl);
tjsonAddIntegerToObject(item, "numOfColumns", pObj->numOfColumns);
tjsonAddIntegerToObject(item, "numOfTags", pObj->numOfTags);