other: rsma snapshot and misc update
This commit is contained in:
parent
660fd8729b
commit
602d2aa377
|
@ -54,6 +54,7 @@ typedef struct TdFile *TdFilePtr;
|
||||||
#define TD_FILE_EXCL 0x0080
|
#define TD_FILE_EXCL 0x0080
|
||||||
#define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile
|
#define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile
|
||||||
TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions);
|
TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions);
|
||||||
|
TdFilePtr taosCreateFile(const char *path, int32_t tdFileOptions);
|
||||||
|
|
||||||
#define TD_FILE_ACCESS_EXIST_OK 0x1
|
#define TD_FILE_ACCESS_EXIST_OK 0x1
|
||||||
#define TD_FILE_ACCESS_READ_OK 0x2
|
#define TD_FILE_ACCESS_READ_OK 0x2
|
||||||
|
|
|
@ -1739,6 +1739,9 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
|
||||||
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
|
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
|
||||||
printf(" %25s |", pBuf);
|
printf(" %25s |", pBuf);
|
||||||
break;
|
break;
|
||||||
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
printf(" %15d |", *(int32_t*)var);
|
||||||
|
break;
|
||||||
case TSDB_DATA_TYPE_INT:
|
case TSDB_DATA_TYPE_INT:
|
||||||
printf(" %15d |", *(int32_t*)var);
|
printf(" %15d |", *(int32_t*)var);
|
||||||
break;
|
break;
|
||||||
|
@ -1757,6 +1760,22 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
|
||||||
case TSDB_DATA_TYPE_DOUBLE:
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
printf(" %15lf |", *(double*)var);
|
printf(" %15lf |", *(double*)var);
|
||||||
break;
|
break;
|
||||||
|
case TSDB_DATA_TYPE_VARCHAR: {
|
||||||
|
char* pData = colDataGetVarData(pColInfoData, j);
|
||||||
|
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData) + 1);
|
||||||
|
memset(pBuf, 0, dataSize);
|
||||||
|
strncpy(pBuf, varDataVal(pData), dataSize);
|
||||||
|
printf(" %15s |", pBuf);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_NCHAR: {
|
||||||
|
char* pData = colDataGetVarData(pColInfoData, j);
|
||||||
|
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData) + 1);
|
||||||
|
memset(pBuf, 0, dataSize);
|
||||||
|
taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf);
|
||||||
|
printf(" %15s |", pBuf);
|
||||||
|
} break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
printf("\n");
|
printf("\n");
|
||||||
|
|
|
@ -41,6 +41,9 @@ typedef struct SRSmaStat SRSmaStat;
|
||||||
typedef struct SSmaKey SSmaKey;
|
typedef struct SSmaKey SSmaKey;
|
||||||
typedef struct SRSmaInfo SRSmaInfo;
|
typedef struct SRSmaInfo SRSmaInfo;
|
||||||
typedef struct SRSmaInfoItem SRSmaInfoItem;
|
typedef struct SRSmaInfoItem SRSmaInfoItem;
|
||||||
|
typedef struct SQTaskFile SQTaskFile;
|
||||||
|
typedef struct SQTaskFReader SQTaskFReader;
|
||||||
|
typedef struct SQTaskFWriter SQTaskFWriter;
|
||||||
|
|
||||||
struct SSmaEnv {
|
struct SSmaEnv {
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
|
@ -64,12 +67,32 @@ struct STSmaStat {
|
||||||
STSchema *pTSchema;
|
STSchema *pTSchema;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct SQTaskFile {
|
||||||
|
volatile int32_t nRef;
|
||||||
|
int64_t commitID;
|
||||||
|
int64_t size;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct SQTaskFReader {
|
||||||
|
SSma *pSma;
|
||||||
|
SQTaskFile fTask;
|
||||||
|
TdFilePtr pReadH;
|
||||||
|
};
|
||||||
|
struct SQTaskFWriter {
|
||||||
|
SSma *pSma;
|
||||||
|
SQTaskFile fTask;
|
||||||
|
TdFilePtr pWriteH;
|
||||||
|
char *fname;
|
||||||
|
};
|
||||||
|
|
||||||
struct SRSmaStat {
|
struct SRSmaStat {
|
||||||
SSma *pSma;
|
SSma *pSma;
|
||||||
int64_t commitAppliedVer; // vnode applied version for async commit
|
int64_t commitAppliedVer; // vnode applied version for async commit
|
||||||
int64_t refId; // shared by fetch tasks
|
int64_t refId; // shared by fetch tasks
|
||||||
|
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
|
||||||
int8_t triggerStat; // shared by fetch tasks
|
int8_t triggerStat; // shared by fetch tasks
|
||||||
int8_t commitStat; // 0 not in committing, 1 in committing
|
int8_t commitStat; // 0 not in committing, 1 in committing
|
||||||
|
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w)
|
||||||
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
||||||
SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash
|
SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash
|
||||||
};
|
};
|
||||||
|
@ -89,6 +112,7 @@ struct SSmaStat {
|
||||||
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
|
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
|
||||||
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
|
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
|
||||||
#define RSMA_REF_ID(r) ((r)->refId)
|
#define RSMA_REF_ID(r) ((r)->refId)
|
||||||
|
#define RSMA_FS_LOCK(r) (&(r)->lock)
|
||||||
|
|
||||||
struct SRSmaInfoItem {
|
struct SRSmaInfoItem {
|
||||||
void *taskInfo; // qTaskInfo_t
|
void *taskInfo; // qTaskInfo_t
|
||||||
|
@ -192,6 +216,8 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
|
||||||
|
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
|
||||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc);
|
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc);
|
||||||
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
|
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
|
||||||
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||||
|
@ -209,9 +235,6 @@ int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen,
|
||||||
|
|
||||||
// smaFileUtil ================
|
// smaFileUtil ================
|
||||||
|
|
||||||
typedef struct SQTaskFReader SQTaskFReader;
|
|
||||||
typedef struct SQTaskFWriter SQTaskFWriter;
|
|
||||||
|
|
||||||
#define TD_FILE_HEAD_SIZE 512
|
#define TD_FILE_HEAD_SIZE 512
|
||||||
|
|
||||||
typedef struct STFInfo STFInfo;
|
typedef struct STFInfo STFInfo;
|
||||||
|
|
|
@ -186,6 +186,7 @@ int32_t smaSyncPostCommit(SSma* pSma);
|
||||||
int32_t smaAsyncPreCommit(SSma* pSma);
|
int32_t smaAsyncPreCommit(SSma* pSma);
|
||||||
int32_t smaAsyncCommit(SSma* pSma);
|
int32_t smaAsyncCommit(SSma* pSma);
|
||||||
int32_t smaAsyncPostCommit(SSma* pSma);
|
int32_t smaAsyncPostCommit(SSma* pSma);
|
||||||
|
int32_t smaDoRetention(SSma* pSma, int64_t now);
|
||||||
|
|
||||||
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
||||||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||||
|
@ -354,16 +355,16 @@ struct SSma {
|
||||||
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
|
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
SNAP_DATA_META = 0,
|
SNAP_DATA_META = 1,
|
||||||
SNAP_DATA_TSDB = 1,
|
SNAP_DATA_TSDB = 2,
|
||||||
SNAP_DATA_DEL = 2,
|
SNAP_DATA_DEL = 3,
|
||||||
SNAP_DATA_RSMA1 = 3,
|
SNAP_DATA_RSMA1 = 4,
|
||||||
SNAP_DATA_RSMA2 = 4,
|
SNAP_DATA_RSMA2 = 5,
|
||||||
SNAP_DATA_QTASK = 5,
|
SNAP_DATA_QTASK = 6,
|
||||||
SNAP_DATA_TQ_HANDLE = 6,
|
SNAP_DATA_TQ_HANDLE = 7,
|
||||||
SNAP_DATA_TQ_OFFSET = 7,
|
SNAP_DATA_TQ_OFFSET = 8,
|
||||||
SNAP_DATA_STREAM_TASK = 8,
|
SNAP_DATA_STREAM_TASK = 9,
|
||||||
SNAP_DATA_STREAM_STATE = 9,
|
SNAP_DATA_STREAM_STATE = 10,
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SSnapDataHdr {
|
struct SSnapDataHdr {
|
||||||
|
|
|
@ -241,6 +241,41 @@ static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SQTaskFile ======================================================
|
||||||
|
// int32_t tCmprQTaskFile(void const *lhs, void const *rhs) {
|
||||||
|
// int64_t *lCommitted = *(int64_t *)lhs;
|
||||||
|
// SQTaskFile *rQTaskF = (SQTaskFile *)rhs;
|
||||||
|
|
||||||
|
// if (lCommitted < rQTaskF->commitID) {
|
||||||
|
// return -1;
|
||||||
|
// } else if (lCommitted > rQTaskF->commitID) {
|
||||||
|
// return 1;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// return 0;
|
||||||
|
// }
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
/**
|
||||||
|
* @brief At most time, there is only one qtaskinfo file committed latest in aTaskFile. Sometimes, there would be
|
||||||
|
* multiple qtaskinfo files supporting snapshot replication.
|
||||||
|
*
|
||||||
|
* @param pSma
|
||||||
|
* @param pRSmaStat
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) {
|
||||||
|
SVnode *pVnode = pSma->pVnode;
|
||||||
|
int64_t committed = pRSmaStat->commitAppliedVer;
|
||||||
|
SArray *aTaskFile = pRSmaStat->aTaskFile;
|
||||||
|
|
||||||
|
void *qTaskFile = taosArraySearch(aTaskFile, committed, tCmprQTaskFile, TD_LE);
|
||||||
|
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief post-commit for rollup sma
|
* @brief post-commit for rollup sma
|
||||||
* 1) clean up the outdated qtaskinfo files
|
* 1) clean up the outdated qtaskinfo files
|
||||||
|
|
|
@ -38,7 +38,6 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
||||||
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
|
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
|
||||||
int8_t blkType);
|
int8_t blkType);
|
||||||
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
||||||
static void tdRSmaQTaskInfoGetFName(int32_t vid, int64_t version, char *outputName);
|
|
||||||
|
|
||||||
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
|
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
|
||||||
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
|
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
|
||||||
|
@ -77,10 +76,14 @@ struct SRSmaQTaskInfoIter {
|
||||||
int32_t nBufPos;
|
int32_t nBufPos;
|
||||||
};
|
};
|
||||||
|
|
||||||
static void tdRSmaQTaskInfoGetFName(int32_t vgId, int64_t version, char *outputName) {
|
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) {
|
||||||
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
|
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char* path, char *outputName) {
|
||||||
|
tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
|
||||||
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
|
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
|
||||||
return lenWithHead - RSMA_QTASKINFO_HEAD_LEN;
|
return lenWithHead - RSMA_QTASKINFO_HEAD_LEN;
|
||||||
}
|
}
|
||||||
|
@ -599,8 +602,8 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
|
||||||
SSubmitReq *pReq = NULL;
|
SSubmitReq *pReq = NULL;
|
||||||
// TODO: the schema update should be handled
|
// TODO: the schema update should be handled
|
||||||
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
|
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
|
||||||
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma),
|
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
|
||||||
suid, pItem->level, terrstr());
|
SMA_VID(pSma), suid, pItem->level, terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -874,7 +877,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
|
||||||
STFile tFile = {0};
|
STFile tFile = {0};
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN] = {0};
|
char qTaskInfoFName[TSDB_FILENAME_LEN] = {0};
|
||||||
|
|
||||||
tdRSmaQTaskInfoGetFName(TD_VID(pVnode), pVnode->state.committed, qTaskInfoFName);
|
tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), pVnode->state.committed, qTaskInfoFName);
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -1172,7 +1175,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
#if 0
|
#if 0
|
||||||
if (pRSmaStat->commitAppliedVer > 0) {
|
if (pRSmaStat->commitAppliedVer > 0) {
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||||
tdRSmaQTaskInfoGetFName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||||
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -1217,7 +1220,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
|
|
||||||
if (!isFileCreated) {
|
if (!isFileCreated) {
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||||
tdRSmaQTaskInfoGetFName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||||
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -1357,3 +1360,20 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
_end:
|
_end:
|
||||||
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
|
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t smaDoRetention(SSma *pSma, int64_t now) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
if (VND_IS_RSMA(pSma->pVnode)) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
|
if (pSma->pRSmaTsdb[i]) {
|
||||||
|
code = tsdbDoRetention(pSma->pRSmaTsdb[i], now);
|
||||||
|
if (code) goto _end;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
return code;
|
||||||
|
}
|
|
@ -35,6 +35,7 @@ struct SRsmaSnapReader {
|
||||||
|
|
||||||
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader) {
|
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
SVnode* pVnode = pSma->pVnode;
|
||||||
SRsmaSnapReader* pReader = NULL;
|
SRsmaSnapReader* pReader = NULL;
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
|
@ -47,6 +48,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead
|
||||||
pReader->sver = sver;
|
pReader->sver = sver;
|
||||||
pReader->ever = ever;
|
pReader->ever = ever;
|
||||||
|
|
||||||
|
// rsma1/rsma2
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
if (pSma->pRSmaTsdb[i]) {
|
if (pSma->pRSmaTsdb[i]) {
|
||||||
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2,
|
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2,
|
||||||
|
@ -56,23 +58,98 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// qtaskinfo
|
||||||
|
// 1. add ref to qtaskinfo.v${ever} if exists and then start to replicate
|
||||||
|
char qTaskInfoFullName[TSDB_FILENAME_LEN];
|
||||||
|
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), ever, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
|
||||||
|
|
||||||
|
if (!taosCheckExistFile(qTaskInfoFullName)) {
|
||||||
|
smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo not need as %s not exists", TD_VID(pVnode),
|
||||||
|
qTaskInfoFullName);
|
||||||
|
} else {
|
||||||
|
pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader));
|
||||||
|
if (!pReader->pQTaskFReader) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
TdFilePtr qTaskF = taosOpenFile(qTaskInfoFullName, TD_FILE_READ);
|
||||||
|
if (!qTaskF) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pReader->pQTaskFReader->pReadH = qTaskF;
|
||||||
|
#if 0
|
||||||
|
SQTaskFile* pQTaskF = &pReader->pQTaskFReader->fTask;
|
||||||
|
pQTaskF->nRef = 1;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
*ppReader = pReader;
|
*ppReader = pReader;
|
||||||
smaInfo("vgId:%d, vnode snapshot rsma reader opened succeed", SMA_VID(pSma));
|
smaInfo("vgId:%d, vnode snapshot rsma reader opened %s succeed", TD_VID(pVnode), qTaskInfoFullName);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
_err:
|
_err:
|
||||||
smaError("vgId:%d, vnode snapshot rsma reader opened failed since %s", SMA_VID(pSma), tstrerror(code));
|
smaError("vgId:%d, vnode snapshot rsma reader opened failed since %s", TD_VID(pVnode), tstrerror(code));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppData) {
|
static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppBuf) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSma* pSma = pReader->pSma;
|
SSma* pSma = pReader->pSma;
|
||||||
|
int64_t n = 0;
|
||||||
|
uint8_t* pBuf = NULL;
|
||||||
|
SQTaskFReader* qReader = pReader->pQTaskFReader;
|
||||||
|
|
||||||
|
if (!qReader->pReadH) {
|
||||||
|
*ppBuf = NULL;
|
||||||
|
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, readh is empty", SMA_VID(pSma));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t size = 0;
|
||||||
|
if (taosFStatFile(qReader->pReadH, &size, NULL) < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// seek
|
||||||
|
if (taosLSeekFile(qReader->pReadH, 0, SEEK_SET) < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(!(*ppBuf));
|
||||||
|
// alloc
|
||||||
|
*ppBuf = taosMemoryCalloc(1, sizeof(SSnapDataHdr) + size);
|
||||||
|
if (!(*ppBuf)) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// read
|
||||||
|
n = taosReadFile(qReader->pReadH, POINTER_SHIFT(*ppBuf, sizeof(SSnapDataHdr)), size);
|
||||||
|
if (n < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
} else if (n != size) {
|
||||||
|
code = TSDB_CODE_FILE_CORRUPTED;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, size:%" PRIi64, SMA_VID(pSma), size);
|
||||||
|
|
||||||
|
|
||||||
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf);
|
||||||
|
pHdr->type = SNAP_DATA_QTASK;
|
||||||
|
pHdr->size = size;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", SMA_VID(pSma));
|
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", SMA_VID(pSma));
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
*ppBuf = NULL;
|
||||||
smaError("vgId:%d, vnode snapshot rsma read qtaskinfo failed since %s", SMA_VID(pSma), tstrerror(code));
|
smaError("vgId:%d, vnode snapshot rsma read qtaskinfo failed since %s", SMA_VID(pSma), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -108,14 +185,14 @@ int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData) {
|
||||||
|
|
||||||
// read qtaskinfo file
|
// read qtaskinfo file
|
||||||
if (!pReader->qTaskDone) {
|
if (!pReader->qTaskDone) {
|
||||||
|
smaInfo("vgId:%d, vnode snapshot rsma qtaskinfo not done", SMA_VID(pReader->pSma));
|
||||||
code = rsmaSnapReadQTaskInfo(pReader, ppData);
|
code = rsmaSnapReadQTaskInfo(pReader, ppData);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _err;
|
goto _err;
|
||||||
} else {
|
} else {
|
||||||
|
pReader->qTaskDone = 1;
|
||||||
if (*ppData) {
|
if (*ppData) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
} else {
|
|
||||||
pReader->qTaskDone = 1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -140,11 +217,11 @@ int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pQTaskFReader) {
|
if (pReader->pQTaskFReader) {
|
||||||
// TODO: close for qtaskinfo
|
taosCloseFile(&pReader->pQTaskFReader->pReadH);
|
||||||
|
taosMemoryFreeClear(pReader->pQTaskFReader);
|
||||||
smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo", SMA_VID(pReader->pSma));
|
smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo", SMA_VID(pReader->pSma));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
smaInfo("vgId:%d, vnode snapshot rsma reader closed", SMA_VID(pReader->pSma));
|
smaInfo("vgId:%d, vnode snapshot rsma reader closed", SMA_VID(pReader->pSma));
|
||||||
|
|
||||||
taosMemoryFreeClear(*ppReader);
|
taosMemoryFreeClear(*ppReader);
|
||||||
|
@ -171,6 +248,7 @@ struct SRsmaSnapWriter {
|
||||||
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter) {
|
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SRsmaSnapWriter* pWriter = NULL;
|
SRsmaSnapWriter* pWriter = NULL;
|
||||||
|
SVnode* pVnode = pSma->pVnode;
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
pWriter = (SRsmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
pWriter = (SRsmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||||
|
@ -182,6 +260,7 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit
|
||||||
pWriter->sver = sver;
|
pWriter->sver = sver;
|
||||||
pWriter->ever = ever;
|
pWriter->ever = ever;
|
||||||
|
|
||||||
|
// rsma1/rsma2
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
if (pSma->pRSmaTsdb[i]) {
|
if (pSma->pRSmaTsdb[i]) {
|
||||||
code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]);
|
code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]);
|
||||||
|
@ -192,8 +271,25 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit
|
||||||
}
|
}
|
||||||
|
|
||||||
// qtaskinfo
|
// qtaskinfo
|
||||||
// TODO
|
SQTaskFWriter* qWriter = (SQTaskFWriter*)taosMemoryCalloc(1, sizeof(SQTaskFWriter));
|
||||||
|
qWriter->pSma = pSma;
|
||||||
|
|
||||||
|
char qTaskInfoFullName[TSDB_FILENAME_LEN];
|
||||||
|
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 1, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
|
||||||
|
TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
|
if (!qTaskF) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName, tstrerror(code));
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
qWriter->pWriteH = qTaskF;
|
||||||
|
int32_t fnameLen = strlen(qTaskInfoFullName) + 1;
|
||||||
|
qWriter->fname = taosMemoryCalloc(1, fnameLen);
|
||||||
|
strncpy(qWriter->fname, qTaskInfoFullName, fnameLen);
|
||||||
|
pWriter->pQTaskFWriter = qWriter;
|
||||||
|
smaDebug("vgId:%d, rsma snapshot writer open succeed for %s", TD_VID(pSma->pVnode), qTaskInfoFullName);
|
||||||
|
|
||||||
|
// snapWriter
|
||||||
*ppWriter = pWriter;
|
*ppWriter = pWriter;
|
||||||
|
|
||||||
smaInfo("vgId:%d, rsma snapshot writer open succeed", TD_VID(pSma->pVnode));
|
smaInfo("vgId:%d, rsma snapshot writer open succeed", TD_VID(pSma->pVnode));
|
||||||
|
@ -208,18 +304,30 @@ _err:
|
||||||
int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
|
int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SRsmaSnapWriter* pWriter = *ppWriter;
|
SRsmaSnapWriter* pWriter = *ppWriter;
|
||||||
|
SVnode* pVnode = pWriter->pSma->pVnode;
|
||||||
|
|
||||||
if (rollback) {
|
if (rollback) {
|
||||||
ASSERT(0);
|
// TODO: rsma1/rsma2
|
||||||
// code = tsdbFSRollback(pWriter->pTsdb->pFS);
|
// qtaskinfo
|
||||||
// if (code) goto _err;
|
if(pWriter->pQTaskFWriter) {
|
||||||
|
taosRemoveFile(pWriter->pQTaskFWriter->fname);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// rsma1/rsma2
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
if (pWriter->pDataWriter[i]) {
|
if (pWriter->pDataWriter[i]) {
|
||||||
code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback);
|
code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// qtaskinfo
|
||||||
|
if (pWriter->pQTaskFWriter) {
|
||||||
|
char qTaskInfoFullName[TSDB_FILENAME_LEN];
|
||||||
|
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 0, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
|
||||||
|
taosRenameFile(pWriter->pQTaskFWriter->fname, qTaskInfoFullName);
|
||||||
|
smaInfo("vgId:%d, vnode snapshot rsma writer rename %s to %s", SMA_VID(pWriter->pSma),
|
||||||
|
pWriter->pQTaskFWriter->fname, qTaskInfoFullName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
|
smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
|
||||||
|
@ -261,26 +369,23 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
SQTaskFWriter* qWriter = pWriter->pQTaskFWriter;
|
||||||
|
|
||||||
if (pWriter->pQTaskFWriter == NULL) {
|
if (qWriter && qWriter->pWriteH) {
|
||||||
// SDelFile* pDelFile = pWriter->fs.pDelFile;
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||||
|
int64_t size = pHdr->size;
|
||||||
// // reader
|
ASSERT(size == (nData - sizeof(SSnapDataHdr)));
|
||||||
// if (pDelFile) {
|
int64_t contLen = taosWriteFile(qWriter->pWriteH, pHdr->data, size);
|
||||||
// code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL);
|
if (contLen != size) {
|
||||||
// if (code) goto _err;
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
// code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR, NULL);
|
}
|
||||||
// if (code) goto _err;
|
} else {
|
||||||
// }
|
smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo is not needed", SMA_VID(pWriter->pSma));
|
||||||
|
|
||||||
// // writer
|
|
||||||
// SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0};
|
|
||||||
// code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
|
|
||||||
// if (code) goto _err;
|
|
||||||
}
|
}
|
||||||
smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo succeed", SMA_VID(pWriter->pSma));
|
|
||||||
|
smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", SMA_VID(pWriter->pSma), qWriter->fname);
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,7 @@ int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader
|
||||||
pReader->sver = sver;
|
pReader->sver = sver;
|
||||||
pReader->ever = ever;
|
pReader->ever = ever;
|
||||||
|
|
||||||
tqInfo("vgId:%d vnode snapshot tq offset reader opened", TD_VID(pTq->pVnode));
|
tqInfo("vgId:%d, vnode snapshot tq offset reader opened", TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
*ppReader = pReader;
|
*ppReader = pReader;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -109,7 +109,7 @@ int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
*ppWriter = NULL;
|
*ppWriter = NULL;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,13 +52,13 @@ int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** p
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqInfo("vgId:%d vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
|
tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
*ppReader = pReader;
|
*ppReader = pReader;
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tqError("vgId:%d vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
*ppReader = NULL;
|
*ppReader = NULL;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -113,14 +113,14 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
||||||
pHdr->size = vLen;
|
pHdr->size = vLen;
|
||||||
memcpy(pHdr->data, pVal, vLen);
|
memcpy(pHdr->data, pVal, vLen);
|
||||||
|
|
||||||
tqInfo("vgId:%d vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
|
tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
|
||||||
handle.snapshotVer, handle.subKey, vLen);
|
handle.snapshotVer, handle.subKey, vLen);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tqError("vgId:%d vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,7 +154,7 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
*ppWriter = NULL;
|
*ppWriter = NULL;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -182,7 +182,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tqError("vgId:%d tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
|
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,6 +204,6 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tDecoderClear(pDecoder);
|
tDecoderClear(pDecoder);
|
||||||
tqError("vgId:%d vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1215,11 +1215,11 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
tsdbDebug("vgId:%d, tsdb snapshow write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
|
tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d, tsdb snapshow write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path,
|
tsdbError("vgId:%d, tsdb snapshot write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -194,7 +194,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
if (*ppData) {
|
if (*ppData) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
} else {
|
} else {
|
||||||
pReader->tsdbDone = 1;
|
pReader->rsmaDone = 1;
|
||||||
code = rsmaSnapReaderClose(&pReader->pRsmaReader);
|
code = rsmaSnapReaderClose(&pReader->pRsmaReader);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
@ -373,18 +373,9 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
||||||
case SNAP_DATA_STREAM_STATE: {
|
case SNAP_DATA_STREAM_STATE: {
|
||||||
} break;
|
} break;
|
||||||
case SNAP_DATA_RSMA1:
|
case SNAP_DATA_RSMA1:
|
||||||
case SNAP_DATA_RSMA2: {
|
case SNAP_DATA_RSMA2:
|
||||||
// rsma1/rsma2
|
|
||||||
if (pWriter->pRsmaSnapWriter == NULL) {
|
|
||||||
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
|
|
||||||
if (code) goto _err;
|
|
||||||
} break;
|
|
||||||
case SNAP_DATA_QTASK: {
|
case SNAP_DATA_QTASK: {
|
||||||
// qtask for rsma
|
// rsma1/rsma2/qtask for rsma
|
||||||
if (pWriter->pRsmaSnapWriter == NULL) {
|
if (pWriter->pRsmaSnapWriter == NULL) {
|
||||||
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter);
|
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
|
@ -378,6 +378,9 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
|
||||||
code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
|
code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
code = smaDoRetention(pVnode->pSma, trimReq.timestamp);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -908,7 +911,7 @@ _exit:
|
||||||
|
|
||||||
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
SVCreateTSmaReq req = {0};
|
SVCreateTSmaReq req = {0};
|
||||||
SDecoder coder;
|
SDecoder coder = {0};
|
||||||
|
|
||||||
if (pRsp) {
|
if (pRsp) {
|
||||||
pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
|
pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
|
||||||
|
|
|
@ -162,6 +162,26 @@ _err:
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TdFilePtr taosCreateFile(const char *path, int32_t tdFileOptions) {
|
||||||
|
TdFilePtr fp = taosOpenFile(path, tdFileOptions);
|
||||||
|
if (!fp) {
|
||||||
|
if (errno == ENOENT) {
|
||||||
|
// Try to create directory recursively
|
||||||
|
char *s = strdup(path);
|
||||||
|
if (taosMulMkDir(taosDirName(s)) != 0) {
|
||||||
|
taosMemoryFree(s);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
taosMemoryFree(s);
|
||||||
|
fp = taosOpenFile(path, tdFileOptions);
|
||||||
|
if (!fp) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fp;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t taosRemoveFile(const char *path) { return remove(path); }
|
int32_t taosRemoveFile(const char *path) { return remove(path); }
|
||||||
|
|
||||||
int32_t taosRenameFile(const char *oldName, const char *newName) {
|
int32_t taosRenameFile(const char *oldName, const char *newName) {
|
||||||
|
|
Loading…
Reference in New Issue