Merge pull request #14789 from taosdata/feature/TD-11274-3.0
enh: rsma code optimization and support drop
This commit is contained in:
commit
4351700ec4
|
@ -188,7 +188,7 @@ void *taosHashGetKey(void *data, size_t* keyLen);
|
|||
void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen);
|
||||
|
||||
/**
|
||||
* release the prevous acquired obj
|
||||
* release the previous acquired obj
|
||||
*
|
||||
* @param pHashObj
|
||||
* @param data
|
||||
|
|
|
@ -67,7 +67,6 @@ struct SRSmaStat {
|
|||
int64_t submitVer;
|
||||
int64_t refId; // shared by fetch tasks
|
||||
int8_t triggerStat; // shared by fetch tasks
|
||||
int8_t runningStat; // for persistence task
|
||||
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
||||
};
|
||||
|
||||
|
@ -83,7 +82,6 @@ struct SSmaStat {
|
|||
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
|
||||
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
|
||||
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
|
||||
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
|
||||
#define RSMA_REF_ID(r) ((r)->refId)
|
||||
#define RSMA_SUBMIT_VER(r) ((r)->submitVer)
|
||||
|
||||
|
@ -93,7 +91,7 @@ enum {
|
|||
TASK_TRIGGER_STAT_INACTIVE = 2,
|
||||
TASK_TRIGGER_STAT_PAUSED = 3,
|
||||
TASK_TRIGGER_STAT_CANCELLED = 4,
|
||||
TASK_TRIGGER_STAT_FINISHED = 5,
|
||||
TASK_TRIGGER_STAT_DROPPED = 5,
|
||||
};
|
||||
|
||||
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
|
||||
|
|
|
@ -172,8 +172,9 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
|||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||
int64_t tdRSmaGetMaxSubmitVer(SSma* pSma, int8_t level);
|
||||
|
||||
int32_t tdProcessRSmaCreate(SVnode* pVnode, SVCreateStbReq* pReq);
|
||||
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
|
||||
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
|
||||
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
|
||||
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
|
||||
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore);
|
||||
void tdUidStoreDestory(STbUidStore* pStore);
|
||||
|
|
|
@ -254,26 +254,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
// step 1: set rsma trigger stat cancelled
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
|
||||
|
||||
// step 2: wait the persistence thread to finish
|
||||
int32_t nLoops = 0;
|
||||
if (atomic_load_8(RSMA_RUNNING_STAT(pStat)) == 1) {
|
||||
while (1) {
|
||||
if (atomic_load_8(RSMA_TRIGGER_STAT(pStat)) == TASK_TRIGGER_STAT_FINISHED) {
|
||||
smaDebug("vgId:%d, rsma persist task finished already", SMA_VID(pSma));
|
||||
break;
|
||||
} else {
|
||||
smaDebug("vgId:%d, rsma persist task not finished yet since rsma stat in %" PRIi8, SMA_VID(pSma),
|
||||
atomic_load_8(RSMA_TRIGGER_STAT(pStat)));
|
||||
}
|
||||
++nLoops;
|
||||
if (nLoops > 1000) {
|
||||
sched_yield();
|
||||
nLoops = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// step 3: destroy the rsma info and associated fetch tasks
|
||||
// step 2: destroy the rsma info and associated fetch tasks
|
||||
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
|
||||
if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
|
||||
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
|
||||
|
@ -285,8 +266,8 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
}
|
||||
taosHashCleanup(RSMA_INFO_HASH(pStat));
|
||||
|
||||
// step 5: wait all triggered fetch tasks finished
|
||||
nLoops = 0;
|
||||
// step 3: wait all triggered fetch tasks finished
|
||||
int32_t nLoops = 0;
|
||||
while (1) {
|
||||
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
|
||||
smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
|
||||
|
|
|
@ -37,8 +37,6 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
|||
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
|
||||
int8_t blkType);
|
||||
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
||||
static void tdRSmaPersistTrigger(void *param, void *tmrId);
|
||||
static void *tdRSmaPersistExec(void *param);
|
||||
static void tdRSmaQTaskInfoGetFName(int32_t vid, int64_t version, char *outputName);
|
||||
|
||||
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
|
||||
|
@ -68,8 +66,8 @@ struct SRSmaInfo {
|
|||
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
|
||||
// adapt accordingly if definition of SRSmaInfo update
|
||||
int32_t rsmaInfoHeadLen = sizeof(int64_t) + sizeof(STSchema *);
|
||||
ASSERT(pItem->level == 1 || pItem->level == 2);
|
||||
return (SRSmaInfo *)POINTER_SHIFT(pItem, -sizeof(SRSmaInfoItem) * (pItem->level - 1) - rsmaInfoHeadLen);
|
||||
ASSERT(pItem->level == 0 || pItem->level == 1);
|
||||
return (SRSmaInfo *)POINTER_SHIFT(pItem, -sizeof(SRSmaInfoItem) * pItem->level - rsmaInfoHeadLen);
|
||||
}
|
||||
|
||||
struct SRSmaQTaskInfoItem {
|
||||
|
@ -278,7 +276,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
|||
if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) {
|
||||
pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
|
||||
}
|
||||
pItem->level = (idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2);
|
||||
pItem->level = idx;
|
||||
smaInfo("vgId:%d table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
|
||||
", finally maxdelay:%" PRIi32,
|
||||
SMA_VID(pSma), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
|
||||
|
@ -375,20 +373,48 @@ _err:
|
|||
/**
|
||||
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently
|
||||
*
|
||||
* @param pVnode
|
||||
* @param pSma
|
||||
* @param pReq
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
||||
SSma *pSma = pVnode->pSma;
|
||||
int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
if (!pReq->rollup) {
|
||||
smaTrace("vgId:%d, return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
|
||||
smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since no rollup in req", TD_VID(pVnode), pReq->name,
|
||||
pReq->suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (!VND_IS_RSMA(pVnode)) {
|
||||
smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
|
||||
pReq->suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
return tdProcessRSmaCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief drop cache for stb
|
||||
*
|
||||
* @param pSma
|
||||
* @param pReq
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
if (!VND_IS_RSMA(pVnode)) {
|
||||
smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
|
||||
pReq->suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief store suid/[uids], prefer to use array and then hash
|
||||
*
|
||||
|
@ -1174,123 +1200,6 @@ _err:
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static void *tdRSmaPersistExec(void *param) {
|
||||
setThreadName("rsma-task-persist");
|
||||
SRSmaStat *pRSmaStat = param;
|
||||
SSma *pSma = pRSmaStat->pSma;
|
||||
|
||||
int8_t triggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat));
|
||||
|
||||
if (TASK_TRIGGER_STAT_CANCELLED == triggerStat || TASK_TRIGGER_STAT_PAUSED == triggerStat) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
// execution
|
||||
tdRSmaPersistExecImpl(pRSmaStat);
|
||||
|
||||
_end:
|
||||
if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
||||
TASK_TRIGGER_STAT_INACTIVE,
|
||||
TASK_TRIGGER_STAT_ACTIVE)) {
|
||||
smaDebug("vgId:%d, rsma persist task is active again", SMA_VID(pSma));
|
||||
} else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
||||
TASK_TRIGGER_STAT_CANCELLED,
|
||||
TASK_TRIGGER_STAT_FINISHED)) {
|
||||
smaDebug("vgId:%d, rsma persist task is cancelled", SMA_VID(pSma));
|
||||
} else {
|
||||
smaWarn("vgId:%d, rsma persist task in stat %" PRIi8, SMA_VID(pSma), atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
|
||||
}
|
||||
|
||||
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
||||
smaDebug("vgId:%d, release rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), smaMgmt.rsetId, pRSmaStat->refId);
|
||||
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaStat->refId, __func__, __LINE__);
|
||||
taosThreadExit(NULL);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
|
||||
TdThreadAttr thAttr;
|
||||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED);
|
||||
TdThread tid;
|
||||
|
||||
if (taosThreadCreate(&tid, &thAttr, tdRSmaPersistExec, pRSmaStat) != 0) {
|
||||
if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
||||
TASK_TRIGGER_STAT_INACTIVE,
|
||||
TASK_TRIGGER_STAT_ACTIVE)) {
|
||||
smaDebug("vgId:%d, persist task is active again", SMA_VID(pRSmaStat->pSma));
|
||||
} else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
||||
TASK_TRIGGER_STAT_CANCELLED,
|
||||
TASK_TRIGGER_STAT_FINISHED)) {
|
||||
smaDebug("vgId:%d, persist task is cancelled and set finished", SMA_VID(pRSmaStat->pSma));
|
||||
} else {
|
||||
smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, SMA_VID(pRSmaStat->pSma),
|
||||
atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
|
||||
}
|
||||
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
||||
smaDebug("vgId:%d, release rsetId rsetId:%" PRIi64 " refId:%d)", SMA_VID(pRSmaStat->pSma), smaMgmt.rsetId,
|
||||
pRSmaStat->refId);
|
||||
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaStat->refId, __func__, __LINE__);
|
||||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief trigger to persist rsma qTaskInfo
|
||||
*
|
||||
* @param param
|
||||
* @param tmrId
|
||||
*/
|
||||
static void tdRSmaPersistTrigger(void *param, void *tmrId) {
|
||||
SRSmaStat *rsmaStat = param;
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.rsetId, rsmaStat->refId);
|
||||
ASSERT(0);
|
||||
if (!pRSmaStat) {
|
||||
smaDebug("rsma persistence task not start since already destroyed");
|
||||
return;
|
||||
}
|
||||
|
||||
int8_t tmrStat =
|
||||
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
||||
switch (tmrStat) {
|
||||
case TASK_TRIGGER_STAT_ACTIVE: {
|
||||
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 1);
|
||||
if (TASK_TRIGGER_STAT_CANCELLED != atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
||||
TASK_TRIGGER_STAT_CANCELLED,
|
||||
TASK_TRIGGER_STAT_FINISHED)) {
|
||||
smaDebug("vgId:%d, rsma persistence start since active", SMA_VID(pRSmaStat->pSma));
|
||||
|
||||
// start persist task
|
||||
tdRSmaPersistTask(pRSmaStat);
|
||||
|
||||
// taosTmrReset(tdRSmaPersistTrigger, 5000, pRSmaStat, pRSmaStat->tmrHandle,
|
||||
// RSMA_TMR_ID(pRSmaStat));
|
||||
} else {
|
||||
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
||||
}
|
||||
return;
|
||||
} break;
|
||||
case TASK_TRIGGER_STAT_CANCELLED: {
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED);
|
||||
smaDebug("rsma persistence not start since cancelled and finished");
|
||||
} break;
|
||||
case TASK_TRIGGER_STAT_PAUSED: {
|
||||
smaDebug("rsma persistence not start since paused");
|
||||
} break;
|
||||
case TASK_TRIGGER_STAT_INACTIVE: {
|
||||
smaDebug("rsma persistence not start since inactive");
|
||||
} break;
|
||||
case TASK_TRIGGER_STAT_INIT: {
|
||||
smaDebug("rsma persistence not start since init");
|
||||
} break;
|
||||
default: {
|
||||
smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat);
|
||||
} break;
|
||||
}
|
||||
taosReleaseRef(smaMgmt.rsetId, rsmaStat->refId);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief trigger to get rsma result
|
||||
*
|
||||
|
@ -1314,8 +1223,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
|||
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
|
||||
switch (rsmaTriggerStat) {
|
||||
case TASK_TRIGGER_STAT_PAUSED:
|
||||
case TASK_TRIGGER_STAT_CANCELLED:
|
||||
case TASK_TRIGGER_STAT_FINISHED: {
|
||||
case TASK_TRIGGER_STAT_CANCELLED: {
|
||||
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
|
||||
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64
|
||||
" refId:%d",
|
||||
|
@ -1328,7 +1236,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
|||
|
||||
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
|
||||
|
||||
ASSERT(pRSmaInfo->suid > 0);
|
||||
ASSERT(pRSmaInfo->items[pItem->level].level == pItem->level);
|
||||
|
||||
int8_t fetchTriggerStat =
|
||||
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
||||
|
|
|
@ -417,7 +417,7 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *p
|
|||
goto _err;
|
||||
}
|
||||
|
||||
if (tdProcessRSmaCreate(pVnode, &req) < 0) {
|
||||
if (tdProcessRSmaCreate(pVnode->pSma, &req) < 0) {
|
||||
pRsp->code = terrno;
|
||||
goto _err;
|
||||
}
|
||||
|
@ -573,6 +573,11 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
|
|||
goto _exit;
|
||||
}
|
||||
|
||||
if (tdProcessRSmaDrop(pVnode->pSma, &req) < 0) {
|
||||
rcode = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// return rsp
|
||||
_exit:
|
||||
pRsp->code = rcode;
|
||||
|
|
Loading…
Reference in New Issue