diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 8d402b953a..0beb7c3b83 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -12,6 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + #ifndef _TD_WAL_H_ #define _TD_WAL_H_ @@ -24,15 +25,6 @@ extern "C" { #endif -// clang-format off -#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} -#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} -#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} -#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", DEBUG_INFO, 255, __VA_ARGS__); }} -#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", DEBUG_DEBUG, wDebugFlag, __VA_ARGS__); }} -#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }} -// clang-format on - #define WAL_PROTO_VER 0 #define WAL_NOSUFFIX_LEN 20 #define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) @@ -172,12 +164,9 @@ int32_t walPersist(SWal *); void walClose(SWal *); // write interfaces - // By assigning index by the caller, wal gurantees linearizability -// Assign version automatically and return to caller, int32_t walAppendLog(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen); - -void walFsync(SWal *, bool force); +void walFsync(SWal *, bool force); // apis for lifecycle management int32_t walCommit(SWal *, int64_t ver); @@ -189,8 +178,6 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver); // for tq int32_t walApplyVer(SWal *, int64_t ver); -// int32_t walDataCorrupted(SWal*); - // wal reader SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id); void walCloseReader(SWalReader *pRead); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index d3be8fa666..103d1d33ae 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -188,9 +188,9 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal); int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal); - wDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64 - ", 0x%" PRIx64, - vgId, offset, lastVer, committedVer, appliedVer, id); + tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64 + ", 0x%" PRIx64, + vgId, offset, lastVer, committedVer, appliedVer, id); while (offset <= appliedVer) { if (walFetchHead(pHandle->pWalReader, offset) < 0) { @@ -240,14 +240,12 @@ END: return code; } -bool tqGetTablePrimaryKey(STqReader* pReader){ - return pReader->hasPrimaryKey; -} +bool tqGetTablePrimaryKey(STqReader* pReader) { return pReader->hasPrimaryKey; } -void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid){ - bool ret = false; - SSchemaWrapper *schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1); - if (schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY){ +void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) { + bool ret = false; + SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1); + if (schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) { ret = true; } tDeleteSchemaWrapper(schema); @@ -376,7 +374,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con } bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) { - SWalReader* pWalReader = pReader->pWalReader; + SWalReader* pWalReader = pReader->pWalReader; int64_t st = taosGetTimestampMs(); while (1) { @@ -407,7 +405,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) { pReader->msg.msgStr = NULL; int64_t elapsed = taosGetTimestampMs() - st; - if(elapsed > 1000 || elapsed < 0){ + if (elapsed > 1000 || elapsed < 0) { return false; } @@ -683,10 +681,11 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* continue; } - SColData* pCol = taosArrayGet(pCols, sourceIdx); - SColVal colVal; + SColData* pCol = taosArrayGet(pCols, sourceIdx); + SColVal colVal; - tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual, sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId); + tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual, + sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId); if (pCol->cid < pColData->info.colId) { sourceIdx++; } else if (pCol->cid == pColData->info.colId) { diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index afde531011..6eab2d6766 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -27,6 +27,15 @@ extern "C" { #endif +// clang-format off +#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} +#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} +#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} +#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", DEBUG_INFO, 255, __VA_ARGS__); }} +#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", DEBUG_DEBUG, wDebugFlag, __VA_ARGS__); }} +#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }} +// clang-format on + // meta section begin typedef struct { int64_t firstVer;