more code
This commit is contained in:
parent
1779c17375
commit
0706f2dea2
|
@ -83,6 +83,12 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
|
|||
}
|
||||
}
|
||||
|
||||
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
|
||||
if (CODE) { \
|
||||
LINO = __LINE__; \
|
||||
goto LABEL; \
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -32,12 +32,6 @@ extern "C" {
|
|||
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||
// clang-format on
|
||||
|
||||
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
|
||||
if (CODE) { \
|
||||
LINO = __LINE__; \
|
||||
goto LABEL; \
|
||||
}
|
||||
|
||||
typedef struct TSDBROW TSDBROW;
|
||||
typedef struct TABLEID TABLEID;
|
||||
typedef struct TSDBKEY TSDBKEY;
|
||||
|
@ -252,8 +246,9 @@ int32_t tsdbFSClose(STsdb *pTsdb);
|
|||
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS);
|
||||
void tsdbFSDestroy(STsdbFS *pFS);
|
||||
int32_t tDFileSetCmprFn(const void *p1, const void *p2);
|
||||
int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFS);
|
||||
int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFS);
|
||||
int32_t tsdbFSCommit(STsdb *pTsdb);
|
||||
int32_t tsdbFSRollback(STsdb *pTsdb);
|
||||
int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFS);
|
||||
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS);
|
||||
void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS);
|
||||
|
||||
|
|
|
@ -153,6 +153,8 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepC
|
|||
int tsdbClose(STsdb** pTsdb);
|
||||
int32_t tsdbBegin(STsdb* pTsdb);
|
||||
int32_t tsdbCommit(STsdb* pTsdb);
|
||||
int32_t tsdbFinishCommit(STsdb* pTsdb);
|
||||
int32_t tsdbRollbackCommit(STsdb* pTsdb);
|
||||
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||
|
|
|
@ -1041,38 +1041,20 @@ _exit:
|
|||
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
STsdb *pTsdb = pCommitter->pTsdb;
|
||||
|
||||
STsdb *pTsdb = pCommitter->pTsdb;
|
||||
SMemTable *pMemTable = pTsdb->imem;
|
||||
|
||||
ASSERT(eno == 0 &&
|
||||
"tsdbCommit failure"
|
||||
"Restart taosd");
|
||||
|
||||
code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// lock
|
||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||
|
||||
// commit or rollback
|
||||
code = tsdbFSCommit2(pTsdb, &pCommitter->fs);
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
if (eno) {
|
||||
code = eno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
code = tsdbFSPrepareCommit(pCommitter->pTsdb, &pCommitter->fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
pTsdb->imem = NULL;
|
||||
|
||||
// unlock
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
|
||||
tsdbUnrefMemTable(pMemTable);
|
||||
_exit:
|
||||
tsdbFSDestroy(&pCommitter->fs);
|
||||
taosArrayDestroy(pCommitter->aTbDataP);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
if (code || eno) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
|
||||
|
@ -1646,3 +1628,48 @@ _exit:
|
|||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFinishCommit(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SMemTable *pMemTable = pTsdb->imem;
|
||||
|
||||
// lock
|
||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||
|
||||
code = tsdbFSCommit(pTsdb);
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
pTsdb->imem = NULL;
|
||||
|
||||
// unlock
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
tsdbUnrefMemTable(pMemTable);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d tsdb finish commit", TD_VID(pTsdb->pVnode));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbRollbackCommit(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
code = tsdbFSRollback(pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d tsdb rollback commit", TD_VID(pTsdb->pVnode));
|
||||
}
|
||||
return code;
|
||||
}
|
|
@ -697,7 +697,8 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbFSCommit(STsdb *pTsdb) {
|
||||
// EXPOSED APIS ====================================================================================
|
||||
int32_t tsdbFSCommit(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
|
@ -731,17 +732,13 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbFSRollback(STsdb *pTsdb) {
|
||||
int32_t tsdbFSRollback(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
char current_t[TSDB_FILENAME_LEN] = {0};
|
||||
tsdbGetCurrentFName(pTsdb, NULL, current_t);
|
||||
|
||||
if (taosRemoveFile(current_t) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
(void)taosRemoveFile(current_t);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -750,7 +747,6 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
// EXPOSED APIS ====================================================================================
|
||||
int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
@ -1010,336 +1006,21 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) {
|
||||
int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFSNew) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
char tfname[TSDB_FILENAME_LEN];
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
|
||||
pTsdb->path, TD_DIRSEP);
|
||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
|
||||
pTsdb->path, TD_DIRSEP);
|
||||
tsdbGetCurrentFName(pTsdb, NULL, tfname);
|
||||
|
||||
// gnrt CURRENT.t
|
||||
code = tsdbSaveFSToFile(pFSNew, tfname);
|
||||
if (code) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// rename
|
||||
code = taosRenameFile(tfname, fname);
|
||||
_exit:
|
||||
if (code) {
|
||||
code = TAOS_SYSTEM_ERROR(code);
|
||||
goto _err;
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, tsdb fs commit phase 1 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
|
||||
int32_t code = 0;
|
||||
int32_t nRef;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
// del
|
||||
if (pFSNew->pDelFile) {
|
||||
SDelFile *pDelFile = pTsdb->fs.pDelFile;
|
||||
|
||||
if (pDelFile == NULL || (pDelFile->commitID != pFSNew->pDelFile->commitID)) {
|
||||
pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
|
||||
if (pTsdb->fs.pDelFile == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
*pTsdb->fs.pDelFile = *pFSNew->pDelFile;
|
||||
pTsdb->fs.pDelFile->nRef = 1;
|
||||
|
||||
if (pDelFile) {
|
||||
nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbDelFileName(pTsdb, pDelFile, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pDelFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(pTsdb->fs.pDelFile == NULL);
|
||||
}
|
||||
|
||||
// data
|
||||
int32_t iOld = 0;
|
||||
int32_t iNew = 0;
|
||||
while (true) {
|
||||
int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet);
|
||||
int32_t nNew = taosArrayGetSize(pFSNew->aDFileSet);
|
||||
SDFileSet fSet;
|
||||
int8_t sameDisk;
|
||||
|
||||
if (iOld >= nOld && iNew >= nNew) break;
|
||||
|
||||
SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL;
|
||||
SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aDFileSet, iNew) : NULL;
|
||||
|
||||
if (pSetOld && pSetNew) {
|
||||
if (pSetOld->fid == pSetNew->fid) {
|
||||
goto _merge_old_and_new;
|
||||
} else if (pSetOld->fid < pSetNew->fid) {
|
||||
goto _remove_old;
|
||||
} else {
|
||||
goto _add_new;
|
||||
}
|
||||
} else if (pSetOld) {
|
||||
goto _remove_old;
|
||||
} else {
|
||||
goto _add_new;
|
||||
}
|
||||
|
||||
_merge_old_and_new:
|
||||
sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id));
|
||||
|
||||
// head
|
||||
fSet.pHeadF = pSetOld->pHeadF;
|
||||
if ((!sameDisk) || (pSetOld->pHeadF->commitID != pSetNew->pHeadF->commitID)) {
|
||||
pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
||||
if (pSetOld->pHeadF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->pHeadF = *pSetNew->pHeadF;
|
||||
pSetOld->pHeadF->nRef = 1;
|
||||
|
||||
nRef = atomic_sub_fetch_32(&fSet.pHeadF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pHeadF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(fSet.pHeadF);
|
||||
}
|
||||
} else {
|
||||
ASSERT(fSet.pHeadF->size == pSetNew->pHeadF->size);
|
||||
ASSERT(fSet.pHeadF->offset == pSetNew->pHeadF->offset);
|
||||
}
|
||||
|
||||
// data
|
||||
fSet.pDataF = pSetOld->pDataF;
|
||||
if ((!sameDisk) || (pSetOld->pDataF->commitID != pSetNew->pDataF->commitID)) {
|
||||
pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
||||
if (pSetOld->pDataF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->pDataF = *pSetNew->pDataF;
|
||||
pSetOld->pDataF->nRef = 1;
|
||||
|
||||
nRef = atomic_sub_fetch_32(&fSet.pDataF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pDataF, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(fSet.pDataF);
|
||||
}
|
||||
} else {
|
||||
ASSERT(pSetOld->pDataF->size <= pSetNew->pDataF->size);
|
||||
pSetOld->pDataF->size = pSetNew->pDataF->size;
|
||||
}
|
||||
|
||||
// sma
|
||||
fSet.pSmaF = pSetOld->pSmaF;
|
||||
if ((!sameDisk) || (pSetOld->pSmaF->commitID != pSetNew->pSmaF->commitID)) {
|
||||
pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
||||
if (pSetOld->pSmaF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->pSmaF = *pSetNew->pSmaF;
|
||||
pSetOld->pSmaF->nRef = 1;
|
||||
|
||||
nRef = atomic_sub_fetch_32(&fSet.pSmaF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pSmaF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(fSet.pSmaF);
|
||||
}
|
||||
} else {
|
||||
ASSERT(pSetOld->pSmaF->size <= pSetNew->pSmaF->size);
|
||||
pSetOld->pSmaF->size = pSetNew->pSmaF->size;
|
||||
}
|
||||
|
||||
// stt
|
||||
if (sameDisk) {
|
||||
if (pSetNew->nSttF > pSetOld->nSttF) {
|
||||
ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1);
|
||||
pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetOld->aSttF[pSetOld->nSttF] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF];
|
||||
pSetOld->aSttF[pSetOld->nSttF]->nRef = 1;
|
||||
pSetOld->nSttF++;
|
||||
} else if (pSetNew->nSttF < pSetOld->nSttF) {
|
||||
ASSERT(pSetNew->nSttF == 1);
|
||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
||||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSttFile);
|
||||
}
|
||||
pSetOld->aSttF[iStt] = NULL;
|
||||
}
|
||||
|
||||
pSetOld->nSttF = 1;
|
||||
pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetOld->aSttF[0] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->aSttF[0] = *pSetNew->aSttF[0];
|
||||
pSetOld->aSttF[0]->nRef = 1;
|
||||
} else {
|
||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||
if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) {
|
||||
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
||||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSttFile);
|
||||
}
|
||||
|
||||
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetOld->aSttF[iStt] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
|
||||
pSetOld->aSttF[iStt]->nRef = 1;
|
||||
} else {
|
||||
ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size);
|
||||
ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(pSetOld->nSttF == pSetNew->nSttF);
|
||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
||||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSttFile);
|
||||
}
|
||||
|
||||
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetOld->aSttF[iStt] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
|
||||
pSetOld->aSttF[iStt]->nRef = 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!sameDisk) {
|
||||
pSetOld->diskId = pSetNew->diskId;
|
||||
}
|
||||
|
||||
iOld++;
|
||||
iNew++;
|
||||
continue;
|
||||
|
||||
_remove_old:
|
||||
nRef = atomic_sub_fetch_32(&pSetOld->pHeadF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pHeadF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pSetOld->pHeadF);
|
||||
}
|
||||
|
||||
nRef = atomic_sub_fetch_32(&pSetOld->pDataF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pDataF, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSetOld->pDataF);
|
||||
}
|
||||
|
||||
nRef = atomic_sub_fetch_32(&pSetOld->pSmaF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pSmaF, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSetOld->pSmaF);
|
||||
}
|
||||
|
||||
for (int8_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||
nRef = atomic_sub_fetch_32(&pSetOld->aSttF[iStt]->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSttF[iStt], fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSetOld->aSttF[iStt]);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
|
||||
continue;
|
||||
|
||||
_add_new:
|
||||
fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSttF = 1};
|
||||
|
||||
// head
|
||||
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
||||
if (fSet.pHeadF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*fSet.pHeadF = *pSetNew->pHeadF;
|
||||
fSet.pHeadF->nRef = 1;
|
||||
|
||||
// data
|
||||
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
||||
if (fSet.pDataF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*fSet.pDataF = *pSetNew->pDataF;
|
||||
fSet.pDataF->nRef = 1;
|
||||
|
||||
// sma
|
||||
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
||||
if (fSet.pSmaF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*fSet.pSmaF = *pSetNew->pSmaF;
|
||||
fSet.pSmaF->nRef = 1;
|
||||
|
||||
// stt
|
||||
ASSERT(pSetNew->nSttF == 1);
|
||||
fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (fSet.aSttF[0] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*fSet.aSttF[0] = *pSetNew->aSttF[0];
|
||||
fSet.aSttF[0]->nRef = 1;
|
||||
|
||||
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
iOld++;
|
||||
iNew++;
|
||||
continue;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, tsdb fs commit phase 2 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -86,12 +86,12 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
|
|||
}
|
||||
|
||||
// do change fs
|
||||
code = tsdbFSCommit1(pTsdb, &fs);
|
||||
code = tsdbFSPrepareCommit(pTsdb, &fs);
|
||||
if (code) goto _err;
|
||||
|
||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||
|
||||
code = tsdbFSCommit2(pTsdb, &fs);
|
||||
code = tsdbFSCommit(pTsdb);
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _err;
|
||||
|
|
|
@ -1380,13 +1380,13 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
|||
code = tsdbSnapWriteDelEnd(pWriter);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs);
|
||||
code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs);
|
||||
if (code) goto _err;
|
||||
|
||||
// lock
|
||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||
|
||||
code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs);
|
||||
code = tsdbFSCommit(pWriter->pTsdb);
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _err;
|
||||
|
|
|
@ -213,6 +213,8 @@ int vnodeSyncCommit(SVnode *pVnode) {
|
|||
}
|
||||
|
||||
int vnodeCommit(SVnode *pVnode) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SVnodeInfo info = {0};
|
||||
char dir[TSDB_FILENAME_LEN];
|
||||
|
||||
|
@ -232,25 +234,23 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
|
||||
}
|
||||
if (vnodeSaveInfo(dir, &info) < 0) {
|
||||
vError("vgId:%d, failed to save vnode info since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
|
||||
|
||||
// preCommit
|
||||
// smaSyncPreCommit(pVnode->pSma);
|
||||
if (smaAsyncPreCommit(pVnode->pSma) < 0) {
|
||||
vError("vgId:%d, failed to async pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
code = smaAsyncPreCommit(pVnode->pSma);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
vnodeBufPoolUnRef(pVnode->inUse);
|
||||
pVnode->inUse = NULL;
|
||||
|
||||
// commit each sub-system
|
||||
if (metaCommit(pVnode->pMeta) < 0) {
|
||||
vError("vgId:%d, failed to commit meta since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (VND_IS_RSMA(pVnode)) {
|
||||
|
@ -272,24 +272,23 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (tsdbCommit(pVnode->pTsdb) < 0) {
|
||||
vError("vgId:%d, failed to commit tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
code = tsdbCommit(pVnode->pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (tqCommit(pVnode->pTq) < 0) {
|
||||
vError("vgId:%d, failed to commit tq since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
// walCommit (TODO)
|
||||
|
||||
// commit info
|
||||
if (vnodeCommitInfo(dir, &info) < 0) {
|
||||
vError("vgId:%d, failed to commit vnode info since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
tsdbFinishCommit(pVnode->pTsdb);
|
||||
|
||||
pVnode->state.committed = info.state.committed;
|
||||
|
||||
// postCommit
|
||||
|
@ -302,8 +301,12 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
// apply the commit (TODO)
|
||||
walEndSnapshot(pVnode->pWal);
|
||||
|
||||
vInfo("vgId:%d, commit end", TD_VID(pVnode));
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
vInfo("vgId:%d, commit end", TD_VID(pVnode));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue