enh: refactor return code

This commit is contained in:
kailixu 2024-07-24 10:09:24 +08:00
parent 8da3fefcbd
commit 7b49327463
3 changed files with 191 additions and 231 deletions

View File

@ -38,24 +38,24 @@ SSmaMgmt smaMgmt = {
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static void tdUidStoreDestory(STbUidStore *pStore); static void tdUidStoreDestory(STbUidStore *pStore);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
int8_t idx); int8_t idx);
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
SRSmaInfo *pInfo, ERsmaExecType type, int8_t level); SRSmaInfo *pInfo, ERsmaExecType type, int8_t level);
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo);
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type); static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type);
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
int32_t execType, int8_t *streamFlushed); int32_t execType, int8_t *streamFlushed);
static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaFetchTrigger(void *param, void *tmrId);
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer); static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
struct SRSmaQTaskInfoItem { struct SRSmaQTaskInfoItem {
int32_t len; int32_t len;
@ -147,11 +147,10 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, *suid); code = tdAcquireRSmaInfoBySuid(pSma, *suid, &pRSmaInfo);
if (!pRSmaInfo) { if (code != 0) {
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid); smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
code = TSDB_CODE_RSMA_INVALID_STAT;
TAOS_RETURN(code); TAOS_RETURN(code);
} }
@ -215,8 +214,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SHashObj *infoHash = NULL; SHashObj *infoHash = NULL;
if (!pStat || !(infoHash = RSMA_INFO_HASH(pStat))) { if (!pStat || !(infoHash = RSMA_INFO_HASH(pStat))) {
terrno = TSDB_CODE_RSMA_INVALID_STAT; TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
return TSDB_CODE_FAILED;
} }
// info cached when create rsma stable and return directly for non-rsma ctables // info cached when create rsma stable and return directly for non-rsma ctables
@ -225,14 +223,12 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
} }
if (!(*ppStore)) { if (!(*ppStore)) {
if (tdUidStoreInit(ppStore) < 0) { TAOS_CHECK_RETURN(tdUidStoreInit(ppStore));
return TSDB_CODE_FAILED;
}
} }
if (tdUidStorePut(*ppStore, suid, &uid) < 0) { if ((code = tdUidStorePut(*ppStore, suid, &uid)) < 0) {
*ppStore = tdUidStoreFree(*ppStore); *ppStore = tdUidStoreFree(*ppStore);
return TSDB_CODE_FAILED; TAOS_RETURN(code);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -262,6 +258,7 @@ static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskI
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
int8_t idx) { int8_t idx) {
int32_t code = 0;
if ((param->qmsgLen > 0) && param->qmsg[idx]) { if ((param->qmsgLen > 0) && param->qmsg[idx]) {
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
SRetention *pRetention = SMA_RETENTION(pSma); SRetention *pRetention = SMA_RETENTION(pSma);
@ -275,18 +272,20 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
if (!taosCheckExistFile(taskInfDir)) { if (!taosCheckExistFile(taskInfDir)) {
char *s = taosStrdup(taskInfDir); char *s = taosStrdup(taskInfDir);
if (!s) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
if (taosMulMkDir(s) != 0) { if (taosMulMkDir(s) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(s); taosMemoryFree(s);
return TSDB_CODE_FAILED; TAOS_RETURN(code);
} }
taosMemoryFree(s); taosMemoryFree(s);
} }
SStreamTask *pStreamTask = taosMemoryCalloc(1, sizeof(*pStreamTask)); SStreamTask *pStreamTask = taosMemoryCalloc(1, sizeof(*pStreamTask));
if (!pStreamTask) { if (!pStreamTask) {
terrno = TSDB_CODE_OUT_OF_MEMORY; TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_FAILED;
} }
pItem->pStreamTask = pStreamTask; pItem->pStreamTask = pStreamTask;
pStreamTask->id.taskId = 0; pStreamTask->id.taskId = 0;
@ -294,24 +293,20 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pStreamTask->chkInfo.startTs = taosGetTimestampMs(); pStreamTask->chkInfo.startTs = taosGetTimestampMs();
pStreamTask->pMeta = pVnode->pTq->pStreamMeta; pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_EXEC_TASK_FLAG) + 1); pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_EXEC_TASK_FLAG) + 1);
if (!pStreamTask->exec.qmsg) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
sprintf(pStreamTask->exec.qmsg, "%s", RSMA_EXEC_TASK_FLAG); sprintf(pStreamTask->exec.qmsg, "%s", RSMA_EXEC_TASK_FLAG);
pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta); pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id); tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
int32_t code = streamCreateStateMachine(pStreamTask); TAOS_CHECK_RETURN(streamCreateStateMachine(pStreamTask));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = streamTaskCreateActiveChkptInfo(&pStreamTask->chkInfo.pActiveInfo); TAOS_CHECK_RETURN(streamTaskCreateActiveChkptInfo(&pStreamTask->chkInfo.pActiveInfo));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId); pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId);
if (!pStreamState) { if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; TAOS_RETURN(TSDB_CODE_RSMA_STREAM_STATE_OPEN);
return TSDB_CODE_FAILED;
} }
pItem->pStreamState = pStreamState; pItem->pStreamState = pStreamState;
@ -321,12 +316,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0); pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0);
if (!pRSmaInfo->taskInfo[idx]) { if (!pRSmaInfo->taskInfo[idx]) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; TAOS_RETURN(TSDB_CODE_RSMA_QTASKINFO_CREATE);
return TSDB_CODE_FAILED;
} }
if (!(pItem->pResList = taosArrayInit(1, POINTER_BYTES))) { if (!(pItem->pResList = taosArrayInit(1, POINTER_BYTES))) {
return TSDB_CODE_FAILED; TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
} }
if (pItem->fetchResultVer < pItem->submitReqVer) { if (pItem->fetchResultVer < pItem->submitReqVer) {
@ -349,7 +343,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid}; SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)); if (taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)) != 0) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
@ -359,7 +355,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId, TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId,
pItem->submitReqVer, pItem->fetchResultVer, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); pItem->submitReqVer, pItem->fetchResultVer, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
} }
return TSDB_CODE_SUCCESS; TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
/** /**
@ -372,20 +368,14 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
* @return int32_t * @return int32_t
*/ */
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) { int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
int32_t code; int32_t code = 0;
int32_t lino = 0;
if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) { if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid); smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
return TSDB_CODE_FAILED;
}
#endif
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
@ -399,41 +389,34 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
// from write queue: single thead // from write queue: single thead
pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo)); pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
if (!pRSmaInfo) { if (!pRSmaInfo) {
terrno = TSDB_CODE_OUT_OF_MEMORY; TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_FAILED;
} }
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1, 1); STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1, 1);
if (!pTSchema) { if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; TAOS_CHECK_EXIT(TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION);
goto _err;
} }
pRSmaInfo->pSma = pSma; pRSmaInfo->pSma = pSma;
pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->pTSchema = pTSchema;
pRSmaInfo->suid = suid; pRSmaInfo->suid = suid;
T_REF_INIT_VAL(pRSmaInfo, 1); T_REF_INIT_VAL(pRSmaInfo, 1);
code = taosOpenQueue(&pRSmaInfo->queue); TAOS_CHECK_EXIT(taosOpenQueue(&pRSmaInfo->queue));
if (code) goto _err;
code = taosAllocateQall(&pRSmaInfo->qall); TAOS_CHECK_EXIT(taosAllocateQall(&pRSmaInfo->qall));
if (code) goto _err;
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0 || TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0));
tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) { TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1));
goto _err;
TAOS_CHECK_EXIT(taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)));
_exit:
if (code != 0) {
tdFreeRSmaInfo(pSma, pRSmaInfo);
} else {
smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
} }
TAOS_RETURN(code);
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != 0) {
goto _err;
}
smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
return TSDB_CODE_SUCCESS;
_err:
tdFreeRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_FAILED;
} }
/** /**
@ -480,11 +463,13 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = 0;
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
SRSmaInfo *pRSmaInfo = NULL;
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid); code = tdAcquireRSmaInfoBySuid(pSma, pReq->suid, &pRSmaInfo);
if (!pRSmaInfo) { if (code != 0) {
smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name, smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name,
pReq->suid); pReq->suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -519,12 +504,11 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid)
if (uid) { if (uid) {
if (!pStore->tbUids) { if (!pStore->tbUids) {
if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) { if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY; TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_FAILED;
} }
} }
if (!taosArrayPush(pStore->tbUids, uid)) { if (!taosArrayPush(pStore->tbUids, uid)) {
return TSDB_CODE_FAILED; TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
} }
} }
} else { } else {
@ -532,32 +516,29 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid)
if (!pStore->uidHash) { if (!pStore->uidHash) {
pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (!pStore->uidHash) { if (!pStore->uidHash) {
return TSDB_CODE_FAILED; TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
} }
} }
if (uid) { if (uid) {
SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t)); SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t));
if (uidArray && ((uidArray = *(SArray **)uidArray))) { if (uidArray && ((uidArray = *(SArray **)uidArray))) {
taosArrayPush(uidArray, uid); if (!taosArrayPush(uidArray, uid)) {
taosArrayDestroy(uidArray);
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
} else { } else {
SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t)); SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t));
if (!pUidArray) { if (!pUidArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY; TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_FAILED;
} }
if (!taosArrayPush(pUidArray, uid)) { if (!taosArrayPush(pUidArray, uid)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(pUidArray); taosArrayDestroy(pUidArray);
return TSDB_CODE_FAILED; TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) != 0) {
return TSDB_CODE_FAILED;
} }
TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)));
} }
} else { } else {
if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) != 0) { TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0));
return TSDB_CODE_FAILED;
}
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -600,9 +581,7 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
if (pReq) { if (pReq) {
SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq; SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq;
// spin lock for race condition during insert data // spin lock for race condition during insert data
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) { TAOS_CHECK_RETURN(tsdbInsertData(pTsdb, version, pSubmitReq, NULL));
return TSDB_CODE_FAILED;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -612,13 +591,9 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
SArray *pSubmitTbData = pMsg ? pMsg->aSubmitTbData : NULL; SArray *pSubmitTbData = pMsg ? pMsg->aSubmitTbData : NULL;
int32_t size = taosArrayGetSize(pSubmitTbData); int32_t size = taosArrayGetSize(pSubmitTbData);
terrno = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SSubmitTbData *pData = TARRAY_GET_ELEM(pSubmitTbData, i); SSubmitTbData *pData = TARRAY_GET_ELEM(pSubmitTbData, i);
if ((terrno = tdUidStorePut(pStore, pData->suid, NULL)) < 0) { TAOS_CHECK_RETURN(tdUidStorePut(pStore, pData->suid, NULL));
return -1;
}
} }
return 0; return 0;
@ -645,7 +620,7 @@ int32_t smaRetention(SSma *pSma, int64_t now) {
} }
_end: _end:
return code; TAOS_RETURN(code);
} }
static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatchDeleteReq *pDelReq) { static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatchDeleteReq *pDelReq) {
@ -659,13 +634,17 @@ static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatc
void *pBuf = rpcMallocCont(len + sizeof(SMsgHead)); void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
if (!pBuf) { if (!pBuf) {
code = terrno; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
tEncodeSBatchDeleteReq(&encoder, pDelReq); if ((code = tEncodeSBatchDeleteReq(&encoder, pDelReq)) < 0) {
tEncoderClear(&encoder);
rpcFreeCont(pBuf);
TSDB_CHECK_CODE(code, lino, _exit);
}
tEncoderClear(&encoder); tEncoderClear(&encoder);
((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode); ((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
@ -682,7 +661,7 @@ _exit:
SMA_VID(pSma), lino, suid, level, tstrerror(code)); SMA_VID(pSma), lino, suid, level, tstrerror(code));
} }
return code; TAOS_RETURN(code);
} }
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
@ -717,7 +696,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
SBatchDeleteReq deleteReq = {.suid = suid, .level = pItem->level}; SBatchDeleteReq deleteReq = {.suid = suid, .level = pItem->level};
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
if (!deleteReq.deleteReqs) { if (!deleteReq.deleteReqs) {
code = terrno; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, "", true); code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, "", true);
@ -753,21 +732,15 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq2 *pReq = NULL; SSubmitReq2 *pReq = NULL;
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) { TAOS_CHECK_EXIT(
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid));
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { if (pReq && (code = tdProcessSubmitReq(sinkTsdb, output->info.version, pReq)) < 0) {
if (terrno == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) { if (code == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) {
// TODO: reconfigure SSubmitReq2 // TODO: reconfigure SSubmitReq2
} else {
if (terrno == 0) terrno = TSDB_CODE_RSMA_RESULT;
code = terrno;
} }
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFreeClear(pReq);
pReq = NULL;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -812,14 +785,12 @@ _exit:
*/ */
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
SRSmaInfo *pInfo, tb_uid_t suid) { SRSmaInfo *pInfo, tb_uid_t suid) {
int32_t code; int32_t code = 0;
int32_t lino = 0;
int32_t size = RSMA_EXEC_MSG_HLEN + len; // header + payload int32_t size = RSMA_EXEC_MSG_HLEN + len; // header + payload
void *qItem; void *qItem;
code = taosAllocateQitem(size, DEF_QITEM, 0, (void **)&qItem); TAOS_CHECK_RETURN(taosAllocateQitem(size, DEF_QITEM, 0, (void **)&qItem));
if (code) {
return code;
}
void *pItem = qItem; void *pItem = qItem;
@ -830,7 +801,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *p
*(int64_t *)pItem = version; *(int64_t *)pItem = version;
memcpy(POINTER_SHIFT(pItem, sizeof(int64_t)), pMsg, len); memcpy(POINTER_SHIFT(pItem, sizeof(int64_t)), pMsg, len);
taosWriteQitem(pInfo->queue, qItem); TAOS_CHECK_RETURN(taosWriteQitem(pInfo->queue, qItem));
pInfo->lastRecv = taosGetTimestampMs(); pInfo->lastRecv = taosGetTimestampMs();
@ -863,10 +834,10 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
STSRow *row = NULL; STSRow *row = NULL;
if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1; TAOS_CHECK_RETURN(tInitSubmitMsgIter(pReq, &msgIter));
while (true) { while (true) {
SSubmitBlk *pBlock = NULL; SSubmitBlk *pBlock = NULL;
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; TAOS_CHECK_RETURN(tGetSubmitMsgNext(&msgIter, &pBlock));
if (pBlock == NULL) break; if (pBlock == NULL) break;
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) { while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
@ -892,6 +863,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
*/ */
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) { SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) {
int32_t code = 0;
int32_t idx = level - 1; int32_t idx = level - 1;
void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
@ -902,25 +874,24 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!pInfo->pTSchema) { if (!pInfo->pTSchema) {
terrno = TSDB_CODE_INVALID_PTR;
smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid); smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid);
return TSDB_CODE_FAILED; TAOS_RETURN(TSDB_CODE_INVALID_PTR);
} }
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p, suid:%" PRIu64 ", nMsg:%d, submitReqVer:%" PRIi64 smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p, suid:%" PRIu64 ", nMsg:%d, submitReqVer:%" PRIi64
", inputType:%d", ", inputType:%d",
SMA_VID(pSma), level, RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize, version, inputType); SMA_VID(pSma), level, RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize, version, inputType);
if ((terrno = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) { if ((code = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) {
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(code));
return TSDB_CODE_FAILED; TAOS_RETURN(TSDB_CODE_FAILED);
} }
atomic_store_64(&pItem->submitReqVer, version); atomic_store_64(&pItem->submitReqVer, version);
terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL); TAOS_CHECK_RETURN(tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL));
return terrno ? TSDB_CODE_FAILED : TDB_CODE_SUCCESS; TAOS_RETURN(code);
} }
/** /**
@ -928,26 +899,23 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
* *
* @param pSma * @param pSma
* @param suid * @param suid
* @return SRSmaInfo*
*/ */
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL; SRSmaStat *pStat = NULL;
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
terrno = 0; *ppRSmaInfo = NULL;
if (!pEnv) { if (!pEnv) {
terrno = TSDB_CODE_RSMA_INVALID_ENV; TAOS_RETURN(TSDB_CODE_RSMA_INVALID_ENV);
return NULL;
} }
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
if (!pStat || !RSMA_INFO_HASH(pStat)) { if (!pStat || !RSMA_INFO_HASH(pStat)) {
terrno = TSDB_CODE_RSMA_INVALID_STAT; TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
return NULL;
} }
taosRLockLatch(SMA_ENV_LOCK(pEnv)); taosRLockLatch(SMA_ENV_LOCK(pEnv));
@ -955,20 +923,19 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
if (RSMA_INFO_IS_DEL(pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL; TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
} }
tdRefRSmaInfo(pSma, pRSmaInfo); tdRefRSmaInfo(pSma, pRSmaInfo);
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
if (ASSERTS(pRSmaInfo->suid == suid, "suid:%" PRIi64 " != %" PRIi64, pRSmaInfo->suid, suid)) { if (ASSERTS(pRSmaInfo->suid == suid, "suid:%" PRIi64 " != %" PRIi64, pRSmaInfo->suid, suid)) {
terrno = TSDB_CODE_APP_ERROR; TAOS_RETURN(TSDB_CODE_APP_ERROR);
return NULL;
} }
return pRSmaInfo; TAOS_RETURN(code);
} }
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL; TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
} }
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
@ -989,16 +956,19 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
*/ */
static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
tb_uid_t suid) { tb_uid_t suid) {
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); int32_t code = 0;
if (!pRSmaInfo) { SRSmaInfo *pRSmaInfo = NULL;
code = tdAcquireRSmaInfoBySuid(pSma, suid, &pRSmaInfo);
if (code != 0) {
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
return TSDB_CODE_SUCCESS; TAOS_RETURN(TSDB_CODE_SUCCESS); // return success
} }
if (inputType == STREAM_INPUT__DATA_SUBMIT || inputType == STREAM_INPUT__REF_DATA_BLOCK) { if (inputType == STREAM_INPUT__DATA_SUBMIT || inputType == STREAM_INPUT__REF_DATA_BLOCK) {
if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) { if ((code = tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid)) < 0) {
tdReleaseRSmaInfo(pSma, pRSmaInfo); tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_FAILED; TAOS_RETURN(code);
} }
if (smaMgmt.tmrHandle) { if (smaMgmt.tmrHandle) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0);
@ -1011,11 +981,11 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg,
} }
} }
} else { } else {
terrno = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
tdReleaseRSmaInfo(pSma, pRSmaInfo); tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, execute rsma, failed for suid:%" PRIu64 " since %s, type:%d", SMA_VID(pSma), suid, smaError("vgId:%d, execute rsma, failed for suid:%" PRIu64 " since %s, type:%d", SMA_VID(pSma), suid,
tstrerror(terrno), inputType); tstrerror(code), inputType);
return TSDB_CODE_FAILED; TAOS_RETURN(code);
} }
tdReleaseRSmaInfo(pSma, pRSmaInfo); tdReleaseRSmaInfo(pSma, pRSmaInfo);
@ -1025,57 +995,56 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg,
int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) { int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS; if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;
if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) { int32_t code = 0;
smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), terrstr()); if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
goto _err; smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), tstrerror(code));
goto _exit;
} }
STbUidStore uidStore = {0}; STbUidStore uidStore = {0};
if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) { if ((code = tdFetchSubmitReqSuids(pReq, &uidStore)) < 0) {
smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr()); smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), tstrerror(code));
goto _err; goto _exit;
} }
if (uidStore.suid != 0) { if (uidStore.suid != 0) {
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid) < 0) { if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid)) < 0) {
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), tstrerror(code));
goto _err; goto _exit;
} }
void *pIter = NULL; void *pIter = NULL;
while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) { while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid) < 0) { if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid)) < 0) {
smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr()); smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), tstrerror(code));
taosHashCancelIterate(uidStore.uidHash, pIter); taosHashCancelIterate(uidStore.uidHash, pIter);
goto _err; goto _exit;
} }
} }
} }
_exit:
tdUidStoreDestory(&uidStore); tdUidStoreDestory(&uidStore);
return TSDB_CODE_SUCCESS; TAOS_RETURN(code);
_err:
tdUidStoreDestory(&uidStore);
return terrno;
} }
int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) { int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS; if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;
if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) { int32_t code = 0;
smaError("vgId:%d, failed to process rsma delete since invalid exec code: %s", SMA_VID(pSma), terrstr()); if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
goto _err; smaError("vgId:%d, failed to process rsma delete since invalid exec code: %s", SMA_VID(pSma), tstrerror(code));
goto _exit;
} }
SDeleteRes *pDelRes = pReq; SDeleteRes *pDelRes = pReq;
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid) < 0) { if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid)) < 0) {
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), tstrerror(code));
goto _err; goto _exit;
} }
return TSDB_CODE_SUCCESS; _exit:
_err: TAOS_RETURN(code);
return terrno;
} }
/** /**
@ -1099,10 +1068,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (vnodeGetStbIdList(pSma->pVnode, 0, suidList) < 0) { TAOS_CHECK_EXIT(vnodeGetStbIdList(pSma->pVnode, 0, suidList));
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t arrSize = taosArrayGetSize(suidList); int64_t arrSize = taosArrayGetSize(suidList);
@ -1146,10 +1112,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
" qmsgLen:%" PRIi32, " qmsgLen:%" PRIi32,
TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]); TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]);
} }
if (tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) { TAOS_CHECK_EXIT(tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name));
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
#if 0 #if 0
// reload all ctbUids for suid // reload all ctbUids for suid
uidStore.suid = suid; uidStore.suid = suid;
@ -1180,7 +1143,7 @@ _exit:
metaReaderClear(&mr); metaReaderClear(&mr);
taosArrayDestroy(suidList); taosArrayDestroy(suidList);
tdUidStoreDestory(&uidStore); tdUidStoreDestory(&uidStore);
return code; TAOS_RETURN(code);
} }
/** /**
@ -1188,20 +1151,16 @@ _exit:
*/ */
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) { int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
int64_t nTables = 0; int64_t nTables = 0;
// step 1: init env // step 1: init env
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) { TAOS_CHECK_EXIT(tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP));
code = TSDB_CODE_TDB_INIT_FAILED;
goto _err;
}
// step 2: iterate all stables to restore the rsma env // step 2: iterate all stables to restore the rsma env
if ((code = tdRSmaRestoreQTaskInfoInit(pSma, &nTables)) < 0) { TAOS_CHECK_EXIT(tdRSmaRestoreQTaskInfoInit(pSma, &nTables));
goto _err;
}
_err: _exit:
if (code) { if (code) {
smaError("vgId:%d, restore rsma task %" PRIi8 "from qtaskf %" PRIi64 " failed since %s", SMA_VID(pSma), type, smaError("vgId:%d, restore rsma task %" PRIi8 "from qtaskf %" PRIi64 " failed since %s", SMA_VID(pSma), type,
qtaskFileVer, tstrerror(code)); qtaskFileVer, tstrerror(code));
@ -1210,7 +1169,7 @@ _err:
type, qtaskFileVer, nTables); type, qtaskFileVer, nTables);
} }
return code; TAOS_RETURN(code);
} }
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
@ -1323,9 +1282,8 @@ _checkpoint:
} }
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if (streamMetaSaveTask(pMeta, pTask)) { if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
taosHashCancelIterate(pInfoHash, infoHash); taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -1338,9 +1296,9 @@ _checkpoint:
} }
if (pMeta) { if (pMeta) {
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if (streamMetaCommit(pMeta)) { if ((code = streamMetaCommit(pMeta)) != 0) {
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY; if (code == -1) code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -1354,8 +1312,7 @@ _exit:
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} }
terrno = code; TAOS_RETURN(code);
return code;
} }
/** /**
@ -1370,6 +1327,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaStat *pStat = NULL; SRSmaStat *pStat = NULL;
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
SRSmaInfoItem *pItem = NULL; SRSmaInfoItem *pItem = NULL;
int32_t code = 0;
if (!(pRSmaRef = taosHashGet(smaMgmt.refHash, &param, POINTER_BYTES))) { if (!(pRSmaRef = taosHashGet(smaMgmt.refHash, &param, POINTER_BYTES))) {
smaDebug("rsma fetch task not start since rsma info item:%p not exist in refHash:%p, rsetId:%d", param, smaDebug("rsma fetch task not start since rsma info item:%p not exist in refHash:%p, rsetId:%d", param,
@ -1386,7 +1344,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
pSma = pStat->pSma; pSma = pStat->pSma;
if (!(pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid))) { if ((code = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid, &pRSmaInfo)) != 0) {
smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId, smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
pRSmaRef->refId); // pRSmaRef freed in taosHashRemove pRSmaRef->refId); // pRSmaRef freed in taosHashRemove
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
@ -1484,6 +1442,8 @@ static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) {
* @return int32_t * @return int32_t
*/ */
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
@ -1512,12 +1472,10 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
pItem->nScanned = 0; pItem->nScanned = 0;
if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { TAOS_CHECK_EXIT(qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK));
goto _err; if ((code = tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, NULL)) < 0) {
} atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, code);
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, NULL) < 0) { goto _exit;
atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno);
goto _err;
} }
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch finished", smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch finished",
@ -1529,10 +1487,8 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
} }
} }
_end: _exit:
return TSDB_CODE_SUCCESS; TAOS_RETURN(code);
_err:
return TSDB_CODE_FAILED;
} }
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) { static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
@ -1541,6 +1497,8 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
int32_t nSubmit = 0; int32_t nSubmit = 0;
int32_t nDelete = 0; int32_t nDelete = 0;
int64_t version = 0; int64_t version = 0;
int32_t code = 0;
int32_t lino = 0;
SPackedData packData; SPackedData packData;
@ -1559,23 +1517,21 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
version = packData.ver; version = packData.ver;
if (!taosArrayPush(pSubmitArr, &packData)) { if (!taosArrayPush(pSubmitArr, &packData)) {
taosFreeQitem(msg); taosFreeQitem(msg);
terrno = TSDB_CODE_OUT_OF_MEMORY; TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
goto _err;
} }
++nSubmit; ++nSubmit;
} else if (inputType == STREAM_INPUT__REF_DATA_BLOCK) { } else if (inputType == STREAM_INPUT__REF_DATA_BLOCK) {
_resume_delete: _resume_delete:
version = RSMA_EXEC_MSG_VER(msg); version = RSMA_EXEC_MSG_VER(msg);
if ((terrno = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, if ((code = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version,
&packData.pDataBlock, 1))) { &packData.pDataBlock, 1))) {
taosFreeQitem(msg); taosFreeQitem(msg);
goto _err; TAOS_CHECK_EXIT(code);
} }
if (packData.pDataBlock && !taosArrayPush(pSubmitArr, &packData)) { if (packData.pDataBlock && !taosArrayPush(pSubmitArr, &packData)) {
taosFreeQitem(msg); taosFreeQitem(msg);
terrno = TSDB_CODE_OUT_OF_MEMORY; TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
goto _err;
} }
taosFreeQitem(msg); taosFreeQitem(msg);
if (packData.pDataBlock) { if (packData.pDataBlock) {
@ -1593,9 +1549,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
ASSERTS(size > 0, "size is %d", size); ASSERTS(size > 0, "size is %d", size);
int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK; int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK;
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, inputType, pInfo, type, i) < 0) { TAOS_CHECK_EXIT(tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, inputType, pInfo, type, i));
goto _err;
}
} }
tdFreeRSmaSubmitItems(pSubmitArr, inputType); tdFreeRSmaSubmitItems(pSubmitArr, inputType);
nSubmit = 0; nSubmit = 0;
@ -1612,7 +1566,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
_rtn: _rtn:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _exit:
atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno); atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno);
smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid, smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr()); type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr());
@ -1626,7 +1580,7 @@ _err:
break; break;
} }
} }
return TSDB_CODE_FAILED; TAOS_RETURN(code);
} }
/** /**
@ -1677,7 +1631,10 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
int32_t qallItemSize = taosQallItemSize(pInfo->qall); int32_t qallItemSize = taosQallItemSize(pInfo->qall);
if (qallItemSize > 0) { if (qallItemSize > 0) {
tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); if ((code = tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type)) != 0) {
taosHashCancelIterate(infoHash, pIter);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi32, SMA_VID(pSma), qallItemSize, type); smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi32, SMA_VID(pSma), qallItemSize, type);
} }
@ -1693,7 +1650,10 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
tdRSmaFetchAllResult(pSma, pInfo); if ((code = tdRSmaFetchAllResult(pSma, pInfo)) != 0) {
taosHashCancelIterate(infoHash, pIter);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
@ -1742,5 +1702,5 @@ _exit:
if (code) { if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} }
return code; TAOS_RETURN(code);
} }

View File

@ -34,7 +34,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2
int32_t numOfRows = 0; int32_t numOfRows = 0;
if (ASSERTS(pTsdb->mem != NULL, "vgId:%d, mem is NULL", TD_VID(pTsdb->pVnode))) { if (ASSERTS(pTsdb->mem != NULL, "vgId:%d, mem is NULL", TD_VID(pTsdb->pVnode))) {
return -1; TAOS_RETURN(TSDB_CODE_INVALID_PTR);
} }
arrSize = taosArrayGetSize(pMsg->aSubmitTbData); arrSize = taosArrayGetSize(pMsg->aSubmitTbData);
@ -42,7 +42,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2
// scan and convert // scan and convert
if ((code = tsdbScanAndConvertSubmitMsg(pTsdb, pMsg)) < 0) { if ((code = tsdbScanAndConvertSubmitMsg(pTsdb, pMsg)) < 0) {
if (code != TSDB_CODE_TDB_TABLE_RECONFIGURE) { if (code != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(terrno)); tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
} }
return code; return code;
} }

View File

@ -601,7 +601,7 @@ int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, suid); SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, suid);
if (!pCur) { if (!pCur) {
return TSDB_CODE_FAILED; return TSDB_CODE_OUT_OF_MEMORY;
} }
while (1) { while (1) {