diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index ddd6f1c05f..ab090940f2 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -81,7 +81,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); * @param pMsg The request msg. * @return int32_t 0 for success, -1 for failure. */ -int32_t mndProcessMsg(SRpcMsg *pMsg); +int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); /** diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 85120102bc..1de9875d06 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -40,7 +40,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { break; default: pMsg->info.node = pMgmt->pMnode; - code = mndProcessMsg(pMsg); + code = mndProcessRpcMsg(pMsg); } if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) { diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index cec49a4cbe..6661347e42 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -75,13 +75,12 @@ typedef struct { } STelemMgmt; typedef struct { - SWal *pWal; - sem_t syncSem; - int64_t sync; - bool standby; - bool restored; - int32_t errCode; - int32_t transId; + SWal *pWal; + sem_t syncSem; + int64_t sync; + bool standby; + int32_t errCode; + int32_t transId; } SSyncMgmt; typedef struct { @@ -90,34 +89,45 @@ typedef struct { } SGrantInfo; typedef struct SMnode { - int32_t selfDnodeId; - int64_t clusterId; - TdThread thread; - bool deploy; - bool stopped; - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; - char *path; - int64_t checkTime; - SSdb *pSdb; - SMgmtWrapper *pWrapper; - SArray *pSteps; - SQHandle *pQuery; - SShowMgmt showMgmt; - SProfileMgmt profileMgmt; - STelemMgmt telemMgmt; - SSyncMgmt syncMgmt; - SHashObj *infosMeta; - SHashObj *perfsMeta; - SGrantInfo grant; - MndMsgFp msgFp[TDMT_MAX]; - SMsgCb msgCb; + int32_t selfDnodeId; + int64_t clusterId; + TdThread thread; + TdThreadRwlock lock; + int32_t rpcRef; + int32_t syncRef; + bool stopped; + bool restored; + bool deploy; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; + char *path; + int64_t checkTime; + SSdb *pSdb; + SArray *pSteps; + SQHandle *pQuery; + SHashObj *infosMeta; + SHashObj *perfsMeta; + SShowMgmt showMgmt; + SProfileMgmt profileMgmt; + STelemMgmt telemMgmt; + SSyncMgmt syncMgmt; + SGrantInfo grant; + MndMsgFp msgFp[TDMT_MAX]; + SMsgCb msgCb; } SMnode; void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); int64_t mndGenerateUid(char *name, int32_t len); +int32_t mndAcquireRpcRef(SMnode *pMnode); +void mndReleaseRpcRef(SMnode *pMnode); +void mndSetRestore(SMnode *pMnode, bool restored); +void mndSetStop(SMnode *pMnode); +bool mndGetStop(SMnode *pMnode); +int32_t mndAcquireSyncRef(SMnode *pMnode); +void mndReleaseSyncRef(SMnode *pMnode); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mndMain.c similarity index 83% rename from source/dnode/mnode/impl/src/mnode.c rename to source/dnode/mnode/impl/src/mndMain.c index 5458bc5126..995fe83cc5 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -85,7 +85,7 @@ static void *mndThreadFp(void *param) { while (1) { lastTime++; taosMsleep(100); - if (pMnode->stopped) break; + if (mndGetStop(pMnode)) break; if (lastTime % (tsTransPullupInterval * 10) == 0) { mndPullupTrans(pMnode); @@ -118,7 +118,6 @@ static int32_t mndInitTimer(SMnode *pMnode) { } static void mndCleanupTimer(SMnode *pMnode) { - pMnode->stopped = true; if (taosCheckPthreadValid(pMnode->thread)) { taosThreadJoin(pMnode->thread, NULL); taosThreadClear(&pMnode->thread); @@ -335,15 +334,19 @@ void mndClose(SMnode *pMnode) { int32_t mndStart(SMnode *pMnode) { mndSyncStart(pMnode); if (pMnode->deploy) { - if (sdbDeploy(pMnode->pSdb) != 0) return -1; - pMnode->syncMgmt.restored = true; + if (sdbDeploy(pMnode->pSdb) != 0) { + mError("failed to deploy sdb while start mnode"); + return -1; + } + mndSetRestore(pMnode, true); } return mndInitTimer(pMnode); } void mndStop(SMnode *pMnode) { + mndSetStop(pMnode); mndSyncStop(pMnode); - return mndCleanupTimer(pMnode); + mndCleanupTimer(pMnode); } int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { @@ -362,6 +365,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { return TAOS_SYNC_PROPOSE_OTHER_ERROR; } + if (mndAcquireSyncRef(pMnode) != 0) { + mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); + return TAOS_SYNC_PROPOSE_OTHER_ERROR; + } + char logBuf[512]; char *syncNodeStr = sync2SimpleStr(pMgmt->sync); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); @@ -405,59 +413,45 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { code = TAOS_SYNC_PROPOSE_OTHER_ERROR; } + mndReleaseSyncRef(pMnode); return code; } -static int32_t mndCheckMnodeMaster(SRpcMsg *pMsg) { - if (!IsReq(pMsg)) return 0; - if (mndIsMaster(pMsg->info.node)) return 0; +static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { + if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; - if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || - pMsg->msgType == TDMT_MND_TRANS_TIMER) { - return -1; - } - mError("msg:%p, failed to check master since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, - TMSG_INFO(pMsg->msgType)); + if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && + pMsg->msgType != TDMT_MND_TRANS_TIMER) { + mError("msg:%p, failed to check mnode state since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, + TMSG_INFO(pMsg->msgType)); - SEpSet epSet = {0}; - mndGetMnodeEpSet(pMsg->info.node, &epSet); + SEpSet epSet = {0}; + mndGetMnodeEpSet(pMsg->info.node, &epSet); -#if 0 - mTrace("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse); - for (int32_t i = 0; i < epSet.numOfEps; ++i) { - mTrace("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); - if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) { - epSet.inUse = (i + 1) % epSet.numOfEps; + int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); + pMsg->info.rsp = rpcMallocCont(contLen); + if (pMsg->info.rsp != NULL) { + tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet); + pMsg->info.rspLen = contLen; + terrno = TSDB_CODE_RPC_REDIRECT; + } else { + terrno = TSDB_CODE_OUT_OF_MEMORY; } } -#endif - - int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); - pMsg->info.rsp = rpcMallocCont(contLen); - if (pMsg->info.rsp != NULL) { - tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet); - pMsg->info.rspLen = contLen; - terrno = TSDB_CODE_RPC_REDIRECT; - } else { - terrno = TSDB_CODE_OUT_OF_MEMORY; - } return -1; } -static int32_t mndCheckRequestValid(SRpcMsg *pMsg) { +static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0; - mError("msg:%p, failed to valid request, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); + mError("msg:%p, failed to check msg content, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_INVALID_MSG_LEN; return -1; } -int32_t mndProcessMsg(SRpcMsg *pMsg) { - if (mndCheckMnodeMaster(pMsg) != 0) return -1; - if (mndCheckRequestValid(pMsg) != 0) return -1; - +int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; if (fp == NULL) { @@ -466,8 +460,13 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) { return -1; } - mTrace("msg:%p, will be processed in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); + if (mndCheckMsgContent(pMsg) != 0) return -1; + if (mndCheckMnodeState(pMsg) != 0) return -1; + + mTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); int32_t code = (*fp)(pMsg); + mndReleaseRpcRef(pMnode); + if (code == TSDB_CODE_ACTION_IN_PROGRESS) { mTrace("msg:%p, won't response immediately since in progress", pMsg); } else if (code == 0) { @@ -476,6 +475,7 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) { mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); } + return code; } @@ -502,7 +502,7 @@ int64_t mndGenerateUid(char *name, int32_t len) { int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, SMonGrantInfo *pGrantInfo) { - if (!mndIsMaster(pMnode)) return -1; + if (mndAcquireRpcRef(pMnode) != 0) return -1; SSdb *pSdb = pMnode->pSdb; int64_t ms = taosGetTimestampMs(); @@ -511,6 +511,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc)); pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc)); if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) { + mndReleaseRpcRef(pMnode); return -1; } @@ -605,6 +606,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr pGrantInfo->timeseries_total = INT32_MAX; } + mndReleaseRpcRef(pMnode); return 0; } @@ -612,3 +614,76 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync); return 0; } + +int32_t mndAcquireRpcRef(SMnode *pMnode) { + int32_t code = 0; + taosThreadRwlockRdlock(&pMnode->lock); + if (pMnode->stopped) { + terrno = TSDB_CODE_APP_NOT_READY; + code = -1; + } else if (!mndIsMaster(pMnode)) { + code = -1; + } else { + int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1); + mTrace("mnode rpc is acquired, ref:%d", ref); + } + taosThreadRwlockUnlock(&pMnode->lock); + return code; +} + +void mndReleaseRpcRef(SMnode *pMnode) { + taosThreadRwlockRdlock(&pMnode->lock); + int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1); + mTrace("mnode rpc is released, ref:%d", ref); + taosThreadRwlockUnlock(&pMnode->lock); +} + +void mndSetRestore(SMnode *pMnode, bool restored) { + if (restored) { + taosThreadRwlockWrlock(&pMnode->lock); + pMnode->restored = true; + taosThreadRwlockUnlock(&pMnode->lock); + mTrace("mnode set restored:%d", restored); + } else { + taosThreadRwlockWrlock(&pMnode->lock); + pMnode->restored = false; + taosThreadRwlockUnlock(&pMnode->lock); + mTrace("mnode set restored:%d", restored); + while (1) { + if (pMnode->rpcRef <= 0) break; + taosMsleep(3); + } + } +} + +bool mndGetRestored(SMnode *pMnode) { return pMnode->restored; } + +void mndSetStop(SMnode *pMnode) { + taosThreadRwlockWrlock(&pMnode->lock); + pMnode->stopped = true; + taosThreadRwlockUnlock(&pMnode->lock); + mTrace("mnode set stopped"); +} + +bool mndGetStop(SMnode *pMnode) { return pMnode->stopped; } + +int32_t mndAcquireSyncRef(SMnode *pMnode) { + int32_t code = 0; + taosThreadRwlockRdlock(&pMnode->lock); + if (pMnode->stopped) { + terrno = TSDB_CODE_APP_NOT_READY; + code = -1; + } else { + int32_t ref = atomic_add_fetch_32(&pMnode->syncRef, 1); + mTrace("mnode sync is acquired, ref:%d", ref); + } + taosThreadRwlockUnlock(&pMnode->lock); + return code; +} + +void mndReleaseSyncRef(SMnode *pMnode) { + taosThreadRwlockRdlock(&pMnode->lock); + int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1); + mTrace("mnode sync is released, ref:%d", ref); + taosThreadRwlockUnlock(&pMnode->lock); +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index e25aa468bf..c6ab916ee1 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -63,7 +63,7 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { if (!pMnode->deploy) { mInfo("mnode sync restore finished"); mndTransPullup(pMnode); - pMnode->syncMgmt.restored = true; + mndSetRestore(pMnode, true); } } @@ -85,7 +85,7 @@ int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) { SMnode *pMnode = pFsm->data; - pMnode->syncMgmt.restored = false; + mndSetRestore(pMnode, false); mInfo("start to apply snapshot to sdb, len:%d", len); int32_t code = sdbApplySnapshot(pMnode->pSdb, pBuf, len); @@ -93,7 +93,7 @@ int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char mError("failed to apply snapshot to sdb, len:%d", len); } else { mInfo("successfully to apply snapshot to sdb, len:%d", len); - pMnode->syncMgmt.restored = true; + mndSetRestore(pMnode, true); } // taosMemoryFree(pBuf); @@ -250,7 +250,7 @@ bool mndIsMaster(SMnode *pMnode) { return false; } - if (!pMgmt->restored) { + if (!pMnode->restored) { terrno = TSDB_CODE_APP_NOT_READY; return false; } diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 32e29f5e1c..411d4c59ea 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -166,7 +166,6 @@ typedef struct SSdbRow { typedef struct SSdb { SMnode *pMnode; char *currDir; - char *syncDir; char *tmpDir; int64_t lastCommitVer; int64_t curVer; @@ -182,6 +181,7 @@ typedef struct SSdb { SdbDeployFp deployFps[SDB_MAX]; SdbEncodeFp encodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX]; + TdThreadMutex filelock; } SSdb; typedef struct SSdbIter { diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index d289e30d7b..aef3476440 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -56,6 +56,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { pSdb->curTerm = -1; pSdb->lastCommitVer = -1; pSdb->pMnode = pOption->pMnode; + taosThreadMutexInit(&pSdb->filelock, NULL); mDebug("sdb init successfully"); return pSdb; } @@ -69,10 +70,6 @@ void sdbCleanup(SSdb *pSdb) { taosMemoryFreeClear(pSdb->currDir); } - if (pSdb->syncDir != NULL) { - taosMemoryFreeClear(pSdb->syncDir); - } - if (pSdb->tmpDir != NULL) { taosMemoryFreeClear(pSdb->tmpDir); } @@ -104,6 +101,7 @@ void sdbCleanup(SSdb *pSdb) { mDebug("sdb table:%s is cleaned up", sdbTableName(i)); } + taosThreadMutexDestroy(&pSdb->filelock); taosMemoryFree(pSdb); mDebug("sdb is cleaned up"); } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 46abd5e8b2..b2dcbd68e3 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -22,13 +22,14 @@ #define SDB_RESERVE_SIZE 512 #define SDB_FILE_VER 1 -static int32_t sdbRunDeployFp(SSdb *pSdb) { +static int32_t sdbDeployData(SSdb *pSdb) { mDebug("start to deploy sdb"); for (int32_t i = SDB_MAX - 1; i >= 0; --i) { SdbDeployFp fp = pSdb->deployFps[i]; if (fp == NULL) continue; + mDebug("start to deploy sdb:%s", sdbTableName(i)); if ((*fp)(pSdb->pMnode) != 0) { mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr()); return -1; @@ -39,6 +40,39 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) { return 0; } +static void sdbResetData(SSdb *pSdb) { + mDebug("start to reset sdb"); + + for (ESdbType i = 0; i < SDB_MAX; ++i) { + SHashObj *hash = pSdb->hashObjs[i]; + if (hash == NULL) continue; + + SSdbRow **ppRow = taosHashIterate(hash, NULL); + while (ppRow != NULL) { + SSdbRow *pRow = *ppRow; + if (pRow == NULL) continue; + + sdbFreeRow(pSdb, pRow, true); + ppRow = taosHashIterate(hash, ppRow); + } + } + + for (ESdbType i = 0; i < SDB_MAX; ++i) { + SHashObj *hash = pSdb->hashObjs[i]; + if (hash == NULL) continue; + + taosHashClear(pSdb->hashObjs[i]); + pSdb->tableVer[i] = 0; + pSdb->maxId[i] = 0; + mDebug("sdb:%s is reset", sdbTableName(i)); + } + + pSdb->curVer = -1; + pSdb->curTerm = -1; + pSdb->lastCommitVer = -1; + mDebug("sdb reset successfully"); +} + static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { int64_t sver = 0; int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t)); @@ -169,11 +203,15 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) { return 0; } -int32_t sdbReadFile(SSdb *pSdb) { +static int32_t sdbReadFileImp(SSdb *pSdb) { int64_t offset = 0; int32_t code = 0; int32_t readLen = 0; int64_t ret = 0; + char file[PATH_MAX] = {0}; + + snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); + mDebug("start to read file:%s", file); SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100); if (pRaw == NULL) { @@ -182,10 +220,6 @@ int32_t sdbReadFile(SSdb *pSdb) { return -1; } - char file[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - mDebug("start to read file:%s", file); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { taosMemoryFree(pRaw); @@ -196,8 +230,6 @@ int32_t sdbReadFile(SSdb *pSdb) { if (sdbReadFileHead(pSdb, pFile) != 0) { mError("failed to read file:%s head since %s", file, terrstr()); - pSdb->curVer = -1; - pSdb->curTerm = -1; taosMemoryFree(pRaw); taosCloseFile(&pFile); return -1; @@ -264,6 +296,20 @@ _OVER: return code; } +int32_t sdbReadFile(SSdb *pSdb) { + taosThreadMutexLock(&pSdb->filelock); + + sdbResetData(pSdb); + int32_t code = sdbReadFileImp(pSdb); + if (code != 0) { + mError("failed to read sdb since %s", terrstr()); + sdbResetData(pSdb); + } + + taosThreadMutexUnlock(&pSdb->filelock); + return code; +} + static int32_t sdbWriteFileImp(SSdb *pSdb) { int32_t code = 0; @@ -378,15 +424,21 @@ int32_t sdbWriteFile(SSdb *pSdb) { return 0; } - return sdbWriteFileImp(pSdb); + taosThreadMutexLock(&pSdb->filelock); + int32_t code = sdbWriteFileImp(pSdb); + if (code != 0) { + mError("failed to write sdb since %s", terrstr()); + } + taosThreadMutexUnlock(&pSdb->filelock); + return code; } int32_t sdbDeploy(SSdb *pSdb) { - if (sdbRunDeployFp(pSdb) != 0) { + if (sdbDeployData(pSdb) != 0) { return -1; } - if (sdbWriteFileImp(pSdb) != 0) { + if (sdbWriteFile(pSdb) != 0) { return -1; } @@ -397,13 +449,16 @@ static SSdbIter *sdbOpenIter(SSdb *pSdb) { char datafile[PATH_MAX] = {0}; char tmpfile[PATH_MAX] = {0}; snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); + snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); + taosThreadMutexLock(&pSdb->filelock); if (taosCopyFile(datafile, tmpfile) != 0) { + taosThreadMutexUnlock(&pSdb->filelock); terrno = TAOS_SYSTEM_ERROR(errno); mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr()); return NULL; } + taosThreadMutexUnlock(&pSdb->filelock); SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter)); if (pIter == NULL) { @@ -422,11 +477,16 @@ static SSdbIter *sdbOpenIter(SSdb *pSdb) { return pIter; } -static void sdbCloseIter(SSdbIter *pIter) { +static void sdbCloseIter(SSdb *pSdb, SSdbIter *pIter) { if (pIter == NULL) return; if (pIter->file != NULL) { taosCloseFile(&pIter->file); } + + char tmpfile[PATH_MAX] = {0}; + snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); + taosRemoveFile(tmpfile); + taosMemoryFree(pIter); mInfo("sdbiter:%p, is closed", pIter); } @@ -453,15 +513,14 @@ static SSdbIter *sdbGetIter(SSdb *pSdb, SSdbIter **ppIter) { } int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len) { - const int32_t maxlen = 100; - SSdbIter *pIter = sdbGetIter(pSdb, ppIter); if (pIter == NULL) return -1; - char *pBuf = taosMemoryCalloc(1, maxlen); + int32_t maxlen = 100; + char *pBuf = taosMemoryCalloc(1, maxlen); if (pBuf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - sdbCloseIter(pIter); + sdbCloseIter(pSdb, pIter); return -1; } @@ -472,7 +531,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le *ppBuf = NULL; *len = 0; *ppIter = NULL; - sdbCloseIter(pIter); + sdbCloseIter(pSdb, pIter); taosMemoryFree(pBuf); return -1; } else if (readlen == 0) { @@ -480,7 +539,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le *ppBuf = NULL; *len = 0; *ppIter = NULL; - sdbCloseIter(pIter); + sdbCloseIter(pSdb, pIter); taosMemoryFree(pBuf); return 0; } else if ((readlen < maxlen && errno != 0) || readlen == maxlen) { @@ -494,7 +553,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le *ppBuf = pBuf; *len = readlen; *ppIter = NULL; - sdbCloseIter(pIter); + sdbCloseIter(pSdb, pIter); return 0; } else { // impossible @@ -502,7 +561,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le *ppBuf = NULL; *len = 0; *ppIter = NULL; - sdbCloseIter(pIter); + sdbCloseIter(pSdb, pIter); taosMemoryFree(pBuf); return -1; } @@ -512,7 +571,7 @@ int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len) { char datafile[PATH_MAX] = {0}; char tmpfile[PATH_MAX] = {0}; snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); + snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) {