other: add some logs.

This commit is contained in:
Haojun Liao 2023-09-09 15:18:54 +08:00
parent 6f5c855a4b
commit aa8909b267
6 changed files with 24 additions and 22 deletions

View File

@ -192,7 +192,7 @@ int32_t walApplyVer(SWal *, int64_t ver);
// int32_t walDataCorrupted(SWal*); // int32_t walDataCorrupted(SWal*);
// wal reader // wal reader
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond); SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
void walCloseReader(SWalReader *pRead); void walCloseReader(SWalReader *pRead);
void walReadReset(SWalReader *pReader); void walReadReset(SWalReader *pReader);
int32_t walReadVer(SWalReader *pRead, int64_t ver); int32_t walReadVer(SWalReader *pRead, int64_t ver);

View File

@ -819,7 +819,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
} }
// reset the task status from unfinished transaction // reset the task status from unfinished transaction

View File

@ -312,14 +312,14 @@ static int buildHandle(STQ* pTq, STqHandle* handle){
return -1; return -1;
} }
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
handle->pWalReader = walOpenReader(pVnode->pWal, NULL); handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
handle->execHandle.pTqReader = tqReaderOpen(pVnode); handle->execHandle.pTqReader = tqReaderOpen(pVnode);
buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta, buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
(SSnapContext**)(&reader.sContext)); (SSnapContext**)(&reader.sContext));
handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle->pWalReader = walOpenReader(pVnode->pWal, NULL); handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) { if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) { if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {

View File

@ -250,7 +250,7 @@ STqReader* tqReaderOpen(SVnode* pVnode) {
return NULL; return NULL;
} }
pReader->pWalReader = walOpenReader(pVnode->pWal, NULL); pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
if (pReader->pWalReader == NULL) { if (pReader->pWalReader == NULL) {
taosMemoryFree(pReader); taosMemoryFree(pReader);
return NULL; return NULL;

View File

@ -59,7 +59,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
ASSERT(pData->pWal != NULL); ASSERT(pData->pWal != NULL);
taosThreadMutexInit(&(pData->mutex), NULL); taosThreadMutexInit(&(pData->mutex), NULL);
pData->pWalHandle = walOpenReader(pData->pWal, NULL); pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0);
ASSERT(pData->pWalHandle != NULL); ASSERT(pData->pWalHandle != NULL);
pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex; pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex;

View File

@ -16,7 +16,7 @@
#include "taoserror.h" #include "taoserror.h"
#include "walInt.h" #include "walInt.h"
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) {
SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader)); SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
if (pReader == NULL) { if (pReader == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -24,7 +24,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
} }
pReader->pWal = pWal; pReader->pWal = pWal;
pReader->readerId = tGenIdPI64(); pReader->readerId = (id != 0)? id:tGenIdPI64();
pReader->pIdxFile = NULL; pReader->pIdxFile = NULL;
pReader->pLogFile = NULL; pReader->pLogFile = NULL;
pReader->curVersion = -1; pReader->curVersion = -1;
@ -257,9 +257,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
bool seeked = false; bool seeked = false;
wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64, ", applied ver:%" PRId64", %"PRIx64,
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
pRead->pWal->vers.appliedVer); pRead->pWal->vers.appliedVer, pRead->readerId);
// TODO: valid ver // TODO: valid ver
if (ver > pRead->pWal->vers.commitVer) { if (ver > pRead->pWal->vers.commitVer) {
@ -297,7 +297,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
code = walValidHeadCksum(pRead->pHead); code = walValidHeadCksum(pRead->pHead);
if (code != 0) { if (code != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver); wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, %"PRIx64, pRead->pWal->cfg.vgId, ver,
pRead->readerId);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
@ -324,11 +325,13 @@ int32_t walSkipFetchBody(SWalReader *pRead) {
int32_t walFetchBody(SWalReader *pRead) { int32_t walFetchBody(SWalReader *pRead) {
SWalCont *pReadHead = &pRead->pHead->head; SWalCont *pReadHead = &pRead->pHead->head;
int64_t ver = pReadHead->version; int64_t ver = pReadHead->version;
int32_t vgId = pRead->pWal->cfg.vgId;
int64_t id = pRead->readerId;
wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64, ", applied ver:%" PRId64 ", %" PRIx64,
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
pRead->pWal->vers.appliedVer); pRead->pWal->vers.appliedVer, id);
if (pRead->capacity < pReadHead->bodyLen) { if (pRead->capacity < pReadHead->bodyLen) {
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
@ -344,26 +347,25 @@ int32_t walFetchBody(SWalReader *pRead) {
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen < 0) { if (pReadHead->bodyLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s", wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, %"PRIx64,
pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno)); vgId, pReadHead->version, ver, tstrerror(terrno), pRead->readerId);
} else { } else {
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted", wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted, %"PRIx64,
pRead->pWal->cfg.vgId, pReadHead->version, ver); vgId, pReadHead->version, ver, pRead->readerId);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
return -1; return -1;
} }
if (pReadHead->version != ver) { if (pReadHead->version != ver) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64", %"PRIx64, vgId,
pReadHead->version, ver); pReadHead->version, ver, id);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
if (walValidBodyCksum(pRead->pHead) != 0) { if (walValidBodyCksum(pRead->pHead) != 0) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, %" PRIx64, vgId, ver, id);
ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }