do further check for blk with len 0 and SKVRow
This commit is contained in:
parent
823e0d2614
commit
11b0755ef5
|
@ -309,36 +309,63 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
|
||||||
}
|
}
|
||||||
// Add SMemRowType ahead of SDataRow
|
// Add SMemRowType ahead of SDataRow
|
||||||
static void expandSubmitBlk(SSubmitBlk *pDest, SSubmitBlk *pSrc, int32_t *lenExpand) {
|
static void expandSubmitBlk(SSubmitBlk *pDest, SSubmitBlk *pSrc, int32_t *lenExpand) {
|
||||||
|
// copy the header firstly
|
||||||
memcpy(pDest, pSrc, sizeof(SSubmitBlk));
|
memcpy(pDest, pSrc, sizeof(SSubmitBlk));
|
||||||
int nRows = htons(pSrc->numOfRows);
|
|
||||||
if (nRows <= 0) {
|
int32_t nRows = htons(pDest->numOfRows);
|
||||||
|
int32_t dataLen = htonl(pDest->dataLen);
|
||||||
|
|
||||||
|
if ((nRows <= 0) || (dataLen <= 0)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *pDestData = pDest->data;
|
char *pDestData = pDest->data;
|
||||||
char *pSrcData = pSrc->data;
|
char *pSrcData = pSrc->data;
|
||||||
for (int i = 0; i < nRows; ++i) {
|
for (int32_t i = 0; i < nRows; ++i) {
|
||||||
memRowSetType(pDestData, SMEM_ROW_DATA);
|
memRowSetType(pDestData, SMEM_ROW_DATA);
|
||||||
memcpy(memRowDataBody(pDestData), pSrcData, dataRowLen(pSrcData));
|
memcpy(memRowDataBody(pDestData), pSrcData, dataRowLen(pSrcData));
|
||||||
pDestData = POINTER_SHIFT(pDestData, memRowTLen(pDestData));
|
pDestData = POINTER_SHIFT(pDestData, memRowTLen(pDestData));
|
||||||
pSrcData = POINTER_SHIFT(pSrcData, dataRowLen(pSrcData));
|
pSrcData = POINTER_SHIFT(pSrcData, dataRowLen(pSrcData));
|
||||||
++(*lenExpand);
|
++(*lenExpand);
|
||||||
}
|
}
|
||||||
int32_t dataLen = htonl(pDest->dataLen);
|
|
||||||
pDest->dataLen = htonl(dataLen + nRows * sizeof(uint8_t));
|
pDest->dataLen = htonl(dataLen + nRows * sizeof(uint8_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check SDataRow by comparing the SDataRow len and SSubmitBlk dataLen
|
||||||
static bool walIsSDataRow(void *pBlkData, int nRows, int32_t dataLen) {
|
static bool walIsSDataRow(void *pBlkData, int nRows, int32_t dataLen) {
|
||||||
int32_t len = 0;
|
if ((nRows <= 0) || (dataLen <= 0)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
int32_t len = 0, kvLen = 0;
|
||||||
for (int i = 0; i < nRows; ++i) {
|
for (int i = 0; i < nRows; ++i) {
|
||||||
len += dataRowLen(pBlkData);
|
len += dataRowLen(pBlkData);
|
||||||
if (len > dataLen) {
|
if (len > dataLen) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For SDataRow between version [2.1.5.0 and 2.1.6.X], it would never conflict.
|
||||||
|
* For SKVRow between version [2.1.5.0 and 2.1.6.X], it may conflict in below scenario
|
||||||
|
* - with 1st type byte 0x01 and sversion 0x0101(257), thus do further check
|
||||||
|
*/
|
||||||
|
if (dataRowLen(pBlkData) == 257) {
|
||||||
|
SMemRow memRow = pBlkData;
|
||||||
|
SKVRow kvRow = memRowKvBody(memRow);
|
||||||
|
int nCols = kvRowNCols(kvRow);
|
||||||
|
uint16_t calcTsOffset = (uint16_t)(TD_MEM_ROW_KV_HEAD_SIZE + sizeof(SColIdx) * nCols);
|
||||||
|
uint16_t realTsOffset = (kvRowColIdx(kvRow))->offset;
|
||||||
|
if (calcTsOffset == realTsOffset) {
|
||||||
|
kvLen += memRowKvTLen(memRow);
|
||||||
|
}
|
||||||
|
}
|
||||||
pBlkData = POINTER_SHIFT(pBlkData, dataRowLen(pBlkData));
|
pBlkData = POINTER_SHIFT(pBlkData, dataRowLen(pBlkData));
|
||||||
}
|
}
|
||||||
if (len != dataLen) {
|
if (len != dataLen) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (kvLen == dataLen) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
// for WAL SMemRow/SDataRow compatibility
|
// for WAL SMemRow/SDataRow compatibility
|
||||||
|
|
Loading…
Reference in New Issue