enh: rsma exception process

This commit is contained in:
kailixu 2023-11-09 10:49:40 +08:00
parent edef4de7b2
commit 3495efaac7
5 changed files with 24 additions and 13 deletions

View File

@ -108,6 +108,7 @@ struct SRSmaStat {
int64_t refId; // shared by fetch tasks
volatile int64_t nBufItems; // number of items in queue buffer
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
volatile int32_t execStat; // 0 succeed, other failed
volatile int32_t nFetchAll; // active number of fetch all
volatile int8_t triggerStat; // shared by fetch tasks
volatile int8_t commitStat; // 0 not in committing, 1 in committing

View File

@ -179,13 +179,14 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
if (!isCommit) goto _exit;
code = atomic_load_32(&pRSmaStat->execStat);
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
TSDB_CHECK_CODE(code, lino, _exit);
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
// all rsma results are written completely
STsdb *pTsdb = NULL;
if ((pTsdb = VND_RSMA1(pSma->pVnode))) {

View File

@ -682,13 +682,14 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
if (terrno == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) {
// TODO: reconfigure SSubmitReq2
} else {
if (terrno == 0) terrno = TSDB_CODE_RSMA_RESULT;
code = terrno;
}
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64
", ver:%" PRIi64,
SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1,
output ? output->info.version : -1);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -844,10 +845,10 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
atomic_store_64(&pItem->submitReqVer, packData->ver);
}
tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, &pResList, NULL);
terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, &pResList, NULL);
taosArrayDestroy(pResList);
return TSDB_CODE_SUCCESS;
return terrno ? TSDB_CODE_FAILED : TDB_CODE_SUCCESS;
}
/**
@ -953,7 +954,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg,
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) {
// only applicable when rsma env exists
return TSDB_CODE_SUCCESS;
return TDB_CODE_SUCCESS;
}
if (0 != (terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), terrstr());
goto _err;
}
STbUidStore uidStore = {0};
@ -985,7 +991,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg,
return TSDB_CODE_SUCCESS;
_err:
tdUidStoreDestory(&uidStore);
return TSDB_CODE_FAILED;
return terrno;
}
/**
@ -1417,6 +1423,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
goto _err;
}
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, &pResList, NULL) < 0) {
atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno);
goto _err;
}
@ -1448,6 +1455,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
.msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t))};
if (!taosArrayPush(pSubmitArr, &packData)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
}
@ -1467,6 +1475,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
}
return TSDB_CODE_SUCCESS;
_err:
atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno);
smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr());
tdFreeRSmaSubmitItems(pSubmitArr);

View File

@ -39,7 +39,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2
arrSize = taosArrayGetSize(pMsg->aSubmitTbData);
// scan and convert
if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) {
if ((terrno = tsdbScanAndConvertSubmitMsg(pTsdb, pMsg)) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(terrno));
}

View File

@ -1669,7 +1669,7 @@ _exit:
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
if (code == 0) {
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
}
// clear