opti:wal logic
This commit is contained in:
parent
6155c80729
commit
deda4b9eed
|
@ -153,7 +153,6 @@ struct SWalReader {
|
||||||
int64_t capacity;
|
int64_t capacity;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
SWalFilterCond cond;
|
SWalFilterCond cond;
|
||||||
// TODO remove it
|
|
||||||
SWalCkHead *pHead;
|
SWalCkHead *pHead;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -208,9 +207,9 @@ void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset)
|
||||||
|
|
||||||
// only for tq usage
|
// only for tq usage
|
||||||
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
|
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
|
||||||
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
|
int32_t walFetchHead(SWalReader *pRead, int64_t ver);
|
||||||
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
|
int32_t walFetchBody(SWalReader *pRead);
|
||||||
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
int32_t walSkipFetchBody(SWalReader *pRead);
|
||||||
|
|
||||||
void walRefFirstVer(SWal *, SWalRef *);
|
void walRefFirstVer(SWal *, SWalRef *);
|
||||||
void walRefLastVer(SWal *, SWalRef *);
|
void walRefLastVer(SWal *, SWalRef *);
|
||||||
|
|
|
@ -127,7 +127,7 @@ void tqDestroyTqHandle(void* data);
|
||||||
// tqRead
|
// tqRead
|
||||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
||||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId);
|
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
|
||||||
|
|
||||||
// tqExec
|
// tqExec
|
||||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
|
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
|
||||||
|
|
|
@ -184,50 +184,42 @@ end:
|
||||||
return tbSuid == realTbSuid;
|
return tbSuid == realTbSuid;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) {
|
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
|
||||||
int32_t code = 0;
|
int32_t code = -1;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
|
||||||
int64_t offset = *fetchOffset;
|
int64_t offset = *fetchOffset;
|
||||||
|
int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
|
||||||
|
int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
|
||||||
|
int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
|
||||||
|
|
||||||
while (1) {
|
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64,
|
||||||
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
vgId, offset, lastVer, committedVer, appliedVer);
|
||||||
|
|
||||||
|
while (offset <= appliedVer) {
|
||||||
|
if (walFetchHead(pHandle->pWalReader, offset) < 0) {
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
|
||||||
", no more log to return, reqId:0x%" PRIx64,
|
", no more log to return, reqId:0x%" PRIx64,
|
||||||
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
|
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
|
||||||
*fetchOffset = offset;
|
|
||||||
code = -1;
|
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
|
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
|
||||||
pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId);
|
pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId);
|
||||||
|
|
||||||
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
|
if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
|
||||||
code = walFetchBody(pHandle->pWalReader, ppCkHead);
|
code = walFetchBody(pHandle->pWalReader);
|
||||||
|
|
||||||
if (code < 0) {
|
|
||||||
*fetchOffset = offset;
|
|
||||||
code = -1;
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
*fetchOffset = offset;
|
|
||||||
code = 0;
|
|
||||||
goto END;
|
goto END;
|
||||||
} else {
|
} else {
|
||||||
if (pHandle->fetchMeta != WITH_DATA) {
|
if (pHandle->fetchMeta != WITH_DATA) {
|
||||||
SWalCont* pHead = &((*ppCkHead)->head);
|
SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
|
||||||
if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
|
if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
|
||||||
code = walFetchBody(pHandle->pWalReader, ppCkHead);
|
code = walFetchBody(pHandle->pWalReader);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
*fetchOffset = offset;
|
|
||||||
code = -1;
|
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isValValidForTable(pHandle, pHead)) {
|
if (isValValidForTable(pHandle, pHead)) {
|
||||||
*fetchOffset = offset;
|
|
||||||
code = 0;
|
code = 0;
|
||||||
goto END;
|
goto END;
|
||||||
} else {
|
} else {
|
||||||
|
@ -236,10 +228,8 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
|
code = walSkipFetchBody(pHandle->pWalReader);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
*fetchOffset = offset;
|
|
||||||
code = -1;
|
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
offset++;
|
offset++;
|
||||||
|
@ -247,7 +237,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
||||||
}
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
|
*fetchOffset = offset;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -179,7 +179,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
SRpcMsg* pMsg, STqOffsetVal* offset) {
|
SRpcMsg* pMsg, STqOffsetVal* offset) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SWalCkHead* pCkHead = NULL;
|
|
||||||
SMqMetaRsp metaRsp = {0};
|
SMqMetaRsp metaRsp = {0};
|
||||||
STaosxRsp taosxRsp = {0};
|
STaosxRsp taosxRsp = {0};
|
||||||
tqInitTaosxRsp(&taosxRsp, *offset);
|
tqInitTaosxRsp(&taosxRsp, *offset);
|
||||||
|
@ -216,12 +215,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
if (offset->type == TMQ_OFFSET__LOG) {
|
if (offset->type == TMQ_OFFSET__LOG) {
|
||||||
walReaderVerifyOffset(pHandle->pWalReader, offset);
|
walReaderVerifyOffset(pHandle->pWalReader, offset);
|
||||||
int64_t fetchVer = offset->version;
|
int64_t fetchVer = offset->version;
|
||||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
|
||||||
if (pCkHead == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
code = -1;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||||
int totalRows = 0;
|
int totalRows = 0;
|
||||||
|
@ -234,14 +227,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalCont* pHead = &pCkHead->head;
|
SWalCont* pHead = &pHandle->pWalReader->pHead->head;
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
|
||||||
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
|
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
|
||||||
|
|
||||||
|
@ -291,7 +284,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
end:
|
end:
|
||||||
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
taosMemoryFreeClear(pCkHead);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,10 +16,6 @@
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "walInt.h"
|
#include "walInt.h"
|
||||||
|
|
||||||
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
|
|
||||||
static int32_t walFetchBodyNew(SWalReader *pRead);
|
|
||||||
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
|
|
||||||
|
|
||||||
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||||
SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
|
SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
|
||||||
if (pReader == NULL) {
|
if (pReader == NULL) {
|
||||||
|
@ -80,19 +76,19 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
while (fetchVer <= appliedVer) {
|
while (fetchVer <= appliedVer) {
|
||||||
if (walFetchHeadNew(pReader, fetchVer) < 0) {
|
if (walFetchHead(pReader, fetchVer) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t type = pReader->pHead->head.msgType;
|
int32_t type = pReader->pHead->head.msgType;
|
||||||
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
|
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
|
||||||
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
|
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
|
||||||
if (walFetchBodyNew(pReader) < 0) {
|
if (walFetchBody(pReader) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
if (walSkipFetchBodyNew(pReader) < 0) {
|
if (walSkipFetchBody(pReader) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -256,102 +252,7 @@ int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
|
||||||
|
|
||||||
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
|
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
|
||||||
|
|
||||||
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
|
||||||
int64_t contLen;
|
|
||||||
bool seeked = false;
|
|
||||||
|
|
||||||
wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);
|
|
||||||
|
|
||||||
if (pRead->curVersion != fetchVer) {
|
|
||||||
if (walReaderSeekVer(pRead, fetchVer) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
seeked = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
|
||||||
if (contLen == sizeof(SWalCkHead)) {
|
|
||||||
break;
|
|
||||||
} else if (contLen == 0 && !seeked) {
|
|
||||||
if(walReadSeekVerImpl(pRead, fetchVer) < 0){
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
seeked = true;
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
if (contLen < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
} else {
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// pRead->curInvalid = 0;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t walFetchBodyNew(SWalReader *pReader) {
|
|
||||||
SWalCont *pReadHead = &pReader->pHead->head;
|
|
||||||
int64_t ver = pReadHead->version;
|
|
||||||
|
|
||||||
wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver,
|
|
||||||
pReadHead->bodyLen);
|
|
||||||
|
|
||||||
if (pReader->capacity < pReadHead->bodyLen) {
|
|
||||||
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
|
||||||
if (ptr == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pReader->pHead = ptr;
|
|
||||||
pReadHead = &pReader->pHead->head;
|
|
||||||
pReader->capacity = pReadHead->bodyLen;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pReadHead->bodyLen != taosReadFile(pReader->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
|
|
||||||
if (pReadHead->bodyLen < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
|
|
||||||
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver, tstrerror(terrno));
|
|
||||||
} else {
|
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
|
|
||||||
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver);
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (walValidBodyCksum(pReader->pHead) != 0) {
|
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver);
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
wDebug("vgId:%d, index:%" PRId64 " is fetched, type:%d, cursor advance", pReader->pWal->cfg.vgId, ver, pReader->pHead->head.msgType);
|
|
||||||
pReader->curVersion = ver + 1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
|
|
||||||
int64_t code;
|
|
||||||
|
|
||||||
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
|
|
||||||
if (code < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
// pRead->curInvalid = 1;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRead->curVersion++;
|
|
||||||
wDebug("vgId:%d, version advance to %" PRId64 ", skip fetch", pRead->pWal->cfg.vgId, pRead->curVersion);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
|
||||||
int64_t code;
|
int64_t code;
|
||||||
int64_t contLen;
|
int64_t contLen;
|
||||||
bool seeked = false;
|
bool seeked = false;
|
||||||
|
@ -369,15 +270,13 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||||
if (pRead->curVersion != ver) {
|
if (pRead->curVersion != ver) {
|
||||||
code = walReaderSeekVer(pRead, ver);
|
code = walReaderSeekVer(pRead, ver);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
// pRead->curVersion = ver;
|
|
||||||
// pRead->curInvalid = 1;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
seeked = true;
|
seeked = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
|
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||||
if (contLen == sizeof(SWalCkHead)) {
|
if (contLen == sizeof(SWalCkHead)) {
|
||||||
break;
|
break;
|
||||||
} else if (contLen == 0 && !seeked) {
|
} else if (contLen == 0 && !seeked) {
|
||||||
|
@ -392,12 +291,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
}
|
}
|
||||||
// pRead->curInvalid = 1;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = walValidHeadCksum(pHead);
|
code = walValidHeadCksum(pRead->pHead);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
|
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||||
|
@ -405,32 +303,27 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pRead->curInvalid = 0;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
|
int32_t walSkipFetchBody(SWalReader *pRead) {
|
||||||
int64_t code;
|
|
||||||
|
|
||||||
wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
||||||
", applied ver:%" PRId64,
|
", applied ver:%" PRId64,
|
||||||
pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
||||||
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);
|
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);
|
||||||
|
|
||||||
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
|
int64_t code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
// pRead->curInvalid = 1;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRead->curVersion++;
|
pRead->curVersion++;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
int32_t walFetchBody(SWalReader *pRead) {
|
||||||
SWalCont *pReadHead = &((*ppHead)->head);
|
SWalCont *pReadHead = &pRead->pHead->head;
|
||||||
int64_t ver = pReadHead->version;
|
int64_t ver = pReadHead->version;
|
||||||
|
|
||||||
wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
||||||
|
@ -439,13 +332,13 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
||||||
pRead->pWal->vers.appliedVer);
|
pRead->pWal->vers.appliedVer);
|
||||||
|
|
||||||
if (pRead->capacity < pReadHead->bodyLen) {
|
if (pRead->capacity < pReadHead->bodyLen) {
|
||||||
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
*ppHead = ptr;
|
pRead->pHead = ptr;
|
||||||
pReadHead = &((*ppHead)->head);
|
pReadHead = &pRead->pHead->head;
|
||||||
pRead->capacity = pReadHead->bodyLen;
|
pRead->capacity = pReadHead->bodyLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -459,27 +352,24 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
||||||
pRead->pWal->cfg.vgId, pReadHead->version, ver);
|
pRead->pWal->cfg.vgId, pReadHead->version, ver);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
}
|
}
|
||||||
// pRead->curInvalid = 1;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReadHead->version != ver) {
|
if (pReadHead->version != ver) {
|
||||||
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||||
pReadHead->version, ver);
|
pReadHead->version, ver);
|
||||||
// pRead->curInvalid = 1;
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walValidBodyCksum(*ppHead) != 0) {
|
if (walValidBodyCksum(pRead->pHead) != 0) {
|
||||||
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
|
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
|
||||||
ver);
|
ver);
|
||||||
// pRead->curInvalid = 1;
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRead->curVersion = ver + 1;
|
pRead->curVersion++;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue