fix(query): retrieve in-memory buffer data.
This commit is contained in:
parent
c638c33a76
commit
6a32f81265
|
@ -92,8 +92,8 @@ typedef struct SIOCostSummary {
|
||||||
typedef struct SBlockLoadSuppInfo {
|
typedef struct SBlockLoadSuppInfo {
|
||||||
SColumnDataAgg* pstatis;
|
SColumnDataAgg* pstatis;
|
||||||
SColumnDataAgg** plist;
|
SColumnDataAgg** plist;
|
||||||
int16_t* colIdList; // default load column
|
|
||||||
int32_t* slotIds; // colId to slotId
|
int32_t* slotIds; // colId to slotId
|
||||||
|
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
|
||||||
} SBlockLoadSuppInfo;
|
} SBlockLoadSuppInfo;
|
||||||
|
|
||||||
typedef struct SFileSetIter {
|
typedef struct SFileSetIter {
|
||||||
|
@ -189,16 +189,34 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
|
||||||
// pCompBlockLoadInfo->fileId = -1;
|
// pCompBlockLoadInfo->fileId = -1;
|
||||||
// }
|
// }
|
||||||
|
|
||||||
static int32_t setColumnIdList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
||||||
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
|
|
||||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
pReader->suppInfo.colIdList = taosMemoryCalloc(numOfCols, sizeof(int16_t));
|
pSupInfo->slotIds = taosMemoryCalloc(numOfCols, sizeof(int16_t));
|
||||||
if (pReader->suppInfo.colIdList == NULL) {
|
if (pSupInfo->slotIds == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||||
|
if (pSupInfo->buildBuf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
STSchema* pSchema = pReader->pSchema;
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
pReader->suppInfo.colIdList[i] = pCol->info.colId;
|
|
||||||
|
for (int32_t j = 0; j < pSchema->numOfCols; ++j) {
|
||||||
|
if (pCol->info.colId == pSchema->columns[j].colId) {
|
||||||
|
pSupInfo->slotIds[i] = j;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||||
|
pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -426,8 +444,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
|
|
||||||
blockDataEnsureCapacity(pReader->pResBlock, pReader->capacity);
|
blockDataEnsureCapacity(pReader->pResBlock, pReader->capacity);
|
||||||
|
|
||||||
setColumnIdList(pReader, pReader->pResBlock);
|
setColumnIdSlotList(pReader, pReader->pResBlock);
|
||||||
pReader->suppInfo.slotIds = taosMemoryCalloc(pCond->numOfCols, sizeof(int32_t));
|
|
||||||
pReader->suppInfo.plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
|
pReader->suppInfo.plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2504,7 +2521,6 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo extract method: getCurrentKeyInBuf()
|
|
||||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||||
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||||
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||||
|
@ -2522,7 +2538,10 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
|
||||||
buildInmemDataBlock(pReader, pScanInfo, &maxKey);
|
buildInmemDataBlock(pReader, pScanInfo, &maxKey);
|
||||||
// build data block from in-memory buffer data completed.
|
// build data block from in-memory buffer data completed.
|
||||||
} else { // whole block is required, return it directly
|
} else { // whole block is required, return it directly
|
||||||
// todo check the data version
|
// todo
|
||||||
|
// 1. the version of all rows should be less than the endVersion
|
||||||
|
// 2. current block should not overlap with next neighbor block
|
||||||
|
// 3. current timestamp should not be overlap with each other
|
||||||
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
|
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
|
||||||
pInfo->rows = pBlock->nRow;
|
pInfo->rows = pBlock->nRow;
|
||||||
pInfo->uid = pScanInfo->uid;
|
pInfo->uid = pScanInfo->uid;
|
||||||
|
@ -2740,12 +2759,30 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow
|
||||||
int32_t numOfRows = pBlock->info.rows;
|
int32_t numOfRows = pBlock->info.rows;
|
||||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
|
||||||
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
|
|
||||||
SColVal colVal = {0};
|
SColVal colVal = {0};
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
int32_t slotId = pSupInfo->slotIds[i];
|
||||||
|
|
||||||
tTSRowGetVal(pTSRow, pReader->pSchema, pColInfoData->info.colId, &colVal);
|
if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && slotId == 0) {
|
||||||
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull);
|
colDataAppend(pColInfoData, numOfRows, (const char*) &pTSRow->ts, false);
|
||||||
|
} else {
|
||||||
|
tTSRowGetVal(pTSRow, pReader->pSchema, slotId, &colVal);
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(colVal.type)) {
|
||||||
|
if (colVal.isNull) {
|
||||||
|
colDataAppendNULL(pColInfoData, numOfRows);
|
||||||
|
} else {
|
||||||
|
varDataSetLen(pSupInfo->buildBuf[i], colVal.value.nData);
|
||||||
|
memcpy(varDataVal(pSupInfo->buildBuf[i]), colVal.value.pData, colVal.value.nData);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, pSupInfo->buildBuf[i], false);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pBlock->info.rows += 1;
|
pBlock->info.rows += 1;
|
||||||
|
@ -3135,7 +3172,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// current table is exhausted, let's try the next table
|
// current table is exhausted, let's try the next table
|
||||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, &pStatus->pTableIter);
|
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
|
||||||
if (pStatus->pTableIter == NULL) {
|
if (pStatus->pTableIter == NULL) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue