more code
This commit is contained in:
parent
caccd5e60a
commit
b79859d2a3
|
@ -122,6 +122,8 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
|
||||||
|
|
||||||
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
|
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
|
||||||
int32_t tsdbRowCmprFn(const void *p1, const void *p2);
|
int32_t tsdbRowCmprFn(const void *p1, const void *p2);
|
||||||
|
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2);
|
||||||
|
void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key);
|
||||||
// STSDBRowIter
|
// STSDBRowIter
|
||||||
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
void tsdbRowClose(STSDBRowIter *pIter);
|
void tsdbRowClose(STSDBRowIter *pIter);
|
||||||
|
@ -226,9 +228,9 @@ int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode);
|
||||||
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive);
|
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive);
|
||||||
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
|
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
|
||||||
// STbDataIter
|
// STbDataIter
|
||||||
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
|
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter);
|
||||||
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
||||||
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
|
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter);
|
||||||
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
||||||
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
|
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
|
||||||
|
|
||||||
|
@ -368,15 +370,6 @@ struct TSDBKEY {
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SMemSkipListNode SMemSkipListNode;
|
typedef struct SMemSkipListNode SMemSkipListNode;
|
||||||
struct SMemSkipListNode {
|
|
||||||
int8_t level;
|
|
||||||
int8_t flag; // TSDBROW_ROW_FMT for row format, TSDBROW_COL_FMT for col format
|
|
||||||
int32_t iRow;
|
|
||||||
int64_t version;
|
|
||||||
void *pData;
|
|
||||||
SMemSkipListNode *forwards[0];
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct SMemSkipList {
|
typedef struct SMemSkipList {
|
||||||
int64_t size;
|
int64_t size;
|
||||||
uint32_t seed;
|
uint32_t seed;
|
||||||
|
@ -430,8 +423,14 @@ struct TSDBROW {
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct SMemSkipListNode {
|
||||||
|
int8_t level;
|
||||||
|
TSDBROW row;
|
||||||
|
SMemSkipListNode *forwards[0];
|
||||||
|
};
|
||||||
|
|
||||||
struct STsdbRowKey {
|
struct STsdbRowKey {
|
||||||
SRowKey rowkey;
|
SRowKey key;
|
||||||
int64_t version;
|
int64_t version;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -941,34 +940,7 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
||||||
// #define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
// #define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
||||||
// #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
|
// #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
|
||||||
|
|
||||||
static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
|
TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter);
|
||||||
if (pIter == NULL) return NULL;
|
|
||||||
|
|
||||||
if (pIter->pRow) {
|
|
||||||
return pIter->pRow;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pIter->backward) {
|
|
||||||
if (pIter->pNode == pIter->pTbData->sl.pHead) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (pIter->pNode == pIter->pTbData->sl.pTail) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter->pRow = &pIter->row;
|
|
||||||
if (pIter->pNode->flag == TSDBROW_ROW_FMT) {
|
|
||||||
pIter->row = tsdbRowFromTSRow(pIter->pNode->version, pIter->pNode->pData);
|
|
||||||
} else if (pIter->pNode->flag == TSDBROW_COL_FMT) {
|
|
||||||
pIter->row = tsdbRowFromBlockData(pIter->pNode->pData, pIter->pNode->iRow);
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pIter->pRow;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tRowInfoCmprFn(const void *p1, const void *p2);
|
int32_t tRowInfoCmprFn(const void *p1, const void *p2);
|
||||||
|
|
||||||
|
|
|
@ -298,8 +298,11 @@ static int32_t tsdbCommitOpenIter(SCommitter2 *committer) {
|
||||||
// mem data iter
|
// mem data iter
|
||||||
config.type = TSDB_ITER_TYPE_MEMT;
|
config.type = TSDB_ITER_TYPE_MEMT;
|
||||||
config.memt = committer->tsdb->imem;
|
config.memt = committer->tsdb->imem;
|
||||||
config.from->ts = committer->ctx->minKey;
|
|
||||||
config.from->version = VERSION_MIN;
|
config.from->version = VERSION_MIN;
|
||||||
|
config.from->key = (SRowKey){
|
||||||
|
.ts = committer->ctx->minKey,
|
||||||
|
.numOfKeys = 0, // TODO: support multiple primary keys
|
||||||
|
};
|
||||||
|
|
||||||
code = tsdbIterOpen(&config, &iter);
|
code = tsdbIterOpen(&config, &iter);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
|
@ -45,7 +45,7 @@ struct STsdbIter {
|
||||||
} dataData[1];
|
} dataData[1];
|
||||||
struct {
|
struct {
|
||||||
SMemTable *memt;
|
SMemTable *memt;
|
||||||
TSDBKEY from[1];
|
STsdbRowKey from[1];
|
||||||
SRBTreeIter iter[1];
|
SRBTreeIter iter[1];
|
||||||
STbData *tbData;
|
STbData *tbData;
|
||||||
STbDataIter tbIter[1];
|
STbDataIter tbIter[1];
|
||||||
|
|
|
@ -44,8 +44,8 @@ typedef struct {
|
||||||
SSttFileReader *sttReader; // TSDB_ITER_TYPE_STT || TSDB_ITER_TYPE_STT_TOMB
|
SSttFileReader *sttReader; // TSDB_ITER_TYPE_STT || TSDB_ITER_TYPE_STT_TOMB
|
||||||
SDataFileReader *dataReader; // TSDB_ITER_TYPE_DATA || TSDB_ITER_TYPE_DATA_TOMB
|
SDataFileReader *dataReader; // TSDB_ITER_TYPE_DATA || TSDB_ITER_TYPE_DATA_TOMB
|
||||||
struct {
|
struct {
|
||||||
SMemTable *memt; // TSDB_ITER_TYPE_MEMT_TOMB
|
SMemTable *memt; // TSDB_ITER_TYPE_MEMT_TOMB
|
||||||
TSDBKEY from[1];
|
STsdbRowKey from[1];
|
||||||
}; // TSDB_ITER_TYPE_MEMT
|
}; // TSDB_ITER_TYPE_MEMT
|
||||||
};
|
};
|
||||||
bool filterByVersion;
|
bool filterByVersion;
|
||||||
|
|
|
@ -31,7 +31,7 @@
|
||||||
#define SL_MOVE_BACKWARD 0x1
|
#define SL_MOVE_BACKWARD 0x1
|
||||||
#define SL_MOVE_FROM_POS 0x2
|
#define SL_MOVE_FROM_POS 0x2
|
||||||
|
|
||||||
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags);
|
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags);
|
||||||
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
|
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
|
||||||
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
|
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
|
||||||
SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
|
SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
|
||||||
|
@ -219,7 +219,7 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter) {
|
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
(*ppIter) = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter));
|
(*ppIter) = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter));
|
||||||
|
@ -241,7 +241,7 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter) {
|
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter) {
|
||||||
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
||||||
SMemSkipListNode *pHead;
|
SMemSkipListNode *pHead;
|
||||||
SMemSkipListNode *pTail;
|
SMemSkipListNode *pTail;
|
||||||
|
@ -433,10 +433,10 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) {
|
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags) {
|
||||||
SMemSkipListNode *px;
|
SMemSkipListNode *px;
|
||||||
SMemSkipListNode *pn;
|
SMemSkipListNode *pn;
|
||||||
TSDBKEY tKey = {0};
|
STsdbRowKey tKey;
|
||||||
int32_t backward = flags & SL_MOVE_BACKWARD;
|
int32_t backward = flags & SL_MOVE_BACKWARD;
|
||||||
int32_t fromPos = flags & SL_MOVE_FROM_POS;
|
int32_t fromPos = flags & SL_MOVE_FROM_POS;
|
||||||
|
|
||||||
|
@ -455,15 +455,9 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
|
||||||
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
|
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
|
||||||
pn = SL_GET_NODE_BACKWARD(px, iLevel);
|
pn = SL_GET_NODE_BACKWARD(px, iLevel);
|
||||||
while (pn != pTbData->sl.pHead) {
|
while (pn != pTbData->sl.pHead) {
|
||||||
if (pn->flag == TSDBROW_ROW_FMT) {
|
tsdbRowGetKey(&pn->row, &tKey);
|
||||||
tKey.version = pn->version;
|
|
||||||
tKey.ts = ((SRow *)pn->pData)->ts;
|
|
||||||
} else if (pn->flag == TSDBROW_COL_FMT) {
|
|
||||||
tKey.version = ((SBlockData *)pn->pData)->aVersion[pn->iRow];
|
|
||||||
tKey.ts = ((SBlockData *)pn->pData)->aTSKEY[pn->iRow];
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t c = tsdbKeyCmprFn(&tKey, pKey);
|
int32_t c = tsdbRowKeyCmpr(&tKey, pKey);
|
||||||
if (c <= 0) {
|
if (c <= 0) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -490,15 +484,9 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
|
||||||
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
|
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
|
||||||
pn = SL_GET_NODE_FORWARD(px, iLevel);
|
pn = SL_GET_NODE_FORWARD(px, iLevel);
|
||||||
while (pn != pTbData->sl.pTail) {
|
while (pn != pTbData->sl.pTail) {
|
||||||
if (pn->flag == TSDBROW_ROW_FMT) {
|
tsdbRowGetKey(&pn->row, &tKey);
|
||||||
tKey.version = pn->version;
|
|
||||||
tKey.ts = ((SRow *)pn->pData)->ts;
|
|
||||||
} else if (pn->flag == TSDBROW_COL_FMT) {
|
|
||||||
tKey.version = ((SBlockData *)pn->pData)->aVersion[pn->iRow];
|
|
||||||
tKey.ts = ((SBlockData *)pn->pData)->aTSKEY[pn->iRow];
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t c = tsdbKeyCmprFn(&tKey, pKey);
|
int32_t c = tsdbRowKeyCmpr(&tKey, pKey);
|
||||||
if (c >= 0) {
|
if (c >= 0) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -547,16 +535,10 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN
|
||||||
}
|
}
|
||||||
|
|
||||||
pNode->level = level;
|
pNode->level = level;
|
||||||
pNode->flag = pRow->type;
|
pNode->row = *pRow;
|
||||||
if (pRow->type == TSDBROW_ROW_FMT) {
|
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||||
pNode->version = pRow->version;
|
pNode->row.pTSRow = (SRow *)((char *)pNode + nSize);
|
||||||
pNode->pData = (char *)pNode + nSize;
|
memcpy(pNode->row.pTSRow, pRow->pTSRow, pRow->pTSRow->len);
|
||||||
memcpy(pNode->pData, pRow->pTSRow, pRow->pTSRow->len);
|
|
||||||
} else if (pRow->type == TSDBROW_COL_FMT) {
|
|
||||||
pNode->iRow = pRow->iRow;
|
|
||||||
pNode->pData = pRow->pBlockData;
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// set node
|
// set node
|
||||||
|
@ -656,13 +638,14 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
// loop to add each row to the skiplist
|
// loop to add each row to the skiplist
|
||||||
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
||||||
TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
|
TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
|
||||||
TSDBKEY key = {.version = version, .ts = pBlockData->aTSKEY[0]};
|
STsdbRowKey key;
|
||||||
TSDBROW lRow; // last row
|
TSDBROW lRow; // last row
|
||||||
|
|
||||||
// first row
|
// first row
|
||||||
|
tsdbRowGetKey(&tRow, &key);
|
||||||
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
|
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
|
||||||
if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
|
if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
|
||||||
pTbData->minKey = TMIN(pTbData->minKey, key.ts);
|
pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
|
||||||
lRow = tRow;
|
lRow = tRow;
|
||||||
|
|
||||||
// remain row
|
// remain row
|
||||||
|
@ -673,7 +656,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
}
|
}
|
||||||
|
|
||||||
while (tRow.iRow < pBlockData->nRow) {
|
while (tRow.iRow < pBlockData->nRow) {
|
||||||
key.ts = pBlockData->aTSKEY[tRow.iRow];
|
tsdbRowGetKey(&tRow, &key);
|
||||||
|
|
||||||
if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
|
if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
|
||||||
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
|
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
|
||||||
|
@ -686,8 +669,8 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (key.ts >= pTbData->maxKey) {
|
if (key.key.ts >= pTbData->maxKey) {
|
||||||
pTbData->maxKey = key.ts;
|
pTbData->maxKey = key.key.ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
||||||
|
@ -711,7 +694,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
|
|
||||||
int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
|
int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
|
||||||
SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
|
SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
|
||||||
TSDBKEY key = {.version = version};
|
STsdbRowKey key;
|
||||||
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
||||||
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
|
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
|
||||||
int32_t iRow = 0;
|
int32_t iRow = 0;
|
||||||
|
@ -719,13 +702,13 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
|
|
||||||
// backward put first data
|
// backward put first data
|
||||||
tRow.pTSRow = aRow[iRow++];
|
tRow.pTSRow = aRow[iRow++];
|
||||||
key.ts = tRow.pTSRow->ts;
|
tsdbRowGetKey(&tRow, &key);
|
||||||
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
|
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
|
||||||
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
|
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
lRow = tRow;
|
lRow = tRow;
|
||||||
|
|
||||||
pTbData->minKey = TMIN(pTbData->minKey, key.ts);
|
pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
|
||||||
|
|
||||||
// forward put rest data
|
// forward put rest data
|
||||||
if (iRow < nRow) {
|
if (iRow < nRow) {
|
||||||
|
@ -735,7 +718,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
|
|
||||||
while (iRow < nRow) {
|
while (iRow < nRow) {
|
||||||
tRow.pTSRow = aRow[iRow];
|
tRow.pTSRow = aRow[iRow];
|
||||||
key.ts = tRow.pTSRow->ts;
|
tsdbRowGetKey(&tRow, &key);
|
||||||
|
|
||||||
if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
|
if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
|
||||||
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
|
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
|
||||||
|
@ -750,8 +733,8 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (key.ts >= pTbData->maxKey) {
|
if (key.key.ts >= pTbData->maxKey) {
|
||||||
pTbData->maxKey = key.ts;
|
pTbData->maxKey = key.key.ts;
|
||||||
}
|
}
|
||||||
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
||||||
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
|
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
|
||||||
|
@ -833,3 +816,26 @@ SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) {
|
||||||
_exit:
|
_exit:
|
||||||
return aTbDataP;
|
return aTbDataP;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
|
||||||
|
if (pIter == NULL) return NULL;
|
||||||
|
|
||||||
|
if (pIter->pRow) {
|
||||||
|
return pIter->pRow;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pIter->backward) {
|
||||||
|
if (pIter->pNode == pIter->pTbData->sl.pHead) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pIter->pNode == pIter->pTbData->sl.pTail) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter->pRow = &pIter->row;
|
||||||
|
pIter->row = pIter->pNode->row;
|
||||||
|
|
||||||
|
return pIter->pRow;
|
||||||
|
}
|
|
@ -887,7 +887,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInfo){
|
static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInfo) {
|
||||||
record->uid = pBlockInfo->uid;
|
record->uid = pBlockInfo->uid;
|
||||||
record->firstKey = pBlockInfo->firstKey;
|
record->firstKey = pBlockInfo->firstKey;
|
||||||
record->lastKey = pBlockInfo->lastKey;
|
record->lastKey = pBlockInfo->lastKey;
|
||||||
|
@ -1122,8 +1122,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
||||||
", rows:%d, code:%s %s",
|
", rows:%d, code:%s %s",
|
||||||
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->firstKey,
|
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->firstKey, pBlockInfo->lastKey,
|
||||||
pBlockInfo->lastKey, pBlockInfo->numRow, tstrerror(code), pReader->idStr);
|
pBlockInfo->numRow, tstrerror(code), pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1265,8 +1265,8 @@ static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBloc
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersionRange* pVerRange) {
|
static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersionRange* pVerRange) {
|
||||||
return (key.ts >= pBlock->firstKey && key.ts <= pBlock->lastKey) &&
|
return (key.ts >= pBlock->firstKey && key.ts <= pBlock->lastKey) && (pBlock->maxVer >= pVerRange->minVer) &&
|
||||||
(pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer);
|
(pBlock->minVer <= pVerRange->maxVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo,
|
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo,
|
||||||
|
@ -1291,8 +1291,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
||||||
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
||||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
|
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
|
||||||
int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
||||||
pInfo->overlapWithSttBlock =
|
pInfo->overlapWithSttBlock = !(pBlockInfo->lastKey < nextProcKeyInStt || pBlockInfo->firstKey > nextProcKeyInStt);
|
||||||
!(pBlockInfo->lastKey < nextProcKeyInStt || pBlockInfo->firstKey > nextProcKeyInStt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->moreThanCapcity = pBlockInfo->numRow > pReader->resBlockInfo.capacity;
|
pInfo->moreThanCapcity = pBlockInfo->numRow > pReader->resBlockInfo.capacity;
|
||||||
|
@ -1911,7 +1910,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScanInfo* pBlockScanInfo, TSDBKEY* pKey,
|
int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScanInfo* pBlockScanInfo, STsdbRowKey* pKey,
|
||||||
SMemTable* pMem, SIterInfo* pIter, const char* type) {
|
SMemTable* pMem, SIterInfo* pIter, const char* type) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t backward = (!ASCENDING_TRAVERSE(pReader->info.order));
|
int32_t backward = (!ASCENDING_TRAVERSE(pReader->info.order));
|
||||||
|
@ -1927,8 +1926,8 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan
|
||||||
|
|
||||||
tsdbDebug("%p uid:%" PRIu64 ", check data in %s from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
|
tsdbDebug("%p uid:%" PRIu64 ", check data in %s from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
|
||||||
"-%" PRId64 " %s",
|
"-%" PRId64 " %s",
|
||||||
pReader, pBlockScanInfo->uid, type, pKey->ts, pReader->info.order, (*pData)->minKey, (*pData)->maxKey,
|
pReader, pBlockScanInfo->uid, type, pKey->key.ts, pReader->info.order, (*pData)->minKey,
|
||||||
pReader->idStr);
|
(*pData)->maxKey, pReader->idStr);
|
||||||
} else {
|
} else {
|
||||||
tsdbError("%p uid:%" PRIu64 ", failed to create iterator for %s, code:%s, %s", pReader, pBlockScanInfo->uid,
|
tsdbError("%p uid:%" PRIu64 ", failed to create iterator for %s, code:%s, %s", pReader, pBlockScanInfo->uid,
|
||||||
type, tstrerror(code), pReader->idStr);
|
type, tstrerror(code), pReader->idStr);
|
||||||
|
@ -1947,12 +1946,20 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
STbData* d = NULL;
|
STbData* d = NULL;
|
||||||
TSDBKEY startKey = {0};
|
STsdbRowKey startKey = {0};
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||||
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastProcKey + 1, .version = pReader->info.verRange.minVer};
|
startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer,
|
||||||
|
.key = {
|
||||||
|
.ts = pBlockScanInfo->lastProcKey + 1,
|
||||||
|
.numOfKeys = 0, // TODO: change here if multi-key is supported
|
||||||
|
}};
|
||||||
} else {
|
} else {
|
||||||
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastProcKey - 1, .version = pReader->info.verRange.maxVer};
|
startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer,
|
||||||
|
.key = {
|
||||||
|
.ts = pBlockScanInfo->lastProcKey - 1,
|
||||||
|
.numOfKeys = 0, // TODO: change here if multi-key is supported
|
||||||
|
}};
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code =
|
int32_t code =
|
||||||
|
@ -2000,8 +2007,8 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) {
|
if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) {
|
||||||
bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver,
|
bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pInfo->order,
|
||||||
pInfo->order, &pInfo->verRange);
|
&pInfo->verRange);
|
||||||
if (dropped) {
|
if (dropped) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -2093,14 +2100,14 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
pScanInfo->sttKeyInfo.nextProcKey =
|
pScanInfo->sttKeyInfo.nextProcKey =
|
||||||
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
|
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
|
||||||
hasData = true;
|
hasData = true;
|
||||||
} else { // not clean stt blocks
|
} else { // not clean stt blocks
|
||||||
INIT_TIMEWINDOW(&pScanInfo->sttWindow); //reset the time window
|
INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window
|
||||||
pScanInfo->sttBlockReturned = false;
|
pScanInfo->sttBlockReturned = false;
|
||||||
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pScanInfo->cleanSttBlocks = false;
|
pScanInfo->cleanSttBlocks = false;
|
||||||
INIT_TIMEWINDOW(&pScanInfo->sttWindow); //reset the time window
|
INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window
|
||||||
pScanInfo->sttBlockReturned = false;
|
pScanInfo->sttBlockReturned = false;
|
||||||
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
||||||
}
|
}
|
||||||
|
@ -2840,8 +2847,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
// data in stt now overlaps with current active file data block, need to composed with file data block.
|
// data in stt now overlaps with current active file data block, need to composed with file data block.
|
||||||
int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader);
|
int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
if ((lastKeyInStt >= pBlockInfo->firstKey && asc) ||
|
if ((lastKeyInStt >= pBlockInfo->firstKey && asc) || (lastKeyInStt <= pBlockInfo->lastKey && (!asc))) {
|
||||||
(lastKeyInStt <= pBlockInfo->lastKey && (!asc))) {
|
|
||||||
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
|
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
|
||||||
lastKeyInStt, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
|
lastKeyInStt, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -587,22 +587,22 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) {
|
void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) {
|
||||||
if (row->type == TSDBROW_ROW_FMT) {
|
if (row->type == TSDBROW_ROW_FMT) {
|
||||||
key->version = row->version;
|
key->version = row->version;
|
||||||
tRowGetKey(row->pTSRow, &key->rowkey);
|
tRowGetKey(row->pTSRow, &key->key);
|
||||||
} else {
|
} else {
|
||||||
key->version = row->pBlockData->aVersion[row->iRow];
|
key->version = row->pBlockData->aVersion[row->iRow];
|
||||||
key->rowkey.ts = row->pBlockData->aTSKEY[row->iRow];
|
key->key.ts = row->pBlockData->aTSKEY[row->iRow];
|
||||||
key->rowkey.numOfKeys = 0;
|
key->key.numOfKeys = 0;
|
||||||
for (int32_t i = 0; i < row->pBlockData->nColData; i++) {
|
for (int32_t i = 0; i < row->pBlockData->nColData; i++) {
|
||||||
SColData *pColData = &row->pBlockData->aColData[i];
|
SColData *pColData = &row->pBlockData->aColData[i];
|
||||||
if (pColData->cflag & COL_IS_KEY) {
|
if (pColData->cflag & COL_IS_KEY) {
|
||||||
SColVal cv;
|
SColVal cv;
|
||||||
tColDataGetValue(pColData, row->iRow, &cv);
|
tColDataGetValue(pColData, row->iRow, &cv);
|
||||||
key->rowkey.keys[key->rowkey.numOfKeys].type = pColData->type;
|
key->key.keys[key->key.numOfKeys].type = pColData->type;
|
||||||
key->rowkey.keys[key->rowkey.numOfKeys].value = cv.value;
|
key->key.keys[key->key.numOfKeys].value = cv.value;
|
||||||
key->rowkey.numOfKeys++;
|
key->key.numOfKeys++;
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -611,7 +611,7 @@ static void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2) {
|
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2) {
|
||||||
int32_t c = tRowKeyCmpr(&key1->rowkey, &key2->rowkey);
|
int32_t c = tRowKeyCmpr(&key1->key, &key2->key);
|
||||||
|
|
||||||
if (c) {
|
if (c) {
|
||||||
return c;
|
return c;
|
||||||
|
|
Loading…
Reference in New Issue