feat: rsma recovery
This commit is contained in:
parent
45b47b7a2b
commit
1aa45f73f4
|
@ -617,6 +617,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
|
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
|
||||||
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
|
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
|
||||||
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
|
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
|
||||||
|
#define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153)
|
||||||
|
|
||||||
//index
|
//index
|
||||||
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
|
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
|
||||||
|
|
|
@ -31,6 +31,8 @@ target_sources(
|
||||||
"src/sma/smaEnv.c"
|
"src/sma/smaEnv.c"
|
||||||
"src/sma/smaUtil.c"
|
"src/sma/smaUtil.c"
|
||||||
"src/sma/smaOpen.c"
|
"src/sma/smaOpen.c"
|
||||||
|
"src/sma/smaCommit.c"
|
||||||
|
"src/sma/smaSnapshot.c"
|
||||||
"src/sma/smaRollup.c"
|
"src/sma/smaRollup.c"
|
||||||
"src/sma/smaTimeRange.c"
|
"src/sma/smaTimeRange.c"
|
||||||
|
|
||||||
|
|
|
@ -92,8 +92,9 @@ enum {
|
||||||
TASK_TRIGGER_STAT_INIT = 0,
|
TASK_TRIGGER_STAT_INIT = 0,
|
||||||
TASK_TRIGGER_STAT_ACTIVE = 1,
|
TASK_TRIGGER_STAT_ACTIVE = 1,
|
||||||
TASK_TRIGGER_STAT_INACTIVE = 2,
|
TASK_TRIGGER_STAT_INACTIVE = 2,
|
||||||
TASK_TRIGGER_STAT_CANCELLED = 3,
|
TASK_TRIGGER_STAT_PAUSED = 3,
|
||||||
TASK_TRIGGER_STAT_FINISHED = 4,
|
TASK_TRIGGER_STAT_CANCELLED = 4,
|
||||||
|
TASK_TRIGGER_STAT_FINISHED = 5,
|
||||||
};
|
};
|
||||||
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
|
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
|
||||||
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
|
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
|
||||||
|
@ -214,25 +215,22 @@ struct STFInfo {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct STFile {
|
struct STFile {
|
||||||
STFInfo info;
|
|
||||||
STfsFile f;
|
|
||||||
TdFilePtr pFile;
|
|
||||||
uint8_t state;
|
uint8_t state;
|
||||||
|
STFInfo info;
|
||||||
|
char *fname;
|
||||||
|
TdFilePtr pFile;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define TD_TFILE_F(tf) (&((tf)->f))
|
|
||||||
#define TD_TFILE_PFILE(tf) ((tf)->pFile)
|
#define TD_TFILE_PFILE(tf) ((tf)->pFile)
|
||||||
#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
|
#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
|
||||||
#define TD_TFILE_FULL_NAME(tf) (TD_TFILE_F(tf)->aname)
|
#define TD_TFILE_FULL_NAME(tf) ((tf)->fname)
|
||||||
#define TD_TFILE_REL_NAME(tf) (TD_TFILE_F(tf)->rname)
|
|
||||||
#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
|
#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
|
||||||
#define TD_TFILE_CLOSED(tf) (!TD_TFILE_OPENED(tf))
|
#define TD_TFILE_CLOSED(tf) (!TD_TFILE_OPENED(tf))
|
||||||
#define TD_TFILE_SET_CLOSED(f) (TD_TFILE_PFILE(f) = NULL)
|
#define TD_TFILE_SET_CLOSED(f) (TD_TFILE_PFILE(f) = NULL)
|
||||||
#define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s))
|
#define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s))
|
||||||
#define TD_TFILE_DID(tf) (TD_TFILE_F(tf)->did)
|
|
||||||
|
|
||||||
int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname);
|
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname);
|
||||||
int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType);
|
int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType);
|
||||||
int32_t tdOpenTFile(STFile *pTFile, int flags);
|
int32_t tdOpenTFile(STFile *pTFile, int flags);
|
||||||
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte);
|
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte);
|
||||||
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence);
|
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence);
|
||||||
|
@ -244,8 +242,10 @@ int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo);
|
||||||
int32_t tdUpdateTFileHeader(STFile *pTFile);
|
int32_t tdUpdateTFileHeader(STFile *pTFile);
|
||||||
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
|
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
|
||||||
void tdCloseTFile(STFile *pTFile);
|
void tdCloseTFile(STFile *pTFile);
|
||||||
|
void tdDestroyTFile(STFile *pTFile);
|
||||||
|
|
||||||
void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, char *outputName);
|
void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, int64_t version, char *outputName);
|
||||||
|
void tdGetVndDirName(int32_t vgId, const char *dname, char *outputName);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -165,6 +165,9 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
|
||||||
int32_t smaOpen(SVnode* pVnode);
|
int32_t smaOpen(SVnode* pVnode);
|
||||||
int32_t smaCloseEnv(SSma* pSma);
|
int32_t smaCloseEnv(SSma* pSma);
|
||||||
int32_t smaCloseEx(SSma* pSma);
|
int32_t smaCloseEx(SSma* pSma);
|
||||||
|
int32_t smaPreCommit(SSma* pSma);
|
||||||
|
int32_t smaCommit(SSma* pSma);
|
||||||
|
int32_t smaPostCommit(SSma* pSma);
|
||||||
|
|
||||||
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
||||||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||||
|
|
|
@ -0,0 +1,150 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "sma.h"
|
||||||
|
|
||||||
|
static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma);
|
||||||
|
static int32_t tdProcessRSmaCommitImpl(SSma *pSma);
|
||||||
|
static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Only applicable to Rollup SMA
|
||||||
|
*
|
||||||
|
* @param pSma
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Only applicable to Rollup SMA
|
||||||
|
*
|
||||||
|
* @param pSma
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
int32_t smaCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Only applicable to Rollup SMA
|
||||||
|
*
|
||||||
|
* @param pSma
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief pre-commit for rollup sma.
|
||||||
|
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
|
||||||
|
* 2) perform persist task for qTaskInfo
|
||||||
|
* 3) wait all triggered fetch tasks finished
|
||||||
|
* 4) set trigger stat of rsma timer TASK_TRIGGER_STAT_ACTIVE.
|
||||||
|
* 5) finish
|
||||||
|
*
|
||||||
|
* @param pSma
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
|
||||||
|
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
||||||
|
if (!pSmaEnv) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||||
|
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||||
|
|
||||||
|
// step 1
|
||||||
|
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
|
||||||
|
|
||||||
|
// step 2
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief commit for rollup sma
|
||||||
|
*
|
||||||
|
* @param pSma
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
static int32_t tdProcessRSmaCommitImpl(SSma *pSma) {
|
||||||
|
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
||||||
|
if (!pSmaEnv) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief post-commit for rollup sma
|
||||||
|
* 1) clean up the outdated qtaskinfo files
|
||||||
|
*
|
||||||
|
* @param pSma
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) {
|
||||||
|
SVnode *pVnode = pSma->pVnode;
|
||||||
|
|
||||||
|
if (!VND_IS_RSMA(pVnode)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t committed = pVnode->state.committed;
|
||||||
|
TdDirPtr pDir = NULL;
|
||||||
|
TdDirEntryPtr pDirEntry = NULL;
|
||||||
|
char dir[TSDB_FILENAME_LEN];
|
||||||
|
char bname[TSDB_FILENAME_LEN];
|
||||||
|
const char *pattern = "^v[0-9]+qtaskinfo\\.ver([0-9]+)?$";
|
||||||
|
regex_t regex;
|
||||||
|
|
||||||
|
tdGetVndDirName(TD_VID(pVnode), VNODE_RSMA_DIR, dir);
|
||||||
|
|
||||||
|
// Resource allocation and init
|
||||||
|
regcomp(®ex, pattern, REG_EXTENDED);
|
||||||
|
|
||||||
|
if ((pDir = taosOpenDir(dir)) == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
smaWarn("rsma post-commit open dir %s failed since %s", dir, terrstr());
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
regmatch_t regMatch[2];
|
||||||
|
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
|
||||||
|
char *entryName = taosGetDirEntryName(pDirEntry);
|
||||||
|
if (!entryName) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
char *fileName = taosDirEntryBaseName(entryName);
|
||||||
|
int code = regexec(®ex, bname, 2, regMatch, 0);
|
||||||
|
|
||||||
|
if (code == 0) {
|
||||||
|
// match
|
||||||
|
printf("match 0 = %s\n", (char *)POINTER_SHIFT(fileName, regMatch[0].rm_so));
|
||||||
|
printf("match 1 = %s\n", (char *)POINTER_SHIFT(fileName, regMatch[1].rm_so));
|
||||||
|
} else if (code == REG_NOMATCH) {
|
||||||
|
// not match
|
||||||
|
smaInfo("rsma post-commit not match %s", fileName);
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
// has other error
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
smaWarn("rsma post-commit regexec failed since %s", terrstr());
|
||||||
|
|
||||||
|
taosCloseDir(&pDir);
|
||||||
|
regfree(®ex);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosCloseDir(&pDir);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
|
@ -23,8 +23,7 @@ SSmaMgmt smaMgmt = {
|
||||||
.smaRef = -1,
|
.smaRef = -1,
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef enum { TD_QTASK_TMP_F = 0, TD_QTASK_CUR_F } TD_QTASK_FILE_T;
|
#define TD_QTASKINFO_FNAME_PREFIX "qtaskinfo.ver"
|
||||||
static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"};
|
|
||||||
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
|
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
|
||||||
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
|
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
|
||||||
|
|
||||||
|
@ -37,7 +36,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
|
||||||
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
||||||
static void tdRSmaPersistTrigger(void *param, void *tmrId);
|
static void tdRSmaPersistTrigger(void *param, void *tmrId);
|
||||||
static void *tdRSmaPersistExec(void *param);
|
static void *tdRSmaPersistExec(void *param);
|
||||||
static void tdRSmaQTaskInfoGetFName(int32_t vid, int8_t ftype, char *outputName);
|
static void tdRSmaQTaskInfoGetFName(int32_t vid, int64_t version, char *outputName);
|
||||||
|
|
||||||
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
|
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
|
||||||
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
|
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
|
||||||
|
@ -88,8 +87,8 @@ struct SRSmaQTaskInfoIter {
|
||||||
int32_t nBufPos;
|
int32_t nBufPos;
|
||||||
};
|
};
|
||||||
|
|
||||||
static void tdRSmaQTaskInfoGetFName(int32_t vgId, int8_t ftype, char *outputName) {
|
static void tdRSmaQTaskInfoGetFName(int32_t vgId, int64_t version, char *outputName) {
|
||||||
tdGetVndFileName(vgId, VNODE_RSMA_DIR, tdQTaskInfoFname[ftype], outputName);
|
tdGetVndFileName(vgId, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
|
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
|
||||||
|
@ -493,7 +492,6 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
|
static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
SSubmitBlk *pBlock = NULL;
|
SSubmitBlk *pBlock = NULL;
|
||||||
SSubmitBlkIter blkIter = {0};
|
SSubmitBlkIter blkIter = {0};
|
||||||
|
@ -501,19 +499,26 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
while (true) {
|
while (true) {
|
||||||
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (!pBlock) break;
|
if (!pBlock) break;
|
||||||
tdUidStorePut(pStore, msgIter.suid, NULL);
|
tdUidStorePut(pStore, msgIter.suid, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tdDestroySDataBlockArray(SArray *pArray) {
|
static void tdDestroySDataBlockArray(SArray *pArray) {
|
||||||
|
// TODO
|
||||||
#if 0
|
#if 0
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
SSDataBlock *pDataBlock = taosArrayGet(pArray, i);
|
SSDataBlock *pDataBlock = taosArrayGet(pArray, i);
|
||||||
|
@ -598,33 +603,54 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
|
|
||||||
pSma = RSMA_INFO_SMA(pItem->pRsmaInfo);
|
pSma = RSMA_INFO_SMA(pItem->pRsmaInfo);
|
||||||
|
|
||||||
// if rsma trigger stat in cancelled or finished, not start fetch task anymore
|
// if rsma trigger stat in paused, cancelled or finished, not start fetch task
|
||||||
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
|
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
|
||||||
if (rsmaTriggerStat == TASK_TRIGGER_STAT_CANCELLED || rsmaTriggerStat == TASK_TRIGGER_STAT_FINISHED) {
|
switch (rsmaTriggerStat) {
|
||||||
taosReleaseRef(smaMgmt.smaRef, pItem->refId);
|
case TASK_TRIGGER_STAT_PAUSED:
|
||||||
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is cancelled",
|
case TASK_TRIGGER_STAT_CANCELLED:
|
||||||
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
|
case TASK_TRIGGER_STAT_FINISHED: {
|
||||||
return;
|
taosReleaseRef(smaMgmt.smaRef, pItem->refId);
|
||||||
|
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is cancelled",
|
||||||
|
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t fetchTriggerStat =
|
int8_t fetchTriggerStat =
|
||||||
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
||||||
if (fetchTriggerStat == TASK_TRIGGER_STAT_ACTIVE) {
|
switch (fetchTriggerStat) {
|
||||||
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
|
case TASK_TRIGGER_STAT_ACTIVE: {
|
||||||
pItem->level, pItem->pRsmaInfo->suid);
|
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
|
||||||
|
pItem->level, pItem->pRsmaInfo->suid);
|
||||||
|
|
||||||
tdRefSmaStat(pSma, (SSmaStat *)pStat);
|
tdRefSmaStat(pSma, (SSmaStat *)pStat);
|
||||||
|
|
||||||
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
||||||
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false);
|
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false);
|
||||||
tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_BLOCK);
|
tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_BLOCK);
|
||||||
|
|
||||||
tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
|
tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
|
||||||
|
} break;
|
||||||
} else {
|
case TASK_TRIGGER_STAT_PAUSED: {
|
||||||
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive",
|
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
|
||||||
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
|
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
|
||||||
|
} break;
|
||||||
|
case TASK_TRIGGER_STAT_INACTIVE: {
|
||||||
|
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive",
|
||||||
|
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
|
||||||
|
} break;
|
||||||
|
case TASK_TRIGGER_STAT_INIT: {
|
||||||
|
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is init", SMA_VID(pSma),
|
||||||
|
pItem->level, pItem->pRsmaInfo->suid);
|
||||||
|
} break;
|
||||||
|
default: {
|
||||||
|
smaWarn("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is unknown",
|
||||||
|
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
|
||||||
|
} break;
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
taosReleaseRef(smaMgmt.smaRef, pItem->refId);
|
taosReleaseRef(smaMgmt.smaRef, pItem->refId);
|
||||||
}
|
}
|
||||||
|
@ -780,10 +806,10 @@ _err:
|
||||||
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
|
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
STFile tFile = {0};
|
STFile tFile = {0};
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
char qTaskInfoFName[TSDB_FILENAME_LEN] = {0};
|
||||||
|
|
||||||
tdRSmaQTaskInfoGetFName(TD_VID(pVnode), TD_QTASK_TMP_F, qTaskInfoFName);
|
tdRSmaQTaskInfoGetFName(TD_VID(pVnode), pVnode->state.committed, qTaskInfoFName);
|
||||||
if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) {
|
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -799,17 +825,20 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
|
||||||
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
|
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
|
||||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
tdRSmaQTaskInfoIterDestroy(&fIter);
|
||||||
tdCloseTFile(&tFile);
|
tdCloseTFile(&tFile);
|
||||||
|
tdDestroyTFile(&tFile);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) {
|
if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) {
|
||||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
tdRSmaQTaskInfoIterDestroy(&fIter);
|
||||||
tdCloseTFile(&tFile);
|
tdCloseTFile(&tFile);
|
||||||
|
tdDestroyTFile(&tFile);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
tdRSmaQTaskInfoIterDestroy(&fIter);
|
||||||
tdCloseTFile(&tFile);
|
tdCloseTFile(&tFile);
|
||||||
|
tdDestroyTFile(&tFile);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
_err:
|
_err:
|
||||||
smaError("rsma restore, qtaskinfo reload failed since %s", terrstr());
|
smaError("rsma restore, qtaskinfo reload failed since %s", terrstr());
|
||||||
|
@ -931,19 +960,21 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
|
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdReadTFile(pTFile, pIter->qBuf, nBytes) != nBytes) {
|
if (tdReadTFile(pTFile, pIter->qBuf, nBytes) != nBytes) {
|
||||||
ASSERT(0);
|
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t infoLen = 0;
|
int32_t infoLen = 0;
|
||||||
taosDecodeFixedI32(pIter->qBuf, &infoLen);
|
taosDecodeFixedI32(pIter->qBuf, &infoLen);
|
||||||
if (infoLen > nBytes) {
|
if (infoLen > nBytes) {
|
||||||
ASSERT(infoLen > RSMA_QTASKINFO_BUFSIZE);
|
if (infoLen <= RSMA_QTASKINFO_BUFSIZE) {
|
||||||
|
terrno = TSDB_CODE_RSMA_FILE_CORRUPTED;
|
||||||
|
smaError("iterate rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
pIter->nAlloc = infoLen;
|
pIter->nAlloc = infoLen;
|
||||||
void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen);
|
void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen);
|
||||||
if (!pBuf) {
|
if (!pBuf) {
|
||||||
|
@ -955,12 +986,10 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
|
||||||
nBytes = infoLen;
|
nBytes = infoLen;
|
||||||
|
|
||||||
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET)) {
|
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET)) {
|
||||||
ASSERT(0);
|
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
|
if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
|
||||||
ASSERT(0);
|
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -977,7 +1006,6 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
|
||||||
// block iter
|
// block iter
|
||||||
bool isFinish = false;
|
bool isFinish = false;
|
||||||
if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) {
|
if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
if (isFinish) {
|
if (isFinish) {
|
||||||
|
@ -989,6 +1017,7 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
|
||||||
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
|
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
|
||||||
if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
|
if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
|
||||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||||
|
smaError("restore rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1025,22 +1054,16 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *tdRSmaPersistExec(void *param) {
|
static int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
|
||||||
setThreadName("rsma-task-persist");
|
SSma *pSma = pRSmaStat->pSma;
|
||||||
SRSmaStat *pRSmaStat = param;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
SSma *pSma = pRSmaStat->pSma;
|
int32_t vid = SMA_VID(pSma);
|
||||||
STfs *pTfs = pSma->pVnode->pTfs;
|
int64_t toffset = 0;
|
||||||
int32_t vid = SMA_VID(pSma);
|
bool isFileCreated = false;
|
||||||
int64_t toffset = 0;
|
|
||||||
bool isFileCreated = false;
|
|
||||||
|
|
||||||
if (TASK_TRIGGER_STAT_CANCELLED == atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))) {
|
|
||||||
goto _end;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
|
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
|
||||||
if (!infoHash) {
|
if (!infoHash) {
|
||||||
goto _end;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
STFile tFile = {0};
|
STFile tFile = {0};
|
||||||
|
@ -1074,9 +1097,13 @@ static void *tdRSmaPersistExec(void *param) {
|
||||||
|
|
||||||
if (!isFileCreated) {
|
if (!isFileCreated) {
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||||
tdRSmaQTaskInfoGetFName(vid, TD_QTASK_TMP_F, qTaskInfoFName);
|
tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName);
|
||||||
tdInitTFile(&tFile, pTfs, qTaskInfoFName);
|
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||||
tdCreateTFile(&tFile, pTfs, true, -1);
|
goto _err;
|
||||||
|
}
|
||||||
|
if (tdCreateTFile(&tFile, true, -1) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
isFileCreated = true;
|
isFileCreated = true;
|
||||||
}
|
}
|
||||||
|
@ -1101,49 +1128,55 @@ static void *tdRSmaPersistExec(void *param) {
|
||||||
|
|
||||||
infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash);
|
infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash);
|
||||||
}
|
}
|
||||||
_normal:
|
|
||||||
if (isFileCreated) {
|
if (isFileCreated) {
|
||||||
if (tdUpdateTFileHeader(&tFile) < 0) {
|
if (tdUpdateTFileHeader(&tFile) < 0) {
|
||||||
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
|
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
tdCloseTFile(&tFile);
|
|
||||||
tdRemoveTFile(&tFile);
|
|
||||||
goto _err;
|
goto _err;
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile));
|
smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile));
|
||||||
}
|
}
|
||||||
|
|
||||||
tdCloseTFile(&tFile);
|
tdCloseTFile(&tFile);
|
||||||
|
tdDestroyTFile(&tFile);
|
||||||
char newFName[TSDB_FILENAME_LEN];
|
|
||||||
strncpy(newFName, TD_TFILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN);
|
|
||||||
char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_F]);
|
|
||||||
strncpy(pos, tdQTaskInfoFname[TD_QTASK_TMP_F], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName));
|
|
||||||
if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) {
|
|
||||||
smaError("vgId:%d, rsma, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
|
|
||||||
goto _err;
|
|
||||||
} else {
|
|
||||||
smaDebug("vgId:%d, rsma, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
goto _end;
|
return TSDB_CODE_SUCCESS;
|
||||||
_err:
|
_err:
|
||||||
if (isFileCreated) {
|
if (isFileCreated) {
|
||||||
tdRemoveTFile(&tFile);
|
tdRemoveTFile(&tFile);
|
||||||
|
tdDestroyTFile(&tFile);
|
||||||
}
|
}
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *tdRSmaPersistExec(void *param) {
|
||||||
|
setThreadName("rsma-task-persist");
|
||||||
|
SRSmaStat *pRSmaStat = param;
|
||||||
|
SSma *pSma = pRSmaStat->pSma;
|
||||||
|
|
||||||
|
int8_t triggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat));
|
||||||
|
|
||||||
|
if (TASK_TRIGGER_STAT_CANCELLED == triggerStat || TASK_TRIGGER_STAT_PAUSED == triggerStat) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
// execution
|
||||||
|
tdRSmaPersistExecImpl(pRSmaStat);
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
||||||
TASK_TRIGGER_STAT_INACTIVE,
|
TASK_TRIGGER_STAT_INACTIVE,
|
||||||
TASK_TRIGGER_STAT_ACTIVE)) {
|
TASK_TRIGGER_STAT_ACTIVE)) {
|
||||||
smaDebug("vgId:%d, rsma persist task is active again", vid);
|
smaDebug("vgId:%d, rsma persist task is active again", SMA_VID(pSma));
|
||||||
} else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
} else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
||||||
TASK_TRIGGER_STAT_CANCELLED,
|
TASK_TRIGGER_STAT_CANCELLED,
|
||||||
TASK_TRIGGER_STAT_FINISHED)) {
|
TASK_TRIGGER_STAT_FINISHED)) {
|
||||||
smaDebug("vgId:%d, rsma persist task is cancelled", vid);
|
smaDebug("vgId:%d, rsma persist task is cancelled", SMA_VID(pSma));
|
||||||
} else {
|
} else {
|
||||||
smaWarn("vgId:%d, rsma persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
|
smaWarn("vgId:%d, rsma persist task in stat %" PRIi8, SMA_VID(pSma), atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
||||||
taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId);
|
taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId);
|
||||||
taosThreadExit(NULL);
|
taosThreadExit(NULL);
|
||||||
|
@ -1166,8 +1199,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
|
||||||
TASK_TRIGGER_STAT_FINISHED)) {
|
TASK_TRIGGER_STAT_FINISHED)) {
|
||||||
smaDebug("vgId:%d, persist task is cancelled and set finished", SMA_VID(pRSmaStat->pSma));
|
smaDebug("vgId:%d, persist task is cancelled and set finished", SMA_VID(pRSmaStat->pSma));
|
||||||
} else {
|
} else {
|
||||||
smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)),
|
smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, SMA_VID(pRSmaStat->pSma),
|
||||||
SMA_VID(pRSmaStat->pSma));
|
atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
||||||
|
@ -1216,6 +1249,9 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
|
||||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED);
|
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED);
|
||||||
smaDebug("rsma persistence not start since cancelled and finished");
|
smaDebug("rsma persistence not start since cancelled and finished");
|
||||||
} break;
|
} break;
|
||||||
|
case TASK_TRIGGER_STAT_PAUSED: {
|
||||||
|
smaDebug("rsma persistence not start since paused");
|
||||||
|
} break;
|
||||||
case TASK_TRIGGER_STAT_INACTIVE: {
|
case TASK_TRIGGER_STAT_INACTIVE: {
|
||||||
smaDebug("rsma persistence not start since inactive");
|
smaDebug("rsma persistence not start since inactive");
|
||||||
} break;
|
} break;
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "sma.h"
|
|
@ -179,72 +179,83 @@ void tdCloseTFile(STFile *pTFile) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, char *outputName) {
|
void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); }
|
||||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/%s", vgId, dname, fname);
|
|
||||||
|
void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, int64_t version, char *outputName) {
|
||||||
|
if (version < 0) {
|
||||||
|
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/v%d%s", vgId, dname, vgId, fname);
|
||||||
|
} else {
|
||||||
|
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/v%d%s%" PRIi64, vgId, dname, vgId, fname, version);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
|
void tdGetVndDirName(int32_t vgId, const char *dname, char *outputName) {
|
||||||
char fullname[TSDB_FILENAME_LEN];
|
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s", vgId, dname);
|
||||||
SDiskID did = {0};
|
}
|
||||||
|
|
||||||
|
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
|
||||||
TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
|
TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
|
||||||
TD_TFILE_SET_CLOSED(pTFile);
|
TD_TFILE_SET_CLOSED(pTFile);
|
||||||
|
|
||||||
memset(&(pTFile->info), 0, sizeof(pTFile->info));
|
memset(&(pTFile->info), 0, sizeof(pTFile->info));
|
||||||
pTFile->info.magic = TD_FILE_INIT_MAGIC;
|
pTFile->info.magic = TD_FILE_INIT_MAGIC;
|
||||||
|
|
||||||
if (tfsAllocDisk(pTfs, 0, &did) < 0) {
|
char tmpName[TSDB_FILENAME_LEN * 2 + 32] = {0};
|
||||||
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
snprintf(tmpName, TSDB_FILENAME_LEN * 2 + 32, "%s%s%s", dname, TD_DIRSEP, fname);
|
||||||
|
int32_t tmpNameLen = strlen(tmpName) + 1;
|
||||||
|
pTFile->fname = taosMemoryMalloc(tmpNameLen);
|
||||||
|
if (!pTFile->fname) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
tstrncpy(pTFile->fname, tmpName, tmpNameLen);
|
||||||
tfsInitFile(pTfs, &(pTFile->f), did, fname);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType) {
|
int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType) {
|
||||||
ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC);
|
ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC);
|
||||||
|
|
||||||
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
if (pTFile->pFile == NULL) {
|
if (pTFile->pFile == NULL) {
|
||||||
if (errno == ENOENT) {
|
if (errno == ENOENT) {
|
||||||
// Try to create directory recursively
|
// Try to create directory recursively
|
||||||
char *s = strdup(TD_TFILE_REL_NAME(pTFile));
|
if (taosMulMkDir(taosDirName(TD_TFILE_FULL_NAME(pTFile))) != 0) {
|
||||||
if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_TFILE_DID(pTFile)) < 0) {
|
|
||||||
taosMemoryFreeClear(s);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taosMemoryFreeClear(s);
|
|
||||||
|
|
||||||
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
|
||||||
if (pTFile->pFile == NULL) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
|
} else {
|
||||||
|
pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
|
if (pTFile->pFile == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
|
if (!updateHeader) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTFile->info.fsize += TD_FILE_HEAD_SIZE;
|
||||||
|
pTFile->info.fver = 0;
|
||||||
|
|
||||||
|
if (tdUpdateTFileHeader(pTFile) < 0) {
|
||||||
|
tdCloseTFile(pTFile);
|
||||||
|
tdRemoveTFile(pTFile);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!updateHeader) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTFile->info.fsize += TD_FILE_HEAD_SIZE;
|
|
||||||
pTFile->info.fver = 0;
|
|
||||||
|
|
||||||
if (tdUpdateTFileHeader(pTFile) < 0) {
|
|
||||||
tdCloseTFile(pTFile);
|
|
||||||
tdRemoveTFile(pTFile);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_TFILE_F(pTFile)); }
|
int32_t tdRemoveTFile(STFile *pTFile) {
|
||||||
|
if (taosRemoveFile(TD_TFILE_FULL_NAME(pTFile)) != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
};
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// smaXXXUtil ================
|
// smaXXXUtil ================
|
||||||
// ...
|
// ...
|
|
@ -28,12 +28,12 @@ const SVnodeCfg vnodeCfgDefault = {
|
||||||
.update = 1,
|
.update = 1,
|
||||||
.compression = 2,
|
.compression = 2,
|
||||||
.slLevel = 5,
|
.slLevel = 5,
|
||||||
.days = 10,
|
.days = 14400,
|
||||||
.minRows = 100,
|
.minRows = 100,
|
||||||
.maxRows = 4096,
|
.maxRows = 4096,
|
||||||
.keep2 = 3650,
|
.keep2 = 5256000,
|
||||||
.keep0 = 3650,
|
.keep0 = 5256000,
|
||||||
.keep1 = 3650},
|
.keep1 = 5256000},
|
||||||
.walCfg =
|
.walCfg =
|
||||||
{.vgId = -1, .fsyncPeriod = 0, .retentionPeriod = 0, .rollPeriod = 0, .segSize = 0, .level = TAOS_WAL_WRITE},
|
{.vgId = -1, .fsyncPeriod = 0, .retentionPeriod = 0, .rollPeriod = 0, .segSize = 0, .level = TAOS_WAL_WRITE},
|
||||||
.hashBegin = 0,
|
.hashBegin = 0,
|
||||||
|
|
|
@ -229,6 +229,9 @@ int vnodeCommit(SVnode *pVnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// preCommit
|
||||||
|
// TODO
|
||||||
|
|
||||||
// commit each sub-system
|
// commit each sub-system
|
||||||
if (metaCommit(pVnode->pMeta) < 0) {
|
if (metaCommit(pVnode->pMeta) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -269,6 +272,9 @@ int vnodeCommit(SVnode *pVnode) {
|
||||||
|
|
||||||
pVnode->state.committed = info.state.committed;
|
pVnode->state.committed = info.state.committed;
|
||||||
|
|
||||||
|
// postCommit
|
||||||
|
smaPostCommit(pVnode->pSma);
|
||||||
|
|
||||||
// apply the commit (TODO)
|
// apply the commit (TODO)
|
||||||
vnodeBufPoolReset(pVnode->onCommit);
|
vnodeBufPoolReset(pVnode->onCommit);
|
||||||
pVnode->onCommit->next = pVnode->pPool;
|
pVnode->onCommit->next = pVnode->pPool;
|
||||||
|
|
|
@ -779,6 +779,7 @@ _exit:
|
||||||
taosArrayDestroy(submitRsp.pArray);
|
taosArrayDestroy(submitRsp.pArray);
|
||||||
|
|
||||||
// TODO: the partial success scenario and the error case
|
// TODO: the partial success scenario and the error case
|
||||||
|
// => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level 1/level 2.
|
||||||
// TODO: refactor
|
// TODO: refactor
|
||||||
if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
|
if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
|
||||||
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
|
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
|
||||||
|
|
|
@ -586,6 +586,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted")
|
||||||
|
|
||||||
|
|
||||||
//tq
|
//tq
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset")
|
||||||
|
|
Loading…
Reference in New Issue