chore: remove codes unused currently for sma
This commit is contained in:
parent
3977522220
commit
0f5f0d7cfb
|
@ -258,6 +258,7 @@ enum {
|
|||
TD_FTYPE_RSMA_QTASKINFO = 0,
|
||||
};
|
||||
|
||||
#if 0
|
||||
struct STFile {
|
||||
uint8_t state;
|
||||
STFInfo info;
|
||||
|
@ -287,6 +288,7 @@ int32_t tdUpdateTFileHeader(STFile *pTFile);
|
|||
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
|
||||
void tdCloseTFile(STFile *pTFile);
|
||||
void tdDestroyTFile(STFile *pTFile);
|
||||
#endif
|
||||
|
||||
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
||||
char *outputName);
|
||||
|
|
|
@ -17,14 +17,17 @@
|
|||
|
||||
extern SSmaMgmt smaMgmt;
|
||||
|
||||
#if 0
|
||||
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
|
||||
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
|
||||
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
|
||||
#endif
|
||||
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
|
||||
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma);
|
||||
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
|
||||
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
|
||||
|
||||
#if 0
|
||||
/**
|
||||
* @brief Only applicable to Rollup SMA
|
||||
*
|
||||
|
@ -48,6 +51,7 @@ int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); }
|
|||
* @return int32_t
|
||||
*/
|
||||
int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); }
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Only applicable to Rollup SMA
|
||||
|
@ -108,6 +112,7 @@ int32_t smaBegin(SSma *pSma) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
/**
|
||||
* @brief pre-commit for rollup sma(sync commit).
|
||||
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
|
||||
|
@ -169,6 +174,7 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
|
|||
#endif
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
// SQTaskFile ======================================================
|
||||
|
||||
|
@ -230,6 +236,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
/**
|
||||
* @brief post-commit for rollup sma
|
||||
* 1) clean up the outdated qtaskinfo files
|
||||
|
@ -249,6 +256,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Rsma async commit implementation(only do some necessary light weighted task)
|
||||
|
|
|
@ -0,0 +1,720 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "sma.h"
|
||||
|
||||
// =================================================================================================
|
||||
static int32_t tdRSmaEncodeFS(uint8_t *p, SRSmaFS *pFS);
|
||||
static int32_t tPutQTaskFile(uint8_t *p, SQTaskFile *pFile);
|
||||
static int32_t tPutQTaskFileSet(uint8_t *p, SQTaskFileSet *pSet);
|
||||
|
||||
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output);
|
||||
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2);
|
||||
static int32_t tdQTaskInfCmprFn2(const void *p1, const void *p2);
|
||||
|
||||
// =================================================================================================
|
||||
static int32_t tdRSmaEncodeFS(uint8_t *p, SRSmaFS *pFS) {
|
||||
int32_t n = 0;
|
||||
uint32_t nSet = taosArrayGetSize(pFS->aQTaskFSet);
|
||||
|
||||
// version
|
||||
n += tPutU8(p ? p + n : p, 0);
|
||||
|
||||
// SArray<SQTaskFileSet>
|
||||
n += tPutU32v(p ? p + n : p, nSet);
|
||||
for (uint32_t i = 0; i < nSet; ++i) {
|
||||
n += tPutQTaskFileSet(p ? p + n : p, (SQTaskFileSet *)taosArrayGet(pFS->aQTaskFSet, i));
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
static int32_t tPutQTaskFileSet(uint8_t *p, SQTaskFileSet *pSet) {
|
||||
int32_t n = 0;
|
||||
int32_t nQTaskF = taosArrayGetSize(pSet->aQTaskF);
|
||||
|
||||
n += tPutI64v(p ? p + n : p, pSet->suid);
|
||||
n += tPutI32v(p ? p + n : p, nQTaskF);
|
||||
|
||||
// SQTaskFile
|
||||
for (int32_t i = 0; i < nQTaskF; ++i) {
|
||||
n += tPutQTaskFile(p ? p + n : p, taosArrayGet(pSet->aQTaskF, i));
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
static int32_t tPutQTaskFile(uint8_t *p, SQTaskFile *pFile) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutI8(p ? p + n : p, pFile->level);
|
||||
n += tPutI64v(p ? p + n : p, pFile->version);
|
||||
n += tPutI64v(p ? p + n : p, pFile->mtime);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
static int32_t tGetQTaskFileSet(uint8_t *p, SQTaskFileSet *pSet) {
|
||||
int32_t n = 0;
|
||||
int32_t nQTaskF = 0;
|
||||
|
||||
n += tGetI64v(p + n, &pSet->suid);
|
||||
n += tGetI32v(p + n, &nQTaskF);
|
||||
|
||||
ASSERT(nQTaskF >= 0);
|
||||
|
||||
pSet->aQTaskF = taosArrayInit(nQTaskF, sizeof(SQTaskFile));
|
||||
if (!pSet->aQTaskF) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < nQTaskF; ++i) {
|
||||
SQTaskFile *pFile = taosArrayGet(pSet->aQTaskF, i);
|
||||
pFile->nRef = 1;
|
||||
n += tGetQTaskFile(p + n, pFile);
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
static int32_t tGetQTaskFile(uint8_t *p, SQTaskFile *pFile) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tGetI8(p + n, &pFile->level);
|
||||
n += tGetI64v(p + n, &pFile->version);
|
||||
n += tGetI64v(p + n, &pFile->mtime);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
static int32_t tdRSmaGnrtFS(SSma *pSma, SRSmaFS *pFS, char *fname) {
|
||||
int32_t code = 0;
|
||||
int64_t n;
|
||||
int64_t size;
|
||||
uint8_t *pData = NULL;
|
||||
TdFilePtr pFD = NULL;
|
||||
|
||||
// to binary
|
||||
size = tdRSmaEncodeFS(NULL, pFS) + sizeof(TSCKSUM);
|
||||
pData = taosMemoryMalloc(size);
|
||||
if (pData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
n = tdRSmaEncodeFS(pData, pFS);
|
||||
ASSERT(n + sizeof(TSCKSUM) == size);
|
||||
taosCalcChecksumAppend(0, pData, size);
|
||||
|
||||
// create and write
|
||||
pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
if (pFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
n = taosWriteFile(pFD, pData, size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (taosFsyncFile(pFD) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
taosCloseFile(&pFD);
|
||||
|
||||
if (pData) taosMemoryFree(pData);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d, rsma gnrt fs failed since %s", SMA_VID(pSma), tstrerror(code));
|
||||
if (pData) taosMemoryFree(pData);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tdRSmaFSCommit1(SSma *pSma, SRSmaFS *pFSNew) {
|
||||
int32_t code = 0;
|
||||
char tfname[TSDB_FILENAME_LEN];
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s%s%s%sRSMAFS.t", tfsGetPrimaryPath(pSma->pVnode->pTfs), TD_DIRSEP,
|
||||
VNODE_RSMA_DIR, TD_DIRSEP);
|
||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sRSMAFS", tfsGetPrimaryPath(pSma->pVnode->pTfs), TD_DIRSEP,
|
||||
VNODE_RSMA_DIR, TD_DIRSEP);
|
||||
|
||||
// gnrt CURRENT.t
|
||||
code = tdRSmaGnrtFS(pSma, pFSNew, tfname);
|
||||
if (code) goto _err;
|
||||
|
||||
// rename
|
||||
code = taosRenameFile(tfname, fname);
|
||||
if (code) {
|
||||
code = TAOS_SYSTEM_ERROR(code);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d, rsma fs commit phase 1 failed since %s", SMA_VID(pSma), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tdRSmaFSMergeQTaskFSet(SQTaskFileSet *pSetOld, SQTaskFileSet *pSetNew) {
|
||||
SQTaskFileSet *
|
||||
|
||||
}
|
||||
|
||||
int32_t tdRSmaFSCommit2(SSma *pSma, SRSmaFS *pFSNew) {
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pStat = NULL;
|
||||
SRSmaInfo *pRSmaInfo = NULL;
|
||||
SRSmaFS *pRSmaFS = NULL;
|
||||
int32_t code = 0;
|
||||
int32_t nRef;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
if (!pEnv) {
|
||||
terrno = TSDB_CODE_RSMA_INVALID_ENV;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||
if (!pStat || !(pRSmaFS = RSMA_FS(pStat))) {
|
||||
terrno = TSDB_CODE_RSMA_INVALID_STAT;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// data
|
||||
int32_t iOld = 0;
|
||||
int32_t iNew = 0;
|
||||
while (true) {
|
||||
int32_t nOld = taosArrayGetSize(pRSmaFS->aQTaskFSet);
|
||||
int32_t nNew = taosArrayGetSize(pFSNew->aQTaskFSet);
|
||||
SQTaskFileSet fSet;
|
||||
|
||||
if (iOld >= nOld && iNew >= nNew) break;
|
||||
|
||||
SQTaskFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pRSmaFS->aQTaskFSet, iOld) : NULL;
|
||||
SQTaskFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aQTaskFSet, iNew) : NULL;
|
||||
|
||||
if (pSetOld && pSetNew) {
|
||||
if (pSetOld->suid == pSetNew->suid) {
|
||||
goto _merge_old_and_new;
|
||||
} else if (pSetOld->suid < pSetNew->suid) {
|
||||
goto _remove_old;
|
||||
} else {
|
||||
goto _add_new;
|
||||
}
|
||||
} else if (pSetOld) {
|
||||
goto _remove_old;
|
||||
} else {
|
||||
goto _add_new;
|
||||
}
|
||||
|
||||
_merge_old_and_new:
|
||||
|
||||
|
||||
|
||||
// head
|
||||
fSet.pHeadF = pSetOld->pHeadF;
|
||||
if ((!sameDisk) || (pSetOld->pHeadF->commitID != pSetNew->pHeadF->commitID)) {
|
||||
pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
||||
if (pSetOld->pHeadF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->pHeadF = *pSetNew->pHeadF;
|
||||
pSetOld->pHeadF->nRef = 1;
|
||||
|
||||
nRef = atomic_sub_fetch_32(&fSet.pHeadF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pHeadF, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(fSet.pHeadF);
|
||||
}
|
||||
} else {
|
||||
ASSERT(fSet.pHeadF->size == pSetNew->pHeadF->size);
|
||||
ASSERT(fSet.pHeadF->offset == pSetNew->pHeadF->offset);
|
||||
}
|
||||
|
||||
// data
|
||||
fSet.pDataF = pSetOld->pDataF;
|
||||
if ((!sameDisk) || (pSetOld->pDataF->commitID != pSetNew->pDataF->commitID)) {
|
||||
pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
||||
if (pSetOld->pDataF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->pDataF = *pSetNew->pDataF;
|
||||
pSetOld->pDataF->nRef = 1;
|
||||
|
||||
nRef = atomic_sub_fetch_32(&fSet.pDataF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pDataF, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(fSet.pDataF);
|
||||
}
|
||||
} else {
|
||||
ASSERT(pSetOld->pDataF->size <= pSetNew->pDataF->size);
|
||||
pSetOld->pDataF->size = pSetNew->pDataF->size;
|
||||
}
|
||||
|
||||
// sma
|
||||
fSet.pSmaF = pSetOld->pSmaF;
|
||||
if ((!sameDisk) || (pSetOld->pSmaF->commitID != pSetNew->pSmaF->commitID)) {
|
||||
pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
||||
if (pSetOld->pSmaF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->pSmaF = *pSetNew->pSmaF;
|
||||
pSetOld->pSmaF->nRef = 1;
|
||||
|
||||
nRef = atomic_sub_fetch_32(&fSet.pSmaF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pSmaF, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(fSet.pSmaF);
|
||||
}
|
||||
} else {
|
||||
ASSERT(pSetOld->pSmaF->size <= pSetNew->pSmaF->size);
|
||||
pSetOld->pSmaF->size = pSetNew->pSmaF->size;
|
||||
}
|
||||
|
||||
// stt
|
||||
if (sameDisk) {
|
||||
if (pSetNew->nSttF > pSetOld->nSttF) {
|
||||
ASSERT(pSetNew->nSttF = pSetOld->nSttF + 1);
|
||||
pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetOld->aSttF[pSetOld->nSttF] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF];
|
||||
pSetOld->aSttF[pSetOld->nSttF]->nRef = 1;
|
||||
pSetOld->nSttF++;
|
||||
} else if (pSetNew->nSttF < pSetOld->nSttF) {
|
||||
ASSERT(pSetNew->nSttF == 1);
|
||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
||||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSttFile);
|
||||
}
|
||||
pSetOld->aSttF[iStt] = NULL;
|
||||
}
|
||||
|
||||
pSetOld->nSttF = 1;
|
||||
pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetOld->aSttF[0] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->aSttF[0] = *pSetNew->aSttF[0];
|
||||
pSetOld->aSttF[0]->nRef = 1;
|
||||
} else {
|
||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||
if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) {
|
||||
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
||||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSttFile);
|
||||
}
|
||||
|
||||
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetOld->aSttF[iStt] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
|
||||
pSetOld->aSttF[iStt]->nRef = 1;
|
||||
} else {
|
||||
ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size);
|
||||
ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(pSetOld->nSttF == pSetNew->nSttF);
|
||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
||||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSttFile);
|
||||
}
|
||||
|
||||
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetOld->aSttF[iStt] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
|
||||
pSetOld->aSttF[iStt]->nRef = 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!sameDisk) {
|
||||
pSetOld->diskId = pSetNew->diskId;
|
||||
}
|
||||
|
||||
iOld++;
|
||||
iNew++;
|
||||
continue;
|
||||
|
||||
_remove_old:
|
||||
nRef = atomic_sub_fetch_32(&pSetOld->pHeadF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pHeadF, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSetOld->pHeadF);
|
||||
}
|
||||
|
||||
nRef = atomic_sub_fetch_32(&pSetOld->pDataF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pDataF, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSetOld->pDataF);
|
||||
}
|
||||
|
||||
nRef = atomic_sub_fetch_32(&pSetOld->pSmaF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pSmaF, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSetOld->pSmaF);
|
||||
}
|
||||
|
||||
for (int8_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||
nRef = atomic_sub_fetch_32(&pSetOld->aSttF[iStt]->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSttF[iStt], fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSetOld->aSttF[iStt]);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
|
||||
continue;
|
||||
|
||||
_add_new:
|
||||
fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSttF = 1};
|
||||
|
||||
// head
|
||||
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
||||
if (fSet.pHeadF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*fSet.pHeadF = *pSetNew->pHeadF;
|
||||
fSet.pHeadF->nRef = 1;
|
||||
|
||||
// data
|
||||
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
||||
if (fSet.pDataF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*fSet.pDataF = *pSetNew->pDataF;
|
||||
fSet.pDataF->nRef = 1;
|
||||
|
||||
// sma
|
||||
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
||||
if (fSet.pSmaF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*fSet.pSmaF = *pSetNew->pSmaF;
|
||||
fSet.pSmaF->nRef = 1;
|
||||
|
||||
// stt
|
||||
ASSERT(pSetNew->nSttF == 1);
|
||||
fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (fSet.aSttF[0] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
*fSet.aSttF[0] = *pSetNew->aSttF[0];
|
||||
fSet.aSttF[0]->nRef = 1;
|
||||
|
||||
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
iOld++;
|
||||
iNew++;
|
||||
continue;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, tsdb fs commit phase 2 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Open RSma FS from qTaskInfo files
|
||||
*
|
||||
* @param pSma
|
||||
* @param version
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
int64_t commitID = pVnode->state.commitID;
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pStat = NULL;
|
||||
SArray *output = NULL;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (!pEnv) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (tdFetchQTaskInfoFiles(pSma, version, &output) < 0) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
|
||||
int32_t vid = 0;
|
||||
int64_t version = -1;
|
||||
sscanf((const char *)taosArrayGetP(output, i), "v%dqinf.v%" PRIi64, &vid, &version);
|
||||
SQTaskFile qTaskFile = {.version = version, .nRef = 1};
|
||||
if ((terrno = tdRSmaFSUpsertQTaskFile(RSMA_FS(pStat), &qTaskFile)) < 0) {
|
||||
goto _end;
|
||||
}
|
||||
smaInfo("vgId:%d, open fs, version:%" PRIi64 ", ref:%d", TD_VID(pVnode), qTaskFile.version, qTaskFile.nRef);
|
||||
}
|
||||
|
||||
_end:
|
||||
for (int32_t i = 0; i < taosArrayGetSize(output); ++i) {
|
||||
void *ptr = taosArrayGetP(output, i);
|
||||
taosMemoryFreeClear(ptr);
|
||||
}
|
||||
taosArrayDestroy(output);
|
||||
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
smaError("vgId:%d, open rsma fs failed since %s", TD_VID(pVnode), terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void tdRSmaFSClose(SRSmaFS *fs) { taosArrayDestroy(fs->aQTaskFSet); }
|
||||
|
||||
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) {
|
||||
if (*(int64_t *)p1 < ((SQTaskFile *)p2)->version) {
|
||||
return -1;
|
||||
} else if (*(int64_t *)p1 > ((SQTaskFile *)p2)->version) {
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
|
||||
SArray *aQTaskFSet = RSMA_FS(pStat)->aQTaskFSet;
|
||||
SQTaskFile *pTaskF = NULL;
|
||||
int32_t oldVal = 0;
|
||||
|
||||
taosRLockLatch(RSMA_FS_LOCK(pStat));
|
||||
if ((pTaskF = taosArraySearch(aQTaskFSet, &version, tdQTaskInfCmprFn1, TD_EQ))) {
|
||||
oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1);
|
||||
ASSERT(oldVal > 0);
|
||||
}
|
||||
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||
return oldVal;
|
||||
}
|
||||
|
||||
int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) {
|
||||
SArray *aQTaskFSet = RSMA_FS(pStat)->aQTaskFSet;
|
||||
int64_t version = -1;
|
||||
|
||||
taosRLockLatch(RSMA_FS_LOCK(pStat));
|
||||
if (taosArrayGetSize(aQTaskFSet) > 0) {
|
||||
version = ((SQTaskFile *)taosArrayGetLast(aQTaskFSet))->version;
|
||||
}
|
||||
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||
return version;
|
||||
}
|
||||
|
||||
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
SArray *aQTaskFSet = RSMA_FS(pStat)->aQTaskFSet;
|
||||
char qTaskFullName[TSDB_FILENAME_LEN];
|
||||
SQTaskFile *pTaskF = NULL;
|
||||
int32_t idx = -1;
|
||||
|
||||
taosWLockLatch(RSMA_FS_LOCK(pStat));
|
||||
if ((idx = taosArraySearchIdx(aQTaskFSet, &version, tdQTaskInfCmprFn1, TD_EQ)) >= 0) {
|
||||
ASSERT(idx < taosArrayGetSize(aQTaskFSet));
|
||||
pTaskF = taosArrayGet(aQTaskFSet, idx);
|
||||
if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) {
|
||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName);
|
||||
if (taosRemoveFile(qTaskFullName) < 0) {
|
||||
smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName,
|
||||
tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
} else {
|
||||
smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName);
|
||||
}
|
||||
taosArrayRemove(aQTaskFSet, idx);
|
||||
}
|
||||
}
|
||||
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Fetch qtaskfiles LE than version
|
||||
*
|
||||
* @param pSma
|
||||
* @param version
|
||||
* @param output
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
TdDirPtr pDir = NULL;
|
||||
TdDirEntryPtr pDirEntry = NULL;
|
||||
char dir[TSDB_FILENAME_LEN];
|
||||
const char *pattern = "v[0-9]+qinf\\.v([0-9]+)?$";
|
||||
regex_t regex;
|
||||
int code = 0;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir);
|
||||
|
||||
if (!taosCheckExistFile(dir)) {
|
||||
smaDebug("vgId:%d, fetch qtask files, no need as dir %s not exist", TD_VID(pVnode), dir);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// Resource allocation and init
|
||||
if ((code = regcomp(®ex, pattern, REG_EXTENDED)) != 0) {
|
||||
terrno = TSDB_CODE_RSMA_REGEX_MATCH;
|
||||
char errbuf[128];
|
||||
regerror(code, ®ex, errbuf, sizeof(errbuf));
|
||||
smaWarn("vgId:%d, fetch qtask files, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (!(pDir = taosOpenDir(dir))) {
|
||||
regfree(®ex);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
smaError("vgId:%d, fetch qtask files, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int32_t dirLen = strlen(dir);
|
||||
char *dirEnd = POINTER_SHIFT(dir, dirLen);
|
||||
regmatch_t regMatch[2];
|
||||
while ((pDirEntry = taosReadDir(pDir))) {
|
||||
char *entryName = taosGetDirEntryName(pDirEntry);
|
||||
if (!entryName) {
|
||||
continue;
|
||||
}
|
||||
|
||||
code = regexec(®ex, entryName, 2, regMatch, 0);
|
||||
|
||||
if (code == 0) {
|
||||
// match
|
||||
smaInfo("vgId:%d, fetch qtask files, max ver:%" PRIi64 ", %s found", TD_VID(pVnode), version, entryName);
|
||||
|
||||
int64_t ver = -1;
|
||||
sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &ver);
|
||||
if ((ver <= version) && (ver > -1)) {
|
||||
if (!(*output)) {
|
||||
if (!(*output = taosArrayInit(1, POINTER_BYTES))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
char *entryDup = strdup(entryName);
|
||||
if (!entryDup) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
if (!taosArrayPush(*output, &entryDup)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
} else {
|
||||
}
|
||||
} else if (code == REG_NOMATCH) {
|
||||
// not match
|
||||
smaTrace("vgId:%d, fetch qtask files, not match %s", TD_VID(pVnode), entryName);
|
||||
continue;
|
||||
} else {
|
||||
// has other error
|
||||
char errbuf[128];
|
||||
regerror(code, ®ex, errbuf, sizeof(errbuf));
|
||||
smaWarn("vgId:%d, fetch qtask files, regexec failed since %s", TD_VID(pVnode), errbuf);
|
||||
terrno = TSDB_CODE_RSMA_REGEX_MATCH;
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
_end:
|
||||
taosCloseDir(&pDir);
|
||||
regfree(®ex);
|
||||
return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static int32_t tdQTaskFileCmprFn2(const void *p1, const void *p2) {
|
||||
if (((SQTaskFile *)p1)->version < ((SQTaskFile *)p2)->version) {
|
||||
return -1;
|
||||
} else if (((SQTaskFile *)p1)->version > ((SQTaskFile *)p2)->version) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) {
|
||||
int32_t code = 0;
|
||||
int32_t idx = taosArraySearchIdx(pFS->aQTaskFSet, qTaskFile, tdQTaskFileCmprFn2, TD_GE);
|
||||
|
||||
if (idx < 0) {
|
||||
idx = taosArrayGetSize(pFS->aQTaskFSet);
|
||||
} else {
|
||||
SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskFSet, idx);
|
||||
int32_t c = tdQTaskFileCmprFn2(pTaskF, qTaskFile);
|
||||
if (c == 0) {
|
||||
pTaskF->nRef = qTaskFile->nRef;
|
||||
pTaskF->level = qTaskFile->level;
|
||||
pTaskF->version = qTaskFile->version;
|
||||
pTaskF->mtime = qTaskFile->mtime;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
if (taosArrayInsert(pFS->aQTaskFSet, idx, qTaskFile) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
|
@ -15,8 +15,6 @@
|
|||
|
||||
#include "sma.h"
|
||||
|
||||
#define RSMA_QTASKINFO_BUFSIZE (32768) // size
|
||||
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
|
||||
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
|
||||
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
||||
#define RSMA_FETCH_DELAY_MAX (120000) // ms
|
||||
|
@ -48,23 +46,10 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SR
|
|||
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
||||
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo);
|
||||
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
|
||||
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
|
||||
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
|
||||
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
|
||||
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
|
||||
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
|
||||
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
|
||||
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
|
||||
|
||||
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
|
||||
// adapt accordingly if definition of SRSmaInfo update
|
||||
SRSmaInfo *pResult = NULL;
|
||||
ASSERT(pItem->level == TSDB_RETENTION_L1 || pItem->level == TSDB_RETENTION_L2);
|
||||
pResult = (SRSmaInfo *)POINTER_SHIFT(pItem, -(sizeof(SRSmaInfoItem) * (pItem->level - 1) + RSMA_INFO_HEAD_LEN));
|
||||
ASSERT(pResult->pTSchema->numOfCols > 1);
|
||||
return pResult;
|
||||
}
|
||||
|
||||
struct SRSmaQTaskInfoItem {
|
||||
int32_t len;
|
||||
int8_t type;
|
||||
|
@ -104,12 +89,6 @@ void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, con
|
|||
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level);
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
|
||||
return lenWithHead - RSMA_QTASKINFO_HEAD_LEN;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); }
|
||||
|
||||
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
|
||||
// Note: free/kill may in RC
|
||||
if (!taskHandle || !(*taskHandle)) return;
|
||||
|
@ -803,6 +782,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
|
@ -820,6 +800,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief sync mode
|
||||
|
@ -1189,65 +1170,6 @@ _err:
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
STFile tFile = {0};
|
||||
char qTaskInfoFName[TSDB_FILENAME_LEN] = {0};
|
||||
|
||||
tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), qTaskFileVer, qTaskInfoFName);
|
||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
|
||||
if (qTaskFileVer > 0) {
|
||||
smaWarn("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", not start as %s not exist",
|
||||
TD_VID(pVnode), type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
|
||||
} else {
|
||||
smaDebug("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode),
|
||||
type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
STFInfo tFileInfo = {0};
|
||||
if (tdLoadTFileHeader(&tFile, &tFileInfo) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SRSmaQTaskInfoIter fIter = {0};
|
||||
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
|
||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
||||
tdCloseTFile(&tFile);
|
||||
tdDestroyTFile(&tFile);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (tdRSmaQTaskInfoRestore(pSma, type, &fIter) < 0) {
|
||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
||||
tdCloseTFile(&tFile);
|
||||
tdDestroyTFile(&tFile);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
||||
tdCloseTFile(&tFile);
|
||||
tdDestroyTFile(&tFile);
|
||||
|
||||
// restored successfully from committed or sync
|
||||
smaInfo("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload succeed", TD_VID(pVnode),
|
||||
type, qTaskFileVer);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
smaError("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload failed since %s",
|
||||
TD_VID(pVnode), type, qTaskFileVer, terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief reload ts data from checkpoint
|
||||
*
|
||||
|
@ -1270,19 +1192,12 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
|
||||
if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
#endif
|
||||
|
||||
// step 3: reload ts data from checkpoint
|
||||
// step 2: reload ts data from checkpoint
|
||||
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// step 4: open SRSmaFS for qTaskFiles
|
||||
// step 3: open SRSmaFS for qTaskFiles
|
||||
if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -1295,191 +1210,6 @@ _err:
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Restore from SRSmaQTaskInfoItem
|
||||
*
|
||||
* @param pSma
|
||||
* @param pItem
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) {
|
||||
SRSmaInfo *pRSmaInfo = NULL;
|
||||
void *qTaskInfo = NULL;
|
||||
|
||||
pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pItem->suid);
|
||||
if (!pRSmaInfo) {
|
||||
smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pItem->type == TSDB_RETENTION_L1) {
|
||||
qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 0);
|
||||
} else if (pItem->type == TSDB_RETENTION_L2) {
|
||||
qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 1);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (!qTaskInfo) {
|
||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) {
|
||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid,
|
||||
pItem->type, terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid,
|
||||
pItem->type);
|
||||
|
||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile) {
|
||||
memset(pIter, 0, sizeof(*pIter));
|
||||
pIter->pTFile = pTFile;
|
||||
pIter->offset = TD_FILE_HEAD_SIZE;
|
||||
|
||||
if (tdGetTFileSize(pTFile, &pIter->fsize) < 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if ((pIter->fsize - TD_FILE_HEAD_SIZE) < RSMA_QTASKINFO_BUFSIZE) {
|
||||
pIter->nAlloc = pIter->fsize - TD_FILE_HEAD_SIZE;
|
||||
} else {
|
||||
pIter->nAlloc = RSMA_QTASKINFO_BUFSIZE;
|
||||
}
|
||||
|
||||
if (pIter->nAlloc < TD_FILE_HEAD_SIZE) {
|
||||
pIter->nAlloc = TD_FILE_HEAD_SIZE;
|
||||
}
|
||||
|
||||
pIter->pBuf = taosMemoryMalloc(pIter->nAlloc);
|
||||
if (!pIter->pBuf) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
pIter->qBuf = pIter->pBuf;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish) {
|
||||
STFile *pTFile = pIter->pTFile;
|
||||
int64_t nBytes = RSMA_QTASKINFO_BUFSIZE;
|
||||
|
||||
if (pIter->offset >= pIter->fsize) {
|
||||
*isFinish = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if ((pIter->fsize - pIter->offset) < RSMA_QTASKINFO_BUFSIZE) {
|
||||
nBytes = pIter->fsize - pIter->offset;
|
||||
}
|
||||
|
||||
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int32_t infoLen = 0;
|
||||
taosDecodeFixedI32(pIter->pBuf, &infoLen);
|
||||
if (infoLen > nBytes) {
|
||||
if (infoLen <= RSMA_QTASKINFO_BUFSIZE) {
|
||||
terrno = TSDB_CODE_RSMA_FILE_CORRUPTED;
|
||||
smaError("iterate rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
if (pIter->nAlloc < infoLen) {
|
||||
pIter->nAlloc = infoLen;
|
||||
void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen);
|
||||
if (!pBuf) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
pIter->pBuf = pBuf;
|
||||
}
|
||||
|
||||
nBytes = infoLen;
|
||||
|
||||
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
pIter->qBuf = pIter->pBuf;
|
||||
pIter->offset += nBytes;
|
||||
pIter->nBytes = nBytes;
|
||||
pIter->nBufPos = 0;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter) {
|
||||
while (1) {
|
||||
// block iter
|
||||
bool isFinish = false;
|
||||
if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
if (isFinish) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// consume the block
|
||||
int32_t qTaskInfoLenWithHead = 0;
|
||||
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
|
||||
if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
smaError("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s failed since %s", SMA_VID(pSma), type,
|
||||
TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
if ((pIter->nBufPos + qTaskInfoLenWithHead) <= pIter->nBytes) {
|
||||
SRSmaQTaskInfoItem infoItem = {0};
|
||||
pIter->qBuf = taosDecodeFixedI8(pIter->qBuf, &infoItem.type);
|
||||
pIter->qBuf = taosDecodeFixedI64(pIter->qBuf, &infoItem.suid);
|
||||
infoItem.qTaskInfo = pIter->qBuf;
|
||||
infoItem.len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead);
|
||||
// do the restore job
|
||||
smaDebug("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s offset:%" PRIi64 "\n", SMA_VID(pSma),
|
||||
type, TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos);
|
||||
tdRSmaQTaskInfoItemRestore(pSma, &infoItem);
|
||||
|
||||
pIter->qBuf = POINTER_SHIFT(pIter->qBuf, infoItem.len);
|
||||
pIter->nBufPos += qTaskInfoLenWithHead;
|
||||
|
||||
if ((pIter->nBufPos + RSMA_QTASKINFO_HEAD_LEN) >= pIter->nBytes) {
|
||||
// prepare and load next block in the file
|
||||
pIter->offset -= (pIter->nBytes - pIter->nBufPos);
|
||||
break;
|
||||
}
|
||||
|
||||
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
|
||||
continue;
|
||||
}
|
||||
// prepare and load next block in the file
|
||||
pIter->offset -= (pIter->nBytes - pIter->nBufPos);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||
SSma *pSma = pRSmaStat->pSma;
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
|
@ -1523,148 +1253,6 @@ _err:
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||
SSma *pSma = pRSmaStat->pSma;
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
int32_t vid = SMA_VID(pSma);
|
||||
int64_t toffset = 0;
|
||||
bool isFileCreated = false;
|
||||
|
||||
if (taosHashGetSize(pInfoHash) <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void *infoHash = taosHashIterate(pInfoHash, NULL);
|
||||
if (!infoHash) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
|
||||
if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
|
||||
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
|
||||
pRSmaStat->commitAppliedVer, fsMaxVer);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STFile tFile = {0};
|
||||
#if 0
|
||||
if (pRSmaStat->commitAppliedVer > 0) {
|
||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||
smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
||||
goto _err;
|
||||
}
|
||||
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
||||
smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
||||
goto _err;
|
||||
}
|
||||
smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile));
|
||||
|
||||
isFileCreated = true;
|
||||
}
|
||||
#endif
|
||||
|
||||
while (infoHash) {
|
||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
|
||||
|
||||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||
infoHash = taosHashIterate(pInfoHash, infoHash);
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
#if 0
|
||||
qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i);
|
||||
#endif
|
||||
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i);
|
||||
if (!taskInfo) {
|
||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
|
||||
continue;
|
||||
}
|
||||
|
||||
char *pOutput = NULL;
|
||||
int32_t len = 0;
|
||||
int8_t type = (int8_t)(i + 1);
|
||||
if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) {
|
||||
smaError("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo failed since %s", vid, pRSmaInfo->suid,
|
||||
i + 1, terrstr());
|
||||
goto _err;
|
||||
}
|
||||
if (!pOutput || len <= 0) {
|
||||
smaDebug("vgId:%d, rsma, table %" PRIi64
|
||||
" level %d serialize qTaskInfo success but no output(len %d), not persist",
|
||||
vid, pRSmaInfo->suid, i + 1, len);
|
||||
taosMemoryFreeClear(pOutput);
|
||||
continue;
|
||||
}
|
||||
|
||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo success with len %d, need persist", vid,
|
||||
pRSmaInfo->suid, i + 1, len);
|
||||
|
||||
if (!isFileCreated) {
|
||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||
smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
||||
goto _err;
|
||||
}
|
||||
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
||||
smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
||||
goto _err;
|
||||
}
|
||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
|
||||
TD_TFILE_FULL_NAME(&tFile));
|
||||
|
||||
isFileCreated = true;
|
||||
}
|
||||
|
||||
char tmpBuf[RSMA_QTASKINFO_HEAD_LEN] = {0};
|
||||
void *pTmpBuf = &tmpBuf;
|
||||
int32_t headLen = 0;
|
||||
headLen += taosEncodeFixedI32(&pTmpBuf, len + RSMA_QTASKINFO_HEAD_LEN);
|
||||
headLen += taosEncodeFixedI8(&pTmpBuf, type);
|
||||
headLen += taosEncodeFixedI64(&pTmpBuf, pRSmaInfo->suid);
|
||||
|
||||
ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN);
|
||||
tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset);
|
||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid,
|
||||
pRSmaInfo->suid, i + 1, headLen, toffset);
|
||||
tdAppendTFile(&tFile, pOutput, len, &toffset);
|
||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid,
|
||||
pRSmaInfo->suid, i + 1, len, toffset);
|
||||
|
||||
taosMemoryFree(pOutput);
|
||||
}
|
||||
|
||||
infoHash = taosHashIterate(pInfoHash, infoHash);
|
||||
}
|
||||
|
||||
if (isFileCreated) {
|
||||
if (tdUpdateTFileHeader(&tFile) < 0) {
|
||||
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
|
||||
tstrerror(terrno));
|
||||
goto _err;
|
||||
} else {
|
||||
smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile));
|
||||
}
|
||||
|
||||
tdCloseTFile(&tFile);
|
||||
tdDestroyTFile(&tFile);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
smaError("vgId:%d, rsma persist failed since %s", vid, terrstr());
|
||||
if (isFileCreated) {
|
||||
tdRemoveTFile(&tFile);
|
||||
tdDestroyTFile(&tFile);
|
||||
}
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief trigger to get rsma result in async mode
|
||||
*
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#include "sma.h"
|
||||
|
||||
// smaFileUtil ================
|
||||
|
||||
#if 0
|
||||
#define TD_FILE_STATE_OK 0
|
||||
#define TD_FILE_STATE_BAD 1
|
||||
|
||||
|
@ -182,6 +182,8 @@ void tdCloseTFile(STFile *pTFile) {
|
|||
|
||||
void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); }
|
||||
|
||||
#endif
|
||||
|
||||
void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version,
|
||||
char *outputName) {
|
||||
if (version < 0) {
|
||||
|
@ -221,6 +223,7 @@ void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool e
|
|||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
|
||||
TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
|
||||
TD_TFILE_SET_CLOSED(pTFile);
|
||||
|
@ -286,6 +289,8 @@ int32_t tdRemoveTFile(STFile *pTFile) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
// smaXXXUtil ================
|
||||
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) {
|
||||
void *pResult = taosAcquireRef(rsetId, refId);
|
||||
|
|
Loading…
Reference in New Issue