diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2490074464..4b46162c7d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -708,6 +708,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158) #define TSDB_CODE_RSMA_STREAM_STATE_OPEN TAOS_DEF_ERROR_CODE(0, 0x3159) #define TSDB_CODE_RSMA_STREAM_STATE_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3160) +#define TSDB_CODE_RSMA_FS_REF TAOS_DEF_ERROR_CODE(0, 0x3161) +#define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3162) +#define TSDB_CODE_RSMA_FS_UPDATE TAOS_DEF_ERROR_CODE(0, 0x3163) //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index f2466e1843..0072075623 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -44,7 +44,6 @@ typedef struct SRSmaInfoItem SRSmaInfoItem; typedef struct SRSmaFS SRSmaFS; typedef struct SQTaskFile SQTaskFile; typedef struct SQTaskFReader SQTaskFReader; -typedef struct SQTaskFWriter SQTaskFWriter; struct SSmaEnv { SRWLatch lock; @@ -94,15 +93,11 @@ struct SQTaskFile { struct SQTaskFReader { SSma *pSma; + int8_t level; + int64_t suid; int64_t version; TdFilePtr pReadH; }; -struct SQTaskFWriter { - SSma *pSma; - int64_t version; - TdFilePtr pWriteH; - char *fname; -}; struct SRSmaFS { SArray *aQTaskInf; // array of SQTaskFile @@ -221,11 +216,11 @@ void tdRSmaFSClose(SRSmaFS *fs); int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew); int32_t tdRSmaFSCommit(SSma *pSma); int32_t tdRSmaFSFinishCommit(SSma *pSma); -int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut); -int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut); -int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version); -void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version); -int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t size); +int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS); +int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS); +int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS); +void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS); +int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize); int32_t tdRSmaFSRollback(SSma *pSma); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7adb18b2dc..62886ea0b9 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -271,6 +271,7 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData); // SRSmaSnapWriter ======================================== int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter); int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); +int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter); int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback); typedef struct { @@ -414,6 +415,7 @@ enum { struct SSnapDataHdr { int8_t type; + int8_t flag; int64_t index; int64_t size; uint8_t data[]; diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index 525ca5b528..3c60a24d3d 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -588,10 +588,10 @@ _exit: return code; } -int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t size) { +int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize) { int32_t code = 0; - for (int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < nSize; ++i) { SQTaskFile *qTaskF = qTaskFile + i; int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskF, tdQTaskInfCmprFn1, TD_GE); @@ -602,7 +602,14 @@ int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t siz SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx); int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF); if (c == 0) { - ASSERT(0); + if (pTaskF->size != qTaskF->size) { + code = TSDB_CODE_RSMA_FS_UPDATE; + smaError("vgId:%d, %s failed at line %d since %s, level:%" PRIi8 ", suid:%" PRIi64 ", version:%" PRIi64 + ", size:%" PRIi64 " != %" PRIi64, + SMA_VID(pSma), __func__, __LINE__, tstrerror(code), pTaskF->level, pTaskF->suid, pTaskF->version, + pTaskF->size, qTaskF->size); + goto _exit; + } continue; } } @@ -616,7 +623,7 @@ int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t siz _exit: return code; } - +#if 0 int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) { SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version}; @@ -696,38 +703,101 @@ void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int taosWUnLockLatch(RSMA_FS_LOCK(pStat)); } +#endif -int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut) { +int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) { + int32_t code = 0; + int32_t lino = 0; + int32_t nRef = 0; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SRSmaFS *qFS = RSMA_FS(pStat); + int32_t size = taosArrayGetSize(qFS->aQTaskInf); + + pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile)); + if (pFS->aQTaskInf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + for (int32_t i = 0; i < size; ++i) { + SQTaskFile *qTaskF = (SQTaskFile *)taosArrayGet(qFS->aQTaskInf, i); + nRef = atomic_fetch_add_32(&qTaskF->nRef, 1); + if(nRef <= 0) { + code = TSDB_CODE_RSMA_FS_REF; + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + taosArraySetSize(pFS->aQTaskInf, size); + memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile)); + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s, nRef %d", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code), + nRef); + } + return code; +} + +void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS) { + int32_t nRef = 0; + char fname[TSDB_FILENAME_LEN]; + SVnode *pVnode = pSma->pVnode; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + int32_t size = taosArrayGetSize(pFS->aQTaskInf); + + for (int32_t i = 0; i < size; ++i) { + SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i); + + nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1); + if (nRef == 0) { + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, + tfsGetPrimaryPath(pVnode->pTfs), fname); + if (taosRemoveFile(fname) < 0) { + smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), fname, tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), fname); + } + } else if (nRef < 0) { + smaWarn("vgId:%d, abnormal unref %s since %s", TD_VID(pVnode), fname, tstrerror(TSDB_CODE_RSMA_FS_REF)); + } + } + + taosArrayDestroy(pFS->aQTaskInf); +} + +int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS) { int32_t code = 0; int32_t lino = 0; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); taosRLockLatch(RSMA_FS_LOCK(pStat)); - code = tdRSmaFSCopy(pSma, pFSOut); + code = tdRSmaFSRef(pSma, pFS); TSDB_CHECK_CODE(code, lino, _exit); - taosRUnLockLatch(RSMA_FS_LOCK(pStat)); _exit: + taosRUnLockLatch(RSMA_FS_LOCK(pStat)); if (code) { - taosRUnLockLatch(RSMA_FS_LOCK(pStat)); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); } return code; } -int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut) { +int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS) { int32_t code = 0; int32_t lino = 0; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - SRSmaFS *pFS = RSMA_FS(pStat); - int32_t size = taosArrayGetSize(pFS->aQTaskInf); + SRSmaFS *qFS = RSMA_FS(pStat); + int32_t size = taosArrayGetSize(qFS->aQTaskInf); - code = tdRSmaFSCreate(pFSOut, size); + code = tdRSmaFSCreate(pFS, size); TSDB_CHECK_CODE(code, lino, _exit); - taosArraySetSize(pFSOut->aQTaskInf, size); - memcpy(pFSOut->aQTaskInf->pData, pFS->aQTaskInf->pData, size * sizeof(SQTaskFile)); + taosArraySetSize(pFS->aQTaskInf, size); + memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile)); _exit: if (code) { diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 2b442b9de1..3923a5dd5b 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -121,8 +121,6 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty int32_t smaOpen(SVnode *pVnode, int8_t rollback) { STsdbCfg *pCfg = &pVnode->config.tsdbCfg; - ASSERT(!pVnode->pSma); - SSma *pSma = taosMemoryCalloc(1, sizeof(SSma)); if (!pSma) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -137,7 +135,7 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) { if (VND_IS_RSMA(pVnode)) { STsdbKeepCfg keepCfg = {0}; - for (int i = 0; i < TSDB_RETENTION_MAX; ++i) { + for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) { if (i == TSDB_RETENTION_L0) { SMA_OPEN_RSMA_IMPL(pVnode, 0); } else if (i == TSDB_RETENTION_L1) { @@ -145,7 +143,9 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) { } else if (i == TSDB_RETENTION_L2) { SMA_OPEN_RSMA_IMPL(pVnode, 2); } else { - ASSERT(0); + terrno = TSDB_CODE_APP_ERROR; + smaError("vgId:%d, sma open failed since %s, level:%d", TD_VID(pVnode), terrstr(), i); + goto _err; } } @@ -182,7 +182,10 @@ int32_t smaClose(SSma *pSma) { * @return int32_t */ int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback) { - ASSERT(VND_IS_RSMA(pSma->pVnode)); + if (!VND_IS_RSMA(pSma->pVnode)) { + terrno = TSDB_CODE_RSMA_INVALID_ENV; + return TSDB_CODE_FAILED; + } return tdRSmaProcessRestoreImpl(pSma, type, committedVer, rollback); } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 3410dcfd5c..044e1d5bab 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1272,10 +1272,10 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } // prepare - code = tdRSmaFSTakeSnapshot(pSma, &fs); + code = tdRSmaFSCopy(pSma, &fs); TSDB_CHECK_CODE(code, lino, _exit); - code = tdRSmaFSUpsertQTaskFile(&fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray)); + code = tdRSmaFSUpsertQTaskFile(pSma, &fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray)); TSDB_CHECK_CODE(code, lino, _exit); code = tdRSmaFSPrepareCommit(pSma, &fs); diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 9bc2619c9e..356a52e996 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -17,14 +17,13 @@ static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData); static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); -static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version); -static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader); // SRSmaSnapReader ======================================== struct SRSmaSnapReader { SSma* pSma; int64_t sver; int64_t ever; + SRSmaFS fs; // for data file int8_t rsmaDataDone[TSDB_RETENTION_L2]; @@ -32,19 +31,23 @@ struct SRSmaSnapReader { // for qtaskinfo file int8_t qTaskDone; + int32_t fsIter; SQTaskFReader* pQTaskFReader; }; int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader) { int32_t code = 0; + int32_t lino = 0; SVnode* pVnode = pSma->pVnode; SRSmaSnapReader* pReader = NULL; + SSmaEnv* pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat* pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv); // alloc pReader = (SRSmaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader)); if (pReader == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } pReader->pSma = pSma; pReader->sver = sver; @@ -55,174 +58,144 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead if (pSma->pRSmaTsdb[i]) { code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2, &pReader->pDataReader[i]); - if (code < 0) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _exit); } } // open qtaskinfo - if ((code = rsmaQTaskInfSnapReaderOpen(pReader, ever)) < 0) { - goto _err; + taosRLockLatch(RSMA_FS_LOCK(pStat)); + code = tdRSmaFSRef(pSma, &pReader->fs); + taosRUnLockLatch(RSMA_FS_LOCK(pStat)); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(pReader->fs.aQTaskInf) > 0) { + pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader)); + if (!pReader->pQTaskFReader) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + pReader->pQTaskFReader->pSma = pSma; + pReader->pQTaskFReader->version = pReader->ever; } *ppReader = pReader; - - return TSDB_CODE_SUCCESS; -_err: - if (pReader) rsmaSnapReaderClose(&pReader); - *ppReader = NULL; - smaError("vgId:%d, vnode snapshot rsma reader open failed since %s", TD_VID(pVnode), tstrerror(code)); - return TSDB_CODE_FAILED; -} - -static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version) { - #if 0 - int32_t code = 0; - SSma* pSma = pReader->pSma; - SVnode* pVnode = pSma->pVnode; - SSmaEnv* pEnv = NULL; - SRSmaStat* pStat = NULL; - - if (!(pEnv = SMA_RSMA_ENV(pSma))) { - smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as env is NULL", - TD_VID(pVnode), version); - return TSDB_CODE_SUCCESS; +_exit: + if (code) { + if (pReader) rsmaSnapReaderClose(&pReader); + *ppReader = NULL; + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } - - pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv); - - int32_t ref = tdRSmaFSRef(pReader->pSma, pStat, version); - if (ref < 1) { - smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as ref is %d", - TD_VID(pVnode), version, ref); - return TSDB_CODE_SUCCESS; - } - - char qTaskInfoFullName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); - - if (!taosCheckExistFile(qTaskInfoFullName)) { - tdRSmaFSUnRef(pSma, pStat, version); - smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as %s not exist", - TD_VID(pVnode), version, qTaskInfoFullName); - return TSDB_CODE_SUCCESS; - } - - pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader)); - if (!pReader->pQTaskFReader) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - - TdFilePtr fp = taosOpenFile(qTaskInfoFullName, TD_FILE_READ); - if (!fp) { - code = TAOS_SYSTEM_ERROR(errno); - taosMemoryFreeClear(pReader->pQTaskFReader); - goto _end; - } - - pReader->pQTaskFReader->pReadH = fp; - pReader->pQTaskFReader->pSma = pSma; - pReader->pQTaskFReader->version = pReader->ever; - -_end: - if (code < 0) { - tdRSmaFSUnRef(pSma, pStat, version); - smaError("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName); - return TSDB_CODE_FAILED; - } - - smaInfo("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName); - #endif - return TSDB_CODE_SUCCESS; -} - -static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader) { -#if 0 - if (!(*ppReader)) { - return TSDB_CODE_SUCCESS; - } - - SSma* pSma = (*ppReader)->pSma; - SRSmaStat* pStat = SMA_RSMA_STAT(pSma); - int64_t version = (*ppReader)->version; - - taosCloseFile(&(*ppReader)->pReadH); - tdRSmaFSUnRef(pSma, pStat, version); - taosMemoryFreeClear(*ppReader); - smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo version %" PRIi64, SMA_VID(pSma), version); -#endif - return TSDB_CODE_SUCCESS; + return code; } static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) { int32_t code = 0; - SSma* pSma = pReader->pSma; + int32_t lino = 0; + SVnode* pVnode = pReader->pSma->pVnode; + SQTaskFReader* qReader = pReader->pQTaskFReader; + SRSmaFS* pFS = &pReader->fs; int64_t n = 0; uint8_t* pBuf = NULL; - SQTaskFReader* qReader = pReader->pQTaskFReader; + int64_t version = pReader->ever; + char fname[TSDB_FILENAME_LEN]; if (!qReader) { *ppBuf = NULL; - smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, qTaskReader is NULL", SMA_VID(pSma)); + smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, qTaskReader is NULL", TD_VID(pVnode)); return 0; } + if (pReader->fsIter >= taosArrayGetSize(pFS->aQTaskInf)) { + *ppBuf = NULL; + smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, fsIter reach end", TD_VID(pVnode)); + return 0; + } + + while (pReader->fsIter < taosArrayGetSize(pFS->aQTaskInf)) { + SQTaskFile* qTaskF = taosArrayGet(pFS->aQTaskInf, pReader->fsIter); + if (qTaskF->version != version) { + continue; + } + + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, tfsGetPrimaryPath(pVnode->pTfs), + fname); + if (!taosCheckExistFile(fname)) { + smaWarn("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8 ", version %" PRIi64 + " is not needed as %s not exist", + TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, fname); + continue; + } + + TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ); + if (!fp) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + qReader->pReadH = fp; + qReader->level = qTaskF->level; + qReader->suid = qTaskF->suid; + } + if (!qReader->pReadH) { *ppBuf = NULL; - smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, readh is NULL", SMA_VID(pSma)); + smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since readh is NULL", TD_VID(pVnode)); return 0; } int64_t size = 0; if (taosFStatFile(qReader->pReadH, &size, NULL) < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } // seek if (taosLSeekFile(qReader->pReadH, 0, SEEK_SET) < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - ASSERT(!(*ppBuf)); - // alloc - *ppBuf = taosMemoryCalloc(1, sizeof(SSnapDataHdr) + size); + if (*ppBuf) { + *ppBuf = taosMemoryRealloc(*ppBuf, sizeof(SSnapDataHdr) + size); + } else { + *ppBuf = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); + } if (!(*ppBuf)) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } // read n = taosReadFile(qReader->pReadH, POINTER_SHIFT(*ppBuf, sizeof(SSnapDataHdr)), size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } else if (n != size) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, size:%" PRIi64, SMA_VID(pSma), size); + smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, version:%" PRIi64 ", size:%" PRIi64, TD_VID(pVnode), version, + size); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf); pHdr->type = SNAP_DATA_QTASK; + pHdr->flag = qReader->level; + pHdr->index = qReader->suid; pHdr->size = size; _exit: - smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", SMA_VID(pSma)); - return code; - -_err: - *ppBuf = NULL; - smaError("vgId:%d, vnode snapshot rsma read qtaskinfo failed since %s", SMA_VID(pSma), tstrerror(code)); + if (code) { + *ppBuf = NULL; + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } else { + smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", TD_VID(pVnode)); + } return code; } int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) { int32_t code = 0; + int32_t lino = 0; *ppData = NULL; @@ -236,14 +209,11 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) { if (!pReader->rsmaDataDone[i]) { smaInfo("vgId:%d, vnode snapshot rsma read level %d not done", SMA_VID(pReader->pSma), i); code = tsdbSnapRead(pTsdbSnapReader, ppData); - if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + if (*ppData) { + goto _exit; } else { - if (*ppData) { - goto _exit; - } else { - pReader->rsmaDataDone[i] = 1; - } + pReader->rsmaDataDone[i] = 1; } } else { smaInfo("vgId:%d, vnode snapshot rsma read level %d is done", SMA_VID(pReader->pSma), i); @@ -254,22 +224,20 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) { if (!pReader->qTaskDone) { smaInfo("vgId:%d, vnode snapshot rsma qtaskinfo not done", SMA_VID(pReader->pSma)); code = rsmaSnapReadQTaskInfo(pReader, ppData); - if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + if (*ppData) { + goto _exit; } else { pReader->qTaskDone = 1; - if (*ppData) { - goto _exit; - } } } _exit: - smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma)); - return code; - -_err: - smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code)); + if (code) { + smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code)); + } else { + smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma)); + } return code; } @@ -277,14 +245,18 @@ int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) { int32_t code = 0; SRSmaSnapReader* pReader = *ppReader; + tdRSmaFSUnRef(pReader->pSma, &pReader->fs); + if (pReader->pQTaskFReader) { + taosCloseFile(&pReader->pQTaskFReader->pReadH); + taosMemoryFree(pReader->pQTaskFReader); + } + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pReader->pDataReader[i]) { tsdbSnapReaderClose(&pReader->pDataReader[i]); } } - rsmaQTaskInfSnapReaderClose(&pReader->pQTaskFReader); - smaInfo("vgId:%d, vnode snapshot rsma reader closed", SMA_VID(pReader->pSma)); taosMemoryFreeClear(*ppReader); @@ -296,29 +268,23 @@ struct SRSmaSnapWriter { SSma* pSma; int64_t sver; int64_t ever; - - // config - int64_t commitID; + SRSmaFS fs; // for data file STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2]; - - // for qtaskinfo file - SQTaskFReader* pQTaskFReader; - SQTaskFWriter* pQTaskFWriter; }; int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter) { int32_t code = 0; - #if 0 - SRSmaSnapWriter* pWriter = NULL; + int32_t lino = 0; SVnode* pVnode = pSma->pVnode; + SRSmaSnapWriter* pWriter = NULL; // alloc pWriter = (SRSmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); - if (pWriter == NULL) { + if (!pWriter) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } pWriter->pSma = pSma; pWriter->sver = sver; @@ -328,104 +294,103 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWrit for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]); - if (code < 0) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _exit); } } // qtaskinfo - SQTaskFWriter* qWriter = (SQTaskFWriter*)taosMemoryCalloc(1, sizeof(SQTaskFWriter)); - qWriter->pSma = pSma; - - char qTaskInfoFullName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 0, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); - TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (!qTaskF) { - taosMemoryFree(qWriter); - code = TAOS_SYSTEM_ERROR(errno); - smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName, - tstrerror(code)); - goto _err; - } - qWriter->pWriteH = qTaskF; - int32_t fnameLen = strlen(qTaskInfoFullName) + 1; - qWriter->fname = taosMemoryCalloc(1, fnameLen); - strncpy(qWriter->fname, qTaskInfoFullName, fnameLen); - pWriter->pQTaskFWriter = qWriter; - smaDebug("vgId:%d, rsma snapshot writer open succeed for %s", TD_VID(pSma->pVnode), qTaskInfoFullName); + code = tdRSmaFSCopy(pSma, &pWriter->fs); + TSDB_CHECK_CODE(code, lino, _exit); // snapWriter *ppWriter = pWriter; - - smaInfo("vgId:%d, rsma snapshot writer open succeed", TD_VID(pSma->pVnode)); +_exit: + if (code) { + if (pWriter) rsmaSnapWriterClose(&pWriter, 0); + *ppWriter = NULL; + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } else { + smaInfo("vgId:%d, rsma snapshot writer open succeed", TD_VID(pSma->pVnode)); + } return code; +} -_err: - smaError("vgId:%d, rsma snapshot writer open failed since %s", TD_VID(pSma->pVnode), tstrerror(code)); - if (pWriter) rsmaSnapWriterClose(&pWriter, 0); - *ppWriter = NULL; - #endif +int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) { + int32_t code = 0; + int32_t lino = 0; + + // rsmaSnapWriterClose + + code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pWriter->pSma), __func__, lino, tstrerror(code)); + } return code; } int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { int32_t code = 0; - #if 0 + int32_t lino = 0; + SSma* pSma = NULL; + SSmaEnv* pEnv = NULL; + SRSmaStat* pStat = NULL; SRSmaSnapWriter* pWriter = *ppWriter; - SVnode* pVnode = pWriter->pSma->pVnode; - if (rollback) { - // TODO: rsma1/rsma2 - // qtaskinfo - if (pWriter->pQTaskFWriter) { - if (taosRemoveFile(pWriter->pQTaskFWriter->fname) != 0) { - smaWarn("vgId:%d, vnode snapshot rsma writer failed to remove %s since %s", SMA_VID(pWriter->pSma), - pWriter->pQTaskFWriter->fname ? pWriter->pQTaskFWriter->fname : "NULL", - tstrerror(TAOS_SYSTEM_ERROR(errno))); - } - } - } else { - // rsma1/rsma2 - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - if (pWriter->pDataWriter[i]) { - code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback); - if (code) goto _err; - } - } - // qtaskinfo - if (pWriter->pQTaskFWriter) { - char qTaskInfoFullName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pWriter->ever, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); - if (taosRenameFile(pWriter->pQTaskFWriter->fname, qTaskInfoFullName) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - smaInfo("vgId:%d, vnode snapshot rsma writer rename %s to %s", SMA_VID(pWriter->pSma), - pWriter->pQTaskFWriter->fname, qTaskInfoFullName); + if (!pWriter) { + goto _exit; + } - // rsma restore - int8_t rollback = 0; - if ((code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback)) < 0) { - goto _err; - } - smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName); + pSma = pWriter->pSma; + pEnv = SMA_RSMA_ENV(pSma); + pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv); + + // rsma1/rsma2 + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pWriter->pDataWriter[i]) { + code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback); + TSDB_CHECK_CODE(code, lino, _exit); } } - smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma)); - taosMemoryFree(pWriter); - *ppWriter = NULL; - return code; + // qtaskinfo + if (rollback) { + tdRSmaFSRollback(pSma); + // remove qTaskFiles + } else { + // lock + taosWLockLatch(RSMA_FS_LOCK(pStat)); + code = tdRSmaFSCommit(pSma); + if (code) { + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + goto _exit; + } + // unlock + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + } + + // rsma restore + code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback); + TSDB_CHECK_CODE(code, lino, _exit); + smaInfo("vgId:%d, vnode snapshot rsma writer restore from sync succeed", SMA_VID(pSma)); + +_exit: + if (pWriter) taosMemoryFree(pWriter); + *ppWriter = NULL; + if (code) { + smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pSma), tstrerror(code)); + } else { + smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pSma)); + } -_err: - smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pWriter->pSma), tstrerror(code)); - #endif return code; } int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; + int32_t lino = 0; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; // rsma1/rsma2 @@ -438,42 +403,78 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) } else if (pHdr->type == SNAP_DATA_QTASK) { code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData); } else { - ASSERT(0); + code = TSDB_CODE_RSMA_FS_SYNC; } - if (code < 0) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); _exit: - smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type); - return code; - -_err: - smaError("vgId:%d, rsma snapshot write for data type %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type, - tstrerror(code)); + if (code) { + smaError("vgId:%d, %s failed at line %d since %s, data type %" PRIi8, SMA_VID(pWriter->pSma), __func__, lino, + tstrerror(code), pHdr->type); + } else { + smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type); + } return code; } static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - SQTaskFWriter* qWriter = pWriter->pQTaskFWriter; + int32_t code = 0; + int32_t lino = 0; + SSma* pSma = pWriter->pSma; + SVnode* pVnode = pSma->pVnode; + char fname[TSDB_FILENAME_LEN]; + TdFilePtr fp = NULL; + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; - if (qWriter && qWriter->pWriteH) { - SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; - int64_t size = pHdr->size; - ASSERT(size == (nData - sizeof(SSnapDataHdr))); - int64_t contLen = taosWriteFile(qWriter->pWriteH, pHdr->data, size); - if (contLen != size) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", SMA_VID(pWriter->pSma), qWriter->fname); - } else { - smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo is not needed", SMA_VID(pWriter->pSma)); + fname[0] = '\0'; + + if (pHdr->size != (nData - sizeof(SSnapDataHdr))) { + code = TSDB_CODE_RSMA_FS_SYNC; + TSDB_CHECK_CODE(code, lino, _exit); } -_exit: - return code; + SQTaskFile qTaskFile = { + .nRef = 1, .level = pHdr->flag, .suid = pHdr->index, .version = pWriter->ever, .size = pHdr->size}; + + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pHdr->index, pHdr->flag, qTaskFile.version, + tfsGetPrimaryPath(pVnode->pTfs), fname); + + fp = taosCreateFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (!fp) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t contLen = taosWriteFile(fp, pHdr->data, pHdr->size); + if (contLen != pHdr->size) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + uint32_t mtime = 0; + if (taosFStatFile(fp, NULL, &mtime) == 0) { + qTaskFile.mtime = mtime; + } + + if (taosFsyncFile(fp) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosCloseFile(&fp); + + code = tdRSmaFSUpsertQTaskFile(pSma, &pWriter->fs, &qTaskFile, 1); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + if (fp) { + (void)taosRemoveFile(fname); + } + smaError("vgId:%d, %s failed at line %d since %s, file:%s", TD_VID(pVnode), __func__, lino, tstrerror(code), fname); + } else { + smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", TD_VID(pVnode), fname); + } -_err: - smaError("vgId:%d, vnode snapshot rsma write qtaskinfo failed since %s", SMA_VID(pWriter->pSma), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index a2c9484693..fc8bb46b9c 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -111,35 +111,48 @@ _err: * @return int32_t */ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { - SSmaCfg *pCfg = (SSmaCfg *)pMsg; + int32_t code = 0; + int32_t lino = 0; + SSmaCfg *pCfg = (SSmaCfg *)pMsg; + SVCreateStbReq pReq = {0}; if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { // create tsma meta in dstVgId if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) { - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } // create stable to save tsma result in dstVgId SName stbFullName = {0}; tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - SVCreateStbReq pReq = {0}; pReq.name = (char *)tNameGetTableName(&stbFullName); pReq.suid = pCfg->dstTbUid; pReq.schemaRow = pCfg->schemaRow; pReq.schemaTag = pCfg->schemaTag; if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) { - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } + } else { + code = terrno = TSDB_CODE_TSMA_INVALID_STAT; + TSDB_CHECK_CODE(code, lino, _exit); + } +_exit: + if (code) { + smaError("vgId:%d, failed at line %d to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 + " dstTb:%s dstVg:%d", + SMA_VID(pSma), lino, pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, + pCfg->dstVgId); + } else { smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId); - } else { - ASSERT(0); } - return 0; + return code; } /** @@ -174,7 +187,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char STSmaStat *pTsmaStat = NULL; if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) { - terrno = TSDB_CODE_TSMA_INVALID_STAT; + terrno = TSDB_CODE_TSMA_INVALID_ENV; return TSDB_CODE_FAILED; } @@ -216,9 +229,14 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char indexUid, tstrerror(terrno)); goto _err; } - + #if 0 - ASSERT(!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)); + if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) { + terrno = TSDB_CODE_APP_ERROR; + smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid, + pTsmaStat->pTSma->indexUid, tstrerror(terrno), pTsmaStat->pTSma->dstTbName); + goto _err; + } #endif SRpcMsg submitReqMsg = { diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 56ba2b8e98..91afd49cd3 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -98,74 +98,6 @@ void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool } } -#if 0 -int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) { - TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK); - TD_TFILE_SET_CLOSED(pTFile); - - memset(&(pTFile->info), 0, sizeof(pTFile->info)); - pTFile->info.magic = TD_FILE_INIT_MAGIC; - - char tmpName[TSDB_FILENAME_LEN * 2 + 32] = {0}; - snprintf(tmpName, TSDB_FILENAME_LEN * 2 + 32, "%s%s%s", dname, TD_DIRSEP, fname); - int32_t tmpNameLen = strlen(tmpName) + 1; - pTFile->fname = taosMemoryMalloc(tmpNameLen); - if (!pTFile->fname) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - tstrncpy(pTFile->fname, tmpName, tmpNameLen); - - return 0; -} - -int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType) { - ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC); - pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pTFile->pFile == NULL) { - if (errno == ENOENT) { - // Try to create directory recursively - char *s = strdup(TD_TFILE_FULL_NAME(pTFile)); - if (taosMulMkDir(taosDirName(s)) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - taosMemoryFree(s); - return -1; - } - taosMemoryFree(s); - pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pTFile->pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - } - } - - if (!updateHeader) { - return 0; - } - - pTFile->info.fsize += TD_FILE_HEAD_SIZE; - pTFile->info.fver = 0; - - if (tdUpdateTFileHeader(pTFile) < 0) { - tdCloseTFile(pTFile); - tdRemoveTFile(pTFile); - return -1; - } - - return 0; -} - -int32_t tdRemoveTFile(STFile *pTFile) { - if (taosRemoveFile(TD_TFILE_FULL_NAME(pTFile)) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - }; - return 0; -} - -#endif - // smaXXXUtil ================ void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) { void *pResult = taosAcquireRef(rsetId, refId); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 792fd0008a..e729a3bf7f 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -585,7 +585,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error") -TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_COMMIT, "Invalid rsma fs commit") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_COMMIT, "Rsma fs commit error") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") @@ -593,6 +593,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REGEX_MATCH, "Rsma regex match") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_OPEN, "Rsma stream state open") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_COMMIT, "Rsma stream state commit") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_REF, "Rsma fs ref error") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_SYNC, "Rsma fs sync error") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_UPDATE, "Rsma fs update error") //index TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")