diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index de31a970df..adf244e32a 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -43,6 +43,7 @@ extern "C" { #define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) +#define WAL_RECOV_SIZE_LIMIT (100 * WAL_SCAN_BUF_SIZE) typedef enum { TAOS_WAL_WRITE = 1, diff --git a/include/os/osEnv.h b/include/os/osEnv.h index 293d9d17f8..d4e94d6173 100644 --- a/include/os/osEnv.h +++ b/include/os/osEnv.h @@ -49,9 +49,15 @@ extern SDiskSpace tsTempSpace; void osDefaultInit(); void osUpdate(); void osCleanup(); + bool osLogSpaceAvailable(); bool osDataSpaceAvailable(); bool osTempSpaceAvailable(); + +bool osLogSpaceSufficient(); +bool osDataSpaceSufficient(); +bool osTempSpaceSufficient(); + void osSetTimezone(const char *timezone); void osSetSystemLocale(const char *inLocale, const char *inCharSet); @@ -59,4 +65,4 @@ void osSetSystemLocale(const char *inLocale, const char *inCharSet); } #endif -#endif /*_TD_OS_ENV_H_*/ \ No newline at end of file +#endif /*_TD_OS_ENV_H_*/ diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 6bc0e0e7dd..bbddb539c6 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -450,6 +450,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_WAL_INVALID_VER TAOS_DEF_ERROR_CODE(0, 0x1003) #define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) #define TSDB_CODE_WAL_LOG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x1005) +#define TSDB_CODE_WAL_CHKSUM_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x1006) // tfs #define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 7d8026f314..6126817ece 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -146,7 +146,7 @@ void taos_close(TAOS *taos) { int taos_errno(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ_META(res)) { - if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; + if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_QRY_NOT_READY; return terrno; } @@ -154,13 +154,12 @@ int taos_errno(TAOS_RES *res) { return 0; } - return ((SRequestObj *)res)->code == TSDB_CODE_RPC_REDIRECT ? TSDB_CODE_RPC_NETWORK_UNAVAIL - : ((SRequestObj *)res)->code; + return ((SRequestObj *)res)->code == TSDB_CODE_RPC_REDIRECT ? TSDB_CODE_QRY_NOT_READY : ((SRequestObj *)res)->code; } const char *taos_errstr(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ_META(res)) { - if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; + if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_QRY_NOT_READY; return (const char *)tstrerror(terrno); } @@ -172,7 +171,7 @@ const char *taos_errstr(TAOS_RES *res) { if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) { return pRequest->msgBuf; } else { - return pRequest->code == TSDB_CODE_RPC_REDIRECT ? (const char *)tstrerror(TSDB_CODE_RPC_NETWORK_UNAVAIL) + return pRequest->code == TSDB_CODE_RPC_REDIRECT ? (const char *)tstrerror(TSDB_CODE_QRY_NOT_READY) : (const char *)tstrerror(pRequest->code); } } diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index d07ec7abb0..076826ebc2 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -51,26 +51,14 @@ static int32_t dmInitMonitor() { static bool dmCheckDiskSpace() { osUpdate(); - if (!osDataSpaceAvailable()) { - dError("free disk size: %f GB, too little, require %f GB at least at least , quit", - (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, - (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0); - terrno = TSDB_CODE_NO_AVAIL_DISK; - return false; + if (!osDataSpaceSufficient()) { + dWarn("free data disk size: %f GB, not sufficient, expected %f GB at least", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0); } - if (!osLogSpaceAvailable()) { - dError("free disk size: %f GB, too little, require %f GB at least at least, quit", - (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, - (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0); - terrno = TSDB_CODE_NO_AVAIL_DISK; - return false; + if (!osLogSpaceSufficient()) { + dWarn("free log disk size: %f GB, not sufficient, expected %f GB at least", (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0); } - if (!osTempSpaceAvailable()) { - dError("free disk size: %f GB, too little, require %f GB at least at least, quit", - (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, - (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0); - terrno = TSDB_CODE_NO_AVAIL_DISK; - return false; + if (!osTempSpaceSufficient()) { + dWarn("free temp disk size: %f GB, not sufficient, expected %f GB at least", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0); } return true; } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index a52ca1bd42..5fb23b045c 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -232,7 +232,7 @@ static int32_t mndInitWal(SMnode *pMnode) { pMnode->pWal = walOpen(path, &cfg); if (pMnode->pWal == NULL) { - mError("failed to open wal since %s", terrstr()); + mError("failed to open wal since %s. wal:%s", terrstr(), path); return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index aca9042261..a4d993bc6c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1045,7 +1045,9 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; - ASSERT(eno == 0); + ASSERT(eno == 0 && + "tsdbCommit failure" + "Restart taosd"); code = tsdbFSCommit1(pTsdb, &pCommitter->fs); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 450032d4f2..0189ced3c6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -160,6 +160,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid goto _err; } + ASSERT(pPool != NULL); // do delete SDelData *pDelData = (SDelData *)vnodeBufPoolMalloc(pPool, sizeof(*pDelData)); if (pDelData == NULL) { @@ -353,6 +354,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse; int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel; + ASSERT(pPool != NULL); pTbData = vnodeBufPoolMalloc(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2); if (pTbData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -492,6 +494,7 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN // node level = tsdbMemSkipListRandLevel(&pTbData->sl); + ASSERT(pPool != NULL); pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow)); if (pNode == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 730bd264a7..682f2b4225 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -112,6 +112,8 @@ void vnodeBufPoolReset(SVBufPool *pPool) { void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { SVBufPoolNode *pNode; void *p = NULL; + ASSERT(pPool != NULL); + taosThreadSpinLock(&pPool->lock); if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { // allocate from the anchor node diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 07d9b96261..07c4c32955 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -73,7 +73,7 @@ int vnodeBegin(SVnode *pVnode) { int vnodeShouldCommit(SVnode *pVnode) { if (pVnode->inUse) { - return pVnode->inUse->size > pVnode->inUse->node.size; + return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size); } return false; } @@ -89,6 +89,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { data = NULL; if (vnodeEncodeInfo(pInfo, &data) < 0) { + vError("failed to encode json info."); return -1; } @@ -101,7 +102,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { } if (taosWriteFile(pFile, data, strlen(data)) < 0) { - vError("failed to write info file:%s data:%s", fname, terrstr()); + vError("failed to write info file:%s error:%s", fname, terrstr()); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } @@ -233,15 +234,15 @@ int vnodeCommit(SVnode *pVnode) { snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); } if (vnodeSaveInfo(dir, &info) < 0) { - ASSERT(0); + vError("vgId:%d, failed to save vnode info since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } walBeginSnapshot(pVnode->pWal, pVnode->state.applied); // preCommit // smaSyncPreCommit(pVnode->pSma); - if (smaAsyncPreCommit(pVnode->pSma) < 0) { - ASSERT(0); + if(smaAsyncPreCommit(pVnode->pSma) < 0){ + vError("vgId:%d, failed to async pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } @@ -250,44 +251,44 @@ int vnodeCommit(SVnode *pVnode) { // commit each sub-system if (metaCommit(pVnode->pMeta) < 0) { - ASSERT(0); + vError("vgId:%d, failed to commit meta since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } if (VND_IS_RSMA(pVnode)) { if (smaAsyncCommit(pVnode->pSma) < 0) { - ASSERT(0); + vError("vgId:%d, failed to async commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { - ASSERT(0); + vError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } if (tsdbCommit(VND_RSMA1(pVnode)) < 0) { - ASSERT(0); + vError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } if (tsdbCommit(VND_RSMA2(pVnode)) < 0) { - ASSERT(0); + vError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } } else { if (tsdbCommit(pVnode->pTsdb) < 0) { - ASSERT(0); + vError("vgId:%d, failed to commit tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } } if (tqCommit(pVnode->pTq) < 0) { - ASSERT(0); + vError("vgId:%d, failed to commit tq since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } // walCommit (TODO) // commit info if (vnodeCommitInfo(dir, &info) < 0) { - ASSERT(0); + vError("vgId:%d, failed to commit vnode info since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } @@ -296,7 +297,7 @@ int vnodeCommit(SVnode *pVnode) { // postCommit // smaSyncPostCommit(pVnode->pSma); if (smaAsyncPostCommit(pVnode->pSma) < 0) { - ASSERT(0); + vError("vgId:%d, failed to async post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 298653d3ed..001bb5f7f2 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -140,7 +140,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg)); if (pVnode->pWal == NULL) { - vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno)); + vError("vgId:%d, failed to open vnode wal since %s. wal:%s", TD_VID(pVnode), tstrerror(terrno), tdir); goto _err; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b531cf7574..0f93e650c6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -166,6 +166,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp int32_t len; int32_t ret; + if (!pVnode->inUse) { + terrno = TSDB_CODE_VND_NOT_SYNCED; + vError("vgId:%d, not ready to write since %s", TD_VID(pVnode), terrstr()); + return -1; + } + vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); @@ -286,10 +292,16 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp _do_commit: vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); // commit current change - vnodeCommit(pVnode); + if (vnodeCommit(pVnode) < 0) { + vError("vgId:%d, failed to commit vnode since %s.", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } // start a new one - vnodeBegin(pVnode); + if (vnodeBegin(pVnode) < 0) { + vError("vgId:%d, failed to begin vnode since %s.", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } } return 0; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index e000ba8bf8..4a35a15d3e 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -579,6 +579,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); if (code != 0) { + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); return -1; } @@ -710,6 +711,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); if (code != 0) { + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); return -1; } @@ -859,6 +861,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); if (code != 0) { + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); return -1; } @@ -974,6 +977,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); if (code != 0) { + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); return -1; } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index b604d25816..511113352e 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -76,7 +76,10 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); } else { pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); - ASSERT(pEntry != NULL); + if (pEntry == NULL) { + sError("failed to get entry since %s. index:%lld", tstrerror(terrno), index); + return; + } } // cannot commit, even if quorum agree. need check term! if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) { @@ -127,7 +130,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // execute fsm if (pSyncNode->pFsm != NULL) { int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state); - ASSERT(code == 0); + if (code != 0) { + wError("failed to commit sync node since %s", tstrerror(terrno)); + } } } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9343b45dc7..907eb21f4c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2657,7 +2657,10 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { if (ths->state == TAOS_SYNC_STATE_LEADER) { int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); - ASSERT(code == 0); + if (code != 0) { + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); + return -1; + } syncNodeReplicate(ths, false); } @@ -2733,7 +2736,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); if (code != 0) { // del resp mgr, call FpCommitCb - ASSERT(0); + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); return -1; } @@ -2797,8 +2800,8 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); if (code != 0) { + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); // del resp mgr, call FpCommitCb - ASSERT(0); return -1; } @@ -3050,7 +3053,10 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); } else { code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry); - ASSERT(code == 0); + if (code != 0) { + sError("vgId:%d, failed to get log entry since %s. index:%lld", ths->vgId, tstrerror(terrno), i); + return -1; + } ASSERT(pEntry != NULL); } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 496c8419de..c3dad104d1 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -234,8 +234,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pEntry->index, err, err, errStr, sysErr, sysErrStr); syncNodeErrorLog(pData->pSyncNode, logBuf); - - ASSERT(0); return -1; } pEntry->index = index; @@ -414,13 +412,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { return 0; } -SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { +SSyncRaftEntry* logStoreGetEntryWithoutLock(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { - taosThreadMutexLock(&(pData->mutex)); - // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); SWalReader* pWalHandle = pData->pWalHandle; ASSERT(pWalHandle != NULL); @@ -444,7 +440,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { } } while (0); - ASSERT(0); + sError("failed to read ver since %s. index:%lld", tstrerror(terrno), index); + return NULL; } SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); @@ -465,7 +462,6 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { terrno = saveErr; */ - taosThreadMutexUnlock(&(pData->mutex)); return pEntry; } else { @@ -473,6 +469,16 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { } } +SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { + SSyncLogStoreData* pData = pLogStore->data; + SSyncRaftEntry *pEntry = NULL; + + taosThreadMutexLock(&pData->mutex); + pEntry = logStoreGetEntryWithoutLock(pLogStore, index); + taosThreadMutexUnlock(&pData->mutex); + return pEntry; +} + int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 7afeb31d4c..13b7e0ac0a 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -121,6 +121,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg ret = tdbPagerWrite(pPager, pPage); if (ret < 0) { + tdbError("failed to write page since %s", terrstr()); return -1; } @@ -483,9 +484,8 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild, TXN ret = tdbPagerWrite(pPager, pChild); if (ret < 0) { - // TODO - ASSERT(0); - return 0; + tdbError("failed to write page since %s", terrstr()); + return -1; } // Copy the root page content to the child page @@ -556,8 +556,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx ret = tdbPagerWrite(pBt->pPager, pOlds[i]); if (ret < 0) { - // TODO - ASSERT(0); + tdbError("failed to write page since %s", terrstr()); return -1; } } @@ -583,8 +582,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx ret = tdbPagerWrite(pBt->pPager, pParent); if (ret < 0) { - // TODO - ASSERT(0); + tdbError("failed to write page since %s", terrstr()); return -1; } @@ -719,8 +717,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx ret = tdbPagerWrite(pBt->pPager, pNews[iNew]); if (ret < 0) { - // TODO - ASSERT(0); + tdbError("failed to write page since %s", terrstr()); return -1; } } @@ -937,7 +934,7 @@ static int tdbFetchOvflPage(SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt) // mark dirty ret = tdbPagerWrite(pBt->pPager, *ppOfp); if (ret < 0) { - ASSERT(0); + tdbError("failed to write page since %s", terrstr()); return -1; } @@ -1942,7 +1939,7 @@ int tdbBtcDelete(SBTC *pBtc) { // drop the cell on the leaf ret = tdbPagerWrite(pPager, pBtc->pPage); if (ret < 0) { - ASSERT(0); + tdbError("failed to write page since %s", terrstr()); return -1; } @@ -1964,7 +1961,7 @@ int tdbBtcDelete(SBTC *pBtc) { if (idx < nCells) { ret = tdbPagerWrite(pPager, pPage); if (ret < 0) { - ASSERT(0); + tdbError("failed to write page since %s", terrstr()); return -1; } @@ -2029,7 +2026,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int // mark dirty ret = tdbPagerWrite(pBtc->pBt->pPager, pBtc->pPage); if (ret < 0) { - ASSERT(0); + tdbError("failed to write page since %s", terrstr()); return -1; } diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index ea16e80562..6c01348bc2 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -106,7 +106,7 @@ int32_t tdbBegin(TDB *pDb, TXN *pTxn) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { ret = tdbPagerBegin(pPager, pTxn); if (ret < 0) { - ASSERT(0); + tdbError("failed to begin pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId); return -1; } } @@ -121,7 +121,7 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { ret = tdbPagerCommit(pPager, pTxn); if (ret < 0) { - ASSERT(0); + tdbError("failed to commit pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId); return -1; } } @@ -136,7 +136,7 @@ int32_t tdbAbort(TDB *pDb, TXN *pTxn) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { ret = tdbPagerAbort(pPager, pTxn); if (ret < 0) { - ASSERT(0); + tdbError("failed to abort pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId); return -1; } } diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 58be8642d7..dd2416e5b8 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -156,6 +156,7 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt) { ret = tdbPagerWrite(pPager, pPage); if (ret < 0) { + tdbError("failed to write page since %s", terrstr()); return -1; } @@ -210,7 +211,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) { ret = tdbPagerWritePageToJournal(pPager, pPage); if (ret < 0) { - ASSERT(0); + tdbError("failed to write page to journal since %s", tstrerror(terrno)); return -1; } } @@ -226,6 +227,8 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) { // Open the journal pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755); if (pPager->jfd < 0) { + tdbError("failed to open file due to %s. jFileName:%s", strerror(errno), pPager->jFileName); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -243,9 +246,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { // sync the journal file ret = tdbOsFSync(pPager->jfd); if (ret < 0) { - // TODO - ASSERT(0); - return 0; + tdbError("failed to fsync jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } // loop to write the dirty pages to file @@ -255,7 +258,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { pPage = (SPage *)pNode; ret = tdbPagerWritePageToDB(pPager, pPage); if (ret < 0) { - ASSERT(0); + tdbError("failed to write page to db since %s", tstrerror(terrno)); return -1; } } @@ -277,11 +280,25 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { tRBTreeCreate(&pPager->rbt, pageCmpFn); // sync the db file - tdbOsFSync(pPager->fd); + if (tdbOsFSync(pPager->fd) < 0) { + tdbError("failed to fsync fd due to %s. file:%s", strerror(errno), pPager->dbFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } // remove the journal file - tdbOsClose(pPager->jfd); - tdbOsRemove(pPager->jFileName); + if (tdbOsClose(pPager->jfd) < 0) { + tdbError("failed to close jfd due to %s. file:%s", strerror(errno), pPager->jFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { + tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + pPager->inTran = 0; return 0; @@ -297,14 +314,14 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { // 0, sync the journal file ret = tdbOsFSync(pPager->jfd); if (ret < 0) { - // TODO - ASSERT(0); - return 0; + tdbError("failed to fsync jfd due to %s. file:%s", strerror(errno), pPager->jFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755); if (jfd == NULL) { - return 0; + return -1; } ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize); @@ -516,11 +533,15 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) { ret = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno)); if (ret < 0) { + tdbError("failed to write pgno due to %s. file:%s, pgno:%u", strerror(errno), pPager->jFileName, pgno); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } ret = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize); if (ret < 0) { + tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->jFileName, pPage->pageSize); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -540,13 +561,15 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) { offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1); if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) { - ASSERT(0); + tdbError("failed to lseek due to %s. file:%s, offset:%ld", strerror(errno), pPager->dbFileName, offset); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize); if (ret < 0) { - ASSERT(0); + tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->dbFileName, pPage->pageSize); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -590,23 +613,38 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) { i64 offset = pPager->pageSize * (pgno - 1); if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) { - ASSERT(0); + tdbError("failed to lseek fd due to %s. file:%s, offset:%ld", strerror(errno), pPager->dbFileName, offset); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize); if (ret < 0) { - ASSERT(0); + tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName, pPager->pageSize); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } } - tdbOsFSync(pPager->fd); + if (tdbOsFSync(pPager->fd) < 0) { + tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } tdbOsFree(pageBuf); - tdbOsClose(jfd); - tdbOsRemove(pPager->jFileName); + if (tdbOsClose(jfd) < 0) { + tdbError("failed to close jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { + tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } return 0; } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 144cce2cd0..279f4dc656 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -35,113 +35,251 @@ int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVe int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; } -static FORCE_INLINE void walBuildMetaName(SWal* pWal, int metaVer, char* buf) { - sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); +static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { + return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); } -static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { - int32_t sz = taosArrayGetSize(pWal->fileInfoSet); - ASSERT(sz > 0); -#if 0 - for (int i = 0; i < sz; i++) { - SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i); - } -#endif +static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) { + return sprintf(buf, "%s/meta-ver.tmp", pWal->path); +} - SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz - 1); +static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); + terrno = TSDB_CODE_SUCCESS; + ASSERT(fileIdx >= 0 && fileIdx < sz); + + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); char fnameStr[WAL_FILE_LEN]; - walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); + walBuildLogName(pWal, pFileInfo->firstVer, fnameStr); int64_t fileSize = 0; taosStatFile(fnameStr, &fileSize, NULL); - int32_t readSize = TMIN(WAL_SCAN_BUF_SIZE, fileSize); - pLastFileInfo->fileSize = fileSize; - TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ); + TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE); if (pFile == NULL) { + wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } + // ensure size as non-negative + pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize); + uint64_t magic = WAL_MAGIC; + int64_t walCkHeadSz = sizeof(SWalCkHead); + int64_t end = fileSize; + int64_t offset = 0; + int32_t capacity = 0; + int32_t readSize = 0; + char* buf = NULL; + char* found = NULL; + bool firstTrial = pFileInfo->fileSize < fileSize; - char* buf = taosMemoryMalloc(readSize + 5); - if (buf == NULL) { - taosCloseFile(&pFile); - terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; - return -1; - } - - int64_t offset; - offset = taosLSeekFile(pFile, -readSize, SEEK_END); - if (readSize != taosReadFile(pFile, buf, readSize)) { - taosMemoryFree(buf); - taosCloseFile(&pFile); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - char* found = NULL; + // search for the valid last WAL entry, e.g. block by block while (1) { + offset = (firstTrial) ? pFileInfo->fileSize : TMAX(0, end - WAL_SCAN_BUF_SIZE); + ASSERT(offset <= end); + readSize = end - offset; + capacity = readSize + sizeof(magic); + + int64_t limit = WAL_RECOV_SIZE_LIMIT; + if (limit < readSize) { + wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 + ", end:%" PRId64 ", file:%s", + pWal->cfg.vgId, limit, offset, end, fnameStr); + terrno = TSDB_CODE_WAL_SIZE_LIMIT; + goto _err; + } + + void* ptr = taosMemoryRealloc(buf, capacity); + if (ptr == NULL) { + terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; + goto _err; + } + buf = ptr; + + int64_t ret = taosLSeekFile(pFile, offset, SEEK_SET); + if (ret < 0) { + wError("vgId:%d, failed to lseek file due to %s. offset:%" PRId64 "", pWal->cfg.vgId, strerror(errno), offset); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (readSize != taosReadFile(pFile, buf, readSize)) { + wError("vgId:%d, failed to read file due to %s. readSize:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno), + readSize, fnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + char* candidate = NULL; char* haystack = buf; - char* candidate; - while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) { - // read and validate - SWalCkHead* logContent = (SWalCkHead*)candidate; - if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) { - found = candidate; + + while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(magic))) != NULL) { + // validate head + int64_t len = readSize - (candidate - buf); + if (len < walCkHeadSz) { + break; } + SWalCkHead* logContent = (SWalCkHead*)candidate; + if (walValidHeadCksum(logContent) != 0) { + wError("vgId:%d, failed to validate checksum of wal entry header. offset:% %" PRId64 ", file:%s", + ((char*)(logContent)-buf), fnameStr); + haystack = candidate + 1; + if (firstTrial) { + break; + } else { + continue; + } + } + + // validate body + int64_t size = walCkHeadSz + logContent->head.bodyLen; + if (len < size) { + int64_t extraSize = size - len; + if (capacity < readSize + extraSize + sizeof(magic)) { + capacity += extraSize; + void* ptr = taosMemoryRealloc(buf, capacity); + if (ptr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + buf = ptr; + } + int64_t ret = taosLSeekFile(pFile, offset + readSize, SEEK_SET); + if (ret < 0) { + wError("vgId:%d, failed to lseek file due to %s. offset:%" PRId64 "", pWal->cfg.vgId, strerror(errno), + offset); + terrno = TAOS_SYSTEM_ERROR(errno); + break; + } + if (extraSize != taosReadFile(pFile, buf + readSize, extraSize)) { + wError("vgId:%d, failed to read file due to %s. offset:%" PRId64 ", extraSize:%" PRId64 ", file:%s", + pWal->cfg.vgId, strerror(errno), offset + readSize, extraSize, fnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + break; + } + } + if (walValidBodyCksum(logContent) != 0) { + terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH; + wError("vgId:%d, failed to validate checksum of wal entry body. offset:% %" PRId64 ", file:%s", + ((char*)(logContent)-buf), fnameStr); + haystack = candidate + 1; + if (firstTrial) { + break; + } else { + continue; + } + } + + // found one + found = candidate; haystack = candidate + 1; } + if (found || offset == 0) break; - offset = TMIN(0, offset - readSize + sizeof(uint64_t)); - int64_t offset2 = taosLSeekFile(pFile, offset, SEEK_SET); - ASSERT(offset == offset2); - if (readSize != taosReadFile(pFile, buf, readSize)) { - taosMemoryFree(buf); - taosCloseFile(&pFile); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } -#if 0 - if (found == buf) { - SWalCkHead* logContent = (SWalCkHead*)found; - if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) { - // file has to be deleted - taosMemoryFree(buf); - taosCloseFile(&pFile); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - return -1; - } - } -#endif + + // go backwards, e.g. by at most one WAL scan buf size + end = offset + walCkHeadSz - 1; + firstTrial = false; } - if (found == NULL) { - // file corrupted, no complete log - // TODO delete and search in previous files - /*ASSERT(0);*/ - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - return -1; + // determine end of last entry + SWalCkHead* lastEntry = (SWalCkHead*)found; + int64_t retVer = -1; + int64_t lastEntryBeginOffset = 0; + int64_t lastEntryEndOffset = 0; + + if (lastEntry == NULL) { + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; + } else { + retVer = lastEntry->head.version; + lastEntryBeginOffset = offset + (int64_t)((char*)lastEntry - (char*)buf); + lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen; } // truncate file - SWalCkHead* lastEntry = (SWalCkHead*)found; - int64_t retVer = lastEntry->head.version; - int64_t lastEntryBeginOffset = offset + (int64_t)((char*)found - (char*)buf); - int64_t lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen; if (lastEntryEndOffset != fileSize) { - wWarn("vgId:%d repair meta truncate file %s to %ld, orig size %ld", pWal->cfg.vgId, fnameStr, lastEntryEndOffset, + wWarn("vgId:%d, repair meta truncate file %s to %ld, orig size %ld", pWal->cfg.vgId, fnameStr, lastEntryEndOffset, fileSize); - taosFtruncateFile(pFile, lastEntryEndOffset); - ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = lastEntryEndOffset; - pWal->totSize -= (fileSize - lastEntryEndOffset); + if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) { + wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + if (taosFsyncFile(pFile) < 0) { + wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } } + pFileInfo->fileSize = lastEntryEndOffset; taosCloseFile(&pFile); taosMemoryFree(buf); - return retVer; + +_err: + taosCloseFile(&pFile); + taosMemoryFree(buf); + return -1; +} + +static void walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) { + int metaFileNum = taosArrayGetSize(metaLogList); + int actualFileNum = taosArrayGetSize(actualLogList); + int j = 0; + + // both of the lists in asc order + for (int i = 0; i < actualFileNum; i++) { + SWalFileInfo* pLogInfo = taosArrayGet(actualLogList, i); + while (j < metaFileNum) { + SWalFileInfo* pMetaInfo = taosArrayGet(metaLogList, j); + ASSERT(pMetaInfo != NULL); + if (pMetaInfo->firstVer < pLogInfo->firstVer) { + j++; + } else if (pMetaInfo->firstVer == pLogInfo->firstVer) { + (*pLogInfo) = *pMetaInfo; + j++; + break; + } else { + break; + } + } + } + + taosArrayClear(metaLogList); + + for (int i = 0; i < actualFileNum; i++) { + SWalFileInfo* pFileInfo = taosArrayGet(actualLogList, i); + taosArrayPush(metaLogList, pFileInfo); + } +} + +void walAlignVersions(SWal* pWal) { + if (pWal->vers.firstVer > pWal->vers.snapshotVer + 1) { + wWarn("vgId:%d, firstVer:%" PRId64 " is larger than snapshotVer:%" PRId64 " + 1. align with it.", pWal->cfg.vgId, + pWal->vers.firstVer, pWal->vers.snapshotVer); + pWal->vers.firstVer = pWal->vers.snapshotVer + 1; + } + if (pWal->vers.lastVer < pWal->vers.snapshotVer) { + wWarn("vgId:%d, lastVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". align with it.", pWal->cfg.vgId, + pWal->vers.lastVer, pWal->vers.snapshotVer); + pWal->vers.lastVer = pWal->vers.snapshotVer; + } + if (pWal->vers.commitVer < pWal->vers.snapshotVer) { + wWarn("vgId:%d, commitVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". align with it.", pWal->cfg.vgId, + pWal->vers.commitVer, pWal->vers.snapshotVer); + pWal->vers.commitVer = pWal->vers.snapshotVer; + } + if (pWal->vers.appliedVer < pWal->vers.snapshotVer) { + wWarn("vgId:%d, appliedVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". align with it.", pWal->cfg.vgId, + pWal->vers.appliedVer, pWal->vers.snapshotVer); + pWal->vers.appliedVer = pWal->vers.snapshotVer; + } + + pWal->vers.commitVer = TMIN(pWal->vers.lastVer, pWal->vers.commitVer); + pWal->vers.appliedVer = TMIN(pWal->vers.commitVer, pWal->vers.appliedVer); } int walCheckAndRepairMeta(SWal* pWal) { @@ -150,7 +288,6 @@ int walCheckAndRepairMeta(SWal* pWal) { const char* idxPattern = "^[0-9]+.idx$"; regex_t logRegPattern; regex_t idxRegPattern; - bool fixed = false; regcomp(&logRegPattern, logPattern, REG_EXTENDED); regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); @@ -184,225 +321,237 @@ int walCheckAndRepairMeta(SWal* pWal) { taosArraySort(actualLog, compareWalFileInfo); - int metaFileNum = taosArrayGetSize(pWal->fileInfoSet); - int actualFileNum = taosArrayGetSize(actualLog); + int metaFileNum = taosArrayGetSize(pWal->fileInfoSet); + int actualFileNum = taosArrayGetSize(actualLog); + int64_t firstVerPrev = pWal->vers.firstVer; + int64_t lastVerPrev = pWal->vers.lastVer; + int64_t totSize = 0; + bool updateMeta = (metaFileNum != actualFileNum); -#if 0 - for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) { - SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, fileNo); + // rebuild meta of file info + walRebuildFileInfoSet(pWal->fileInfoSet, actualLog); + taosArrayDestroy(actualLog); + + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); + ASSERT(sz == actualFileNum); + + // scan and determine the lastVer + int32_t fileIdx = sz; + + while (--fileIdx >= 0) { char fnameStr[WAL_FILE_LEN]; + int64_t fileSize = 0; + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); + walBuildLogName(pWal, pFileInfo->firstVer, fnameStr); - int64_t fileSize = 0; - taosStatFile(fnameStr, &fileSize, NULL); - if (fileSize == 0) { + int32_t code = taosStatFile(fnameStr, &fileSize, NULL); + if (code < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("failed to stat file since %s. file:%s", terrstr(), fnameStr); + return -1; + } + + ASSERT(pFileInfo->firstVer >= 0); + + if (pFileInfo->lastVer >= pFileInfo->firstVer && fileSize == pFileInfo->fileSize) { + totSize += pFileInfo->fileSize; + continue; + } + updateMeta = true; + + int64_t lastVer = walScanLogGetLastVer(pWal, fileIdx); + if (lastVer < 0) { + if (terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) { + wError("failed to scan wal last ver since %s", terrstr()); + return -1; + } + ASSERT(pFileInfo->fileSize == 0); + // remove the empty wal log, and its idx taosRemoveFile(fnameStr); walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); taosRemoveFile(fnameStr); - taosArrayPop(pLogInfoArray); - } else { - break; + // remove its meta entry + taosArrayRemove(pWal->fileInfoSet, fileIdx); + continue; } + + // update lastVer + pFileInfo->lastVer = lastVer; + totSize += pFileInfo->fileSize; } - actualFileNum = taosArrayGetSize(pLogInfoArray); -#endif - - { - int32_t i = 0, j = 0; - while (i < actualFileNum && j < metaFileNum) { - SWalFileInfo* pActualFile = taosArrayGet(actualLog, i); - SWalFileInfo* pMetaFile = taosArrayGet(pWal->fileInfoSet, j); - if (pActualFile->firstVer < pMetaFile->firstVer) { - char fNameStr[WAL_FILE_LEN]; - walBuildLogName(pWal, pActualFile->firstVer, fNameStr); - taosRemoveFile(fNameStr); - walBuildIdxName(pWal, pActualFile->firstVer, fNameStr); - taosRemoveFile(fNameStr); - i++; - } else if (pActualFile->firstVer > pMetaFile->firstVer) { - taosArrayRemove(pWal->fileInfoSet, j); - metaFileNum--; - } else { - i++; - j++; - } - } - if (i == actualFileNum && j == metaFileNum) { - if (j > 0) { - SWalFileInfo* pLastInfo = taosArrayGet(pWal->fileInfoSet, j - 1); - int64_t fsize = 0; - char fNameStr[WAL_FILE_LEN]; - walBuildLogName(pWal, pLastInfo->firstVer, fNameStr); - taosStatFile(fNameStr, &fsize, NULL); - if (pLastInfo->fileSize != fsize) { - fixed = true; - pLastInfo->fileSize = fsize; - pLastInfo->lastVer = walScanLogGetLastVer(pWal); - } - } - } else { - fixed = true; - while (i < actualFileNum) { - SWalFileInfo* pActualFile = taosArrayGet(actualLog, i); - char fNameStr[WAL_FILE_LEN]; - walBuildLogName(pWal, pActualFile->firstVer, fNameStr); - taosStatFile(fNameStr, &pActualFile->fileSize, NULL); - - if (pActualFile->fileSize == 0) { - ASSERT(i == actualFileNum - 1); - taosRemoveFile(fNameStr); - - walBuildIdxName(pWal, pActualFile->firstVer, fNameStr); - taosRemoveFile(fNameStr); - break; - } - - if (i < actualFileNum - 1) { - pActualFile->lastVer = ((SWalFileInfo*)taosArrayGet(actualLog, i + 1))->firstVer - 1; - taosArrayPush(pWal->fileInfoSet, pActualFile); - i++; - } else { - pActualFile = taosArrayPush(pWal->fileInfoSet, pActualFile); - pActualFile->lastVer = walScanLogGetLastVer(pWal); - if (pActualFile->lastVer == -1) { - taosRemoveFile(fNameStr); - - walBuildIdxName(pWal, pActualFile->firstVer, fNameStr); - taosRemoveFile(fNameStr); - taosArrayPop(pWal->fileInfoSet); - } - break; - } - } - } - } - -#if 0 - if (metaFileNum > actualFileNum) { - taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum); - } else if (metaFileNum < actualFileNum) { - for (int i = metaFileNum; i < actualFileNum; i++) { - SWalFileInfo* pFileInfo = taosArrayGet(actualLog, i); - taosArrayPush(pWal->fileInfoSet, pFileInfo); - } - } -#endif - - taosArrayDestroy(actualLog); - + // reset vers info and so on actualFileNum = taosArrayGetSize(pWal->fileInfoSet); pWal->writeCur = actualFileNum - 1; - + pWal->totSize = totSize; + pWal->vers.lastVer = -1; if (actualFileNum > 0) { - int64_t fLastVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur))->lastVer; - if (fLastVer != -1 && pWal->vers.lastVer != fLastVer) { - fixed = true; - pWal->vers.lastVer = fLastVer; - } - int64_t fFirstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; - if (fFirstVer != pWal->vers.firstVer) { - fixed = true; - pWal->vers.firstVer = fFirstVer; - } + pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; + pWal->vers.lastVer = ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer; + } + (void)walAlignVersions(pWal); + + // update meta file + if (updateMeta) { + (void)walSaveMeta(pWal); + } + return 0; +} + +int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) { + if (taosLSeekFile(pLogFile, offset, SEEK_SET) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } - if (fixed) { - walSaveMeta(pWal); + if (taosReadFile(pLogFile, pCkHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (walValidHeadCksum(pCkHead) != 0) { + terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH; + return -1; } return 0; } -int walCheckAndRepairIdx(SWal* pWal) { +int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { int32_t sz = taosArrayGetSize(pWal->fileInfoSet); - for (int32_t i = 0; i < sz; i++) { - SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i); + ASSERT(fileIdx >= 0 && fileIdx < sz); + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); + char fnameStr[WAL_FILE_LEN]; + walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); + char fLogNameStr[WAL_FILE_LEN]; + walBuildLogName(pWal, pFileInfo->firstVer, fLogNameStr); + int64_t fileSize = 0; - char fnameStr[WAL_FILE_LEN]; - walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); - int64_t fsize; - TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE); - if (pIdxFile == NULL) { - ASSERT(0); + if (taosStatFile(fnameStr, &fileSize, NULL) < 0 && errno != ENOENT) { + wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + ASSERT(pFileInfo->fileSize > 0 && pFileInfo->firstVer >= 0 && pFileInfo->lastVer >= pFileInfo->firstVer); + if (fileSize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) { + return 0; + } + + // start to repair + int64_t offset = fileSize - fileSize % sizeof(SWalIdxEntry); + TdFilePtr pLogFile = NULL; + TdFilePtr pIdxFile = NULL; + SWalIdxEntry idxEntry = {.ver = pFileInfo->firstVer - 1, .offset = -sizeof(SWalCkHead)}; + SWalCkHead ckHead; + memset(&ckHead, 0, sizeof(ckHead)); + ckHead.head.version = idxEntry.ver; + + pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE); + if (pIdxFile == NULL) { + wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ); + if (pLogFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fLogNameStr, terrstr()); + goto _err; + } + + // determine the last valid entry end, i.e. offset + while ((offset -= sizeof(SWalIdxEntry)) >= 0) { + if (taosLSeekFile(pIdxFile, offset, SEEK_SET) < 0) { + wError("vgId:%d, failed to seek file due to %s. offset:" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno), + offset, fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fnameStr, terrstr()); - return -1; + goto _err; } - taosFStatFile(pIdxFile, &fsize, NULL); - if (fsize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) { - taosCloseFile(&pIdxFile); + if (taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { + wError("vgId:%d, failed to read file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno), + offset, fnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (idxEntry.ver > pFileInfo->lastVer) { continue; } - int32_t left = fsize % sizeof(SWalIdxEntry); - int64_t offset = taosLSeekFile(pIdxFile, -left, SEEK_END); - if (left != 0) { - taosFtruncateFile(pIdxFile, offset); - wWarn("vgId:%d wal truncate file %s to offset %ld since size invalid, file size %ld", pWal->cfg.vgId, fnameStr, - offset, fsize); - } - offset -= sizeof(SWalIdxEntry); - - SWalIdxEntry idxEntry = {.ver = pFileInfo->firstVer}; - while (1) { - if (offset < 0) { - taosLSeekFile(pIdxFile, 0, SEEK_SET); - taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)); - break; - } - if (taosLSeekFile(pIdxFile, offset, SEEK_SET) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d cannot seek offset %ld when repair idx since %s", pWal->cfg.vgId, offset, terrstr()); - } - int64_t contLen = taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)); - if (contLen < 0 || contLen != sizeof(SWalIdxEntry)) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - if ((idxEntry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry) != offset) { - taosFtruncateFile(pIdxFile, offset); - wWarn("vgId:%d wal truncate file %s to offset %ld since entry invalid, entry ver %ld, entry offset %ld", - pWal->cfg.vgId, fnameStr, offset, idxEntry.ver, idxEntry.offset); - offset -= sizeof(SWalIdxEntry); - } else { - break; - } + if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) { + wWarn("vgId:%d, failed to read log file since %s. file:%s, offset:%" PRId64 ", idx entry ver:%" PRId64 "", + pWal->cfg.vgId, terrstr(), fLogNameStr, idxEntry.offset, idxEntry.ver); + continue; } - if (idxEntry.ver < pFileInfo->lastVer) { - char fLogNameStr[WAL_FILE_LEN]; - walBuildLogName(pWal, pFileInfo->firstVer, fLogNameStr); - TdFilePtr pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ); - if (pLogFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fLogNameStr, terrstr()); - return -1; - } - while (idxEntry.ver < pFileInfo->lastVer) { - if (taosLSeekFile(pLogFile, idxEntry.offset, SEEK_SET) == -1) { - terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, cannot seek file %s at %ld, since %s", pWal->cfg.vgId, fLogNameStr, idxEntry.offset, - terrstr()); - return -1; - } - SWalCkHead ckHead; - taosReadFile(pLogFile, &ckHead, sizeof(SWalCkHead)); - if (idxEntry.ver != ckHead.head.version) { - // todo truncate this idx also - taosCloseFile(&pLogFile); - wError("vgId:%d, invalid repair case, log seek to %ld to find ver %ld, actual ver %ld", pWal->cfg.vgId, - idxEntry.offset, idxEntry.ver, ckHead.head.version); - return -1; - } - idxEntry.ver = ckHead.head.version + 1; - idxEntry.offset = idxEntry.offset + sizeof(SWalCkHead) + ckHead.head.bodyLen; - wWarn("vgId:%d wal idx append new entry %ld %ld", pWal->cfg.vgId, idxEntry.ver, idxEntry.offset); - taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)); - } - taosCloseFile(&pLogFile); + if (idxEntry.ver == ckHead.head.version) { + break; + } + } + offset += sizeof(SWalIdxEntry); + + // ftruncate idx file + if (offset < fileSize) { + if (taosFtruncateFile(pIdxFile, offset) < 0) { + wError("vgId:%d, failed to ftruncate file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, + strerror(errno), offset, fnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } + + // rebuild idx file + if (taosLSeekFile(pIdxFile, 0, SEEK_END) < 0) { + wError("vgId:%d, failed to seek file due to %s. offset:" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno), + offset, fnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + while (idxEntry.ver < pFileInfo->lastVer) { + ASSERT(idxEntry.ver == ckHead.head.version); + + idxEntry.ver += 1; + idxEntry.offset += sizeof(SWalCkHead) + ckHead.head.bodyLen; + + if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) { + wError("vgId:%d, failed to read wal log head since %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, terrstr(), + idxEntry.offset, fLogNameStr); + goto _err; + } + wWarn("vgId:%d wal idx append new entry %ld %ld", pWal->cfg.vgId, idxEntry.ver, idxEntry.offset); + if (taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) { + wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr); + goto _err; + } + } + + if (taosFsyncFile(pIdxFile) < 0) { + wError("vgId:%d, faild to fsync file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr); + goto _err; + } + + (void)taosCloseFile(&pLogFile); + (void)taosCloseFile(&pIdxFile); + return 0; + +_err: + (void)taosCloseFile(&pLogFile); + (void)taosCloseFile(&pIdxFile); + return -1; +} + +int walCheckAndRepairIdx(SWal* pWal) { + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); + int32_t fileIdx = sz; + while (--fileIdx >= 0) { + if (walCheckAndRepairIdxFile(pWal, fileIdx) < 0) { + wError("vgId:%d, failed to repair idx file since %s. fileIdx:%d", pWal->cfg.vgId, terrstr(), fileIdx); + return -1; } - taosCloseFile(&pIdxFile); } return 0; } @@ -495,14 +644,20 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { ASSERT(taosArrayGetSize(pWal->fileInfoSet) == 0); cJSON *pRoot, *pMeta, *pFiles, *pInfoJson, *pField; pRoot = cJSON_Parse(bytes); + if (!pRoot) goto _err; pMeta = cJSON_GetObjectItem(pRoot, "meta"); + if (!pMeta) goto _err; pField = cJSON_GetObjectItem(pMeta, "firstVer"); + if (!pField) goto _err; pWal->vers.firstVer = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pMeta, "snapshotVer"); + if (!pField) goto _err; pWal->vers.snapshotVer = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pMeta, "commitVer"); + if (!pField) goto _err; pWal->vers.commitVer = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pMeta, "lastVer"); + if (!pField) goto _err; pWal->vers.lastVer = atoll(cJSON_GetStringValue(pField)); pFiles = cJSON_GetObjectItem(pRoot, "files"); @@ -512,17 +667,23 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { taosArrayEnsureCap(pArray, sz); SWalFileInfo* pData = pArray->pData; for (int i = 0; i < sz; i++) { - cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i); + cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i); + if (!pInfoJson) goto _err; SWalFileInfo* pInfo = &pData[i]; pField = cJSON_GetObjectItem(pInfoJson, "firstVer"); + if (!pField) goto _err; pInfo->firstVer = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pInfoJson, "lastVer"); + if (!pField) goto _err; pInfo->lastVer = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pInfoJson, "createTs"); + if (!pField) goto _err; pInfo->createTs = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pInfoJson, "closeTs"); + if (!pField) goto _err; pInfo->closeTs = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pInfoJson, "fileSize"); + if (!pField) goto _err; pInfo->fileSize = atoll(cJSON_GetStringValue(pField)); } taosArraySetSize(pArray, sz); @@ -530,6 +691,10 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { pWal->writeCur = sz - 1; cJSON_Delete(pRoot); return 0; + +_err: + cJSON_Delete(pRoot); + return -1; } static int walFindCurMetaVer(SWal* pWal) { @@ -565,22 +730,63 @@ static int walFindCurMetaVer(SWal* pWal) { int walSaveMeta(SWal* pWal) { int metaVer = walFindCurMetaVer(pWal); char fnameStr[WAL_FILE_LEN]; - walBuildMetaName(pWal, metaVer + 1, fnameStr); - TdFilePtr pMetaFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE); - if (pMetaFile == NULL) { + char tmpFnameStr[WAL_FILE_LEN]; + int n; + + // fsync the idx and log file at first to ensure validity of meta + if (taosFsyncFile(pWal->pIdxFile) < 0) { + wError("vgId:%d, failed to sync idx file due to %s", pWal->cfg.vgId, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } + + if (taosFsyncFile(pWal->pLogFile) < 0) { + wError("vgId:%d, failed to sync log file due to %s", pWal->cfg.vgId, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + // flush to a tmpfile + n = walBuildTmpMetaName(pWal, tmpFnameStr); + ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name"); + + TdFilePtr pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pMetaFile == NULL) { + wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + char* serialized = walMetaSerialize(pWal); int len = strlen(serialized); if (len != taosWriteFile(pMetaFile, serialized, len)) { - // TODO:clean file - - taosCloseFile(&pMetaFile); - taosRemoveFile(fnameStr); - return -1; + wError("vgId:%d, failed to write file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosFsyncFile(pMetaFile) < 0) { + wError("vgId:%d, failed to sync file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosCloseFile(&pMetaFile) < 0) { + wError("vgId:%d, failed to close file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // rename it + n = walBuildMetaName(pWal, metaVer + 1, fnameStr); + ASSERT(n < sizeof(fnameStr) && "Buffer overflow of file name"); + + if (taosRenameFile(tmpFnameStr, fnameStr) < 0) { + wError("failed to rename file due to %s. dest:%s", strerror(errno), fnameStr); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; } - taosCloseFile(&pMetaFile); // delete old file if (metaVer > -1) { walBuildMetaName(pWal, metaVer, fnameStr); @@ -588,6 +794,11 @@ int walSaveMeta(SWal* pWal) { } taosMemoryFree(serialized); return 0; + +_err: + taosCloseFile(&pMetaFile); + taosMemoryFree(serialized); + return -1; } int walLoadMeta(SWal* pWal) { @@ -629,6 +840,10 @@ int walLoadMeta(SWal* pWal) { } // load into fileInfoSet int code = walMetaDeserialize(pWal, buf); + if (code < 0) { + wError("failed to deserialize wal meta. file:%s", fnameStr); + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + } taosCloseFile(&pFile); taosMemoryFree(buf); return code; diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 7974f3e32e..29058cada1 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -81,6 +81,12 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { return NULL; } + if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + taosMemoryFree(pWal); + return NULL; + } + // set config memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg)); @@ -98,15 +104,14 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { tstrncpy(pWal->path, path, sizeof(pWal->path)); if (taosMkDir(pWal->path) != 0) { wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); - taosMemoryFree(pWal); - return NULL; + goto _err; } // init ref pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); if (pWal->pRefHash == NULL) { - taosMemoryFree(pWal); - return NULL; + wError("failed to init hash since %s", tstrerror(terrno)); + goto _err; } // open meta @@ -117,9 +122,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo)); if (pWal->fileInfoSet == NULL) { wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); - taosHashCleanup(pWal->pRefHash); - taosMemoryFree(pWal); - return NULL; + goto _err; } // init status @@ -131,46 +134,37 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { pWal->writeHead.head.protoVer = WAL_PROTO_VER; pWal->writeHead.magic = WAL_MAGIC; - if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) { - taosArrayDestroy(pWal->fileInfoSet); - taosHashCleanup(pWal->pRefHash); - taosMemoryFree(pWal); - return NULL; - } - - pWal->refId = taosAddRef(tsWal.refSetId, pWal); - if (pWal->refId < 0) { - taosHashCleanup(pWal->pRefHash); - taosThreadMutexDestroy(&pWal->mutex); - taosArrayDestroy(pWal->fileInfoSet); - taosMemoryFree(pWal); - return NULL; - } - - walLoadMeta(pWal); + // load meta + (void)walLoadMeta(pWal); if (walCheckAndRepairMeta(pWal) < 0) { wError("vgId:%d cannot open wal since repair meta file failed", pWal->cfg.vgId); - taosHashCleanup(pWal->pRefHash); - taosRemoveRef(tsWal.refSetId, pWal->refId); - taosThreadMutexDestroy(&pWal->mutex); - taosArrayDestroy(pWal->fileInfoSet); - return NULL; + goto _err; } if (walCheckAndRepairIdx(pWal) < 0) { wError("vgId:%d cannot open wal since repair idx file failed", pWal->cfg.vgId); - taosHashCleanup(pWal->pRefHash); - taosRemoveRef(tsWal.refSetId, pWal->refId); - taosThreadMutexDestroy(&pWal->mutex); - taosArrayDestroy(pWal->fileInfoSet); - return NULL; + goto _err; + } + + // add ref + pWal->refId = taosAddRef(tsWal.refSetId, pWal); + if (pWal->refId < 0) { + wError("failed to add ref for Wal since %s", tstrerror(terrno)); + goto _err; } wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod); - return pWal; + +_err: + taosArrayDestroy(pWal->fileInfoSet); + taosHashCleanup(pWal->pRefHash); + taosThreadMutexDestroy(&pWal->mutex); + taosMemoryFree(pWal); + pWal = NULL; + return NULL; } int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { @@ -195,11 +189,11 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { void walClose(SWal *pWal) { taosThreadMutexLock(&pWal->mutex); + (void)walSaveMeta(pWal); taosCloseFile(&pWal->pLogFile); pWal->pLogFile = NULL; taosCloseFile(&pWal->pIdxFile); pWal->pIdxFile = NULL; - walSaveMeta(pWal); taosArrayDestroy(pWal->fileInfoSet); pWal->fileInfoSet = NULL; taosHashCleanup(pWal->pRefHash); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index b2cd4fac11..179d809c84 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -181,7 +181,11 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - ASSERT(pRet != NULL); + if (pRet == NULL) { + wError("failed to find WAL log file with ver:%lld", ver); + terrno = TSDB_CODE_WAL_INVALID_VER; + return -1; + } if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner if (walReadChangeFile(pReader, pRet->firstVer) < 0) { @@ -472,7 +476,8 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { } else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } - ASSERT(0); + wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s", + pReader->pWal->cfg.vgId, ver, terrstr()); taosThreadMutexUnlock(&pReader->mutex); return -1; } @@ -505,6 +510,8 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } + wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s", + pReader->pWal->cfg.vgId, ver, terrstr()); taosThreadMutexUnlock(&pReader->mutex); return -1; } diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 1196914dae..4b75db52b7 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -79,6 +79,11 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) { TdFilePtr pIdxTFile, pLogTFile; char fnameStr[WAL_FILE_LEN]; if (pWal->pLogFile != NULL) { + code = taosFsyncFile(pWal->pLogFile); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } code = taosCloseFile(&pWal->pLogFile); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -86,6 +91,11 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) { } } if (pWal->pIdxFile != NULL) { + code = taosFsyncFile(pWal->pIdxFile); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } code = taosCloseFile(&pWal->pIdxFile); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(errno); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index e7161079d9..1e5c63fa8d 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -219,10 +219,12 @@ int32_t walRollback(SWal *pWal, int64_t ver) { taosCloseFile(&pIdxFile); taosCloseFile(&pLogFile); - taosFsyncFile(pWal->pLogFile); - taosFsyncFile(pWal->pIdxFile); - - walSaveMeta(pWal); + code = walSaveMeta(pWal); + if (code < 0) { + wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr()); + taosThreadMutexUnlock(&pWal->mutex); + return -1; + } // unlock taosThreadMutexUnlock(&pWal->mutex); @@ -394,7 +396,11 @@ int32_t walRollImpl(SWal *pWal) { pWal->lastRollSeq = walGetSeq(); - walSaveMeta(pWal); + code = walSaveMeta(pWal); + if (code < 0) { + wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr()); + goto END; + } END: return code; @@ -550,6 +556,11 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in void walFsync(SWal *pWal, bool forceFsync) { if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { + wTrace("vgId:%d, fileId:%" PRId64 ".idx, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); + if (taosFsyncFile(pWal->pIdxFile) < 0) { + wError("vgId:%d, file:%" PRId64 ".idx, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), + strerror(errno)); + } wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); if (taosFsyncFile(pWal->pLogFile) < 0) { wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index f0442c6fd1..616ab7875d 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -87,11 +87,17 @@ void osUpdate() { void osCleanup() {} -bool osLogSpaceAvailable() { return tsLogSpace.reserved <= tsLogSpace.size.avail; } +bool osLogSpaceAvailable() { return tsLogSpace.size.avail > 0; } -bool osDataSpaceAvailable() { return tsDataSpace.reserved <= tsDataSpace.size.avail; } +bool osDataSpaceAvailable() { return tsDataSpace.size.avail > 0; } -bool osTempSpaceAvailable() { return tsTempSpace.reserved <= tsTempSpace.size.avail; } +bool osTempSpaceAvailable() { return tsTempSpace.size.avail > 0; } + +bool osLogSpaceSufficient() { return tsLogSpace.size.avail > tsLogSpace.reserved; } + +bool osDataSpaceSufficient() { return tsDataSpace.size.avail > tsDataSpace.reserved; } + +bool osTempSpaceSufficient() { return tsTempSpace.size.avail > tsTempSpace.reserved; } void osSetTimezone(const char *timezone) { taosSetSystemTimezone(timezone, tsTimezoneStr, &tsDaylight, &tsTimezone); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index eb13a08be4..9a117c6eb4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -447,12 +447,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND, "TQ table schema not f TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "TQ no commited offset") // wal -TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Wal unexpected generic error") +TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "WAL unexpected generic error") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_OUT_OF_MEMORY, "WAL out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_LOG_NOT_EXIST, "WAL log not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_WAL_CHKSUM_MISMATCH, "WAL checksum mismatch") // tfs TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config")