Merge branch 'hotfix/TD-1976' into hotfix/TD-2518
This commit is contained in:
commit
bd57fd013a
|
@ -1176,13 +1176,15 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity
|
|||
}
|
||||
|
||||
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
|
||||
int32_t numOfCols, STable* pTable) {
|
||||
int32_t numOfCols, STable* pTable, STSchema* pSchema) {
|
||||
char* pData = NULL;
|
||||
|
||||
// the schema version info is embeded in SDataRow
|
||||
STSchema* pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
|
||||
int32_t numOfRowCols = schemaNCols(pSchema);
|
||||
|
||||
if (pSchema == NULL) {
|
||||
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
|
||||
}
|
||||
|
||||
int32_t i = 0, j = 0;
|
||||
while(i < numOfCols && j < numOfRowCols) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||
|
@ -1199,10 +1201,38 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
|
|||
|
||||
if (pSchema->columns[j].colId == pColInfo->info.colId) {
|
||||
void* value = tdGetRowDataOfCol(row, (int8_t)pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
|
||||
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
|
||||
memcpy(pData, value, varDataTLen(value));
|
||||
} else {
|
||||
memcpy(pData, value, pColInfo->info.bytes);
|
||||
switch (pColInfo->info.type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
memcpy(pData, value, varDataTLen(value));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NULL:
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
*(uint8_t *)pData = *(uint8_t *)value;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
*(uint16_t *)pData = *(uint16_t *)value;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
*(uint32_t *)pData = *(uint32_t *)value;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
*(uint64_t *)pData = *(uint64_t *)value;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
SET_FLOAT_PTR(pData, value);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
SET_DOUBLE_PTR(pData, value);
|
||||
break;
|
||||
default:
|
||||
memcpy(pData, value, pColInfo->info.bytes);
|
||||
}
|
||||
|
||||
j++;
|
||||
|
@ -1401,6 +1431,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
// compared with the data from in-memory buffer, to generate the correct timestamp array list
|
||||
int32_t numOfRows = 0;
|
||||
|
||||
int16_t rv = -1;
|
||||
STSchema* pSchema = NULL;
|
||||
|
||||
int32_t pos = cur->pos;
|
||||
cur->win = TSWINDOW_INITIALIZER;
|
||||
|
||||
|
@ -1429,7 +1462,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
|
||||
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable);
|
||||
if (rv != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
|
||||
rv = dataRowVersion(row);
|
||||
}
|
||||
|
||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema);
|
||||
numOfRows += 1;
|
||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||
cur->win.skey = key;
|
||||
|
@ -1442,7 +1480,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
moveToNextRowInMem(pCheckInfo);
|
||||
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
|
||||
if (pCfg->update) {
|
||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable);
|
||||
if (rv != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
|
||||
rv = dataRowVersion(row);
|
||||
}
|
||||
|
||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema);
|
||||
numOfRows += 1;
|
||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||
cur->win.skey = key;
|
||||
|
@ -1987,6 +2030,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
STable* pTable = pCheckInfo->pTableObj;
|
||||
int16_t rv = -1;
|
||||
STSchema* pSchema = NULL;
|
||||
|
||||
do {
|
||||
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update);
|
||||
|
@ -2007,7 +2052,11 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
}
|
||||
|
||||
win->ekey = key;
|
||||
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable);
|
||||
if (rv != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
|
||||
rv = dataRowVersion(row);
|
||||
}
|
||||
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable, pSchema);
|
||||
|
||||
if (++numOfRows >= maxRowsToRead) {
|
||||
moveToNextRowInMem(pCheckInfo);
|
||||
|
@ -2090,7 +2139,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
|||
return false;
|
||||
}
|
||||
|
||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj);
|
||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL);
|
||||
tfree(pRow);
|
||||
|
||||
// update the last key value
|
||||
|
@ -2164,7 +2213,7 @@ bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT* pHandle) {
|
|||
return false;
|
||||
}
|
||||
|
||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj);
|
||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL);
|
||||
tfree(pRow);
|
||||
|
||||
// update the last key value
|
||||
|
|
Loading…
Reference in New Issue