bug fix
This commit is contained in:
parent
9f8af9a1ea
commit
049a4e1582
|
@ -1859,6 +1859,18 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
|
|||
return len;
|
||||
}
|
||||
|
||||
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
||||
int32_t result = TD_MEM_ROW_DATA_HEAD_SIZE;
|
||||
int32_t columns = tscGetNumOfColumns(pTableMeta);
|
||||
SSchema* pSchema = tscGetTableSchema(pTableMeta);
|
||||
for (int32_t i = 0; i < columns; i++) {
|
||||
if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
|
||||
result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) {
|
||||
pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList);
|
||||
if (pInsertParam->pTableNameList == NULL) {
|
||||
|
@ -1897,6 +1909,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
|
|||
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
|
||||
if (pBlocks->numOfRows > 0) {
|
||||
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
|
||||
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
|
||||
STableDataBlocks* dataBuf = NULL;
|
||||
|
||||
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
|
||||
|
@ -1909,8 +1922,8 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
|
|||
return ret;
|
||||
}
|
||||
|
||||
int64_t destSize =
|
||||
dataBuf->size + pOneTableBlock->size + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
||||
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
|
||||
sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
||||
|
||||
if (dataBuf->nAllocSize < destSize) {
|
||||
dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
|
||||
|
@ -1954,7 +1967,8 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
|
|||
pBlocks->numOfRows, pBlocks->sversion, blkKeyInfo.pKeyTuple->skey, pLastKeyTuple->skey);
|
||||
}
|
||||
|
||||
int32_t len = pBlocks->numOfRows * getExtendedRowSize(pOneTableBlock) +
|
||||
int32_t len = pBlocks->numOfRows *
|
||||
(isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) +
|
||||
sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
||||
|
||||
pBlocks->tid = htonl(pBlocks->tid);
|
||||
|
|
|
@ -616,7 +616,7 @@ typedef void *SMemRow;
|
|||
|
||||
#define memRowType(r) ((*(uint8_t *)(r)) & 0x01)
|
||||
|
||||
#define memRowSetType(r, t) ((*(uint8_t *)(r)) = (((*(uint8_t *)(r)) & 0xFE) | (t))) // lowest bit
|
||||
#define memRowSetType(r, t) ((*(uint8_t *)(r)) = (t)) // set the total byte in case of dirty memory
|
||||
#define memRowSetConvert(r) ((*(uint8_t *)(r)) = (((*(uint8_t *)(r)) & 0x7F) | SMEM_ROW_CONVERT)) // highest bit
|
||||
#define isDataRowT(t) (SMEM_ROW_DATA == (((uint8_t)(t)) & 0x01))
|
||||
#define isDataRow(r) (SMEM_ROW_DATA == memRowType(r))
|
||||
|
|
Loading…
Reference in New Issue