Merge pull request #14010 from taosdata/feature/TD-14481-3.0
feat: fetch rsma result by timer supported
This commit is contained in:
commit
565a11330d
|
@ -403,13 +403,13 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
||||||
req.pRSmaParam.delay = pStb->delay;
|
req.pRSmaParam.delay = pStb->delay;
|
||||||
if (pStb->ast1Len > 0) {
|
if (pStb->ast1Len > 0) {
|
||||||
if (mndConvertRsmaTask(&req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, pStb->pAst1, pStb->uid,
|
if (mndConvertRsmaTask(&req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, pStb->pAst1, pStb->uid,
|
||||||
STREAM_TRIGGER_AT_ONCE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
|
STREAM_TRIGGER_WINDOW_CLOSE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pStb->ast2Len > 0) {
|
if (pStb->ast2Len > 0) {
|
||||||
if (mndConvertRsmaTask(&req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, pStb->pAst2, pStb->uid,
|
if (mndConvertRsmaTask(&req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, pStb->pAst2, pStb->uid,
|
||||||
STREAM_TRIGGER_AT_ONCE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
|
STREAM_TRIGGER_WINDOW_CLOSE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,11 +32,12 @@ extern "C" {
|
||||||
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
typedef struct SSmaEnv SSmaEnv;
|
typedef struct SSmaEnv SSmaEnv;
|
||||||
typedef struct SSmaStat SSmaStat;
|
typedef struct SSmaStat SSmaStat;
|
||||||
typedef struct SSmaStatItem SSmaStatItem;
|
typedef struct SSmaStatItem SSmaStatItem;
|
||||||
typedef struct SSmaKey SSmaKey;
|
typedef struct SSmaKey SSmaKey;
|
||||||
typedef struct SRSmaInfo SRSmaInfo;
|
typedef struct SRSmaInfo SRSmaInfo;
|
||||||
|
typedef struct SRSmaInfoItem SRSmaInfoItem;
|
||||||
|
|
||||||
struct SSmaEnv {
|
struct SSmaEnv {
|
||||||
TdThreadRwlock lock;
|
TdThreadRwlock lock;
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
|
|
||||||
#include "sma.h"
|
#include "sma.h"
|
||||||
|
|
||||||
|
// functions for external invocation
|
||||||
|
|
||||||
// TODO: Who is responsible for resource allocate and release?
|
// TODO: Who is responsible for resource allocate and release?
|
||||||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg) {
|
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -45,6 +47,9 @@ int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// functions for internal invocation
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -208,7 +208,6 @@ int32_t tdUnLockSma(SSma *pSma) {
|
||||||
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
|
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
|
||||||
SSmaEnv *pEnv = NULL;
|
SSmaEnv *pEnv = NULL;
|
||||||
|
|
||||||
// return if already init
|
|
||||||
switch (smaType) {
|
switch (smaType) {
|
||||||
case TSDB_SMA_TYPE_TIME_RANGE:
|
case TSDB_SMA_TYPE_TIME_RANGE:
|
||||||
if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) {
|
if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) {
|
||||||
|
@ -244,3 +243,34 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
int32_t smaTimerInit(void **timer, int8_t *initFlag, const char *label) {
|
||||||
|
int8_t old;
|
||||||
|
while (1) {
|
||||||
|
old = atomic_val_compare_exchange_8(initFlag, 0, 2);
|
||||||
|
if (old != 2) break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (old == 0) {
|
||||||
|
*timer = taosTmrInit(10000, 100, 10000, label);
|
||||||
|
if (!(*timer)) {
|
||||||
|
atomic_store_8(initFlag, 0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
atomic_store_8(initFlag, 1);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void smaTimerCleanUp(void *timer, int8_t *initFlag) {
|
||||||
|
int8_t old;
|
||||||
|
while (1) {
|
||||||
|
old = atomic_val_compare_exchange_8(initFlag, 1, 2);
|
||||||
|
if (old != 2) break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (old == 1) {
|
||||||
|
taosTmrCleanUp(timer);
|
||||||
|
atomic_store_8(initFlag, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -14,14 +14,36 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "sma.h"
|
#include "sma.h"
|
||||||
|
#include "tstream.h"
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
|
static FORCE_INLINE int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
|
||||||
static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
|
static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
|
||||||
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo,
|
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
|
||||||
STSchema *pTSchema, tb_uid_t suid, int8_t level);
|
tb_uid_t suid, int8_t level);
|
||||||
|
|
||||||
|
struct SRSmaInfoItem {
|
||||||
|
SRSmaInfo *pRsmaInfo;
|
||||||
|
void *taskInfo; // qTaskInfo_t
|
||||||
|
void *tmrHandle;
|
||||||
|
tmr_h tmrId;
|
||||||
|
int8_t level;
|
||||||
|
int8_t tmrInitFlag;
|
||||||
|
int8_t triggerStatus; // TASK_TRIGGER_STATUS__IN_ACTIVE/TASK_TRIGGER_STATUS__ACTIVE
|
||||||
|
int32_t maxDelay;
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t suid;
|
||||||
|
SRSmaInfoItem *pItem;
|
||||||
|
SSma *pSma;
|
||||||
|
STSchema *pTSchema;
|
||||||
|
} SRSmaTriggerParam;
|
||||||
|
|
||||||
struct SRSmaInfo {
|
struct SRSmaInfo {
|
||||||
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
|
STSchema *pTSchema;
|
||||||
|
SSma *pSma;
|
||||||
|
int64_t suid;
|
||||||
|
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
||||||
};
|
};
|
||||||
|
|
||||||
static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) {
|
static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) {
|
||||||
|
@ -33,11 +55,20 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
|
void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
|
if (pInfo) {
|
||||||
if (pInfo->taskInfo[i]) {
|
for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
|
||||||
tdFreeTaskHandle(pInfo->taskInfo[i]);
|
SRSmaInfoItem *pItem = &pInfo->items[i];
|
||||||
|
if (pItem->taskInfo) {
|
||||||
|
tdFreeTaskHandle(pItem->taskInfo);
|
||||||
|
}
|
||||||
|
if (pItem->tmrHandle) {
|
||||||
|
taosTmrCleanUp(pItem->tmrHandle);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(pInfo->pTSchema);
|
||||||
|
taosMemoryFree(pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,20 +100,20 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRSmaInfo->taskInfo[0] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[0], tbUids, true) != 0)) {
|
if (pRSmaInfo->items[0].taskInfo && (qUpdateQualifiedTableId(pRSmaInfo->items[0].taskInfo, tbUids, true) < 0)) {
|
||||||
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno));
|
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
|
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
|
||||||
pRSmaInfo->taskInfo[0], *suid, *(int64_t *)taosArrayGet(tbUids, 0));
|
pRSmaInfo->items[0].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRSmaInfo->taskInfo[1] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[1], tbUids, true) != 0)) {
|
if (pRSmaInfo->items[1].taskInfo && (qUpdateQualifiedTableId(pRSmaInfo->items[1].taskInfo, tbUids, true) < 0)) {
|
||||||
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno));
|
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
|
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
|
||||||
pRSmaInfo->taskInfo[1], *suid, *(int64_t *)taosArrayGet(tbUids, 0));
|
pRSmaInfo->items[1].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -144,12 +175,12 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
|
||||||
ASSERT(ppStore != NULL);
|
ASSERT(ppStore != NULL);
|
||||||
|
|
||||||
if (!(*ppStore)) {
|
if (!(*ppStore)) {
|
||||||
if (tdUidStoreInit(ppStore) != 0) {
|
if (tdUidStoreInit(ppStore) < 0) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdUidStorePut(*ppStore, suid, &uid) != 0) {
|
if (tdUidStorePut(*ppStore, suid, &uid) < 0) {
|
||||||
*ppStore = tdUidStoreFree(*ppStore);
|
*ppStore = tdUidStoreFree(*ppStore);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -172,8 +203,8 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMeta *pMeta = pVnode->pMeta;
|
SMeta *pMeta = pVnode->pMeta;
|
||||||
SMsgCb *pMsgCb = &pVnode->msgCb;
|
SMsgCb *pMsgCb = &pVnode->msgCb;
|
||||||
SRSmaParam *param = &pReq->pRSmaParam;
|
SRSmaParam *param = &pReq->pRSmaParam;
|
||||||
|
|
||||||
if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) {
|
if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) {
|
||||||
|
@ -192,10 +223,12 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
||||||
|
|
||||||
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t));
|
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t));
|
||||||
if (pRSmaInfo) {
|
if (pRSmaInfo) {
|
||||||
|
ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally
|
||||||
smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
|
smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// from write queue: single thead
|
||||||
pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
|
pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
|
||||||
if (!pRSmaInfo) {
|
if (!pRSmaInfo) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -204,9 +237,8 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
||||||
|
|
||||||
STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta);
|
STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta);
|
||||||
if (!pReadHandle) {
|
if (!pReadHandle) {
|
||||||
taosMemoryFree(pRSmaInfo);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return TSDB_CODE_FAILED;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
|
@ -216,32 +248,58 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
||||||
.vnode = pVnode,
|
.vnode = pVnode,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), pReq->suid, -1);
|
||||||
|
if (!pTSchema) {
|
||||||
|
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pRSmaInfo->pTSchema = pTSchema;
|
||||||
|
pRSmaInfo->pSma = pSma;
|
||||||
|
pRSmaInfo->suid = pReq->suid;
|
||||||
|
|
||||||
if (param->qmsg1) {
|
if (param->qmsg1) {
|
||||||
pRSmaInfo->taskInfo[0] = qCreateStreamExecTaskInfo(param->qmsg1, &handle);
|
pRSmaInfo->items[0].pRsmaInfo = pRSmaInfo;
|
||||||
if (!pRSmaInfo->taskInfo[0]) {
|
pRSmaInfo->items[0].taskInfo = qCreateStreamExecTaskInfo(param->qmsg1, &handle);
|
||||||
taosMemoryFree(pRSmaInfo);
|
if (!pRSmaInfo->items[0].taskInfo) {
|
||||||
taosMemoryFree(pReadHandle);
|
goto _err;
|
||||||
return TSDB_CODE_FAILED;
|
}
|
||||||
|
pRSmaInfo->items[0].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
|
||||||
|
pRSmaInfo->items[0].maxDelay = 5000;
|
||||||
|
pRSmaInfo->items[0].level = TSDB_RETENTION_L1;
|
||||||
|
pRSmaInfo->items[0].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA_L1");
|
||||||
|
|
||||||
|
if (!pRSmaInfo->items[0].tmrHandle) {
|
||||||
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (param->qmsg2) {
|
if (param->qmsg2) {
|
||||||
pRSmaInfo->taskInfo[1] = qCreateStreamExecTaskInfo(param->qmsg2, &handle);
|
pRSmaInfo->items[1].pRsmaInfo = pRSmaInfo;
|
||||||
if (!pRSmaInfo->taskInfo[1]) {
|
pRSmaInfo->items[1].taskInfo = qCreateStreamExecTaskInfo(param->qmsg2, &handle);
|
||||||
taosMemoryFree(pRSmaInfo);
|
if (!pRSmaInfo->items[1].taskInfo) {
|
||||||
taosMemoryFree(pReadHandle);
|
goto _err;
|
||||||
return TSDB_CODE_FAILED;
|
}
|
||||||
|
pRSmaInfo->items[1].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
|
||||||
|
pRSmaInfo->items[1].maxDelay = 5000;
|
||||||
|
pRSmaInfo->items[1].level = TSDB_RETENTION_L2;
|
||||||
|
pRSmaInfo->items[1].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA_L2");
|
||||||
|
if (!pRSmaInfo->items[1].tmrHandle) {
|
||||||
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
|
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
|
||||||
TSDB_CODE_SUCCESS) {
|
TSDB_CODE_SUCCESS) {
|
||||||
return TSDB_CODE_FAILED;
|
goto _err;
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid);
|
smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
_err:
|
||||||
|
tdFreeRSmaInfo(pRSmaInfo);
|
||||||
|
taosMemoryFree(pReadHandle);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -291,12 +349,12 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid)
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) != 0) {
|
if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) < 0) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) != 0) {
|
if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) < 0) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -367,22 +425,15 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo,
|
static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) {
|
||||||
STSchema *pTSchema, tb_uid_t suid, int8_t level) {
|
SArray *pResult = NULL;
|
||||||
SArray *pResult = NULL;
|
SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo;
|
||||||
|
SSma *pSma = pRSmaInfo->pSma;
|
||||||
|
|
||||||
if (!taskInfo) {
|
|
||||||
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, taskInfo, suid);
|
|
||||||
|
|
||||||
qSetStreamInput(taskInfo, pMsg, inputType, true);
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock *output = NULL;
|
SSDataBlock *output = NULL;
|
||||||
uint64_t ts;
|
uint64_t ts;
|
||||||
if (qExecTask(taskInfo, &output, &ts) < 0) {
|
if (qExecTask(pItem->taskInfo, &output, &ts) < 0) {
|
||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
}
|
}
|
||||||
if (!output) {
|
if (!output) {
|
||||||
|
@ -400,18 +451,18 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(pResult) > 0) {
|
if (taosArrayGetSize(pResult) > 0) {
|
||||||
#if 0
|
#if 1
|
||||||
char flag[10] = {0};
|
char flag[10] = {0};
|
||||||
snprintf(flag, 10, "level %" PRIi8, level);
|
snprintf(flag, 10, "level %" PRIi8, pItem->level);
|
||||||
blockDebugShowData(pResult, flag);
|
blockDebugShowData(pResult, flag);
|
||||||
#endif
|
#endif
|
||||||
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
|
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
|
||||||
SSubmitReq *pReq = NULL;
|
SSubmitReq *pReq = NULL;
|
||||||
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
|
if (buildSubmitReqFromDataBlock(&pReq, pResult, pRSmaInfo->pTSchema, SMA_VID(pSma), pRSmaInfo->suid) < 0) {
|
||||||
taosArrayDestroy(pResult);
|
taosArrayDestroy(pResult);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) {
|
if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) {
|
||||||
taosArrayDestroy(pResult);
|
taosArrayDestroy(pResult);
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
|
@ -420,10 +471,63 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
|
||||||
|
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (blkType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
||||||
|
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pResult);
|
taosArrayDestroy(pResult);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief trigger to get rsma result
|
||||||
|
*
|
||||||
|
* @param param
|
||||||
|
* @param tmrId
|
||||||
|
*/
|
||||||
|
static void rsmaTriggerByTimer(void *param, void *tmrId) {
|
||||||
|
// SRSmaTriggerParam *pParam = (SRSmaTriggerParam *)param;
|
||||||
|
// SRSmaInfoItem *pItem = pParam->pItem;
|
||||||
|
SRSmaInfoItem *pItem = param;
|
||||||
|
|
||||||
|
if (atomic_load_8(&pItem->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
|
||||||
|
printf("%s:%d THREAD:%" PRIi64 " status = active\n", __func__, __LINE__, taosGetSelfPthreadId());
|
||||||
|
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
||||||
|
|
||||||
|
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
|
||||||
|
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
|
||||||
|
|
||||||
|
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK);
|
||||||
|
} else {
|
||||||
|
printf("%s:%d THREAD:%" PRIi64 " status = in active\n", __func__, __LINE__, taosGetSelfPthreadId());
|
||||||
|
}
|
||||||
|
|
||||||
|
// taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem,
|
||||||
|
tb_uid_t suid, int8_t level) {
|
||||||
|
if (!pItem || !pItem->taskInfo) {
|
||||||
|
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
|
||||||
|
pItem->taskInfo, suid);
|
||||||
|
|
||||||
|
// inputType = STREAM_DATA_TYPE_SUBMIT_BLOCK(1)
|
||||||
|
if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) {
|
||||||
|
smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
// SRSmaTriggerParam triggerParam = {.suid = suid, .pItem = pItem, .pSma = pSma, .pTSchema = pTSchema};
|
||||||
|
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
||||||
|
atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE);
|
||||||
|
taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -441,24 +545,18 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
|
||||||
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
||||||
|
|
||||||
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
||||||
smaDebug("vgId:%d, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
if (!pRSmaInfo->taskInfo[0]) {
|
|
||||||
smaDebug("vgId:%d, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid);
|
if (!pRSmaInfo->items[0].taskInfo) {
|
||||||
|
smaDebug("vgId:%d, return as no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
||||||
// TODO: cache STSchema
|
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], suid, TSDB_RETENTION_L1);
|
||||||
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1);
|
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], suid, TSDB_RETENTION_L2);
|
||||||
if (!pTSchema) {
|
|
||||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, TSDB_RETENTION_L1);
|
|
||||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, TSDB_RETENTION_L2);
|
|
||||||
taosMemoryFree(pTSchema);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -346,7 +346,10 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *p
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tdProcessRSmaCreate(pVnode, &req);
|
if (tdProcessRSmaCreate(pVnode, &req) < 0) {
|
||||||
|
pRsp->code = terrno;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
tDecoderClear(&coder);
|
tDecoderClear(&coder);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -41,12 +41,18 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
pInfo->assignBlockUid = assignUid;
|
pInfo->assignBlockUid = assignUid;
|
||||||
|
|
||||||
// the block type can not be changed in the streamscan operators
|
// the block type can not be changed in the streamscan operators
|
||||||
|
#if 0
|
||||||
if (pInfo->blockType == 0) {
|
if (pInfo->blockType == 0) {
|
||||||
pInfo->blockType = type;
|
pInfo->blockType = type;
|
||||||
} else if (pInfo->blockType != type) {
|
} else if (pInfo->blockType != type) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
// rollup sma, the same qTaskInfo is used to insert data by SubmitReq and fetch result by SSDataBlock
|
||||||
|
if (pInfo->blockType != type) {
|
||||||
|
pInfo->blockType = type;
|
||||||
|
}
|
||||||
|
|
||||||
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
||||||
if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) {
|
if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) {
|
||||||
|
|
|
@ -5,7 +5,7 @@ sleep 50
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
print =============== create database with retentions
|
print =============== create database with retentions
|
||||||
sql create database d0 retentions 15s:7d,1m:21d,15m:365d;
|
sql create database d0 retentions 5s:7d,10s:21d,15s:365d;
|
||||||
sql use d0
|
sql use d0
|
||||||
|
|
||||||
print =============== create super table and register rsma
|
print =============== create super table and register rsma
|
||||||
|
@ -29,6 +29,8 @@ sql insert into ct1 values(now, 10);
|
||||||
sql insert into ct1 values(now+1s, 1);
|
sql insert into ct1 values(now+1s, 1);
|
||||||
sql insert into ct1 values(now+2s, 100);
|
sql insert into ct1 values(now+2s, 100);
|
||||||
|
|
||||||
|
print =============== wait maxdelay 15+1 seconds for results
|
||||||
|
sleep 16000
|
||||||
|
|
||||||
print =============== select * from retention level 2 from memory
|
print =============== select * from retention level 2 from memory
|
||||||
sql select * from ct1;
|
sql select * from ct1;
|
||||||
|
|
Loading…
Reference in New Issue