[TD-225] refactor

This commit is contained in:
Haojun Liao 2020-11-04 19:39:03 +08:00
parent adf204ec8f
commit 97f7cca4b6
5 changed files with 145 additions and 145 deletions

View File

@ -882,7 +882,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pQueryInfo->tsBuf != NULL) { if (pQueryInfo->tsBuf != NULL) {
// note: here used the index instead of actual vnode id. // note: here used the index instead of actual vnode id.
int32_t vnodeIndex = pTableMetaInfo->vgroupIndex; int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
int32_t code = dumpFileBlockByVnodeId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks); int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }

View File

@ -156,8 +156,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
win->ekey = elem1.ts; win->ekey = elem1.ts;
} }
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else { } else {
pLimit->offset -= 1;//offset apply to projection? pLimit->offset -= 1;//offset apply to projection?
} }
@ -193,8 +193,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
TSKEY et = taosGetTimestampUs(); TSKEY et = taosGetTimestampUs();
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks " tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us", "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfVnodes, win->skey, win->ekey, pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey,
tsBufGetNumOfVnodes(output1), et - st); tsBufGetNumOfGroup(output1), et - st);
return output1->numOfTotal; return output1->numOfTotal;
} }
@ -282,7 +282,7 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) { static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
int32_t num = 0; int32_t num = 0;
int32_t* list = NULL; int32_t* list = NULL;
tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list); tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);
// The virtual node, of which all tables are disqualified after the timestamp intersection, // The virtual node, of which all tables are disqualified after the timestamp intersection,
// is removed to avoid next stage query. // is removed to avoid next stage query.
@ -314,7 +314,7 @@ static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) { static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
int32_t num = 0; int32_t num = 0;
int32_t* list = NULL; int32_t* list = NULL;
tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list); tsBufGetGroupIdList(pQueryInfo->tsBuf, &num, &list);
int32_t numOfGroups = taosArrayGetSize(pVgroupTables); int32_t numOfGroups = taosArrayGetSize(pVgroupTables);

View File

@ -26,7 +26,7 @@ extern "C" {
#define MEM_BUF_SIZE (1 << 20) #define MEM_BUF_SIZE (1 << 20)
#define TS_COMP_FILE_MAGIC 0x87F5EC4C #define TS_COMP_FILE_MAGIC 0x87F5EC4C
#define TS_COMP_FILE_VNODE_MAX 512 #define TS_COMP_FILE_GROUP_MAX 512
typedef struct STSList { typedef struct STSList {
char* rawBuf; char* rawBuf;
@ -38,7 +38,7 @@ typedef struct STSList {
typedef struct STSElem { typedef struct STSElem {
TSKEY ts; TSKEY ts;
tVariant* tag; tVariant* tag;
int32_t vnode; int32_t id;
} STSElem; } STSElem;
typedef struct STSCursor { typedef struct STSCursor {
@ -60,17 +60,17 @@ typedef struct STSBlock {
* The size of buffer file should not be greater than 2G, * The size of buffer file should not be greater than 2G,
* and the offset of int32_t type is enough * and the offset of int32_t type is enough
*/ */
typedef struct STSVnodeBlockInfo { typedef struct STSGroupBlockInfo {
int32_t vnode; // vnode id int32_t id; // group id
int32_t offset; // offset set value in file int32_t offset; // offset set value in file
int32_t numOfBlocks; // number of total blocks int32_t numOfBlocks; // number of total blocks
int32_t compLen; // compressed size int32_t compLen; // compressed size
} STSVnodeBlockInfo; } STSGroupBlockInfo;
typedef struct STSVnodeBlockInfoEx { typedef struct STSGroupBlockInfoEx {
STSVnodeBlockInfo info; STSGroupBlockInfo info;
int32_t len; // length before compress int32_t len; // length before compress
} STSVnodeBlockInfoEx; } STSGroupBlockInfoEx;
typedef struct STSBuf { typedef struct STSBuf {
FILE* f; FILE* f;
@ -78,9 +78,9 @@ typedef struct STSBuf {
uint32_t fileSize; uint32_t fileSize;
// todo use array // todo use array
STSVnodeBlockInfoEx* pData; STSGroupBlockInfoEx* pData;
uint32_t numOfAlloc; uint32_t numOfAlloc;
uint32_t numOfVnodes; uint32_t numOfGroups;
char* assistBuf; char* assistBuf;
int32_t bufSize; int32_t bufSize;
@ -94,22 +94,22 @@ typedef struct STSBuf {
typedef struct STSBufFileHeader { typedef struct STSBufFileHeader {
uint32_t magic; // file magic number uint32_t magic; // file magic number
uint32_t numOfVnode; // number of vnode stored in current file uint32_t numOfGroup; // number of group stored in current file
int32_t tsOrder; // timestamp order in current file int32_t tsOrder; // timestamp order in current file
} STSBufFileHeader; } STSBufFileHeader;
STSBuf* tsBufCreate(bool autoDelete, int32_t order); STSBuf* tsBufCreate(bool autoDelete, int32_t order);
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete); STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder, int32_t vnodeId); STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder, int32_t id);
void* tsBufDestroy(STSBuf* pTSBuf); void* tsBufDestroy(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len); void tsBufAppend(STSBuf* pTSBuf, int32_t id, tVariant* tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf); int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf);
STSBuf* tsBufClone(STSBuf* pTSBuf); STSBuf* tsBufClone(STSBuf* pTSBuf);
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId); STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id);
void tsBufFlush(STSBuf* pTSBuf); void tsBufFlush(STSBuf* pTSBuf);
@ -118,7 +118,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf); bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag); STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, tVariant* tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf); STSCursor tsBufGetCursor(STSBuf* pTSBuf);
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order); void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
@ -131,11 +131,11 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur);
*/ */
void tsBufDisplay(STSBuf* pTSBuf); void tsBufDisplay(STSBuf* pTSBuf);
int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf); int32_t tsBufGetNumOfGroup(STSBuf* pTSBuf);
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId); void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id);
int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeId, void* buf, int32_t* len, int32_t* numOfBlocks); int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t id, void* buf, int32_t* len, int32_t* numOfBlocks);
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag); STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag);

View File

@ -3843,7 +3843,7 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, void* pTable, STableQueryInfo *pTableQ
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pTableQueryInfo->tag); STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pTableQueryInfo->tag);
// failed to find data with the specified tag value and vnodeId // failed to find data with the specified tag value and vnodeId
if (elem.vnode < 0) { if (tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz); qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else { } else {
@ -4777,7 +4777,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
if (pRuntimeEnv->cur.vgroupIndex == -1) { if (pRuntimeEnv->cur.vgroupIndex == -1) {
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag); STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
// failed to find data with the specified tag value and vnodeId // failed to find data with the specified tag value and vnodeId
if (elem.vnode < 0) { if (tsBufIsValidElem(&elem)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz); qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else { } else {
@ -4802,7 +4802,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag); STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
// failed to find data with the specified tag value and vnodeId // failed to find data with the specified tag value and vnodeId
if (elem1.vnode < 0) { if (tsBufIsValidElem(&elem1)) {
if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz); qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else { } else {

View File

@ -4,7 +4,7 @@
#include "tutil.h" #include "tutil.h"
static int32_t getDataStartOffset(); static int32_t getDataStartOffset();
static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo); static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo);
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf); static STSBuf* allocResForTSBuf(STSBuf* pTSBuf);
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader); static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);
@ -32,7 +32,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
} }
// update the header info // update the header info
STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = TSDB_ORDER_ASC}; STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = TSDB_ORDER_ASC};
STSBufUpdateHeader(pTSBuf, &header); STSBufUpdateHeader(pTSBuf, &header);
tsBufResetPos(pTSBuf); tsBufResetPos(pTSBuf);
@ -75,9 +75,9 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
return NULL; return NULL;
} }
if (header.numOfVnode > pTSBuf->numOfAlloc) { if (header.numOfGroup > pTSBuf->numOfAlloc) {
pTSBuf->numOfAlloc = header.numOfVnode; pTSBuf->numOfAlloc = header.numOfGroup;
STSVnodeBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * pTSBuf->numOfAlloc); STSGroupBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * pTSBuf->numOfAlloc);
if (tmp == NULL) { if (tmp == NULL) {
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
@ -86,7 +86,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
pTSBuf->pData = tmp; pTSBuf->pData = tmp;
} }
pTSBuf->numOfVnodes = header.numOfVnode; pTSBuf->numOfGroups = header.numOfGroup;
// check the ts order // check the ts order
pTSBuf->tsOrder = header.tsOrder; pTSBuf->tsOrder = header.tsOrder;
@ -96,9 +96,9 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
return NULL; return NULL;
} }
size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes; size_t infoSize = sizeof(STSGroupBlockInfo) * pTSBuf->numOfGroups;
STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize); STSGroupBlockInfo* buf = (STSGroupBlockInfo*)calloc(1, infoSize);
if (buf == NULL) { if (buf == NULL) {
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
@ -109,9 +109,9 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
UNUSED(sz); UNUSED(sz);
// the length value for each vnode is not kept in file, so does not set the length value // the length value for each vnode is not kept in file, so does not set the length value
for (int32_t i = 0; i < pTSBuf->numOfVnodes; ++i) { for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) {
STSVnodeBlockInfoEx* pBlockList = &pTSBuf->pData[i]; STSGroupBlockInfoEx* pBlockList = &pTSBuf->pData[i];
memcpy(&pBlockList->info, &buf[i], sizeof(STSVnodeBlockInfo)); memcpy(&pBlockList->info, &buf[i], sizeof(STSGroupBlockInfo));
} }
free(buf); free(buf);
@ -131,8 +131,8 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
pTSBuf->cur.order = TSDB_ORDER_ASC; pTSBuf->cur.order = TSDB_ORDER_ASC;
pTSBuf->autoDelete = autoDelete; pTSBuf->autoDelete = autoDelete;
// tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f), // tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f),
// pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete); // pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete);
return pTSBuf; return pTSBuf;
} }
@ -162,53 +162,53 @@ void* tsBufDestroy(STSBuf* pTSBuf) {
return NULL; return NULL;
} }
static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) { static STSGroupBlockInfoEx* tsBufGetLastGroupInfo(STSBuf* pTSBuf) {
int32_t last = pTSBuf->numOfVnodes - 1; int32_t last = pTSBuf->numOfGroups - 1;
assert(last >= 0); assert(last >= 0);
return &pTSBuf->pData[last]; return &pTSBuf->pData[last];
} }
static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) { static STSGroupBlockInfoEx* addOneGroupInfo(STSBuf* pTSBuf, int32_t id) {
if (pTSBuf->numOfAlloc <= pTSBuf->numOfVnodes) { if (pTSBuf->numOfAlloc <= pTSBuf->numOfGroups) {
uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5); uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5);
assert((int32_t)newSize > pTSBuf->numOfAlloc); assert((int32_t)newSize > pTSBuf->numOfAlloc);
STSVnodeBlockInfoEx* tmp = (STSVnodeBlockInfoEx*)realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize); STSGroupBlockInfoEx* tmp = (STSGroupBlockInfoEx*)realloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
if (tmp == NULL) { if (tmp == NULL) {
return NULL; return NULL;
} }
pTSBuf->pData = tmp; pTSBuf->pData = tmp;
pTSBuf->numOfAlloc = newSize; pTSBuf->numOfAlloc = newSize;
memset(&pTSBuf->pData[pTSBuf->numOfVnodes], 0, sizeof(STSVnodeBlockInfoEx) * (newSize - pTSBuf->numOfVnodes)); memset(&pTSBuf->pData[pTSBuf->numOfGroups], 0, sizeof(STSGroupBlockInfoEx) * (newSize - pTSBuf->numOfGroups));
} }
if (pTSBuf->numOfVnodes > 0) { if (pTSBuf->numOfGroups > 0) {
STSVnodeBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); STSGroupBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);
// update prev vnode length info in file // update prev vnode length info in file
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pPrevBlockInfoEx->info); TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pPrevBlockInfoEx->info);
} }
// set initial value for vnode block // set initial value for vnode block
STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes].info; STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfGroups].info;
pBlockInfo->vnode = vnodeId; pBlockInfo->id = id;
pBlockInfo->offset = pTSBuf->fileSize; pBlockInfo->offset = pTSBuf->fileSize;
assert(pBlockInfo->offset >= getDataStartOffset()); assert(pBlockInfo->offset >= getDataStartOffset());
// update vnode info in file // update vnode info in file
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes, pBlockInfo); TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups, pBlockInfo);
// add one vnode info // add one vnode info
pTSBuf->numOfVnodes += 1; pTSBuf->numOfGroups += 1;
// update the header info // update the header info
STSBufFileHeader header = { STSBufFileHeader header = {
.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
STSBufUpdateHeader(pTSBuf, &header); STSBufUpdateHeader(pTSBuf, &header);
return tsBufGetLastVnodeInfo(pTSBuf); return tsBufGetLastGroupInfo(pTSBuf);
} }
static void shrinkBuffer(STSList* ptsData) { static void shrinkBuffer(STSList* ptsData) {
@ -279,10 +279,10 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
pTSBuf->tsData.len = 0; pTSBuf->tsData.len = 0;
STSVnodeBlockInfoEx* pVnodeBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); STSGroupBlockInfoEx* pGroupBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);
pVnodeBlockInfoEx->info.compLen += blockSize; pGroupBlockInfoEx->info.compLen += blockSize;
pVnodeBlockInfoEx->info.numOfBlocks += 1; pGroupBlockInfoEx->info.numOfBlocks += 1;
shrinkBuffer(&pTSBuf->tsData); shrinkBuffer(&pTSBuf->tsData);
} }
@ -413,20 +413,20 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len) { void tsBufAppend(STSBuf* pTSBuf, int32_t id, tVariant* tag, const char* pData, int32_t len) {
STSVnodeBlockInfoEx* pBlockInfo = NULL; STSGroupBlockInfoEx* pBlockInfo = NULL;
STSList* ptsData = &pTSBuf->tsData; STSList* ptsData = &pTSBuf->tsData;
if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) { if (pTSBuf->numOfGroups == 0 || tsBufGetLastGroupInfo(pTSBuf)->info.id != id) {
writeDataToDisk(pTSBuf); writeDataToDisk(pTSBuf);
shrinkBuffer(ptsData); shrinkBuffer(ptsData);
pBlockInfo = addOneVnodeInfo(pTSBuf, vnodeId); pBlockInfo = addOneGroupInfo(pTSBuf, id);
} else { } else {
pBlockInfo = tsBufGetLastVnodeInfo(pTSBuf); pBlockInfo = tsBufGetLastGroupInfo(pTSBuf);
} }
assert(pBlockInfo->info.vnode == vnodeId); assert(pBlockInfo->info.id == id);
if ((tVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) { if ((tVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) {
// new arrived data with different tags value, save current value into disk first // new arrived data with different tags value, save current value into disk first
@ -464,23 +464,23 @@ void tsBufFlush(STSBuf* pTSBuf) {
writeDataToDisk(pTSBuf); writeDataToDisk(pTSBuf);
shrinkBuffer(&pTSBuf->tsData); shrinkBuffer(&pTSBuf->tsData);
STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);
// update prev vnode length info in file // update prev vnode length info in file
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pBlockInfoEx->info); TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pBlockInfoEx->info);
// save the ts order into header // save the ts order into header
STSBufFileHeader header = { STSBufFileHeader header = {
.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
STSBufUpdateHeader(pTSBuf, &header); STSBufUpdateHeader(pTSBuf, &header);
fsync(fileno(pTSBuf->f)); fsync(fileno(pTSBuf->f));
} }
static int32_t tsBufFindVnodeById(STSVnodeBlockInfoEx* pVnodeInfoEx, int32_t numOfVnodes, int32_t vnodeId) { static int32_t tsBufFindGroupById(STSGroupBlockInfoEx* pGroupInfoEx, int32_t numOfGroups, int32_t id) {
int32_t j = -1; int32_t j = -1;
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfGroups; ++i) {
if (pVnodeInfoEx[i].info.vnode == vnodeId) { if (pGroupInfoEx[i].info.id == id) {
j = i; j = i;
break; break;
} }
@ -490,7 +490,7 @@ static int32_t tsBufFindVnodeById(STSVnodeBlockInfoEx* pVnodeInfoEx, int32_t num
} }
// todo opt performance by cache blocks info // todo opt performance by cache blocks info
static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int32_t blockIndex) { static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int32_t blockIndex) {
if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) { if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
return -1; return -1;
} }
@ -517,7 +517,7 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int
return 0; return 0;
} }
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, tVariant* tag) { static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, tVariant* tag) {
bool decomp = false; bool decomp = false;
int64_t offset = 0; int64_t offset = 0;
@ -544,14 +544,14 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo
return -1; return -1;
} }
static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex) { static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex) {
STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[vnodeIndex].info; STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info;
if (pBlockInfo->numOfBlocks <= blockIndex) { if (pBlockInfo->numOfBlocks <= blockIndex) {
assert(false); assert(false);
} }
STSCursor* pCur = &pTSBuf->cur; STSCursor* pCur = &pTSBuf->cur;
if (pCur->vgroupIndex == vnodeIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) || if (pCur->vgroupIndex == groupIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) ||
(pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC))) { (pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC))) {
int32_t i = 0; int32_t i = 0;
bool decomp = false; bool decomp = false;
@ -586,13 +586,13 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex
assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len)); assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));
pCur->vgroupIndex = vnodeIndex; pCur->vgroupIndex = groupIndex;
pCur->blockIndex = blockIndex; pCur->blockIndex = blockIndex;
pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1; pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1;
} }
static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) { static int32_t doUpdateGroupInfo(STSBuf* pTSBuf, int64_t offset, STSGroupBlockInfo* pVInfo) {
if (offset < 0 || offset >= getDataStartOffset()) { if (offset < 0 || offset >= getDataStartOffset()) {
return -1; return -1;
} }
@ -601,12 +601,12 @@ static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockIn
return -1; return -1;
} }
fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f); fwrite(pVInfo, sizeof(STSGroupBlockInfo), 1, pTSBuf->f);
return 0; return 0;
} }
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) { STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) {
int32_t j = tsBufFindVnodeById(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId); int32_t j = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);
if (j == -1) { if (j == -1) {
return NULL; return NULL;
} }
@ -615,7 +615,7 @@ STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) {
} }
int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) { int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
if ((pTSBuf->f == NULL) || pHeader == NULL || pHeader->numOfVnode == 0 || pHeader->magic != TS_COMP_FILE_MAGIC) { if ((pTSBuf->f == NULL) || pHeader == NULL || pHeader->numOfGroup == 0 || pHeader->magic != TS_COMP_FILE_MAGIC) {
return -1; return -1;
} }
@ -631,7 +631,7 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
} }
bool tsBufNextPos(STSBuf* pTSBuf) { bool tsBufNextPos(STSBuf* pTSBuf) {
if (pTSBuf == NULL || pTSBuf->numOfVnodes == 0) { if (pTSBuf == NULL || pTSBuf->numOfGroups == 0) {
return false; return false;
} }
@ -650,16 +650,16 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
} }
} else { // get the last timestamp record in the last block of the last vnode } else { // get the last timestamp record in the last block of the last vnode
assert(pTSBuf->numOfVnodes > 0); assert(pTSBuf->numOfGroups > 0);
int32_t vnodeIndex = pTSBuf->numOfVnodes - 1; int32_t groupIndex = pTSBuf->numOfGroups - 1;
pCur->vgroupIndex = vnodeIndex; pCur->vgroupIndex = groupIndex;
int32_t vnodeId = pTSBuf->pData[pCur->vgroupIndex].info.vnode; int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id;
STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);
int32_t blockIndex = pBlockInfo->numOfBlocks - 1; int32_t blockIndex = pBlockInfo->numOfBlocks - 1;
tsBufGetBlock(pTSBuf, vnodeIndex, blockIndex); tsBufGetBlock(pTSBuf, groupIndex, blockIndex);
pCur->tsIndex = pTSBuf->block.numOfElem - 1; pCur->tsIndex = pTSBuf->block.numOfElem - 1;
if (pTSBuf->block.numOfElem == 0) { if (pTSBuf->block.numOfElem == 0) {
@ -678,12 +678,12 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
if ((pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) || if ((pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) ||
(pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0)) { (pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0)) {
int32_t vnodeId = pTSBuf->pData[pCur->vgroupIndex].info.vnode; int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id;
STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);
if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) || if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) ||
(pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) { (pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
if ((pCur->vgroupIndex >= pTSBuf->numOfVnodes - 1 && pCur->order == TSDB_ORDER_ASC) || if ((pCur->vgroupIndex >= pTSBuf->numOfGroups - 1 && pCur->order == TSDB_ORDER_ASC) ||
(pCur->vgroupIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) { (pCur->vgroupIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
pCur->vgroupIndex = -1; pCur->vgroupIndex = -1;
return false; return false;
@ -719,7 +719,7 @@ void tsBufResetPos(STSBuf* pTSBuf) {
} }
STSElem tsBufGetElem(STSBuf* pTSBuf) { STSElem tsBufGetElem(STSBuf* pTSBuf) {
STSElem elem1 = {.vnode = -1}; STSElem elem1 = {.id = -1};
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return elem1; return elem1;
} }
@ -731,7 +731,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
STSBlock* pBlock = &pTSBuf->block; STSBlock* pBlock = &pTSBuf->block;
elem1.vnode = pTSBuf->pData[pCur->vgroupIndex].info.vnode; elem1.id = pTSBuf->pData[pCur->vgroupIndex].info.id;
elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE); elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
elem1.tag = &pBlock->tag; elem1.tag = &pBlock->tag;
@ -742,34 +742,34 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
* current only support ts comp data from two vnode merge * current only support ts comp data from two vnode merge
* @param pDestBuf * @param pDestBuf
* @param pSrcBuf * @param pSrcBuf
* @param vnodeId * @param id
* @return * @return
*/ */
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) { int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfVnodes <= 0) { if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfGroups <= 0) {
return 0; return 0;
} }
if (pDestBuf->numOfVnodes + pSrcBuf->numOfVnodes > TS_COMP_FILE_VNODE_MAX) { if (pDestBuf->numOfGroups + pSrcBuf->numOfGroups > TS_COMP_FILE_GROUP_MAX) {
return -1; return -1;
} }
// src can only have one vnode index // src can only have one vnode index
assert(pSrcBuf->numOfVnodes == 1); assert(pSrcBuf->numOfGroups == 1);
// there are data in buffer, flush to disk first // there are data in buffer, flush to disk first
tsBufFlush(pDestBuf); tsBufFlush(pDestBuf);
// compared with the last vnode id // compared with the last vnode id
int32_t vnodeId = tsBufGetLastVnodeInfo((STSBuf*) pSrcBuf)->info.vnode; int32_t id = tsBufGetLastGroupInfo((STSBuf*) pSrcBuf)->info.id;
if (vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) { if (id != tsBufGetLastGroupInfo(pDestBuf)->info.id) {
int32_t oldSize = pDestBuf->numOfVnodes; int32_t oldSize = pDestBuf->numOfGroups;
int32_t newSize = oldSize + pSrcBuf->numOfVnodes; int32_t newSize = oldSize + pSrcBuf->numOfGroups;
if (pDestBuf->numOfAlloc < newSize) { if (pDestBuf->numOfAlloc < newSize) {
pDestBuf->numOfAlloc = newSize; pDestBuf->numOfAlloc = newSize;
STSVnodeBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize); STSGroupBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
if (tmp == NULL) { if (tmp == NULL) {
return -1; return -1;
} }
@ -778,23 +778,23 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
} }
// directly copy the vnode index information // directly copy the vnode index information
memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfVnodes * sizeof(STSVnodeBlockInfoEx)); memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfGroups * sizeof(STSGroupBlockInfoEx));
// set the new offset value // set the new offset value
for (int32_t i = 0; i < pSrcBuf->numOfVnodes; ++i) { for (int32_t i = 0; i < pSrcBuf->numOfGroups; ++i) {
STSVnodeBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize]; STSGroupBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize];
pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize; pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize;
pBlockInfoEx->info.vnode = vnodeId; pBlockInfoEx->info.id = id;
} }
pDestBuf->numOfVnodes = newSize; pDestBuf->numOfGroups = newSize;
} else { } else {
STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf); STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pDestBuf);
pBlockInfoEx->len += pSrcBuf->pData[0].len; pBlockInfoEx->len += pSrcBuf->pData[0].len;
pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks; pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen; pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
pBlockInfoEx->info.vnode = vnodeId; pBlockInfoEx->info.id = id;
} }
int32_t r = fseek(pDestBuf->f, 0, SEEK_END); int32_t r = fseek(pDestBuf->f, 0, SEEK_END);
@ -827,23 +827,23 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
assert(pDestBuf->fileSize == oldSize + size); assert(pDestBuf->fileSize == oldSize + size);
// tscDebug("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf, // tscDebug("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfGroups:%d, autoDelete:%d", pDestBuf,
// pDestBuf->path, fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete); // pDestBuf->path, fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfGroups, pDestBuf->autoDelete);
return 0; return 0;
} }
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t vnodeId) { STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t id) {
STSBuf* pTSBuf = tsBufCreate(true, order); STSBuf* pTSBuf = tsBufCreate(true, order);
STSVnodeBlockInfo* pBlockInfo = &(addOneVnodeInfo(pTSBuf, 0)->info); STSGroupBlockInfo* pBlockInfo = &(addOneGroupInfo(pTSBuf, 0)->info);
pBlockInfo->numOfBlocks = numOfBlocks; pBlockInfo->numOfBlocks = numOfBlocks;
pBlockInfo->compLen = len; pBlockInfo->compLen = len;
pBlockInfo->offset = getDataStartOffset(); pBlockInfo->offset = getDataStartOffset();
pBlockInfo->vnode = vnodeId; pBlockInfo->id = id;
// update prev vnode length info in file // update prev vnode length info in file
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo); TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo);
int32_t ret = fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET); int32_t ret = fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
UNUSED(ret); UNUSED(ret);
@ -855,7 +855,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
STSBufFileHeader header = { STSBufFileHeader header = {
.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
STSBufUpdateHeader(pTSBuf, &header); STSBufUpdateHeader(pTSBuf, &header);
fsync(fileno(pTSBuf->f)); fsync(fileno(pTSBuf->f));
@ -863,14 +863,14 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
return pTSBuf; return pTSBuf;
} }
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag) { STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, tVariant* tag) {
STSElem elem = {.vnode = -1}; STSElem elem = {.id = -1};
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return elem; return elem;
} }
int32_t j = tsBufFindVnodeById(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId); int32_t j = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);
if (j == -1) { if (j == -1) {
return elem; return elem;
} }
@ -879,7 +879,7 @@ STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag) {
// tsBufDisplay(pTSBuf); // tsBufDisplay(pTSBuf);
STSCursor* pCur = &pTSBuf->cur; STSCursor* pCur = &pTSBuf->cur;
STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[j].info; STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[j].info;
int32_t blockIndex = tsBufFindBlockByTag(pTSBuf, pBlockInfo, tag); int32_t blockIndex = tsBufFindBlockByTag(pTSBuf, pBlockInfo, tag);
if (blockIndex < 0) { if (blockIndex < 0) {
@ -935,7 +935,7 @@ STSBuf* tsBufClone(STSBuf* pTSBuf) {
void tsBufDisplay(STSBuf* pTSBuf) { void tsBufDisplay(STSBuf* pTSBuf) {
printf("-------start of ts comp file-------\n"); printf("-------start of ts comp file-------\n");
printf("number of vnode:%d\n", pTSBuf->numOfVnodes); printf("number of vnode:%d\n", pTSBuf->numOfGroups);
int32_t old = pTSBuf->cur.order; int32_t old = pTSBuf->cur.order;
pTSBuf->cur.order = TSDB_ORDER_ASC; pTSBuf->cur.order = TSDB_ORDER_ASC;
@ -945,7 +945,7 @@ void tsBufDisplay(STSBuf* pTSBuf) {
while (tsBufNextPos(pTSBuf)) { while (tsBufNextPos(pTSBuf)) {
STSElem elem = tsBufGetElem(pTSBuf); STSElem elem = tsBufGetElem(pTSBuf);
if (elem.tag->nType == TSDB_DATA_TYPE_BIGINT) { if (elem.tag->nType == TSDB_DATA_TYPE_BIGINT) {
printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, elem.tag->i64Key, elem.ts); printf("%d-%" PRId64 "-%" PRId64 "\n", elem.id, elem.tag->i64Key, elem.ts);
} }
} }
@ -954,20 +954,20 @@ void tsBufDisplay(STSBuf* pTSBuf) {
} }
static int32_t getDataStartOffset() { static int32_t getDataStartOffset() {
return sizeof(STSBufFileHeader) + TS_COMP_FILE_VNODE_MAX * sizeof(STSVnodeBlockInfo); return sizeof(STSBufFileHeader) + TS_COMP_FILE_GROUP_MAX * sizeof(STSGroupBlockInfo);
} }
// update prev vnode length info in file // update prev vnode length info in file
static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo) { static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo) {
int32_t offset = sizeof(STSBufFileHeader) + index * sizeof(STSVnodeBlockInfo); int32_t offset = sizeof(STSBufFileHeader) + index * sizeof(STSGroupBlockInfo);
doUpdateVnodeInfo(pTSBuf, offset, pBlockInfo); doUpdateGroupInfo(pTSBuf, offset, pBlockInfo);
} }
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
const int32_t INITIAL_VNODEINFO_SIZE = 4; const int32_t INITIAL_GROUPINFO_SIZE = 4;
pTSBuf->numOfAlloc = INITIAL_VNODEINFO_SIZE; pTSBuf->numOfAlloc = INITIAL_GROUPINFO_SIZE;
pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSVnodeBlockInfoEx)); pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSGroupBlockInfoEx));
if (pTSBuf->pData == NULL) { if (pTSBuf->pData == NULL) {
tsBufDestroy(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
@ -999,35 +999,35 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
return pTSBuf; return pTSBuf;
} }
int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf) { int32_t tsBufGetNumOfGroup(STSBuf* pTSBuf) {
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return 0; return 0;
} }
return pTSBuf->numOfVnodes; return pTSBuf->numOfGroups;
} }
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId) { void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id) {
int32_t size = tsBufGetNumOfVnodes(pTSBuf); int32_t size = tsBufGetNumOfGroup(pTSBuf);
if (num != NULL) { if (num != NULL) {
*num = size; *num = size;
} }
*vnodeId = NULL; *id = NULL;
if (size == 0) { if (size == 0) {
return; return;
} }
(*vnodeId) = malloc(tsBufGetNumOfVnodes(pTSBuf) * sizeof(int32_t)); (*id) = malloc(tsBufGetNumOfGroup(pTSBuf) * sizeof(int32_t));
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
(*vnodeId)[i] = pTSBuf->pData[i].info.vnode; (*id)[i] = pTSBuf->pData[i].info.id;
} }
} }
int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeIndex, void* buf, int32_t* len, int32_t* numOfBlocks) { int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupIndex, void* buf, int32_t* len, int32_t* numOfBlocks) {
assert(vnodeIndex >= 0 && vnodeIndex < pTSBuf->numOfVnodes); assert(groupIndex >= 0 && groupIndex < pTSBuf->numOfGroups);
STSVnodeBlockInfo *pBlockInfo = &pTSBuf->pData[vnodeIndex].info; STSGroupBlockInfo *pBlockInfo = &pTSBuf->pData[groupIndex].info;
*len = 0; *len = 0;
*numOfBlocks = 0; *numOfBlocks = 0;
@ -1052,11 +1052,11 @@ int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeIndex, void* buf, in
} }
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag) { STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag) {
STSElem el = {.vnode = -1}; STSElem el = {.id = -1};
for (int32_t i = 0; i < pTSBuf->numOfVnodes; ++i) { for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) {
el = tsBufGetElemStartPos(pTSBuf, pTSBuf->pData[i].info.vnode, pTag); el = tsBufGetElemStartPos(pTSBuf, pTSBuf->pData[i].info.id, pTag);
if (el.vnode == pTSBuf->pData[i].info.vnode) { if (el.id == pTSBuf->pData[i].info.id) {
return el; return el;
} }
} }
@ -1065,5 +1065,5 @@ STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag) {
} }
bool tsBufIsValidElem(STSElem* pElem) { bool tsBufIsValidElem(STSElem* pElem) {
return pElem->vnode >= 0; return pElem->id >= 0;
} }