Merge pull request #13146 from taosdata/fix/mnode

enh: sdb snapshot
This commit is contained in:
Shengliang Guan 2022-05-28 13:55:08 +08:00 committed by GitHub
commit fb613050bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 246 additions and 104 deletions

View File

@ -81,7 +81,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
* @param pMsg The request msg. * @param pMsg The request msg.
* @return int32_t 0 for success, -1 for failure. * @return int32_t 0 for success, -1 for failure.
*/ */
int32_t mndProcessMsg(SRpcMsg *pMsg); int32_t mndProcessRpcMsg(SRpcMsg *pMsg);
int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
/** /**

View File

@ -40,7 +40,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
break; break;
default: default:
pMsg->info.node = pMgmt->pMnode; pMsg->info.node = pMgmt->pMnode;
code = mndProcessMsg(pMsg); code = mndProcessRpcMsg(pMsg);
} }
if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {

View File

@ -79,7 +79,6 @@ typedef struct {
sem_t syncSem; sem_t syncSem;
int64_t sync; int64_t sync;
bool standby; bool standby;
bool restored;
int32_t errCode; int32_t errCode;
int32_t transId; int32_t transId;
} SSyncMgmt; } SSyncMgmt;
@ -93,23 +92,26 @@ typedef struct SMnode {
int32_t selfDnodeId; int32_t selfDnodeId;
int64_t clusterId; int64_t clusterId;
TdThread thread; TdThread thread;
bool deploy; TdThreadRwlock lock;
int32_t rpcRef;
int32_t syncRef;
bool stopped; bool stopped;
bool restored;
bool deploy;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
char *path; char *path;
int64_t checkTime; int64_t checkTime;
SSdb *pSdb; SSdb *pSdb;
SMgmtWrapper *pWrapper;
SArray *pSteps; SArray *pSteps;
SQHandle *pQuery; SQHandle *pQuery;
SHashObj *infosMeta;
SHashObj *perfsMeta;
SShowMgmt showMgmt; SShowMgmt showMgmt;
SProfileMgmt profileMgmt; SProfileMgmt profileMgmt;
STelemMgmt telemMgmt; STelemMgmt telemMgmt;
SSyncMgmt syncMgmt; SSyncMgmt syncMgmt;
SHashObj *infosMeta;
SHashObj *perfsMeta;
SGrantInfo grant; SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX]; MndMsgFp msgFp[TDMT_MAX];
SMsgCb msgCb; SMsgCb msgCb;
@ -118,6 +120,14 @@ typedef struct SMnode {
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
int64_t mndGenerateUid(char *name, int32_t len); int64_t mndGenerateUid(char *name, int32_t len);
int32_t mndAcquireRpcRef(SMnode *pMnode);
void mndReleaseRpcRef(SMnode *pMnode);
void mndSetRestore(SMnode *pMnode, bool restored);
void mndSetStop(SMnode *pMnode);
bool mndGetStop(SMnode *pMnode);
int32_t mndAcquireSyncRef(SMnode *pMnode);
void mndReleaseSyncRef(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -85,7 +85,7 @@ static void *mndThreadFp(void *param) {
while (1) { while (1) {
lastTime++; lastTime++;
taosMsleep(100); taosMsleep(100);
if (pMnode->stopped) break; if (mndGetStop(pMnode)) break;
if (lastTime % (tsTransPullupInterval * 10) == 0) { if (lastTime % (tsTransPullupInterval * 10) == 0) {
mndPullupTrans(pMnode); mndPullupTrans(pMnode);
@ -118,7 +118,6 @@ static int32_t mndInitTimer(SMnode *pMnode) {
} }
static void mndCleanupTimer(SMnode *pMnode) { static void mndCleanupTimer(SMnode *pMnode) {
pMnode->stopped = true;
if (taosCheckPthreadValid(pMnode->thread)) { if (taosCheckPthreadValid(pMnode->thread)) {
taosThreadJoin(pMnode->thread, NULL); taosThreadJoin(pMnode->thread, NULL);
taosThreadClear(&pMnode->thread); taosThreadClear(&pMnode->thread);
@ -335,15 +334,19 @@ void mndClose(SMnode *pMnode) {
int32_t mndStart(SMnode *pMnode) { int32_t mndStart(SMnode *pMnode) {
mndSyncStart(pMnode); mndSyncStart(pMnode);
if (pMnode->deploy) { if (pMnode->deploy) {
if (sdbDeploy(pMnode->pSdb) != 0) return -1; if (sdbDeploy(pMnode->pSdb) != 0) {
pMnode->syncMgmt.restored = true; mError("failed to deploy sdb while start mnode");
return -1;
}
mndSetRestore(pMnode, true);
} }
return mndInitTimer(pMnode); return mndInitTimer(pMnode);
} }
void mndStop(SMnode *pMnode) { void mndStop(SMnode *pMnode) {
mndSetStop(pMnode);
mndSyncStop(pMnode); mndSyncStop(pMnode);
return mndCleanupTimer(pMnode); mndCleanupTimer(pMnode);
} }
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
@ -362,6 +365,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
return TAOS_SYNC_PROPOSE_OTHER_ERROR; return TAOS_SYNC_PROPOSE_OTHER_ERROR;
} }
if (mndAcquireSyncRef(pMnode) != 0) {
mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
char logBuf[512]; char logBuf[512];
char *syncNodeStr = sync2SimpleStr(pMgmt->sync); char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
@ -405,33 +413,21 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
code = TAOS_SYNC_PROPOSE_OTHER_ERROR; code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
} }
mndReleaseSyncRef(pMnode);
return code; return code;
} }
static int32_t mndCheckMnodeMaster(SRpcMsg *pMsg) { static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
if (mndIsMaster(pMsg->info.node)) return 0;
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER &&
pMsg->msgType == TDMT_MND_TRANS_TIMER) { pMsg->msgType != TDMT_MND_TRANS_TIMER) {
return -1; mError("msg:%p, failed to check mnode state since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
}
mError("msg:%p, failed to check master since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
SEpSet epSet = {0}; SEpSet epSet = {0};
mndGetMnodeEpSet(pMsg->info.node, &epSet); mndGetMnodeEpSet(pMsg->info.node, &epSet);
#if 0
mTrace("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
mTrace("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
}
#endif
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->info.rsp = rpcMallocCont(contLen); pMsg->info.rsp = rpcMallocCont(contLen);
if (pMsg->info.rsp != NULL) { if (pMsg->info.rsp != NULL) {
@ -441,23 +437,21 @@ static int32_t mndCheckMnodeMaster(SRpcMsg *pMsg) {
} else { } else {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
} }
}
return -1; return -1;
} }
static int32_t mndCheckRequestValid(SRpcMsg *pMsg) { static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0; if (!IsReq(pMsg)) return 0;
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0; if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
mError("msg:%p, failed to valid request, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); mError("msg:%p, failed to check msg content, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_INVALID_MSG_LEN; terrno = TSDB_CODE_INVALID_MSG_LEN;
return -1; return -1;
} }
int32_t mndProcessMsg(SRpcMsg *pMsg) { int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
if (mndCheckMnodeMaster(pMsg) != 0) return -1;
if (mndCheckRequestValid(pMsg) != 0) return -1;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
if (fp == NULL) { if (fp == NULL) {
@ -466,8 +460,13 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) {
return -1; return -1;
} }
mTrace("msg:%p, will be processed in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); if (mndCheckMsgContent(pMsg) != 0) return -1;
if (mndCheckMnodeState(pMsg) != 0) return -1;
mTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
int32_t code = (*fp)(pMsg); int32_t code = (*fp)(pMsg);
mndReleaseRpcRef(pMnode);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mTrace("msg:%p, won't response immediately since in progress", pMsg); mTrace("msg:%p, won't response immediately since in progress", pMsg);
} else if (code == 0) { } else if (code == 0) {
@ -476,6 +475,7 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) {
mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
} }
return code; return code;
} }
@ -502,7 +502,7 @@ int64_t mndGenerateUid(char *name, int32_t len) {
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) { SMonGrantInfo *pGrantInfo) {
if (!mndIsMaster(pMnode)) return -1; if (mndAcquireRpcRef(pMnode) != 0) return -1;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int64_t ms = taosGetTimestampMs(); int64_t ms = taosGetTimestampMs();
@ -511,6 +511,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc)); pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc)); pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) { if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
mndReleaseRpcRef(pMnode);
return -1; return -1;
} }
@ -605,6 +606,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
pGrantInfo->timeseries_total = INT32_MAX; pGrantInfo->timeseries_total = INT32_MAX;
} }
mndReleaseRpcRef(pMnode);
return 0; return 0;
} }
@ -612,3 +614,76 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync); pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);
return 0; return 0;
} }
int32_t mndAcquireRpcRef(SMnode *pMnode) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMnode->lock);
if (pMnode->stopped) {
terrno = TSDB_CODE_APP_NOT_READY;
code = -1;
} else if (!mndIsMaster(pMnode)) {
code = -1;
} else {
int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
mTrace("mnode rpc is acquired, ref:%d", ref);
}
taosThreadRwlockUnlock(&pMnode->lock);
return code;
}
void mndReleaseRpcRef(SMnode *pMnode) {
taosThreadRwlockRdlock(&pMnode->lock);
int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1);
mTrace("mnode rpc is released, ref:%d", ref);
taosThreadRwlockUnlock(&pMnode->lock);
}
void mndSetRestore(SMnode *pMnode, bool restored) {
if (restored) {
taosThreadRwlockWrlock(&pMnode->lock);
pMnode->restored = true;
taosThreadRwlockUnlock(&pMnode->lock);
mTrace("mnode set restored:%d", restored);
} else {
taosThreadRwlockWrlock(&pMnode->lock);
pMnode->restored = false;
taosThreadRwlockUnlock(&pMnode->lock);
mTrace("mnode set restored:%d", restored);
while (1) {
if (pMnode->rpcRef <= 0) break;
taosMsleep(3);
}
}
}
bool mndGetRestored(SMnode *pMnode) { return pMnode->restored; }
void mndSetStop(SMnode *pMnode) {
taosThreadRwlockWrlock(&pMnode->lock);
pMnode->stopped = true;
taosThreadRwlockUnlock(&pMnode->lock);
mTrace("mnode set stopped");
}
bool mndGetStop(SMnode *pMnode) { return pMnode->stopped; }
int32_t mndAcquireSyncRef(SMnode *pMnode) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMnode->lock);
if (pMnode->stopped) {
terrno = TSDB_CODE_APP_NOT_READY;
code = -1;
} else {
int32_t ref = atomic_add_fetch_32(&pMnode->syncRef, 1);
mTrace("mnode sync is acquired, ref:%d", ref);
}
taosThreadRwlockUnlock(&pMnode->lock);
return code;
}
void mndReleaseSyncRef(SMnode *pMnode) {
taosThreadRwlockRdlock(&pMnode->lock);
int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1);
mTrace("mnode sync is released, ref:%d", ref);
taosThreadRwlockUnlock(&pMnode->lock);
}

View File

@ -63,7 +63,7 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
if (!pMnode->deploy) { if (!pMnode->deploy) {
mInfo("mnode sync restore finished"); mInfo("mnode sync restore finished");
mndTransPullup(pMnode); mndTransPullup(pMnode);
pMnode->syncMgmt.restored = true; mndSetRestore(pMnode, true);
} }
} }
@ -85,7 +85,7 @@ int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void
int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) { int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
pMnode->syncMgmt.restored = false; mndSetRestore(pMnode, false);
mInfo("start to apply snapshot to sdb, len:%d", len); mInfo("start to apply snapshot to sdb, len:%d", len);
int32_t code = sdbApplySnapshot(pMnode->pSdb, pBuf, len); int32_t code = sdbApplySnapshot(pMnode->pSdb, pBuf, len);
@ -93,7 +93,7 @@ int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char
mError("failed to apply snapshot to sdb, len:%d", len); mError("failed to apply snapshot to sdb, len:%d", len);
} else { } else {
mInfo("successfully to apply snapshot to sdb, len:%d", len); mInfo("successfully to apply snapshot to sdb, len:%d", len);
pMnode->syncMgmt.restored = true; mndSetRestore(pMnode, true);
} }
// taosMemoryFree(pBuf); // taosMemoryFree(pBuf);
@ -250,7 +250,7 @@ bool mndIsMaster(SMnode *pMnode) {
return false; return false;
} }
if (!pMgmt->restored) { if (!pMnode->restored) {
terrno = TSDB_CODE_APP_NOT_READY; terrno = TSDB_CODE_APP_NOT_READY;
return false; return false;
} }

View File

@ -166,7 +166,6 @@ typedef struct SSdbRow {
typedef struct SSdb { typedef struct SSdb {
SMnode *pMnode; SMnode *pMnode;
char *currDir; char *currDir;
char *syncDir;
char *tmpDir; char *tmpDir;
int64_t lastCommitVer; int64_t lastCommitVer;
int64_t curVer; int64_t curVer;
@ -182,6 +181,7 @@ typedef struct SSdb {
SdbDeployFp deployFps[SDB_MAX]; SdbDeployFp deployFps[SDB_MAX];
SdbEncodeFp encodeFps[SDB_MAX]; SdbEncodeFp encodeFps[SDB_MAX];
SdbDecodeFp decodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX];
TdThreadMutex filelock;
} SSdb; } SSdb;
typedef struct SSdbIter { typedef struct SSdbIter {

View File

@ -56,6 +56,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
pSdb->curTerm = -1; pSdb->curTerm = -1;
pSdb->lastCommitVer = -1; pSdb->lastCommitVer = -1;
pSdb->pMnode = pOption->pMnode; pSdb->pMnode = pOption->pMnode;
taosThreadMutexInit(&pSdb->filelock, NULL);
mDebug("sdb init successfully"); mDebug("sdb init successfully");
return pSdb; return pSdb;
} }
@ -69,10 +70,6 @@ void sdbCleanup(SSdb *pSdb) {
taosMemoryFreeClear(pSdb->currDir); taosMemoryFreeClear(pSdb->currDir);
} }
if (pSdb->syncDir != NULL) {
taosMemoryFreeClear(pSdb->syncDir);
}
if (pSdb->tmpDir != NULL) { if (pSdb->tmpDir != NULL) {
taosMemoryFreeClear(pSdb->tmpDir); taosMemoryFreeClear(pSdb->tmpDir);
} }
@ -104,6 +101,7 @@ void sdbCleanup(SSdb *pSdb) {
mDebug("sdb table:%s is cleaned up", sdbTableName(i)); mDebug("sdb table:%s is cleaned up", sdbTableName(i));
} }
taosThreadMutexDestroy(&pSdb->filelock);
taosMemoryFree(pSdb); taosMemoryFree(pSdb);
mDebug("sdb is cleaned up"); mDebug("sdb is cleaned up");
} }

View File

@ -22,13 +22,14 @@
#define SDB_RESERVE_SIZE 512 #define SDB_RESERVE_SIZE 512
#define SDB_FILE_VER 1 #define SDB_FILE_VER 1
static int32_t sdbRunDeployFp(SSdb *pSdb) { static int32_t sdbDeployData(SSdb *pSdb) {
mDebug("start to deploy sdb"); mDebug("start to deploy sdb");
for (int32_t i = SDB_MAX - 1; i >= 0; --i) { for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
SdbDeployFp fp = pSdb->deployFps[i]; SdbDeployFp fp = pSdb->deployFps[i];
if (fp == NULL) continue; if (fp == NULL) continue;
mDebug("start to deploy sdb:%s", sdbTableName(i));
if ((*fp)(pSdb->pMnode) != 0) { if ((*fp)(pSdb->pMnode) != 0) {
mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr()); mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr());
return -1; return -1;
@ -39,6 +40,39 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) {
return 0; return 0;
} }
static void sdbResetData(SSdb *pSdb) {
mDebug("start to reset sdb");
for (ESdbType i = 0; i < SDB_MAX; ++i) {
SHashObj *hash = pSdb->hashObjs[i];
if (hash == NULL) continue;
SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) {
SSdbRow *pRow = *ppRow;
if (pRow == NULL) continue;
sdbFreeRow(pSdb, pRow, true);
ppRow = taosHashIterate(hash, ppRow);
}
}
for (ESdbType i = 0; i < SDB_MAX; ++i) {
SHashObj *hash = pSdb->hashObjs[i];
if (hash == NULL) continue;
taosHashClear(pSdb->hashObjs[i]);
pSdb->tableVer[i] = 0;
pSdb->maxId[i] = 0;
mDebug("sdb:%s is reset", sdbTableName(i));
}
pSdb->curVer = -1;
pSdb->curTerm = -1;
pSdb->lastCommitVer = -1;
mDebug("sdb reset successfully");
}
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
int64_t sver = 0; int64_t sver = 0;
int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t)); int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t));
@ -169,11 +203,15 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
return 0; return 0;
} }
int32_t sdbReadFile(SSdb *pSdb) { static int32_t sdbReadFileImp(SSdb *pSdb) {
int64_t offset = 0; int64_t offset = 0;
int32_t code = 0; int32_t code = 0;
int32_t readLen = 0; int32_t readLen = 0;
int64_t ret = 0; int64_t ret = 0;
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to read file:%s", file);
SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100); SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
if (pRaw == NULL) { if (pRaw == NULL) {
@ -182,10 +220,6 @@ int32_t sdbReadFile(SSdb *pSdb) {
return -1; return -1;
} }
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to read file:%s", file);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) { if (pFile == NULL) {
taosMemoryFree(pRaw); taosMemoryFree(pRaw);
@ -196,8 +230,6 @@ int32_t sdbReadFile(SSdb *pSdb) {
if (sdbReadFileHead(pSdb, pFile) != 0) { if (sdbReadFileHead(pSdb, pFile) != 0) {
mError("failed to read file:%s head since %s", file, terrstr()); mError("failed to read file:%s head since %s", file, terrstr());
pSdb->curVer = -1;
pSdb->curTerm = -1;
taosMemoryFree(pRaw); taosMemoryFree(pRaw);
taosCloseFile(&pFile); taosCloseFile(&pFile);
return -1; return -1;
@ -264,6 +296,20 @@ _OVER:
return code; return code;
} }
int32_t sdbReadFile(SSdb *pSdb) {
taosThreadMutexLock(&pSdb->filelock);
sdbResetData(pSdb);
int32_t code = sdbReadFileImp(pSdb);
if (code != 0) {
mError("failed to read sdb since %s", terrstr());
sdbResetData(pSdb);
}
taosThreadMutexUnlock(&pSdb->filelock);
return code;
}
static int32_t sdbWriteFileImp(SSdb *pSdb) { static int32_t sdbWriteFileImp(SSdb *pSdb) {
int32_t code = 0; int32_t code = 0;
@ -378,15 +424,21 @@ int32_t sdbWriteFile(SSdb *pSdb) {
return 0; return 0;
} }
return sdbWriteFileImp(pSdb); taosThreadMutexLock(&pSdb->filelock);
int32_t code = sdbWriteFileImp(pSdb);
if (code != 0) {
mError("failed to write sdb since %s", terrstr());
}
taosThreadMutexUnlock(&pSdb->filelock);
return code;
} }
int32_t sdbDeploy(SSdb *pSdb) { int32_t sdbDeploy(SSdb *pSdb) {
if (sdbRunDeployFp(pSdb) != 0) { if (sdbDeployData(pSdb) != 0) {
return -1; return -1;
} }
if (sdbWriteFileImp(pSdb) != 0) { if (sdbWriteFile(pSdb) != 0) {
return -1; return -1;
} }
@ -397,13 +449,16 @@ static SSdbIter *sdbOpenIter(SSdb *pSdb) {
char datafile[PATH_MAX] = {0}; char datafile[PATH_MAX] = {0};
char tmpfile[PATH_MAX] = {0}; char tmpfile[PATH_MAX] = {0};
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
taosThreadMutexLock(&pSdb->filelock);
if (taosCopyFile(datafile, tmpfile) != 0) { if (taosCopyFile(datafile, tmpfile) != 0) {
taosThreadMutexUnlock(&pSdb->filelock);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr()); mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr());
return NULL; return NULL;
} }
taosThreadMutexUnlock(&pSdb->filelock);
SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter)); SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
if (pIter == NULL) { if (pIter == NULL) {
@ -422,11 +477,16 @@ static SSdbIter *sdbOpenIter(SSdb *pSdb) {
return pIter; return pIter;
} }
static void sdbCloseIter(SSdbIter *pIter) { static void sdbCloseIter(SSdb *pSdb, SSdbIter *pIter) {
if (pIter == NULL) return; if (pIter == NULL) return;
if (pIter->file != NULL) { if (pIter->file != NULL) {
taosCloseFile(&pIter->file); taosCloseFile(&pIter->file);
} }
char tmpfile[PATH_MAX] = {0};
snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
taosRemoveFile(tmpfile);
taosMemoryFree(pIter); taosMemoryFree(pIter);
mInfo("sdbiter:%p, is closed", pIter); mInfo("sdbiter:%p, is closed", pIter);
} }
@ -453,15 +513,14 @@ static SSdbIter *sdbGetIter(SSdb *pSdb, SSdbIter **ppIter) {
} }
int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len) { int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len) {
const int32_t maxlen = 100;
SSdbIter *pIter = sdbGetIter(pSdb, ppIter); SSdbIter *pIter = sdbGetIter(pSdb, ppIter);
if (pIter == NULL) return -1; if (pIter == NULL) return -1;
int32_t maxlen = 100;
char *pBuf = taosMemoryCalloc(1, maxlen); char *pBuf = taosMemoryCalloc(1, maxlen);
if (pBuf == NULL) { if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbCloseIter(pIter); sdbCloseIter(pSdb, pIter);
return -1; return -1;
} }
@ -472,7 +531,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le
*ppBuf = NULL; *ppBuf = NULL;
*len = 0; *len = 0;
*ppIter = NULL; *ppIter = NULL;
sdbCloseIter(pIter); sdbCloseIter(pSdb, pIter);
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
return -1; return -1;
} else if (readlen == 0) { } else if (readlen == 0) {
@ -480,7 +539,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le
*ppBuf = NULL; *ppBuf = NULL;
*len = 0; *len = 0;
*ppIter = NULL; *ppIter = NULL;
sdbCloseIter(pIter); sdbCloseIter(pSdb, pIter);
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
return 0; return 0;
} else if ((readlen < maxlen && errno != 0) || readlen == maxlen) { } else if ((readlen < maxlen && errno != 0) || readlen == maxlen) {
@ -494,7 +553,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le
*ppBuf = pBuf; *ppBuf = pBuf;
*len = readlen; *len = readlen;
*ppIter = NULL; *ppIter = NULL;
sdbCloseIter(pIter); sdbCloseIter(pSdb, pIter);
return 0; return 0;
} else { } else {
// impossible // impossible
@ -502,7 +561,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le
*ppBuf = NULL; *ppBuf = NULL;
*len = 0; *len = 0;
*ppIter = NULL; *ppIter = NULL;
sdbCloseIter(pIter); sdbCloseIter(pSdb, pIter);
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
return -1; return -1;
} }
@ -512,7 +571,7 @@ int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len) {
char datafile[PATH_MAX] = {0}; char datafile[PATH_MAX] = {0};
char tmpfile[PATH_MAX] = {0}; char tmpfile[PATH_MAX] = {0};
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {