enh(query): add block sma for int type column data.
This commit is contained in:
parent
5db0f7b1d8
commit
4d2bc796e7
|
@ -826,9 +826,24 @@ TEST(testCase, update_test) {
|
|||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT_NE(pConn, nullptr);
|
||||
|
||||
taos_query(pConn, "use abc1");
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create database, code:%s", taos_errstr(pRes));
|
||||
taos_free_result(pRes);
|
||||
return;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create table tup (ts timestamp, k int);");
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "use abc1");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to use db, code:%s", taos_errstr(pRes));
|
||||
taos_free_result(pRes);
|
||||
return;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tup (ts timestamp, k int);");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create table, reason:%s", taos_errstr(pRes));
|
||||
}
|
||||
|
@ -836,8 +851,8 @@ TEST(testCase, update_test) {
|
|||
taos_free_result(pRes);
|
||||
|
||||
char s[256] = {0};
|
||||
for(int32_t i = 0; i < 7000; ++i) {
|
||||
sprintf(s, "insert into tup values('2020-1-1 1:1:1', %d)", i);
|
||||
for(int32_t i = 0; i < 17000; ++i) {
|
||||
sprintf(s, "insert into tup values(now+%da, %d)", i, i);
|
||||
pRes = taos_query(pConn, s);
|
||||
taos_free_result(pRes);
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ typedef struct SBlockOrderSupporter {
|
|||
|
||||
typedef struct SIOCostSummary {
|
||||
int64_t blockLoadTime;
|
||||
int64_t statisInfoLoadTime;
|
||||
int64_t smaLoadTime;
|
||||
int64_t checkForNextTime;
|
||||
int64_t headFileLoad;
|
||||
int64_t headFileLoadTime;
|
||||
|
@ -63,10 +63,10 @@ typedef struct SBlockLoadSuppInfo {
|
|||
} SBlockLoadSuppInfo;
|
||||
|
||||
typedef struct SFilesetIter {
|
||||
int32_t numOfFiles; // number of total files
|
||||
int32_t index; // current accessed index in the list
|
||||
SArray* pFileList; // data file list
|
||||
int32_t order;
|
||||
int32_t numOfFiles; // number of total files
|
||||
int32_t index; // current accessed index in the list
|
||||
SArray* pFileList; // data file list
|
||||
int32_t order;
|
||||
} SFilesetIter;
|
||||
|
||||
typedef struct SFileDataBlockInfo {
|
||||
|
@ -122,20 +122,6 @@ struct STsdbReader {
|
|||
STSchema* pSchema;
|
||||
SDataFReader* pFileReader;
|
||||
SVersionRange verRange;
|
||||
#if 0
|
||||
SArray* prev; // previous row which is before than time window
|
||||
SArray* next; // next row which is after the query time window
|
||||
SFileBlockInfo* pDataBlockInfo;
|
||||
SDataCols* pDataCols; // in order to hold current file data block
|
||||
int32_t allocSize; // allocated data block size
|
||||
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
|
||||
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
|
||||
// SDFileSet* pFileGroup;
|
||||
// SFSIter fileIter;
|
||||
// SReadH rhelper;
|
||||
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
|
||||
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
|
||||
#endif
|
||||
};
|
||||
|
||||
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
|
||||
|
@ -247,33 +233,6 @@ static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
|
|||
return win;
|
||||
}
|
||||
|
||||
// todo remove this
|
||||
static void setQueryTimewindow(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {
|
||||
// pReader->window = pCond->twindows[tWinIdx];
|
||||
|
||||
// bool updateTs = false;
|
||||
// int64_t startTs = updateQueryTimeWindow(pReader->pTsdb);
|
||||
// if (ASCENDING_TRAVERSE(pReader->order)) {
|
||||
// if (startTs > pReader->window.skey) {
|
||||
// pReader->window.skey = startTs;
|
||||
// pCond->twindows[tWinIdx].skey = startTs;
|
||||
// updateTs = true;
|
||||
// }
|
||||
// } else {
|
||||
// if (startTs > pReader->window.ekey) {
|
||||
// pReader->window.ekey = startTs;
|
||||
// pCond->twindows[tWinIdx].ekey = startTs;
|
||||
// updateTs = true;
|
||||
// }
|
||||
// }
|
||||
|
||||
// if (updateTs) {
|
||||
// tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
|
||||
// pReader, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey, pReader->window.skey,
|
||||
// pReader->window.ekey, pReader->idStr);
|
||||
// }
|
||||
}
|
||||
|
||||
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
|
||||
int32_t rowLen = 0;
|
||||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||
|
@ -399,8 +358,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
|||
pReader->type = pCond->type;
|
||||
pReader->window = updateQueryTimeWindow(pVnode->pTsdb, pCond->twindows);
|
||||
|
||||
// todo remove this
|
||||
setQueryTimewindow(pReader, pCond, 0);
|
||||
ASSERT(pCond->numOfCols > 0);
|
||||
|
||||
limitOutputBufferSize(pCond, &pReader->capacity);
|
||||
|
@ -914,200 +871,6 @@ _error:
|
|||
|
||||
// return midPos;
|
||||
// }
|
||||
// static int32_t mergeTwoRowFromMem(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1,
|
||||
// STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema*
|
||||
// pSchema2, bool update, TSKEY* lastRowKey) {
|
||||
// #if 1
|
||||
// STSchema* pSchema;
|
||||
// STSRow* row;
|
||||
// int16_t colId;
|
||||
// int16_t offset;
|
||||
|
||||
// bool isRow1DataRow = TD_IS_TP_ROW(row1);
|
||||
// bool isRow2DataRow;
|
||||
// bool isChosenRowDataRow;
|
||||
// int32_t chosen_itr;
|
||||
// SCellVal sVal = {0};
|
||||
// TSKEY rowKey = TSKEY_INITIAL_VAL;
|
||||
// int32_t nResult = 0;
|
||||
// int32_t mergeOption = 0; // 0 discard 1 overwrite 2 merge
|
||||
|
||||
// // the schema version info is embeded in STSRow
|
||||
// int32_t numOfColsOfRow1 = 0;
|
||||
|
||||
// if (pSchema1 == NULL) {
|
||||
// pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
|
||||
// }
|
||||
|
||||
// #ifdef TD_DEBUG_PRINT_ROW
|
||||
// char flags[70] = {0};
|
||||
// STsdb* pTsdb = pTsdbReadHandle->rhelper.pRepo;
|
||||
// snprintf(flags, 70, "%s:%d vgId:%d dir:%s row1%s=NULL,row2%s=NULL", __func__, __LINE__, TD_VID(pTsdb->pVnode),
|
||||
// pTsdb->dir, row1 ? "!" : "", row2 ? "!" : "");
|
||||
// tdSRowPrint(row1, pSchema1, flags);
|
||||
// #endif
|
||||
|
||||
// if (isRow1DataRow) {
|
||||
// numOfColsOfRow1 = schemaNCols(pSchema1);
|
||||
// } else {
|
||||
// numOfColsOfRow1 = tdRowGetNCols(row1);
|
||||
// }
|
||||
|
||||
// int32_t numOfColsOfRow2 = 0;
|
||||
// if (row2) {
|
||||
// isRow2DataRow = TD_IS_TP_ROW(row2);
|
||||
// if (pSchema2 == NULL) {
|
||||
// pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
|
||||
// }
|
||||
// if (isRow2DataRow) {
|
||||
// numOfColsOfRow2 = schemaNCols(pSchema2);
|
||||
// } else {
|
||||
// numOfColsOfRow2 = tdRowGetNCols(row2);
|
||||
// }
|
||||
// }
|
||||
|
||||
// int32_t i = 0, j = 0, k = 0;
|
||||
// while (i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
|
||||
// SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
|
||||
|
||||
// int32_t colIdOfRow1;
|
||||
// if (j >= numOfColsOfRow1) {
|
||||
// colIdOfRow1 = INT32_MAX;
|
||||
// } else if (isRow1DataRow) {
|
||||
// colIdOfRow1 = pSchema1->columns[j].colId;
|
||||
// } else {
|
||||
// colIdOfRow1 = tdKvRowColIdAt(row1, j);
|
||||
// }
|
||||
|
||||
// int32_t colIdOfRow2;
|
||||
// if (k >= numOfColsOfRow2) {
|
||||
// colIdOfRow2 = INT32_MAX;
|
||||
// } else if (isRow2DataRow) {
|
||||
// colIdOfRow2 = pSchema2->columns[k].colId;
|
||||
// } else {
|
||||
// colIdOfRow2 = tdKvRowColIdAt(row2, k);
|
||||
// }
|
||||
|
||||
// if (colIdOfRow1 < colIdOfRow2) { // the most probability
|
||||
// if (colIdOfRow1 < pColInfo->info.colId) {
|
||||
// ++j;
|
||||
// continue;
|
||||
// }
|
||||
// row = row1;
|
||||
// pSchema = pSchema1;
|
||||
// isChosenRowDataRow = isRow1DataRow;
|
||||
// chosen_itr = j;
|
||||
// } else if (colIdOfRow1 == colIdOfRow2) {
|
||||
// if (colIdOfRow1 < pColInfo->info.colId) {
|
||||
// ++j;
|
||||
// ++k;
|
||||
// continue;
|
||||
// }
|
||||
// row = row1;
|
||||
// pSchema = pSchema1;
|
||||
// isChosenRowDataRow = isRow1DataRow;
|
||||
// chosen_itr = j;
|
||||
// } else {
|
||||
// if (colIdOfRow2 < pColInfo->info.colId) {
|
||||
// ++k;
|
||||
// continue;
|
||||
// }
|
||||
// row = row2;
|
||||
// pSchema = pSchema2;
|
||||
// chosen_itr = k;
|
||||
// isChosenRowDataRow = isRow2DataRow;
|
||||
// }
|
||||
|
||||
// if (isChosenRowDataRow) {
|
||||
// colId = pSchema->columns[chosen_itr].colId;
|
||||
// offset = pSchema->columns[chosen_itr].offset;
|
||||
// // TODO: use STSRowIter
|
||||
// tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr - 1, &sVal);
|
||||
// if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
// rowKey = *(TSKEY*)sVal.val;
|
||||
// if (rowKey != *lastRowKey) {
|
||||
// mergeOption = 1;
|
||||
// if (*lastRowKey != TSKEY_INITIAL_VAL) {
|
||||
// ++(*curRow);
|
||||
// }
|
||||
// *lastRowKey = rowKey;
|
||||
// ++nResult;
|
||||
// } else if (update) {
|
||||
// mergeOption = 2;
|
||||
// } else {
|
||||
// mergeOption = 0;
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// } else {
|
||||
// // TODO: use STSRowIter
|
||||
// if (chosen_itr == 0) {
|
||||
// colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
// tdSKvRowGetVal(row, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal);
|
||||
// rowKey = *(TSKEY*)sVal.val;
|
||||
// if (rowKey != *lastRowKey) {
|
||||
// mergeOption = 1;
|
||||
// if (*lastRowKey != TSKEY_INITIAL_VAL) {
|
||||
// ++(*curRow);
|
||||
// }
|
||||
// *lastRowKey = rowKey;
|
||||
// ++nResult;
|
||||
// } else if (update) {
|
||||
// mergeOption = 2;
|
||||
// } else {
|
||||
// mergeOption = 0;
|
||||
// break;
|
||||
// }
|
||||
// } else {
|
||||
// SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr - 1);
|
||||
// colId = pColIdx->colId;
|
||||
// offset = pColIdx->offset;
|
||||
// tdSKvRowGetVal(row, colId, offset, chosen_itr - 1, &sVal);
|
||||
// }
|
||||
// }
|
||||
|
||||
// ASSERT(rowKey != TSKEY_INITIAL_VAL);
|
||||
|
||||
// if (colId == pColInfo->info.colId) {
|
||||
// if (tdValTypeIsNorm(sVal.valType)) {
|
||||
// colDataAppend(pColInfo, *curRow, sVal.val, false);
|
||||
// } else if (tdValTypeIsNull(sVal.valType)) {
|
||||
// colDataAppend(pColInfo, *curRow, NULL, true);
|
||||
// } else if (tdValTypeIsNone(sVal.valType)) {
|
||||
// // TODO: Set null if nothing append for this row
|
||||
// if (mergeOption == 1) {
|
||||
// colDataAppend(pColInfo, *curRow, NULL, true);
|
||||
// }
|
||||
// } else {
|
||||
// ASSERT(0);
|
||||
// }
|
||||
|
||||
// ++i;
|
||||
|
||||
// if (row == row1) {
|
||||
// ++j;
|
||||
// } else {
|
||||
// ++k;
|
||||
// }
|
||||
// } else {
|
||||
// if (mergeOption == 1) {
|
||||
// colDataAppend(pColInfo, *curRow, NULL, true);
|
||||
// }
|
||||
// ++i;
|
||||
// }
|
||||
// }
|
||||
|
||||
// if (mergeOption == 1) {
|
||||
// while (i < numOfCols) { // the remain columns are all null data
|
||||
// SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
|
||||
// colDataAppend(pColInfo, *curRow, NULL, true);
|
||||
// ++i;
|
||||
// }
|
||||
// }
|
||||
|
||||
// return nResult;
|
||||
// #endif
|
||||
// }
|
||||
|
||||
// static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle) {
|
||||
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
||||
|
@ -1387,66 +1150,6 @@ _error:
|
|||
// pTsdbReadHandle->idStr);
|
||||
// }
|
||||
|
||||
// int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
|
||||
// int firstPos, lastPos, midPos = -1;
|
||||
// int numOfRows;
|
||||
// TSKEY* keyList;
|
||||
|
||||
// if (num <= 0) return -1;
|
||||
|
||||
// keyList = (TSKEY*)pValue;
|
||||
// firstPos = 0;
|
||||
// lastPos = num - 1;
|
||||
|
||||
// if (order == TSDB_ORDER_DESC) {
|
||||
// // find the first position which is smaller than the key
|
||||
// while (1) {
|
||||
// if (key >= keyList[lastPos]) return lastPos;
|
||||
// if (key == keyList[firstPos]) return firstPos;
|
||||
// if (key < keyList[firstPos]) return firstPos - 1;
|
||||
|
||||
// numOfRows = lastPos - firstPos + 1;
|
||||
// midPos = (numOfRows >> 1) + firstPos;
|
||||
|
||||
// if (key < keyList[midPos]) {
|
||||
// lastPos = midPos - 1;
|
||||
// } else if (key > keyList[midPos]) {
|
||||
// firstPos = midPos + 1;
|
||||
// } else {
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
|
||||
// } else {
|
||||
// // find the first position which is bigger than the key
|
||||
// while (1) {
|
||||
// if (key <= keyList[firstPos]) return firstPos;
|
||||
// if (key == keyList[lastPos]) return lastPos;
|
||||
|
||||
// if (key > keyList[lastPos]) {
|
||||
// lastPos = lastPos + 1;
|
||||
// if (lastPos >= num)
|
||||
// return -1;
|
||||
// else
|
||||
// return lastPos;
|
||||
// }
|
||||
|
||||
// numOfRows = lastPos - firstPos + 1;
|
||||
// midPos = (numOfRows >> 1) + firstPos;
|
||||
|
||||
// if (key < keyList[midPos]) {
|
||||
// lastPos = midPos - 1;
|
||||
// } else if (key > keyList[midPos]) {
|
||||
// firstPos = midPos + 1;
|
||||
// } else {
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// return midPos;
|
||||
// }
|
||||
|
||||
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
|
||||
taosMemoryFreeClear(pSup->numOfBlocksPerTable);
|
||||
taosMemoryFreeClear(pSup->indexPerTable);
|
||||
|
@ -2882,162 +2585,6 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// static void destroyHelper(void* param) {
|
||||
// if (param == NULL) {
|
||||
// return;
|
||||
// }
|
||||
|
||||
// // tQueryInfo* pInfo = (tQueryInfo*)param;
|
||||
// // if (pInfo->optr != TSDB_RELATION_IN) {
|
||||
// // taosMemoryFreeClear(pInfo->q);
|
||||
// // } else {
|
||||
// // taosHashCleanup((SHashObj *)(pInfo->q));
|
||||
// // }
|
||||
|
||||
// taosMemoryFree(param);
|
||||
// }
|
||||
|
||||
// #define TSDB_PREV_ROW 0x1
|
||||
// #define TSDB_NEXT_ROW 0x2
|
||||
|
||||
// static bool loadBlockOfActiveTable(STsdbReader* pTsdbReadHandle) {
|
||||
// if (pTsdbReadHandle->checkFiles) {
|
||||
// // check if the query range overlaps with the file data block
|
||||
// bool exists = true;
|
||||
|
||||
// int32_t code = buildBlockFromFiles(pTsdbReadHandle, &exists);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// pTsdbReadHandle->checkFiles = false;
|
||||
// return false;
|
||||
// }
|
||||
|
||||
// if (exists) {
|
||||
// tsdbRetrieveDataBlock((STsdbReader**)pTsdbReadHandle, NULL);
|
||||
// if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey) {
|
||||
// SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, 0);
|
||||
// assert(*(int64_t*)pColInfo->pData == pTsdbReadHandle->window.skey);
|
||||
// }
|
||||
|
||||
// pTsdbReadHandle->currentLoadExternalRows = false; // clear the flag, since the exact matched row is found.
|
||||
// return exists;
|
||||
// }
|
||||
|
||||
// pTsdbReadHandle->checkFiles = false;
|
||||
// }
|
||||
|
||||
// if (hasMoreDataInCache(pTsdbReadHandle)) {
|
||||
// pTsdbReadHandle->currentLoadExternalRows = false;
|
||||
// return true;
|
||||
// }
|
||||
|
||||
// // current result is empty
|
||||
// if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey &&
|
||||
// pTsdbReadHandle->cur.rows == 0) {
|
||||
// // SMemTable* pMemRef = pTsdbReadHandle->pMemTable;
|
||||
|
||||
// // doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef);
|
||||
// // doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef);
|
||||
|
||||
// bool result = tsdbGetExternalRow(pTsdbReadHandle);
|
||||
|
||||
// // pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
|
||||
// // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
|
||||
// pTsdbReadHandle->currentLoadExternalRows = false;
|
||||
|
||||
// return result;
|
||||
// }
|
||||
|
||||
// return false;
|
||||
// }
|
||||
|
||||
// static bool loadDataBlockFromTableSeq(STsdbReader* pTsdbReadHandle) {
|
||||
// size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
|
||||
// assert(numOfTables > 0);
|
||||
|
||||
// int64_t stime = taosGetTimestampUs();
|
||||
|
||||
// while (pTsdbReadHandle->activeIndex < numOfTables) {
|
||||
// if (loadBlockOfActiveTable(pTsdbReadHandle)) {
|
||||
// return true;
|
||||
// }
|
||||
|
||||
// STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
|
||||
// pCheckInfo->numOfBlocks = 0;
|
||||
|
||||
// pTsdbReadHandle->activeIndex += 1;
|
||||
// pTsdbReadHandle->locateStart = false;
|
||||
// pTsdbReadHandle->checkFiles = true;
|
||||
// pTsdbReadHandle->cur.rows = 0;
|
||||
// pTsdbReadHandle->currentLoadExternalRows = pTsdbReadHandle->loadExternalRow;
|
||||
|
||||
// terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
// int64_t elapsedTime = taosGetTimestampUs() - stime;
|
||||
// pTsdbReadHandle->cost.checkForNextTime += elapsedTime;
|
||||
// }
|
||||
|
||||
// return false;
|
||||
// }
|
||||
|
||||
// bool tsdbGetExternalRow(STsdbReader* pHandle) {
|
||||
// STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;
|
||||
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
||||
|
||||
// cur->fid = INT32_MIN;
|
||||
// cur->mixBlock = true;
|
||||
// if (pTsdbReadHandle->prev == NULL || pTsdbReadHandle->next == NULL) {
|
||||
// cur->rows = 0;
|
||||
// return false;
|
||||
// }
|
||||
|
||||
// int32_t numOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
|
||||
// for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
// SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
|
||||
// SColumnInfoData* first = taosArrayGet(pTsdbReadHandle->prev, i);
|
||||
|
||||
// memcpy(pColInfoData->pData, first->pData, pColInfoData->info.bytes);
|
||||
|
||||
// SColumnInfoData* sec = taosArrayGet(pTsdbReadHandle->next, i);
|
||||
// memcpy(((char*)pColInfoData->pData) + pColInfoData->info.bytes, sec->pData, pColInfoData->info.bytes);
|
||||
|
||||
// if (i == 0 && pColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
// cur->win.skey = *(TSKEY*)pColInfoData->pData;
|
||||
// cur->win.ekey = *(TSKEY*)(((char*)pColInfoData->pData) + TSDB_KEYSIZE);
|
||||
// }
|
||||
// }
|
||||
|
||||
// cur->rows = 2;
|
||||
// return true;
|
||||
// }
|
||||
|
||||
// static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
|
||||
// if (pColumnInfoData == NULL) {
|
||||
// return NULL;
|
||||
// }
|
||||
|
||||
// size_t cols = taosArrayGetSize(pColumnInfoData);
|
||||
// for (int32_t i = 0; i < cols; ++i) {
|
||||
// SColumnInfoData* pColInfo = taosArrayGet(pColumnInfoData, i);
|
||||
// colDataDestroy(pColInfo);
|
||||
// }
|
||||
|
||||
// taosArrayDestroy(pColumnInfoData);
|
||||
// return NULL;
|
||||
// }
|
||||
|
||||
// static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
|
||||
// size_t size = taosArrayGetSize(pTableCheckInfo);
|
||||
// for (int32_t i = 0; i < size; ++i) {
|
||||
// STableBlockScanInfo* p = taosArrayGet(pTableCheckInfo, i);
|
||||
// destroyTableMemIterator(p);
|
||||
|
||||
// taosMemoryFreeClear(p->pCompInfo);
|
||||
// }
|
||||
|
||||
// taosArrayDestroy(pTableCheckInfo);
|
||||
// return NULL;
|
||||
// }
|
||||
|
||||
// ====================================== EXPOSED APIs ======================================
|
||||
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
|
||||
const char* idstr) {
|
||||
|
@ -3128,7 +2675,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
|
||||
tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
|
||||
" us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
|
||||
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime,
|
||||
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaLoadTime, pCost->blockLoadTime,
|
||||
pCost->checkForNextTime, pReader->idStr);
|
||||
|
||||
taosMemoryFree(pReader->idStr);
|
||||
|
@ -3185,31 +2732,31 @@ int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg***
|
|||
int32_t code = 0;
|
||||
*allHave = false;
|
||||
|
||||
// there is no statistics data for composed block
|
||||
if (pReader->status.composedDataBlock) {
|
||||
*pBlockStatis = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[c->slot];
|
||||
// assert((c->slot >= 0 && c->slot < pReader->numOfBlocks) || ((c->slot == pReader->numOfBlocks) && (c->slot == 0)));
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
|
||||
// // file block with sub-blocks has no statistics data
|
||||
// if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
|
||||
// *pBlockStatis = NULL;
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
int64_t stime = taosGetTimestampUs();
|
||||
|
||||
// int64_t stime = taosGetTimestampUs();
|
||||
// int statisStatus = tsdbLoadBlockStatis(&pReader->rhelper, pBlockInfo->compBlock);
|
||||
// if (statisStatus < TSDB_STATIS_OK) {
|
||||
// return terrno;
|
||||
// } else if (statisStatus > TSDB_STATIS_OK) {
|
||||
// *pBlockStatis = NULL;
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
if (tBlockHasSma(pBlock)) {
|
||||
SArray* pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
|
||||
code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pColAgg, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64", code:%s, %s", 0, pFBlock->uid,
|
||||
tstrerror(code), pReader->idStr);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
// tsdbDebug("vgId:%d, succeed to load block statis part for uid %" PRIu64, REPO_ID(pReader->pTsdb),
|
||||
// TSDB_READ_TABLE_UID(&pReader->rhelper));
|
||||
int64_t el = taosGetTimestampUs() - stime;
|
||||
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64", elapsed time:%"PRId64"us, %s", 0, pFBlock->uid,
|
||||
el, pReader->idStr);
|
||||
|
||||
// int16_t* colIds = pReader->suppInfo.defaultLoadColumn->pData;
|
||||
|
||||
|
@ -3224,34 +2771,36 @@ int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg***
|
|||
// *allHave = true;
|
||||
// tsdbGetBlockStatis(&pReader->rhelper, pReader->suppInfo.pstatis, (int)numOfCols, pBlockInfo->compBlock);
|
||||
|
||||
// // always load the first primary timestamp column data
|
||||
// SColumnDataAgg* pPrimaryColStatis = &pReader->suppInfo.pstatis[0];
|
||||
// assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||
// always load the first primary timestamp column data
|
||||
SColumnDataAgg* pTsAgg = &pReader->suppInfo.pstatis[0];
|
||||
assert(pTsAgg->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||
|
||||
// pPrimaryColStatis->numOfNull = 0;
|
||||
// pPrimaryColStatis->min = pBlockInfo->compBlock->minKey.ts;
|
||||
// pPrimaryColStatis->max = pBlockInfo->compBlock->maxKey.ts;
|
||||
// pReader->suppInfo.plist[0] = &pReader->suppInfo.pstatis[0];
|
||||
pTsAgg->numOfNull = 0;
|
||||
pTsAgg->min = pReader->pResBlock->info.window.skey;
|
||||
pTsAgg->max = pReader->pResBlock->info.window.ekey;
|
||||
pReader->suppInfo.plist[0] = &pReader->suppInfo.pstatis[0];
|
||||
|
||||
// // update the number of NULL data rows
|
||||
// int32_t* slotIds = pReader->suppInfo.slotIds;
|
||||
// for (int32_t i = 1; i < numOfCols; ++i) {
|
||||
// ASSERT(colIds[i] == pReader->pSchema->columns[slotIds[i]].colId);
|
||||
// if (IS_BSMA_ON(&(pReader->pSchema->columns[slotIds[i]]))) {
|
||||
// if (pReader->suppInfo.pstatis[i].numOfNull == -1) { // set the column data are all NULL
|
||||
// pReader->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
|
||||
// }
|
||||
// update the number of NULL data rows
|
||||
size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
|
||||
int32_t* slotIds = pReader->suppInfo.slotIds;
|
||||
|
||||
// pReader->suppInfo.plist[i] = &pReader->suppInfo.pstatis[i];
|
||||
// } else {
|
||||
// *allHave = false;
|
||||
// }
|
||||
// }
|
||||
for (int32_t i = 1; i < numOfCols; ++i) {
|
||||
// ASSERT(colIds[i] == pReader->pSchema->columns[slotIds[i]].colId);
|
||||
if (IS_BSMA_ON(&(pReader->pSchema->columns[slotIds[i]]))) {
|
||||
if (pReader->suppInfo.pstatis[i].numOfNull == -1) { // set the column data are all NULL
|
||||
// pReader->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
|
||||
}
|
||||
|
||||
// int64_t elapsed = taosGetTimestampUs() - stime;
|
||||
// pReader->cost.statisInfoLoadTime += elapsed;
|
||||
pReader->suppInfo.plist[i] = &pReader->suppInfo.pstatis[i];
|
||||
} else {
|
||||
*allHave = false;
|
||||
}
|
||||
}
|
||||
|
||||
// *pBlockStatis = pReader->suppInfo.plist;
|
||||
int64_t elapsed = taosGetTimestampUs() - stime;
|
||||
pReader->cost.smaLoadTime += elapsed;
|
||||
|
||||
*pBlockStatis = pReader->suppInfo.plist;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -3286,8 +2835,6 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
setQueryTimewindow(pReader, pCond, tWinIdx);
|
||||
|
||||
pReader->order = pCond->order;
|
||||
pReader->type = BLOCK_LOAD_OFFSET_ORDER;
|
||||
pReader->status.loadFromFile = true;
|
||||
|
@ -3324,114 +2871,72 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
|
||||
return (numOfRows - startRow) / bucketRange;
|
||||
}
|
||||
|
||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
|
||||
int32_t code = 0;
|
||||
// pTableBlockInfo->totalSize = 0;
|
||||
// pTableBlockInfo->totalRows = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
pTableBlockInfo->totalSize = 0;
|
||||
pTableBlockInfo->totalRows = 0;
|
||||
|
||||
// STsdbFS* pFileHandle = REPO_FS(pReader->pTsdb);
|
||||
// find the start data block in file
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
// // find the start data block in file
|
||||
// pReader->locateStart = true;
|
||||
// STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pReader->pTsdb);
|
||||
// int32_t fid = getFileIdFromKey(pReader->window.skey, pCfg->days, pCfg->precision);
|
||||
STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
|
||||
pTableBlockInfo->defMinRows = pc->minRows;
|
||||
pTableBlockInfo->defMaxRows = pc->maxRows;
|
||||
|
||||
// tsdbRLockFS(pFileHandle);
|
||||
// tsdbFSIterInit(&pReader->fileIter, pFileHandle, pReader->order);
|
||||
// tsdbFSIterSeek(&pReader->fileIter, fid);
|
||||
// tsdbUnLockFS(pFileHandle);
|
||||
int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
|
||||
|
||||
// STsdbCfg* pc = REPO_CFG(pReader->pTsdb);
|
||||
// pTableBlockInfo->defMinRows = pc->minRows;
|
||||
// pTableBlockInfo->defMaxRows = pc->maxRows;
|
||||
pTableBlockInfo->numOfFiles += 1;
|
||||
|
||||
// int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
|
||||
int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
|
||||
int defaultRows = 4096;
|
||||
|
||||
// pTableBlockInfo->numOfFiles += 1;
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;
|
||||
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
|
||||
|
||||
// int32_t code = TSDB_CODE_SUCCESS;
|
||||
// int32_t numOfBlocks = 0;
|
||||
// int32_t numOfTables = (int32_t)taosArrayGetSize(pReader->pTableCheckInfo);
|
||||
// int defaultRows = 4096;
|
||||
// STimeWindow win = TSWINDOW_INITIALIZER;
|
||||
pTableBlockInfo->numOfTables = numOfTables;
|
||||
|
||||
// while (true) {
|
||||
// numOfBlocks = 0;
|
||||
// tsdbRLockFS(REPO_FS(pReader->pTsdb));
|
||||
while (true) {
|
||||
bool hasNext = blockIteratorNext(&pStatus->blockIter);
|
||||
if (hasNext) {
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
|
||||
// if ((pReader->pFileGroup = tsdbFSIterNext(&pReader->fileIter)) == NULL) {
|
||||
// tsdbUnLockFS(REPO_FS(pReader->pTsdb));
|
||||
// break;
|
||||
// }
|
||||
int32_t numOfRows = pBlock->nRow;
|
||||
pTableBlockInfo->totalRows += numOfRows;
|
||||
|
||||
// tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pReader->pFileGroup->fid, &win.skey, &win.ekey);
|
||||
if (numOfRows > pTableBlockInfo->maxRows) {
|
||||
pTableBlockInfo->maxRows = numOfRows;
|
||||
}
|
||||
|
||||
// // current file are not overlapped with query time window, ignore remain files
|
||||
// if ((win.skey > pReader->window.ekey) /* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) {
|
||||
// tsdbUnLockFS(REPO_FS(pReader->pTsdb));
|
||||
// tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
|
||||
// pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
||||
// pReader->pFileGroup = NULL;
|
||||
// break;
|
||||
// }
|
||||
if (numOfRows < pTableBlockInfo->minRows) {
|
||||
pTableBlockInfo->minRows = numOfRows;
|
||||
}
|
||||
|
||||
// pTableBlockInfo->numOfFiles += 1;
|
||||
// if (tsdbSetAndOpenReadFSet(&pReader->rhelper, pReader->pFileGroup) < 0) {
|
||||
// tsdbUnLockFS(REPO_FS(pReader->pTsdb));
|
||||
// code = terrno;
|
||||
// break;
|
||||
// }
|
||||
if (numOfRows < defaultRows) {
|
||||
pTableBlockInfo->numOfSmallBlocks += 1;
|
||||
}
|
||||
|
||||
// tsdbUnLockFS(REPO_FS(pReader->pTsdb));
|
||||
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
|
||||
pTableBlockInfo->blockRowsHisto[bucketIndex]++;
|
||||
} else {
|
||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
|
||||
break;
|
||||
}
|
||||
|
||||
// if (tsdbLoadBlockIdx(&pReader->rhelper) < 0) {
|
||||
// code = terrno;
|
||||
// break;
|
||||
// }
|
||||
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
|
||||
}
|
||||
|
||||
// if ((code = getFileCompInfo(pReader, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
|
||||
// break;
|
||||
// }
|
||||
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
|
||||
// pReader->pFileGroup->fid, pReader->idStr);
|
||||
}
|
||||
|
||||
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
|
||||
// pReader->pFileGroup->fid, pReader->idStr);
|
||||
|
||||
// if (numOfBlocks == 0) {
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// pTableBlockInfo->numOfBlocks += numOfBlocks;
|
||||
|
||||
// for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
// STableBlockScanInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i);
|
||||
|
||||
// SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
|
||||
|
||||
// for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
|
||||
// pTableBlockInfo->totalSize += pBlock[j].len;
|
||||
|
||||
// int32_t numOfRows = pBlock[j].numOfRows;
|
||||
// pTableBlockInfo->totalRows += numOfRows;
|
||||
|
||||
// if (numOfRows > pTableBlockInfo->maxRows) {
|
||||
// pTableBlockInfo->maxRows = numOfRows;
|
||||
// }
|
||||
|
||||
// if (numOfRows < pTableBlockInfo->minRows) {
|
||||
// pTableBlockInfo->minRows = numOfRows;
|
||||
// }
|
||||
|
||||
// if (numOfRows < defaultRows) {
|
||||
// pTableBlockInfo->numOfSmallBlocks += 1;
|
||||
// }
|
||||
|
||||
// int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
|
||||
// pTableBlockInfo->blockRowsHisto[bucketIndex]++;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// pTableBlockInfo->numOfTables = numOfTables;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -1230,10 +1230,26 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
|
|||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
pColAgg->sum += colVal.value.i32;
|
||||
if (pColAgg->min > colVal.value.i32) {
|
||||
pColAgg->min = colVal.value.i32;
|
||||
}
|
||||
if (pColAgg->max < colVal.value.i32) {
|
||||
pColAgg->max = colVal.value.i32;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
}
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
pColAgg->sum += colVal.value.i64;
|
||||
if (pColAgg->min > colVal.value.i64) {
|
||||
pColAgg->min = colVal.value.i64;
|
||||
}
|
||||
if (pColAgg->max < colVal.value.i64) {
|
||||
pColAgg->max = colVal.value.i64;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
|
|
Loading…
Reference in New Issue