Merge pull request #12092 from taosdata/feature/TD-14481-3.0
feat: rollup sma data query
This commit is contained in:
commit
a5b39499f7
|
@ -186,6 +186,7 @@ struct STsdbFS {
|
|||
|
||||
#define REPO_ID(r) TD_VID((r)->pVnode)
|
||||
#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg)
|
||||
#define REPO_LEVEL(r) ((r)->level)
|
||||
#define REPO_FS(r) ((r)->fs)
|
||||
#define REPO_META(r) ((r)->pVnode->pMeta)
|
||||
#define REPO_TFS(r) ((r)->pVnode->pTfs)
|
||||
|
|
|
@ -15,6 +15,8 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
|
||||
extern const char *TSDB_LEVEL_DNAME[];
|
||||
|
||||
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
|
||||
static const char *tsdbTxnFname[] = {"current.t", "current"};
|
||||
#define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3)
|
||||
|
@ -35,12 +37,12 @@ static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
|
|||
// static int tsdbProcessExpiredFS(STsdb *pRepo);
|
||||
// static int tsdbCreateMeta(STsdb *pRepo);
|
||||
|
||||
static void tsdbGetRootDir(int repoid, char dirName[]) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb", repoid);
|
||||
static void tsdbGetRootDir(int repoid, int8_t level, char dirName[]) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s", repoid, TSDB_LEVEL_DNAME[level]);
|
||||
}
|
||||
|
||||
static void tsdbGetDataDir(int repoid, char dirName[]) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", repoid);
|
||||
static void tsdbGetDataDir(int repoid, int8_t level, char dirName[]) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data", repoid, TSDB_LEVEL_DNAME[level]);
|
||||
}
|
||||
|
||||
// For backward compatibility
|
||||
|
@ -588,8 +590,8 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
|
|||
}
|
||||
|
||||
static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) {
|
||||
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", tfsGetPrimaryPath(REPO_TFS(pRepo)), REPO_ID(pRepo),
|
||||
tsdbTxnFname[ftype]);
|
||||
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/%s/%s", tfsGetPrimaryPath(REPO_TFS(pRepo)), REPO_ID(pRepo),
|
||||
TSDB_LEVEL_DNAME[REPO_LEVEL(pRepo)], tsdbTxnFname[ftype]);
|
||||
}
|
||||
|
||||
static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
|
||||
|
@ -719,7 +721,7 @@ static int tsdbScanRootDir(STsdb *pRepo) {
|
|||
STsdbFS *pfs = REPO_FS(pRepo);
|
||||
const STfsFile *pf;
|
||||
|
||||
tsdbGetRootDir(REPO_ID(pRepo), rootDir);
|
||||
tsdbGetRootDir(REPO_ID(pRepo), REPO_LEVEL(pRepo), rootDir);
|
||||
STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), rootDir);
|
||||
if (tdir == NULL) {
|
||||
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
|
||||
|
@ -753,7 +755,7 @@ static int tsdbScanDataDir(STsdb *pRepo) {
|
|||
STsdbFS *pfs = REPO_FS(pRepo);
|
||||
const STfsFile *pf;
|
||||
|
||||
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
|
||||
tsdbGetDataDir(REPO_ID(pRepo), REPO_LEVEL(pRepo), dataDir);
|
||||
STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), dataDir);
|
||||
if (tdir == NULL) {
|
||||
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno));
|
||||
|
@ -801,7 +803,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
|
|||
regex_t regex;
|
||||
STsdbFS *pfs = REPO_FS(pRepo);
|
||||
|
||||
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
|
||||
tsdbGetDataDir(REPO_ID(pRepo), REPO_LEVEL(pRepo), dataDir);
|
||||
|
||||
// Resource allocation and init
|
||||
regcomp(®ex, pattern, REG_EXTENDED);
|
||||
|
|
|
@ -27,7 +27,7 @@ static const char *TSDB_FNAME_SUFFIX[] = {
|
|||
"rsma", // TSDB_FILE_RSMA
|
||||
};
|
||||
|
||||
static const char *TSDB_LEVEL_DNAME[] = {
|
||||
const char *TSDB_LEVEL_DNAME[] = {
|
||||
"tsdb",
|
||||
"rsma1",
|
||||
"rsma2",
|
||||
|
|
|
@ -98,46 +98,46 @@ typedef struct SIOCostSummary {
|
|||
} SIOCostSummary;
|
||||
|
||||
typedef struct SBlockLoadSuppInfo {
|
||||
SColumnDataAgg *pstatis;
|
||||
SColumnDataAgg **plist;
|
||||
SArray *defaultLoadColumn; // default load column
|
||||
int32_t *slotIds; // colId to slotId
|
||||
SColumnDataAgg* pstatis;
|
||||
SColumnDataAgg** plist;
|
||||
SArray* defaultLoadColumn; // default load column
|
||||
int32_t* slotIds; // colId to slotId
|
||||
} SBlockLoadSuppInfo;
|
||||
|
||||
typedef struct STsdbReadHandle {
|
||||
STsdb* pTsdb;
|
||||
SQueryFilePos cur; // current position
|
||||
int16_t order;
|
||||
STimeWindow window; // the primary query time window that applies to all queries
|
||||
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
|
||||
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
|
||||
int32_t numOfBlocks;
|
||||
SArray* pColumns; // column list, SColumnInfoData array list
|
||||
bool locateStart;
|
||||
int32_t outputCapacity;
|
||||
int32_t realNumOfRows;
|
||||
SArray* pTableCheckInfo; // SArray<STableCheckInfo>
|
||||
int32_t activeIndex;
|
||||
bool checkFiles; // check file stage
|
||||
int8_t cachelastrow; // check if last row cached
|
||||
bool loadExternalRow; // load time window external data rows
|
||||
bool currentLoadExternalRows; // current load external rows
|
||||
int32_t loadType; // block load type
|
||||
char* idStr; // query info handle, for debug purpose
|
||||
STsdb* pTsdb;
|
||||
SQueryFilePos cur; // current position
|
||||
int16_t order;
|
||||
STimeWindow window; // the primary query time window that applies to all queries
|
||||
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
|
||||
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
|
||||
int32_t numOfBlocks;
|
||||
SArray* pColumns; // column list, SColumnInfoData array list
|
||||
bool locateStart;
|
||||
int32_t outputCapacity;
|
||||
int32_t realNumOfRows;
|
||||
SArray* pTableCheckInfo; // SArray<STableCheckInfo>
|
||||
int32_t activeIndex;
|
||||
bool checkFiles; // check file stage
|
||||
int8_t cachelastrow; // check if last row cached
|
||||
bool loadExternalRow; // load time window external data rows
|
||||
bool currentLoadExternalRows; // current load external rows
|
||||
int32_t loadType; // block load type
|
||||
char* idStr; // query info handle, for debug purpose
|
||||
int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
|
||||
SDFileSet* pFileGroup;
|
||||
SFSIter fileIter;
|
||||
SReadH rhelper;
|
||||
STableBlockInfo* pDataBlockInfo;
|
||||
SDataCols* pDataCols; // in order to hold current file data block
|
||||
int32_t allocSize; // allocated data block size
|
||||
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
|
||||
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
|
||||
SDataCols* pDataCols; // in order to hold current file data block
|
||||
int32_t allocSize; // allocated data block size
|
||||
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
|
||||
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
|
||||
SBlockLoadSuppInfo suppInfo;
|
||||
SArray* prev; // previous row which is before than time window
|
||||
SArray* next; // next row which is after the query time window
|
||||
SIOCostSummary cost;
|
||||
STSchema* pSchema;
|
||||
SArray* prev; // previous row which is before than time window
|
||||
SArray* next; // next row which is after the query time window
|
||||
SIOCostSummary cost;
|
||||
STSchema* pSchema;
|
||||
} STsdbReadHandle;
|
||||
|
||||
typedef struct STableGroupSupporter {
|
||||
|
@ -164,6 +164,8 @@ static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
|
|||
// static void* destroyTableCheckInfo(SArray* pTableCheckInfo);
|
||||
static bool tsdbGetExternalRow(tsdbReaderT pHandle);
|
||||
|
||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions);
|
||||
|
||||
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
|
||||
pBlockLoadInfo->slot = -1;
|
||||
pBlockLoadInfo->uid = 0;
|
||||
|
@ -350,12 +352,38 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData
|
|||
pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
int nQUERY = 0;
|
||||
#endif
|
||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions) {
|
||||
if (vnodeIsRollup(pVnode)) {
|
||||
// for(int32_t i=0; i< TSDB_; ) {
|
||||
|
||||
// }
|
||||
int level = 0;
|
||||
#if 1
|
||||
int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
|
||||
for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
|
||||
SRetention* pRetention = retentions + i;
|
||||
if (pRetention->keep <= 0 || (now - pRetention->keep) >= winSKey) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
#if 0
|
||||
++nQUERY;
|
||||
if(nQUERY%3 == 0) {
|
||||
level = 2;
|
||||
} else if(nQUERY%2 == 0) {
|
||||
level = 1;
|
||||
} else {
|
||||
level = 0;
|
||||
}
|
||||
#endif
|
||||
if (level == TSDB_RETENTION_L0) {
|
||||
return VND_RSMA0(pVnode);
|
||||
} else if (level == TSDB_RETENTION_L1) {
|
||||
return VND_RSMA1(pVnode);
|
||||
} else {
|
||||
return VND_RSMA2(pVnode);
|
||||
}
|
||||
}
|
||||
return pVnode->pTsdb;
|
||||
}
|
||||
|
@ -420,8 +448,10 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
|
|||
}
|
||||
|
||||
pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
|
||||
pReadHandle->suppInfo.slotIds = taosMemoryMalloc(sizeof(int32_t) * taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn));
|
||||
pReadHandle->suppInfo.plist = taosMemoryCalloc(taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn), POINTER_BYTES);
|
||||
pReadHandle->suppInfo.slotIds =
|
||||
taosMemoryMalloc(sizeof(int32_t) * taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn));
|
||||
pReadHandle->suppInfo.plist =
|
||||
taosMemoryCalloc(taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn), POINTER_BYTES);
|
||||
}
|
||||
|
||||
pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows);
|
||||
|
@ -444,7 +474,6 @@ _end:
|
|||
|
||||
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
||||
uint64_t taskId) {
|
||||
|
||||
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
|
||||
if (pTsdbReadHandle == NULL) {
|
||||
return NULL;
|
||||
|
@ -462,16 +491,16 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG
|
|||
return NULL;
|
||||
}
|
||||
|
||||
STableCheckInfo *pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
|
||||
|
||||
pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 0);
|
||||
int32_t numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn);
|
||||
int32_t numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn);
|
||||
int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;
|
||||
|
||||
STSchema* pSchema = pTsdbReadHandle->pSchema;
|
||||
|
||||
int32_t i = 0, j = 0;
|
||||
while(i < numOfCols && j < pSchema->numOfCols) {
|
||||
while (i < numOfCols && j < pSchema->numOfCols) {
|
||||
if (ids[i] == pSchema->columns[j].colId) {
|
||||
pTsdbReadHandle->suppInfo.slotIds[i] = j;
|
||||
i++;
|
||||
|
@ -1137,7 +1166,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
|
|||
int32_t slotIndex) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pTsdbReadHandle->pSchema);
|
||||
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pTsdbReadHandle->pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
|
|
|
@ -229,10 +229,28 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
if (tsdbCommit(pVnode->pTsdb) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
|
||||
if(vnodeIsRollup(pVnode)) {
|
||||
if (tsdbCommit(VND_RSMA0(pVnode)) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
if (tsdbCommit(VND_RSMA1(pVnode)) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
if (tsdbCommit(VND_RSMA2(pVnode)) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (tsdbCommit(pVnode->pTsdb) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (tqCommit(pVnode->pTq) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
|
|
Loading…
Reference in New Issue