more fix
This commit is contained in:
parent
797a30ec02
commit
f6c2a6bba2
|
@ -154,7 +154,8 @@ static FORCE_INLINE int32_t tBufferPutF32(SBuffer *buffer, float value) {
|
||||||
union {
|
union {
|
||||||
float f;
|
float f;
|
||||||
uint32_t u;
|
uint32_t u;
|
||||||
} u = {.f = value};
|
} u;
|
||||||
|
u.f = value;
|
||||||
return tBufferPutU32(buffer, u.u);
|
return tBufferPutU32(buffer, u.u);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -162,7 +163,8 @@ static FORCE_INLINE int32_t tBufferPutF64(SBuffer *buffer, double value) {
|
||||||
union {
|
union {
|
||||||
double f;
|
double f;
|
||||||
uint64_t u;
|
uint64_t u;
|
||||||
} u = {.f = value};
|
} u;
|
||||||
|
u.f = value;
|
||||||
return tBufferPutU64(buffer, u.u);
|
return tBufferPutU64(buffer, u.u);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -258,7 +258,7 @@ void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
|
||||||
|
|
||||||
memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + i), 0xFF, (numOfRows - i) / sizeof(char));
|
memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + i), 0xFF, (numOfRows - i) / sizeof(char));
|
||||||
i += (numOfRows - i) / sizeof(char) * sizeof(char);
|
i += (numOfRows - i) / sizeof(char) * sizeof(char);
|
||||||
|
|
||||||
for (; i < numOfRows; ++i) {
|
for (; i < numOfRows; ++i) {
|
||||||
colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i);
|
colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i);
|
||||||
}
|
}
|
||||||
|
@ -266,23 +266,24 @@ void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows) {
|
int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
|
||||||
|
uint32_t numOfRows) {
|
||||||
int32_t code = colDataSetVal(pColumnInfoData, currentRow, pData, false);
|
int32_t code = colDataSetVal(pColumnInfoData, currentRow, pData, false);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOfRows > 1) {
|
if (numOfRows > 1) {
|
||||||
int32_t* pOffset = pColumnInfoData->varmeta.offset;
|
int32_t* pOffset = pColumnInfoData->varmeta.offset;
|
||||||
memset(&pOffset[currentRow + 1], pOffset[currentRow], sizeof(pOffset[0]) * (numOfRows - 1));
|
memset(&pOffset[currentRow + 1], pOffset[currentRow], sizeof(pOffset[0]) * (numOfRows - 1));
|
||||||
pColumnInfoData->reassigned = true;
|
pColumnInfoData->reassigned = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
|
int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
|
||||||
uint32_t numOfRows, bool isNull) {
|
bool isNull) {
|
||||||
int32_t len = pColumnInfoData->info.bytes;
|
int32_t len = pColumnInfoData->info.bytes;
|
||||||
if (isNull) {
|
if (isNull) {
|
||||||
colDataSetNItemsNull(pColumnInfoData, currentRow, numOfRows);
|
colDataSetNItemsNull(pColumnInfoData, currentRow, numOfRows);
|
||||||
|
@ -293,18 +294,18 @@ int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
|
||||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
return colDataCopyAndReassign(pColumnInfoData, currentRow, pData, numOfRows);
|
return colDataCopyAndReassign(pColumnInfoData, currentRow, pData, numOfRows);
|
||||||
} else {
|
} else {
|
||||||
int32_t colBytes = pColumnInfoData->info.bytes;
|
int32_t colBytes = pColumnInfoData->info.bytes;
|
||||||
int32_t colOffset = currentRow * colBytes;
|
int32_t colOffset = currentRow * colBytes;
|
||||||
uint32_t num = 1;
|
uint32_t num = 1;
|
||||||
|
|
||||||
void* pStart = pColumnInfoData->pData + colOffset;
|
void* pStart = pColumnInfoData->pData + colOffset;
|
||||||
memcpy(pStart, pData, colBytes);
|
memcpy(pStart, pData, colBytes);
|
||||||
colOffset += num * colBytes;
|
colOffset += num * colBytes;
|
||||||
|
|
||||||
while (num < numOfRows) {
|
while (num < numOfRows) {
|
||||||
int32_t maxNum = num << 1;
|
int32_t maxNum = num << 1;
|
||||||
int32_t tnum = maxNum > numOfRows ? (numOfRows - num) : num;
|
int32_t tnum = maxNum > numOfRows ? (numOfRows - num) : num;
|
||||||
|
|
||||||
memcpy(pColumnInfoData->pData + colOffset, pStart, tnum * colBytes);
|
memcpy(pColumnInfoData->pData + colOffset, pStart, tnum * colBytes);
|
||||||
colOffset += tnum * colBytes;
|
colOffset += tnum * colBytes;
|
||||||
num += tnum;
|
num += tnum;
|
||||||
|
@ -868,8 +869,8 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
|
||||||
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
|
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
|
||||||
// | version | total length | total rows | blankFull | total columns | flag seg| block group id | column schema
|
// | version | total length | total rows | blankFull | total columns | flag seg| block group id | column schema
|
||||||
// | each column length |
|
// | each column length |
|
||||||
return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(bool) + sizeof(int32_t) + sizeof(int32_t) + sizeof(uint64_t) +
|
return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(bool) + sizeof(int32_t) + sizeof(int32_t) +
|
||||||
numOfCols * (sizeof(int8_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t);
|
sizeof(uint64_t) + numOfCols * (sizeof(int8_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t);
|
||||||
}
|
}
|
||||||
|
|
||||||
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
||||||
|
@ -1878,10 +1879,10 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf,
|
||||||
int32_t rows = pDataBlock->info.rows;
|
int32_t rows = pDataBlock->info.rows;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
len += snprintf(dumpBuf + len, size - len,
|
len += snprintf(dumpBuf + len, size - len,
|
||||||
"%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
|
"%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%" PRId64
|
||||||
"|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
|
"|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
|
||||||
taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
|
taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
|
||||||
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
|
pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
|
||||||
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
|
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
|
||||||
if (len >= size - 1) return dumpBuf;
|
if (len >= size - 1) return dumpBuf;
|
||||||
|
|
||||||
|
@ -2078,7 +2079,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
|
||||||
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
|
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
|
||||||
taosArrayPush(pVals, &cv);
|
taosArrayPush(pVals, &cv);
|
||||||
} else {
|
} else {
|
||||||
SValue sv = {.type = pColInfoData->info.type};
|
SValue sv = {.type = pCol->type};
|
||||||
if (pCol->type == pColInfoData->info.type) {
|
if (pCol->type == pColInfoData->info.type) {
|
||||||
memcpy(&sv.val, var, tDataTypes[pCol->type].bytes);
|
memcpy(&sv.val, var, tDataTypes[pCol->type].bytes);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2142,9 +2143,9 @@ _end:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId){
|
void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId) {
|
||||||
char tmp[TSDB_TABLE_NAME_LEN] = {0};
|
char tmp[TSDB_TABLE_NAME_LEN] = {0};
|
||||||
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
|
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%" PRIu64, groupId);
|
||||||
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end
|
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end
|
||||||
strcat(ctbName, tmp);
|
strcat(ctbName, tmp);
|
||||||
}
|
}
|
||||||
|
@ -2200,7 +2201,7 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
|
||||||
int8_t type = TSDB_DATA_TYPE_UBIGINT;
|
int8_t type = TSDB_DATA_TYPE_UBIGINT;
|
||||||
const char* name = "group_id";
|
const char* name = "group_id";
|
||||||
int32_t len = strlen(name);
|
int32_t len = strlen(name);
|
||||||
SSmlKv pTag = { .key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)};
|
SSmlKv pTag = {.key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)};
|
||||||
taosArrayPush(tags, &pTag);
|
taosArrayPush(tags, &pTag);
|
||||||
|
|
||||||
RandTableName rname = {
|
RandTableName rname = {
|
||||||
|
@ -2582,7 +2583,7 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
||||||
pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order);
|
pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order);
|
||||||
}
|
}
|
||||||
SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock};
|
SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock};
|
||||||
int32_t rowIdx = 0, nextRowIdx = 1;
|
int32_t rowIdx = 0, nextRowIdx = 1;
|
||||||
for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) {
|
for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) {
|
||||||
if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) {
|
if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -1759,7 +1759,10 @@ int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (writer->ctx->tbHasOldData) {
|
if (writer->ctx->tbHasOldData) {
|
||||||
code = tsdbDataFileDoWriteTableOldData(writer, NULL /* as largest key */);
|
STsdbRowKey key;
|
||||||
|
|
||||||
|
tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &key);
|
||||||
|
code = tsdbDataFileDoWriteTableOldData(writer, &key);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue