Merge pull request #14671 from taosdata/feature/3_liaohj

enh(query): add block sma for int type column data.
This commit is contained in:
Haojun Liao 2022-07-08 16:23:13 +08:00 committed by GitHub
commit 1711d646c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 333 additions and 690 deletions

View File

@ -88,8 +88,6 @@ typedef struct {
#pragma pack(push, 1)
typedef struct SColumnDataAgg {
int16_t colId;
int16_t minIndex;
int16_t maxIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;

View File

@ -750,7 +750,6 @@ TEST(testCase, projection_query_stables) {
taos_close(pConn);
}
TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -763,7 +762,7 @@ TEST(testCase, agg_query_tables) {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "show table distributed st1");
pRes = taos_query(pConn, "show table distributed tup");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
@ -822,13 +821,29 @@ TEST(testCase, async_api_test) {
}
#endif
TEST(testCase, update_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
taos_query(pConn, "use abc1");
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create database, code:%s", taos_errstr(pRes));
taos_free_result(pRes);
return;
}
TAOS_RES* pRes = taos_query(pConn, "create table tup (ts timestamp, k int);");
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to use db, code:%s", taos_errstr(pRes));
taos_free_result(pRes);
return;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tup (ts timestamp, k int);");
if (taos_errno(pRes) != 0) {
printf("failed to create table, reason:%s", taos_errstr(pRes));
}
@ -836,11 +851,10 @@ TEST(testCase, update_test) {
taos_free_result(pRes);
char s[256] = {0};
for(int32_t i = 0; i < 7000; ++i) {
sprintf(s, "insert into tup values('2020-1-1 1:1:1', %d)", i);
for(int32_t i = 0; i < 17000; ++i) {
sprintf(s, "insert into tup values(now+%da, %d)", i, i);
pRes = taos_query(pConn, s);
taos_free_result(pRes);
}
}
#pragma GCC diagnostic pop

View File

@ -131,7 +131,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTabl
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);

File diff suppressed because it is too large Load Diff

View File

@ -1156,7 +1156,7 @@ _err:
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg, uint8_t **ppBuf) {
int32_t code = 0;
TdFilePtr pFD = pReader->pSmaFD;
int64_t offset = pBlock->aSubBlock[0].offset;
int64_t offset = pBlock->aSubBlock[0].sOffset;
int64_t size = pBlock->aSubBlock[0].nSma * sizeof(SColumnDataAgg) + sizeof(TSCKSUM);
uint8_t *pBuf = NULL;
int64_t n;
@ -1179,10 +1179,13 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnD
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// check
if (!taosCheckChecksumWhole(NULL, size)) {
if (!taosCheckChecksumWhole(*ppBuf, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}

View File

@ -1234,10 +1234,26 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
break;
case TSDB_DATA_TYPE_SMALLINT:
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_INT: {
pColAgg->sum += colVal.value.i32;
if (pColAgg->min > colVal.value.i32) {
pColAgg->min = colVal.value.i32;
}
if (pColAgg->max < colVal.value.i32) {
pColAgg->max = colVal.value.i32;
}
break;
case TSDB_DATA_TYPE_BIGINT:
}
case TSDB_DATA_TYPE_BIGINT: {
pColAgg->sum += colVal.value.i64;
if (pColAgg->min > colVal.value.i64) {
pColAgg->min = colVal.value.i64;
}
if (pColAgg->max < colVal.value.i64) {
pColAgg->max = colVal.value.i64;
}
break;
}
case TSDB_DATA_TYPE_FLOAT:
break;
case TSDB_DATA_TYPE_DOUBLE:

View File

@ -741,10 +741,10 @@ static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SF
if (type == TSDB_DATA_TYPE_BIGINT) {
int64_t v = pFuncParam->param.i;
*da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .maxIndex = 0, .minIndex = 0, .sum = v * numOfRows};
*da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double v = pFuncParam->param.d;
*da = (SColumnDataAgg){.numOfNull = 0, .maxIndex = 0, .minIndex = 0};
*da = (SColumnDataAgg){.numOfNull = 0};
*(double*)&da->min = v;
*(double*)&da->max = v;
@ -752,7 +752,7 @@ static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SF
} else if (type == TSDB_DATA_TYPE_BOOL) { // todo validate this data type
bool v = pFuncParam->param.i;
*da = (SColumnDataAgg){.numOfNull = 0, .maxIndex = 0, .minIndex = 0};
*da = (SColumnDataAgg){.numOfNull = 0};
*(bool*)&da->min = 0;
*(bool*)&da->max = v;
*(bool*)&da->sum = v * numOfRows;
@ -1130,7 +1130,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
} else if ((*status) == BLK_DATA_SMA_LOAD) {
// this function never returns error?
pCost->loadBlockStatis += 1;
// tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
// tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
if (pBlock->pBlockAgg == NULL) { // data block statistics does not exist, load data block
// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
@ -1141,7 +1141,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
// load the data block statistics to perform further filter
pCost->loadBlockStatis += 1;
// tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
// tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) {
{ // set previous window

View File

@ -210,7 +210,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL;
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
if (allColumnsHaveAgg == true) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
@ -2349,7 +2352,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL;
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
tsdbRetrieveDataBlockStatisInfo(reader, &pColAgg, &allColumnsHaveAgg);
tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg);
if (allColumnsHaveAgg == true) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

View File

@ -1080,6 +1080,19 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) {
// the data is loaded, not only the block SMA value
for(int32_t i = start; i < num + start; ++i) {
char* p = colDataGetData(pCol, i);
if (memcpy((void*)tval, p, pCol->info.bytes) == 0) {
return i;
}
}
ASSERT(0);
}
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
int32_t numOfElems = 0;
@ -1111,15 +1124,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (isMinFunc) {
tval = &pInput->pColumnDataAgg[0]->min;
index = pInput->pColumnDataAgg[0]->minIndex;
} else {
tval = &pInput->pColumnDataAgg[0]->max;
index = pInput->pColumnDataAgg[0]->maxIndex;
}
if (!pBuf->assign) {
pBuf->v = *(int64_t*)tval;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
} else {
@ -1131,6 +1143,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
}
@ -1143,6 +1156,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
}
@ -1154,6 +1168,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
}
@ -1167,6 +1182,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
}
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
}
@ -5547,30 +5563,18 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
}
}
int32_t delta = maxVal - minVal;
int32_t step = delta / 50;
if (step == 0) {
step = 1;
}
// maximum number of step is 80
double factor = pData->numOfBlocks / 80.0;
int32_t numOfBuckets = sizeof(pData->blockRowsHisto) / sizeof(pData->blockRowsHisto[0]);
int32_t bucketRange = (pData->maxRows - pData->minRows) / numOfBuckets;
bool singleModel = false;
if (bucketRange == 0) {
singleModel = true;
step = 20;
bucketRange = (pData->defMaxRows - pData->defMinRows) / numOfBuckets;
}
int32_t bucketRange = (pData->defMaxRows - pData->defMinRows) / numOfBuckets;
for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1));
len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * i);
int32_t num = 0;
if (singleModel && pData->blockRowsHisto[i] > 0) {
num = 20;
} else {
num = (pData->blockRowsHisto[i] + step - 1) / step;
if (pData->blockRowsHisto[i] > 0) {
num = (pData->blockRowsHisto[i]) / factor;
}
for (int32_t j = 0; j < num; ++j) {
@ -5578,9 +5582,10 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
len += x;
}
double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks;
len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%');
printf("%s\n", st);
if (num > 0) {
double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks;
len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%');
}
varDataSetLen(st, len);
colDataAppend(pColInfo, row++, st, false);

@ -1 +1 @@
Subproject commit 389047db713a3dddfbce292c3260b0864b17d936
Subproject commit c885e967e490105999b84d009a15168728dfafaf