fix crash when there are 2 or more binary columns

This commit is contained in:
Bomin Zhang 2020-07-01 11:49:41 +08:00
parent 29adf9c752
commit 13f8b063e5
1 changed files with 17 additions and 5 deletions

View File

@ -625,18 +625,31 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
return len; return len;
} }
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
int32_t result = TD_DATA_ROW_HEAD_SIZE;
int32_t columns = tscGetNumOfColumns(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta);
for(int32_t i = 0; i < columns; i++) {
if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
}
}
return result;
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, 0);
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
size_t total = taosArrayGetSize(pTableDataBlockList); size_t total = taosArrayGetSize(pTableDataBlockList);
for (int32_t i = 0; i < total; ++i) { for (int32_t i = 0; i < total; ++i) {
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i); pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
STableDataBlocks* dataBuf = NULL; STableDataBlocks* dataBuf = NULL;
int32_t ret = int32_t ret =
@ -650,7 +663,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
} }
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * MAX_EXPAND_SIZE; int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize;
if (dataBuf->nAllocSize < destSize) { if (dataBuf->nAllocSize < destSize) {
while (dataBuf->nAllocSize < destSize) { while (dataBuf->nAllocSize < destSize) {
@ -678,8 +691,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
tscDebug("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, tscDebug("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey)); pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize);
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + MAX_EXPAND_SIZE);
pBlocks->tid = htonl(pBlocks->tid); pBlocks->tid = htonl(pBlocks->tid);
pBlocks->uid = htobe64(pBlocks->uid); pBlocks->uid = htobe64(pBlocks->uid);