From fcfd5c250d3beccfa6237b0eb1f68a4cb5ff4214 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 20 Jun 2022 15:21:30 +0800 Subject: [PATCH 1/3] feat: fetch rsma result by timer supported --- source/dnode/mnode/impl/src/mndScheduler.c | 2 +- source/dnode/vnode/src/inc/sma.h | 11 +- source/dnode/vnode/src/sma/sma.c | 5 + source/dnode/vnode/src/sma/smaEnv.c | 32 ++- source/dnode/vnode/src/sma/smaRollup.c | 214 +++++++++++++++------ source/dnode/vnode/src/vnd/vnodeSvr.c | 5 +- source/libs/executor/src/executor.c | 6 + 7 files changed, 209 insertions(+), 66 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 39bb6798aa..c53ed6fd0b 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -63,7 +63,7 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64 .topicQuery = false, .streamQuery = true, .rSmaQuery = true, - .triggerType = STREAM_TRIGGER_AT_ONCE, + .triggerType = STREAM_TRIGGER_WINDOW_CLOSE, .watermark = watermark, /*.filesFactor = filesFactor,*/ }; diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 1e77022d04..902e5e1bcc 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -32,11 +32,12 @@ extern "C" { #define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -typedef struct SSmaEnv SSmaEnv; -typedef struct SSmaStat SSmaStat; -typedef struct SSmaStatItem SSmaStatItem; -typedef struct SSmaKey SSmaKey; -typedef struct SRSmaInfo SRSmaInfo; +typedef struct SSmaEnv SSmaEnv; +typedef struct SSmaStat SSmaStat; +typedef struct SSmaStatItem SSmaStatItem; +typedef struct SSmaKey SSmaKey; +typedef struct SRSmaInfo SRSmaInfo; +typedef struct SRSmaInfoItem SRSmaInfoItem; struct SSmaEnv { TdThreadRwlock lock; diff --git a/source/dnode/vnode/src/sma/sma.c b/source/dnode/vnode/src/sma/sma.c index b5c55a2f83..12f93f9400 100644 --- a/source/dnode/vnode/src/sma/sma.c +++ b/source/dnode/vnode/src/sma/sma.c @@ -15,6 +15,8 @@ #include "sma.h" +// functions for external invocation + // TODO: Who is responsible for resource allocate and release? int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg) { int32_t code = TSDB_CODE_SUCCESS; @@ -45,6 +47,9 @@ int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t* return code; } + +// functions for internal invocation + #if 0 /** diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index f71c222772..a80af2b202 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -208,7 +208,6 @@ int32_t tdUnLockSma(SSma *pSma) { int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) { SSmaEnv *pEnv = NULL; - // return if already init switch (smaType) { case TSDB_SMA_TYPE_TIME_RANGE: if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) { @@ -244,3 +243,34 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) { return TSDB_CODE_SUCCESS; }; + +int32_t smaTimerInit(void **timer, int8_t *initFlag, const char *label) { + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(initFlag, 0, 2); + if (old != 2) break; + } + + if (old == 0) { + *timer = taosTmrInit(10000, 100, 10000, label); + if (!(*timer)) { + atomic_store_8(initFlag, 0); + return -1; + } + atomic_store_8(initFlag, 1); + } + return 0; +} + +void smaTimerCleanUp(void *timer, int8_t *initFlag) { + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(initFlag, 1, 2); + if (old != 2) break; + } + + if (old == 1) { + taosTmrCleanUp(timer); + atomic_store_8(initFlag, 0); + } +} diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 0c372dfa70..458170b9aa 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -14,14 +14,36 @@ */ #include "sma.h" +#include "tstream.h" static FORCE_INLINE int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); -static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo, - STSchema *pTSchema, tb_uid_t suid, int8_t level); +static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem, + tb_uid_t suid, int8_t level); + +struct SRSmaInfoItem { + SRSmaInfo *pRsmaInfo; + void *taskInfo; // qTaskInfo_t + void *tmrHandle; + tmr_h tmrId; + int8_t level; + int8_t tmrInitFlag; + int8_t triggerStatus; // TASK_TRIGGER_STATUS__IN_ACTIVE/TASK_TRIGGER_STATUS__ACTIVE + int32_t maxDelay; +}; + +typedef struct { + int64_t suid; + SRSmaInfoItem *pItem; + SSma *pSma; + STSchema *pTSchema; +} SRSmaTriggerParam; struct SRSmaInfo { - void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t + STSchema *pTSchema; + SSma *pSma; + int64_t suid; + SRSmaInfoItem items[TSDB_RETENTION_L2]; }; static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) { @@ -33,11 +55,20 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) { } void *tdFreeRSmaInfo(SRSmaInfo *pInfo) { - for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) { - if (pInfo->taskInfo[i]) { - tdFreeTaskHandle(pInfo->taskInfo[i]); + if (pInfo) { + for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) { + SRSmaInfoItem *pItem = &pInfo->items[i]; + if (pItem->taskInfo) { + tdFreeTaskHandle(pItem->taskInfo); + } + if (pItem->tmrHandle) { + taosTmrCleanUp(pItem->tmrHandle); + } } + taosMemoryFree(pInfo->pTSchema); + taosMemoryFree(pInfo); } + return NULL; } @@ -69,20 +100,20 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA return TSDB_CODE_FAILED; } - if (pRSmaInfo->taskInfo[0] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[0], tbUids, true) != 0)) { + if (pRSmaInfo->items[0].taskInfo && (qUpdateQualifiedTableId(pRSmaInfo->items[0].taskInfo, tbUids, true) < 0)) { smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno)); return TSDB_CODE_FAILED; } else { smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma), - pRSmaInfo->taskInfo[0], *suid, *(int64_t *)taosArrayGet(tbUids, 0)); + pRSmaInfo->items[0].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0)); } - if (pRSmaInfo->taskInfo[1] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[1], tbUids, true) != 0)) { + if (pRSmaInfo->items[1].taskInfo && (qUpdateQualifiedTableId(pRSmaInfo->items[1].taskInfo, tbUids, true) < 0)) { smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno)); return TSDB_CODE_FAILED; } else { smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma), - pRSmaInfo->taskInfo[1], *suid, *(int64_t *)taosArrayGet(tbUids, 0)); + pRSmaInfo->items[1].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0)); } return TSDB_CODE_SUCCESS; @@ -144,12 +175,12 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui ASSERT(ppStore != NULL); if (!(*ppStore)) { - if (tdUidStoreInit(ppStore) != 0) { + if (tdUidStoreInit(ppStore) < 0) { return TSDB_CODE_FAILED; } } - if (tdUidStorePut(*ppStore, suid, &uid) != 0) { + if (tdUidStorePut(*ppStore, suid, &uid) < 0) { *ppStore = tdUidStoreFree(*ppStore); return TSDB_CODE_FAILED; } @@ -172,8 +203,8 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { return TSDB_CODE_SUCCESS; } - SMeta *pMeta = pVnode->pMeta; - SMsgCb *pMsgCb = &pVnode->msgCb; + SMeta *pMeta = pVnode->pMeta; + SMsgCb *pMsgCb = &pVnode->msgCb; SRSmaParam *param = &pReq->pRSmaParam; if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) { @@ -192,10 +223,12 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t)); if (pRSmaInfo) { + ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); return TSDB_CODE_SUCCESS; } + // from write queue: single thead pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo)); if (!pRSmaInfo) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -204,9 +237,8 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta); if (!pReadHandle) { - taosMemoryFree(pRSmaInfo); terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + goto _err; } SReadHandle handle = { @@ -216,32 +248,58 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { .vnode = pVnode, }; + STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), pReq->suid, -1); + if (!pTSchema) { + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + goto _err; + } + pRSmaInfo->pTSchema = pTSchema; + pRSmaInfo->pSma = pSma; + pRSmaInfo->suid = pReq->suid; + if (param->qmsg1) { - pRSmaInfo->taskInfo[0] = qCreateStreamExecTaskInfo(param->qmsg1, &handle); - if (!pRSmaInfo->taskInfo[0]) { - taosMemoryFree(pRSmaInfo); - taosMemoryFree(pReadHandle); - return TSDB_CODE_FAILED; + pRSmaInfo->items[0].pRsmaInfo = pRSmaInfo; + pRSmaInfo->items[0].taskInfo = qCreateStreamExecTaskInfo(param->qmsg1, &handle); + if (!pRSmaInfo->items[0].taskInfo) { + goto _err; + } + pRSmaInfo->items[0].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; + pRSmaInfo->items[0].maxDelay = 5000; + pRSmaInfo->items[0].level = TSDB_RETENTION_L1; + pRSmaInfo->items[0].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA_L1"); + + if (!pRSmaInfo->items[0].tmrHandle) { + goto _err; } } if (param->qmsg2) { - pRSmaInfo->taskInfo[1] = qCreateStreamExecTaskInfo(param->qmsg2, &handle); - if (!pRSmaInfo->taskInfo[1]) { - taosMemoryFree(pRSmaInfo); - taosMemoryFree(pReadHandle); - return TSDB_CODE_FAILED; + pRSmaInfo->items[1].pRsmaInfo = pRSmaInfo; + pRSmaInfo->items[1].taskInfo = qCreateStreamExecTaskInfo(param->qmsg2, &handle); + if (!pRSmaInfo->items[1].taskInfo) { + goto _err; + } + pRSmaInfo->items[1].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; + pRSmaInfo->items[1].maxDelay = 5000; + pRSmaInfo->items[0].level = TSDB_RETENTION_L2; + pRSmaInfo->items[1].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA_L2"); + if (!pRSmaInfo->items[1].tmrHandle) { + goto _err; } } if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; + goto _err; } else { smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid); } return TSDB_CODE_SUCCESS; +_err: + tdFreeRSmaInfo(pRSmaInfo); + taosMemoryFree(pReadHandle); + return TSDB_CODE_FAILED; } /** @@ -291,12 +349,12 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } - if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) != 0) { + if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) < 0) { return TSDB_CODE_FAILED; } } } else { - if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) != 0) { + if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) < 0) { return TSDB_CODE_FAILED; } } @@ -367,22 +425,15 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { return 0; } -static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo, - STSchema *pTSchema, tb_uid_t suid, int8_t level) { - SArray *pResult = NULL; +static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) { + SArray *pResult = NULL; + SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo; + SSma *pSma = pRSmaInfo->pSma; - if (!taskInfo) { - smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); - return TSDB_CODE_SUCCESS; - } - - smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, taskInfo, suid); - - qSetStreamInput(taskInfo, pMsg, inputType, true); while (1) { SSDataBlock *output = NULL; uint64_t ts; - if (qExecTask(taskInfo, &output, &ts) < 0) { + if (qExecTask(pItem->taskInfo, &output, &ts) < 0) { ASSERT(false); } if (!output) { @@ -402,16 +453,16 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 if (taosArrayGetSize(pResult) > 0) { #if 0 char flag[10] = {0}; - snprintf(flag, 10, "level %" PRIi8, level); + snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowData(pResult, flag); #endif - STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); SSubmitReq *pReq = NULL; - if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) { + if (buildSubmitReqFromDataBlock(&pReq, pResult, pRSmaInfo->pTSchema, SMA_VID(pSma), pRSmaInfo->suid) < 0) { taosArrayDestroy(pResult); return TSDB_CODE_FAILED; } - + if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) { taosArrayDestroy(pResult); taosMemoryFreeClear(pReq); @@ -420,10 +471,63 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 taosMemoryFreeClear(pReq); } else { - smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno)); + smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno)); + } + + if (blkType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { + atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE); } taosArrayDestroy(pResult); + return 0; +} + +/** + * @brief trigger to get rsma result + * + * @param param + * @param tmrId + */ +static void rsmaTriggerByTimer(void *param, void *tmrId) { + // SRSmaTriggerParam *pParam = (SRSmaTriggerParam *)param; + // SRSmaInfoItem *pItem = pParam->pItem; + SRSmaInfoItem *pItem = param; + + if (atomic_load_8(&pItem->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { + printf("%s:%d THREAD:%" PRIi64 " status = active\n", __func__, __LINE__, taosGetSelfPthreadId()); + SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; + + atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); + qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_DATA_TYPE_SSDATA_BLOCK, false); + + tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK); + } else { + printf("%s:%d THREAD:%" PRIi64 " status = in active\n", __func__, __LINE__, taosGetSelfPthreadId()); + } + + // taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); +} + +static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, + tb_uid_t suid, int8_t level) { + if (!pItem || !pItem->taskInfo) { + smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); + return TSDB_CODE_SUCCESS; + } + + smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, + pItem->taskInfo, suid); + + // inputType = STREAM_DATA_TYPE_SUBMIT_BLOCK(1) + if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { + smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); + return TSDB_CODE_FAILED; + } + + // SRSmaTriggerParam triggerParam = {.suid = suid, .pItem = pItem, .pSma = pSma, .pTSchema = pTSchema}; + tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK); + atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE); + taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); return TSDB_CODE_SUCCESS; } @@ -441,24 +545,18 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { - smaDebug("vgId:%d, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); + smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } - if (!pRSmaInfo->taskInfo[0]) { - smaDebug("vgId:%d, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid); + + if (!pRSmaInfo->items[0].taskInfo) { + smaDebug("vgId:%d, return as no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - // TODO: cache STSchema - STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1); - if (!pTSchema) { - terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; - return TSDB_CODE_FAILED; - } - tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, TSDB_RETENTION_L1); - tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, TSDB_RETENTION_L2); - taosMemoryFree(pTSchema); + tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], suid, TSDB_RETENTION_L1); + tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], suid, TSDB_RETENTION_L2); } return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fb22b7c5bf..98fcee97c5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -346,7 +346,10 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *p goto _err; } - tdProcessRSmaCreate(pVnode, &req); + if (tdProcessRSmaCreate(pVnode, &req) < 0) { + pRsp->code = terrno; + goto _err; + } tDecoderClear(&coder); return 0; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b1d076e8f5..1526cf66f1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -41,12 +41,18 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pInfo->assignBlockUid = assignUid; // the block type can not be changed in the streamscan operators +#if 0 if (pInfo->blockType == 0) { pInfo->blockType = type; } else if (pInfo->blockType != type) { ASSERT(0); return TSDB_CODE_QRY_APP_ERROR; } +#endif + // rollup sma, the same qTaskInfo is used to insert data by SubmitReq and fetch result by SSDataBlock + if (pInfo->blockType != type) { + pInfo->blockType = type; + } if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) { From 24af96f833241bdca2ae0c95574f2fde135eb9a5 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 20 Jun 2022 15:39:20 +0800 Subject: [PATCH 2/3] other: solve conflict --- source/dnode/mnode/impl/src/mndScheduler.c | 2 +- source/dnode/mnode/impl/src/mndStb.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index c53ed6fd0b..37aa2d33d0 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -63,7 +63,7 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64 .topicQuery = false, .streamQuery = true, .rSmaQuery = true, - .triggerType = STREAM_TRIGGER_WINDOW_CLOSE, + .triggerType = triggerType, .watermark = watermark, /*.filesFactor = filesFactor,*/ }; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 96dc79adbc..6c8021e3b3 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -403,13 +403,13 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.pRSmaParam.delay = pStb->delay; if (pStb->ast1Len > 0) { if (mndConvertRsmaTask(&req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, pStb->pAst1, pStb->uid, - STREAM_TRIGGER_AT_ONCE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { + STREAM_TRIGGER_WINDOW_CLOSE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { return NULL; } } if (pStb->ast2Len > 0) { if (mndConvertRsmaTask(&req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, pStb->pAst2, pStb->uid, - STREAM_TRIGGER_AT_ONCE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { + STREAM_TRIGGER_WINDOW_CLOSE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { return NULL; } } From 4ef0a664533c41af3e70386438d5e7ceb4840ded Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 20 Jun 2022 16:41:36 +0800 Subject: [PATCH 3/3] test: adjust rsma test case --- source/dnode/vnode/src/sma/smaRollup.c | 4 ++-- tests/script/tsim/sma/rsmaCreateInsertQuery.sim | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 458170b9aa..ecd47c2303 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -281,7 +281,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { } pRSmaInfo->items[1].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; pRSmaInfo->items[1].maxDelay = 5000; - pRSmaInfo->items[0].level = TSDB_RETENTION_L2; + pRSmaInfo->items[1].level = TSDB_RETENTION_L2; pRSmaInfo->items[1].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA_L2"); if (!pRSmaInfo->items[1].tmrHandle) { goto _err; @@ -451,7 +451,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) } if (taosArrayGetSize(pResult) > 0) { -#if 0 +#if 1 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowData(pResult, flag); diff --git a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim index 645b28f771..fb3503c841 100644 --- a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim @@ -5,7 +5,7 @@ sleep 50 sql connect print =============== create database with retentions -sql create database d0 retentions 15s:7d,1m:21d,15m:365d; +sql create database d0 retentions 5s:7d,10s:21d,15s:365d; sql use d0 print =============== create super table and register rsma @@ -29,6 +29,8 @@ sql insert into ct1 values(now, 10); sql insert into ct1 values(now+1s, 1); sql insert into ct1 values(now+2s, 100); +print =============== wait maxdelay 15+1 seconds for results +sleep 16000 print =============== select * from retention level 2 from memory sql select * from ct1;