Merge pull request #7361 from taosdata/fix/TD-6044
[TD-6044]<hotfix>: WAL compatibility since v2.1.5.0[ci skip]
This commit is contained in:
commit
e701aa8dcf
|
@ -32,7 +32,7 @@ typedef enum {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t msgType;
|
int8_t msgType;
|
||||||
int8_t sver;
|
int8_t sver; // sver 2 for WAL SDataRow/SMemRow compatibility
|
||||||
int8_t reserved[2];
|
int8_t reserved[2];
|
||||||
int32_t len;
|
int32_t len;
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#define TAOS_RANDOM_FILE_FAIL_TEST
|
#define TAOS_RANDOM_FILE_FAIL_TEST
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "tfile.h"
|
#include "tfile.h"
|
||||||
#include "twal.h"
|
#include "twal.h"
|
||||||
|
@ -114,7 +115,7 @@ void walRemoveAllOldFiles(void *handle) {
|
||||||
#if defined(WAL_CHECKSUM_WHOLE)
|
#if defined(WAL_CHECKSUM_WHOLE)
|
||||||
|
|
||||||
static void walUpdateChecksum(SWalHead *pHead) {
|
static void walUpdateChecksum(SWalHead *pHead) {
|
||||||
pHead->sver = 1;
|
pHead->sver = 2;
|
||||||
pHead->cksum = 0;
|
pHead->cksum = 0;
|
||||||
pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(*pHead) + pHead->len);
|
pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(*pHead) + pHead->len);
|
||||||
}
|
}
|
||||||
|
@ -122,7 +123,7 @@ static void walUpdateChecksum(SWalHead *pHead) {
|
||||||
static int walValidateChecksum(SWalHead *pHead) {
|
static int walValidateChecksum(SWalHead *pHead) {
|
||||||
if (pHead->sver == 0) { // for compatible with wal before sver 1
|
if (pHead->sver == 0) { // for compatible with wal before sver 1
|
||||||
return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead));
|
return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead));
|
||||||
} else if (pHead->sver == 1) {
|
} else if (pHead->sver >= 1) {
|
||||||
uint32_t cksum = pHead->cksum;
|
uint32_t cksum = pHead->cksum;
|
||||||
pHead->cksum = 0;
|
pHead->cksum = 0;
|
||||||
return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum);
|
return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum);
|
||||||
|
@ -281,7 +282,7 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHead->sver == 1) {
|
if (pHead->sver >= 1) {
|
||||||
if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
|
if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
|
||||||
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
|
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
|
||||||
return TSDB_CODE_WAL_FILE_CORRUPTED;
|
return TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
@ -306,7 +307,115 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
|
||||||
|
|
||||||
return TSDB_CODE_WAL_FILE_CORRUPTED;
|
return TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
}
|
}
|
||||||
|
// Add SMemRowType ahead of SDataRow
|
||||||
|
static void expandSubmitBlk(SSubmitBlk *pDest, SSubmitBlk *pSrc, int32_t *lenExpand) {
|
||||||
|
// copy the header firstly
|
||||||
|
memcpy(pDest, pSrc, sizeof(SSubmitBlk));
|
||||||
|
|
||||||
|
int32_t nRows = htons(pDest->numOfRows);
|
||||||
|
int32_t dataLen = htonl(pDest->dataLen);
|
||||||
|
|
||||||
|
if ((nRows <= 0) || (dataLen <= 0)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *pDestData = pDest->data;
|
||||||
|
char *pSrcData = pSrc->data;
|
||||||
|
for (int32_t i = 0; i < nRows; ++i) {
|
||||||
|
memRowSetType(pDestData, SMEM_ROW_DATA);
|
||||||
|
memcpy(memRowDataBody(pDestData), pSrcData, dataRowLen(pSrcData));
|
||||||
|
pDestData = POINTER_SHIFT(pDestData, memRowTLen(pDestData));
|
||||||
|
pSrcData = POINTER_SHIFT(pSrcData, dataRowLen(pSrcData));
|
||||||
|
++(*lenExpand);
|
||||||
|
}
|
||||||
|
pDest->dataLen = htonl(dataLen + nRows * sizeof(uint8_t));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check SDataRow by comparing the SDataRow len and SSubmitBlk dataLen
|
||||||
|
static bool walIsSDataRow(void *pBlkData, int nRows, int32_t dataLen) {
|
||||||
|
if ((nRows <= 0) || (dataLen <= 0)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
int32_t len = 0, kvLen = 0;
|
||||||
|
for (int i = 0; i < nRows; ++i) {
|
||||||
|
len += dataRowLen(pBlkData);
|
||||||
|
if (len > dataLen) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For SDataRow between version [2.1.5.0 and 2.1.6.X], it would never conflict.
|
||||||
|
* For SKVRow between version [2.1.5.0 and 2.1.6.X], it may conflict in below scenario
|
||||||
|
* - with 1st type byte 0x01 and sversion 0x0101(257), thus do further check
|
||||||
|
*/
|
||||||
|
if (dataRowLen(pBlkData) == 257) {
|
||||||
|
SMemRow memRow = pBlkData;
|
||||||
|
SKVRow kvRow = memRowKvBody(memRow);
|
||||||
|
int nCols = kvRowNCols(kvRow);
|
||||||
|
uint16_t calcTsOffset = (uint16_t)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nCols);
|
||||||
|
uint16_t realTsOffset = (kvRowColIdx(kvRow))->offset;
|
||||||
|
if (calcTsOffset == realTsOffset) {
|
||||||
|
kvLen += memRowKvTLen(memRow);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pBlkData = POINTER_SHIFT(pBlkData, dataRowLen(pBlkData));
|
||||||
|
}
|
||||||
|
if (len != dataLen) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (kvLen == dataLen) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// for WAL SMemRow/SDataRow compatibility
|
||||||
|
static int walSMemRowCheck(SWalHead *pHead) {
|
||||||
|
if ((pHead->sver < 2) && (pHead->msgType == TSDB_MSG_TYPE_SUBMIT)) {
|
||||||
|
SSubmitMsg *pMsg = (SSubmitMsg *)pHead->cont;
|
||||||
|
int32_t numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||||
|
if (numOfBlocks <= 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t nTotalRows = 0;
|
||||||
|
SSubmitBlk *pBlk = (SSubmitBlk *)pMsg->blocks;
|
||||||
|
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
|
int32_t dataLen = htonl(pBlk->dataLen);
|
||||||
|
int32_t nRows = htons(pBlk->numOfRows);
|
||||||
|
nTotalRows += nRows;
|
||||||
|
if (!walIsSDataRow(pBlk->data, nRows, dataLen)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
pBlk = (SSubmitBlk *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + dataLen);
|
||||||
|
}
|
||||||
|
ASSERT(nTotalRows >= 0);
|
||||||
|
SWalHead *pWalHead = (SWalHead *)calloc(sizeof(SWalHead) + pHead->len + nTotalRows * sizeof(uint8_t), 1);
|
||||||
|
if (pWalHead == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pWalHead, pHead, sizeof(SWalHead) + sizeof(SSubmitMsg));
|
||||||
|
|
||||||
|
SSubmitMsg *pDestMsg = (SSubmitMsg *)pWalHead->cont;
|
||||||
|
SSubmitBlk *pDestBlks = (SSubmitBlk *)pDestMsg->blocks;
|
||||||
|
SSubmitBlk *pSrcBlks = (SSubmitBlk *)pMsg->blocks;
|
||||||
|
int32_t lenExpand = 0;
|
||||||
|
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
|
expandSubmitBlk(pDestBlks, pSrcBlks, &lenExpand);
|
||||||
|
pDestBlks = POINTER_SHIFT(pDestBlks, htonl(pDestBlks->dataLen) + sizeof(SSubmitBlk));
|
||||||
|
pSrcBlks = POINTER_SHIFT(pSrcBlks, htonl(pSrcBlks->dataLen) + sizeof(SSubmitBlk));
|
||||||
|
}
|
||||||
|
if (lenExpand > 0) {
|
||||||
|
pDestMsg->header.contLen = htonl(pDestMsg->length) + lenExpand;
|
||||||
|
pDestMsg->length = htonl(pDestMsg->header.contLen);
|
||||||
|
pWalHead->len = pWalHead->len + lenExpand;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pHead, pWalHead, sizeof(SWalHead) + pWalHead->len);
|
||||||
|
tfree(pWalHead);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) {
|
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) {
|
||||||
int32_t size = WAL_MAX_SIZE;
|
int32_t size = WAL_MAX_SIZE;
|
||||||
|
@ -346,7 +455,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
|
||||||
}
|
}
|
||||||
|
|
||||||
#if defined(WAL_CHECKSUM_WHOLE)
|
#if defined(WAL_CHECKSUM_WHOLE)
|
||||||
if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 1) {
|
if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) {
|
||||||
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
|
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
|
||||||
pHead->version, pHead->len, offset);
|
pHead->version, pHead->len, offset);
|
||||||
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
|
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
|
||||||
|
@ -379,7 +488,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHead->sver == 1 && !walValidateChecksum(pHead)) {
|
if ((pHead->sver >= 1) && !walValidateChecksum(pHead)) {
|
||||||
wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
|
wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
|
||||||
pHead->version, pHead->len, offset);
|
pHead->version, pHead->len, offset);
|
||||||
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
|
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
|
||||||
|
@ -432,6 +541,13 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
|
||||||
pWal->version = pHead->version;
|
pWal->version = pHead->version;
|
||||||
|
|
||||||
//wInfo("writeFp: %ld", offset);
|
//wInfo("writeFp: %ld", offset);
|
||||||
|
if (0 != walSMemRowCheck(pHead)) {
|
||||||
|
wError("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64,
|
||||||
|
pWal->vgId, fileId, pHead->version, pWal->version, pHead->len, offset);
|
||||||
|
tfClose(tfd);
|
||||||
|
tfree(buffer);
|
||||||
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
|
}
|
||||||
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
|
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue