enh: refactor return code

This commit is contained in:
kailixu 2024-07-23 19:30:55 +08:00
parent 4557cf05cd
commit 994a08ad41
3 changed files with 113 additions and 127 deletions

View File

@ -16,38 +16,34 @@
#include "sma.h" #include "sma.h"
#include "tsdb.h" #include "tsdb.h"
static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration); static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration,
int32_t *days);
static int32_t smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type); static int32_t smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type);
static int32_t rsmaRestore(SSma *pSma); static int32_t rsmaRestore(SSma *pSma);
#define SMA_SET_KEEP_CFG(v, l) \ #define SMA_SET_KEEP_CFG(v, l) \
do { \ do { \
SRetention *r = &pCfg->retentions[l]; \ SRetention *r = &pCfg->retentions[l]; \
int64_t keep = -1; \ int64_t keep = -1; \
convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE, &keep); \ TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE, &keep)); \
pKeepCfg->keep2 = (int32_t)keep; \ pKeepCfg->keep2 = (int32_t)keep; \
pKeepCfg->keep0 = pKeepCfg->keep2; \ pKeepCfg->keep0 = pKeepCfg->keep2; \
pKeepCfg->keep1 = pKeepCfg->keep2; \ pKeepCfg->keep1 = pKeepCfg->keep2; \
pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days); \ TAOS_CHECK_EXIT(smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days, &pKeepCfg->days)); \
pKeepCfg->keepTimeOffset = 0; \ pKeepCfg->keepTimeOffset = 0; \
} while (0) } while (0)
#define SMA_OPEN_RSMA_IMPL(v, l, force) \ #define SMA_OPEN_RSMA_IMPL(v, l, force) \
do { \ do { \
SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \
if (!RETENTION_VALID(l, r)) { \ if (!RETENTION_VALID(l, r)) { \
if (l == 0) { \ if (l == 0) { \
code = TSDB_CODE_INVALID_PARA; \ TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA); \
TSDB_CHECK_CODE(code, lino, _exit); \ } \
} \ break; \
break; \ } \
} \ TAOS_CHECK_EXIT(smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l)); \
code = smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \ TAOS_CHECK_EXIT(tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback, force)); \
TSDB_CHECK_CODE(code, lino, _exit); \
if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback, force) < 0) { \
code = terrno; \
TSDB_CHECK_CODE(code, lino, _exit); \
} \
} while (0) } while (0)
/** /**
@ -59,51 +55,61 @@ static int32_t rsmaRestore(SSma *pSma);
* @param level * @param level
* @param precision * @param precision
* @param duration * @param duration
* @param days
* @return int32_t * @return int32_t
*/ */
static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration) { static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration,
int32_t code = TSDB_CODE_SUCCESS; int32_t *days) {
int32_t code = 0;
int32_t lino = 0;
int64_t freqDuration = -1; int64_t freqDuration = -1;
int64_t keepDuration = -1; int64_t keepDuration = -1;
code = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->freq, precision, TIME_UNIT_MINUTE, &freqDuration); TAOS_CHECK_EXIT(
code = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->keep, precision, TIME_UNIT_MINUTE, &keepDuration); convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->freq, precision, TIME_UNIT_MINUTE, &freqDuration));
int32_t days = duration; // min TAOS_CHECK_EXIT(
convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->keep, precision, TIME_UNIT_MINUTE, &keepDuration));
*days = duration; // min
if (days < freqDuration) { if (*days < freqDuration) {
days = freqDuration; *days = freqDuration;
} }
if (days > keepDuration) { if (*days > keepDuration) {
days = keepDuration; *days = keepDuration;
} }
if (level < TSDB_RETENTION_L1 || level > TSDB_RETENTION_L2) { if (level < TSDB_RETENTION_L1 || level > TSDB_RETENTION_L2) {
goto _exit; goto _exit;
} }
code = convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE, &freqDuration); TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE, &freqDuration));
code = convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE, &keepDuration); TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE, &keepDuration));
int32_t nFreqTimes = (r + level)->freq / (60 * 1000); // use 60s for freq of 1st level int32_t nFreqTimes = (r + level)->freq / (60 * 1000); // use 60s for freq of 1st level
days *= (nFreqTimes > 1 ? nFreqTimes : 1); *days *= (nFreqTimes > 1 ? nFreqTimes : 1);
if (days < freqDuration) { if (*days < freqDuration) {
days = freqDuration; *days = freqDuration;
} }
int32_t maxKeepDuration = TMIN(keepDuration, TSDB_MAX_DURATION_PER_FILE); int32_t maxKeepDuration = TMIN(keepDuration, TSDB_MAX_DURATION_PER_FILE);
if (days > maxKeepDuration) { if (*days > maxKeepDuration) {
days = maxKeepDuration; *days = maxKeepDuration;
} }
_exit: _exit:
smaInfo("vgId:%d, evaluated duration for level %d is %d, raw val:%d", TD_VID(pVnode), level + 1, days, duration); if (code) {
return days; smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} else {
smaInfo("vgId:%d, evaluated duration for level %d is %d, raw val:%d", TD_VID(pVnode), level + 1, *days, duration);
}
TAOS_RETURN(code);
} }
int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type) { int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type) {
terrno = 0; int32_t code = 0;
int32_t lino = 0;
pKeepCfg->precision = pCfg->precision; pKeepCfg->precision = pCfg->precision;
switch (type) { switch (type) {
case TSDB_TYPE_RSMA_L0: case TSDB_TYPE_RSMA_L0:
@ -116,10 +122,14 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
SMA_SET_KEEP_CFG(pVnode, 2); SMA_SET_KEEP_CFG(pVnode, 2);
break; break;
default: default:
terrno = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
break; break;
} }
return terrno; _exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
TAOS_RETURN(code);
} }
int32_t smaOpen(SVnode *pVnode, int8_t rollback, bool force) { int32_t smaOpen(SVnode *pVnode, int8_t rollback, bool force) {
@ -129,8 +139,7 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback, bool force) {
SSma *pSma = taosMemoryCalloc(1, sizeof(SSma)); SSma *pSma = taosMemoryCalloc(1, sizeof(SSma));
if (!pSma) { if (!pSma) {
code = TSDB_CODE_OUT_OF_MEMORY; TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_CODE(code, lino, _exit);
} }
pVnode->pSma = pSma; pVnode->pSma = pSma;
@ -152,18 +161,14 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback, bool force) {
} }
// restore the rsma // restore the rsma
if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed, rollback) < 0) { TAOS_CHECK_EXIT(tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed, rollback));
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
_exit: _exit:
if (code) { if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
terrno = code;
} }
return code; TAOS_RETURN(code);
} }
int32_t smaClose(SSma *pSma) { int32_t smaClose(SSma *pSma) {
@ -190,7 +195,7 @@ int32_t smaClose(SSma *pSma) {
*/ */
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback) { int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback) {
if (!VND_IS_RSMA(pSma->pVnode)) { if (!VND_IS_RSMA(pSma->pVnode)) {
return TSDB_CODE_RSMA_INVALID_ENV; TAOS_RETURN(TSDB_CODE_RSMA_INVALID_ENV);
} }
return tdRSmaProcessRestoreImpl(pSma, type, committedVer, rollback); return tdRSmaProcessRestoreImpl(pSma, type, committedVer, rollback);

View File

@ -123,8 +123,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) { static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
*pStore = taosMemoryCalloc(1, sizeof(STbUidStore)); *pStore = taosMemoryCalloc(1, sizeof(STbUidStore));
if (*pStore == NULL) { if (*pStore == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_FAILED;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -132,12 +131,13 @@ static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd) { static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd) {
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
int32_t code = 0;
if (!suid || !tbUids) { if (!suid || !tbUids) {
terrno = TSDB_CODE_INVALID_PTR; code = TSDB_CODE_INVALID_PTR;
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), suid ? *suid : -1, smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), suid ? *suid : -1,
terrstr()); tstrerror(code));
return TSDB_CODE_FAILED; TAOS_RETURN(code);
} }
int32_t nTables = taosArrayGetSize(tbUids); int32_t nTables = taosArrayGetSize(tbUids);
@ -151,17 +151,17 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids,
if (!pRSmaInfo) { if (!pRSmaInfo) {
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid); smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
terrno = TSDB_CODE_RSMA_INVALID_STAT; code = TSDB_CODE_RSMA_INVALID_STAT;
return TSDB_CODE_FAILED; TAOS_RETURN(code);
} }
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i]) { if (pRSmaInfo->taskInfo[i]) {
if ((terrno = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) { if ((code = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) {
tdReleaseRSmaInfo(pSma, pRSmaInfo); tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i, smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
terrstr()); tstrerror(code));
return TSDB_CODE_FAILED; TAOS_RETURN(code);
} }
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p. suid:%" PRIi64 " uid:%" PRIi64 smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p. suid:%" PRIi64 " uid:%" PRIi64
"nTables:%d level %d", "nTables:%d level %d",
@ -170,26 +170,25 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids,
} }
tdReleaseRSmaInfo(pSma, pRSmaInfo); tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_SUCCESS; TAOS_RETURN(code);
} }
int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) { int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
int32_t code = 0;
if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) { if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd) != TSDB_CODE_SUCCESS) { TAOS_CHECK_RETURN(tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd));
return TSDB_CODE_FAILED;
}
void *pIter = NULL; void *pIter = NULL;
while ((pIter = taosHashIterate(pStore->uidHash, pIter))) { while ((pIter = taosHashIterate(pStore->uidHash, pIter))) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
SArray *pTbUids = *(SArray **)pIter; SArray *pTbUids = *(SArray **)pIter;
if (tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids, isAdd) != TSDB_CODE_SUCCESS) { if ((code = tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids, isAdd)) != TSDB_CODE_SUCCESS) {
taosHashCancelIterate(pStore->uidHash, pIter); taosHashCancelIterate(pStore->uidHash, pIter);
return TSDB_CODE_FAILED; TAOS_RETURN(code);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -206,6 +205,7 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
*/ */
int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) { int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
int32_t code = 0;
// only applicable to rollup SMA ctables // only applicable to rollup SMA ctables
if (!pEnv) { if (!pEnv) {

View File

@ -32,19 +32,19 @@ int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(code)); smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(code));
} }
return code; TAOS_RETURN(code);
} }
int32_t tdProcessTSmaCreate(SSma *pSma, int64_t ver, const char *msg) { int32_t tdProcessTSmaCreate(SSma *pSma, int64_t ver, const char *msg) {
int32_t code = tdProcessTSmaCreateImpl(pSma, ver, msg); int32_t code = tdProcessTSmaCreateImpl(pSma, ver, msg);
return code; TAOS_RETURN(code);
} }
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) { int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
int32_t code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days); int32_t code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days);
return code; TAOS_RETURN(code);
} }
/** /**
@ -70,8 +70,8 @@ static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t c
STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg; STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg;
int64_t sInterval = -1; int64_t sInterval = -1;
code = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND, &sInterval); TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND, &sInterval));
if (TSDB_CODE_SUCCESS != code || 0 == sInterval) { if (0 == sInterval) {
*days = pTsdbCfg->days; *days = pTsdbCfg->days;
goto _exit; goto _exit;
} }
@ -80,10 +80,7 @@ static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t c
*days = pTsdbCfg->days; *days = pTsdbCfg->days;
} else { } else {
int64_t mInterval = -1; int64_t mInterval = -1;
code = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE, &mInterval); TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE, &mInterval));
if (TSDB_CODE_SUCCESS != code) {
goto _exit;
}
int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2; int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2;
if (daysPerFile > SMA_STORAGE_MINUTES_MAX) { if (daysPerFile > SMA_STORAGE_MINUTES_MAX) {
@ -103,7 +100,7 @@ _exit:
smaDebug("vgId:%d, succeed to get tsma days %d", pCfg->vgId, *days); smaDebug("vgId:%d, succeed to get tsma days %d", pCfg->vgId, *days);
} }
tDecoderClear(&coder); tDecoderClear(&coder);
return code; TAOS_RETURN(code);
} }
/** /**
@ -123,10 +120,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg
if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
// create tsma meta in dstVgId // create tsma meta in dstVgId
if (metaCreateTSma(SMA_META(pSma), ver, pCfg) < 0) { TAOS_CHECK_EXIT(metaCreateTSma(SMA_META(pSma), ver, pCfg));
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
// create stable to save tsma result in dstVgId // create stable to save tsma result in dstVgId
tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
@ -135,28 +129,24 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg
pReq.schemaRow = pCfg->schemaRow; pReq.schemaRow = pCfg->schemaRow;
pReq.schemaTag = pCfg->schemaTag; pReq.schemaTag = pCfg->schemaTag;
if (metaCreateSTable(SMA_META(pSma), ver, &pReq) < 0) { TAOS_CHECK_EXIT(metaCreateSTable(SMA_META(pSma), ver, &pReq));
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
} else { } else {
code = terrno = TSDB_CODE_TSMA_INVALID_STAT; TAOS_CHECK_EXIT(TSDB_CODE_TSMA_INVALID_STAT);
TSDB_CHECK_CODE(code, lino, _exit);
} }
_exit: _exit:
if (code) { if (code) {
smaError("vgId:%d, failed at line %d to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 smaError("vgId:%d, failed at line %d to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
" dstTb:%s dstVg:%d", " dstTb:%s dstVg:%d since %s",
SMA_VID(pSma), lino, pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, SMA_VID(pSma), lino, pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name,
pCfg->dstVgId); pCfg->dstVgId, tstrerror(code));
} else { } else {
smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
" dstTb:%s dstVg:%d", " dstTb:%s dstVg:%d",
SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId); SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId);
} }
return code; TAOS_RETURN(code);
} }
int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, int64_t suid, int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, int64_t suid,
@ -167,6 +157,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
int32_t len = 0; int32_t len = 0;
SSubmitReq2 *pReq = NULL; SSubmitReq2 *pReq = NULL;
SArray *tagArray = NULL; SArray *tagArray = NULL;
SHashObj *pTableIndexMap = NULL;
int32_t numOfBlocks = taosArrayGetSize(pBlocks); int32_t numOfBlocks = taosArrayGetSize(pBlocks);
@ -174,18 +165,18 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)); pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
if (!tagArray || !pReq) { if (!tagArray || !pReq) {
code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno; TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_CODE(code, lino, _exit);
} }
pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)); pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
if (pReq->aSubmitTbData == NULL) { if (pReq->aSubmitTbData == NULL) {
code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno; TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
TSDB_CHECK_CODE(code, lino, _exit);
} }
SHashObj *pTableIndexMap = pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (pTableIndexMap == NULL) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
// SSubmitTbData req // SSubmitTbData req
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
@ -193,8 +184,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid; pDeleteReq->suid = suid;
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, "", true); TAOS_CHECK_EXIT(tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, "", true));
TSDB_CHECK_CODE(code, lino, _exit);
continue; continue;
} }
@ -202,11 +192,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
code = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true, &tbData.pCreateTbReq); TAOS_CHECK_EXIT(buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true, &tbData.pCreateTbReq));
if (code) {
smaError("failed to build create-table req, code:%d", code);
continue;
}
{ {
uint64_t groupId = pDataBlock->info.id.groupId; uint64_t groupId = pDataBlock->info.id.groupId;
@ -221,7 +207,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
taosArrayPush(pReq->aSubmitTbData, &tbData); taosArrayPush(pReq->aSubmitTbData, &tbData);
int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1; int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1;
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); TAOS_CHECK_EXIT(taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)));
} else { } else {
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, ""); code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, "");
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -237,15 +223,13 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
} }
} }
taosHashCleanup(pTableIndexMap);
// encode // encode
tEncodeSize(tEncodeSubmitReq, pReq, len, code); tEncodeSize(tEncodeSubmitReq, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder; SEncoder encoder;
len += sizeof(SSubmitReq2Msg); len += sizeof(SSubmitReq2Msg);
if (!(pBuf = rpcMallocCont(len))) { if (!(pBuf = rpcMallocCont(len))) {
code = terrno; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -253,9 +237,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len); ((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg *)pBuf)->version = htobe64(1); ((SSubmitReq2Msg *)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSubmitReq(&encoder, pReq) < 0) { if ((code = tEncodeSubmitReq(&encoder, pReq)) < 0) {
tEncoderClear(&encoder); tEncoderClear(&encoder);
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
@ -263,6 +246,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
_exit: _exit:
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
taosHashCleanup(pTableIndexMap);
if (pReq != NULL) { if (pReq != NULL) {
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
@ -276,7 +260,7 @@ _exit:
if (ppData) *ppData = pBuf; if (ppData) *ppData = pBuf;
if (pLen) *pLen = len; if (pLen) *pLen = len;
} }
return code; TAOS_RETURN(code);
} }
static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *pDelReq) { static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *pDelReq) {
@ -290,7 +274,7 @@ static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *
void *pBuf = rpcMallocCont(len + sizeof(SMsgHead)); void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
if (!pBuf) { if (!pBuf) {
code = terrno; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -313,7 +297,7 @@ _exit:
indexUid, tstrerror(code)); indexUid, tstrerror(code));
} }
return code; TAOS_RETURN(code);
} }
/** /**
@ -350,16 +334,15 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
pTsmaStat = SMA_STAT_TSMA(pStat); pTsmaStat = SMA_STAT_TSMA(pStat);
if (!pTsmaStat->pTSma) { if (!pTsmaStat->pTSma) {
terrno = 0;
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid); STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
if (!pTSma) { if (!pTSma) {
code = terrno ? terrno : TSDB_CODE_TSMA_INVALID_PTR; code = TSDB_CODE_TSMA_INVALID_PTR;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
pTsmaStat->pTSma = pTSma; pTsmaStat->pTSma = pTSma;
pTsmaStat->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1, 1); pTsmaStat->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1, 1);
if (!pTsmaStat->pTSchema) { if (!pTsmaStat->pTSchema) {
code = terrno ? terrno : TSDB_CODE_TSMA_INVALID_PTR; code = TSDB_CODE_TSMA_INVALID_PTR;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
@ -378,15 +361,13 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen); pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if ((terrno = tsmaProcessDelReq(pSma, indexUid, &deleteReq)) != 0) { TAOS_CHECK_EXIT(tsmaProcessDelReq(pSma, indexUid, &deleteReq));
goto _exit;
}
#if 0 #if 0
if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) { if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
terrno = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid, smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid,
pTsmaStat->pTSma->indexUid, tstrerror(terrno), pTsmaStat->pTSma->dstTbName); pTsmaStat->pTSma->indexUid, tstrerror(code), pTsmaStat->pTSma->dstTbName);
goto _err; goto _err;
} }
#endif #endif
@ -405,5 +386,5 @@ _exit:
smaError("vgId:%d, %s failed at line %d since %s, smaIndex:%" PRIi64, SMA_VID(pSma), __func__, lino, smaError("vgId:%d, %s failed at line %d since %s, smaIndex:%" PRIi64, SMA_VID(pSma), __func__, lino,
tstrerror(code), indexUid); tstrerror(code), indexUid);
} }
return code; TAOS_RETURN(code);
} }