diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2a29d9b7c7..fd9ff3eb65 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1776,12 +1776,10 @@ typedef struct { } SDDropTopicReq; typedef struct { - float xFilesFactor; - int32_t delay; - int32_t qmsg1Len; - int32_t qmsg2Len; - char* qmsg1; // pAst1:qmsg1:SRetention1 => trigger aggr task1 - char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2 + int64_t maxdelay[2]; + int64_t watermark[2]; + int32_t qmsgLen[2]; + char* qmsg[2]; // pAst:qmsg:SRetention => trigger aggr task1/2 } SRSmaParam; int32_t tEncodeSRSmaParam(SEncoder* pCoder, const SRSmaParam* pRSmaParam); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c3c96972b7..d078d22cdf 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4273,39 +4273,34 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { } int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) { - if (tEncodeFloat(pCoder, pRSmaParam->xFilesFactor) < 0) return -1; - if (tEncodeI32v(pCoder, pRSmaParam->delay) < 0) return -1; - if (tEncodeI32v(pCoder, pRSmaParam->qmsg1Len) < 0) return -1; - if (tEncodeI32v(pCoder, pRSmaParam->qmsg2Len) < 0) return -1; - if (pRSmaParam->qmsg1Len > 0) { - if (tEncodeBinary(pCoder, pRSmaParam->qmsg1, (uint64_t)pRSmaParam->qmsg1Len) < 0) // qmsg1Len contains len of '\0' - return -1; - } - if (pRSmaParam->qmsg2Len > 0) { - if (tEncodeBinary(pCoder, pRSmaParam->qmsg2, (uint64_t)pRSmaParam->qmsg2Len) < 0) // qmsg2Len contains len of '\0' - return -1; + for (int32_t i = 0; i < 2; ++i) { + if (tEncodeI64v(pCoder, pRSmaParam->maxdelay[i]) < 0) return -1; + if (tEncodeI64v(pCoder, pRSmaParam->watermark[i]) < 0) return -1; + if (tEncodeI32v(pCoder, pRSmaParam->qmsgLen[i]) < 0) return -1; + if (pRSmaParam->qmsgLen[i] > 0) { + if (tEncodeBinary(pCoder, pRSmaParam->qmsg[i], (uint64_t)pRSmaParam->qmsgLen[i]) < + 0) // qmsgLen contains len of '\0' + return -1; + } } return 0; } int32_t tDecodeSRSmaParam(SDecoder *pCoder, SRSmaParam *pRSmaParam) { - if (tDecodeFloat(pCoder, &pRSmaParam->xFilesFactor) < 0) return -1; - if (tDecodeI32v(pCoder, &pRSmaParam->delay) < 0) return -1; - if (tDecodeI32v(pCoder, &pRSmaParam->qmsg1Len) < 0) return -1; - if (tDecodeI32v(pCoder, &pRSmaParam->qmsg2Len) < 0) return -1; - if (pRSmaParam->qmsg1Len > 0) { - uint64_t len; - if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg1, &len) < 0) return -1; // qmsg1Len contains len of '\0' - } else { - pRSmaParam->qmsg1 = NULL; - } - if (pRSmaParam->qmsg2Len > 0) { - uint64_t len; - if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg2, &len) < 0) return -1; // qmsg2Len contains len of '\0' - } else { - pRSmaParam->qmsg2 = NULL; + for (int32_t i = 0; i < 2; ++i) { + if (tDecodeI64v(pCoder, &pRSmaParam->maxdelay[i]) < 0) return -1; + if (tDecodeI64v(pCoder, &pRSmaParam->watermark[i]) < 0) return -1; + if (tDecodeI32v(pCoder, &pRSmaParam->qmsgLen[i]) < 0) return -1; + if (pRSmaParam->qmsgLen[i] > 0) { + uint64_t len; + if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg[i], &len) < 0) + return -1; // qmsgLen contains len of '\0' + } else { + pRSmaParam->qmsg[i] = NULL; + } } + return 0; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 8963f6be39..987b01b96a 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -341,8 +341,8 @@ typedef struct { int32_t colVer; int32_t smaVer; int32_t nextColId; - float xFilesFactor; - int32_t delay; + int64_t watermark[2]; + int64_t maxdelay[2]; int32_t ttl; int32_t numOfColumns; int32_t numOfTags; diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 8e816d2dd6..15d2c6cd5e 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -30,7 +30,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, - int64_t watermark, double filesFactor); + int64_t watermark); int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 37aa2d33d0..f417e2267b 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -43,7 +43,7 @@ static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) { } int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, - int64_t watermark, double filesFactor) { + int64_t watermark) { SNode* pAst = NULL; SQueryPlan* pPlan = NULL; terrno = TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 6c8021e3b3..3e50ea8262 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -89,8 +89,10 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_INT32(pRaw, dataPos, pStb->tagVer, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->colVer, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER) - SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), _OVER) - SDB_SET_INT32(pRaw, dataPos, pStb->delay, _OVER) + SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[0], _OVER) + SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[1], _OVER) + SDB_SET_INT64(pRaw, dataPos, pStb->watermark[0], _OVER) + SDB_SET_INT64(pRaw, dataPos, pStb->watermark[1], _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->ttl, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER) @@ -168,10 +170,10 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pStb->tagVer, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->colVer, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER) - int32_t xFilesFactor = 0; - SDB_GET_INT32(pRaw, dataPos, &xFilesFactor, _OVER) - pStb->xFilesFactor = xFilesFactor / 10000.0f; - SDB_GET_INT32(pRaw, dataPos, &pStb->delay, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[0], _OVER) + SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[1], _OVER) + SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[0], _OVER) + SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[1], _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER) @@ -399,18 +401,18 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.schemaTag.pSchema = pStb->pTags; if (req.rollup) { - req.pRSmaParam.xFilesFactor = pStb->xFilesFactor; - req.pRSmaParam.delay = pStb->delay; + req.pRSmaParam.maxdelay[0] = pStb->maxdelay[0]; + req.pRSmaParam.maxdelay[1] = pStb->maxdelay[1]; if (pStb->ast1Len > 0) { - if (mndConvertRsmaTask(&req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, pStb->pAst1, pStb->uid, - STREAM_TRIGGER_WINDOW_CLOSE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { - return NULL; + if (mndConvertRsmaTask(&req.pRSmaParam.qmsg[0], &req.pRSmaParam.qmsgLen[0], pStb->pAst1, pStb->uid, + STREAM_TRIGGER_WINDOW_CLOSE, req.pRSmaParam.watermark[0]) < 0) { + goto _err; } } if (pStb->ast2Len > 0) { - if (mndConvertRsmaTask(&req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, pStb->pAst2, pStb->uid, - STREAM_TRIGGER_WINDOW_CLOSE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { - return NULL; + if (mndConvertRsmaTask(&req.pRSmaParam.qmsg[1], &req.pRSmaParam.qmsgLen[1], pStb->pAst2, pStb->uid, + STREAM_TRIGGER_WINDOW_CLOSE, req.pRSmaParam.watermark[1]) < 0) { + goto _err; } } } @@ -418,17 +420,15 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt int32_t ret = 0; tEncodeSize(tEncodeSVCreateStbReq, &req, contLen, ret); if (ret < 0) { - return NULL; + goto _err; } contLen += sizeof(SMsgHead); SMsgHead *pHead = taosMemoryMalloc(contLen); if (pHead == NULL) { - taosMemoryFreeClear(req.pRSmaParam.qmsg1); - taosMemoryFreeClear(req.pRSmaParam.qmsg2); terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + goto _err; } pHead->contLen = htonl(contLen); @@ -438,17 +438,19 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead)); if (tEncodeSVCreateStbReq(&encoder, &req) < 0) { taosMemoryFreeClear(pHead); - taosMemoryFreeClear(req.pRSmaParam.qmsg1); - taosMemoryFreeClear(req.pRSmaParam.qmsg2); tEncoderClear(&encoder); - return NULL; + goto _err; } tEncoderClear(&encoder); *pContLen = contLen; - taosMemoryFreeClear(req.pRSmaParam.qmsg1); - taosMemoryFreeClear(req.pRSmaParam.qmsg2); + taosMemoryFreeClear(req.pRSmaParam.qmsg[0]); + taosMemoryFreeClear(req.pRSmaParam.qmsg[1]); return pHead; +_err: + taosMemoryFreeClear(req.pRSmaParam.qmsg[0]); + taosMemoryFreeClear(req.pRSmaParam.qmsg[1]); + return NULL; } static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { @@ -670,8 +672,10 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pDst->tagVer = 1; pDst->colVer = 1; pDst->nextColId = 1; - // pDst->xFilesFactor = pCreate->xFilesFactor; - // pDst->delay = pCreate->delay; + pDst->maxdelay[0] = pCreate->delay1; + pDst->maxdelay[1] = pCreate->delay2; + pDst->watermark[0] = pCreate->watermark1; + pDst->watermark[1] = pCreate->watermark2; pDst->ttl = pCreate->ttl; pDst->numOfColumns = pCreate->numOfColumns; pDst->numOfTags = pCreate->numOfTags; @@ -897,7 +901,7 @@ static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, cha return -1; } memcpy(pNew->comment, pComment, commentLen + 1); - } else if(commentLen == 0){ + } else if (commentLen == 0) { pNew->commentLen = 0; } @@ -1849,7 +1853,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(comment, pStb->comment); colDataAppend(pColInfo, numOfRows, comment, false); - } else if(pStb->commentLen == 0) { + } else if (pStb->commentLen == 0) { char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(comment, ""); colDataAppend(pColInfo, numOfRows, comment, false); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ecd47c2303..1f18f6cb87 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -21,6 +21,31 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem, tb_uid_t suid, int8_t level); +#define SET_RSMA_INFO_ITEM_PARAMS(__idx, __level) \ + if (param->qmsg[__idx]) { \ + pRSmaInfo->items[__idx].pRsmaInfo = pRSmaInfo; \ + pRSmaInfo->items[__idx].taskInfo = qCreateStreamExecTaskInfo(param->qmsg[0], &handle); \ + if (!pRSmaInfo->items[__idx].taskInfo) { \ + goto _err; \ + } \ + pRSmaInfo->items[__idx].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; \ + if (param->maxdelay[__idx] < 1) { \ + int64_t msInterval = \ + convertTimeFromPrecisionToUnit(pRetention[__level].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); \ + pRSmaInfo->items[__idx].maxDelay = msInterval; \ + } else { \ + pRSmaInfo->items[__idx].maxDelay = param->maxdelay[__idx]; \ + } \ + if (pRSmaInfo->items[__idx].maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { \ + pRSmaInfo->items[__idx].maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; \ + } \ + pRSmaInfo->items[__idx].level = TSDB_RETENTION_L##__level; \ + pRSmaInfo->items[__idx].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA"); \ + if (!pRSmaInfo->items[__idx].tmrHandle) { \ + goto _err; \ + } \ + } + struct SRSmaInfoItem { SRSmaInfo *pRsmaInfo; void *taskInfo; // qTaskInfo_t @@ -207,7 +232,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { SMsgCb *pMsgCb = &pVnode->msgCb; SRSmaParam *param = &pReq->pRSmaParam; - if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) { + if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) { smaWarn("vgId:%d, no qmsg1/qmsg2 for rollup stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); return TSDB_CODE_SUCCESS; } @@ -257,36 +282,11 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { pRSmaInfo->pSma = pSma; pRSmaInfo->suid = pReq->suid; - if (param->qmsg1) { - pRSmaInfo->items[0].pRsmaInfo = pRSmaInfo; - pRSmaInfo->items[0].taskInfo = qCreateStreamExecTaskInfo(param->qmsg1, &handle); - if (!pRSmaInfo->items[0].taskInfo) { - goto _err; - } - pRSmaInfo->items[0].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; - pRSmaInfo->items[0].maxDelay = 5000; - pRSmaInfo->items[0].level = TSDB_RETENTION_L1; - pRSmaInfo->items[0].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA_L1"); + SRetention *pRetention = SMA_RETENTION(pSma); + STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); - if (!pRSmaInfo->items[0].tmrHandle) { - goto _err; - } - } - - if (param->qmsg2) { - pRSmaInfo->items[1].pRsmaInfo = pRSmaInfo; - pRSmaInfo->items[1].taskInfo = qCreateStreamExecTaskInfo(param->qmsg2, &handle); - if (!pRSmaInfo->items[1].taskInfo) { - goto _err; - } - pRSmaInfo->items[1].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; - pRSmaInfo->items[1].maxDelay = 5000; - pRSmaInfo->items[1].level = TSDB_RETENTION_L2; - pRSmaInfo->items[1].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA_L2"); - if (!pRSmaInfo->items[1].tmrHandle) { - goto _err; - } - } + SET_RSMA_INFO_ITEM_PARAMS(0, 1); + SET_RSMA_INFO_ITEM_PARAMS(1, 2); if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != TSDB_CODE_SUCCESS) { @@ -451,7 +451,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) } if (taosArrayGetSize(pResult) > 0) { -#if 1 +#if 0 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowData(pResult, flag); @@ -494,7 +494,7 @@ static void rsmaTriggerByTimer(void *param, void *tmrId) { SRSmaInfoItem *pItem = param; if (atomic_load_8(&pItem->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { - printf("%s:%d THREAD:%" PRIi64 " status = active\n", __func__, __LINE__, taosGetSelfPthreadId()); + smaTrace("level %" PRIi8 " status is active for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid); SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); @@ -502,7 +502,7 @@ static void rsmaTriggerByTimer(void *param, void *tmrId) { tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK); } else { - printf("%s:%d THREAD:%" PRIi64 " status = in active\n", __func__, __LINE__, taosGetSelfPthreadId()); + smaTrace("level %" PRIi8 " status is inactive for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid); } // taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 98fcee97c5..009c739693 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -106,7 +106,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp int32_t len; int32_t ret; - vError("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), + vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); pVnode->state.applied = version; diff --git a/tests/test/c/sdbDump.c b/tests/test/c/sdbDump.c index 612b870b7e..a62c30660d 100644 --- a/tests/test/c/sdbDump.c +++ b/tests/test/c/sdbDump.c @@ -114,8 +114,10 @@ void dumpStb(SSdb *pSdb, SJson *json) { tjsonAddIntegerToObject(item, "tagVer", pObj->tagVer); tjsonAddIntegerToObject(item, "colVer", pObj->colVer); tjsonAddIntegerToObject(item, "nextColId", pObj->nextColId); - tjsonAddIntegerToObject(item, "xFilesFactor", pObj->xFilesFactor * 10000); - tjsonAddIntegerToObject(item, "delay", pObj->delay); + tjsonAddIntegerToObject(item, "watermark1", pObj->watermark[0]); + tjsonAddIntegerToObject(item, "watermark2", pObj->watermark[1]); + tjsonAddIntegerToObject(item, "maxdelay1", pObj->maxdelay[0]); + tjsonAddIntegerToObject(item, "maxdelay2", pObj->maxdelay[1]); tjsonAddIntegerToObject(item, "ttl", pObj->ttl); tjsonAddIntegerToObject(item, "numOfColumns", pObj->numOfColumns); tjsonAddIntegerToObject(item, "numOfTags", pObj->numOfTags);