Merge pull request #15087 from taosdata/feature/TD-11274-3.0
enh: rsma drop and code optimization
This commit is contained in:
commit
8910acc4ba
|
@ -105,8 +105,13 @@ struct SRSmaInfoItem {
|
||||||
struct SRSmaInfo {
|
struct SRSmaInfo {
|
||||||
STSchema *pTSchema;
|
STSchema *pTSchema;
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
|
int8_t delFlag;
|
||||||
|
T_REF_DECLARE()
|
||||||
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
||||||
};
|
};
|
||||||
|
#define RSMA_INFO_HEAD_LEN 24
|
||||||
|
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
|
||||||
|
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TASK_TRIGGER_STAT_INIT = 0,
|
TASK_TRIGGER_STAT_INIT = 0,
|
||||||
|
@ -120,8 +125,8 @@ enum {
|
||||||
enum {
|
enum {
|
||||||
RSMA_ROLE_CREATE = 0,
|
RSMA_ROLE_CREATE = 0,
|
||||||
RSMA_ROLE_DROP = 1,
|
RSMA_ROLE_DROP = 1,
|
||||||
RSMA_ROLE_FETCH = 2,
|
RSMA_ROLE_SUBMIT = 2,
|
||||||
RSMA_ROLE_SUBMIT = 3,
|
RSMA_ROLE_FETCH = 3,
|
||||||
RSMA_ROLE_ITERATE = 4,
|
RSMA_ROLE_ITERATE = 4,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -134,6 +139,8 @@ int32_t tdInsertRSmaData(SSma *pSma, char *msg);
|
||||||
|
|
||||||
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
|
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
|
||||||
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
|
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
|
||||||
|
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
|
||||||
|
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
|
||||||
|
|
||||||
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
|
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
|
||||||
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
|
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
|
||||||
|
@ -193,6 +200,7 @@ void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t le
|
||||||
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||||
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
|
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||||
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
|
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
|
||||||
|
void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
||||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
|
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
|
||||||
|
|
||||||
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
||||||
|
@ -258,8 +266,9 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
|
||||||
void tdCloseTFile(STFile *pTFile);
|
void tdCloseTFile(STFile *pTFile);
|
||||||
void tdDestroyTFile(STFile *pTFile);
|
void tdDestroyTFile(STFile *pTFile);
|
||||||
|
|
||||||
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, char *outputName);
|
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
||||||
void tdGetVndDirName(int32_t vgId,const char *pdname, const char *dname, bool endWithSep, char *outputName);
|
char *outputName);
|
||||||
|
void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -191,7 +191,7 @@ static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) {
|
||||||
if ((pDir = taosOpenDir(dir)) == NULL) {
|
if ((pDir = taosOpenDir(dir)) == NULL) {
|
||||||
regfree(®ex);
|
regfree(®ex);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
smaWarn("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
|
smaDebug("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -392,5 +392,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
|
||||||
// step 2: cleanup outdated qtaskinfo files
|
// step 2: cleanup outdated qtaskinfo files
|
||||||
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
|
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
|
||||||
|
|
||||||
|
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -169,6 +169,26 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
|
||||||
|
if (!pRSmaInfo) return 0;
|
||||||
|
|
||||||
|
int ref = T_REF_INC(pRSmaInfo);
|
||||||
|
smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
|
||||||
|
if (!pRSmaInfo) return 0;
|
||||||
|
|
||||||
|
int ref = T_REF_DEC(pRSmaInfo);
|
||||||
|
smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
|
||||||
|
|
||||||
|
if (ref == 0) {
|
||||||
|
tdRemoveRSmaInfoBySuid(pSma, pRSmaInfo->suid);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
|
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
|
||||||
ASSERT(pSmaStat != NULL);
|
ASSERT(pSmaStat != NULL);
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ SSmaMgmt smaMgmt = {
|
||||||
};
|
};
|
||||||
|
|
||||||
#define TD_QTASKINFO_FNAME_PREFIX "qtaskinfo.ver"
|
#define TD_QTASKINFO_FNAME_PREFIX "qtaskinfo.ver"
|
||||||
|
#define TD_RSMAINFO_DEL_FILE "rsmainfo.del"
|
||||||
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
|
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
|
||||||
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
|
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
|
||||||
|
|
||||||
|
@ -48,14 +49,11 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
|
||||||
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed);
|
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed);
|
||||||
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed);
|
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
|
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
|
||||||
// adapt accordingly if definition of SRSmaInfo update
|
// adapt accordingly if definition of SRSmaInfo update
|
||||||
SRSmaInfo *pResult = NULL;
|
SRSmaInfo *pResult = NULL;
|
||||||
int32_t rsmaInfoHeadLen = sizeof(int64_t) + sizeof(STSchema *);
|
|
||||||
ASSERT(pItem->level == TSDB_RETENTION_L1 || pItem->level == TSDB_RETENTION_L2);
|
ASSERT(pItem->level == TSDB_RETENTION_L1 || pItem->level == TSDB_RETENTION_L2);
|
||||||
pResult = (SRSmaInfo *)POINTER_SHIFT(pItem, -(sizeof(SRSmaInfoItem) * (pItem->level - 1) + rsmaInfoHeadLen));
|
pResult = (SRSmaInfo *)POINTER_SHIFT(pItem, -(sizeof(SRSmaInfoItem) * (pItem->level - 1) + RSMA_INFO_HEAD_LEN));
|
||||||
ASSERT(pResult->pTSchema->numOfCols > 1);
|
ASSERT(pResult->pTSchema->numOfCols > 1);
|
||||||
return pResult;
|
return pResult;
|
||||||
}
|
}
|
||||||
|
@ -116,8 +114,8 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
|
||||||
SRSmaInfoItem *pItem = &pInfo->items[i];
|
SRSmaInfoItem *pItem = &pInfo->items[i];
|
||||||
if (pItem->taskInfo) {
|
if (pItem->taskInfo) {
|
||||||
if (isDeepFree && pItem->tmrId) {
|
if (isDeepFree && pItem->tmrId) {
|
||||||
smaDebug("vgId:%d, table %" PRIi64 " stop fetch timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId,
|
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
|
||||||
i + 1);
|
pItem->tmrId, i + 1);
|
||||||
taosTmrStopA(&pItem->tmrId);
|
taosTmrStopA(&pItem->tmrId);
|
||||||
}
|
}
|
||||||
tdFreeQTaskInfo(&pItem->taskInfo, SMA_VID(pSma), i + 1);
|
tdFreeQTaskInfo(&pItem->taskInfo, SMA_VID(pSma), i + 1);
|
||||||
|
@ -337,6 +335,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
||||||
}
|
}
|
||||||
pRSmaInfo->pTSchema = pTSchema;
|
pRSmaInfo->pTSchema = pTSchema;
|
||||||
pRSmaInfo->suid = suid;
|
pRSmaInfo->suid = suid;
|
||||||
|
T_REF_INIT_VAL(pRSmaInfo, 1);
|
||||||
|
|
||||||
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
|
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -392,11 +391,33 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
|
||||||
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
if (!VND_IS_RSMA(pVnode)) {
|
if (!VND_IS_RSMA(pVnode)) {
|
||||||
smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
|
smaTrace("vgId:%d, not drop rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
|
||||||
pReq->suid);
|
pReq->suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
||||||
|
if (!pSmaEnv) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||||
|
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||||
|
|
||||||
|
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, pReq->suid);
|
||||||
|
|
||||||
|
if (!pRSmaInfo) {
|
||||||
|
smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name,
|
||||||
|
pReq->suid);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
// set del flag for data in mem
|
||||||
|
RSMA_INFO_SET_DEL(pRSmaInfo);
|
||||||
|
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
|
|
||||||
|
// save to file
|
||||||
|
|
||||||
smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
|
smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -661,7 +682,6 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
||||||
SRSmaInfo *pRSmaInfo = NULL;
|
SRSmaInfo *pRSmaInfo = NULL;
|
||||||
|
|
||||||
if (!pEnv) {
|
if (!pEnv) {
|
||||||
// only applicable when rsma env exists
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -683,6 +703,7 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
||||||
SRSmaInfo *pCowRSmaInfo = NULL;
|
SRSmaInfo *pCowRSmaInfo = NULL;
|
||||||
// lock
|
// lock
|
||||||
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
||||||
|
if (!taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t))) { // 2-phase lock
|
||||||
void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
||||||
if (iRSmaInfo) {
|
if (iRSmaInfo) {
|
||||||
SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo;
|
SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo;
|
||||||
|
@ -692,32 +713,69 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
||||||
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
|
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||||
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) {
|
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) {
|
||||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// unlock
|
// unlock
|
||||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||||
return pCowRSmaInfo;
|
return pCowRSmaInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief During the drop procedure, only need to delete the object in rsmaInfoHash.
|
||||||
|
*
|
||||||
|
* @param pSma
|
||||||
|
* @param suid
|
||||||
|
* @return SRSmaInfo*
|
||||||
|
*/
|
||||||
|
void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
||||||
|
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||||
|
SRSmaStat *pStat = NULL;
|
||||||
|
SRSmaInfo *pRSmaInfo = NULL;
|
||||||
|
|
||||||
|
if (!pEnv) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||||
|
if (!pStat || !RSMA_INFO_HASH(pStat)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
||||||
|
if (pRSmaInfo) {
|
||||||
|
if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
||||||
|
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
|
||||||
|
}
|
||||||
|
taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
||||||
|
smaDebug("vgId:%d, remove from infoHash for table:%" PRIu64 " succeed", SMA_VID(pSma), suid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
|
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
|
||||||
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, suid);
|
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, suid);
|
||||||
if (!pRSmaInfo) {
|
if (!pRSmaInfo) {
|
||||||
smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pRSmaInfo->items[0].taskInfo) {
|
if (!pRSmaInfo->items[0].taskInfo) {
|
||||||
smaDebug("vgId:%d, return as no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid);
|
smaDebug("vgId:%d, execute rsma, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
|
tdRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
|
|
||||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L1);
|
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L1);
|
||||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L2);
|
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L2);
|
||||||
|
|
||||||
|
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1271,6 +1329,9 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
|
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
|
||||||
|
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
int8_t fetchTriggerStat =
|
int8_t fetchTriggerStat =
|
||||||
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
||||||
|
@ -1279,13 +1340,14 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
|
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
|
||||||
pItem->level, pRSmaInfo->suid);
|
pItem->level, pRSmaInfo->suid);
|
||||||
|
|
||||||
tdRefSmaStat(pSma, (SSmaStat *)pStat);
|
// sync procedure => async process
|
||||||
|
tdRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
|
|
||||||
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
||||||
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false);
|
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false);
|
||||||
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
|
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
|
||||||
|
|
||||||
tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
|
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
} break;
|
} break;
|
||||||
case TASK_TRIGGER_STAT_PAUSED: {
|
case TASK_TRIGGER_STAT_PAUSED: {
|
||||||
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
|
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
|
||||||
|
|
Loading…
Reference in New Issue