Merge pull request #28002 from taosdata/enh/TD-31895-3.0
enh: return value process
This commit is contained in:
commit
ba248dd94e
|
@ -257,7 +257,9 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
|
||||||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||||
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
|
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
|
||||||
if (refVal == 0) {
|
if (refVal == 0) {
|
||||||
(void)taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid));
|
if(taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid)) < 0) {
|
||||||
|
smaError("vgId:%d, rsma async post commit, failed to remove rsma info for table:%" PRIi64, SMA_VID(pSma), *pSuid);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
smaDebug(
|
smaDebug(
|
||||||
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
|
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
|
||||||
|
|
|
@ -46,7 +46,7 @@ int32_t smaInit() {
|
||||||
old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
|
old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
|
||||||
if (old != 2) break;
|
if (old != 2) break;
|
||||||
if (++nLoops > 1000) {
|
if (++nLoops > 1000) {
|
||||||
(void)sched_yield();
|
TAOS_UNUSED(sched_yield());
|
||||||
nLoops = 0;
|
nLoops = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -97,7 +97,7 @@ void smaCleanUp() {
|
||||||
old = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2);
|
old = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2);
|
||||||
if (old != 2) break;
|
if (old != 2) break;
|
||||||
if (++nLoops > 1000) {
|
if (++nLoops > 1000) {
|
||||||
(void)sched_yield();
|
TAOS_UNUSED(sched_yield());
|
||||||
nLoops = 0;
|
nLoops = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -130,7 +130,7 @@ static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
|
||||||
: atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv);
|
: atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv);
|
||||||
|
|
||||||
if ((code = tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma)) != TSDB_CODE_SUCCESS) {
|
if ((code = tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma)) != TSDB_CODE_SUCCESS) {
|
||||||
(void)tdFreeSmaEnv(pEnv);
|
TAOS_UNUSED(tdFreeSmaEnv(pEnv));
|
||||||
*ppEnv = NULL;
|
*ppEnv = NULL;
|
||||||
(smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL)
|
(smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL)
|
||||||
: atomic_store_ptr(&SMA_RSMA_ENV(pSma), NULL);
|
: atomic_store_ptr(&SMA_RSMA_ENV(pSma), NULL);
|
||||||
|
@ -183,7 +183,7 @@ static void tRSmaInfoHashFreeNode(void *data) {
|
||||||
smaError("failed to hash remove %s:%d", __FUNCTION__, __LINE__);
|
smaError("failed to hash remove %s:%d", __FUNCTION__, __LINE__);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(void)tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo);
|
TAOS_UNUSED(tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -289,7 +289,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// step 3:
|
// step 3:
|
||||||
(void)tdRsmaStopExecutor(pSma);
|
TAOS_UNUSED(tdRsmaStopExecutor(pSma));
|
||||||
|
|
||||||
// step 4: destroy the rsma info and associated fetch tasks
|
// step 4: destroy the rsma info and associated fetch tasks
|
||||||
taosHashCleanup(RSMA_INFO_HASH(pStat));
|
taosHashCleanup(RSMA_INFO_HASH(pStat));
|
||||||
|
@ -302,7 +302,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
||||||
(void)tdDestroySmaState(pSmaStat, smaType);
|
TAOS_UNUSED(tdDestroySmaState(pSmaStat, smaType));
|
||||||
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||||
taosMemoryFreeClear(pSmaStat);
|
taosMemoryFreeClear(pSmaStat);
|
||||||
}
|
}
|
||||||
|
@ -382,16 +382,16 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// init sma env
|
// init sma env
|
||||||
(void)tdLockSma(pSma);
|
TAOS_UNUSED(tdLockSma(pSma));
|
||||||
pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma))
|
pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma))
|
||||||
: atomic_load_ptr(&SMA_RSMA_ENV(pSma));
|
: atomic_load_ptr(&SMA_RSMA_ENV(pSma));
|
||||||
if (!pEnv) {
|
if (!pEnv) {
|
||||||
if ((code = tdInitSmaEnv(pSma, smaType, &pEnv)) < 0) {
|
if ((code = tdInitSmaEnv(pSma, smaType, &pEnv)) < 0) {
|
||||||
(void)tdUnLockSma(pSma);
|
TAOS_UNUSED(tdUnLockSma(pSma));
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(void)tdUnLockSma(pSma);
|
TAOS_UNUSED(tdUnLockSma(pSma));
|
||||||
|
|
||||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
@ -399,7 +399,9 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
|
||||||
void *tdRSmaExecutorFunc(void *param) {
|
void *tdRSmaExecutorFunc(void *param) {
|
||||||
setThreadName("vnode-rsma");
|
setThreadName("vnode-rsma");
|
||||||
|
|
||||||
(void)tdRSmaProcessExecImpl((SSma *)param, RSMA_EXEC_OVERFLOW);
|
if(tdRSmaProcessExecImpl((SSma *)param, RSMA_EXEC_OVERFLOW) < 0){
|
||||||
|
smaError("vgId:%d, failed to process rsma exec", SMA_VID((SSma *)param));
|
||||||
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -173,7 +173,7 @@ _exit:
|
||||||
|
|
||||||
int32_t smaClose(SSma *pSma) {
|
int32_t smaClose(SSma *pSma) {
|
||||||
if (pSma) {
|
if (pSma) {
|
||||||
(void)smaPreClose(pSma);
|
TAOS_UNUSED(smaPreClose(pSma));
|
||||||
(void)taosThreadMutexDestroy(&pSma->mutex);
|
(void)taosThreadMutexDestroy(&pSma->mutex);
|
||||||
SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma));
|
SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma));
|
||||||
SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma));
|
SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma));
|
||||||
|
|
|
@ -89,7 +89,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
|
||||||
if (pItem->tmrId) {
|
if (pItem->tmrId) {
|
||||||
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
|
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
|
||||||
pInfo->suid, i + 1);
|
pInfo->suid, i + 1);
|
||||||
(void)taosTmrStopA(&pItem->tmrId);
|
if(!taosTmrStopA(&pItem->tmrId)){
|
||||||
|
smaError("vgId:%d, failed to stop fetch timer for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
|
||||||
|
i + 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pItem->pStreamState) {
|
if (pItem->pStreamState) {
|
||||||
|
@ -246,7 +249,10 @@ static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTask
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
|
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
|
||||||
(void)streamMetaUnregisterTask(pMeta, streamId, taskId);
|
int32_t code = streamMetaUnregisterTask(pMeta, streamId, taskId);
|
||||||
|
if (code != 0) {
|
||||||
|
smaError("vgId:%d, rsma task:%" PRIi64 ",%d drop failed since %s", pMeta->vgId, streamId, taskId, tstrerror(code));
|
||||||
|
}
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
if (streamMetaCommit(pMeta) < 0) {
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
|
@ -348,7 +354,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
||||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||||
|
if (!ret) {
|
||||||
|
smaError("vgId:%d, failed to reset fetch timer for table %" PRIi64 " level %d", TD_VID(pVnode), pRSmaInfo->suid,
|
||||||
|
idx + 1);
|
||||||
|
}
|
||||||
|
|
||||||
smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64
|
smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64
|
||||||
", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 ", maxdelay:%" PRIi64 " watermark:%" PRIi64
|
", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 ", maxdelay:%" PRIi64 " watermark:%" PRIi64
|
||||||
|
@ -412,7 +422,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
(void)tdFreeRSmaInfo(pSma, pRSmaInfo);
|
TAOS_UNUSED(tdFreeRSmaInfo(pSma, pRSmaInfo));
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
|
smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
|
||||||
}
|
}
|
||||||
|
@ -1264,7 +1274,7 @@ _checkpoint:
|
||||||
if (pItem && pItem->pStreamTask) {
|
if (pItem && pItem->pStreamTask) {
|
||||||
SStreamTask *pTask = pItem->pStreamTask;
|
SStreamTask *pTask = pItem->pStreamTask;
|
||||||
// atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
|
// atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
|
||||||
(void)streamTaskSetActiveCheckpointInfo(pTask, checkpointId);
|
TAOS_UNUSED(streamTaskSetActiveCheckpointInfo(pTask, checkpointId));
|
||||||
|
|
||||||
pTask->chkInfo.checkpointId = checkpointId; // 1pTask->checkpointingId;
|
pTask->chkInfo.checkpointId = checkpointId; // 1pTask->checkpointingId;
|
||||||
pTask->chkInfo.checkpointVer = pItem->submitReqVer;
|
pTask->chkInfo.checkpointVer = pItem->submitReqVer;
|
||||||
|
@ -1339,7 +1349,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
|
if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
|
||||||
smaWarn("rsma fetch task not start since rsma stat already destroyed, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
|
smaWarn("rsma fetch task not start since rsma stat already destroyed, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
|
||||||
pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
|
pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
|
||||||
(void)taosHashRemove(smaMgmt.refHash, ¶m, POINTER_BYTES);
|
TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, ¶m, POINTER_BYTES));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1348,8 +1358,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
if ((code = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid, &pRSmaInfo)) != 0) {
|
if ((code = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid, &pRSmaInfo)) != 0) {
|
||||||
smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
|
smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
|
||||||
pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
|
pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
|
||||||
(void)tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
|
TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
|
||||||
(void)taosHashRemove(smaMgmt.refHash, ¶m, POINTER_BYTES);
|
TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, ¶m, POINTER_BYTES));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1357,8 +1367,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
|
smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
|
||||||
pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
|
pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||||
(void)tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
|
TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
|
||||||
(void)taosHashRemove(smaMgmt.refHash, ¶m, POINTER_BYTES);
|
TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, ¶m, POINTER_BYTES));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1373,10 +1383,14 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
", rsetId:%d refId:%" PRIi64,
|
", rsetId:%d refId:%" PRIi64,
|
||||||
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaRef->refId);
|
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaRef->refId);
|
||||||
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
|
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
|
||||||
(void)taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||||
|
if (!ret) {
|
||||||
|
smaWarn("vgId:%d, rsma fetch task not reset for level %" PRIi8 " since tmr reset failed, rsetId:%d refId:%" PRIi64,
|
||||||
|
SMA_VID(pSma), pItem->level, smaMgmt.rsetId, pRSmaRef->refId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||||
(void)tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
|
TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
@ -1414,7 +1428,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
_end:
|
_end:
|
||||||
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||||
(void)tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
|
TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) {
|
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) {
|
||||||
|
@ -1507,7 +1521,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
|
||||||
|
|
||||||
// the submitReq/deleteReq msg may exsit alternately in the msg queue, consume them sequentially in batch mode
|
// the submitReq/deleteReq msg may exsit alternately in the msg queue, consume them sequentially in batch mode
|
||||||
while (1) {
|
while (1) {
|
||||||
(void)taosGetQitem(qall, (void **)&msg);
|
TAOS_UNUSED(taosGetQitem(qall, (void **)&msg));
|
||||||
if (msg) {
|
if (msg) {
|
||||||
int8_t inputType = RSMA_EXEC_MSG_TYPE(msg);
|
int8_t inputType = RSMA_EXEC_MSG_TYPE(msg);
|
||||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
|
@ -1573,7 +1587,7 @@ _exit:
|
||||||
tdFreeRSmaSubmitItems(pSubmitArr, nSubmit ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK);
|
tdFreeRSmaSubmitItems(pSubmitArr, nSubmit ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK);
|
||||||
while (1) {
|
while (1) {
|
||||||
void *msg = NULL;
|
void *msg = NULL;
|
||||||
(void)taosGetQitem(qall, (void **)&msg);
|
TAOS_UNUSED(taosGetQitem(qall, (void **)&msg));
|
||||||
if (msg) {
|
if (msg) {
|
||||||
taosFreeQitem(msg);
|
taosFreeQitem(msg);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1628,7 +1642,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
||||||
batchMax = TMAX(batchMax, 4);
|
batchMax = TMAX(batchMax, 4);
|
||||||
}
|
}
|
||||||
while (occupied || (++batchCnt < batchMax)) { // greedy mode
|
while (occupied || (++batchCnt < batchMax)) { // greedy mode
|
||||||
(void)taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
|
TAOS_UNUSED(taosReadAllQitems(pInfo->queue, pInfo->qall)); // queue has mutex lock
|
||||||
int32_t qallItemSize = taosQallItemSize(pInfo->qall);
|
int32_t qallItemSize = taosQallItemSize(pInfo->qall);
|
||||||
if (qallItemSize > 0) {
|
if (qallItemSize > 0) {
|
||||||
if ((code = tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type)) != 0) {
|
if ((code = tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type)) != 0) {
|
||||||
|
@ -1662,7 +1676,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (qallItemSize > 0) {
|
if (qallItemSize > 0) {
|
||||||
(void)atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
|
TAOS_UNUSED(atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (RSMA_NEED_FETCH(pInfo)) {
|
if (RSMA_NEED_FETCH(pInfo)) {
|
||||||
|
@ -1672,7 +1686,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(void)atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0);
|
TAOS_UNUSED(atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -93,7 +93,7 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pReader->pSma), __func__, lino, tstrerror(code));
|
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pReader->pSma), __func__, lino, tstrerror(code));
|
||||||
(void)rsmaSnapReaderClose(&pReader);
|
TAOS_UNUSED(rsmaSnapReaderClose(&pReader));
|
||||||
} else {
|
} else {
|
||||||
smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma));
|
smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma));
|
||||||
}
|
}
|
||||||
|
@ -106,7 +106,10 @@ int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
if (pReader->pDataReader[i]) {
|
if (pReader->pDataReader[i]) {
|
||||||
(void)tsdbSnapReaderClose(&pReader->pDataReader[i]);
|
if ((code = tsdbSnapReaderClose(&pReader->pDataReader[i])) < 0) {
|
||||||
|
smaError("vgId:%d, vnode snapshot rsma , failed to close tsdbSnapReader since %s", SMA_VID(pReader->pSma),
|
||||||
|
tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -123,7 +123,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg
|
||||||
TAOS_CHECK_EXIT(metaCreateTSma(SMA_META(pSma), ver, pCfg));
|
TAOS_CHECK_EXIT(metaCreateTSma(SMA_META(pSma), ver, pCfg));
|
||||||
|
|
||||||
// create stable to save tsma result in dstVgId
|
// create stable to save tsma result in dstVgId
|
||||||
(void)tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
TAOS_CHECK_EXIT(tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
|
||||||
pReq.name = (char *)tNameGetTableName(&stbFullName);
|
pReq.name = (char *)tNameGetTableName(&stbFullName);
|
||||||
pReq.suid = pCfg->dstTbUid;
|
pReq.suid = pCfg->dstTbUid;
|
||||||
pReq.schemaRow = pCfg->schemaRow;
|
pReq.schemaRow = pCfg->schemaRow;
|
||||||
|
@ -283,7 +283,10 @@ static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
|
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
|
||||||
(void)tEncodeSBatchDeleteReq(&encoder, pDelReq);
|
if ((code = tEncodeSBatchDeleteReq(&encoder, pDelReq)) < 0) {
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
|
((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
|
||||||
|
|
|
@ -96,15 +96,15 @@ void tfsUpdateSize(STfs *pTfs) {
|
||||||
size.used += pTier->size.used;
|
size.used += pTier->size.used;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)tfsLock(pTfs);
|
TAOS_UNUSED(tfsLock(pTfs));
|
||||||
pTfs->size = size;
|
pTfs->size = size;
|
||||||
(void)tfsUnLock(pTfs);
|
TAOS_UNUSED(tfsUnLock(pTfs));
|
||||||
}
|
}
|
||||||
|
|
||||||
SDiskSize tfsGetSize(STfs *pTfs) {
|
SDiskSize tfsGetSize(STfs *pTfs) {
|
||||||
(void)tfsLock(pTfs);
|
TAOS_UNUSED(tfsLock(pTfs));
|
||||||
SDiskSize size = pTfs->size;
|
SDiskSize size = pTfs->size;
|
||||||
(void)tfsUnLock(pTfs);
|
TAOS_UNUSED(tfsUnLock(pTfs));
|
||||||
|
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
@ -204,8 +204,8 @@ bool tfsIsSameFile(const STfsFile *pFile1, const STfsFile *pFile2) {
|
||||||
(void)strncpy(nameBuf2, pFile2->rname, TMPNAME_LEN);
|
(void)strncpy(nameBuf2, pFile2->rname, TMPNAME_LEN);
|
||||||
nameBuf1[TMPNAME_LEN - 1] = 0;
|
nameBuf1[TMPNAME_LEN - 1] = 0;
|
||||||
nameBuf2[TMPNAME_LEN - 1] = 0;
|
nameBuf2[TMPNAME_LEN - 1] = 0;
|
||||||
(void)taosRealPath(nameBuf1, NULL, TMPNAME_LEN);
|
TAOS_UNUSED(taosRealPath(nameBuf1, NULL, TMPNAME_LEN));
|
||||||
(void)taosRealPath(nameBuf2, NULL, TMPNAME_LEN);
|
TAOS_UNUSED(taosRealPath(nameBuf2, NULL, TMPNAME_LEN));
|
||||||
if (strncmp(nameBuf1, nameBuf2, TMPNAME_LEN) != 0) return false;
|
if (strncmp(nameBuf1, nameBuf2, TMPNAME_LEN) != 0) return false;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -483,7 +483,7 @@ const STfsFile *tfsReaddir(STfsDir *pTfsDir) {
|
||||||
void tfsClosedir(STfsDir *pTfsDir) {
|
void tfsClosedir(STfsDir *pTfsDir) {
|
||||||
if (pTfsDir) {
|
if (pTfsDir) {
|
||||||
if (pTfsDir->pDir != NULL) {
|
if (pTfsDir->pDir != NULL) {
|
||||||
(void)taosCloseDir(&pTfsDir->pDir);
|
TAOS_UNUSED(taosCloseDir(&pTfsDir->pDir));
|
||||||
pTfsDir->pDir = NULL;
|
pTfsDir->pDir = NULL;
|
||||||
}
|
}
|
||||||
taosMemoryFree(pTfsDir);
|
taosMemoryFree(pTfsDir);
|
||||||
|
@ -698,7 +698,7 @@ int32_t tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) {
|
||||||
|
|
||||||
tfsUpdateSize(pTfs);
|
tfsUpdateSize(pTfs);
|
||||||
|
|
||||||
(void)tfsLock(pTfs);
|
TAOS_UNUSED(tfsLock(pTfs));
|
||||||
for (int32_t level = 0; level < pTfs->nlevel; level++) {
|
for (int32_t level = 0; level < pTfs->nlevel; level++) {
|
||||||
STfsTier *pTier = &pTfs->tiers[level];
|
STfsTier *pTier = &pTfs->tiers[level];
|
||||||
for (int32_t disk = 0; disk < pTier->ndisk; ++disk) {
|
for (int32_t disk = 0; disk < pTier->ndisk; ++disk) {
|
||||||
|
@ -708,14 +708,14 @@ int32_t tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) {
|
||||||
dinfo.level = pDisk->level;
|
dinfo.level = pDisk->level;
|
||||||
tstrncpy(dinfo.name, pDisk->path, sizeof(dinfo.name));
|
tstrncpy(dinfo.name, pDisk->path, sizeof(dinfo.name));
|
||||||
if (taosArrayPush(pInfo->datadirs, &dinfo) == NULL) {
|
if (taosArrayPush(pInfo->datadirs, &dinfo) == NULL) {
|
||||||
(void)tfsUnLock(pTfs);
|
TAOS_UNUSED(tfsUnLock(pTfs));
|
||||||
taosArrayDestroy(pInfo->datadirs);
|
taosArrayDestroy(pInfo->datadirs);
|
||||||
pInfo->datadirs = NULL;
|
pInfo->datadirs = NULL;
|
||||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(void)tfsUnLock(pTfs);
|
TAOS_UNUSED(tfsUnLock(pTfs));
|
||||||
|
|
||||||
TAOS_RETURN(0);
|
TAOS_RETURN(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,7 @@ void tfsUpdateTierSize(STfsTier *pTier) {
|
||||||
SDiskSize size = {0};
|
SDiskSize size = {0};
|
||||||
int32_t nAvailDisks = 0;
|
int32_t nAvailDisks = 0;
|
||||||
|
|
||||||
(void)tfsLockTier(pTier);
|
TAOS_UNUSED(tfsLockTier(pTier));
|
||||||
|
|
||||||
for (int32_t id = 0; id < pTier->ndisk; id++) {
|
for (int32_t id = 0; id < pTier->ndisk; id++) {
|
||||||
STfsDisk *pDisk = pTier->disks[id];
|
STfsDisk *pDisk = pTier->disks[id];
|
||||||
|
@ -104,15 +104,15 @@ void tfsUpdateTierSize(STfsTier *pTier) {
|
||||||
pTier->size = size;
|
pTier->size = size;
|
||||||
pTier->nAvailDisks = nAvailDisks;
|
pTier->nAvailDisks = nAvailDisks;
|
||||||
|
|
||||||
(void)tfsUnLockTier(pTier);
|
TAOS_UNUSED(tfsUnLockTier(pTier));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Round-Robin to allocate disk on a tier
|
// Round-Robin to allocate disk on a tier
|
||||||
int32_t tfsAllocDiskOnTier(STfsTier *pTier) {
|
int32_t tfsAllocDiskOnTier(STfsTier *pTier) {
|
||||||
(void)tfsLockTier(pTier);
|
TAOS_UNUSED(tfsLockTier(pTier));
|
||||||
|
|
||||||
if (pTier->ndisk <= 0 || pTier->nAvailDisks <= 0) {
|
if (pTier->ndisk <= 0 || pTier->nAvailDisks <= 0) {
|
||||||
(void)tfsUnLockTier(pTier);
|
TAOS_UNUSED(tfsUnLockTier(pTier));
|
||||||
TAOS_RETURN(TSDB_CODE_FS_NO_VALID_DISK);
|
TAOS_RETURN(TSDB_CODE_FS_NO_VALID_DISK);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,7 +155,7 @@ int32_t tfsAllocDiskOnTier(STfsTier *pTier) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)tfsUnLockTier(pTier);
|
TAOS_UNUSED(tfsUnLockTier(pTier));
|
||||||
if (retId < 0) {
|
if (retId < 0) {
|
||||||
TAOS_RETURN(TSDB_CODE_FS_NO_VALID_DISK);
|
TAOS_RETURN(TSDB_CODE_FS_NO_VALID_DISK);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue