enh: rsma drop and code optimization
This commit is contained in:
parent
2204a770ab
commit
156e72b071
|
@ -43,9 +43,9 @@ typedef struct SRSmaInfo SRSmaInfo;
|
||||||
typedef struct SRSmaInfoItem SRSmaInfoItem;
|
typedef struct SRSmaInfoItem SRSmaInfoItem;
|
||||||
|
|
||||||
struct SSmaEnv {
|
struct SSmaEnv {
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
SSmaStat *pStat;
|
SSmaStat *pStat;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -103,10 +103,15 @@ struct SRSmaInfoItem {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SRSmaInfo {
|
struct SRSmaInfo {
|
||||||
STSchema *pTSchema;
|
STSchema *pTSchema;
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
|
int8_t delFlag;
|
||||||
|
T_REF_DECLARE()
|
||||||
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
||||||
};
|
};
|
||||||
|
#define RSMA_INFO_HEAD_LEN 24
|
||||||
|
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
|
||||||
|
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TASK_TRIGGER_STAT_INIT = 0,
|
TASK_TRIGGER_STAT_INIT = 0,
|
||||||
|
@ -120,8 +125,8 @@ enum {
|
||||||
enum {
|
enum {
|
||||||
RSMA_ROLE_CREATE = 0,
|
RSMA_ROLE_CREATE = 0,
|
||||||
RSMA_ROLE_DROP = 1,
|
RSMA_ROLE_DROP = 1,
|
||||||
RSMA_ROLE_FETCH = 2,
|
RSMA_ROLE_SUBMIT = 2,
|
||||||
RSMA_ROLE_SUBMIT = 3,
|
RSMA_ROLE_FETCH = 3,
|
||||||
RSMA_ROLE_ITERATE = 4,
|
RSMA_ROLE_ITERATE = 4,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -134,6 +139,8 @@ int32_t tdInsertRSmaData(SSma *pSma, char *msg);
|
||||||
|
|
||||||
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
|
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
|
||||||
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
|
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
|
||||||
|
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
|
||||||
|
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
|
||||||
|
|
||||||
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
|
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
|
||||||
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
|
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
|
||||||
|
@ -193,6 +200,7 @@ void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t le
|
||||||
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||||
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
|
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||||
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
|
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
|
||||||
|
void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
||||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
|
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
|
||||||
|
|
||||||
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
||||||
|
@ -258,8 +266,9 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
|
||||||
void tdCloseTFile(STFile *pTFile);
|
void tdCloseTFile(STFile *pTFile);
|
||||||
void tdDestroyTFile(STFile *pTFile);
|
void tdDestroyTFile(STFile *pTFile);
|
||||||
|
|
||||||
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, char *outputName);
|
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
||||||
void tdGetVndDirName(int32_t vgId,const char *pdname, const char *dname, bool endWithSep, char *outputName);
|
char *outputName);
|
||||||
|
void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -277,7 +277,7 @@ _drop_super_table:
|
||||||
_exit:
|
_exit:
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pData);
|
tdbFree(pData);
|
||||||
metaDebug("vgId:%d, super table %s uid:%" PRId64 " is dropped", TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
|
metaDebug("vgId:%d, super table %s uid:%" PRId64 " is dropped", TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -191,7 +191,7 @@ static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) {
|
||||||
if ((pDir = taosOpenDir(dir)) == NULL) {
|
if ((pDir = taosOpenDir(dir)) == NULL) {
|
||||||
regfree(®ex);
|
regfree(®ex);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
smaWarn("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
|
smaDebug("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -392,5 +392,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
|
||||||
// step 2: cleanup outdated qtaskinfo files
|
// step 2: cleanup outdated qtaskinfo files
|
||||||
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
|
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
|
||||||
|
|
||||||
|
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -169,6 +169,26 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
|
||||||
|
if (!pRSmaInfo) return 0;
|
||||||
|
|
||||||
|
int ref = T_REF_INC(pRSmaInfo);
|
||||||
|
smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
|
||||||
|
if (!pRSmaInfo) return 0;
|
||||||
|
|
||||||
|
int ref = T_REF_DEC(pRSmaInfo);
|
||||||
|
smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
|
||||||
|
|
||||||
|
if (ref == 0) {
|
||||||
|
tdRemoveRSmaInfoBySuid(pSma, pRSmaInfo->suid);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
|
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
|
||||||
ASSERT(pSmaStat != NULL);
|
ASSERT(pSmaStat != NULL);
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ SSmaMgmt smaMgmt = {
|
||||||
};
|
};
|
||||||
|
|
||||||
#define TD_QTASKINFO_FNAME_PREFIX "qtaskinfo.ver"
|
#define TD_QTASKINFO_FNAME_PREFIX "qtaskinfo.ver"
|
||||||
|
#define TD_RSMAINFO_DEL_FILE "rsmainfo.del"
|
||||||
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
|
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
|
||||||
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
|
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
|
||||||
|
|
||||||
|
@ -48,14 +49,11 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
|
||||||
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed);
|
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed);
|
||||||
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed);
|
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
|
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
|
||||||
// adapt accordingly if definition of SRSmaInfo update
|
// adapt accordingly if definition of SRSmaInfo update
|
||||||
SRSmaInfo *pResult = NULL;
|
SRSmaInfo *pResult = NULL;
|
||||||
int32_t rsmaInfoHeadLen = sizeof(int64_t) + sizeof(STSchema *);
|
|
||||||
ASSERT(pItem->level == TSDB_RETENTION_L1 || pItem->level == TSDB_RETENTION_L2);
|
ASSERT(pItem->level == TSDB_RETENTION_L1 || pItem->level == TSDB_RETENTION_L2);
|
||||||
pResult = (SRSmaInfo *)POINTER_SHIFT(pItem, -(sizeof(SRSmaInfoItem) * (pItem->level - 1) + rsmaInfoHeadLen));
|
pResult = (SRSmaInfo *)POINTER_SHIFT(pItem, -(sizeof(SRSmaInfoItem) * (pItem->level - 1) + RSMA_INFO_HEAD_LEN));
|
||||||
ASSERT(pResult->pTSchema->numOfCols > 1);
|
ASSERT(pResult->pTSchema->numOfCols > 1);
|
||||||
return pResult;
|
return pResult;
|
||||||
}
|
}
|
||||||
|
@ -116,8 +114,8 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
|
||||||
SRSmaInfoItem *pItem = &pInfo->items[i];
|
SRSmaInfoItem *pItem = &pInfo->items[i];
|
||||||
if (pItem->taskInfo) {
|
if (pItem->taskInfo) {
|
||||||
if (isDeepFree && pItem->tmrId) {
|
if (isDeepFree && pItem->tmrId) {
|
||||||
smaDebug("vgId:%d, table %" PRIi64 " stop fetch timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId,
|
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
|
||||||
i + 1);
|
pItem->tmrId, i + 1);
|
||||||
taosTmrStopA(&pItem->tmrId);
|
taosTmrStopA(&pItem->tmrId);
|
||||||
}
|
}
|
||||||
tdFreeQTaskInfo(&pItem->taskInfo, SMA_VID(pSma), i + 1);
|
tdFreeQTaskInfo(&pItem->taskInfo, SMA_VID(pSma), i + 1);
|
||||||
|
@ -337,6 +335,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
||||||
}
|
}
|
||||||
pRSmaInfo->pTSchema = pTSchema;
|
pRSmaInfo->pTSchema = pTSchema;
|
||||||
pRSmaInfo->suid = suid;
|
pRSmaInfo->suid = suid;
|
||||||
|
T_REF_INIT_VAL(pRSmaInfo, 1);
|
||||||
|
|
||||||
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
|
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -392,11 +391,33 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
|
||||||
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
if (!VND_IS_RSMA(pVnode)) {
|
if (!VND_IS_RSMA(pVnode)) {
|
||||||
smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
|
smaTrace("vgId:%d, not drop rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
|
||||||
pReq->suid);
|
pReq->suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
||||||
|
if (!pSmaEnv) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||||
|
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||||
|
|
||||||
|
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, pReq->suid);
|
||||||
|
|
||||||
|
if (!pRSmaInfo) {
|
||||||
|
smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name,
|
||||||
|
pReq->suid);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
// set del flag for data in mem
|
||||||
|
RSMA_INFO_SET_DEL(pRSmaInfo);
|
||||||
|
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
|
|
||||||
|
// save to file
|
||||||
|
|
||||||
smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
|
smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -650,10 +671,10 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
|
* @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
|
||||||
*
|
*
|
||||||
* @param pSma
|
* @param pSma
|
||||||
* @param suid
|
* @param suid
|
||||||
* @return SRSmaInfo*
|
* @return SRSmaInfo*
|
||||||
*/
|
*/
|
||||||
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
||||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||||
|
@ -661,7 +682,6 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
||||||
SRSmaInfo *pRSmaInfo = NULL;
|
SRSmaInfo *pRSmaInfo = NULL;
|
||||||
|
|
||||||
if (!pEnv) {
|
if (!pEnv) {
|
||||||
// only applicable when rsma env exists
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -683,18 +703,21 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
||||||
SRSmaInfo *pCowRSmaInfo = NULL;
|
SRSmaInfo *pCowRSmaInfo = NULL;
|
||||||
// lock
|
// lock
|
||||||
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
||||||
void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
if (!taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t))) { // 2-phase lock
|
||||||
if (iRSmaInfo) {
|
void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
||||||
SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo;
|
if (iRSmaInfo) {
|
||||||
if (pIRSmaInfo) {
|
SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo;
|
||||||
if (tdCloneRSmaInfo(pSma, pCowRSmaInfo, pIRSmaInfo) < 0) {
|
if (pIRSmaInfo) {
|
||||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
if (tdCloneRSmaInfo(pSma, pCowRSmaInfo, pIRSmaInfo) < 0) {
|
||||||
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
|
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||||
return NULL;
|
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
|
||||||
}
|
return NULL;
|
||||||
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) {
|
}
|
||||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||||
return NULL;
|
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) {
|
||||||
|
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -703,21 +726,56 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
||||||
return pCowRSmaInfo;
|
return pCowRSmaInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief During the drop procedure, only need to delete the object in rsmaInfoHash.
|
||||||
|
*
|
||||||
|
* @param pSma
|
||||||
|
* @param suid
|
||||||
|
* @return SRSmaInfo*
|
||||||
|
*/
|
||||||
|
void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
||||||
|
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||||
|
SRSmaStat *pStat = NULL;
|
||||||
|
SRSmaInfo *pRSmaInfo = NULL;
|
||||||
|
|
||||||
|
if (!pEnv) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||||
|
if (!pStat || !RSMA_INFO_HASH(pStat)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
||||||
|
if (pRSmaInfo) {
|
||||||
|
if (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo) {
|
||||||
|
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
|
||||||
|
}
|
||||||
|
taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
||||||
|
smaDebug("vgId:%d, remove from infoHash for table:%" PRIu64 " succeed", SMA_VID(pSma), suid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
|
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
|
||||||
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, suid);
|
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, suid);
|
||||||
if (!pRSmaInfo) {
|
if (!pRSmaInfo) {
|
||||||
smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pRSmaInfo->items[0].taskInfo) {
|
if (!pRSmaInfo->items[0].taskInfo) {
|
||||||
smaDebug("vgId:%d, return as no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid);
|
smaDebug("vgId:%d, execute rsma, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
|
tdRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
|
|
||||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L1);
|
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L1);
|
||||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L2);
|
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L2);
|
||||||
|
|
||||||
|
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -931,10 +989,10 @@ _err:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Restore from SRSmaQTaskInfoItem
|
* @brief Restore from SRSmaQTaskInfoItem
|
||||||
*
|
*
|
||||||
* @param pSma
|
* @param pSma
|
||||||
* @param pItem
|
* @param pItem
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) {
|
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) {
|
||||||
SRSmaInfo *pRSmaInfo = NULL;
|
SRSmaInfo *pRSmaInfo = NULL;
|
||||||
|
@ -1271,6 +1329,9 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
|
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
|
||||||
|
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
int8_t fetchTriggerStat =
|
int8_t fetchTriggerStat =
|
||||||
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
||||||
|
@ -1279,13 +1340,14 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
|
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
|
||||||
pItem->level, pRSmaInfo->suid);
|
pItem->level, pRSmaInfo->suid);
|
||||||
|
|
||||||
tdRefSmaStat(pSma, (SSmaStat *)pStat);
|
// sync procedure => async process
|
||||||
|
tdRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
|
|
||||||
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
||||||
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false);
|
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false);
|
||||||
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
|
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
|
||||||
|
|
||||||
tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
|
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
} break;
|
} break;
|
||||||
case TASK_TRIGGER_STAT_PAUSED: {
|
case TASK_TRIGGER_STAT_PAUSED: {
|
||||||
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
|
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
|
||||||
|
|
Loading…
Reference in New Issue