refactor: rsma commit and restore
This commit is contained in:
parent
73b1101aa5
commit
a8a0aa9223
|
@ -22,13 +22,19 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define SMA_DEBUG_MODE // TODO: remove when release
|
||||
|
||||
// smaDebug ================
|
||||
// clang-format off
|
||||
#define smaFatal(...) do { if (smaDebugFlag & DEBUG_FATAL) { taosPrintLog("SMA FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
||||
#define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
|
||||
#define smaWarn(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
|
||||
#define smaInfo(...) do { if (smaDebugFlag & DEBUG_INFO) { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
|
||||
#ifdef SMA_DEBUG_MODE
|
||||
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
|
||||
#else
|
||||
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#endif
|
||||
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||
// clang-format on
|
||||
|
||||
|
@ -62,12 +68,10 @@ struct STSmaStat {
|
|||
|
||||
struct SRSmaStat {
|
||||
SSma *pSma;
|
||||
int64_t refId; // shared by persistence/fetch tasks
|
||||
void *tmrHandle; // for persistence task
|
||||
tmr_h tmrId; // for persistence task
|
||||
int32_t tmrSeconds; // for persistence task
|
||||
int8_t triggerStat; // for persistence task
|
||||
int8_t runningStat; // for persistence task
|
||||
int64_t refId; // shared by fetch tasks
|
||||
void *tmrHandle; // shared by fetch tasks
|
||||
int8_t triggerStat; // shared by fetch tasks
|
||||
int8_t runningStat; // for persistence task
|
||||
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
||||
};
|
||||
|
||||
|
@ -82,7 +86,6 @@ struct SSmaStat {
|
|||
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
|
||||
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
|
||||
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
|
||||
#define RSMA_TMR_ID(r) ((r)->tmrId)
|
||||
#define RSMA_TMR_HANDLE(r) ((r)->tmrHandle)
|
||||
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
|
||||
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
|
||||
|
@ -185,9 +188,11 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
|
|||
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||
void *tdFreeRSmaInfo(SRSmaInfo *pInfo);
|
||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat);
|
||||
|
||||
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
||||
int32_t tdProcessRSmaRestoreImpl(SSma *pSma);
|
||||
|
||||
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
|
||||
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
|
||||
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||
|
@ -244,8 +249,8 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
|
|||
void tdCloseTFile(STFile *pTFile);
|
||||
void tdDestroyTFile(STFile *pTFile);
|
||||
|
||||
void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, int64_t version, char *outputName);
|
||||
void tdGetVndDirName(int32_t vgId, const char *dname, char *outputName);
|
||||
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, char *outputName);
|
||||
void tdGetVndDirName(int32_t vgId,const char *pdname, const char *dname, char *outputName);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -165,6 +165,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
|
|||
int32_t smaOpen(SVnode* pVnode);
|
||||
int32_t smaCloseEnv(SSma* pSma);
|
||||
int32_t smaCloseEx(SSma* pSma);
|
||||
int32_t smaBegin(SSma* pSma);
|
||||
int32_t smaPreCommit(SSma* pSma);
|
||||
int32_t smaCommit(SSma* pSma);
|
||||
int32_t smaPostCommit(SSma* pSma);
|
||||
|
|
|
@ -43,13 +43,48 @@ int32_t smaCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); }
|
|||
*/
|
||||
int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); }
|
||||
|
||||
/**
|
||||
* @brief set rsma trigger stat active
|
||||
*
|
||||
* @param pSma
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t smaBegin(SSma *pSma) {
|
||||
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
||||
if (!pSmaEnv) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
|
||||
int8_t rsmaTriggerStat =
|
||||
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);
|
||||
switch (rsmaTriggerStat) {
|
||||
case TASK_TRIGGER_STAT_PAUSED: {
|
||||
smaDebug("vgId:%d rsma trigger stat from paused to active", SMA_VID(pSma));
|
||||
break;
|
||||
}
|
||||
case TASK_TRIGGER_STAT_INIT: {
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
|
||||
smaDebug("vgId:%d rsma trigger stat from init to active", SMA_VID(pSma));
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
|
||||
smaWarn("vgId:%d rsma trigger stat %" PRIi8 " is unexpected", SMA_VID(pSma), rsmaTriggerStat);
|
||||
ASSERT(0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief pre-commit for rollup sma.
|
||||
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
|
||||
* 2) perform persist task for qTaskInfo
|
||||
* 3) wait all triggered fetch tasks finished
|
||||
* 4) set trigger stat of rsma timer TASK_TRIGGER_STAT_ACTIVE.
|
||||
* 5) finish
|
||||
*
|
||||
* @param pSma
|
||||
* @return int32_t
|
||||
|
@ -63,10 +98,29 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
|
|||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
|
||||
// step 1
|
||||
smaDebug("vgId:%d, rsma pre commit", SMA_VID(pSma));
|
||||
|
||||
// step 1: set persistence task paused
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
|
||||
|
||||
// step 2
|
||||
// step 2: perform persist task for qTaskInfo
|
||||
tdRSmaPersistExecImpl(pRSmaStat);
|
||||
|
||||
// step 3: wait all triggered fetch tasks finished
|
||||
int32_t nLoops = 0;
|
||||
while (1) {
|
||||
if (T_REF_VAL_GET(pStat) == 0) {
|
||||
smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
|
||||
break;
|
||||
} else {
|
||||
smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
|
||||
}
|
||||
++nLoops;
|
||||
if (nLoops > 1000) {
|
||||
sched_yield();
|
||||
nLoops = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -103,18 +157,17 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) {
|
|||
TdDirPtr pDir = NULL;
|
||||
TdDirEntryPtr pDirEntry = NULL;
|
||||
char dir[TSDB_FILENAME_LEN];
|
||||
char bname[TSDB_FILENAME_LEN];
|
||||
const char *pattern = "^v[0-9]+qtaskinfo\\.ver([0-9]+)?$";
|
||||
regex_t regex;
|
||||
|
||||
tdGetVndDirName(TD_VID(pVnode), VNODE_RSMA_DIR, dir);
|
||||
tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, dir);
|
||||
|
||||
// Resource allocation and init
|
||||
regcomp(®ex, pattern, REG_EXTENDED);
|
||||
|
||||
if ((pDir = taosOpenDir(dir)) == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
smaWarn("rsma post-commit open dir %s failed since %s", dir, terrstr());
|
||||
smaWarn("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
@ -125,20 +178,32 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) {
|
|||
continue;
|
||||
}
|
||||
char *fileName = taosDirEntryBaseName(entryName);
|
||||
int code = regexec(®ex, bname, 2, regMatch, 0);
|
||||
int code = regexec(®ex, fileName, 2, regMatch, 0);
|
||||
|
||||
if (code == 0) {
|
||||
// match
|
||||
printf("match 0 = %s\n", (char *)POINTER_SHIFT(fileName, regMatch[0].rm_so));
|
||||
printf("match 1 = %s\n", (char *)POINTER_SHIFT(fileName, regMatch[1].rm_so));
|
||||
smaDebug("vgId:%d, matched = %s, %s", TD_VID(pVnode), (char *)POINTER_SHIFT(fileName, regMatch[0].rm_so),
|
||||
(const char *)POINTER_SHIFT(fileName, regMatch[1].rm_so));
|
||||
int64_t version = -1;
|
||||
sscanf((const char *)POINTER_SHIFT(fileName, regMatch[1].rm_so), "%" PRIi64, &version);
|
||||
if ((version < committed) && (version > -1)) {
|
||||
if (taosRemoveFile(entryName) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
smaWarn("vgId:%d, committed version:%" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed,
|
||||
entryName, terrstr());
|
||||
} else {
|
||||
smaDebug("vgId:%d, committed version:%" PRIi64 ", success to remove %s", TD_VID(pVnode), committed,
|
||||
entryName);
|
||||
}
|
||||
}
|
||||
} else if (code == REG_NOMATCH) {
|
||||
// not match
|
||||
smaInfo("rsma post-commit not match %s", fileName);
|
||||
smaInfo("vgId:%d, rsma post commit, not match %s", TD_VID(pVnode), fileName);
|
||||
continue;
|
||||
} else {
|
||||
// has other error
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
smaWarn("rsma post-commit regexec failed since %s", terrstr());
|
||||
smaWarn("vgId:%d, rsma post commit, regexec failed since %s", TD_VID(pVnode), terrstr());
|
||||
|
||||
taosCloseDir(&pDir);
|
||||
regfree(®ex);
|
||||
|
|
|
@ -132,6 +132,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
|
|||
if (smaType == TSDB_SMA_TYPE_ROLLUP) {
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
|
||||
pRSmaStat->pSma = (SSma *)pSma;
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
|
||||
|
||||
// init smaMgmt
|
||||
smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);
|
||||
|
@ -192,22 +193,20 @@ static void *tdFreeTSmaStat(STSmaStat *pStat) {
|
|||
static void tdDestroyRSmaStat(void *pRSmaStat) {
|
||||
if (pRSmaStat) {
|
||||
SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
|
||||
smaDebug("vgId:%d %s:%d destroy rsma stat %p", SMA_VID(pStat->pSma), __func__, __LINE__, pRSmaStat);
|
||||
// step 1: set persistence task cancelled
|
||||
SSma *pSma = pStat->pSma;
|
||||
smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat);
|
||||
// step 1: set rsma trigger stat cancelled
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
|
||||
|
||||
// step 2: stop the persistence timer
|
||||
taosTmrStopA(&RSMA_TMR_ID(pStat));
|
||||
|
||||
// step 3: wait the persistence thread to finish
|
||||
// step 2: wait the persistence thread to finish
|
||||
int32_t nLoops = 0;
|
||||
if (atomic_load_8(RSMA_RUNNING_STAT(pStat)) == 1) {
|
||||
while (1) {
|
||||
if (atomic_load_8(RSMA_TRIGGER_STAT(pStat)) == TASK_TRIGGER_STAT_FINISHED) {
|
||||
smaDebug("rsma, persist task finished already");
|
||||
smaDebug("vgId:%d, rsma persist task finished already", SMA_VID(pSma));
|
||||
break;
|
||||
} else {
|
||||
smaDebug("rsma, persist task not finished yet since rsma stat in %" PRIi8,
|
||||
smaDebug("vgId:%d, rsma persist task not finished yet since rsma stat in %" PRIi8, SMA_VID(pSma),
|
||||
atomic_load_8(RSMA_TRIGGER_STAT(pStat)));
|
||||
}
|
||||
++nLoops;
|
||||
|
@ -218,7 +217,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
}
|
||||
}
|
||||
|
||||
// step 4: destroy the rsma info and associated fetch tasks
|
||||
// step 3: destroy the rsma info and associated fetch tasks
|
||||
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
|
||||
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
|
||||
while (infoHash) {
|
||||
|
@ -232,10 +231,10 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
nLoops = 0;
|
||||
while (1) {
|
||||
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
|
||||
smaDebug("rsma, all fetch task finished already");
|
||||
smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
|
||||
break;
|
||||
} else {
|
||||
smaDebug("rsma, fetch tasks not all finished yet");
|
||||
smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
|
||||
}
|
||||
++nLoops;
|
||||
if (nLoops > 1000) {
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
|
||||
#include "sma.h"
|
||||
|
||||
#define RSMA_QTASKINFO_PERSIST_MS 7200000
|
||||
#define RSMA_QTASKINFO_BUFSIZE 32768
|
||||
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
|
||||
|
||||
|
@ -88,7 +87,7 @@ struct SRSmaQTaskInfoIter {
|
|||
};
|
||||
|
||||
static void tdRSmaQTaskInfoGetFName(int32_t vgId, int64_t version, char *outputName) {
|
||||
tdGetVndFileName(vgId, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
|
||||
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
|
||||
|
@ -114,12 +113,14 @@ void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
|
|||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
SRSmaInfoItem *pItem = &pInfo->items[i];
|
||||
if (pItem->taskInfo) {
|
||||
smaDebug("vgId:%d, stb %" PRIi64 " stop fetch-timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId,
|
||||
i + 1);
|
||||
taosTmrStopA(&pItem->tmrId);
|
||||
if (pItem->tmrId) {
|
||||
smaDebug("vgId:%d, table %" PRIi64 " stop fetch timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId,
|
||||
i + 1);
|
||||
taosTmrStopA(&pItem->tmrId);
|
||||
}
|
||||
tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pSma), i + 1);
|
||||
} else {
|
||||
smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
|
||||
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
|
||||
pInfo->suid, i + 1);
|
||||
}
|
||||
}
|
||||
|
@ -358,13 +359,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
|||
goto _err;
|
||||
}
|
||||
|
||||
smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), suid);
|
||||
|
||||
// start the persist timer
|
||||
if (TASK_TRIGGER_STAT_INIT ==
|
||||
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_INIT, TASK_TRIGGER_STAT_ACTIVE)) {
|
||||
taosTmrStart(tdRSmaPersistTrigger, RSMA_QTASKINFO_PERSIST_MS, pStat, RSMA_TMR_HANDLE(pStat));
|
||||
}
|
||||
smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
|
@ -1054,7 +1049,7 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
|
||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
|
||||
SSma *pSma = pRSmaStat->pSma;
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
int32_t vid = SMA_VID(pSma);
|
||||
|
@ -1238,8 +1233,8 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
|
|||
// start persist task
|
||||
tdRSmaPersistTask(pRSmaStat);
|
||||
|
||||
taosTmrReset(tdRSmaPersistTrigger, RSMA_QTASKINFO_PERSIST_MS, pRSmaStat, pRSmaStat->tmrHandle,
|
||||
&pRSmaStat->tmrId);
|
||||
// taosTmrReset(tdRSmaPersistTrigger, 5000, pRSmaStat, pRSmaStat->tmrHandle,
|
||||
// RSMA_TMR_ID(pRSmaStat));
|
||||
} else {
|
||||
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
||||
}
|
||||
|
|
|
@ -181,16 +181,34 @@ void tdCloseTFile(STFile *pTFile) {
|
|||
|
||||
void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); }
|
||||
|
||||
void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, int64_t version, char *outputName) {
|
||||
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
||||
char *outputName) {
|
||||
if (version < 0) {
|
||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/v%d%s", vgId, dname, vgId, fname);
|
||||
if (pdname) {
|
||||
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId,
|
||||
TD_DIRSEP, dname, TD_DIRSEP, vgId, fname);
|
||||
} else {
|
||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP,
|
||||
vgId, fname);
|
||||
}
|
||||
} else {
|
||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/v%d%s%" PRIi64, vgId, dname, vgId, fname, version);
|
||||
if (pdname) {
|
||||
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP,
|
||||
vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version);
|
||||
} else {
|
||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname,
|
||||
TD_DIRSEP, vgId, fname, version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void tdGetVndDirName(int32_t vgId, const char *dname, char *outputName) {
|
||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s", vgId, dname);
|
||||
void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, char *outputName) {
|
||||
if (pdname) {
|
||||
snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,
|
||||
dname);
|
||||
} else {
|
||||
snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
|
||||
|
|
|
@ -69,6 +69,9 @@ int vnodeBegin(SVnode *pVnode) {
|
|||
}
|
||||
}
|
||||
|
||||
// begin sma
|
||||
smaBegin(pVnode->pSma); // TODO: refactor to include the rsma1/rsma2 tsdbBegin() after tsdb_refact branch merged
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -230,8 +233,8 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
}
|
||||
|
||||
// preCommit
|
||||
// TODO
|
||||
|
||||
smaPreCommit(pVnode->pSma);
|
||||
|
||||
// commit each sub-system
|
||||
if (metaCommit(pVnode->pMeta) < 0) {
|
||||
ASSERT(0);
|
||||
|
|
|
@ -15,6 +15,8 @@
|
|||
|
||||
#include "vnd.h"
|
||||
|
||||
int32_t gForceCommit = 0;
|
||||
|
||||
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
@ -198,7 +200,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
}
|
||||
|
||||
// commit if need
|
||||
if (vnodeShouldCommit(pVnode)) {
|
||||
if ((gForceCommit && (TDMT_VND_SUBMIT == pMsg->msgType)) || vnodeShouldCommit(pVnode)) {
|
||||
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
|
||||
// commit current change
|
||||
vnodeCommit(pVnode);
|
||||
|
|
Loading…
Reference in New Issue