commit
45efa44f3b
|
@ -47,6 +47,7 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq);
|
||||||
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId);
|
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId);
|
||||||
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact);
|
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact);
|
||||||
|
|
||||||
|
int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len);
|
||||||
void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact);
|
void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -30,6 +30,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs,
|
||||||
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq);
|
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq);
|
||||||
bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
|
bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
|
||||||
void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList);
|
void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList);
|
||||||
|
bool mndDbIsExist(SMnode *pMnode, const char *db);
|
||||||
|
|
||||||
SSdbRaw *mndDbActionEncode(SDbObj *pDb);
|
SSdbRaw *mndDbActionEncode(SDbObj *pDb);
|
||||||
const char *mndGetDbStr(const char *src);
|
const char *mndGetDbStr(const char *src);
|
||||||
|
|
|
@ -224,6 +224,21 @@ SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
|
||||||
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
|
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
sdbRelease(pSdb, pCompact);
|
sdbRelease(pSdb, pCompact);
|
||||||
|
pCompact = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
|
||||||
|
if (pCompact == NULL) {
|
||||||
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
(void)strncpy(dbname, pCompact->dbname, len);
|
||||||
|
mndReleaseCompact(pMnode, pCompact);
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
// compact db
|
// compact db
|
||||||
|
@ -488,7 +503,7 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
tFreeSKillCompactReq(&killCompactReq);
|
tFreeSKillCompactReq(&killCompactReq);
|
||||||
sdbRelease(pMnode->pSdb, pCompact);
|
mndReleaseCompact(pMnode, pCompact);
|
||||||
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
@ -640,16 +655,12 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
||||||
sdbRelease(pMnode->pSdb, pDetail);
|
sdbRelease(pMnode->pSdb, pDetail);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
|
char dbname[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
if (pCompact == NULL) TAOS_RETURN(code);
|
TAOS_CHECK_RETURN(mndCompactGetDbName(pMnode, compactId, dbname, TSDB_TABLE_FNAME_LEN));
|
||||||
|
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, pCompact->dbname);
|
if (!mndDbIsExist(pMnode, dbname)) {
|
||||||
if (pDb == NULL) {
|
|
||||||
needSave = true;
|
needSave = true;
|
||||||
mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, pCompact->dbname);
|
mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, dbname);
|
||||||
} else {
|
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
pDb = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!needSave) {
|
if (!needSave) {
|
||||||
|
@ -666,7 +677,7 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
||||||
}
|
}
|
||||||
mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
|
mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pCompact->dbname, NULL);
|
mndTransSetDbName(pTrans, dbname, NULL);
|
||||||
|
|
||||||
pIter = NULL;
|
pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -734,24 +745,20 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
||||||
sdbRelease(pMnode->pSdb, pDetail);
|
sdbRelease(pMnode->pSdb, pDetail);
|
||||||
}
|
}
|
||||||
|
|
||||||
pDb = mndAcquireDb(pMnode, pCompact->dbname);
|
if (!mndDbIsExist(pMnode, dbname)) {
|
||||||
if (pDb == NULL) {
|
|
||||||
allFinished = true;
|
allFinished = true;
|
||||||
mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, pCompact->dbname);
|
mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, dbname);
|
||||||
} else {
|
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
pDb = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (allFinished) {
|
if (allFinished) {
|
||||||
mInfo("compact:%d, all finished", pCompact->compactId);
|
mInfo("compact:%d, all finished", compactId);
|
||||||
pIter = NULL;
|
pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
SCompactDetailObj *pDetail = NULL;
|
SCompactDetailObj *pDetail = NULL;
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
|
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
if (pDetail->compactId == pCompact->compactId) {
|
if (pDetail->compactId == compactId) {
|
||||||
SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
|
SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
|
||||||
if (pCommitRaw == NULL) {
|
if (pCommitRaw == NULL) {
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
@ -774,7 +781,15 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
||||||
sdbRelease(pMnode->pSdb, pDetail);
|
sdbRelease(pMnode->pSdb, pDetail);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
|
||||||
|
if (pCompact == NULL) {
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
|
SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
|
||||||
|
mndReleaseCompact(pMnode, pCompact);
|
||||||
if (pCommitRaw == NULL) {
|
if (pCommitRaw == NULL) {
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
@ -793,11 +808,9 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
|
||||||
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
|
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
|
||||||
mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
|
mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
sdbRelease(pMnode->pSdb, pCompact);
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pMnode->pSdb, pCompact);
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -827,8 +840,8 @@ void mndCompactPullup(SMnode *pMnode) {
|
||||||
if ((code = mndSaveCompactProgress(pMnode, pCompact->compactId)) != 0) {
|
if ((code = mndSaveCompactProgress(pMnode, pCompact->compactId)) != 0) {
|
||||||
mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code));
|
mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
mndReleaseCompact(pMnode, pCompact);
|
||||||
}
|
}
|
||||||
mndReleaseCompact(pMnode, pCompact);
|
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pArray);
|
taosArrayDestroy(pArray);
|
||||||
}
|
}
|
||||||
|
|
|
@ -398,6 +398,17 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
|
||||||
sdbRelease(pSdb, pDb);
|
sdbRelease(pSdb, pDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool mndDbIsExist(SMnode *pMnode, const char *db) {
|
||||||
|
SDbObj *pDb = mndAcquireDb(pMnode, db);
|
||||||
|
if (pDb == NULL) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
pDb = NULL;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndCheckDbName(const char *dbName, SUserObj *pUser) {
|
static int32_t mndCheckDbName(const char *dbName, SUserObj *pUser) {
|
||||||
char *pos = strstr(dbName, TS_PATH_DELIMITER);
|
char *pos = strstr(dbName, TS_PATH_DELIMITER);
|
||||||
if (pos == NULL) {
|
if (pos == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue