Merge pull request #5011 from taosdata/feature/TD-1979
add block dist info
This commit is contained in:
commit
424224abcd
|
@ -41,7 +41,7 @@
|
|||
#define COLUMN_INDEX_INITIAL_VAL (-3)
|
||||
#define COLUMN_INDEX_INITIALIZER \
|
||||
{ COLUMN_INDEX_INITIAL_VAL, COLUMN_INDEX_INITIAL_VAL }
|
||||
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_TBNAME_COLUMN_INDEX))
|
||||
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_BLOCK_DIST_COLUMN_INDEX))
|
||||
#define TBNAME_LIST_SEP ","
|
||||
|
||||
typedef struct SColumnList { // todo refactor
|
||||
|
@ -1731,6 +1731,9 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
|
|||
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
SSchema colSchema = tGetTableNameColumnSchema();
|
||||
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, TSDB_COL_TAG);
|
||||
} else if (index.columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) {
|
||||
SSchema colSchema = tGetBlockDistColumnSchema();
|
||||
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_PRJ, &index, &colSchema, TSDB_COL_TAG);
|
||||
} else {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
|
@ -2407,6 +2410,14 @@ static bool isTablenameToken(SStrToken* token) {
|
|||
|
||||
return (strncasecmp(TSQL_TBNAME_L, tmpToken.z, tmpToken.n) == 0 && tmpToken.n == strlen(TSQL_TBNAME_L));
|
||||
}
|
||||
static bool isTableBlockDistToken(SStrToken* token) {
|
||||
SStrToken tmpToken = *token;
|
||||
SStrToken tableToken = {0};
|
||||
|
||||
extractTableNameFromToken(&tmpToken, &tableToken);
|
||||
|
||||
return (strncasecmp(TSQL_BLOCK_DIST, tmpToken.z, tmpToken.n) == 0 && tmpToken.n == strlen(TSQL_BLOCK_DIST_L));
|
||||
}
|
||||
|
||||
static int16_t doGetColumnIndex(SQueryInfo* pQueryInfo, int32_t index, SStrToken* pToken) {
|
||||
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, index)->pTableMeta;
|
||||
|
@ -2436,6 +2447,8 @@ int32_t doGetColumnIndexByName(SSqlCmd* pCmd, SStrToken* pToken, SQueryInfo* pQu
|
|||
|
||||
if (isTablenameToken(pToken)) {
|
||||
pIndex->columnIndex = TSDB_TBNAME_COLUMN_INDEX;
|
||||
} else if (isTableBlockDistToken(pToken)) {
|
||||
pIndex->columnIndex = TSDB_BLOCK_DIST_COLUMN_INDEX;
|
||||
} else if (strncasecmp(pToken->z, DEFAULT_PRIMARY_TIMESTAMP_COL_NAME, pToken->n) == 0) {
|
||||
pIndex->columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX;
|
||||
} else {
|
||||
|
@ -2676,8 +2689,7 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
if (!validateIpAddress(pDnodeIp->z, pDnodeIp->n)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -1087,6 +1087,8 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
|
|||
// set the correct columnIndex index
|
||||
if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
|
||||
} else if (pColIndex->columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) {
|
||||
pExpr->colInfo.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
|
||||
} else if (pColIndex->columnIndex <= TSDB_UD_COLUMN_INDEX) {
|
||||
pExpr->colInfo.colId = pColIndex->columnIndex;
|
||||
} else {
|
||||
|
@ -1503,7 +1505,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t
|
|||
return false;
|
||||
}
|
||||
|
||||
if (colId == TSDB_TBNAME_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
|
||||
if (colId == TSDB_TBNAME_COLUMN_INDEX || colId == TSDB_BLOCK_DIST_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -60,6 +60,8 @@ void extractTableNameFromToken(SStrToken *pToken, SStrToken* pTable);
|
|||
|
||||
SSchema tGetTableNameColumnSchema();
|
||||
|
||||
SSchema tGetBlockDistColumnSchema();
|
||||
|
||||
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name);
|
||||
|
||||
bool tscValidateTableNameLength(size_t len);
|
||||
|
|
|
@ -39,6 +39,14 @@ SSchema tGetTableNameColumnSchema() {
|
|||
tstrncpy(s.name, TSQL_TBNAME_L, TSDB_COL_NAME_LEN);
|
||||
return s;
|
||||
}
|
||||
SSchema tGetBlockDistColumnSchema() {
|
||||
SSchema s = {0};
|
||||
s.bytes = TSDB_MAX_BINARY_LEN;;
|
||||
s.type = TSDB_DATA_TYPE_BINARY;
|
||||
s.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
|
||||
tstrncpy(s.name, TSQL_BLOCK_DIST_L, TSDB_COL_NAME_LEN);
|
||||
return s;
|
||||
}
|
||||
|
||||
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name) {
|
||||
SSchema s = {0};
|
||||
|
|
|
@ -235,7 +235,9 @@ do { \
|
|||
#define TSDB_MAX_REPLICA 5
|
||||
|
||||
#define TSDB_TBNAME_COLUMN_INDEX (-1)
|
||||
#define TSDB_BLOCK_DIST_COLUMN_INDEX (-2)
|
||||
#define TSDB_UD_COLUMN_INDEX (-100)
|
||||
|
||||
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
|
||||
|
||||
#define TSDB_MIN_CACHE_BLOCK_SIZE 1
|
||||
|
|
|
@ -234,13 +234,30 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
|
|||
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
|
||||
void *qinfo, SMemRef* pRef);
|
||||
|
||||
|
||||
/**
|
||||
* move to next block if exists
|
||||
* get num of rows in mem table
|
||||
*
|
||||
* @param pHandle
|
||||
* @return row size
|
||||
*/
|
||||
|
||||
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
|
||||
|
||||
/**
|
||||
* move to next block if exists
|
||||
*
|
||||
* @param pQueryHandle
|
||||
* @return
|
||||
*/
|
||||
bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
|
||||
/**
|
||||
* move to next block if exists but not merge data in memtable
|
||||
*
|
||||
* @param pQueryHandle
|
||||
* @return
|
||||
*/
|
||||
bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT *pQueryHandle);
|
||||
|
||||
SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SMemRef* pMemRef, int16_t type);
|
||||
|
||||
|
|
|
@ -195,6 +195,7 @@ typedef struct SQueryRuntimeEnv {
|
|||
bool hasTagResults; // if there are tag values in final result or not
|
||||
bool timeWindowInterpo;// if the time window start/end required interpolation
|
||||
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
|
||||
bool queryBlockDist; // if query data block distribution
|
||||
int32_t interBufSize; // intermediate buffer sizse
|
||||
int32_t prevGroupId; // previous executed group id
|
||||
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "queryLog.h"
|
||||
#include "tlosertree.h"
|
||||
#include "ttype.h"
|
||||
#include "tcompare.h"
|
||||
|
||||
#define MAX_ROWS_PER_RESBUF_PAGE ((1u<<12) - 1)
|
||||
|
||||
|
@ -90,6 +91,13 @@ typedef struct {
|
|||
STSCursor cur;
|
||||
} SQueryStatusInfo;
|
||||
|
||||
typedef struct {
|
||||
SArray *dataBlockInfos;
|
||||
int64_t firstSeekTimeUs;
|
||||
int64_t numOfRowsInMemTable;
|
||||
char *result;
|
||||
} STableBlockDist;
|
||||
|
||||
#if 0
|
||||
static UNUSED_FUNC void *u_malloc (size_t __size) {
|
||||
uint32_t v = rand();
|
||||
|
@ -1925,6 +1933,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
|
|||
if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { // todo refactor
|
||||
SSchema s = tGetTableNameColumnSchema();
|
||||
|
||||
pCtx->inputBytes = s.bytes;
|
||||
pCtx->inputType = s.type;
|
||||
} else if (pIndex->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
|
||||
SSchema s = tGetBlockDistColumnSchema();
|
||||
pCtx->inputBytes = s.bytes;
|
||||
pCtx->inputType = s.type;
|
||||
} else {
|
||||
|
@ -4453,7 +4465,59 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc
|
|||
qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
|
||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
|
||||
}
|
||||
static void freeTableBlockDist(STableBlockDist *pTableBlockDist) {
|
||||
if (pTableBlockDist != NULL) {
|
||||
taosArrayDestroy(pTableBlockDist->dataBlockInfos);
|
||||
free(pTableBlockDist->result);
|
||||
free(pTableBlockDist);
|
||||
}
|
||||
}
|
||||
static int32_t getPercentileFromSortedArray(const SArray* pArray, double rate) {
|
||||
int32_t len = (int32_t)taosArrayGetSize(pArray);
|
||||
if (len <= 0) {
|
||||
return 0;
|
||||
}
|
||||
assert(rate >= 0 && rate <= 1.0);
|
||||
int idx = (int32_t)((len - 1) * rate);
|
||||
return ((SDataBlockInfo *)(taosArrayGet(pArray, idx)))->rows;
|
||||
}
|
||||
static int compareBlockInfo(const void *pLeft, const void *pRight) {
|
||||
int32_t left = ((SDataBlockInfo *)pLeft)->rows;
|
||||
int32_t right = ((SDataBlockInfo *)pRight)->rows;
|
||||
if (left > right) return 1;
|
||||
if (left < right) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void generateBlockDistResult(STableBlockDist *pTableBlockDist) {
|
||||
if (pTableBlockDist == NULL) {
|
||||
return;
|
||||
}
|
||||
int64_t min = INT64_MAX, max = INT64_MIN, avg = 0;
|
||||
SArray* blockInfos= pTableBlockDist->dataBlockInfos;
|
||||
int64_t totalRows = 0, totalBlocks = taosArrayGetSize(blockInfos);
|
||||
for (size_t i = 0; i < taosArrayGetSize(blockInfos); i++) {
|
||||
SDataBlockInfo *blockInfo = taosArrayGet(blockInfos, i);
|
||||
int64_t rows = blockInfo->rows;
|
||||
min = MIN(min, rows);
|
||||
max = MAX(max, rows);
|
||||
totalRows += rows;
|
||||
}
|
||||
avg = totalBlocks > 0 ? (int64_t)(totalRows/totalBlocks) : 0;
|
||||
|
||||
taosArraySort(blockInfos, compareBlockInfo);
|
||||
|
||||
int sz = sprintf(pTableBlockDist->result,
|
||||
"summery: \n\t 5th=[%d], 25th=[%d], 50th=[%d],75th=[%d], 95th=[%d], 99th=[%d] \n\t min=[%"PRId64"], max=[%"PRId64"], avg = [%"PRId64"] \n\t totalRows=[%"PRId64"], totalBlocks=[%"PRId64"] \n\t seekHeaderTimeCost=[%"PRId64"(us)] \n\t rowsInMem=[%"PRId64"]",
|
||||
getPercentileFromSortedArray(blockInfos, 0.05), getPercentileFromSortedArray(blockInfos, 0.25), getPercentileFromSortedArray(blockInfos, 0.50),
|
||||
getPercentileFromSortedArray(blockInfos, 0.75), getPercentileFromSortedArray(blockInfos, 0.95), getPercentileFromSortedArray(blockInfos, 0.99),
|
||||
min, max, avg,
|
||||
totalRows, totalBlocks,
|
||||
pTableBlockDist->firstSeekTimeUs,
|
||||
pTableBlockDist->numOfRowsInMemTable);
|
||||
UNUSED(sz);
|
||||
return;
|
||||
}
|
||||
void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
|
@ -5906,6 +5970,58 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
|||
pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st);
|
||||
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
|
||||
}
|
||||
static void buildTableBlockDistResult(SQInfo *pQInfo) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
pQuery->pos = 0;
|
||||
|
||||
STableBlockDist *pTableBlockDist = calloc(1, sizeof(STableBlockDist));
|
||||
pTableBlockDist->dataBlockInfos = taosArrayInit(512, sizeof(SDataBlockInfo));
|
||||
pTableBlockDist->result = (char *)malloc(512);
|
||||
|
||||
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
|
||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||
SSchema blockDistSchema = tGetBlockDistColumnSchema();
|
||||
|
||||
int64_t startTime = taosGetTimestampUs();
|
||||
while (tsdbNextDataBlockWithoutMerge(pQueryHandle)) {
|
||||
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||
freeTableBlockDist(pTableBlockDist);
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
if (pTableBlockDist->firstSeekTimeUs == 0) {
|
||||
pTableBlockDist->firstSeekTimeUs = taosGetTimestampUs() - startTime;
|
||||
}
|
||||
|
||||
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
|
||||
taosArrayPush(pTableBlockDist->dataBlockInfos, &blockInfo);
|
||||
}
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
freeTableBlockDist(pTableBlockDist);
|
||||
longjmp(pRuntimeEnv->env, terrno);
|
||||
}
|
||||
|
||||
pTableBlockDist->numOfRowsInMemTable = tsdbGetNumOfRowsInMemTable(pQueryHandle);
|
||||
|
||||
generateBlockDistResult(pTableBlockDist);
|
||||
|
||||
int type = -1;
|
||||
assert(pQuery->numOfOutput == 1);
|
||||
SExprInfo* pExprInfo = pQuery->pExpr1;
|
||||
for (int32_t j = 0; j < pQuery->numOfOutput; j++) {
|
||||
if (pExprInfo[j].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
|
||||
type = blockDistSchema.type;
|
||||
}
|
||||
assert(type == TSDB_DATA_TYPE_BINARY);
|
||||
STR_WITH_SIZE_TO_VARSTR(pQuery->sdata[j]->data, pTableBlockDist->result, (VarDataLenT)strlen(pTableBlockDist->result));
|
||||
}
|
||||
|
||||
freeTableBlockDist(pTableBlockDist);
|
||||
|
||||
pQuery->rec.rows = 1;
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
return;
|
||||
}
|
||||
|
||||
static void stableQueryImpl(SQInfo *pQInfo) {
|
||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
|
@ -5933,7 +6049,10 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pE
|
|||
if (TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) {
|
||||
if (pExprMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
return TSDB_TBNAME_COLUMN_INDEX;
|
||||
} else if (pExprMsg->colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
|
||||
return TSDB_BLOCK_DIST_COLUMN_INDEX;
|
||||
}
|
||||
|
||||
|
||||
while(j < pQueryMsg->numOfTags) {
|
||||
if (pExprMsg->colInfo.colId == pTagCols[j].colId) {
|
||||
|
@ -6390,6 +6509,10 @@ static int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t num
|
|||
SSchema s = tGetTableNameColumnSchema();
|
||||
type = s.type;
|
||||
bytes = s.bytes;
|
||||
} else if (pExprs[i].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
|
||||
SSchema s = tGetBlockDistColumnSchema();
|
||||
type = s.type;
|
||||
bytes = s.bytes;
|
||||
} else if (pExprs[i].base.colInfo.colId <= TSDB_UD_COLUMN_INDEX) {
|
||||
// it is a user-defined constant value column
|
||||
assert(pExprs[i].base.functionId == TSDB_FUNC_PRJ);
|
||||
|
@ -6403,7 +6526,7 @@ static int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t num
|
|||
} else {
|
||||
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
|
||||
if (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag)) {
|
||||
if (j < TSDB_TBNAME_COLUMN_INDEX || j >= pQueryMsg->numOfTags) {
|
||||
if (j < TSDB_BLOCK_DIST_COLUMN_INDEX || j >= pQueryMsg->numOfTags) {
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
} else {
|
||||
|
@ -6573,7 +6696,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
|
|||
}
|
||||
}
|
||||
|
||||
assert(f < pQuery->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX);
|
||||
assert(f < pQuery->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX || pColIndex->colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6776,9 +6899,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
index += 1;
|
||||
}
|
||||
}
|
||||
|
||||
colIdCheck(pQuery);
|
||||
|
||||
pQInfo->runtimeEnv.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
|
||||
|
||||
qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
|
||||
return pQInfo;
|
||||
|
||||
|
@ -7289,6 +7413,8 @@ bool qTableQuery(qinfo_t qinfo) {
|
|||
buildTagQueryResult(pQInfo);
|
||||
} else if (pQInfo->runtimeEnv.stableQuery) {
|
||||
stableQueryImpl(pQInfo);
|
||||
} else if (pQInfo->runtimeEnv.queryBlockDist){
|
||||
buildTableBlockDistResult(pQInfo);
|
||||
} else {
|
||||
tableQueryImpl(pQInfo);
|
||||
}
|
||||
|
|
|
@ -213,6 +213,36 @@ static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
|
|||
pQueryHandle->pMemRef = NULL;
|
||||
}
|
||||
|
||||
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle) {
|
||||
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
|
||||
|
||||
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
assert(pQueryHandle->activeIndex < size && pQueryHandle->activeIndex >= 0 && size >= 1);
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
|
||||
|
||||
|
||||
int64_t rows = 0;
|
||||
SMemRef* pMemRef = pQueryHandle->pMemRef;
|
||||
if (pMemRef == NULL) { return rows; }
|
||||
|
||||
STableData* pMem = NULL;
|
||||
STableData* pIMem = NULL;
|
||||
|
||||
SMemTable *pMemT = (SMemTable *)(pMemRef->mem);
|
||||
SMemTable *pIMemT = (SMemTable *)(pMemRef->imem);
|
||||
|
||||
if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) {
|
||||
pMem = pMemT->tData[pCheckInfo->tableId.tid];
|
||||
rows += (pMem && pMem->uid == pCheckInfo->tableId.uid) ? pMem->numOfRows: 0;
|
||||
}
|
||||
if (pIMemT && pCheckInfo->tableId.tid < pIMemT->maxTables) {
|
||||
pIMem = pIMemT->tData[pCheckInfo->tableId.tid];
|
||||
rows += (pIMem && pIMem->uid == pCheckInfo->tableId.uid) ? pIMem->numOfRows: 0;
|
||||
}
|
||||
|
||||
return rows;
|
||||
}
|
||||
|
||||
static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) {
|
||||
size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList);
|
||||
assert(sizeOfGroup >= 1 && pMeta != NULL);
|
||||
|
@ -2109,6 +2139,85 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT* pHandle) {
|
||||
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
|
||||
|
||||
int64_t stime = taosGetTimestampUs();
|
||||
int64_t elapsedTime = stime;
|
||||
|
||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
assert(numOfTables > 0);
|
||||
|
||||
if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) {
|
||||
SMemRef* pMemRef = pQueryHandle->pMemRef;
|
||||
tsdbMayTakeMemSnapshot(pQueryHandle);
|
||||
bool ret = getNeighborRows(pQueryHandle);
|
||||
tsdbMayUnTakeMemSnapshot(pQueryHandle);
|
||||
|
||||
// restore the pMemRef
|
||||
pQueryHandle->pMemRef = pMemRef;
|
||||
return ret;
|
||||
} else if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST && pQueryHandle->cachelastrow) {
|
||||
// the last row is cached in buffer, return it directly.
|
||||
// here note that the pQueryHandle->window must be the TS_INITIALIZER
|
||||
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
|
||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||
|
||||
SDataRow pRow = NULL;
|
||||
TSKEY key = TSKEY_INITIAL_VAL;
|
||||
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
|
||||
|
||||
if (++pQueryHandle->activeIndex < numOfTables) {
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
|
||||
int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj);
|
||||
tfree(pRow);
|
||||
|
||||
// update the last key value
|
||||
pCheckInfo->lastKey = key + step;
|
||||
|
||||
cur->rows = 1; // only one row
|
||||
cur->lastKey = key + step;
|
||||
cur->mixBlock = true;
|
||||
cur->win.skey = key;
|
||||
cur->win.ekey = key;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pQueryHandle->checkFiles) {
|
||||
// check if the query range overlaps with the file data block
|
||||
bool exists = true;
|
||||
|
||||
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pQueryHandle->activeIndex = 0;
|
||||
pQueryHandle->checkFiles = false;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if (exists) {
|
||||
pQueryHandle->cost.checkForNextTime += (taosGetTimestampUs() - stime);
|
||||
return exists;
|
||||
}
|
||||
|
||||
pQueryHandle->activeIndex = 0;
|
||||
pQueryHandle->checkFiles = false;
|
||||
}
|
||||
|
||||
elapsedTime = taosGetTimestampUs() - stime;
|
||||
pQueryHandle->cost.checkForNextTime += elapsedTime;
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SMemRef* pMemRef) {
|
||||
STsdbQueryHandle* pSecQueryHandle = NULL;
|
||||
|
||||
|
|
|
@ -27,6 +27,9 @@ extern "C" {
|
|||
#define TSQL_TBNAME "TBNAME"
|
||||
#define TSQL_TBNAME_L "tbname"
|
||||
|
||||
#define TSQL_BLOCK_DIST "_BLOCK_DIST"
|
||||
#define TSQL_BLOCK_DIST_L "_block_dist"
|
||||
|
||||
// used to denote the minimum unite in sql parsing
|
||||
typedef struct SStrToken {
|
||||
uint32_t n;
|
||||
|
|
Loading…
Reference in New Issue