chore: rsma sync and assert
This commit is contained in:
parent
ed3a0c1dc1
commit
73710da55f
|
@ -708,6 +708,9 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158)
|
||||
#define TSDB_CODE_RSMA_STREAM_STATE_OPEN TAOS_DEF_ERROR_CODE(0, 0x3159)
|
||||
#define TSDB_CODE_RSMA_STREAM_STATE_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3160)
|
||||
#define TSDB_CODE_RSMA_FS_REF TAOS_DEF_ERROR_CODE(0, 0x3161)
|
||||
#define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3162)
|
||||
#define TSDB_CODE_RSMA_FS_UPDATE TAOS_DEF_ERROR_CODE(0, 0x3163)
|
||||
|
||||
//index
|
||||
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
|
||||
|
|
|
@ -44,7 +44,6 @@ typedef struct SRSmaInfoItem SRSmaInfoItem;
|
|||
typedef struct SRSmaFS SRSmaFS;
|
||||
typedef struct SQTaskFile SQTaskFile;
|
||||
typedef struct SQTaskFReader SQTaskFReader;
|
||||
typedef struct SQTaskFWriter SQTaskFWriter;
|
||||
|
||||
struct SSmaEnv {
|
||||
SRWLatch lock;
|
||||
|
@ -94,15 +93,11 @@ struct SQTaskFile {
|
|||
|
||||
struct SQTaskFReader {
|
||||
SSma *pSma;
|
||||
int8_t level;
|
||||
int64_t suid;
|
||||
int64_t version;
|
||||
TdFilePtr pReadH;
|
||||
};
|
||||
struct SQTaskFWriter {
|
||||
SSma *pSma;
|
||||
int64_t version;
|
||||
TdFilePtr pWriteH;
|
||||
char *fname;
|
||||
};
|
||||
|
||||
struct SRSmaFS {
|
||||
SArray *aQTaskInf; // array of SQTaskFile
|
||||
|
@ -221,11 +216,11 @@ void tdRSmaFSClose(SRSmaFS *fs);
|
|||
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew);
|
||||
int32_t tdRSmaFSCommit(SSma *pSma);
|
||||
int32_t tdRSmaFSFinishCommit(SSma *pSma);
|
||||
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut);
|
||||
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut);
|
||||
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version);
|
||||
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version);
|
||||
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t size);
|
||||
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS);
|
||||
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS);
|
||||
int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS);
|
||||
void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS);
|
||||
int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize);
|
||||
int32_t tdRSmaFSRollback(SSma *pSma);
|
||||
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
|
||||
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
||||
|
|
|
@ -271,6 +271,7 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
|
|||
// SRSmaSnapWriter ========================================
|
||||
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter);
|
||||
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter);
|
||||
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);
|
||||
|
||||
typedef struct {
|
||||
|
@ -414,6 +415,7 @@ enum {
|
|||
|
||||
struct SSnapDataHdr {
|
||||
int8_t type;
|
||||
int8_t flag;
|
||||
int64_t index;
|
||||
int64_t size;
|
||||
uint8_t data[];
|
||||
|
|
|
@ -588,10 +588,10 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t size) {
|
||||
int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize) {
|
||||
int32_t code = 0;
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
for (int32_t i = 0; i < nSize; ++i) {
|
||||
SQTaskFile *qTaskF = qTaskFile + i;
|
||||
|
||||
int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskF, tdQTaskInfCmprFn1, TD_GE);
|
||||
|
@ -602,7 +602,14 @@ int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t siz
|
|||
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx);
|
||||
int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF);
|
||||
if (c == 0) {
|
||||
ASSERT(0);
|
||||
if (pTaskF->size != qTaskF->size) {
|
||||
code = TSDB_CODE_RSMA_FS_UPDATE;
|
||||
smaError("vgId:%d, %s failed at line %d since %s, level:%" PRIi8 ", suid:%" PRIi64 ", version:%" PRIi64
|
||||
", size:%" PRIi64 " != %" PRIi64,
|
||||
SMA_VID(pSma), __func__, __LINE__, tstrerror(code), pTaskF->level, pTaskF->suid, pTaskF->version,
|
||||
pTaskF->size, qTaskF->size);
|
||||
goto _exit;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -616,7 +623,7 @@ int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t siz
|
|||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) {
|
||||
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
|
||||
SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version};
|
||||
|
@ -696,38 +703,101 @@ void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int
|
|||
|
||||
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut) {
|
||||
int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t nRef = 0;
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||
SRSmaFS *qFS = RSMA_FS(pStat);
|
||||
int32_t size = taosArrayGetSize(qFS->aQTaskInf);
|
||||
|
||||
pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile));
|
||||
if (pFS->aQTaskInf == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SQTaskFile *qTaskF = (SQTaskFile *)taosArrayGet(qFS->aQTaskInf, i);
|
||||
nRef = atomic_fetch_add_32(&qTaskF->nRef, 1);
|
||||
if(nRef <= 0) {
|
||||
code = TSDB_CODE_RSMA_FS_REF;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
taosArraySetSize(pFS->aQTaskInf, size);
|
||||
memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile));
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
smaError("vgId:%d, %s failed at line %d since %s, nRef %d", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code),
|
||||
nRef);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS) {
|
||||
int32_t nRef = 0;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i);
|
||||
|
||||
nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version,
|
||||
tfsGetPrimaryPath(pVnode->pTfs), fname);
|
||||
if (taosRemoveFile(fname) < 0) {
|
||||
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), fname, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
} else {
|
||||
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), fname);
|
||||
}
|
||||
} else if (nRef < 0) {
|
||||
smaWarn("vgId:%d, abnormal unref %s since %s", TD_VID(pVnode), fname, tstrerror(TSDB_CODE_RSMA_FS_REF));
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(pFS->aQTaskInf);
|
||||
}
|
||||
|
||||
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||
|
||||
taosRLockLatch(RSMA_FS_LOCK(pStat));
|
||||
code = tdRSmaFSCopy(pSma, pFSOut);
|
||||
code = tdRSmaFSRef(pSma, pFS);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||
_exit:
|
||||
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||
if (code) {
|
||||
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut) {
|
||||
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||
SRSmaFS *pFS = RSMA_FS(pStat);
|
||||
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
|
||||
SRSmaFS *qFS = RSMA_FS(pStat);
|
||||
int32_t size = taosArrayGetSize(qFS->aQTaskInf);
|
||||
|
||||
code = tdRSmaFSCreate(pFSOut, size);
|
||||
code = tdRSmaFSCreate(pFS, size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
taosArraySetSize(pFSOut->aQTaskInf, size);
|
||||
memcpy(pFSOut->aQTaskInf->pData, pFS->aQTaskInf->pData, size * sizeof(SQTaskFile));
|
||||
taosArraySetSize(pFS->aQTaskInf, size);
|
||||
memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile));
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
|
|
@ -121,8 +121,6 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
|
|||
int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
|
||||
STsdbCfg *pCfg = &pVnode->config.tsdbCfg;
|
||||
|
||||
ASSERT(!pVnode->pSma);
|
||||
|
||||
SSma *pSma = taosMemoryCalloc(1, sizeof(SSma));
|
||||
if (!pSma) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -137,7 +135,7 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
|
|||
|
||||
if (VND_IS_RSMA(pVnode)) {
|
||||
STsdbKeepCfg keepCfg = {0};
|
||||
for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
|
||||
if (i == TSDB_RETENTION_L0) {
|
||||
SMA_OPEN_RSMA_IMPL(pVnode, 0);
|
||||
} else if (i == TSDB_RETENTION_L1) {
|
||||
|
@ -145,7 +143,9 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
|
|||
} else if (i == TSDB_RETENTION_L2) {
|
||||
SMA_OPEN_RSMA_IMPL(pVnode, 2);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
smaError("vgId:%d, sma open failed since %s, level:%d", TD_VID(pVnode), terrstr(), i);
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -182,7 +182,10 @@ int32_t smaClose(SSma *pSma) {
|
|||
* @return int32_t
|
||||
*/
|
||||
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback) {
|
||||
ASSERT(VND_IS_RSMA(pSma->pVnode));
|
||||
if (!VND_IS_RSMA(pSma->pVnode)) {
|
||||
terrno = TSDB_CODE_RSMA_INVALID_ENV;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
return tdRSmaProcessRestoreImpl(pSma, type, committedVer, rollback);
|
||||
}
|
||||
|
|
|
@ -1272,10 +1272,10 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
|||
}
|
||||
|
||||
// prepare
|
||||
code = tdRSmaFSTakeSnapshot(pSma, &fs);
|
||||
code = tdRSmaFSCopy(pSma, &fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tdRSmaFSUpsertQTaskFile(&fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray));
|
||||
code = tdRSmaFSUpsertQTaskFile(pSma, &fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray));
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tdRSmaFSPrepareCommit(pSma, &fs);
|
||||
|
|
|
@ -17,14 +17,13 @@
|
|||
|
||||
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData);
|
||||
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||
static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version);
|
||||
static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader);
|
||||
|
||||
// SRSmaSnapReader ========================================
|
||||
struct SRSmaSnapReader {
|
||||
SSma* pSma;
|
||||
int64_t sver;
|
||||
int64_t ever;
|
||||
SRSmaFS fs;
|
||||
|
||||
// for data file
|
||||
int8_t rsmaDataDone[TSDB_RETENTION_L2];
|
||||
|
@ -32,19 +31,23 @@ struct SRSmaSnapReader {
|
|||
|
||||
// for qtaskinfo file
|
||||
int8_t qTaskDone;
|
||||
int32_t fsIter;
|
||||
SQTaskFReader* pQTaskFReader;
|
||||
};
|
||||
|
||||
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SVnode* pVnode = pSma->pVnode;
|
||||
SRSmaSnapReader* pReader = NULL;
|
||||
SSmaEnv* pEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat* pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
|
||||
|
||||
// alloc
|
||||
pReader = (SRSmaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
|
||||
if (pReader == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
pReader->pSma = pSma;
|
||||
pReader->sver = sver;
|
||||
|
@ -55,174 +58,144 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead
|
|||
if (pSma->pRSmaTsdb[i]) {
|
||||
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2,
|
||||
&pReader->pDataReader[i]);
|
||||
if (code < 0) {
|
||||
goto _err;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
// open qtaskinfo
|
||||
if ((code = rsmaQTaskInfSnapReaderOpen(pReader, ever)) < 0) {
|
||||
goto _err;
|
||||
taosRLockLatch(RSMA_FS_LOCK(pStat));
|
||||
code = tdRSmaFSRef(pSma, &pReader->fs);
|
||||
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (taosArrayGetSize(pReader->fs.aQTaskInf) > 0) {
|
||||
pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader));
|
||||
if (!pReader->pQTaskFReader) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
pReader->pQTaskFReader->pSma = pSma;
|
||||
pReader->pQTaskFReader->version = pReader->ever;
|
||||
}
|
||||
|
||||
*ppReader = pReader;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
if (pReader) rsmaSnapReaderClose(&pReader);
|
||||
*ppReader = NULL;
|
||||
smaError("vgId:%d, vnode snapshot rsma reader open failed since %s", TD_VID(pVnode), tstrerror(code));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version) {
|
||||
#if 0
|
||||
int32_t code = 0;
|
||||
SSma* pSma = pReader->pSma;
|
||||
SVnode* pVnode = pSma->pVnode;
|
||||
SSmaEnv* pEnv = NULL;
|
||||
SRSmaStat* pStat = NULL;
|
||||
|
||||
if (!(pEnv = SMA_RSMA_ENV(pSma))) {
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as env is NULL",
|
||||
TD_VID(pVnode), version);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_exit:
|
||||
if (code) {
|
||||
if (pReader) rsmaSnapReaderClose(&pReader);
|
||||
*ppReader = NULL;
|
||||
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
|
||||
pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
|
||||
|
||||
int32_t ref = tdRSmaFSRef(pReader->pSma, pStat, version);
|
||||
if (ref < 1) {
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as ref is %d",
|
||||
TD_VID(pVnode), version, ref);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char qTaskInfoFullName[TSDB_FILENAME_LEN];
|
||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
|
||||
|
||||
if (!taosCheckExistFile(qTaskInfoFullName)) {
|
||||
tdRSmaFSUnRef(pSma, pStat, version);
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as %s not exist",
|
||||
TD_VID(pVnode), version, qTaskInfoFullName);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader));
|
||||
if (!pReader->pQTaskFReader) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
TdFilePtr fp = taosOpenFile(qTaskInfoFullName, TD_FILE_READ);
|
||||
if (!fp) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
taosMemoryFreeClear(pReader->pQTaskFReader);
|
||||
goto _end;
|
||||
}
|
||||
|
||||
pReader->pQTaskFReader->pReadH = fp;
|
||||
pReader->pQTaskFReader->pSma = pSma;
|
||||
pReader->pQTaskFReader->version = pReader->ever;
|
||||
|
||||
_end:
|
||||
if (code < 0) {
|
||||
tdRSmaFSUnRef(pSma, pStat, version);
|
||||
smaError("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName);
|
||||
#endif
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader) {
|
||||
#if 0
|
||||
if (!(*ppReader)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSma* pSma = (*ppReader)->pSma;
|
||||
SRSmaStat* pStat = SMA_RSMA_STAT(pSma);
|
||||
int64_t version = (*ppReader)->version;
|
||||
|
||||
taosCloseFile(&(*ppReader)->pReadH);
|
||||
tdRSmaFSUnRef(pSma, pStat, version);
|
||||
taosMemoryFreeClear(*ppReader);
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo version %" PRIi64, SMA_VID(pSma), version);
|
||||
#endif
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) {
|
||||
int32_t code = 0;
|
||||
SSma* pSma = pReader->pSma;
|
||||
int32_t lino = 0;
|
||||
SVnode* pVnode = pReader->pSma->pVnode;
|
||||
SQTaskFReader* qReader = pReader->pQTaskFReader;
|
||||
SRSmaFS* pFS = &pReader->fs;
|
||||
int64_t n = 0;
|
||||
uint8_t* pBuf = NULL;
|
||||
SQTaskFReader* qReader = pReader->pQTaskFReader;
|
||||
int64_t version = pReader->ever;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
if (!qReader) {
|
||||
*ppBuf = NULL;
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, qTaskReader is NULL", SMA_VID(pSma));
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, qTaskReader is NULL", TD_VID(pVnode));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pReader->fsIter >= taosArrayGetSize(pFS->aQTaskInf)) {
|
||||
*ppBuf = NULL;
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, fsIter reach end", TD_VID(pVnode));
|
||||
return 0;
|
||||
}
|
||||
|
||||
while (pReader->fsIter < taosArrayGetSize(pFS->aQTaskInf)) {
|
||||
SQTaskFile* qTaskF = taosArrayGet(pFS->aQTaskInf, pReader->fsIter);
|
||||
if (qTaskF->version != version) {
|
||||
continue;
|
||||
}
|
||||
|
||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, tfsGetPrimaryPath(pVnode->pTfs),
|
||||
fname);
|
||||
if (!taosCheckExistFile(fname)) {
|
||||
smaWarn("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8 ", version %" PRIi64
|
||||
" is not needed as %s not exist",
|
||||
TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, fname);
|
||||
continue;
|
||||
}
|
||||
|
||||
TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
|
||||
if (!fp) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
qReader->pReadH = fp;
|
||||
qReader->level = qTaskF->level;
|
||||
qReader->suid = qTaskF->suid;
|
||||
}
|
||||
|
||||
if (!qReader->pReadH) {
|
||||
*ppBuf = NULL;
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, readh is NULL", SMA_VID(pSma));
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since readh is NULL", TD_VID(pVnode));
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t size = 0;
|
||||
if (taosFStatFile(qReader->pReadH, &size, NULL) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// seek
|
||||
if (taosLSeekFile(qReader->pReadH, 0, SEEK_SET) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
ASSERT(!(*ppBuf));
|
||||
// alloc
|
||||
*ppBuf = taosMemoryCalloc(1, sizeof(SSnapDataHdr) + size);
|
||||
if (*ppBuf) {
|
||||
*ppBuf = taosMemoryRealloc(*ppBuf, sizeof(SSnapDataHdr) + size);
|
||||
} else {
|
||||
*ppBuf = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
|
||||
}
|
||||
if (!(*ppBuf)) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// read
|
||||
n = taosReadFile(qReader->pReadH, POINTER_SHIFT(*ppBuf, sizeof(SSnapDataHdr)), size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else if (n != size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, size:%" PRIi64, SMA_VID(pSma), size);
|
||||
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, version:%" PRIi64 ", size:%" PRIi64, TD_VID(pVnode), version,
|
||||
size);
|
||||
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf);
|
||||
pHdr->type = SNAP_DATA_QTASK;
|
||||
pHdr->flag = qReader->level;
|
||||
pHdr->index = qReader->suid;
|
||||
pHdr->size = size;
|
||||
|
||||
_exit:
|
||||
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", SMA_VID(pSma));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
*ppBuf = NULL;
|
||||
smaError("vgId:%d, vnode snapshot rsma read qtaskinfo failed since %s", SMA_VID(pSma), tstrerror(code));
|
||||
if (code) {
|
||||
*ppBuf = NULL;
|
||||
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", TD_VID(pVnode));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
*ppData = NULL;
|
||||
|
||||
|
@ -236,14 +209,11 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
|
|||
if (!pReader->rsmaDataDone[i]) {
|
||||
smaInfo("vgId:%d, vnode snapshot rsma read level %d not done", SMA_VID(pReader->pSma), i);
|
||||
code = tsdbSnapRead(pTsdbSnapReader, ppData);
|
||||
if (code) {
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
if (*ppData) {
|
||||
goto _exit;
|
||||
} else {
|
||||
if (*ppData) {
|
||||
goto _exit;
|
||||
} else {
|
||||
pReader->rsmaDataDone[i] = 1;
|
||||
}
|
||||
pReader->rsmaDataDone[i] = 1;
|
||||
}
|
||||
} else {
|
||||
smaInfo("vgId:%d, vnode snapshot rsma read level %d is done", SMA_VID(pReader->pSma), i);
|
||||
|
@ -254,22 +224,20 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
|
|||
if (!pReader->qTaskDone) {
|
||||
smaInfo("vgId:%d, vnode snapshot rsma qtaskinfo not done", SMA_VID(pReader->pSma));
|
||||
code = rsmaSnapReadQTaskInfo(pReader, ppData);
|
||||
if (code) {
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
if (*ppData) {
|
||||
goto _exit;
|
||||
} else {
|
||||
pReader->qTaskDone = 1;
|
||||
if (*ppData) {
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code));
|
||||
if (code) {
|
||||
smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code));
|
||||
} else {
|
||||
smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -277,14 +245,18 @@ int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) {
|
|||
int32_t code = 0;
|
||||
SRSmaSnapReader* pReader = *ppReader;
|
||||
|
||||
tdRSmaFSUnRef(pReader->pSma, &pReader->fs);
|
||||
if (pReader->pQTaskFReader) {
|
||||
taosCloseFile(&pReader->pQTaskFReader->pReadH);
|
||||
taosMemoryFree(pReader->pQTaskFReader);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pReader->pDataReader[i]) {
|
||||
tsdbSnapReaderClose(&pReader->pDataReader[i]);
|
||||
}
|
||||
}
|
||||
|
||||
rsmaQTaskInfSnapReaderClose(&pReader->pQTaskFReader);
|
||||
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader closed", SMA_VID(pReader->pSma));
|
||||
|
||||
taosMemoryFreeClear(*ppReader);
|
||||
|
@ -296,29 +268,23 @@ struct SRSmaSnapWriter {
|
|||
SSma* pSma;
|
||||
int64_t sver;
|
||||
int64_t ever;
|
||||
|
||||
// config
|
||||
int64_t commitID;
|
||||
SRSmaFS fs;
|
||||
|
||||
// for data file
|
||||
STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2];
|
||||
|
||||
// for qtaskinfo file
|
||||
SQTaskFReader* pQTaskFReader;
|
||||
SQTaskFWriter* pQTaskFWriter;
|
||||
};
|
||||
|
||||
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter) {
|
||||
int32_t code = 0;
|
||||
#if 0
|
||||
SRSmaSnapWriter* pWriter = NULL;
|
||||
int32_t lino = 0;
|
||||
SVnode* pVnode = pSma->pVnode;
|
||||
SRSmaSnapWriter* pWriter = NULL;
|
||||
|
||||
// alloc
|
||||
pWriter = (SRSmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||
if (pWriter == NULL) {
|
||||
if (!pWriter) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
pWriter->pSma = pSma;
|
||||
pWriter->sver = sver;
|
||||
|
@ -328,104 +294,103 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWrit
|
|||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pSma->pRSmaTsdb[i]) {
|
||||
code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]);
|
||||
if (code < 0) {
|
||||
goto _err;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
// qtaskinfo
|
||||
SQTaskFWriter* qWriter = (SQTaskFWriter*)taosMemoryCalloc(1, sizeof(SQTaskFWriter));
|
||||
qWriter->pSma = pSma;
|
||||
|
||||
char qTaskInfoFullName[TSDB_FILENAME_LEN];
|
||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 0, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
|
||||
TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (!qTaskF) {
|
||||
taosMemoryFree(qWriter);
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName,
|
||||
tstrerror(code));
|
||||
goto _err;
|
||||
}
|
||||
qWriter->pWriteH = qTaskF;
|
||||
int32_t fnameLen = strlen(qTaskInfoFullName) + 1;
|
||||
qWriter->fname = taosMemoryCalloc(1, fnameLen);
|
||||
strncpy(qWriter->fname, qTaskInfoFullName, fnameLen);
|
||||
pWriter->pQTaskFWriter = qWriter;
|
||||
smaDebug("vgId:%d, rsma snapshot writer open succeed for %s", TD_VID(pSma->pVnode), qTaskInfoFullName);
|
||||
code = tdRSmaFSCopy(pSma, &pWriter->fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// snapWriter
|
||||
*ppWriter = pWriter;
|
||||
|
||||
smaInfo("vgId:%d, rsma snapshot writer open succeed", TD_VID(pSma->pVnode));
|
||||
_exit:
|
||||
if (code) {
|
||||
if (pWriter) rsmaSnapWriterClose(&pWriter, 0);
|
||||
*ppWriter = NULL;
|
||||
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
smaInfo("vgId:%d, rsma snapshot writer open succeed", TD_VID(pSma->pVnode));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d, rsma snapshot writer open failed since %s", TD_VID(pSma->pVnode), tstrerror(code));
|
||||
if (pWriter) rsmaSnapWriterClose(&pWriter, 0);
|
||||
*ppWriter = NULL;
|
||||
#endif
|
||||
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
// rsmaSnapWriterClose
|
||||
|
||||
code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pWriter->pSma), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
|
||||
int32_t code = 0;
|
||||
#if 0
|
||||
int32_t lino = 0;
|
||||
SSma* pSma = NULL;
|
||||
SSmaEnv* pEnv = NULL;
|
||||
SRSmaStat* pStat = NULL;
|
||||
SRSmaSnapWriter* pWriter = *ppWriter;
|
||||
SVnode* pVnode = pWriter->pSma->pVnode;
|
||||
|
||||
if (rollback) {
|
||||
// TODO: rsma1/rsma2
|
||||
// qtaskinfo
|
||||
if (pWriter->pQTaskFWriter) {
|
||||
if (taosRemoveFile(pWriter->pQTaskFWriter->fname) != 0) {
|
||||
smaWarn("vgId:%d, vnode snapshot rsma writer failed to remove %s since %s", SMA_VID(pWriter->pSma),
|
||||
pWriter->pQTaskFWriter->fname ? pWriter->pQTaskFWriter->fname : "NULL",
|
||||
tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// rsma1/rsma2
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pWriter->pDataWriter[i]) {
|
||||
code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback);
|
||||
if (code) goto _err;
|
||||
}
|
||||
}
|
||||
// qtaskinfo
|
||||
if (pWriter->pQTaskFWriter) {
|
||||
char qTaskInfoFullName[TSDB_FILENAME_LEN];
|
||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pWriter->ever, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
|
||||
if (taosRenameFile(pWriter->pQTaskFWriter->fname, qTaskInfoFullName) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
smaInfo("vgId:%d, vnode snapshot rsma writer rename %s to %s", SMA_VID(pWriter->pSma),
|
||||
pWriter->pQTaskFWriter->fname, qTaskInfoFullName);
|
||||
if (!pWriter) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// rsma restore
|
||||
int8_t rollback = 0;
|
||||
if ((code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback)) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName);
|
||||
pSma = pWriter->pSma;
|
||||
pEnv = SMA_RSMA_ENV(pSma);
|
||||
pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
|
||||
|
||||
// rsma1/rsma2
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pWriter->pDataWriter[i]) {
|
||||
code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
|
||||
taosMemoryFree(pWriter);
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
// qtaskinfo
|
||||
if (rollback) {
|
||||
tdRSmaFSRollback(pSma);
|
||||
// remove qTaskFiles
|
||||
} else {
|
||||
// lock
|
||||
taosWLockLatch(RSMA_FS_LOCK(pStat));
|
||||
code = tdRSmaFSCommit(pSma);
|
||||
if (code) {
|
||||
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||
goto _exit;
|
||||
}
|
||||
// unlock
|
||||
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||
}
|
||||
|
||||
// rsma restore
|
||||
code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
smaInfo("vgId:%d, vnode snapshot rsma writer restore from sync succeed", SMA_VID(pSma));
|
||||
|
||||
_exit:
|
||||
if (pWriter) taosMemoryFree(pWriter);
|
||||
*ppWriter = NULL;
|
||||
if (code) {
|
||||
smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pSma), tstrerror(code));
|
||||
} else {
|
||||
smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pSma));
|
||||
}
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pWriter->pSma), tstrerror(code));
|
||||
#endif
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||
|
||||
// rsma1/rsma2
|
||||
|
@ -438,42 +403,78 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
|
|||
} else if (pHdr->type == SNAP_DATA_QTASK) {
|
||||
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
code = TSDB_CODE_RSMA_FS_SYNC;
|
||||
}
|
||||
if (code < 0) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d, rsma snapshot write for data type %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type,
|
||||
tstrerror(code));
|
||||
if (code) {
|
||||
smaError("vgId:%d, %s failed at line %d since %s, data type %" PRIi8, SMA_VID(pWriter->pSma), __func__, lino,
|
||||
tstrerror(code), pHdr->type);
|
||||
} else {
|
||||
smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||
int32_t code = 0;
|
||||
SQTaskFWriter* qWriter = pWriter->pQTaskFWriter;
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SSma* pSma = pWriter->pSma;
|
||||
SVnode* pVnode = pSma->pVnode;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
TdFilePtr fp = NULL;
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||
|
||||
if (qWriter && qWriter->pWriteH) {
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||
int64_t size = pHdr->size;
|
||||
ASSERT(size == (nData - sizeof(SSnapDataHdr)));
|
||||
int64_t contLen = taosWriteFile(qWriter->pWriteH, pHdr->data, size);
|
||||
if (contLen != size) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", SMA_VID(pWriter->pSma), qWriter->fname);
|
||||
} else {
|
||||
smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo is not needed", SMA_VID(pWriter->pSma));
|
||||
fname[0] = '\0';
|
||||
|
||||
if (pHdr->size != (nData - sizeof(SSnapDataHdr))) {
|
||||
code = TSDB_CODE_RSMA_FS_SYNC;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
SQTaskFile qTaskFile = {
|
||||
.nRef = 1, .level = pHdr->flag, .suid = pHdr->index, .version = pWriter->ever, .size = pHdr->size};
|
||||
|
||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pHdr->index, pHdr->flag, qTaskFile.version,
|
||||
tfsGetPrimaryPath(pVnode->pTfs), fname);
|
||||
|
||||
fp = taosCreateFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (!fp) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
int64_t contLen = taosWriteFile(fp, pHdr->data, pHdr->size);
|
||||
if (contLen != pHdr->size) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
uint32_t mtime = 0;
|
||||
if (taosFStatFile(fp, NULL, &mtime) == 0) {
|
||||
qTaskFile.mtime = mtime;
|
||||
}
|
||||
|
||||
if (taosFsyncFile(fp) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
taosCloseFile(&fp);
|
||||
|
||||
code = tdRSmaFSUpsertQTaskFile(pSma, &pWriter->fs, &qTaskFile, 1);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
if (fp) {
|
||||
(void)taosRemoveFile(fname);
|
||||
}
|
||||
smaError("vgId:%d, %s failed at line %d since %s, file:%s", TD_VID(pVnode), __func__, lino, tstrerror(code), fname);
|
||||
} else {
|
||||
smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", TD_VID(pVnode), fname);
|
||||
}
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d, vnode snapshot rsma write qtaskinfo failed since %s", SMA_VID(pWriter->pSma), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -111,35 +111,48 @@ _err:
|
|||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
|
||||
SSmaCfg *pCfg = (SSmaCfg *)pMsg;
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SSmaCfg *pCfg = (SSmaCfg *)pMsg;
|
||||
SVCreateStbReq pReq = {0};
|
||||
|
||||
if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
|
||||
// create tsma meta in dstVgId
|
||||
if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) {
|
||||
return -1;
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// create stable to save tsma result in dstVgId
|
||||
SName stbFullName = {0};
|
||||
tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
SVCreateStbReq pReq = {0};
|
||||
pReq.name = (char *)tNameGetTableName(&stbFullName);
|
||||
pReq.suid = pCfg->dstTbUid;
|
||||
pReq.schemaRow = pCfg->schemaRow;
|
||||
pReq.schemaTag = pCfg->schemaTag;
|
||||
|
||||
if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) {
|
||||
return -1;
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
} else {
|
||||
code = terrno = TSDB_CODE_TSMA_INVALID_STAT;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
smaError("vgId:%d, failed at line %d to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
|
||||
" dstTb:%s dstVg:%d",
|
||||
SMA_VID(pSma), lino, pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name,
|
||||
pCfg->dstVgId);
|
||||
} else {
|
||||
smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
|
||||
" dstTb:%s dstVg:%d",
|
||||
SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -174,7 +187,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
|
|||
STSmaStat *pTsmaStat = NULL;
|
||||
|
||||
if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
|
||||
terrno = TSDB_CODE_TSMA_INVALID_STAT;
|
||||
terrno = TSDB_CODE_TSMA_INVALID_ENV;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
@ -218,7 +231,12 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
|
|||
}
|
||||
|
||||
#if 0
|
||||
ASSERT(!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14));
|
||||
if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid,
|
||||
pTsmaStat->pTSma->indexUid, tstrerror(terrno), pTsmaStat->pTSma->dstTbName);
|
||||
goto _err;
|
||||
}
|
||||
#endif
|
||||
|
||||
SRpcMsg submitReqMsg = {
|
||||
|
|
|
@ -98,74 +98,6 @@ void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool
|
|||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
|
||||
TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
|
||||
TD_TFILE_SET_CLOSED(pTFile);
|
||||
|
||||
memset(&(pTFile->info), 0, sizeof(pTFile->info));
|
||||
pTFile->info.magic = TD_FILE_INIT_MAGIC;
|
||||
|
||||
char tmpName[TSDB_FILENAME_LEN * 2 + 32] = {0};
|
||||
snprintf(tmpName, TSDB_FILENAME_LEN * 2 + 32, "%s%s%s", dname, TD_DIRSEP, fname);
|
||||
int32_t tmpNameLen = strlen(tmpName) + 1;
|
||||
pTFile->fname = taosMemoryMalloc(tmpNameLen);
|
||||
if (!pTFile->fname) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
tstrncpy(pTFile->fname, tmpName, tmpNameLen);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType) {
|
||||
ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC);
|
||||
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pTFile->pFile == NULL) {
|
||||
if (errno == ENOENT) {
|
||||
// Try to create directory recursively
|
||||
char *s = strdup(TD_TFILE_FULL_NAME(pTFile));
|
||||
if (taosMulMkDir(taosDirName(s)) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosMemoryFree(s);
|
||||
return -1;
|
||||
}
|
||||
taosMemoryFree(s);
|
||||
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pTFile->pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!updateHeader) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
pTFile->info.fsize += TD_FILE_HEAD_SIZE;
|
||||
pTFile->info.fver = 0;
|
||||
|
||||
if (tdUpdateTFileHeader(pTFile) < 0) {
|
||||
tdCloseTFile(pTFile);
|
||||
tdRemoveTFile(pTFile);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tdRemoveTFile(STFile *pTFile) {
|
||||
if (taosRemoveFile(TD_TFILE_FULL_NAME(pTFile)) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
};
|
||||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
// smaXXXUtil ================
|
||||
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) {
|
||||
void *pResult = taosAcquireRef(rsetId, refId);
|
||||
|
|
|
@ -585,7 +585,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_COMMIT, "Invalid rsma fs commit")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_COMMIT, "Rsma fs commit error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
|
||||
|
@ -593,6 +593,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REGEX_MATCH, "Rsma regex match")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_OPEN, "Rsma stream state open")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_COMMIT, "Rsma stream state commit")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_REF, "Rsma fs ref error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_SYNC, "Rsma fs sync error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_UPDATE, "Rsma fs update error")
|
||||
|
||||
//index
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
|
||||
|
|
Loading…
Reference in New Issue