enh: rsma code optimization
This commit is contained in:
parent
bf9b2ef8f0
commit
963bc989e3
|
@ -2659,14 +2659,16 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int64_t refId;
|
||||
uint64_t handle;
|
||||
int64_t suid;
|
||||
int8_t level;
|
||||
} SRSmaFetchMsg;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) {
|
||||
if (tStartEncode(pCoder) < 0) return -1;
|
||||
|
||||
if (tEncodeI64(pCoder, pReq->refId) < 0) return -1;
|
||||
if (tEncodeU64(pCoder, pReq->handle) < 0) return -1;
|
||||
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
|
||||
if (tEncodeI8(pCoder, pReq->level) < 0) return -1;
|
||||
|
||||
tEndEncode(pCoder);
|
||||
return 0;
|
||||
|
@ -2676,7 +2678,8 @@ static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg
|
|||
if (tStartDecode(pCoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI64(pCoder, &pReq->refId) < 0) return -1;
|
||||
if (tDecodeU64(pCoder, &pReq->handle) < 0) return -1;
|
||||
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
|
||||
if (tDecodeI8(pCoder, &pReq->level) < 0) return -1;
|
||||
|
||||
tEndDecode(pCoder);
|
||||
return 0;
|
||||
|
|
|
@ -25,9 +25,9 @@ extern "C" {
|
|||
// reference counting
|
||||
typedef void (*_ref_fn_t)(const void *pObj);
|
||||
|
||||
#define T_REF_DECLARE() \
|
||||
struct { \
|
||||
int32_t val; \
|
||||
#define T_REF_DECLARE() \
|
||||
struct { \
|
||||
volatile int32_t val; \
|
||||
} _ref;
|
||||
|
||||
#define T_REF_REGISTER_FUNC(s, e) \
|
||||
|
|
|
@ -115,8 +115,6 @@ struct SSmaStat {
|
|||
#define RSMA_FS_LOCK(r) (&(r)->lock)
|
||||
|
||||
struct SRSmaInfoItem {
|
||||
void *taskInfo; // qTaskInfo_t
|
||||
int64_t refId;
|
||||
tmr_h tmrId;
|
||||
int32_t maxDelay;
|
||||
int8_t level;
|
||||
|
@ -126,13 +124,20 @@ struct SRSmaInfoItem {
|
|||
struct SRSmaInfo {
|
||||
STSchema *pTSchema;
|
||||
int64_t suid;
|
||||
int64_t refId; // refId of SRSmaStat
|
||||
int8_t delFlag;
|
||||
T_REF_DECLARE()
|
||||
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
||||
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
|
||||
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable
|
||||
};
|
||||
#define RSMA_INFO_HEAD_LEN 24
|
||||
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
|
||||
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
|
||||
|
||||
#define RSMA_INFO_HEAD_LEN 32
|
||||
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
|
||||
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
|
||||
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
|
||||
#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i])
|
||||
#define RSMA_INFO_ITEM(r, i) (&(r)->items[i])
|
||||
|
||||
enum {
|
||||
TASK_TRIGGER_STAT_INIT = 0,
|
||||
|
@ -223,7 +228,7 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
|
|||
|
||||
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
|
||||
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
|
||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc);
|
||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc);
|
||||
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
|
||||
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||
|
|
|
@ -32,13 +32,11 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *ui
|
|||
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
|
||||
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
||||
int8_t idx);
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
|
||||
STSchema *pTSchema, tb_uid_t suid, int8_t level);
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid, int8_t level);
|
||||
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
||||
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
|
||||
static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
|
||||
int8_t blkType);
|
||||
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
||||
static void tdRSmaFetchTrigger2(void *param, void *tmrId);
|
||||
|
||||
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
|
||||
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
|
||||
|
@ -116,17 +114,26 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
|
|||
if (pInfo) {
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
SRSmaInfoItem *pItem = &pInfo->items[i];
|
||||
if (pItem->taskInfo) {
|
||||
if (isDeepFree && pItem->tmrId) {
|
||||
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
|
||||
pItem->tmrId, i + 1);
|
||||
taosTmrStopA(&pItem->tmrId);
|
||||
}
|
||||
tdFreeQTaskInfo(&pItem->taskInfo, SMA_VID(pSma), i + 1);
|
||||
|
||||
if (isDeepFree && pItem->tmrId) {
|
||||
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
|
||||
pItem->tmrId, i + 1);
|
||||
taosTmrStopA(&pItem->tmrId);
|
||||
}
|
||||
|
||||
if (isDeepFree && pInfo->taskInfo[i]) {
|
||||
tdFreeQTaskInfo(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
|
||||
} else {
|
||||
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
|
||||
pInfo->suid, i + 1);
|
||||
}
|
||||
|
||||
if (pInfo->iTaskInfo[i]) {
|
||||
tdFreeQTaskInfo(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1);
|
||||
} else {
|
||||
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo",
|
||||
SMA_VID(pSma), pInfo->suid, i + 1);
|
||||
}
|
||||
}
|
||||
if (isDeepFree) {
|
||||
taosMemoryFreeClear(pInfo->pTSchema);
|
||||
|
@ -156,6 +163,11 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (!taosArrayGetSize(tbUids)) {
|
||||
smaDebug("vgId:%d, no need to update tbUidList for suid:%" PRIi64 " since Empty tbUids", SMA_VID(pSma), *suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pRSmaInfo = tdGetRSmaInfoBySuid(pSma, *suid);
|
||||
|
||||
if (!pRSmaInfo) {
|
||||
|
@ -164,23 +176,16 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (pRSmaInfo->items[0].taskInfo) {
|
||||
if ((qUpdateQualifiedTableId(pRSmaInfo->items[0].taskInfo, tbUids, true) < 0)) {
|
||||
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
} else {
|
||||
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
|
||||
pRSmaInfo->items[0].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0));
|
||||
}
|
||||
}
|
||||
|
||||
if (pRSmaInfo->items[1].taskInfo) {
|
||||
if ((qUpdateQualifiedTableId(pRSmaInfo->items[1].taskInfo, tbUids, true) < 0)) {
|
||||
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
} else {
|
||||
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
|
||||
pRSmaInfo->items[1].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0));
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pRSmaInfo->taskInfo[i]) {
|
||||
if ((qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true) < 0)) {
|
||||
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
|
||||
terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
} else {
|
||||
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 " uid:%" PRIi64 " level %d",
|
||||
SMA_VID(pSma), pRSmaInfo->taskInfo[0], *suid, *(int64_t *)taosArrayGet(tbUids, 0), i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -268,13 +273,12 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
|||
.initTqReader = 1,
|
||||
};
|
||||
|
||||
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
|
||||
pItem->refId = RSMA_REF_ID(pStat);
|
||||
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
|
||||
if (!pItem->taskInfo) {
|
||||
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
|
||||
if (!pRSmaInfo->taskInfo[idx]) {
|
||||
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
|
||||
pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE;
|
||||
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
|
||||
int64_t msInterval =
|
||||
|
@ -343,6 +347,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
|||
}
|
||||
pRSmaInfo->pTSchema = pTSchema;
|
||||
pRSmaInfo->suid = suid;
|
||||
pRSmaInfo->refId = RSMA_REF_ID(pStat);
|
||||
T_REF_INIT_VAL(pRSmaInfo, 1);
|
||||
|
||||
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
|
||||
|
@ -592,7 +597,7 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
|
||||
static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
|
||||
int8_t blkType) {
|
||||
SArray *pResult = NULL;
|
||||
SSma *pSma = pStat->pSma;
|
||||
|
@ -601,7 +606,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
|
|||
SSDataBlock *output = NULL;
|
||||
uint64_t ts;
|
||||
|
||||
int32_t code = qExecTask(pItem->taskInfo, &output, &ts);
|
||||
int32_t code = qExecTask(taskInfo, &output, &ts);
|
||||
if (code < 0) {
|
||||
smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid,
|
||||
pItem->level, terrstr(code));
|
||||
|
@ -662,29 +667,32 @@ _err:
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem,
|
||||
STSchema *pTSchema, tb_uid_t suid, int8_t level) {
|
||||
if (!pItem || !pItem->taskInfo) {
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid,
|
||||
int8_t level) {
|
||||
int32_t idx = level - 1;
|
||||
if (!pInfo || !RSMA_INFO_QTASK(pInfo, idx)) {
|
||||
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (!pTSchema) {
|
||||
if (!pInfo->pTSchema) {
|
||||
smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
|
||||
pItem->taskInfo, suid);
|
||||
RSMA_INFO_QTASK(pInfo, idx), suid);
|
||||
|
||||
if (qSetMultiStreamInput(pItem->taskInfo, pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT
|
||||
if (qSetMultiStreamInput(RSMA_INFO_QTASK(pInfo, idx), pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT
|
||||
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
|
||||
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
|
||||
|
||||
tdRSmaFetchAndSubmitResult(pItem, pTSchema, suid, pStat, STREAM_INPUT__DATA_SUBMIT);
|
||||
tdRSmaFetchAndSubmitResult(RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, pStat,
|
||||
STREAM_INPUT__DATA_SUBMIT);
|
||||
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||
|
||||
if (smaMgmt.tmrHandle) {
|
||||
|
@ -735,7 +743,7 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
|||
if (iRSmaInfo) {
|
||||
SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo;
|
||||
if (pIRSmaInfo) {
|
||||
if (tdCloneRSmaInfo(pSma, pCowRSmaInfo, pIRSmaInfo) < 0) {
|
||||
if (tdCloneRSmaInfo(pSma, &pCowRSmaInfo, pIRSmaInfo) < 0) {
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
|
||||
|
@ -780,6 +788,9 @@ void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
|||
return;
|
||||
}
|
||||
|
||||
// unlock
|
||||
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
||||
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
||||
if (pRSmaInfo) {
|
||||
if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
||||
|
@ -788,6 +799,8 @@ void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
|||
taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
||||
smaDebug("vgId:%d, remove from infoHash for table:%" PRIu64 " succeed", SMA_VID(pSma), suid);
|
||||
}
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
}
|
||||
|
||||
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
|
||||
|
@ -797,7 +810,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (!pRSmaInfo->items[0].taskInfo) {
|
||||
if (!RSMA_INFO_QTASK(pRSmaInfo,0)) {
|
||||
smaDebug("vgId:%d, execute rsma, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -805,8 +818,8 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
|
|||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
tdRefRSmaInfo(pSma, pRSmaInfo);
|
||||
|
||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L1);
|
||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L2);
|
||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L1);
|
||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L2);
|
||||
|
||||
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
||||
}
|
||||
|
@ -1028,9 +1041,9 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *
|
|||
}
|
||||
|
||||
if (pItem->type == TSDB_RETENTION_L1) {
|
||||
qTaskInfo = pRSmaInfo->items[0].taskInfo;
|
||||
qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 0);
|
||||
} else if (pItem->type == TSDB_RETENTION_L2) {
|
||||
qTaskInfo = pRSmaInfo->items[1].taskInfo;
|
||||
qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 1);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -1227,7 +1240,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
|||
while (infoHash) {
|
||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo;
|
||||
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i);
|
||||
if (!taskInfo) {
|
||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
|
||||
continue;
|
||||
|
@ -1312,6 +1325,7 @@ _err:
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
#if 0
|
||||
/**
|
||||
* @brief trigger to get rsma result
|
||||
*
|
||||
|
@ -1395,6 +1409,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
|||
_end:
|
||||
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief trigger to get rsma result
|
||||
|
@ -1402,14 +1417,20 @@ _end:
|
|||
* @param param
|
||||
* @param tmrId
|
||||
*/
|
||||
static void tdRSmaFetchTrigger2(void *param, void *tmrId) {
|
||||
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||
SRSmaInfoItem *pItem = param;
|
||||
SSma *pSma = NULL;
|
||||
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
|
||||
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
|
||||
|
||||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||
return;
|
||||
}
|
||||
|
||||
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId, __func__, __LINE__);
|
||||
|
||||
if (!pStat) {
|
||||
smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
|
||||
pItem->refId);
|
||||
pRSmaInfo->refId);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1420,10 +1441,10 @@ static void tdRSmaFetchTrigger2(void *param, void *tmrId) {
|
|||
switch (rsmaTriggerStat) {
|
||||
case TASK_TRIGGER_STAT_PAUSED:
|
||||
case TASK_TRIGGER_STAT_CANCELLED: {
|
||||
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
|
||||
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId, __func__, __LINE__);
|
||||
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64
|
||||
" refId:%d",
|
||||
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId);
|
||||
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId);
|
||||
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
|
||||
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle,
|
||||
&pItem->tmrId);
|
||||
|
@ -1434,11 +1455,6 @@ static void tdRSmaFetchTrigger2(void *param, void *tmrId) {
|
|||
break;
|
||||
}
|
||||
|
||||
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
|
||||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
int8_t fetchTriggerStat =
|
||||
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
||||
switch (fetchTriggerStat) {
|
||||
|
@ -1450,9 +1466,11 @@ static void tdRSmaFetchTrigger2(void *param, void *tmrId) {
|
|||
tdRefRSmaInfo(pSma, pRSmaInfo);
|
||||
|
||||
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
||||
qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK);
|
||||
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
|
||||
tdCleanupStreamInputDataBlock(pItem->taskInfo);
|
||||
qTaskInfo_t taskInfo = pRSmaInfo->taskInfo[pItem->level - 1];
|
||||
qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK);
|
||||
tdRSmaFetchAndSubmitResult(taskInfo, pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat,
|
||||
STREAM_INPUT__DATA_BLOCK);
|
||||
tdCleanupStreamInputDataBlock(taskInfo);
|
||||
|
||||
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
||||
// atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||
|
@ -1477,5 +1495,5 @@ static void tdRSmaFetchTrigger2(void *param, void *tmrId) {
|
|||
}
|
||||
|
||||
_end:
|
||||
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
|
||||
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId, __func__, __LINE__);
|
||||
}
|
||||
|
|
|
@ -349,29 +349,20 @@ _err:
|
|||
|
||||
/**
|
||||
* @brief pTSchema is shared
|
||||
*
|
||||
* @param pSma
|
||||
* @param pDest
|
||||
* @param pSrc
|
||||
* @return int32_t
|
||||
*
|
||||
* @param pSma
|
||||
* @param pDest
|
||||
* @param pSrc
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc) {
|
||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
SRSmaParam *param = NULL;
|
||||
if (!pSrc) {
|
||||
*pDest = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (!pDest) {
|
||||
pDest = taosMemoryCalloc(1, sizeof(SRSmaInfo));
|
||||
if (!pDest) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
memcpy(pDest, pSrc, sizeof(SRSmaInfo));
|
||||
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, SMA_META(pSma), 0);
|
||||
smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid);
|
||||
|
@ -384,21 +375,20 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc) {
|
|||
ASSERT(mr.me.uid == pSrc->suid);
|
||||
if (TABLE_IS_ROLLUP(mr.me.flags)) {
|
||||
param = &mr.me.stbEntry.rsmaParam;
|
||||
for (int i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
SRSmaInfoItem *pItem = &pSrc->items[i];
|
||||
if (pItem->taskInfo) {
|
||||
tdCloneQTaskInfo(pSma, pDest->items[i].taskInfo, pItem->taskInfo, param, pSrc->suid, i);
|
||||
}
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i);
|
||||
}
|
||||
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid);
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
|
||||
*pDest = pSrc; // pointer copy
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
*pDest = NULL;
|
||||
metaReaderClear(&mr);
|
||||
tdFreeRSmaInfo(pSma, pDest, false);
|
||||
// tdFreeRSmaInfo(pSma, pDest, false);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// ...
|
||||
}
|
Loading…
Reference in New Issue