move log macros into internal header
This commit is contained in:
parent
f485e4c5e8
commit
964397ea63
|
@ -12,6 +12,7 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef _TD_WAL_H_
|
#ifndef _TD_WAL_H_
|
||||||
#define _TD_WAL_H_
|
#define _TD_WAL_H_
|
||||||
|
|
||||||
|
@ -24,15 +25,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// clang-format off
|
|
||||||
#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
|
||||||
#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
|
||||||
#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
|
||||||
#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
|
||||||
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", DEBUG_DEBUG, wDebugFlag, __VA_ARGS__); }}
|
|
||||||
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
|
|
||||||
// clang-format on
|
|
||||||
|
|
||||||
#define WAL_PROTO_VER 0
|
#define WAL_PROTO_VER 0
|
||||||
#define WAL_NOSUFFIX_LEN 20
|
#define WAL_NOSUFFIX_LEN 20
|
||||||
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
|
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
|
||||||
|
@ -172,12 +164,9 @@ int32_t walPersist(SWal *);
|
||||||
void walClose(SWal *);
|
void walClose(SWal *);
|
||||||
|
|
||||||
// write interfaces
|
// write interfaces
|
||||||
|
|
||||||
// By assigning index by the caller, wal gurantees linearizability
|
// By assigning index by the caller, wal gurantees linearizability
|
||||||
// Assign version automatically and return to caller,
|
|
||||||
int32_t walAppendLog(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
|
int32_t walAppendLog(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
|
||||||
|
void walFsync(SWal *, bool force);
|
||||||
void walFsync(SWal *, bool force);
|
|
||||||
|
|
||||||
// apis for lifecycle management
|
// apis for lifecycle management
|
||||||
int32_t walCommit(SWal *, int64_t ver);
|
int32_t walCommit(SWal *, int64_t ver);
|
||||||
|
@ -189,8 +178,6 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
|
||||||
// for tq
|
// for tq
|
||||||
int32_t walApplyVer(SWal *, int64_t ver);
|
int32_t walApplyVer(SWal *, int64_t ver);
|
||||||
|
|
||||||
// int32_t walDataCorrupted(SWal*);
|
|
||||||
|
|
||||||
// wal reader
|
// wal reader
|
||||||
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
|
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
|
||||||
void walCloseReader(SWalReader *pRead);
|
void walCloseReader(SWalReader *pRead);
|
||||||
|
|
|
@ -188,9 +188,9 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t
|
||||||
int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
|
int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
|
||||||
int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
|
int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
|
||||||
|
|
||||||
wDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
|
tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
|
||||||
", 0x%" PRIx64,
|
", 0x%" PRIx64,
|
||||||
vgId, offset, lastVer, committedVer, appliedVer, id);
|
vgId, offset, lastVer, committedVer, appliedVer, id);
|
||||||
|
|
||||||
while (offset <= appliedVer) {
|
while (offset <= appliedVer) {
|
||||||
if (walFetchHead(pHandle->pWalReader, offset) < 0) {
|
if (walFetchHead(pHandle->pWalReader, offset) < 0) {
|
||||||
|
@ -240,14 +240,12 @@ END:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tqGetTablePrimaryKey(STqReader* pReader){
|
bool tqGetTablePrimaryKey(STqReader* pReader) { return pReader->hasPrimaryKey; }
|
||||||
return pReader->hasPrimaryKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid){
|
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
SSchemaWrapper *schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1);
|
SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1);
|
||||||
if (schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY){
|
if (schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
|
||||||
ret = true;
|
ret = true;
|
||||||
}
|
}
|
||||||
tDeleteSchemaWrapper(schema);
|
tDeleteSchemaWrapper(schema);
|
||||||
|
@ -376,7 +374,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
||||||
SWalReader* pWalReader = pReader->pWalReader;
|
SWalReader* pWalReader = pReader->pWalReader;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -407,7 +405,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
||||||
pReader->msg.msgStr = NULL;
|
pReader->msg.msgStr = NULL;
|
||||||
|
|
||||||
int64_t elapsed = taosGetTimestampMs() - st;
|
int64_t elapsed = taosGetTimestampMs() - st;
|
||||||
if(elapsed > 1000 || elapsed < 0){
|
if (elapsed > 1000 || elapsed < 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -683,10 +681,11 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SColData* pCol = taosArrayGet(pCols, sourceIdx);
|
SColData* pCol = taosArrayGet(pCols, sourceIdx);
|
||||||
SColVal colVal;
|
SColVal colVal;
|
||||||
|
|
||||||
tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual, sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
|
tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
|
||||||
|
sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
|
||||||
if (pCol->cid < pColData->info.colId) {
|
if (pCol->cid < pColData->info.colId) {
|
||||||
sourceIdx++;
|
sourceIdx++;
|
||||||
} else if (pCol->cid == pColData->info.colId) {
|
} else if (pCol->cid == pColData->info.colId) {
|
||||||
|
|
|
@ -27,6 +27,15 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
// clang-format off
|
||||||
|
#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
||||||
|
#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
||||||
|
#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
||||||
|
#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
||||||
|
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", DEBUG_DEBUG, wDebugFlag, __VA_ARGS__); }}
|
||||||
|
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
|
||||||
|
// clang-format on
|
||||||
|
|
||||||
// meta section begin
|
// meta section begin
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t firstVer;
|
int64_t firstVer;
|
||||||
|
|
Loading…
Reference in New Issue