Merge pull request #16478 from taosdata/feature/3.0_interval_hash_optimize
fix: race condition between fetching all results and operator serialization
This commit is contained in:
commit
beb08d4cea
|
@ -312,15 +312,22 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
|||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
|
||||
int32_t nLoops = 0;
|
||||
|
||||
// step 1: set rsma stat
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
|
||||
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 1);
|
||||
while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) {
|
||||
++nLoops;
|
||||
if (nLoops > 1000) {
|
||||
sched_yield();
|
||||
nLoops = 0;
|
||||
}
|
||||
}
|
||||
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
|
||||
ASSERT(pRSmaStat->commitAppliedVer > 0);
|
||||
|
||||
// step 2: wait for all triggered fetch tasks to finish
|
||||
int32_t nLoops = 0;
|
||||
|
||||
while (1) {
|
||||
if (T_REF_VAL_GET(pStat) == 0) {
|
||||
smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma));
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
||||
#define RSMA_FETCH_DELAY_MAX (900000) // ms
|
||||
#define RSMA_FETCH_ACTIVE_MAX (1800) // ms
|
||||
#define RSMA_FETCH_INTERVAL (5000) // ms
|
||||
|
||||
SSmaMgmt smaMgmt = {
|
||||
.inited = 0,
|
||||
|
@ -1501,13 +1502,13 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
|||
switch (rsmaTriggerStat) {
|
||||
case TASK_TRIGGER_STAT_PAUSED:
|
||||
case TASK_TRIGGER_STAT_CANCELLED: {
|
||||
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
|
||||
smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8
|
||||
", rsetId rsetId:%" PRIi64 " refId:%d",
|
||||
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId);
|
||||
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
|
||||
taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
}
|
||||
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
|
||||
return;
|
||||
}
|
||||
default:
|
||||
|
@ -1518,7 +1519,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
|||
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
||||
switch (fetchTriggerStat) {
|
||||
case TASK_TRIGGER_STAT_ACTIVE: {
|
||||
smaDebug("vgId:%d, rsma fetch task started for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
|
||||
smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
|
||||
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
|
||||
// async process
|
||||
pItem->fetchLevel = pItem->level;
|
||||
|
@ -1531,8 +1532,6 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
|||
if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
|
||||
tsem_post(&(pStat->notEmpty));
|
||||
}
|
||||
smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level,
|
||||
pRSmaInfo->suid);
|
||||
} break;
|
||||
case TASK_TRIGGER_STAT_PAUSED: {
|
||||
smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused",
|
||||
|
@ -1715,16 +1714,26 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
|||
smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type);
|
||||
}
|
||||
|
||||
if (type == RSMA_EXEC_OVERFLOW) {
|
||||
if (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2) == 0) {
|
||||
tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr);
|
||||
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
|
||||
}
|
||||
|
||||
if (qallItemSize > 0) {
|
||||
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
|
||||
continue;
|
||||
} else if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
|
||||
if (atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)) == 0) {
|
||||
continue;
|
||||
}
|
||||
for (int32_t j = 0; j < TSDB_RETENTION_L2; ++j) {
|
||||
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, j);
|
||||
if (pItem->fetchLevel) {
|
||||
pItem->fetchLevel = 0;
|
||||
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -295,11 +295,7 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke
|
|||
}
|
||||
|
||||
if (*pIter == (void *)GET_SHASH_NODE_DATA(pNode)) {
|
||||
if (!pPrev) {
|
||||
*pIter = NULL;
|
||||
} else {
|
||||
*pIter = GET_SHASH_NODE_DATA(pPrev);
|
||||
}
|
||||
*pIter = pPrev ? GET_SHASH_NODE_DATA(pPrev) : NULL;
|
||||
}
|
||||
|
||||
FREE_HASH_NODE(pNode);
|
||||
|
|
|
@ -97,7 +97,7 @@ int32_t tqDebugFlag = 135;
|
|||
int32_t fsDebugFlag = 135;
|
||||
int32_t metaDebugFlag = 135;
|
||||
int32_t udfDebugFlag = 135;
|
||||
int32_t smaDebugFlag = 135;
|
||||
int32_t smaDebugFlag = 131;
|
||||
int32_t idxDebugFlag = 135;
|
||||
|
||||
int64_t dbgEmptyW = 0;
|
||||
|
|
Loading…
Reference in New Issue