Merge pull request #18205 from taosdata/fix/sma_for_null
fix: sma for null
This commit is contained in:
commit
a186cc4491
|
@ -88,6 +88,7 @@ typedef struct SBlockLoadSuppInfo {
|
||||||
int16_t* colIds; // column ids for loading file block data
|
int16_t* colIds; // column ids for loading file block data
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
|
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
|
||||||
|
bool smaValid; // the sma on all queried columns are activated
|
||||||
} SBlockLoadSuppInfo;
|
} SBlockLoadSuppInfo;
|
||||||
|
|
||||||
typedef struct SLastBlockReader {
|
typedef struct SLastBlockReader {
|
||||||
|
@ -213,11 +214,10 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
|
||||||
|
|
||||||
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
||||||
|
|
||||||
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SSDataBlock* pBlock) {
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
|
||||||
|
|
||||||
size_t numOfCols = blockDataGetNumOfCols(pBlock);
|
size_t numOfCols = blockDataGetNumOfCols(pBlock);
|
||||||
|
|
||||||
|
pSupInfo->smaValid = true;
|
||||||
pSupInfo->numOfCols = numOfCols;
|
pSupInfo->numOfCols = numOfCols;
|
||||||
pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
|
pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
|
||||||
pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||||
|
@ -239,6 +239,28 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
|
||||||
|
int32_t i = 0, j = 0;
|
||||||
|
|
||||||
|
while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
|
||||||
|
STColumn* pTCol = &pSchema->columns[i];
|
||||||
|
if (pTCol->colId == pSupInfo->colIds[j]) {
|
||||||
|
if (!IS_BSMA_ON(pTCol)) {
|
||||||
|
pSupInfo->smaValid = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
i += 1;
|
||||||
|
j += 1;
|
||||||
|
} else if (pTCol->colId < pSupInfo->colIds[j]) {
|
||||||
|
// do nothing
|
||||||
|
i += 1;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
||||||
int32_t num = numOfTables / pBuf->numPerBucket;
|
int32_t num = numOfTables / pBuf->numPerBucket;
|
||||||
int32_t remainder = numOfTables % pBuf->numPerBucket;
|
int32_t remainder = numOfTables % pBuf->numPerBucket;
|
||||||
|
@ -580,7 +602,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
|
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
||||||
pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
|
pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
|
||||||
pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
|
pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
|
||||||
if (pSup->pColAgg == NULL || pSup->plist == NULL) {
|
if (pSup->pColAgg == NULL || pSup->plist == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -601,7 +623,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
setColumnIdSlotList(pReader, pReader->pResBlock);
|
setColumnIdSlotList(&pReader->suppInfo, pReader->pResBlock);
|
||||||
|
|
||||||
*ppReader = pReader;
|
*ppReader = pReader;
|
||||||
return code;
|
return code;
|
||||||
|
@ -3763,6 +3785,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pReader->pSchema != NULL) {
|
||||||
|
updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
|
||||||
|
}
|
||||||
|
|
||||||
STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;
|
STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;
|
||||||
|
|
||||||
pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
|
pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
|
||||||
|
@ -4020,7 +4046,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
|
||||||
}
|
}
|
||||||
|
|
||||||
// there is no statistics data for composed block
|
// there is no statistics data for composed block
|
||||||
if (pReader->status.composedDataBlock) {
|
if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) {
|
||||||
*pBlockStatis = NULL;
|
*pBlockStatis = NULL;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -4060,7 +4086,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
|
||||||
|
|
||||||
int32_t i = 0, j = 0;
|
int32_t i = 0, j = 0;
|
||||||
size_t size = taosArrayGetSize(pSup->pColAgg);
|
size_t size = taosArrayGetSize(pSup->pColAgg);
|
||||||
|
#if 0
|
||||||
while (j < numOfCols && i < size) {
|
while (j < numOfCols && i < size) {
|
||||||
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
|
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
|
||||||
if (pAgg->colId == pSup->colIds[j]) {
|
if (pAgg->colId == pSup->colIds[j]) {
|
||||||
|
@ -4068,6 +4094,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
|
||||||
pSup->plist[j] = pAgg;
|
pSup->plist[j] = pAgg;
|
||||||
} else {
|
} else {
|
||||||
*allHave = false;
|
*allHave = false;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
i += 1;
|
i += 1;
|
||||||
j += 1;
|
j += 1;
|
||||||
|
@ -4077,12 +4104,43 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
|
||||||
j += 1;
|
j += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#else
|
||||||
|
|
||||||
|
// fill the all null data column
|
||||||
|
SArray* pNewAggList = taosArrayInit(numOfCols, sizeof(SColumnDataAgg));
|
||||||
|
|
||||||
|
while (j < numOfCols && i < size) {
|
||||||
|
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
|
||||||
|
if (pAgg->colId == pSup->colIds[j]) {
|
||||||
|
taosArrayPush(pNewAggList, pAgg);
|
||||||
|
i += 1;
|
||||||
|
j += 1;
|
||||||
|
} else if (pAgg->colId < pSup->colIds[j]) {
|
||||||
|
i += 1;
|
||||||
|
} else if (pSup->colIds[j] < pAgg->colId) {
|
||||||
|
// all date in this block are null
|
||||||
|
SColumnDataAgg nullColAgg = {.colId = pSup->colIds[j], .numOfNull = pBlock->nRow};
|
||||||
|
taosArrayPush(pNewAggList, &nullColAgg);
|
||||||
|
j += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayClear(pSup->pColAgg);
|
||||||
|
taosArrayAddAll(pSup->pColAgg, pNewAggList);
|
||||||
|
|
||||||
|
size_t num = taosArrayGetSize(pSup->pColAgg);
|
||||||
|
for(int32_t k = 0; k < num; ++k) {
|
||||||
|
pSup->plist[k] = taosArrayGet(pSup->pColAgg, k);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pNewAggList);
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
pReader->cost.smaDataLoad += 1;
|
pReader->cost.smaDataLoad += 1;
|
||||||
*pBlockStatis = pSup->plist;
|
*pBlockStatis = pSup->plist;
|
||||||
|
|
||||||
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
|
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -517,7 +517,7 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData,
|
||||||
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
|
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
|
||||||
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
|
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
|
||||||
|
|
||||||
if ((!pColData->smaOn) || IS_VAR_DATA_TYPE(pColData->type)) continue;
|
if ((!pColData->smaOn) || IS_VAR_DATA_TYPE(pColData->type) || ((pColData->flag & HAS_VALUE) == 0)) continue;
|
||||||
|
|
||||||
SColumnDataAgg sma = {.colId = pColData->cid};
|
SColumnDataAgg sma = {.colId = pColData->cid};
|
||||||
tColDataCalcSMA[pColData->type](pColData, &sma.sum, &sma.max, &sma.min, &sma.numOfNull);
|
tColDataCalcSMA[pColData->type](pColData, &sma.sum, &sma.max, &sma.min, &sma.numOfNull);
|
||||||
|
|
Loading…
Reference in New Issue