commit
5b3d0de310
|
@ -3978,7 +3978,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
|
||||||
if (isNull(data1, srcType) || isNull(data2, srcType)) {
|
if (isNull(data1, srcType) || isNull(data2, srcType)) {
|
||||||
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
|
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
|
||||||
} else {
|
} else {
|
||||||
taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point);
|
taosGetLinearInterpolationVal(pCtx->outputType, &point1, &point2, &point);
|
||||||
}
|
}
|
||||||
} else if (srcType == TSDB_DATA_TYPE_FLOAT) {
|
} else if (srcType == TSDB_DATA_TYPE_FLOAT) {
|
||||||
point1.val = data1;
|
point1.val = data1;
|
||||||
|
@ -3987,7 +3987,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
|
||||||
if (isNull(data1, srcType) || isNull(data2, srcType)) {
|
if (isNull(data1, srcType) || isNull(data2, srcType)) {
|
||||||
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
|
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
|
||||||
} else {
|
} else {
|
||||||
taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point);
|
taosGetLinearInterpolationVal(pCtx->outputType, &point1, &point2, &point);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -2758,6 +2758,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
|
||||||
const char* msg1 = "too many columns in group by clause";
|
const char* msg1 = "too many columns in group by clause";
|
||||||
const char* msg2 = "invalid column name in group by clause";
|
const char* msg2 = "invalid column name in group by clause";
|
||||||
const char* msg3 = "columns from one table allowed as group by columns";
|
const char* msg3 = "columns from one table allowed as group by columns";
|
||||||
|
const char* msg4 = "join query does not support group by";
|
||||||
const char* msg7 = "not support group by expression";
|
const char* msg7 = "not support group by expression";
|
||||||
const char* msg8 = "not allowed column type for group by";
|
const char* msg8 = "not allowed column type for group by";
|
||||||
const char* msg9 = "tags not allowed for table query";
|
const char* msg9 = "tags not allowed for table query";
|
||||||
|
@ -2778,6 +2779,10 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pQueryInfo->numOfTables > 1) {
|
||||||
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||||
|
}
|
||||||
|
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
SSchema* pSchema = NULL;
|
SSchema* pSchema = NULL;
|
||||||
SSchema s = tscGetTbnameColumnSchema();
|
SSchema s = tscGetTbnameColumnSchema();
|
||||||
|
|
|
@ -882,7 +882,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
if (pQueryInfo->tsBuf != NULL) {
|
if (pQueryInfo->tsBuf != NULL) {
|
||||||
// note: here used the index instead of actual vnode id.
|
// note: here used the index instead of actual vnode id.
|
||||||
int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
|
int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
|
||||||
int32_t code = dumpFileBlockByVnodeId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
|
int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -156,8 +156,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
|
||||||
win->ekey = elem1.ts;
|
win->ekey = elem1.ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
|
tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
|
||||||
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
|
tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
|
||||||
} else {
|
} else {
|
||||||
pLimit->offset -= 1;//offset apply to projection?
|
pLimit->offset -= 1;//offset apply to projection?
|
||||||
}
|
}
|
||||||
|
@ -193,8 +193,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
|
||||||
TSKEY et = taosGetTimestampUs();
|
TSKEY et = taosGetTimestampUs();
|
||||||
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
|
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
|
||||||
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
|
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
|
||||||
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfVnodes, win->skey, win->ekey,
|
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey,
|
||||||
tsBufGetNumOfVnodes(output1), et - st);
|
tsBufGetNumOfGroup(output1), et - st);
|
||||||
|
|
||||||
return output1->numOfTotal;
|
return output1->numOfTotal;
|
||||||
}
|
}
|
||||||
|
@ -282,7 +282,7 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
|
||||||
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
|
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
int32_t* list = NULL;
|
int32_t* list = NULL;
|
||||||
tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list);
|
tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);
|
||||||
|
|
||||||
// The virtual node, of which all tables are disqualified after the timestamp intersection,
|
// The virtual node, of which all tables are disqualified after the timestamp intersection,
|
||||||
// is removed to avoid next stage query.
|
// is removed to avoid next stage query.
|
||||||
|
@ -314,7 +314,7 @@ static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
|
||||||
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
|
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
int32_t* list = NULL;
|
int32_t* list = NULL;
|
||||||
tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list);
|
tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);
|
||||||
|
|
||||||
size_t numOfGroups = taosArrayGetSize(pVgroupTables);
|
size_t numOfGroups = taosArrayGetSize(pVgroupTables);
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,6 @@ typedef struct SFillInfo {
|
||||||
int32_t numOfTags; // number of tags
|
int32_t numOfTags; // number of tags
|
||||||
int32_t numOfCols; // number of columns, including the tags columns
|
int32_t numOfCols; // number of columns, including the tags columns
|
||||||
int32_t rowSize; // size of each row
|
int32_t rowSize; // size of each row
|
||||||
// char ** pTags; // tags value for current interpolation
|
|
||||||
SFillTagColInfo* pTags; // tags value for filling gap
|
SFillTagColInfo* pTags; // tags value for filling gap
|
||||||
SInterval interval;
|
SInterval interval;
|
||||||
char * prevValues; // previous row of data, to generate the interpolation results
|
char * prevValues; // previous row of data, to generate the interpolation results
|
||||||
|
@ -83,7 +82,7 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRo
|
||||||
|
|
||||||
int32_t taosNumOfRemainRows(SFillInfo *pFillInfo);
|
int32_t taosNumOfRemainRows(SFillInfo *pFillInfo);
|
||||||
|
|
||||||
int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
|
int32_t taosGetLinearInterpolationVal(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
|
||||||
|
|
||||||
int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity);
|
int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity);
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@ extern "C" {
|
||||||
|
|
||||||
#define MEM_BUF_SIZE (1 << 20)
|
#define MEM_BUF_SIZE (1 << 20)
|
||||||
#define TS_COMP_FILE_MAGIC 0x87F5EC4C
|
#define TS_COMP_FILE_MAGIC 0x87F5EC4C
|
||||||
#define TS_COMP_FILE_VNODE_MAX 512
|
#define TS_COMP_FILE_GROUP_MAX 512
|
||||||
|
|
||||||
typedef struct STSList {
|
typedef struct STSList {
|
||||||
char* rawBuf;
|
char* rawBuf;
|
||||||
|
@ -38,7 +38,7 @@ typedef struct STSList {
|
||||||
typedef struct STSElem {
|
typedef struct STSElem {
|
||||||
TSKEY ts;
|
TSKEY ts;
|
||||||
tVariant* tag;
|
tVariant* tag;
|
||||||
int32_t vnode;
|
int32_t id;
|
||||||
} STSElem;
|
} STSElem;
|
||||||
|
|
||||||
typedef struct STSCursor {
|
typedef struct STSCursor {
|
||||||
|
@ -60,17 +60,17 @@ typedef struct STSBlock {
|
||||||
* The size of buffer file should not be greater than 2G,
|
* The size of buffer file should not be greater than 2G,
|
||||||
* and the offset of int32_t type is enough
|
* and the offset of int32_t type is enough
|
||||||
*/
|
*/
|
||||||
typedef struct STSVnodeBlockInfo {
|
typedef struct STSGroupBlockInfo {
|
||||||
int32_t vnode; // vnode id
|
int32_t id; // group id
|
||||||
int32_t offset; // offset set value in file
|
int32_t offset; // offset set value in file
|
||||||
int32_t numOfBlocks; // number of total blocks
|
int32_t numOfBlocks; // number of total blocks
|
||||||
int32_t compLen; // compressed size
|
int32_t compLen; // compressed size
|
||||||
} STSVnodeBlockInfo;
|
} STSGroupBlockInfo;
|
||||||
|
|
||||||
typedef struct STSVnodeBlockInfoEx {
|
typedef struct STSGroupBlockInfoEx {
|
||||||
STSVnodeBlockInfo info;
|
STSGroupBlockInfo info;
|
||||||
int32_t len; // length before compress
|
int32_t len; // length before compress
|
||||||
} STSVnodeBlockInfoEx;
|
} STSGroupBlockInfoEx;
|
||||||
|
|
||||||
typedef struct STSBuf {
|
typedef struct STSBuf {
|
||||||
FILE* f;
|
FILE* f;
|
||||||
|
@ -78,9 +78,9 @@ typedef struct STSBuf {
|
||||||
uint32_t fileSize;
|
uint32_t fileSize;
|
||||||
|
|
||||||
// todo use array
|
// todo use array
|
||||||
STSVnodeBlockInfoEx* pData;
|
STSGroupBlockInfoEx* pData;
|
||||||
uint32_t numOfAlloc;
|
uint32_t numOfAlloc;
|
||||||
uint32_t numOfVnodes;
|
uint32_t numOfGroups;
|
||||||
|
|
||||||
char* assistBuf;
|
char* assistBuf;
|
||||||
int32_t bufSize;
|
int32_t bufSize;
|
||||||
|
@ -94,22 +94,22 @@ typedef struct STSBuf {
|
||||||
|
|
||||||
typedef struct STSBufFileHeader {
|
typedef struct STSBufFileHeader {
|
||||||
uint32_t magic; // file magic number
|
uint32_t magic; // file magic number
|
||||||
uint32_t numOfVnode; // number of vnode stored in current file
|
uint32_t numOfGroup; // number of group stored in current file
|
||||||
int32_t tsOrder; // timestamp order in current file
|
int32_t tsOrder; // timestamp order in current file
|
||||||
} STSBufFileHeader;
|
} STSBufFileHeader;
|
||||||
|
|
||||||
STSBuf* tsBufCreate(bool autoDelete, int32_t order);
|
STSBuf* tsBufCreate(bool autoDelete, int32_t order);
|
||||||
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
|
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
|
||||||
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder, int32_t vnodeId);
|
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder, int32_t id);
|
||||||
|
|
||||||
void* tsBufDestroy(STSBuf* pTSBuf);
|
void* tsBufDestroy(STSBuf* pTSBuf);
|
||||||
|
|
||||||
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len);
|
void tsBufAppend(STSBuf* pTSBuf, int32_t id, tVariant* tag, const char* pData, int32_t len);
|
||||||
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf);
|
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf);
|
||||||
|
|
||||||
STSBuf* tsBufClone(STSBuf* pTSBuf);
|
STSBuf* tsBufClone(STSBuf* pTSBuf);
|
||||||
|
|
||||||
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId);
|
STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id);
|
||||||
|
|
||||||
void tsBufFlush(STSBuf* pTSBuf);
|
void tsBufFlush(STSBuf* pTSBuf);
|
||||||
|
|
||||||
|
@ -118,7 +118,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf);
|
||||||
|
|
||||||
bool tsBufNextPos(STSBuf* pTSBuf);
|
bool tsBufNextPos(STSBuf* pTSBuf);
|
||||||
|
|
||||||
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag);
|
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, tVariant* tag);
|
||||||
|
|
||||||
STSCursor tsBufGetCursor(STSBuf* pTSBuf);
|
STSCursor tsBufGetCursor(STSBuf* pTSBuf);
|
||||||
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
|
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
|
||||||
|
@ -131,11 +131,11 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur);
|
||||||
*/
|
*/
|
||||||
void tsBufDisplay(STSBuf* pTSBuf);
|
void tsBufDisplay(STSBuf* pTSBuf);
|
||||||
|
|
||||||
int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf);
|
int32_t tsBufGetNumOfGroup(STSBuf* pTSBuf);
|
||||||
|
|
||||||
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId);
|
void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id);
|
||||||
|
|
||||||
int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeId, void* buf, int32_t* len, int32_t* numOfBlocks);
|
int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t id, void* buf, int32_t* len, int32_t* numOfBlocks);
|
||||||
|
|
||||||
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag);
|
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag);
|
||||||
|
|
||||||
|
|
|
@ -609,7 +609,7 @@ int32_t intersect(SArray *pLeft, SArray *pRight, SArray *pFinalRes) {
|
||||||
/*
|
/*
|
||||||
* traverse the result and apply the function to each item to check if the item is qualified or not
|
* traverse the result and apply the function to each item to check if the item is qualified or not
|
||||||
*/
|
*/
|
||||||
static void tArrayTraverse(tExprNode *pExpr, __result_filter_fn_t fp, SArray *pResult) {
|
static UNUSED_FUNC void tArrayTraverse(tExprNode *pExpr, __result_filter_fn_t fp, SArray *pResult) {
|
||||||
assert(pExpr->_node.pLeft->nodeType == TSQL_NODE_COL && pExpr->_node.pRight->nodeType == TSQL_NODE_VALUE && fp != NULL);
|
assert(pExpr->_node.pLeft->nodeType == TSQL_NODE_COL && pExpr->_node.pRight->nodeType == TSQL_NODE_VALUE && fp != NULL);
|
||||||
|
|
||||||
// scan the result array list and check for each item in the list
|
// scan the result array list and check for each item in the list
|
||||||
|
@ -660,7 +660,7 @@ static bool filterItem(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *p
|
||||||
* @param pSchema tag schemas
|
* @param pSchema tag schemas
|
||||||
* @param fp filter callback function
|
* @param fp filter callback function
|
||||||
*/
|
*/
|
||||||
static void exprTreeTraverseImpl(tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
|
static UNUSED_FUNC void exprTreeTraverseImpl(tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
|
||||||
size_t size = taosArrayGetSize(pResult);
|
size_t size = taosArrayGetSize(pResult);
|
||||||
|
|
||||||
SArray* array = taosArrayInit(size, POINTER_BYTES);
|
SArray* array = taosArrayInit(size, POINTER_BYTES);
|
||||||
|
@ -733,10 +733,6 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
|
||||||
assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
|
assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
|
||||||
|
|
||||||
param->setupInfoFn(pExpr, param->pExtInfo);
|
param->setupInfoFn(pExpr, param->pExtInfo);
|
||||||
if (pSkipList == NULL) {
|
|
||||||
tArrayTraverse(pExpr, param->nodeFilterFn, result);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
tQueryInfo *pQueryInfo = pExpr->_node.info;
|
tQueryInfo *pQueryInfo = pExpr->_node.info;
|
||||||
if (pQueryInfo->indexed && pQueryInfo->optr != TSDB_RELATION_LIKE) {
|
if (pQueryInfo->indexed && pQueryInfo->optr != TSDB_RELATION_LIKE) {
|
||||||
|
@ -748,49 +744,14 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// recursive traverse left child branch
|
// The value of hasPK is always 0.
|
||||||
uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
|
uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
|
||||||
|
assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);
|
||||||
|
|
||||||
if (weight == 0 ) {
|
//apply the hierarchical expression to every node in skiplist for find the qualified nodes
|
||||||
if (taosArrayGetSize(result) > 0 && pSkipList == NULL) {
|
tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
|
||||||
/**
|
|
||||||
* Perform the filter operation based on the initial filter result, which is obtained from filtering from index.
|
|
||||||
* Since no index presented, the filter operation is done by scan all elements in the result set.
|
|
||||||
*
|
|
||||||
* if the query is a high selectivity filter, only small portion of meters are retrieved.
|
|
||||||
*/
|
|
||||||
exprTreeTraverseImpl(pExpr, result, param);
|
|
||||||
} else {
|
|
||||||
/**
|
|
||||||
* apply the hierarchical expression to every node in skiplist for find the qualified nodes
|
|
||||||
*/
|
|
||||||
assert(taosArrayGetSize(result) == 0);
|
|
||||||
tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (weight == 2 || (weight == 1 && pExpr->_node.optr == TSDB_RELATION_OR)) {
|
|
||||||
SArray* rLeft = taosArrayInit(10, POINTER_BYTES);
|
|
||||||
SArray* rRight = taosArrayInit(10, POINTER_BYTES);
|
|
||||||
|
|
||||||
tExprTreeTraverse(pLeft, pSkipList, rLeft, param);
|
|
||||||
tExprTreeTraverse(pRight, pSkipList, rRight, param);
|
|
||||||
|
|
||||||
if (pExpr->_node.optr == TSDB_RELATION_AND) { // CROSS
|
|
||||||
intersect(rLeft, rRight, result);
|
|
||||||
} else if (pExpr->_node.optr == TSDB_RELATION_OR) { // or
|
|
||||||
merge(rLeft, rRight, result);
|
|
||||||
} else {
|
|
||||||
assert(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(rLeft);
|
|
||||||
taosArrayDestroy(rRight);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
#if 0
|
||||||
/*
|
/*
|
||||||
* (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here
|
* (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here
|
||||||
*
|
*
|
||||||
|
@ -819,6 +780,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
|
||||||
* So, we do not set the skip list index as a parameter
|
* So, we do not set the skip list index as a parameter
|
||||||
*/
|
*/
|
||||||
tExprTreeTraverse(pSecond, NULL, result, param);
|
tExprTreeTraverse(pSecond, NULL, result, param);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
|
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
|
||||||
|
|
|
@ -3843,7 +3843,7 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, void* pTable, STableQueryInfo *pTableQ
|
||||||
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pTableQueryInfo->tag);
|
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pTableQueryInfo->tag);
|
||||||
|
|
||||||
// failed to find data with the specified tag value and vnodeId
|
// failed to find data with the specified tag value and vnodeId
|
||||||
if (elem.vnode < 0) {
|
if (!tsBufIsValidElem(&elem)) {
|
||||||
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
|
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
|
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
|
||||||
} else {
|
} else {
|
||||||
|
@ -4777,7 +4777,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
||||||
if (pRuntimeEnv->cur.vgroupIndex == -1) {
|
if (pRuntimeEnv->cur.vgroupIndex == -1) {
|
||||||
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
|
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
|
||||||
// failed to find data with the specified tag value and vnodeId
|
// failed to find data with the specified tag value and vnodeId
|
||||||
if (elem.vnode < 0) {
|
if (!tsBufIsValidElem(&elem)) {
|
||||||
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
|
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
|
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
|
||||||
} else {
|
} else {
|
||||||
|
@ -4802,7 +4802,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
||||||
|
|
||||||
STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
|
STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
|
||||||
// failed to find data with the specified tag value and vnodeId
|
// failed to find data with the specified tag value and vnodeId
|
||||||
if (elem1.vnode < 0) {
|
if (!tsBufIsValidElem(&elem1)) {
|
||||||
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
|
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
|
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -215,7 +215,7 @@ static double linearInterpolationImpl(double v1, double v2, double k1, double k2
|
||||||
return v1 + (v2 - v1) * (k - k1) / (k2 - k1);
|
return v1 + (v2 - v1) * (k - k1) / (k2 - k1);
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
|
int32_t taosGetLinearInterpolationVal(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case TSDB_DATA_TYPE_INT: {
|
case TSDB_DATA_TYPE_INT: {
|
||||||
*(int32_t*)point->val = (int32_t)linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, (double)point1->key,
|
*(int32_t*)point->val = (int32_t)linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, (double)point1->key,
|
||||||
|
@ -343,7 +343,7 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu
|
||||||
point1 = (SPoint){.key = *(TSKEY*)(prevValues), .val = prevValues + pCol->col.offset};
|
point1 = (SPoint){.key = *(TSKEY*)(prevValues), .val = prevValues + pCol->col.offset};
|
||||||
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->rowIdx * bytes};
|
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->rowIdx * bytes};
|
||||||
point = (SPoint){.key = pFillInfo->start, .val = val1};
|
point = (SPoint){.key = pFillInfo->start, .val = val1};
|
||||||
taosDoLinearInterpolation(type, &point1, &point2, &point);
|
taosGetLinearInterpolationVal(type, &point1, &point2, &point);
|
||||||
}
|
}
|
||||||
|
|
||||||
setTagsValue(pFillInfo, data, *num);
|
setTagsValue(pFillInfo, data, *num);
|
||||||
|
|
|
@ -4,7 +4,7 @@
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
static int32_t getDataStartOffset();
|
static int32_t getDataStartOffset();
|
||||||
static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo);
|
static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo);
|
||||||
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf);
|
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf);
|
||||||
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);
|
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);
|
||||||
|
|
||||||
|
@ -32,7 +32,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the header info
|
// update the header info
|
||||||
STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = TSDB_ORDER_ASC};
|
STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = TSDB_ORDER_ASC};
|
||||||
STSBufUpdateHeader(pTSBuf, &header);
|
STSBufUpdateHeader(pTSBuf, &header);
|
||||||
|
|
||||||
tsBufResetPos(pTSBuf);
|
tsBufResetPos(pTSBuf);
|
||||||
|
@ -75,9 +75,9 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (header.numOfVnode > pTSBuf->numOfAlloc) {
|
if (header.numOfGroup > pTSBuf->numOfAlloc) {
|
||||||
pTSBuf->numOfAlloc = header.numOfVnode;
|
pTSBuf->numOfAlloc = header.numOfGroup;
|
||||||
STSVnodeBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * pTSBuf->numOfAlloc);
|
STSGroupBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * pTSBuf->numOfAlloc);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
tsBufDestroy(pTSBuf);
|
tsBufDestroy(pTSBuf);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -86,7 +86,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
|
||||||
pTSBuf->pData = tmp;
|
pTSBuf->pData = tmp;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTSBuf->numOfVnodes = header.numOfVnode;
|
pTSBuf->numOfGroups = header.numOfGroup;
|
||||||
|
|
||||||
// check the ts order
|
// check the ts order
|
||||||
pTSBuf->tsOrder = header.tsOrder;
|
pTSBuf->tsOrder = header.tsOrder;
|
||||||
|
@ -96,9 +96,9 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes;
|
size_t infoSize = sizeof(STSGroupBlockInfo) * pTSBuf->numOfGroups;
|
||||||
|
|
||||||
STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize);
|
STSGroupBlockInfo* buf = (STSGroupBlockInfo*)calloc(1, infoSize);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
tsBufDestroy(pTSBuf);
|
tsBufDestroy(pTSBuf);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -109,9 +109,9 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
|
||||||
UNUSED(sz);
|
UNUSED(sz);
|
||||||
|
|
||||||
// the length value for each vnode is not kept in file, so does not set the length value
|
// the length value for each vnode is not kept in file, so does not set the length value
|
||||||
for (int32_t i = 0; i < pTSBuf->numOfVnodes; ++i) {
|
for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) {
|
||||||
STSVnodeBlockInfoEx* pBlockList = &pTSBuf->pData[i];
|
STSGroupBlockInfoEx* pBlockList = &pTSBuf->pData[i];
|
||||||
memcpy(&pBlockList->info, &buf[i], sizeof(STSVnodeBlockInfo));
|
memcpy(&pBlockList->info, &buf[i], sizeof(STSGroupBlockInfo));
|
||||||
}
|
}
|
||||||
free(buf);
|
free(buf);
|
||||||
|
|
||||||
|
@ -131,8 +131,8 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
|
||||||
pTSBuf->cur.order = TSDB_ORDER_ASC;
|
pTSBuf->cur.order = TSDB_ORDER_ASC;
|
||||||
pTSBuf->autoDelete = autoDelete;
|
pTSBuf->autoDelete = autoDelete;
|
||||||
|
|
||||||
// tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f),
|
// tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f),
|
||||||
// pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete);
|
// pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete);
|
||||||
|
|
||||||
return pTSBuf;
|
return pTSBuf;
|
||||||
}
|
}
|
||||||
|
@ -162,53 +162,53 @@ void* tsBufDestroy(STSBuf* pTSBuf) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) {
|
static STSGroupBlockInfoEx* tsBufGetLastGroupInfo(STSBuf* pTSBuf) {
|
||||||
int32_t last = pTSBuf->numOfVnodes - 1;
|
int32_t last = pTSBuf->numOfGroups - 1;
|
||||||
|
|
||||||
assert(last >= 0);
|
assert(last >= 0);
|
||||||
return &pTSBuf->pData[last];
|
return &pTSBuf->pData[last];
|
||||||
}
|
}
|
||||||
|
|
||||||
static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) {
|
static STSGroupBlockInfoEx* addOneGroupInfo(STSBuf* pTSBuf, int32_t id) {
|
||||||
if (pTSBuf->numOfAlloc <= pTSBuf->numOfVnodes) {
|
if (pTSBuf->numOfAlloc <= pTSBuf->numOfGroups) {
|
||||||
uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5);
|
uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5);
|
||||||
assert((int32_t)newSize > pTSBuf->numOfAlloc);
|
assert((int32_t)newSize > pTSBuf->numOfAlloc);
|
||||||
|
|
||||||
STSVnodeBlockInfoEx* tmp = (STSVnodeBlockInfoEx*)realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize);
|
STSGroupBlockInfoEx* tmp = (STSGroupBlockInfoEx*)realloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTSBuf->pData = tmp;
|
pTSBuf->pData = tmp;
|
||||||
pTSBuf->numOfAlloc = newSize;
|
pTSBuf->numOfAlloc = newSize;
|
||||||
memset(&pTSBuf->pData[pTSBuf->numOfVnodes], 0, sizeof(STSVnodeBlockInfoEx) * (newSize - pTSBuf->numOfVnodes));
|
memset(&pTSBuf->pData[pTSBuf->numOfGroups], 0, sizeof(STSGroupBlockInfoEx) * (newSize - pTSBuf->numOfGroups));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTSBuf->numOfVnodes > 0) {
|
if (pTSBuf->numOfGroups > 0) {
|
||||||
STSVnodeBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);
|
STSGroupBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);
|
||||||
|
|
||||||
// update prev vnode length info in file
|
// update prev vnode length info in file
|
||||||
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pPrevBlockInfoEx->info);
|
TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pPrevBlockInfoEx->info);
|
||||||
}
|
}
|
||||||
|
|
||||||
// set initial value for vnode block
|
// set initial value for vnode block
|
||||||
STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes].info;
|
STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfGroups].info;
|
||||||
pBlockInfo->vnode = vnodeId;
|
pBlockInfo->id = id;
|
||||||
pBlockInfo->offset = pTSBuf->fileSize;
|
pBlockInfo->offset = pTSBuf->fileSize;
|
||||||
assert(pBlockInfo->offset >= getDataStartOffset());
|
assert(pBlockInfo->offset >= getDataStartOffset());
|
||||||
|
|
||||||
// update vnode info in file
|
// update vnode info in file
|
||||||
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes, pBlockInfo);
|
TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups, pBlockInfo);
|
||||||
|
|
||||||
// add one vnode info
|
// add one vnode info
|
||||||
pTSBuf->numOfVnodes += 1;
|
pTSBuf->numOfGroups += 1;
|
||||||
|
|
||||||
// update the header info
|
// update the header info
|
||||||
STSBufFileHeader header = {
|
STSBufFileHeader header = {
|
||||||
.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
|
.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
|
||||||
|
|
||||||
STSBufUpdateHeader(pTSBuf, &header);
|
STSBufUpdateHeader(pTSBuf, &header);
|
||||||
return tsBufGetLastVnodeInfo(pTSBuf);
|
return tsBufGetLastGroupInfo(pTSBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void shrinkBuffer(STSList* ptsData) {
|
static void shrinkBuffer(STSList* ptsData) {
|
||||||
|
@ -279,10 +279,10 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
|
||||||
|
|
||||||
pTSBuf->tsData.len = 0;
|
pTSBuf->tsData.len = 0;
|
||||||
|
|
||||||
STSVnodeBlockInfoEx* pVnodeBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);
|
STSGroupBlockInfoEx* pGroupBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);
|
||||||
|
|
||||||
pVnodeBlockInfoEx->info.compLen += blockSize;
|
pGroupBlockInfoEx->info.compLen += blockSize;
|
||||||
pVnodeBlockInfoEx->info.numOfBlocks += 1;
|
pGroupBlockInfoEx->info.numOfBlocks += 1;
|
||||||
|
|
||||||
shrinkBuffer(&pTSBuf->tsData);
|
shrinkBuffer(&pTSBuf->tsData);
|
||||||
}
|
}
|
||||||
|
@ -413,20 +413,20 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len) {
|
void tsBufAppend(STSBuf* pTSBuf, int32_t id, tVariant* tag, const char* pData, int32_t len) {
|
||||||
STSVnodeBlockInfoEx* pBlockInfo = NULL;
|
STSGroupBlockInfoEx* pBlockInfo = NULL;
|
||||||
STSList* ptsData = &pTSBuf->tsData;
|
STSList* ptsData = &pTSBuf->tsData;
|
||||||
|
|
||||||
if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) {
|
if (pTSBuf->numOfGroups == 0 || tsBufGetLastGroupInfo(pTSBuf)->info.id != id) {
|
||||||
writeDataToDisk(pTSBuf);
|
writeDataToDisk(pTSBuf);
|
||||||
shrinkBuffer(ptsData);
|
shrinkBuffer(ptsData);
|
||||||
|
|
||||||
pBlockInfo = addOneVnodeInfo(pTSBuf, vnodeId);
|
pBlockInfo = addOneGroupInfo(pTSBuf, id);
|
||||||
} else {
|
} else {
|
||||||
pBlockInfo = tsBufGetLastVnodeInfo(pTSBuf);
|
pBlockInfo = tsBufGetLastGroupInfo(pTSBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pBlockInfo->info.vnode == vnodeId);
|
assert(pBlockInfo->info.id == id);
|
||||||
|
|
||||||
if ((tVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) {
|
if ((tVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) {
|
||||||
// new arrived data with different tags value, save current value into disk first
|
// new arrived data with different tags value, save current value into disk first
|
||||||
|
@ -464,23 +464,23 @@ void tsBufFlush(STSBuf* pTSBuf) {
|
||||||
writeDataToDisk(pTSBuf);
|
writeDataToDisk(pTSBuf);
|
||||||
shrinkBuffer(&pTSBuf->tsData);
|
shrinkBuffer(&pTSBuf->tsData);
|
||||||
|
|
||||||
STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);
|
STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);
|
||||||
|
|
||||||
// update prev vnode length info in file
|
// update prev vnode length info in file
|
||||||
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pBlockInfoEx->info);
|
TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pBlockInfoEx->info);
|
||||||
|
|
||||||
// save the ts order into header
|
// save the ts order into header
|
||||||
STSBufFileHeader header = {
|
STSBufFileHeader header = {
|
||||||
.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
|
.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
|
||||||
STSBufUpdateHeader(pTSBuf, &header);
|
STSBufUpdateHeader(pTSBuf, &header);
|
||||||
|
|
||||||
fsync(fileno(pTSBuf->f));
|
fsync(fileno(pTSBuf->f));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsBufFindVnodeById(STSVnodeBlockInfoEx* pVnodeInfoEx, int32_t numOfVnodes, int32_t vnodeId) {
|
static int32_t tsBufFindGroupById(STSGroupBlockInfoEx* pGroupInfoEx, int32_t numOfGroups, int32_t id) {
|
||||||
int32_t j = -1;
|
int32_t j = -1;
|
||||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
for (int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
if (pVnodeInfoEx[i].info.vnode == vnodeId) {
|
if (pGroupInfoEx[i].info.id == id) {
|
||||||
j = i;
|
j = i;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -490,7 +490,7 @@ static int32_t tsBufFindVnodeById(STSVnodeBlockInfoEx* pVnodeInfoEx, int32_t num
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo opt performance by cache blocks info
|
// todo opt performance by cache blocks info
|
||||||
static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int32_t blockIndex) {
|
static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int32_t blockIndex) {
|
||||||
if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
|
if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -517,7 +517,7 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, tVariant* tag) {
|
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, tVariant* tag) {
|
||||||
bool decomp = false;
|
bool decomp = false;
|
||||||
|
|
||||||
int64_t offset = 0;
|
int64_t offset = 0;
|
||||||
|
@ -544,14 +544,14 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex) {
|
static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex) {
|
||||||
STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[vnodeIndex].info;
|
STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info;
|
||||||
if (pBlockInfo->numOfBlocks <= blockIndex) {
|
if (pBlockInfo->numOfBlocks <= blockIndex) {
|
||||||
assert(false);
|
assert(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
STSCursor* pCur = &pTSBuf->cur;
|
STSCursor* pCur = &pTSBuf->cur;
|
||||||
if (pCur->vgroupIndex == vnodeIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) ||
|
if (pCur->vgroupIndex == groupIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) ||
|
||||||
(pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC))) {
|
(pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC))) {
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
bool decomp = false;
|
bool decomp = false;
|
||||||
|
@ -586,13 +586,13 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex
|
||||||
|
|
||||||
assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));
|
assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));
|
||||||
|
|
||||||
pCur->vgroupIndex = vnodeIndex;
|
pCur->vgroupIndex = groupIndex;
|
||||||
pCur->blockIndex = blockIndex;
|
pCur->blockIndex = blockIndex;
|
||||||
|
|
||||||
pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1;
|
pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) {
|
static int32_t doUpdateGroupInfo(STSBuf* pTSBuf, int64_t offset, STSGroupBlockInfo* pVInfo) {
|
||||||
if (offset < 0 || offset >= getDataStartOffset()) {
|
if (offset < 0 || offset >= getDataStartOffset()) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -601,12 +601,12 @@ static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockIn
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f);
|
fwrite(pVInfo, sizeof(STSGroupBlockInfo), 1, pTSBuf->f);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) {
|
STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) {
|
||||||
int32_t j = tsBufFindVnodeById(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId);
|
int32_t j = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);
|
||||||
if (j == -1) {
|
if (j == -1) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -615,7 +615,7 @@ STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
|
int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
|
||||||
if ((pTSBuf->f == NULL) || pHeader == NULL || pHeader->numOfVnode == 0 || pHeader->magic != TS_COMP_FILE_MAGIC) {
|
if ((pTSBuf->f == NULL) || pHeader == NULL || pHeader->numOfGroup == 0 || pHeader->magic != TS_COMP_FILE_MAGIC) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -631,7 +631,7 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tsBufNextPos(STSBuf* pTSBuf) {
|
bool tsBufNextPos(STSBuf* pTSBuf) {
|
||||||
if (pTSBuf == NULL || pTSBuf->numOfVnodes == 0) {
|
if (pTSBuf == NULL || pTSBuf->numOfGroups == 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -650,16 +650,16 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
} else { // get the last timestamp record in the last block of the last vnode
|
} else { // get the last timestamp record in the last block of the last vnode
|
||||||
assert(pTSBuf->numOfVnodes > 0);
|
assert(pTSBuf->numOfGroups > 0);
|
||||||
|
|
||||||
int32_t vnodeIndex = pTSBuf->numOfVnodes - 1;
|
int32_t groupIndex = pTSBuf->numOfGroups - 1;
|
||||||
pCur->vgroupIndex = vnodeIndex;
|
pCur->vgroupIndex = groupIndex;
|
||||||
|
|
||||||
int32_t vnodeId = pTSBuf->pData[pCur->vgroupIndex].info.vnode;
|
int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id;
|
||||||
STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId);
|
STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);
|
||||||
int32_t blockIndex = pBlockInfo->numOfBlocks - 1;
|
int32_t blockIndex = pBlockInfo->numOfBlocks - 1;
|
||||||
|
|
||||||
tsBufGetBlock(pTSBuf, vnodeIndex, blockIndex);
|
tsBufGetBlock(pTSBuf, groupIndex, blockIndex);
|
||||||
|
|
||||||
pCur->tsIndex = pTSBuf->block.numOfElem - 1;
|
pCur->tsIndex = pTSBuf->block.numOfElem - 1;
|
||||||
if (pTSBuf->block.numOfElem == 0) {
|
if (pTSBuf->block.numOfElem == 0) {
|
||||||
|
@ -678,12 +678,12 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
|
||||||
|
|
||||||
if ((pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) ||
|
if ((pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) ||
|
||||||
(pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0)) {
|
(pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0)) {
|
||||||
int32_t vnodeId = pTSBuf->pData[pCur->vgroupIndex].info.vnode;
|
int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id;
|
||||||
|
|
||||||
STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId);
|
STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);
|
||||||
if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) ||
|
if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) ||
|
||||||
(pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
|
(pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
|
||||||
if ((pCur->vgroupIndex >= pTSBuf->numOfVnodes - 1 && pCur->order == TSDB_ORDER_ASC) ||
|
if ((pCur->vgroupIndex >= pTSBuf->numOfGroups - 1 && pCur->order == TSDB_ORDER_ASC) ||
|
||||||
(pCur->vgroupIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
|
(pCur->vgroupIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
|
||||||
pCur->vgroupIndex = -1;
|
pCur->vgroupIndex = -1;
|
||||||
return false;
|
return false;
|
||||||
|
@ -719,7 +719,7 @@ void tsBufResetPos(STSBuf* pTSBuf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STSElem tsBufGetElem(STSBuf* pTSBuf) {
|
STSElem tsBufGetElem(STSBuf* pTSBuf) {
|
||||||
STSElem elem1 = {.vnode = -1};
|
STSElem elem1 = {.id = -1};
|
||||||
if (pTSBuf == NULL) {
|
if (pTSBuf == NULL) {
|
||||||
return elem1;
|
return elem1;
|
||||||
}
|
}
|
||||||
|
@ -731,7 +731,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
|
||||||
|
|
||||||
STSBlock* pBlock = &pTSBuf->block;
|
STSBlock* pBlock = &pTSBuf->block;
|
||||||
|
|
||||||
elem1.vnode = pTSBuf->pData[pCur->vgroupIndex].info.vnode;
|
elem1.id = pTSBuf->pData[pCur->vgroupIndex].info.id;
|
||||||
elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
|
elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
|
||||||
elem1.tag = &pBlock->tag;
|
elem1.tag = &pBlock->tag;
|
||||||
|
|
||||||
|
@ -742,34 +742,34 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
|
||||||
* current only support ts comp data from two vnode merge
|
* current only support ts comp data from two vnode merge
|
||||||
* @param pDestBuf
|
* @param pDestBuf
|
||||||
* @param pSrcBuf
|
* @param pSrcBuf
|
||||||
* @param vnodeId
|
* @param id
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
|
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
|
||||||
if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfVnodes <= 0) {
|
if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfGroups <= 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDestBuf->numOfVnodes + pSrcBuf->numOfVnodes > TS_COMP_FILE_VNODE_MAX) {
|
if (pDestBuf->numOfGroups + pSrcBuf->numOfGroups > TS_COMP_FILE_GROUP_MAX) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// src can only have one vnode index
|
// src can only have one vnode index
|
||||||
assert(pSrcBuf->numOfVnodes == 1);
|
assert(pSrcBuf->numOfGroups == 1);
|
||||||
|
|
||||||
// there are data in buffer, flush to disk first
|
// there are data in buffer, flush to disk first
|
||||||
tsBufFlush(pDestBuf);
|
tsBufFlush(pDestBuf);
|
||||||
|
|
||||||
// compared with the last vnode id
|
// compared with the last vnode id
|
||||||
int32_t vnodeId = tsBufGetLastVnodeInfo((STSBuf*) pSrcBuf)->info.vnode;
|
int32_t id = tsBufGetLastGroupInfo((STSBuf*) pSrcBuf)->info.id;
|
||||||
if (vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) {
|
if (id != tsBufGetLastGroupInfo(pDestBuf)->info.id) {
|
||||||
int32_t oldSize = pDestBuf->numOfVnodes;
|
int32_t oldSize = pDestBuf->numOfGroups;
|
||||||
int32_t newSize = oldSize + pSrcBuf->numOfVnodes;
|
int32_t newSize = oldSize + pSrcBuf->numOfGroups;
|
||||||
|
|
||||||
if (pDestBuf->numOfAlloc < newSize) {
|
if (pDestBuf->numOfAlloc < newSize) {
|
||||||
pDestBuf->numOfAlloc = newSize;
|
pDestBuf->numOfAlloc = newSize;
|
||||||
|
|
||||||
STSVnodeBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize);
|
STSGroupBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -778,23 +778,23 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// directly copy the vnode index information
|
// directly copy the vnode index information
|
||||||
memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfVnodes * sizeof(STSVnodeBlockInfoEx));
|
memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfGroups * sizeof(STSGroupBlockInfoEx));
|
||||||
|
|
||||||
// set the new offset value
|
// set the new offset value
|
||||||
for (int32_t i = 0; i < pSrcBuf->numOfVnodes; ++i) {
|
for (int32_t i = 0; i < pSrcBuf->numOfGroups; ++i) {
|
||||||
STSVnodeBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize];
|
STSGroupBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize];
|
||||||
pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize;
|
pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize;
|
||||||
pBlockInfoEx->info.vnode = vnodeId;
|
pBlockInfoEx->info.id = id;
|
||||||
}
|
}
|
||||||
|
|
||||||
pDestBuf->numOfVnodes = newSize;
|
pDestBuf->numOfGroups = newSize;
|
||||||
} else {
|
} else {
|
||||||
STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf);
|
STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pDestBuf);
|
||||||
|
|
||||||
pBlockInfoEx->len += pSrcBuf->pData[0].len;
|
pBlockInfoEx->len += pSrcBuf->pData[0].len;
|
||||||
pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
|
pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
|
||||||
pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
|
pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
|
||||||
pBlockInfoEx->info.vnode = vnodeId;
|
pBlockInfoEx->info.id = id;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t r = fseek(pDestBuf->f, 0, SEEK_END);
|
int32_t r = fseek(pDestBuf->f, 0, SEEK_END);
|
||||||
|
@ -827,23 +827,23 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
|
||||||
|
|
||||||
assert(pDestBuf->fileSize == oldSize + size);
|
assert(pDestBuf->fileSize == oldSize + size);
|
||||||
|
|
||||||
// tscDebug("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf,
|
// tscDebug("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfGroups:%d, autoDelete:%d", pDestBuf,
|
||||||
// pDestBuf->path, fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete);
|
// pDestBuf->path, fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfGroups, pDestBuf->autoDelete);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t vnodeId) {
|
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t id) {
|
||||||
STSBuf* pTSBuf = tsBufCreate(true, order);
|
STSBuf* pTSBuf = tsBufCreate(true, order);
|
||||||
|
|
||||||
STSVnodeBlockInfo* pBlockInfo = &(addOneVnodeInfo(pTSBuf, 0)->info);
|
STSGroupBlockInfo* pBlockInfo = &(addOneGroupInfo(pTSBuf, 0)->info);
|
||||||
pBlockInfo->numOfBlocks = numOfBlocks;
|
pBlockInfo->numOfBlocks = numOfBlocks;
|
||||||
pBlockInfo->compLen = len;
|
pBlockInfo->compLen = len;
|
||||||
pBlockInfo->offset = getDataStartOffset();
|
pBlockInfo->offset = getDataStartOffset();
|
||||||
pBlockInfo->vnode = vnodeId;
|
pBlockInfo->id = id;
|
||||||
|
|
||||||
// update prev vnode length info in file
|
// update prev vnode length info in file
|
||||||
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo);
|
TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo);
|
||||||
|
|
||||||
int32_t ret = fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
|
int32_t ret = fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
|
||||||
UNUSED(ret);
|
UNUSED(ret);
|
||||||
|
@ -855,7 +855,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
|
||||||
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
|
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
|
||||||
|
|
||||||
STSBufFileHeader header = {
|
STSBufFileHeader header = {
|
||||||
.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
|
.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
|
||||||
STSBufUpdateHeader(pTSBuf, &header);
|
STSBufUpdateHeader(pTSBuf, &header);
|
||||||
|
|
||||||
fsync(fileno(pTSBuf->f));
|
fsync(fileno(pTSBuf->f));
|
||||||
|
@ -863,14 +863,14 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
|
||||||
return pTSBuf;
|
return pTSBuf;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag) {
|
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, tVariant* tag) {
|
||||||
STSElem elem = {.vnode = -1};
|
STSElem elem = {.id = -1};
|
||||||
|
|
||||||
if (pTSBuf == NULL) {
|
if (pTSBuf == NULL) {
|
||||||
return elem;
|
return elem;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t j = tsBufFindVnodeById(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId);
|
int32_t j = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);
|
||||||
if (j == -1) {
|
if (j == -1) {
|
||||||
return elem;
|
return elem;
|
||||||
}
|
}
|
||||||
|
@ -879,7 +879,7 @@ STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag) {
|
||||||
// tsBufDisplay(pTSBuf);
|
// tsBufDisplay(pTSBuf);
|
||||||
|
|
||||||
STSCursor* pCur = &pTSBuf->cur;
|
STSCursor* pCur = &pTSBuf->cur;
|
||||||
STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[j].info;
|
STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[j].info;
|
||||||
|
|
||||||
int32_t blockIndex = tsBufFindBlockByTag(pTSBuf, pBlockInfo, tag);
|
int32_t blockIndex = tsBufFindBlockByTag(pTSBuf, pBlockInfo, tag);
|
||||||
if (blockIndex < 0) {
|
if (blockIndex < 0) {
|
||||||
|
@ -935,7 +935,7 @@ STSBuf* tsBufClone(STSBuf* pTSBuf) {
|
||||||
|
|
||||||
void tsBufDisplay(STSBuf* pTSBuf) {
|
void tsBufDisplay(STSBuf* pTSBuf) {
|
||||||
printf("-------start of ts comp file-------\n");
|
printf("-------start of ts comp file-------\n");
|
||||||
printf("number of vnode:%d\n", pTSBuf->numOfVnodes);
|
printf("number of vnode:%d\n", pTSBuf->numOfGroups);
|
||||||
|
|
||||||
int32_t old = pTSBuf->cur.order;
|
int32_t old = pTSBuf->cur.order;
|
||||||
pTSBuf->cur.order = TSDB_ORDER_ASC;
|
pTSBuf->cur.order = TSDB_ORDER_ASC;
|
||||||
|
@ -945,7 +945,7 @@ void tsBufDisplay(STSBuf* pTSBuf) {
|
||||||
while (tsBufNextPos(pTSBuf)) {
|
while (tsBufNextPos(pTSBuf)) {
|
||||||
STSElem elem = tsBufGetElem(pTSBuf);
|
STSElem elem = tsBufGetElem(pTSBuf);
|
||||||
if (elem.tag->nType == TSDB_DATA_TYPE_BIGINT) {
|
if (elem.tag->nType == TSDB_DATA_TYPE_BIGINT) {
|
||||||
printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
|
printf("%d-%" PRId64 "-%" PRId64 "\n", elem.id, elem.tag->i64Key, elem.ts);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -954,20 +954,20 @@ void tsBufDisplay(STSBuf* pTSBuf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getDataStartOffset() {
|
static int32_t getDataStartOffset() {
|
||||||
return sizeof(STSBufFileHeader) + TS_COMP_FILE_VNODE_MAX * sizeof(STSVnodeBlockInfo);
|
return sizeof(STSBufFileHeader) + TS_COMP_FILE_GROUP_MAX * sizeof(STSGroupBlockInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
// update prev vnode length info in file
|
// update prev vnode length info in file
|
||||||
static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo) {
|
static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo) {
|
||||||
int32_t offset = sizeof(STSBufFileHeader) + index * sizeof(STSVnodeBlockInfo);
|
int32_t offset = sizeof(STSBufFileHeader) + index * sizeof(STSGroupBlockInfo);
|
||||||
doUpdateVnodeInfo(pTSBuf, offset, pBlockInfo);
|
doUpdateGroupInfo(pTSBuf, offset, pBlockInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
|
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
|
||||||
const int32_t INITIAL_VNODEINFO_SIZE = 4;
|
const int32_t INITIAL_GROUPINFO_SIZE = 4;
|
||||||
|
|
||||||
pTSBuf->numOfAlloc = INITIAL_VNODEINFO_SIZE;
|
pTSBuf->numOfAlloc = INITIAL_GROUPINFO_SIZE;
|
||||||
pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSVnodeBlockInfoEx));
|
pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSGroupBlockInfoEx));
|
||||||
if (pTSBuf->pData == NULL) {
|
if (pTSBuf->pData == NULL) {
|
||||||
tsBufDestroy(pTSBuf);
|
tsBufDestroy(pTSBuf);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -999,35 +999,35 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
|
||||||
return pTSBuf;
|
return pTSBuf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf) {
|
int32_t tsBufGetNumOfGroup(STSBuf* pTSBuf) {
|
||||||
if (pTSBuf == NULL) {
|
if (pTSBuf == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pTSBuf->numOfVnodes;
|
return pTSBuf->numOfGroups;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId) {
|
void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id) {
|
||||||
int32_t size = tsBufGetNumOfVnodes(pTSBuf);
|
int32_t size = tsBufGetNumOfGroup(pTSBuf);
|
||||||
if (num != NULL) {
|
if (num != NULL) {
|
||||||
*num = size;
|
*num = size;
|
||||||
}
|
}
|
||||||
|
|
||||||
*vnodeId = NULL;
|
*id = NULL;
|
||||||
if (size == 0) {
|
if (size == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
(*vnodeId) = malloc(tsBufGetNumOfVnodes(pTSBuf) * sizeof(int32_t));
|
(*id) = malloc(tsBufGetNumOfGroup(pTSBuf) * sizeof(int32_t));
|
||||||
|
|
||||||
for(int32_t i = 0; i < size; ++i) {
|
for(int32_t i = 0; i < size; ++i) {
|
||||||
(*vnodeId)[i] = pTSBuf->pData[i].info.vnode;
|
(*id)[i] = pTSBuf->pData[i].info.id;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeIndex, void* buf, int32_t* len, int32_t* numOfBlocks) {
|
int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupIndex, void* buf, int32_t* len, int32_t* numOfBlocks) {
|
||||||
assert(vnodeIndex >= 0 && vnodeIndex < pTSBuf->numOfVnodes);
|
assert(groupIndex >= 0 && groupIndex < pTSBuf->numOfGroups);
|
||||||
STSVnodeBlockInfo *pBlockInfo = &pTSBuf->pData[vnodeIndex].info;
|
STSGroupBlockInfo *pBlockInfo = &pTSBuf->pData[groupIndex].info;
|
||||||
|
|
||||||
*len = 0;
|
*len = 0;
|
||||||
*numOfBlocks = 0;
|
*numOfBlocks = 0;
|
||||||
|
@ -1052,11 +1052,11 @@ int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeIndex, void* buf, in
|
||||||
}
|
}
|
||||||
|
|
||||||
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag) {
|
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag) {
|
||||||
STSElem el = {.vnode = -1};
|
STSElem el = {.id = -1};
|
||||||
|
|
||||||
for (int32_t i = 0; i < pTSBuf->numOfVnodes; ++i) {
|
for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) {
|
||||||
el = tsBufGetElemStartPos(pTSBuf, pTSBuf->pData[i].info.vnode, pTag);
|
el = tsBufGetElemStartPos(pTSBuf, pTSBuf->pData[i].info.id, pTag);
|
||||||
if (el.vnode == pTSBuf->pData[i].info.vnode) {
|
if (el.id == pTSBuf->pData[i].info.id) {
|
||||||
return el;
|
return el;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1065,5 +1065,5 @@ STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tsBufIsValidElem(STSElem* pElem) {
|
bool tsBufIsValidElem(STSElem* pElem) {
|
||||||
return pElem->vnode >= 0;
|
return pElem->id >= 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,7 +42,7 @@ void simpleTest() {
|
||||||
|
|
||||||
EXPECT_EQ(pTSBuf->tsData.len, sizeof(int64_t) * num);
|
EXPECT_EQ(pTSBuf->tsData.len, sizeof(int64_t) * num);
|
||||||
EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0);
|
EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0);
|
||||||
EXPECT_EQ(pTSBuf->numOfVnodes, 1);
|
EXPECT_EQ(pTSBuf->numOfGroups, 1);
|
||||||
|
|
||||||
tsBufFlush(pTSBuf);
|
tsBufFlush(pTSBuf);
|
||||||
EXPECT_EQ(pTSBuf->tsData.len, 0);
|
EXPECT_EQ(pTSBuf->tsData.len, 0);
|
||||||
|
@ -69,7 +69,7 @@ void largeTSTest() {
|
||||||
// the data has been flush to disk, no data in cache
|
// the data has been flush to disk, no data in cache
|
||||||
EXPECT_EQ(pTSBuf->tsData.len, 0);
|
EXPECT_EQ(pTSBuf->tsData.len, 0);
|
||||||
EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0);
|
EXPECT_EQ(tVariantCompare(&pTSBuf->block.tag, &t), 0);
|
||||||
EXPECT_EQ(pTSBuf->numOfVnodes, 1);
|
EXPECT_EQ(pTSBuf->numOfGroups, 1);
|
||||||
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
|
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
|
||||||
|
|
||||||
tsBufFlush(pTSBuf);
|
tsBufFlush(pTSBuf);
|
||||||
|
@ -105,7 +105,7 @@ void multiTagsTest() {
|
||||||
EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t));
|
EXPECT_EQ(pTSBuf->tsData.len, num * sizeof(int64_t));
|
||||||
|
|
||||||
EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1);
|
EXPECT_EQ(pTSBuf->block.tag.i64Key, numOfTags - 1);
|
||||||
EXPECT_EQ(pTSBuf->numOfVnodes, 1);
|
EXPECT_EQ(pTSBuf->numOfGroups, 1);
|
||||||
|
|
||||||
tsBufFlush(pTSBuf);
|
tsBufFlush(pTSBuf);
|
||||||
EXPECT_EQ(pTSBuf->tsData.len, 0);
|
EXPECT_EQ(pTSBuf->tsData.len, 0);
|
||||||
|
@ -139,7 +139,7 @@ void multiVnodeTagsTest() {
|
||||||
start += step * num;
|
start += step * num;
|
||||||
}
|
}
|
||||||
|
|
||||||
EXPECT_EQ(pTSBuf->numOfVnodes, j + 1);
|
EXPECT_EQ(pTSBuf->numOfGroups, j + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
|
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
|
||||||
|
@ -184,7 +184,7 @@ void loadDataTest() {
|
||||||
start += step * num;
|
start += step * num;
|
||||||
}
|
}
|
||||||
|
|
||||||
EXPECT_EQ(pTSBuf->numOfVnodes, j + 1);
|
EXPECT_EQ(pTSBuf->numOfGroups, j + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
|
EXPECT_EQ(pTSBuf->tsOrder, TSDB_ORDER_ASC);
|
||||||
|
@ -203,7 +203,7 @@ void loadDataTest() {
|
||||||
// create from exists file
|
// create from exists file
|
||||||
STSBuf* pNewBuf = tsBufCreateFromFile(pTSBuf->path, false);
|
STSBuf* pNewBuf = tsBufCreateFromFile(pTSBuf->path, false);
|
||||||
EXPECT_EQ(pNewBuf->tsOrder, pTSBuf->tsOrder);
|
EXPECT_EQ(pNewBuf->tsOrder, pTSBuf->tsOrder);
|
||||||
EXPECT_EQ(pNewBuf->numOfVnodes, numOfVnode);
|
EXPECT_EQ(pNewBuf->numOfGroups, numOfVnode);
|
||||||
EXPECT_EQ(pNewBuf->fileSize, pTSBuf->fileSize);
|
EXPECT_EQ(pNewBuf->fileSize, pTSBuf->fileSize);
|
||||||
|
|
||||||
EXPECT_EQ(pNewBuf->pData[0].info.offset, pTSBuf->pData[0].info.offset);
|
EXPECT_EQ(pNewBuf->pData[0].info.offset, pTSBuf->pData[0].info.offset);
|
||||||
|
@ -269,7 +269,7 @@ void TSTraverse() {
|
||||||
start += step * num;
|
start += step * num;
|
||||||
}
|
}
|
||||||
|
|
||||||
EXPECT_EQ(pTSBuf->numOfVnodes, j + 1);
|
EXPECT_EQ(pTSBuf->numOfGroups, j + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsBufResetPos(pTSBuf);
|
tsBufResetPos(pTSBuf);
|
||||||
|
@ -304,7 +304,7 @@ void TSTraverse() {
|
||||||
int32_t totalOutput = 10;
|
int32_t totalOutput = 10;
|
||||||
while (1) {
|
while (1) {
|
||||||
STSElem elem = tsBufGetElem(pTSBuf);
|
STSElem elem = tsBufGetElem(pTSBuf);
|
||||||
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
|
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.id, elem.tag->i64Key, elem.ts);
|
||||||
|
|
||||||
if (!tsBufNextPos(pTSBuf)) {
|
if (!tsBufNextPos(pTSBuf)) {
|
||||||
break;
|
break;
|
||||||
|
@ -352,7 +352,7 @@ void TSTraverse() {
|
||||||
totalOutput = 10;
|
totalOutput = 10;
|
||||||
while (1) {
|
while (1) {
|
||||||
STSElem elem = tsBufGetElem(pTSBuf);
|
STSElem elem = tsBufGetElem(pTSBuf);
|
||||||
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
|
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.id, elem.tag->i64Key, elem.ts);
|
||||||
|
|
||||||
if (!tsBufNextPos(pTSBuf)) {
|
if (!tsBufNextPos(pTSBuf)) {
|
||||||
break;
|
break;
|
||||||
|
@ -427,7 +427,7 @@ void mergeDiffVnodeBufferTest() {
|
||||||
tsBufFlush(pTSBuf2);
|
tsBufFlush(pTSBuf2);
|
||||||
|
|
||||||
tsBufMerge(pTSBuf1, pTSBuf2);
|
tsBufMerge(pTSBuf1, pTSBuf2);
|
||||||
EXPECT_EQ(pTSBuf1->numOfVnodes, 2);
|
EXPECT_EQ(pTSBuf1->numOfGroups, 2);
|
||||||
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
|
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
|
||||||
|
|
||||||
tsBufDisplay(pTSBuf1);
|
tsBufDisplay(pTSBuf1);
|
||||||
|
@ -472,7 +472,7 @@ void mergeIdenticalVnodeBufferTest() {
|
||||||
tsBufFlush(pTSBuf2);
|
tsBufFlush(pTSBuf2);
|
||||||
|
|
||||||
tsBufMerge(pTSBuf1, pTSBuf2);
|
tsBufMerge(pTSBuf1, pTSBuf2);
|
||||||
EXPECT_EQ(pTSBuf1->numOfVnodes, 2);
|
EXPECT_EQ(pTSBuf1->numOfGroups, 2);
|
||||||
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
|
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
|
||||||
|
|
||||||
tsBufResetPos(pTSBuf1);
|
tsBufResetPos(pTSBuf1);
|
||||||
|
@ -482,12 +482,12 @@ void mergeIdenticalVnodeBufferTest() {
|
||||||
STSElem elem = tsBufGetElem(pTSBuf1);
|
STSElem elem = tsBufGetElem(pTSBuf1);
|
||||||
|
|
||||||
if (count++ < numOfTags * num) {
|
if (count++ < numOfTags * num) {
|
||||||
EXPECT_EQ(elem.vnode, 12);
|
EXPECT_EQ(elem.id, 12);
|
||||||
} else {
|
} else {
|
||||||
EXPECT_EQ(elem.vnode, 77);
|
EXPECT_EQ(elem.id, 77);
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
|
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.id, elem.tag->i64Key, elem.ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsBufDestroy(pTSBuf1);
|
tsBufDestroy(pTSBuf1);
|
||||||
|
|
Loading…
Reference in New Issue