diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 49e94896cd..c19928c7a9 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 9284147 + GIT_TAG cc973e0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/zh/21-tdinternal/01-arch.md b/docs/zh/21-tdinternal/01-arch.md index 782e4bcdd8..6150f2e757 100644 --- a/docs/zh/21-tdinternal/01-arch.md +++ b/docs/zh/21-tdinternal/01-arch.md @@ -240,7 +240,7 @@ dataDir /mnt/data6 2 0 ## 数据查询 -TDengine 提供了多种多样针对表和超级表的查询处理功能,除了常规的聚合查询之外,还提供针对时序数据的窗口查询、统计聚合等功能。TDengine 的查询处理需要客户端、vnode、qnode、mnode 节点协同完成,一个复杂的超级表聚合查询可能需要多个 vnode 和 qnode 节点公共分担查询和计算任务。 +TDengine 提供了多种多样针对表和超级表的查询处理功能,除了常规的聚合查询之外,还提供针对时序数据的窗口查询、统计聚合等功能。TDengine 的查询处理需要客户端、vnode、qnode、mnode 节点协同完成,一个复杂的超级表聚合查询可能需要多个 vnode 和 qnode 节点共同分担查询和计算任务。 ### 查询基本流程 diff --git a/include/util/tutil.h b/include/util/tutil.h index 2a3f0dcb02..de96300155 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -83,6 +83,12 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, } } +#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \ + if (CODE) { \ + LINO = __LINE__; \ + goto LABEL; \ + } + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 45d2de4a7a..f329a8e1a0 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -199,7 +199,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) { if (TSDB_CODE_SUCCESS != nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) { - tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self, + tscError("%" PRId64 " failed to create node allocator, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql); destroyRequest(*pRequest); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index bcc2e358d6..f825407b45 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -217,17 +217,80 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { } } +static void *vmCloseVnodeInThread(void *param) { + SVnodeThread *pThread = param; + SVnodeMgmt *pMgmt = pThread->pMgmt; + + dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum); + setThreadName("close-vnodes"); + + for (int32_t v = 0; v < pThread->vnodeNum; ++v) { + SVnodeObj *pVnode = pThread->ppVnodes[v]; + + char stepDesc[TSDB_STEP_DESC_LEN] = {0}; + snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId, + pMgmt->state.openVnodes, pMgmt->state.totalVnodes); + tmsgReportStartup("vnode-close", stepDesc); + + vmCloseVnode(pMgmt, pVnode); + } + + dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum); + return NULL; +} + static void vmCloseVnodes(SVnodeMgmt *pMgmt) { dInfo("start to close all vnodes"); int32_t numOfVnodes = 0; SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); - for (int32_t i = 0; i < numOfVnodes; ++i) { - if (ppVnodes == NULL || ppVnodes[i] == NULL) continue; - vmCloseVnode(pMgmt, ppVnodes[i]); + int32_t threadNum = tsNumOfCores / 2; + if (threadNum < 1) threadNum = 1; + int32_t vnodesPerThread = numOfVnodes / threadNum + 1; + + SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread)); + for (int32_t t = 0; t < threadNum; ++t) { + threads[t].threadIndex = t; + threads[t].pMgmt = pMgmt; + threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *)); } + for (int32_t v = 0; v < numOfVnodes; ++v) { + int32_t t = v % threadNum; + SVnodeThread *pThread = &threads[t]; + if (pThread->ppVnodes != NULL && ppVnodes != NULL) { + pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v]; + } + } + + pMgmt->state.openVnodes = 0; + dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum); + + for (int32_t t = 0; t < threadNum; ++t) { + SVnodeThread *pThread = &threads[t]; + if (pThread->vnodeNum == 0) continue; + + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) { + dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno)); + } + + taosThreadAttrDestroy(&thAttr); + } + + for (int32_t t = 0; t < threadNum; ++t) { + SVnodeThread *pThread = &threads[t]; + if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) { + taosThreadJoin(pThread->thread, NULL); + taosThreadClear(&pThread->thread); + } + taosMemoryFree(pThread->ppVnodes); + } + taosMemoryFree(threads); + if (ppVnodes != NULL) { taosMemoryFree(ppVnodes); } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 8a3179b2a9..4b1906ba70 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2555,7 +2555,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs); char *sep = ", "; int32_t sepLen = strlen(sep); - int32_t rollupLen = sizeof(rollup) - 2; + int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2; for (int32_t i = 0; i < rollupNum; ++i) { char *funcName = taosArrayGet(pStb->pFuncs, i); if (i) { diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index bf110f1ae3..cb877e3b2d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -32,12 +32,6 @@ extern "C" { #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \ - if (CODE) { \ - LINO = __LINE__; \ - goto LABEL; \ - } - typedef struct TSDBROW TSDBROW; typedef struct TABLEID TABLEID; typedef struct TSDBKEY TSDBKEY; @@ -247,18 +241,17 @@ void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, ch // SDelFile void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); // tsdbFS.c ============================================================================================== -int32_t tsdbFSOpen(STsdb *pTsdb); +int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback); int32_t tsdbFSClose(STsdb *pTsdb); int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS); void tsdbFSDestroy(STsdbFS *pFS); int32_t tDFileSetCmprFn(const void *p1, const void *p2); -int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFS); -int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFS); +int32_t tsdbFSCommit(STsdb *pTsdb); +int32_t tsdbFSRollback(STsdb *pTsdb); +int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS); void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS); -int32_t tsdbFSRollback(STsdbFS *pFS); - int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet); int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile); // tsdbReaderWriter.c ============================================================================================== diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index aca99ecd2f..988ecc5dd3 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -87,11 +87,13 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode); int32_t vnodeCommit(SVnode* pVnode); +void vnodeRollback(SVnode* pVnode); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); int32_t vnodeSyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode); +bool vnodeShouldRollback(SVnode* pVnode); // vnodeSync.c int32_t vnodeSyncOpen(SVnode* pVnode, char* path); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 712f8bd15b..42ab0c2a37 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -97,10 +97,11 @@ typedef struct SMCtbCursor SMCtbCursor; typedef struct SMStbCursor SMStbCursor; typedef struct STbUidStore STbUidStore; -int metaOpen(SVnode* pVnode, SMeta** ppMeta); +int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback); int metaClose(SMeta* pMeta); int metaBegin(SMeta* pMeta, int8_t fromSys); int metaCommit(SMeta* pMeta); +int metaFinishCommit(SMeta* pMeta); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); @@ -149,10 +150,12 @@ typedef struct { int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo); // tsdb -int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); +int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback); int tsdbClose(STsdb** pTsdb); int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb); +int32_t tsdbFinishCommit(STsdb* pTsdb); +int32_t tsdbRollbackCommit(STsdb* pTsdb); int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); @@ -200,15 +203,15 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem // sma int32_t smaInit(); void smaCleanUp(); -int32_t smaOpen(SVnode* pVnode); +int32_t smaOpen(SVnode* pVnode, int8_t rollback); int32_t smaClose(SSma* pSma); int32_t smaBegin(SSma* pSma); int32_t smaSyncPreCommit(SSma* pSma); int32_t smaSyncCommit(SSma* pSma); int32_t smaSyncPostCommit(SSma* pSma); -int32_t smaAsyncPreCommit(SSma* pSma); -int32_t smaAsyncCommit(SSma* pSma); -int32_t smaAsyncPostCommit(SSma* pSma); +int32_t smaPreCommit(SSma* pSma); +int32_t smaCommit(SSma* pSma); +int32_t smaPostCommit(SSma* pSma); int32_t smaDoRetention(SSma* pSma, int64_t now); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index 85ed40970c..01ad833d20 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -34,6 +34,7 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) { // commit the meta txn int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); } +int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, &pMeta->txn); } // abort the meta txn int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, &pMeta->txn); } diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 515fd31e9d..2198033db9 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -27,7 +27,7 @@ static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int k static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); } -int metaOpen(SVnode *pVnode, SMeta **ppMeta) { +int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { SMeta *pMeta = NULL; int ret; int slen; @@ -60,49 +60,49 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { taosMkDir(pMeta->path); // open env - ret = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv); + ret = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv, rollback); if (ret < 0) { metaError("vgId:%d, failed to open meta env since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pTbDb - ret = tdbTbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb); + ret = tdbTbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta table db since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pSkmDb - ret = tdbTbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb); + ret = tdbTbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta schema db since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pUidIdx - ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx); + ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pNameIdx - ret = tdbTbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx); + ret = tdbTbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta name index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pCtbIdx - ret = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), -1, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx); + ret = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), -1, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pSuidIdx - ret = tdbTbOpen("suid.idx", sizeof(tb_uid_t), 0, uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pSuidIdx); + ret = tdbTbOpen("suid.idx", sizeof(tb_uid_t), 0, uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pSuidIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta super table index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; @@ -119,27 +119,27 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { goto _err; } - ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx); + ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pTtlIdx - ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx); + ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pSmaIdx - ret = tdbTbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx); + ret = tdbTbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta sma index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } - ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb); + ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb, 0); if (ret < 0) { metaError("vgId:%d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index a40bbd7d87..5c5b49ece5 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -165,6 +165,8 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) { } else { code = metaCommit(pWriter->pMeta); if (code) goto _err; + code = metaFinishCommit(pWriter->pMeta); + if (code) goto _err; } taosMemoryFree(pWriter); *ppWriter = NULL; diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 6168a00815..3dce724de7 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -54,28 +54,28 @@ int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(p #endif /** - * @brief Only applicable to Rollup SMA + * @brief async commit, only applicable to Rollup SMA * * @param pSma * @return int32_t */ -int32_t smaAsyncPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } +int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } /** - * @brief Only applicable to Rollup SMA + * @brief async commit, only applicable to Rollup SMA * * @param pSma * @return int32_t */ -int32_t smaAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); } +int32_t smaCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); } /** - * @brief Only applicable to Rollup SMA + * @brief async commit, only applicable to Rollup SMA * * @param pSma * @return int32_t */ -int32_t smaAsyncPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); } +int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); } /** * @brief set rsma trigger stat active @@ -366,9 +366,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { * @return int32_t */ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { + int32_t code = 0; + SVnode *pVnode = pSma->pVnode; SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); if (!pSmaEnv) { - return TSDB_CODE_SUCCESS; + goto _exit; } #if 0 SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); @@ -378,8 +380,21 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { return TSDB_CODE_FAILED; } #endif - - return TSDB_CODE_SUCCESS; + if ((code = tsdbCommit(VND_RSMA0(pVnode))) < 0) { + smaError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } + if ((code = tsdbCommit(VND_RSMA1(pVnode))) < 0) { + smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } + if ((code = tsdbCommit(VND_RSMA2(pVnode))) < 0) { + smaError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } +_exit: + terrno = code; + return code; } /** diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index ef0d51f0eb..850e4c5697 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -29,19 +29,19 @@ static int32_t rsmaRestore(SSma *pSma); pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days); \ } while (0) -#define SMA_OPEN_RSMA_IMPL(v, l) \ - do { \ - SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ - if (!RETENTION_VALID(r)) { \ - if (l == 0) { \ - goto _err; \ - } \ - break; \ - } \ - smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \ - if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg) < 0) { \ - goto _err; \ - } \ +#define SMA_OPEN_RSMA_IMPL(v, l) \ + do { \ + SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ + if (!RETENTION_VALID(r)) { \ + if (l == 0) { \ + goto _err; \ + } \ + break; \ + } \ + smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \ + if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback) < 0) { \ + goto _err; \ + } \ } while (0) /** @@ -119,7 +119,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty return 0; } -int32_t smaOpen(SVnode *pVnode) { +int32_t smaOpen(SVnode *pVnode, int8_t rollback) { STsdbCfg *pCfg = &pVnode->config.tsdbCfg; ASSERT(!pVnode->pSma); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 8e4bc34e0c..7e3ba57dea 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -70,17 +70,17 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { } int32_t tqMetaOpen(STQ* pTq) { - if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) { + if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) { ASSERT(0); return -1; } - if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) { + if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) { ASSERT(0); return -1; } - if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) { + if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) { ASSERT(0); return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index a4d993bc6c..460ef611a3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1041,38 +1041,20 @@ _exit: static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { int32_t code = 0; int32_t lino = 0; + STsdb *pTsdb = pCommitter->pTsdb; - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - - ASSERT(eno == 0 && - "tsdbCommit failure" - "Restart taosd"); - - code = tsdbFSCommit1(pTsdb, &pCommitter->fs); - TSDB_CHECK_CODE(code, lino, _exit); - - // lock - taosThreadRwlockWrlock(&pTsdb->rwLock); - - // commit or rollback - code = tsdbFSCommit2(pTsdb, &pCommitter->fs); - if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); + if (eno) { + code = eno; + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbFSPrepareCommit(pCommitter->pTsdb, &pCommitter->fs); TSDB_CHECK_CODE(code, lino, _exit); } - pTsdb->imem = NULL; - - // unlock - taosThreadRwlockUnlock(&pTsdb->rwLock); - - tsdbUnrefMemTable(pMemTable); +_exit: tsdbFSDestroy(&pCommitter->fs); taosArrayDestroy(pCommitter->aTbDataP); - -_exit: - if (code) { + if (code || eno) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode)); @@ -1646,3 +1628,50 @@ _exit: } return code; } + +int32_t tsdbFinishCommit(STsdb *pTsdb) { + int32_t code = 0; + int32_t lino = 0; + SMemTable *pMemTable = pTsdb->imem; + + // lock + taosThreadRwlockWrlock(&pTsdb->rwLock); + + code = tsdbFSCommit(pTsdb); + if (code) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + TSDB_CHECK_CODE(code, lino, _exit); + } + + pTsdb->imem = NULL; + + // unlock + taosThreadRwlockUnlock(&pTsdb->rwLock); + if (pMemTable) { + tsdbUnrefMemTable(pMemTable); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d tsdb finish commit", TD_VID(pTsdb->pVnode)); + } + return code; +} + +int32_t tsdbRollbackCommit(STsdb *pTsdb) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbFSRollback(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d tsdb rollback commit", TD_VID(pTsdb->pVnode)); + } + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 6fd5629592..8edac7abc5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -16,7 +16,7 @@ #include "tsdb.h" // ================================================================================================= -static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) { +static int32_t tsdbFSToBinary(uint8_t *p, STsdbFS *pFS) { int32_t n = 0; int8_t hasDel = pFS->pDelFile ? 1 : 0; uint32_t nSet = taosArrayGetSize(pFS->aDFileSet); @@ -39,214 +39,112 @@ static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) { return n; } -static int32_t tsdbGnrtCurrent(STsdb *pTsdb, STsdbFS *pFS, char *fname) { - int32_t code = 0; - int64_t n; - int64_t size; - uint8_t *pData = NULL; - TdFilePtr pFD = NULL; +static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, STsdbFS *pFS) { + int32_t code = 0; + int32_t n = 0; - // to binary - size = tsdbEncodeFS(NULL, pFS) + sizeof(TSCKSUM); - pData = taosMemoryMalloc(size); + // version + n += tGetI8(pData + n, NULL); + + // SDelFile + int8_t hasDel = 0; + n += tGetI8(pData + n, &hasDel); + if (hasDel) { + pFS->pDelFile = (SDelFile *)taosMemoryCalloc(1, sizeof(SDelFile)); + if (pFS->pDelFile == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + n += tGetDelFile(pData + n, pFS->pDelFile); + pFS->pDelFile->nRef = 1; + } else { + pFS->pDelFile = NULL; + } + + // aDFileSet + taosArrayClear(pFS->aDFileSet); + uint32_t nSet = 0; + n += tGetU32v(pData + n, &nSet); + for (uint32_t iSet = 0; iSet < nSet; iSet++) { + SDFileSet fSet = {0}; + + int32_t nt = tGetDFileSet(pData + n, &fSet); + if (nt < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + n += nt; + if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + + ASSERT(n + sizeof(TSCKSUM) == nData); + +_exit: + return code; +} + +static int32_t tsdbSaveFSToFile(STsdbFS *pFS, const char *fname) { + int32_t code = 0; + int32_t lino = 0; + + // encode to binary + int32_t size = tsdbFSToBinary(NULL, pFS) + sizeof(TSCKSUM); + uint8_t *pData = taosMemoryMalloc(size); if (pData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - n = tsdbEncodeFS(pData, pFS); - ASSERT(n + sizeof(TSCKSUM) == size); + tsdbFSToBinary(pData, pFS); taosCalcChecksumAppend(0, pData, size); - // create and write - pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + // save to file + TdFilePtr pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - n = taosWriteFile(pFD, pData, size); + int64_t n = taosWriteFile(pFD, pData, size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); } if (taosFsyncFile(pFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); } taosCloseFile(&pFD); +_exit: if (pData) taosMemoryFree(pData); - return code; - -_err: - tsdbError("vgId:%d, tsdb gnrt current failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - if (pData) taosMemoryFree(pData); + if (code) { + tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); + } return code; } -// static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet *pTo) { -// int32_t code = 0; -// char fname[TSDB_FILENAME_LEN]; +int32_t tsdbFSCreate(STsdbFS *pFS) { + int32_t code = 0; -// if (pFrom && pTo) { -// bool isSameDisk = (pFrom->diskId.level == pTo->diskId.level) && (pFrom->diskId.id == pTo->diskId.id); + pFS->pDelFile = NULL; + pFS->aDFileSet = taosArrayInit(0, sizeof(SDFileSet)); + if (pFS->aDFileSet == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } -// // head -// if (isSameDisk && pFrom->pHeadF->commitID == pTo->pHeadF->commitID) { -// ASSERT(pFrom->pHeadF->size == pTo->pHeadF->size); -// ASSERT(pFrom->pHeadF->offset == pTo->pHeadF->offset); -// } else { -// tsdbHeadFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pHeadF, fname); -// taosRemoveFile(fname); -// } - -// // data -// if (isSameDisk && pFrom->pDataF->commitID == pTo->pDataF->commitID) { -// if (pFrom->pDataF->size > pTo->pDataF->size) { -// code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_DATA_FILE); -// if (code) goto _err; -// } -// } else { -// tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname); -// taosRemoveFile(fname); -// } - -// // stt -// if (isSameDisk && pFrom->pLastF->commitID == pTo->pLastF->commitID) { -// if (pFrom->pLastF->size > pTo->pLastF->size) { -// code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_LAST_FILE); -// if (code) goto _err; -// } -// } else { -// tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname); -// taosRemoveFile(fname); -// } - -// // sma -// if (isSameDisk && pFrom->pSmaF->commitID == pTo->pSmaF->commitID) { -// if (pFrom->pSmaF->size > pTo->pSmaF->size) { -// code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_SMA_FILE); -// if (code) goto _err; -// } -// } else { -// tsdbSmaFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pSmaF, fname); -// taosRemoveFile(fname); -// } -// } else if (pFrom) { -// // head -// tsdbHeadFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pHeadF, fname); -// taosRemoveFile(fname); - -// // data -// tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname); -// taosRemoveFile(fname); - -// // stt -// tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname); -// taosRemoveFile(fname); - -// // fsm -// tsdbSmaFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pSmaF, fname); -// taosRemoveFile(fname); -// } - -// return code; - -// _err: -// tsdbError("vgId:%d, tsdb apply disk file set change failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); -// return code; -// } - -// static int32_t tsdbApplyDelFileChange(STsdbFS *pFS, SDelFile *pFrom, SDelFile *pTo) { -// int32_t code = 0; -// char fname[TSDB_FILENAME_LEN]; - -// if (pFrom && pTo) { -// if (!tsdbDelFileIsSame(pFrom, pTo)) { -// tsdbDelFileName(pFS->pTsdb, pFrom, fname); -// if (taosRemoveFile(fname) < 0) { -// code = TAOS_SYSTEM_ERROR(errno); -// goto _err; -// } -// } -// } else if (pFrom) { -// tsdbDelFileName(pFS->pTsdb, pFrom, fname); -// if (taosRemoveFile(fname) < 0) { -// code = TAOS_SYSTEM_ERROR(errno); -// goto _err; -// } -// } else { -// // do nothing -// } - -// return code; - -// _err: -// tsdbError("vgId:%d, tsdb apply del file change failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); -// return code; -// } - -// static int32_t tsdbFSApplyDiskChange(STsdbFS *pFS, STsdbFSState *pFrom, STsdbFSState *pTo) { -// int32_t code = 0; -// int32_t iFrom = 0; -// int32_t nFrom = taosArrayGetSize(pFrom->aDFileSet); -// int32_t iTo = 0; -// int32_t nTo = taosArrayGetSize(pTo->aDFileSet); -// SDFileSet *pDFileSetFrom; -// SDFileSet *pDFileSetTo; - -// // SDelFile -// code = tsdbApplyDelFileChange(pFS, pFrom->pDelFile, pTo->pDelFile); -// if (code) goto _err; - -// // SDFileSet -// while (iFrom < nFrom && iTo < nTo) { -// pDFileSetFrom = (SDFileSet *)taosArrayGet(pFrom->aDFileSet, iFrom); -// pDFileSetTo = (SDFileSet *)taosArrayGet(pTo->aDFileSet, iTo); - -// if (pDFileSetFrom->fid == pDFileSetTo->fid) { -// code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, pDFileSetTo); -// if (code) goto _err; - -// iFrom++; -// iTo++; -// } else if (pDFileSetFrom->fid < pDFileSetTo->fid) { -// code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL); -// if (code) goto _err; - -// iFrom++; -// } else { -// iTo++; -// } -// } - -// while (iFrom < nFrom) { -// pDFileSetFrom = (SDFileSet *)taosArrayGet(pFrom->aDFileSet, iFrom); -// code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL); -// if (code) goto _err; - -// iFrom++; -// } - -// #if 0 -// // do noting -// while (iTo < nTo) { -// pDFileSetTo = (SDFileSet *)taosArrayGetP(pTo->aDFileSet, iTo); -// code = tsdbApplyDFileSetChange(pFS, NULL, pDFileSetTo); -// if (code) goto _err; - -// iTo++; -// } -// #endif - -// return code; - -// _err: -// tsdbError("vgId:%d, tsdb fs apply disk change failed sicne %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); -// return code; -// } +_exit: + return code; +} void tsdbFSDestroy(STsdbFS *pFS) { if (pFS->pDelFile) { @@ -268,76 +166,81 @@ void tsdbFSDestroy(STsdbFS *pFS) { static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { int32_t code = 0; - int64_t size; - char fname[TSDB_FILENAME_LEN]; + int32_t lino = 0; + int64_t size = 0; + char fname[TSDB_FILENAME_LEN] = {0}; // SDelFile if (pTsdb->fs.pDelFile) { tsdbDelFileName(pTsdb, pTsdb->fs.pDelFile, fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size != tsdbLogicToFileSize(pTsdb->fs.pDelFile->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } // SArray + int32_t fid = 0; for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); + fid = pSet->fid; // head ========= tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size != tsdbLogicToFileSize(pSet->pHeadF->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } // data ========= tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size < tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } else if (size > tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) { - code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } + // else if (size > tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) { + // code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE); + // TSDB_CHECK_CODE(code, lino, _exit); + // } // sma ============= tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size < tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } else if (size > tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) { - code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } + // else if (size > tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) { + // code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE); + // TSDB_CHECK_CODE(code, lino, _exit); + // } // stt =========== for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size != tsdbLogicToFileSize(pSet->aSttF[iStt]->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } } @@ -346,10 +249,11 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { // remove those invalid files (todo) } - return code; - -_err: - tsdbError("vgId:%d, tsdb scan and try fix fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code), + fid); + } return code; } @@ -363,141 +267,531 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) { return 0; } -static int32_t tsdbRecoverFS(STsdb *pTsdb, uint8_t *pData, int64_t nData) { - int32_t code = 0; - int8_t hasDel; - uint32_t nSet; - int32_t n = 0; +static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) { + SVnode *pVnode = pTsdb->pVnode; + if (pVnode->pTfs) { + if (current) { + snprintf(current, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, + pTsdb->path, TD_DIRSEP); + } + if (current_t) { + snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, + pTsdb->path, TD_DIRSEP); + } + } else { + if (current) { + snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP); + } + if (current_t) { + snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP); + } + } +} - // version - n += tGetI8(pData + n, NULL); +static int32_t tsdbLoadFSFromFile(const char *fname, STsdbFS *pFS) { + int32_t code = 0; + int32_t lino = 0; + uint8_t *pData = NULL; + + // load binary + TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ); + if (pFD == NULL) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t size; + if (taosFStatFile(pFD, &size, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + pData = taosMemoryMalloc(size); + if (pData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (taosReadFile(pFD, pData, size) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (!taosCheckChecksumWhole(pData, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosCloseFile(&pFD); + + // decode binary + code = tsdbBinaryToFS(pData, size, pFS); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (pData) taosMemoryFree(pData); + if (code) { + tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); + } + return code; +} + +static int32_t tsdbRemoveFileSet(STsdb *pTsdb, SDFileSet *pSet) { + int32_t code = 0; + char fname[TSDB_FILENAME_LEN] = {0}; + + int32_t nRef = atomic_sub_fetch_32(&pSet->pHeadF->nRef, 1); + if (nRef == 0) { + tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pSet->pHeadF); + } + + nRef = atomic_sub_fetch_32(&pSet->pDataF->nRef, 1); + if (nRef == 0) { + tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); + taosRemoveFile(fname); + taosMemoryFree(pSet->pDataF); + } + + nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1); + if (nRef == 0) { + tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); + taosRemoveFile(fname); + taosMemoryFree(pSet->pSmaF); + } + + for (int8_t iStt = 0; iStt < pSet->nSttF; iStt++) { + nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); + taosRemoveFile(fname); + taosMemoryFree(pSet->aSttF[iStt]); + } + } + +_exit: + return code; +} + +static int32_t tsdbNewFileSet(STsdb *pTsdb, SDFileSet *pSetTo, SDFileSet *pSetFrom) { + int32_t code = 0; + int32_t lino = 0; + + *pSetTo = (SDFileSet){.diskId = pSetFrom->diskId, .fid = pSetFrom->fid, .nSttF = 0}; + + // head + pSetTo->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); + if (pSetTo->pHeadF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetTo->pHeadF = *pSetFrom->pHeadF; + pSetTo->pHeadF->nRef = 1; + + // data + pSetTo->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); + if (pSetTo->pDataF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetTo->pDataF = *pSetFrom->pDataF; + pSetTo->pDataF->nRef = 1; + + // sma + pSetTo->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); + if (pSetTo->pSmaF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetTo->pSmaF = *pSetFrom->pSmaF; + pSetTo->pSmaF->nRef = 1; + + // stt + for (int32_t iStt = 0; iStt < pSetFrom->nSttF; iStt++) { + pSetTo->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetTo->aSttF[iStt] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pSetTo->nSttF++; + *pSetTo->aSttF[iStt] = *pSetFrom->aSttF[iStt]; + pSetTo->aSttF[iStt]->nRef = 1; + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSetNew) { + int32_t code = 0; + int32_t lino = 0; + int32_t nRef = 0; + bool sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id)); + char fname[TSDB_FILENAME_LEN] = {0}; + + // head + SHeadFile *pHeadF = pSetOld->pHeadF; + if ((!sameDisk) || (pHeadF->commitID != pSetNew->pHeadF->commitID)) { + pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); + if (pSetOld->pHeadF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->pHeadF = *pSetNew->pHeadF; + pSetOld->pHeadF->nRef = 1; + + nRef = atomic_sub_fetch_32(&pHeadF->nRef, 1); + if (nRef == 0) { + tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pHeadF, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pHeadF); + } + } else { + nRef = pHeadF->nRef; + *pHeadF = *pSetNew->pHeadF; + pHeadF->nRef = nRef; + } + + // data + SDataFile *pDataF = pSetOld->pDataF; + if ((!sameDisk) || (pDataF->commitID != pSetNew->pDataF->commitID)) { + pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); + if (pSetOld->pDataF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->pDataF = *pSetNew->pDataF; + pSetOld->pDataF->nRef = 1; + + nRef = atomic_sub_fetch_32(&pDataF->nRef, 1); + if (nRef == 0) { + tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pDataF, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pDataF); + } + } else { + nRef = pDataF->nRef; + *pDataF = *pSetNew->pDataF; + pDataF->nRef = nRef; + } + + // sma + SSmaFile *pSmaF = pSetOld->pSmaF; + if ((!sameDisk) || (pSmaF->commitID != pSetNew->pSmaF->commitID)) { + pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); + if (pSetOld->pSmaF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->pSmaF = *pSetNew->pSmaF; + pSetOld->pSmaF->nRef = 1; + + nRef = atomic_sub_fetch_32(&pSmaF->nRef, 1); + if (nRef == 0) { + tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSmaF, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pSmaF); + } + } else { + nRef = pSmaF->nRef; + *pSmaF = *pSetNew->pSmaF; + pSmaF->nRef = nRef; + } + + // stt + if (sameDisk) { + if (pSetNew->nSttF > pSetOld->nSttF) { + ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1); + pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[pSetOld->nSttF] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF]; + pSetOld->aSttF[pSetOld->nSttF]->nRef = 1; + pSetOld->nSttF++; + } else if (pSetNew->nSttF < pSetOld->nSttF) { + ASSERT(pSetNew->nSttF == 1); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); + taosRemoveFile(fname); + taosMemoryFree(pSttFile); + } + pSetOld->aSttF[iStt] = NULL; + } + + pSetOld->nSttF = 1; + pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[0] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->aSttF[0] = *pSetNew->aSttF[0]; + pSetOld->aSttF[0]->nRef = 1; + } else { + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); + taosRemoveFile(fname); + taosMemoryFree(pSttFile); + } + + pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[iStt] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; + } else { + ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size); + ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset); + } + } + } + } else { + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pSttFile); + } + } + + pSetOld->nSttF = 0; + for (int32_t iStt = 0; iStt < pSetNew->nSttF; iStt++) { + pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[iStt] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; + + pSetOld->nSttF++; + } + } + + if (!sameDisk) { + pSetOld->diskId = pSetNew->diskId; + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbFSApplyChange(STsdb *pTsdb, STsdbFS *pFS) { + int32_t code = 0; + int32_t lino = 0; + + int32_t nRef = 0; + char fname[TSDB_FILENAME_LEN] = {0}; // SDelFile - n += tGetI8(pData + n, &hasDel); - if (hasDel) { - pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); - if (pTsdb->fs.pDelFile == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + if (pFS->pDelFile) { + SDelFile *pDelFile = pTsdb->fs.pDelFile; - pTsdb->fs.pDelFile->nRef = 1; - n += tGetDelFile(pData + n, pTsdb->fs.pDelFile); + if (pDelFile == NULL || (pDelFile->commitID != pFS->pDelFile->commitID)) { + pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); + if (pTsdb->fs.pDelFile == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + *pTsdb->fs.pDelFile = *pFS->pDelFile; + pTsdb->fs.pDelFile->nRef = 1; + + if (pDelFile) { + nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1); + if (nRef == 0) { + tsdbDelFileName(pTsdb, pDelFile, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pDelFile); + } + } + } } else { - pTsdb->fs.pDelFile = NULL; + ASSERT(pTsdb->fs.pDelFile == NULL); } - // SArray - taosArrayClear(pTsdb->fs.aDFileSet); - n += tGetU32v(pData + n, &nSet); - for (uint32_t iSet = 0; iSet < nSet; iSet++) { + // aDFileSet + int32_t iOld = 0; + int32_t iNew = 0; + while (true) { + int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet); + int32_t nNew = taosArrayGetSize(pFS->aDFileSet); SDFileSet fSet = {0}; + int8_t sameDisk = 0; - int32_t nt = tGetDFileSet(pData + n, &fSet); - if (nt < 0) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + if (iOld >= nOld && iNew >= nNew) break; - n += nt; + SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL; + SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFS->aDFileSet, iNew) : NULL; - if (taosArrayPush(pTsdb->fs.aDFileSet, &fSet) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + if (pSetOld && pSetNew) { + if (pSetOld->fid == pSetNew->fid) { + code = tsdbMergeFileSet(pTsdb, pSetOld, pSetNew); + TSDB_CHECK_CODE(code, lino, _exit); + + iOld++; + iNew++; + } else if (pSetOld->fid < pSetNew->fid) { + code = tsdbRemoveFileSet(pTsdb, pSetOld); + TSDB_CHECK_CODE(code, lino, _exit); + taosArrayRemove(pTsdb->fs.aDFileSet, iOld); + } else { + code = tsdbNewFileSet(pTsdb, &fSet, pSetNew); + TSDB_CHECK_CODE(code, lino, _exit) + + if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + iOld++; + iNew++; + } + } else if (pSetOld) { + code = tsdbRemoveFileSet(pTsdb, pSetOld); + TSDB_CHECK_CODE(code, lino, _exit); + taosArrayRemove(pTsdb->fs.aDFileSet, iOld); + } else { + code = tsdbNewFileSet(pTsdb, &fSet, pSetNew); + TSDB_CHECK_CODE(code, lino, _exit) + + if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + iOld++; + iNew++; } } - ASSERT(n + sizeof(TSCKSUM) == nData); - return code; - -_err: +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } // EXPOSED APIS ==================================================================================== -int32_t tsdbFSOpen(STsdb *pTsdb) { +int32_t tsdbFSCommit(STsdb *pTsdb) { int32_t code = 0; + int32_t lino = 0; + STsdbFS fs = {0}; + + char current[TSDB_FILENAME_LEN] = {0}; + char current_t[TSDB_FILENAME_LEN] = {0}; + tsdbGetCurrentFName(pTsdb, current, current_t); + + if (!taosCheckExistFile(current_t)) goto _exit; + + // rename the file + if (taosRenameFile(current_t, current) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // Load the new FS + code = tsdbFSCreate(&fs); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbLoadFSFromFile(current, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + + // apply file change + code = tsdbFSApplyChange(pTsdb, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + tsdbFSDestroy(&fs); + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t tsdbFSRollback(STsdb *pTsdb) { + int32_t code = 0; + int32_t lino = 0; + + char current_t[TSDB_FILENAME_LEN] = {0}; + tsdbGetCurrentFName(pTsdb, NULL, current_t); + (void)taosRemoveFile(current_t); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno)); + } + return code; +} + +int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) { + int32_t code = 0; + int32_t lino = 0; SVnode *pVnode = pTsdb->pVnode; // open handle - pTsdb->fs.pDelFile = NULL; - pTsdb->fs.aDFileSet = taosArrayInit(0, sizeof(SDFileSet)); - if (pTsdb->fs.aDFileSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + code = tsdbFSCreate(&pTsdb->fs); + TSDB_CHECK_CODE(code, lino, _exit); - // load fs or keep empty - char fname[TSDB_FILENAME_LEN]; + // open impl + char current[TSDB_FILENAME_LEN] = {0}; + char current_t[TSDB_FILENAME_LEN] = {0}; + tsdbGetCurrentFName(pTsdb, current, current_t); - if (pVnode->pTfs) { - snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, - pTsdb->path, TD_DIRSEP); + if (taosCheckExistFile(current)) { + code = tsdbLoadFSFromFile(current, &pTsdb->fs); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosCheckExistFile(current_t)) { + if (rollback) { + code = tsdbFSRollback(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbFSCommit(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + } + } } else { - snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP); - } - - if (!taosCheckExistFile(fname)) { // empty one - code = tsdbGnrtCurrent(pTsdb, &pTsdb->fs, fname); - if (code) goto _err; - } else { - // read - TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ); - if (pFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbSaveFSToFile(&pTsdb->fs, current); + TSDB_CHECK_CODE(code, lino, _exit); - int64_t size; - if (taosFStatFile(pFD, &size, NULL) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - taosCloseFile(&pFD); - goto _err; - } - - uint8_t *pData = taosMemoryMalloc(size); - if (pData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - taosCloseFile(&pFD); - goto _err; - } - - int64_t n = taosReadFile(pFD, pData, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - taosMemoryFree(pData); - taosCloseFile(&pFD); - goto _err; - } - - if (!taosCheckChecksumWhole(pData, size)) { - code = TSDB_CODE_FILE_CORRUPTED; - taosMemoryFree(pData); - taosCloseFile(&pFD); - goto _err; - } - - taosCloseFile(&pFD); - - // recover fs - code = tsdbRecoverFS(pTsdb, pData, size); - if (code) { - taosMemoryFree(pData); - goto _err; - } - - taosMemoryFree(pData); + ASSERT(!rollback); } // scan and fix FS code = tsdbScanAndTryFixFS(pTsdb); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); - return code; - -_err: - tsdbError("vgId:%d, tsdb fs open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } @@ -538,19 +832,24 @@ int32_t tsdbFSClose(STsdb *pTsdb) { int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { int32_t code = 0; + int32_t lino = 0; pFS->pDelFile = NULL; - pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet)); - if (pFS->aDFileSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + if (pFS->aDFileSet) { + taosArrayClear(pFS->aDFileSet); + } else { + pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet)); + if (pFS->aDFileSet == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } } if (pTsdb->fs.pDelFile) { pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); if (pFS->pDelFile == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *pFS->pDelFile = *pTsdb->fs.pDelFile; @@ -564,7 +863,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); if (fSet.pHeadF == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *fSet.pHeadF = *pSet->pHeadF; @@ -572,7 +871,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); if (fSet.pDataF == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *fSet.pDataF = *pSet->pDataF; @@ -580,7 +879,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); if (fSet.pSmaF == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *fSet.pSmaF = *pSet->pSmaF; @@ -589,26 +888,21 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); if (fSet.aSttF[fSet.nSttF] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF]; } if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } } _exit: - return code; -} - -int32_t tsdbFSRollback(STsdbFS *pFS) { - int32_t code = 0; - - ASSERT(0); - + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } @@ -714,336 +1008,21 @@ _exit: return code; } -int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) { +int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFSNew) { int32_t code = 0; + int32_t lino = 0; char tfname[TSDB_FILENAME_LEN]; - char fname[TSDB_FILENAME_LEN]; - snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, - pTsdb->path, TD_DIRSEP); - snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, - pTsdb->path, TD_DIRSEP); + tsdbGetCurrentFName(pTsdb, NULL, tfname); // gnrt CURRENT.t - code = tsdbGnrtCurrent(pTsdb, pFSNew, tfname); - if (code) goto _err; + code = tsdbSaveFSToFile(pFSNew, tfname); + TSDB_CHECK_CODE(code, lino, _exit); - // rename - code = taosRenameFile(tfname, fname); +_exit: if (code) { - code = TAOS_SYSTEM_ERROR(code); - goto _err; + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } - - return code; - -_err: - tsdbError("vgId:%d, tsdb fs commit phase 1 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { - int32_t code = 0; - int32_t nRef; - char fname[TSDB_FILENAME_LEN]; - - // del - if (pFSNew->pDelFile) { - SDelFile *pDelFile = pTsdb->fs.pDelFile; - - if (pDelFile == NULL || (pDelFile->commitID != pFSNew->pDelFile->commitID)) { - pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); - if (pTsdb->fs.pDelFile == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - *pTsdb->fs.pDelFile = *pFSNew->pDelFile; - pTsdb->fs.pDelFile->nRef = 1; - - if (pDelFile) { - nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1); - if (nRef == 0) { - tsdbDelFileName(pTsdb, pDelFile, fname); - taosRemoveFile(fname); - taosMemoryFree(pDelFile); - } - } - } - } else { - ASSERT(pTsdb->fs.pDelFile == NULL); - } - - // data - int32_t iOld = 0; - int32_t iNew = 0; - while (true) { - int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet); - int32_t nNew = taosArrayGetSize(pFSNew->aDFileSet); - SDFileSet fSet; - int8_t sameDisk; - - if (iOld >= nOld && iNew >= nNew) break; - - SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL; - SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aDFileSet, iNew) : NULL; - - if (pSetOld && pSetNew) { - if (pSetOld->fid == pSetNew->fid) { - goto _merge_old_and_new; - } else if (pSetOld->fid < pSetNew->fid) { - goto _remove_old; - } else { - goto _add_new; - } - } else if (pSetOld) { - goto _remove_old; - } else { - goto _add_new; - } - - _merge_old_and_new: - sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id)); - - // head - fSet.pHeadF = pSetOld->pHeadF; - if ((!sameDisk) || (pSetOld->pHeadF->commitID != pSetNew->pHeadF->commitID)) { - pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); - if (pSetOld->pHeadF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->pHeadF = *pSetNew->pHeadF; - pSetOld->pHeadF->nRef = 1; - - nRef = atomic_sub_fetch_32(&fSet.pHeadF->nRef, 1); - if (nRef == 0) { - tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pHeadF, fname); - (void)taosRemoveFile(fname); - taosMemoryFree(fSet.pHeadF); - } - } else { - ASSERT(fSet.pHeadF->size == pSetNew->pHeadF->size); - ASSERT(fSet.pHeadF->offset == pSetNew->pHeadF->offset); - } - - // data - fSet.pDataF = pSetOld->pDataF; - if ((!sameDisk) || (pSetOld->pDataF->commitID != pSetNew->pDataF->commitID)) { - pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); - if (pSetOld->pDataF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->pDataF = *pSetNew->pDataF; - pSetOld->pDataF->nRef = 1; - - nRef = atomic_sub_fetch_32(&fSet.pDataF->nRef, 1); - if (nRef == 0) { - tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pDataF, fname); - taosRemoveFile(fname); - taosMemoryFree(fSet.pDataF); - } - } else { - ASSERT(pSetOld->pDataF->size <= pSetNew->pDataF->size); - pSetOld->pDataF->size = pSetNew->pDataF->size; - } - - // sma - fSet.pSmaF = pSetOld->pSmaF; - if ((!sameDisk) || (pSetOld->pSmaF->commitID != pSetNew->pSmaF->commitID)) { - pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); - if (pSetOld->pSmaF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->pSmaF = *pSetNew->pSmaF; - pSetOld->pSmaF->nRef = 1; - - nRef = atomic_sub_fetch_32(&fSet.pSmaF->nRef, 1); - if (nRef == 0) { - tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pSmaF, fname); - (void)taosRemoveFile(fname); - taosMemoryFree(fSet.pSmaF); - } - } else { - ASSERT(pSetOld->pSmaF->size <= pSetNew->pSmaF->size); - pSetOld->pSmaF->size = pSetNew->pSmaF->size; - } - - // stt - if (sameDisk) { - if (pSetNew->nSttF > pSetOld->nSttF) { - ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1); - pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (pSetOld->aSttF[pSetOld->nSttF] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF]; - pSetOld->aSttF[pSetOld->nSttF]->nRef = 1; - pSetOld->nSttF++; - } else if (pSetNew->nSttF < pSetOld->nSttF) { - ASSERT(pSetNew->nSttF == 1); - for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { - SSttFile *pSttFile = pSetOld->aSttF[iStt]; - nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); - if (nRef == 0) { - tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); - taosRemoveFile(fname); - taosMemoryFree(pSttFile); - } - pSetOld->aSttF[iStt] = NULL; - } - - pSetOld->nSttF = 1; - pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (pSetOld->aSttF[0] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->aSttF[0] = *pSetNew->aSttF[0]; - pSetOld->aSttF[0]->nRef = 1; - } else { - for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { - if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) { - SSttFile *pSttFile = pSetOld->aSttF[iStt]; - nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); - if (nRef == 0) { - tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); - taosRemoveFile(fname); - taosMemoryFree(pSttFile); - } - - pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (pSetOld->aSttF[iStt] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; - pSetOld->aSttF[iStt]->nRef = 1; - } else { - ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size); - ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset); - } - } - } - } else { - ASSERT(pSetOld->nSttF == pSetNew->nSttF); - for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { - SSttFile *pSttFile = pSetOld->aSttF[iStt]; - nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); - if (nRef == 0) { - tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); - taosRemoveFile(fname); - taosMemoryFree(pSttFile); - } - - pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (pSetOld->aSttF[iStt] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; - pSetOld->aSttF[iStt]->nRef = 1; - } - } - - if (!sameDisk) { - pSetOld->diskId = pSetNew->diskId; - } - - iOld++; - iNew++; - continue; - - _remove_old: - nRef = atomic_sub_fetch_32(&pSetOld->pHeadF->nRef, 1); - if (nRef == 0) { - tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pHeadF, fname); - (void)taosRemoveFile(fname); - taosMemoryFree(pSetOld->pHeadF); - } - - nRef = atomic_sub_fetch_32(&pSetOld->pDataF->nRef, 1); - if (nRef == 0) { - tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pDataF, fname); - taosRemoveFile(fname); - taosMemoryFree(pSetOld->pDataF); - } - - nRef = atomic_sub_fetch_32(&pSetOld->pSmaF->nRef, 1); - if (nRef == 0) { - tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pSmaF, fname); - taosRemoveFile(fname); - taosMemoryFree(pSetOld->pSmaF); - } - - for (int8_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { - nRef = atomic_sub_fetch_32(&pSetOld->aSttF[iStt]->nRef, 1); - if (nRef == 0) { - tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSttF[iStt], fname); - taosRemoveFile(fname); - taosMemoryFree(pSetOld->aSttF[iStt]); - } - } - - taosArrayRemove(pTsdb->fs.aDFileSet, iOld); - continue; - - _add_new: - fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSttF = 1}; - - // head - fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); - if (fSet.pHeadF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *fSet.pHeadF = *pSetNew->pHeadF; - fSet.pHeadF->nRef = 1; - - // data - fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); - if (fSet.pDataF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *fSet.pDataF = *pSetNew->pDataF; - fSet.pDataF->nRef = 1; - - // sma - fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); - if (fSet.pSmaF == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *fSet.pSmaF = *pSetNew->pSmaF; - fSet.pSmaF->nRef = 1; - - // stt - ASSERT(pSetNew->nSttF == 1); - fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); - if (fSet.aSttF[0] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - *fSet.aSttF[0] = *pSetNew->aSttF[0]; - fSet.aSttF[0]->nRef = 1; - - if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - iOld++; - iNew++; - continue; - } - - return code; - -_err: - tsdbError("vgId:%d, tsdb fs commit phase 2 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 5197823490..efc74b68ba 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -33,7 +33,7 @@ int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) { * @param dir * @return int */ -int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg) { +int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg, int8_t rollback) { STsdb *pTsdb = NULL; int slen = 0; @@ -66,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee } // open tsdb - if (tsdbFSOpen(pTsdb) < 0) { + if (tsdbFSOpen(pTsdb, rollback) < 0) { goto _err; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 2c68c57176..259c30d591 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -86,12 +86,12 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { } // do change fs - code = tsdbFSCommit1(pTsdb, &fs); + code = tsdbFSPrepareCommit(pTsdb, &fs); if (code) goto _err; taosThreadRwlockWrlock(&pTsdb->rwLock); - code = tsdbFSCommit2(pTsdb, &fs); + code = tsdbFSCommit(pTsdb); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 74c704598d..cdbbe6333d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1380,13 +1380,13 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { code = tsdbSnapWriteDelEnd(pWriter); if (code) goto _err; - code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs); + code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs); if (code) goto _err; // lock taosThreadRwlockWrlock(&pTsdb->rwLock); - code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs); + code = tsdbFSCommit(pWriter->pTsdb); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); goto _err; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 07c4c32955..24b678b5eb 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -20,8 +20,6 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo); -static int vnodeStartCommit(SVnode *pVnode); -static int vnodeEndCommit(SVnode *pVnode); static int vnodeCommitImpl(void *arg); static void vnodeWaitCommit(SVnode *pVnode); @@ -215,6 +213,8 @@ int vnodeSyncCommit(SVnode *pVnode) { } int vnodeCommit(SVnode *pVnode) { + int32_t code = 0; + int32_t lino = 0; SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN]; @@ -234,15 +234,13 @@ int vnodeCommit(SVnode *pVnode) { snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); } if (vnodeSaveInfo(dir, &info) < 0) { - vError("vgId:%d, failed to save vnode info since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } walBeginSnapshot(pVnode->pWal, pVnode->state.applied); - // preCommit - // smaSyncPreCommit(pVnode->pSma); - if(smaAsyncPreCommit(pVnode->pSma) < 0){ - vError("vgId:%d, failed to async pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); + if (smaPreCommit(pVnode->pSma) < 0) { + vError("vgId:%d, failed to pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } @@ -251,64 +249,73 @@ int vnodeCommit(SVnode *pVnode) { // commit each sub-system if (metaCommit(pVnode->pMeta) < 0) { - vError("vgId:%d, failed to commit meta since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _exit); } if (VND_IS_RSMA(pVnode)) { - if (smaAsyncCommit(pVnode->pSma) < 0) { - vError("vgId:%d, failed to async commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - - if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { - vError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - if (tsdbCommit(VND_RSMA1(pVnode)) < 0) { - vError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - if (tsdbCommit(VND_RSMA2(pVnode)) < 0) { - vError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(terrno)); + if (smaCommit(pVnode->pSma) < 0) { + vError("vgId:%d, failed to commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } } else { - if (tsdbCommit(pVnode->pTsdb) < 0) { - vError("vgId:%d, failed to commit tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } + code = tsdbCommit(pVnode->pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); } if (tqCommit(pVnode->pTq) < 0) { - vError("vgId:%d, failed to commit tq since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _exit); } - // walCommit (TODO) // commit info if (vnodeCommitInfo(dir, &info) < 0) { - vError("vgId:%d, failed to commit vnode info since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + + tsdbFinishCommit(pVnode->pTsdb); + + if (metaFinishCommit(pVnode->pMeta) < 0) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } pVnode->state.committed = info.state.committed; - // postCommit - // smaSyncPostCommit(pVnode->pSma); - if (smaAsyncPostCommit(pVnode->pSma) < 0) { - vError("vgId:%d, failed to async post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); + if (smaPostCommit(pVnode->pSma) < 0) { + vError("vgId:%d, failed to post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } // apply the commit (TODO) walEndSnapshot(pVnode->pWal); - vInfo("vgId:%d, commit end", TD_VID(pVnode)); - +_exit: + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } else { + vInfo("vgId:%d, commit end", TD_VID(pVnode)); + } return 0; } +bool vnodeShouldRollback(SVnode *pVnode) { + char tFName[TSDB_FILENAME_LEN] = {0}; + snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, + VND_INFO_FNAME_TMP); + + return taosCheckExistFile(tFName); +} + +void vnodeRollback(SVnode *pVnode) { + char tFName[TSDB_FILENAME_LEN] = {0}; + snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, + VND_INFO_FNAME_TMP); + + (void)taosRemoveFile(tFName); +} + static int vnodeCommitImpl(void *arg) { SVnode *pVnode = (SVnode *)arg; @@ -321,16 +328,6 @@ static int vnodeCommitImpl(void *arg) { return 0; } -static int vnodeStartCommit(SVnode *pVnode) { - // TODO - return 0; -} - -static int vnodeEndCommit(SVnode *pVnode) { - // TODO - return 0; -} - static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); } static int vnodeEncodeState(const void *pObj, SJson *pJson) { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 001bb5f7f2..3dbd93bb27 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -110,6 +110,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL); + int8_t rollback = vnodeShouldRollback(pVnode); + // open buffer pool if (vnodeOpenBufPool(pVnode) < 0) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); @@ -117,19 +119,19 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { } // open meta - if (metaOpen(pVnode, &pVnode->pMeta) < 0) { + if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) { vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open tsdb - if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL) < 0) { + if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) { vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open sma - if (smaOpen(pVnode)) { + if (smaOpen(pVnode, rollback)) { vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } @@ -153,14 +155,12 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { goto _err; } -#if !VNODE_AS_LIB // open query if (vnodeQueryOpen(pVnode)) { vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } -#endif // vnode begin if (vnodeBegin(pVnode) < 0) { @@ -169,13 +169,15 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { goto _err; } -#if !VNODE_AS_LIB // open sync if (vnodeSyncOpen(pVnode, dir)) { vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } -#endif + + if (rollback) { + vnodeRollback(pVnode); + } return pVnode; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 47450328be..1aa3d55efc 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1095,7 +1095,7 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size); -int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, +int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup); diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 55978855d1..02ab475cdf 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -150,9 +150,15 @@ static int32_t getStatus(SDataDeleterHandle* pDeleter) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM); - if (NULL == pBuf || !allocBuf(pDeleter, pInput, pBuf)) { + if (NULL == pBuf) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } + + if (!allocBuf(pDeleter, pInput, pBuf)) { + taosFreeQitem(pBuf); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + toDataCacheEntry(pDeleter, pInput, pBuf); taosWriteQitem(pDeleter->pDataBlocks, pBuf); *pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false); diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index ed455e5e75..d996634d8e 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -324,6 +324,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid); if (code) { destroyDataSinker((SDataSinkHandle*)inserter); + taosMemoryFree(inserter); return code; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 641bcb0c5b..0e74b56882 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4312,8 +4312,9 @@ int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, in return TSDB_CODE_SUCCESS; } -int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, +int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExprInfo* pExprInfo = pSup->pExprInfo; int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; @@ -4338,6 +4339,31 @@ int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pSta if (pBlock->info.groupId == 0) { pBlock->info.groupId = pKey->groupId; + + if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + + char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); + if (tbname != NULL) { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } else { + pBlock->info.parTbName[0] = 0; + } + } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || + pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || + pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + + char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); + if (tbname != NULL) { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } else { + pBlock->info.parTbName[0] = 0; + } + } else { + ASSERT(0); + } + } else { // current value belongs to different group, it can't be packed into one datablock if (pBlock->info.groupId != pKey->groupId) { @@ -4383,4 +4409,4 @@ int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pSta } blockDataUpdateTsWindow(pBlock, 0); return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index e199cf8406..599e46c5fc 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1492,6 +1492,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { case STREAM_NORMAL: case STREAM_INVALID: { doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock); + memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pInfo->srcRowIndex = 0; } break; default: @@ -1502,6 +1503,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { doStreamFillImpl(pOperator); doFilter(pInfo->pCondition, pInfo->pRes, pInfo->pColMatchColInfo, NULL); + memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pOperator->resultInfo.totalRows += pInfo->pRes->info.rows; if (pInfo->pRes->info.rows > 0) { break; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 76fddc3982..add5f44847 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4044,7 +4044,7 @@ void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroup // clear the existed group id pBlock->info.groupId = 0; - buildSessionResultDataBlock(pTaskInfo, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); + buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); } static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { @@ -4088,6 +4088,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ } + if (pBlock->info.parTbName[0]) { + taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, + TSDB_TABLE_NAME_LEN); + /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ + } + if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); @@ -4617,6 +4623,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, "single state recv"); + if (pBlock->info.parTbName[0]) { + taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, + TSDB_TABLE_NAME_LEN); + /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ + } + if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5ad5aa549d..149b1a8447 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -137,6 +137,8 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamDispatch(pTask); } + } else { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 53e49a6ba5..02b74c260b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -27,7 +27,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF char* streamPath = taosMemoryCalloc(1, len); sprintf(streamPath, "%s/%s", path, "stream"); pMeta->path = strdup(streamPath); - if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { + if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { taosMemoryFree(streamPath); goto _err; } @@ -36,11 +36,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosMulModeMkDir(streamPath, 0755); taosMemoryFree(streamPath); - if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { + if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { goto _err; } - if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) { + if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) { goto _err; } @@ -262,12 +262,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { tdbFree(pKey); tdbFree(pVal); + tdbTbcClose(pCur); return -1; } if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); + tdbTbcClose(pCur); return -1; } } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 6991f00bb7..c3c58e125a 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -99,26 +99,26 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int memset(statePath, 0, 300); tstrncpy(statePath, path, 300); } - if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) { + if (tdbOpen(statePath, szPage, pages, &pState->db, 0) < 0) { goto _err; } // open state storage backend - if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb) < 0) { + if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb, 0) < 0) { goto _err; } // todo refactor - if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb) < 0) { + if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb, 0) < 0) { goto _err; } if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->db, - &pState->pSessionStateDb) < 0) { + &pState->pSessionStateDb, 0) < 0) { goto _err; } - if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb) < 0) { + if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb, 0) < 0) { goto _err; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ce5917de29..5304938195 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -119,7 +119,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { for (int32_t i = 0; i < epSz; i++) { SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo)); if (pInfo == NULL) return -1; - if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) return -1; + if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) { + taosMemoryFreeClear(pInfo); + return -1; + } taosArrayPush(pTask->childEpInfo, &pInfo); } diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index c90d4e3c03..3d92d164b4 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -31,15 +31,17 @@ typedef struct STBC TBC; typedef struct STxn TXN; // TDB -int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb); +int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb, int8_t rollback); int32_t tdbClose(TDB *pDb); int32_t tdbBegin(TDB *pDb, TXN *pTxn); int32_t tdbCommit(TDB *pDb, TXN *pTxn); +int32_t tdbPostCommit(TDB *pDb, TXN *pTxn); int32_t tdbAbort(TDB *pDb, TXN *pTxn); int32_t tdbAlter(TDB *pDb, int pages); // TTB -int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb); +int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb, + int8_t rollback); int32_t tdbTbClose(TTB *pTb); int32_t tdbTbDrop(TTB *pTb); int32_t tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn); diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index a2803d3ecf..a2002a3ac4 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -15,7 +15,7 @@ #include "tdbInt.h" -int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) { +int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, int8_t rollback) { TDB *pDb; int dsize; int zsize; @@ -66,7 +66,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) { #ifdef USE_MAINDB // open main db - ret = tdbTbOpen(TDB_MAINDB_NAME, -1, sizeof(SBtInfo), NULL, pDb, &pDb->pMainDb); + ret = tdbTbOpen(TDB_MAINDB_NAME, -1, sizeof(SBtInfo), NULL, pDb, &pDb->pMainDb, rollback); if (ret < 0) { return -1; } @@ -129,6 +129,21 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) { return 0; } +int32_t tdbPostCommit(TDB *pDb, TXN *pTxn) { + SPager *pPager; + int ret; + + for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { + ret = tdbPagerPostCommit(pPager, pTxn); + if (ret < 0) { + tdbError("failed to commit pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId); + return -1; + } + } + + return 0; +} + int32_t tdbAbort(TDB *pDb, TXN *pTxn) { SPager *pPager; int ret; diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 90332617cd..58f6cedc22 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -305,6 +305,18 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { return 0; } +int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) { + if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { + tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + pPager->inTran = 0; + + return 0; +} + // recovery dirty pages int tdbPagerAbort(SPager *pPager, TXN *pTxn) { SPage *pPage; @@ -657,3 +669,13 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) { return 0; } + +int tdbPagerRollback(SPager *pPager) { + if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { + tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; +} diff --git a/source/libs/tdb/src/db/tdbTable.c b/source/libs/tdb/src/db/tdbTable.c index 008907ca77..73cc34a86f 100644 --- a/source/libs/tdb/src/db/tdbTable.c +++ b/source/libs/tdb/src/db/tdbTable.c @@ -24,7 +24,8 @@ struct STBC { SBTC btc; }; -int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb) { +int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb, + int8_t rollback) { TTB *pTb; SPager *pPager; int ret; @@ -110,10 +111,14 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF return -1; } - ret = tdbPagerRestore(pPager, pTb->pBt); - if (ret < 0) { - tdbOsFree(pTb); - return -1; + if (rollback) { + tdbPagerRollback(pPager); + } else { + ret = tdbPagerRestore(pPager, pTb->pBt); + if (ret < 0) { + tdbOsFree(pTb); + return -1; + } } *ppTb = pTb; diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index 68434a4319..b45747c972 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -190,12 +190,14 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt); int tdbPagerWrite(SPager *pPager, SPage *pPage); int tdbPagerBegin(SPager *pPager, TXN *pTxn); int tdbPagerCommit(SPager *pPager, TXN *pTxn); +int tdbPagerPostCommit(SPager *pPager, TXN *pTxn); int tdbPagerAbort(SPager *pPager, TXN *pTxn); int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg, TXN *pTxn); void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn); int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno); int tdbPagerRestore(SPager *pPager, SBTree *pBt); +int tdbPagerRollback(SPager *pPager); // tdbPCache.c ==================================== #define TDB_PCACHE_PAGE \ diff --git a/source/libs/tdb/test/tdbExOVFLTest.cpp b/source/libs/tdb/test/tdbExOVFLTest.cpp index d98c271edb..6ead7c0dd6 100644 --- a/source/libs/tdb/test/tdbExOVFLTest.cpp +++ b/source/libs/tdb/test/tdbExOVFLTest.cpp @@ -140,7 +140,7 @@ static void generateBigVal(char *val, int valLen) { static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) { TDB *pEnv = NULL; - int ret = tdbOpen(envName, pageSize, pageNum, &pEnv); + int ret = tdbOpen(envName, pageSize, pageNum, &pEnv, 0); if (ret) { pEnv = NULL; } @@ -162,8 +162,8 @@ static void insertOfp(void) { // open db TTB *pDb = NULL; tdb_cmpr_fn_t compFunc = tKeyCmpr; - // ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); - ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb); + // ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); + ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); // open the pool @@ -211,8 +211,8 @@ TEST(TdbOVFLPagesTest, TbGetTest) { // open db TTB *pDb = NULL; tdb_cmpr_fn_t compFunc = tKeyCmpr; - // int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); - int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb); + // int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); + int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); // generate value payload @@ -253,7 +253,7 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) { // open db TTB *pDb = NULL; tdb_cmpr_fn_t compFunc = tKeyCmpr; - ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); + ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); // open the pool @@ -354,12 +354,12 @@ TEST(tdb_test, simple_insert1) { taosRemoveDir("tdb"); // Open Env - ret = tdbOpen("tdb", pageSize, 64, &pEnv); + ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0); GTEST_ASSERT_EQ(ret, 0); // Create a database compFunc = tKeyCmpr; - ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); { diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index 6070052127..f3a301cf5b 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -130,12 +130,12 @@ TEST(tdb_test, DISABLED_simple_insert1) { taosRemoveDir("tdb"); // Open Env - ret = tdbOpen("tdb", 4096, 64, &pEnv); + ret = tdbOpen("tdb", 4096, 64, &pEnv, 0); GTEST_ASSERT_EQ(ret, 0); // Create a database compFunc = tKeyCmpr; - ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); { @@ -250,12 +250,12 @@ TEST(tdb_test, DISABLED_simple_insert2) { taosRemoveDir("tdb"); // Open Env - ret = tdbOpen("tdb", 1024, 10, &pEnv); + ret = tdbOpen("tdb", 1024, 10, &pEnv, 0); GTEST_ASSERT_EQ(ret, 0); // Create a database compFunc = tDefaultKeyCmpr; - ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); { @@ -346,11 +346,11 @@ TEST(tdb_test, DISABLED_simple_delete1) { pPool = openPool(); // open env - ret = tdbOpen("tdb", 1024, 256, &pEnv); + ret = tdbOpen("tdb", 1024, 256, &pEnv, 0); GTEST_ASSERT_EQ(ret, 0); // open database - ret = tdbTbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb); + ret = tdbTbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); @@ -435,11 +435,11 @@ TEST(tdb_test, DISABLED_simple_upsert1) { taosRemoveDir("tdb"); // open env - ret = tdbOpen("tdb", 4096, 64, &pEnv); + ret = tdbOpen("tdb", 4096, 64, &pEnv, 0); GTEST_ASSERT_EQ(ret, 0); // open database - ret = tdbTbOpen("db.db", -1, -1, NULL, pEnv, &pDb); + ret = tdbTbOpen("db.db", -1, -1, NULL, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); pPool = openPool(); @@ -497,12 +497,12 @@ TEST(tdb_test, multi_thread_query) { taosRemoveDir("tdb"); // Open Env - ret = tdbOpen("tdb", 4096, 10, &pEnv); + ret = tdbOpen("tdb", 4096, 10, &pEnv, 0); GTEST_ASSERT_EQ(ret, 0); // Create a database compFunc = tKeyCmpr; - ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); char key[64]; @@ -614,10 +614,10 @@ TEST(tdb_test, DISABLED_multi_thread1) { taosRemoveDir("tdb"); // Open Env - ret = tdbOpen("tdb", 512, 1, &pDb); + ret = tdbOpen("tdb", 512, 1, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); - ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb); + ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb, 0); GTEST_ASSERT_EQ(ret, 0); auto insert = [](TDB *pDb, TTB *pTb, int nData, int *stop, std::shared_timed_mutex *mu) { @@ -726,4 +726,4 @@ TEST(tdb_test, DISABLED_multi_thread1) { ret = tdbClose(pDb); GTEST_ASSERT_EQ(ret, 0); #endif -} \ No newline at end of file +} diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index d332d913ca..be862308f1 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -343,6 +343,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { TdFilePtr pFile = (TdFilePtr)taosMemoryMalloc(sizeof(TdFile)); if (pFile == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; if (fd >= 0) close(fd); if (fp != NULL) fclose(fp); return NULL; diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index f524680331..467f26b362 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -26,19 +26,25 @@ static void *taosProcessSchedQueue(void *param); static void taosDumpSchedulerStatus(void *qhandle, void *tmrId); void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label, SSchedQueue *pSched) { + bool schedMalloced = false; + if (NULL == pSched) { pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1); if (pSched == NULL) { uError("%s: no enough memory for pSched", label); return NULL; } + + schedMalloced = true; } pSched->queue = (SSchedMsg *)taosMemoryCalloc(sizeof(SSchedMsg), queueSize); if (pSched->queue == NULL) { uError("%s: no enough memory for queue", label); taosCleanUpScheduler(pSched); - taosMemoryFree(pSched); + if (schedMalloced) { + taosMemoryFree(pSched); + } return NULL; } @@ -46,6 +52,9 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab if (pSched->qthread == NULL) { uError("%s: no enough memory for qthread", label); taosCleanUpScheduler(pSched); + if (schedMalloced) { + taosMemoryFree(pSched); + } return NULL; } @@ -58,18 +67,27 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab if (taosThreadMutexInit(&pSched->queueMutex, NULL) < 0) { uError("init %s:queueMutex failed(%s)", label, strerror(errno)); taosCleanUpScheduler(pSched); + if (schedMalloced) { + taosMemoryFree(pSched); + } return NULL; } if (tsem_init(&pSched->emptySem, 0, (uint32_t)pSched->queueSize) != 0) { uError("init %s:empty semaphore failed(%s)", label, strerror(errno)); taosCleanUpScheduler(pSched); + if (schedMalloced) { + taosMemoryFree(pSched); + } return NULL; } if (tsem_init(&pSched->fullSem, 0, 0) != 0) { uError("init %s:full semaphore failed(%s)", label, strerror(errno)); taosCleanUpScheduler(pSched); + if (schedMalloced) { + taosMemoryFree(pSched); + } return NULL; } @@ -83,6 +101,9 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab if (code != 0) { uError("%s: failed to create rpc thread(%s)", label, strerror(errno)); taosCleanUpScheduler(pSched); + if (schedMalloced) { + taosMemoryFree(pSched); + } return NULL; } ++pSched->numOfThreads; diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 286eb02336..8ad2deeba9 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -301,7 +301,7 @@ # --- sma ./test.sh -f tsim/sma/drop_sma.sim -# ./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim +./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim # temp disable ./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim diff --git a/tests/script/tsim/sma/tsmaCreateInsertQuery.sim b/tests/script/tsim/sma/tsmaCreateInsertQuery.sim index 2ff01263a4..7aef9e0f86 100644 --- a/tests/script/tsim/sma/tsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/tsmaCreateInsertQuery.sim @@ -5,7 +5,7 @@ sleep 50 sql connect print =============== create database -sql create database d1 vgroups 1 +sql create database d1 keep 36500d vgroups 1 sql use d1 print =============== create super table, include column type for count/sum/min/max/first @@ -25,8 +25,8 @@ if $rows != 1 then endi print =============== insert data, mode1: one row one table in sql -sql insert into ct1 values(now+0s, 10, 2.0, 3.0) -sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) +sql insert into ct1 values('2022-10-19 09:55:45.682', 10, 2.0, 3.0) +sql insert into ct1 values('2022-10-19 09:55:46.682', 11, 2.1, 3.1)('2022-10-19 09:55:47.682', -12, -2.2, -3.2)('2022-10-19 09:55:48.682', -13, -2.3, -3.3) print =============== create sma index from super table @@ -34,7 +34,7 @@ sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) in print $data00 $data01 $data02 $data03 print =============== trigger stream to execute sma aggr task and insert sma data into sma store -sql insert into ct1 values(now+5s, 20, 20.0, 30.0) +sql insert into ct1 values('2022-10-19 09:55:50.682', 20, 20.0, 30.0) #=================================================================== print =============== show streams ================================ diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index cc5836cbfd..39adfc9d53 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -232,7 +232,7 @@ python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3 -# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDbRep3.py -N 5 -M 3 @@ -248,7 +248,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 5 - python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 6 -M 3 -C 5 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py -# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 +python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeRecreateMnode.py -N 5 -M 3