diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c
index 16a4f55840..9292be83e9 100644
--- a/source/client/src/clientTmq.c
+++ b/source/client/src/clientTmq.c
@@ -1664,11 +1664,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
return handleErrorBeforePoll(pVg, pTmq);
}
- sendInfo->msgInfo = (SDataBuf){
- .pData = msg,
- .len = msgSize,
- .handle = NULL,
- };
+ sendInfo->msgInfo = (SDataBuf){ .pData = msg, .len = msgSize, .handle = NULL };
sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0;
diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c
index 5c20887cf5..92ff550895 100644
--- a/source/dnode/mnode/impl/src/mndMain.c
+++ b/source/dnode/mnode/impl/src/mndMain.c
@@ -124,11 +124,7 @@ static void mndCalMqRebalance(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
- SRpcMsg rpcMsg = {
- .msgType = TDMT_MND_TMQ_TIMER,
- .pCont = pReq,
- .contLen = contLen,
- };
+ SRpcMsg rpcMsg = { .msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen };
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index fb2c2f4be3..aecfb9c3e5 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -162,7 +162,6 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
#endif
// tsdb
-// typedef struct STsdb STsdb;
typedef struct STsdbReader STsdbReader;
#define TSDB_DEFAULT_STT_FILE 8
diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h
index 4d3111a9b7..42da2bb3c1 100644
--- a/source/dnode/vnode/src/inc/tsdb.h
+++ b/source/dnode/vnode/src/inc/tsdb.h
@@ -16,6 +16,7 @@
#ifndef _TD_VNODE_TSDB_H_
#define _TD_VNODE_TSDB_H_
+#include "tsimplehash.h"
#include "vnodeInt.h"
#ifdef __cplusplus
@@ -125,11 +126,13 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
// SRowMerger
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
-
-// int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
void tsdbRowMergerClear(SRowMerger *pMerger);
-// int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
+
+int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema);
+void tsdbRowMergerClear_rv(SRowMerger* pMerger);
+void tsdbRowMergerCleanup_rv(SRowMerger* pMerger);
+
// TABLEID
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
// TSDBKEY
@@ -224,7 +227,7 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward,
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
-void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj *pTableMap, int64_t *rowsNum);
+void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
// STbData
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
@@ -706,7 +709,6 @@ typedef struct SSttBlockLoadInfo {
typedef struct SMergeTree {
int8_t backward;
SRBTree rbt;
- SArray *pIterList;
SLDataIter *pIter;
bool destroyLoadInfo;
SSttBlockLoadInfo *pLoadInfo;
@@ -752,13 +754,29 @@ struct SDiskDataBuilder {
SBlkInfo bi;
};
+typedef struct SLDataIter {
+ SRBTreeNode node;
+ SSttBlk *pSttBlk;
+ SDataFReader *pReader;
+ int32_t iStt;
+ int8_t backward;
+ int32_t iSttBlk;
+ int32_t iRow;
+ SRowInfo rInfo;
+ uint64_t uid;
+ STimeWindow timeWindow;
+ SVersionRange verRange;
+ SSttBlockLoadInfo *pBlockLoadInfo;
+ bool ignoreEarlierTs;
+} SLDataIter;
+
+#define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row))
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
- bool destroyLoadInfo, const char *idStr, bool strictTimeRange);
+ bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
-TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt);
@@ -783,6 +801,7 @@ typedef struct SCacheRowsReader {
STableKeyInfo *pTableList; // table id list
int32_t numOfTables;
SSttBlockLoadInfo *pLoadInfo;
+ SLDataIter *pDataIter;
STsdbReadSnap *pReadSnap;
SDataFReader *pDataFReader;
SDataFReader *pDataFReaderLast;
diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c
index 3c7edd931b..65fc086f8d 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCache.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCache.c
@@ -598,6 +598,7 @@ typedef struct {
SMergeTree mergeTree;
SMergeTree *pMergeTree;
SSttBlockLoadInfo *pLoadInfo;
+ SLDataIter* pDataIter;
int64_t lastTs;
} SFSLastNextRowIter;
@@ -645,7 +646,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
}
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
- &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
+ &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true, state->pDataIter);
state->pMergeTree = &state->mergeTree;
state->state = SFSLASTNEXTROW_BLOCKROW;
}
@@ -667,7 +668,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
state->state = SFSLASTNEXTROW_FILESET;
goto _next_fileset;
}
- state->row = tMergeTreeGetRow(&state->mergeTree);
+ state->row = *tMergeTreeGetRow(&state->mergeTree);
*ppRow = &state->row;
if (TSDBROW_TS(&state->row) <= state->lastTs) {
@@ -1211,7 +1212,7 @@ typedef struct {
} CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
- SSttBlockLoadInfo *pLoadInfo, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
+ SSttBlockLoadInfo *pLoadInfo, SLDataIter* pLDataIter, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
SDataFReader **pDataFReaderLast, int64_t lastTs) {
int code = 0;
@@ -1274,6 +1275,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsLastState.pLoadInfo = pLoadInfo;
pIter->fsLastState.pDataFReader = pDataFReaderLast;
pIter->fsLastState.lastTs = lastTs;
+ pIter->fsLastState.pDataIter = pLDataIter;
pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb;
@@ -1465,7 +1467,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
- nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
+ nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast, pr->lastTs);
do {
@@ -1622,7 +1624,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
- nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
+ nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast, pr->lastTs);
do {
diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
index 95981c2f08..64d30c77a3 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
@@ -187,13 +187,21 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
}
}
- int32_t numOfStt = ((SVnode*)pVnode)->config.sttTrigger;
+ SVnodeCfg* pCfg = &((SVnode*)pVnode)->config;
+
+ int32_t numOfStt = pCfg->sttTrigger;
p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt);
if (p->pLoadInfo == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
+ p->pDataIter = taosMemoryCalloc(pCfg->sttTrigger, sizeof(SLDataIter));
+ if (p->pDataIter == NULL) {
+ tsdbCacherowsReaderClose(p);
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
+
p->idstr = taosStrdup(idstr);
taosThreadMutexInit(&p->readerMutex, NULL);
@@ -215,6 +223,7 @@ void* tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree(p->pSchema);
}
+ taosMemoryFreeClear(p->pDataIter);
taosMemoryFree(p->pCurrSchema);
destroyLastBlockLoadInfo(p->pLoadInfo);
diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c
index d0ff403bf7..f27a28acb3 100644
--- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c
+++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include
#include "tsdb.h"
#define MEM_MIN_HASH 1024
@@ -298,12 +299,12 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) {
return rowsNum;
}
-void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) {
+void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj* pTableMap, int64_t *rowsNum) {
taosRLockLatch(&pMemTable->latch);
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
STbData *pTbData = pMemTable->aBucket[i];
while (pTbData) {
- void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
+ void* p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
if (p == NULL) {
pTbData = pTbData->next;
continue;
diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
index eb383df48d..79f4a17f65 100644
--- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
@@ -16,22 +16,6 @@
#include "tsdb.h"
// SLDataIter =================================================
-struct SLDataIter {
- SRBTreeNode node;
- SSttBlk *pSttBlk;
- SDataFReader *pReader;
- int32_t iStt;
- int8_t backward;
- int32_t iSttBlk;
- int32_t iRow;
- SRowInfo rInfo;
- uint64_t uid;
- STimeWindow timeWindow;
- SVersionRange verRange;
- SSttBlockLoadInfo *pBlockLoadInfo;
- bool ignoreEarlierTs;
-};
-
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
int32_t numOfSttTrigger) {
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
@@ -268,25 +252,21 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint
}
}
-int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
+int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
const char *idStr, bool strictTimeRange) {
int32_t code = TSDB_CODE_SUCCESS;
- *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
- if (*pIter == NULL) {
- code = TSDB_CODE_OUT_OF_MEMORY;
- goto _exit;
- }
+ pIter->uid = uid;
+ pIter->pReader = pReader;
+ pIter->iStt = iStt;
+ pIter->backward = backward;
+ pIter->verRange.minVer = pRange->minVer;
+ pIter->verRange.maxVer = pRange->maxVer;
+ pIter->timeWindow.skey = pTimeWindow->skey;
+ pIter->timeWindow.ekey = pTimeWindow->ekey;
- (*pIter)->uid = uid;
- (*pIter)->pReader = pReader;
- (*pIter)->iStt = iStt;
- (*pIter)->backward = backward;
- (*pIter)->verRange = *pRange;
- (*pIter)->timeWindow = *pTimeWindow;
-
- (*pIter)->pBlockLoadInfo = pBlockLoadInfo;
+ pIter->pBlockLoadInfo = pBlockLoadInfo;
if (!pBlockLoadInfo->sttBlockLoaded) {
int64_t st = taosGetTimestampUs();
@@ -294,7 +274,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
if (code) {
- goto _exit;
+ return code;
}
// only apply to the child tables, ordinary tables will not incur this filter procedure.
@@ -310,7 +290,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
// no qualified stt block existed
taosArrayClear(pBlockLoadInfo->aSttBlk);
- (*pIter)->iSttBlk = -1;
+ pIter->iSttBlk = -1;
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
return code;
@@ -343,31 +323,27 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
// find the start block
- (*pIter)->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
- if ((*pIter)->iSttBlk != -1) {
- (*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
- (*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
+ pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
+ if (pIter->iSttBlk != -1) {
+ pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
+ pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;
- if ((!backward) && ((strictTimeRange && (*pIter)->pSttBlk->minKey >= (*pIter)->timeWindow.ekey) ||
- (!strictTimeRange && (*pIter)->pSttBlk->minKey > (*pIter)->timeWindow.ekey))) {
- (*pIter)->pSttBlk = NULL;
+ if ((!backward) && ((strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
+ (!strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) {
+ pIter->pSttBlk = NULL;
}
- if (backward && ((strictTimeRange && (*pIter)->pSttBlk->maxKey <= (*pIter)->timeWindow.skey) ||
- (!strictTimeRange && (*pIter)->pSttBlk->maxKey < (*pIter)->timeWindow.skey))) {
- (*pIter)->pSttBlk = NULL;
- (*pIter)->ignoreEarlierTs = true;
+ if (backward && ((strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) ||
+ (!strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) {
+ pIter->pSttBlk = NULL;
+ pIter->ignoreEarlierTs = true;
}
}
return code;
-
-_exit:
- taosMemoryFree(*pIter);
- return code;
}
-void tLDataIterClose(SLDataIter *pIter) { taosMemoryFree(pIter); }
+void tLDataIterClose(SLDataIter *pIter) { /*taosMemoryFree(pIter); */}
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
int32_t step = pIter->backward ? -1 : 1;
@@ -594,43 +570,38 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
- bool destroyLoadInfo, const char *idStr, bool strictTimeRange) {
+ bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter) {
+ int32_t code = TSDB_CODE_SUCCESS;
+
pMTree->backward = backward;
pMTree->pIter = NULL;
- pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
- if (pMTree->pIterList == NULL) {
- return TSDB_CODE_OUT_OF_MEMORY;
- }
-
pMTree->idStr = idStr;
+
if (!pMTree->backward) { // asc
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
} else { // desc
tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
}
- int32_t code = TSDB_CODE_SUCCESS;
pMTree->pLoadInfo = pBlockLoadInfo;
pMTree->destroyLoadInfo = destroyLoadInfo;
pMTree->ignoreEarlierTs = false;
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
- struct SLDataIter *pIter = NULL;
- code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
+ memset(&pLDataIter[i], 0, sizeof(SLDataIter));
+ code = tLDataIterOpen(&pLDataIter[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
&pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
- bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
+ bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
if (hasVal) {
- taosArrayPush(pMTree->pIterList, &pIter);
- tMergeTreeAddIter(pMTree, pIter);
+ tMergeTreeAddIter(pMTree, &pLDataIter[i]);
} else {
if (!pMTree->ignoreEarlierTs) {
- pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
+ pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
}
- tLDataIterClose(pIter);
}
}
@@ -678,18 +649,8 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
return pMTree->pIter != NULL;
}
-TSDBROW tMergeTreeGetRow(SMergeTree *pMTree) { return pMTree->pIter->rInfo.row; }
-
void tMergeTreeClose(SMergeTree *pMTree) {
- size_t size = taosArrayGetSize(pMTree->pIterList);
- for (int32_t i = 0; i < size; ++i) {
- SLDataIter *pIter = taosArrayGetP(pMTree->pIterList, i);
- tLDataIterClose(pIter);
- }
-
- pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
pMTree->pIter = NULL;
-
if (pMTree->destroyLoadInfo) {
pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
pMTree->destroyLoadInfo = false;
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index 5bd41dd86f..e643e8f1a4 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -18,6 +18,7 @@
#include "tsimplehash.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
+#define getCurrentKeyInLastBlock(_r) ((_r)->currentKey)
typedef enum {
EXTERNAL_ROWS_PREV = 0x1,
@@ -108,6 +109,7 @@ typedef struct SLastBlockReader {
uint64_t uid;
SMergeTree mergeTree;
SSttBlockLoadInfo* pInfo;
+ int64_t currentKey;
} SLastBlockReader;
typedef struct SFilesetIter {
@@ -125,12 +127,12 @@ typedef struct SFileDataBlockInfo {
} SFileDataBlockInfo;
typedef struct SDataBlockIter {
- int32_t numOfBlocks;
- int32_t index;
- SArray* blockList; // SArray
- int32_t order;
- SDataBlk block; // current SDataBlk data
- SHashObj* pTableMap;
+ int32_t numOfBlocks;
+ int32_t index;
+ SArray* blockList; // SArray
+ int32_t order;
+ SDataBlk block; // current SDataBlk data
+ SSHashObj* pTableMap;
} SDataBlockIter;
typedef struct SFileBlockDumpInfo {
@@ -148,7 +150,8 @@ typedef struct STableUidList {
typedef struct SReaderStatus {
bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not
- SHashObj* pTableMap; // SHash
+ bool mapDataCleaned; // mapData has been cleaned up alreay or not
+ SSHashObj* pTableMap; // SHash
STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
STableUidList uidList; // check tables in uid order, to avoid the repeatly load of blocks in STT.
SFileBlockDumpInfo fBlockDumpInfo;
@@ -156,6 +159,9 @@ typedef struct SReaderStatus {
SBlockData fileBlockData;
SFilesetIter fileIter;
SDataBlockIter blockIter;
+ SLDataIter* pLDataIter;
+ SRowMerger merger;
+ SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
} SReaderStatus;
typedef struct SBlockInfoBuf {
@@ -165,6 +171,15 @@ typedef struct SBlockInfoBuf {
int32_t numOfTables;
} SBlockInfoBuf;
+typedef struct STsdbReaderAttr {
+ STSchema* pSchema;
+ EReadMode readMode;
+ uint64_t rowsNum;
+ STimeWindow window;
+ bool freeBlock;
+ SVersionRange verRange;
+} STsdbReaderAttr;
+
struct STsdbReader {
STsdb* pTsdb;
SVersionRange verRange;
@@ -184,37 +199,34 @@ struct STsdbReader {
SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap;
SIOCostSummary cost;
- STSchema* pSchema; // the newest version schema
- // STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
- SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
- SDataFReader* pFileReader; // the file reader
- SDelFReader* pDelFReader; // the del file reader
- SArray* pDelIdx; // del file block index;
- SBlockInfoBuf blockInfoBuf;
- int32_t step;
- STsdbReader* innerReader[2];
+ STSchema* pSchema; // the newest version schema
+ SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
+ SDataFReader* pFileReader; // the file reader
+ SDelFReader* pDelFReader; // the del file reader
+ SArray* pDelIdx; // del file block index;
+ SBlockInfoBuf blockInfoBuf;
+ int32_t step;
+ STsdbReader* innerReader[2];
};
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader);
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
-static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
- SRowMerger* pMerger);
+static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
-static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
- STsdbReader* pReader);
+static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
- STableBlockScanInfo* pInfo);
+ STableBlockScanInfo* pScanInfo);
static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
int32_t rowIndex);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
-static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
+static bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order,
SVersionRange* pVerRange);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
- TSDBROW* pTSRow, STsdbReader* pReader, bool* freeTSRow);
+ TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, SRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
@@ -225,7 +237,6 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdb
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
-static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader);
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
@@ -233,9 +244,9 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
-static STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id);
+static STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id);
-static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);
+static STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
@@ -384,12 +395,11 @@ static int32_t uidComparFunc(const void* p1, const void* p2) {
}
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
-static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
+static SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
STableUidList* pUidList, int32_t numOfTables) {
// allocate buffer in order to load data blocks from file
// todo use simple hash instead, optimize the memory consumption
- SHashObj* pTableMap =
- taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
+ SSHashObj* pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (pTableMap == NULL) {
return NULL;
}
@@ -399,7 +409,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
if (pUidList->tableUidList == NULL) {
- taosHashCleanup(pTableMap);
+ tSimpleHashCleanup(pTableMap);
return NULL;
}
@@ -421,7 +431,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
pScanInfo->lastKeyInStt = ekey;
}
- taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
+ tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
pScanInfo->lastKey, pTsdbReader->idStr);
}
@@ -436,9 +446,11 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
return pTableMap;
}
-static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts, int32_t step) {
- STableBlockScanInfo** p = NULL;
- while ((p = taosHashIterate(pTableMap, p)) != NULL) {
+static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
+ void *p = NULL;
+ int32_t iter = 0;
+
+ while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
@@ -478,13 +490,15 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) {
tMapDataClear(&p->mapData);
}
-static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
+static void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
void* p = NULL;
- while ((p = taosHashIterate(pTableMap, p)) != NULL) {
+ int32_t iter = 0;
+
+ while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) {
clearBlockScanInfo(*(STableBlockScanInfo**)p);
}
- taosHashCleanup(pTableMap);
+ tSimpleHashCleanup(pTableMap);
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey > pWindow->ekey; }
@@ -767,13 +781,16 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
+ setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
+
code = tBlockDataCreate(&pReader->status.fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _end;
}
- setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
+ ASSERT (pReader->suppInfo.colId[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
+ pReader->status.pPrimaryTsCol = taosArrayGet(pReader->pResBlock->pDataBlock, pSup->slotId[0]);
tsdbInitReaderLock(pReader);
@@ -794,7 +811,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
goto _end;
}
- int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
+ int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
@@ -858,28 +875,42 @@ _end:
return code;
}
-static void cleanupTableScanInfo(SHashObj* pTableMap) {
+static void doCleanupTableScanInfo(STableBlockScanInfo* pScanInfo) {
+ // reset the index in last block when handing a new file
+ tMapDataClear(&pScanInfo->mapData);
+ taosArrayClear(pScanInfo->pBlockList);
+}
+
+static void cleanupTableScanInfo(SReaderStatus* pStatus) {
+ if (pStatus->mapDataCleaned) {
+ return;
+ }
+
+ SSHashObj* pTableMap = pStatus->pTableMap;
STableBlockScanInfo** px = NULL;
+ int32_t iter = 0;
+
while (1) {
- px = taosHashIterate(pTableMap, px);
+ px = tSimpleHashIterate(pTableMap, px, &iter);
if (px == NULL) {
break;
}
- // reset the index in last block when handing a new file
- tMapDataClear(&(*px)->mapData);
- taosArrayClear((*px)->pBlockList);
+ doCleanupTableScanInfo(*px);
}
+
+ pStatus->mapDataCleaned = true;
}
-static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
- int32_t numOfQTable = 0;
+static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, SArray* pTableScanInfoList) {
size_t sizeInDisk = 0;
size_t numOfTables = taosArrayGetSize(pIndexList);
int64_t st = taosGetTimestampUs();
- cleanupTableScanInfo(pReader->status.pTableMap);
+ cleanupTableScanInfo(&pReader->status);
+ // set the flag for the new file
+ pReader->status.mapDataCleaned = false;
for (int32_t i = 0; i < numOfTables; ++i) {
SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
@@ -933,7 +964,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
}
if (taosArrayGetSize(pScanInfo->pBlockList) > 0) {
- numOfQTable += 1;
+ taosArrayPush(pTableScanInfoList, &pScanInfo);
}
}
@@ -944,8 +975,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
tsdbDebug(
"load block of %ld tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"time:%.2f ms %s",
- numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
- pReader->idStr);
+ numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfLastFiles,
+ sizeInDisk / 1000.0, el, pReader->idStr);
pReader->cost.numOfBlocks += total;
pReader->cost.headFileLoadTime += el;
@@ -1424,7 +1455,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr)
return TSDB_CODE_SUCCESS;
}
-static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
+static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) {
bool asc = ASCENDING_TRAVERSE(pReader->order);
SBlockOrderSupporter sup = {0};
@@ -1433,7 +1464,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
pBlockIter->pTableMap = pReader->status.pTableMap;
// access data blocks according to the offset of each block in asc/desc order.
- int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);
+// int32_t numOfTables = (int32_t)tSimpleHashGetSize(pReader->status.pTableMap);
+ int32_t numOfTables = taosArrayGetSize(pTableList);
int64_t st = taosGetTimestampUs();
int32_t code = initBlockOrderSupporter(&sup, numOfTables);
@@ -1442,17 +1474,21 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
}
int32_t cnt = 0;
- void* ptr = NULL;
- while (1) {
- ptr = taosHashIterate(pReader->status.pTableMap, ptr);
- if (ptr == NULL) {
- break;
- }
+// void* ptr = NULL;
+// int32_t iter = 0;
- STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
- if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
- continue;
- }
+// while (1) {
+// ptr = tSimpleHashIterate(pReader->status.pTableMap, ptr, &iter);
+// if (ptr == NULL) {
+// break;
+// }
+
+ for(int32_t i = 0; i < numOfTables; ++i) {
+ STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i);
+ ASSERT(pTableScanInfo->pBlockList != NULL && taosArrayGetSize(pTableScanInfo->pBlockList) > 0);
+// if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
+// continue;
+// }
size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
sup.numOfBlocksPerTable[sup.numOfTables] = num;
@@ -1832,11 +1868,14 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
return false;
}
- TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
- TSDBKEY k = TSDBROW_KEY(&row);
- pScanInfo->lastKeyInStt = k.ts;
+ TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
+ int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow];
+ int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
- if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
+ pLastBlockReader->currentKey = key;
+ pScanInfo->lastKeyInStt = key;
+
+ if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order, pVerRange)) {
// the qualifed ts may equal to k.ts, only a greater version one.
// here we need to fallback one step.
return true;
@@ -1876,7 +1915,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
return code;
}
-static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
+STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
if (pReader->pSchema != NULL) {
return pReader->pSchema;
}
@@ -1899,6 +1938,12 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
terrno = code;
return NULL;
}
+
+ code = tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema);
+ if (code != 0) {
+ terrno = code;
+ return NULL;
+ }
}
if (pReader->pSchema && sversion == pReader->pSchema->version) {
@@ -1927,7 +1972,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
- SRowMerger merge = {0};
+ SRowMerger* pMerger = &pReader->status.merger;
SRow* pTSRow = NULL;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
@@ -1976,25 +2021,25 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (pReader->order == TSDB_ORDER_ASC) {
if (minKey == key) {
init = true; // todo check if pReader->pSchema is null or not
- int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
+ int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
+ doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
if (minKey == tsLast) {
- TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
+ TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
- tsdbRowMergerAdd(&merge, &fRow1, NULL);
+ tsdbRowMergerAdd(&pReader->status.merger, fRow1, NULL);
} else {
init = true;
- int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
+ int32_t code = tsdbRowMergerAdd(&pReader->status.merger, fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
- doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
+ doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &pReader->status.merger, &pReader->verRange, pReader->idStr);
}
if (minKey == k.ts) {
@@ -2003,15 +2048,15 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return terrno;
}
if (init) {
- tsdbRowMergerAdd(&merge, pRow, pSchema);
+ tsdbRowMergerAdd(pMerger, pRow, pSchema);
} else {
init = true;
- int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
+ int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
- int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
+ int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -2024,46 +2069,46 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return terrno;
}
- int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
+ int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
- if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
+ code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
+ if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) {
return code;
}
}
if (minKey == tsLast) {
- TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
+ TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
- tsdbRowMergerAdd(&merge, &fRow1, NULL);
+ tsdbRowMergerAdd(pMerger, fRow1, NULL);
} else {
init = true;
- int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
+ int32_t code = tsdbRowMergerAdd(&pReader->status.merger, fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
- doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
+ doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
}
if (minKey == key) {
if (init) {
- tsdbRowMergerAdd(&merge, &fRow, NULL);
+ tsdbRowMergerAdd(pMerger, &fRow, NULL);
} else {
init = true;
- int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
+ int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
- doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
+ doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
}
- int32_t code = tsdbRowMergerGetRow(&merge, &pTSRow);
+ int32_t code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -2071,7 +2116,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
- tsdbRowMergerClear(&merge);
+ tsdbRowMergerClear_rv(pMerger);
return code;
}
@@ -2079,14 +2124,19 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) {
+ SRowMerger* pMerger = &pReader->status.merger;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
- int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
- bool copied = false;
- int32_t code = TSDB_CODE_SUCCESS;
- SRow* pTSRow = NULL;
- SRowMerger merge = {0};
- TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
- tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);
+
+ int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
+ bool copied = false;
+ int32_t code = TSDB_CODE_SUCCESS;
+ SRow* pTSRow = NULL;
+ TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
+
+ // create local variable to hold the row value
+ TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
+
+ tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", pRow->pBlockData, pRow->iRow, pLastBlockReader->uid, pReader->idStr);
// only last block exists
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
@@ -2099,16 +2149,16 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
pBlockScanInfo->lastKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
- code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
+ code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
- tsdbRowMergerAdd(&merge, &fRow1, NULL);
- doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
+ TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
+ tsdbRowMergerAdd(pMerger, pRow1, NULL);
+ doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->verRange, pReader->idStr);
- code = tsdbRowMergerGetRow(&merge, &pTSRow);
+ code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -2116,26 +2166,26 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
- tsdbRowMergerClear(&merge);
+ tsdbRowMergerClear_rv(pMerger);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} else { // not merge block data
- code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
+ code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
+ doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->verRange, pReader->idStr);
// merge with block data if ts == key
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
- doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
+ doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
- code = tsdbRowMergerGetRow(&merge, &pTSRow);
+ code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -2143,7 +2193,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
- tsdbRowMergerClear(&merge);
+ tsdbRowMergerClear_rv(pMerger);
if (code != TSDB_CODE_SUCCESS) {
return code;
@@ -2172,22 +2222,22 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key == ts) {
- SRow* pTSRow = NULL;
- SRowMerger merge = {0};
+ SRow* pTSRow = NULL;
+ SRowMerger* pMerger = &pReader->status.merger;
- int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
+ int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
+ doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
- TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
- tsdbRowMergerAdd(&merge, &fRow1, NULL);
+ TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
+ tsdbRowMergerAdd(pMerger, pRow1, NULL);
- doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr);
+ doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->verRange, pReader->idStr);
- code = tsdbRowMergerGetRow(&merge, &pTSRow);
+ code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -2195,7 +2245,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
- tsdbRowMergerClear(&merge);
+ tsdbRowMergerClear_rv(pMerger);
return code;
} else {
return TSDB_CODE_SUCCESS;
@@ -2210,7 +2260,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
SLastBlockReader* pLastBlockReader) {
- SRowMerger merge = {0};
+ SRowMerger* pMerger = &pReader->status.merger;
SRow* pTSRow = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
@@ -2283,42 +2333,41 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) {
init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
- code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
+ code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
+ doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
if (minKey == tsLast) {
- TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
+ TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
- tsdbRowMergerAdd(&merge, &fRow1, NULL);
+ tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else {
init = true;
- code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
+ code = tsdbRowMergerAdd(pMerger, pRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
- doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
+ doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
}
if (minKey == ik.ts) {
if (init) {
- tsdbRowMergerAdd(&merge, piRow, piSchema);
+ tsdbRowMergerAdd(pMerger, piRow, piSchema);
} else {
init = true;
- code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
+ code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
- code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
- pReader);
+ code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -2326,20 +2375,15 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == k.ts) {
if (init) {
- if (merge.pTSchema == NULL) {
- return code;
- }
-
- tsdbRowMergerAdd(&merge, pRow, pSchema);
+ tsdbRowMergerAdd(pMerger, pRow, pSchema);
} else {
// STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
- code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
+ code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
- code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
- pReader);
+ code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -2347,13 +2391,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} else {
if (minKey == k.ts) {
init = true;
- code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
+ code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
- pReader);
+ code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -2361,58 +2404,49 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == ik.ts) {
if (init) {
- tsdbRowMergerAdd(&merge, piRow, piSchema);
+ tsdbRowMergerAdd(pMerger, piRow, piSchema);
} else {
init = true;
- // STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
- code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
+ code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
- code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
- pReader);
+ code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
if (minKey == tsLast) {
- TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
+ TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
- tsdbRowMergerAdd(&merge, &fRow1, NULL);
+ tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else {
init = true;
- code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
+ code = tsdbRowMergerAdd(pMerger, pRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
- doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
+ doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
}
if (minKey == key) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
if (!init) {
- code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
+ code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
- if (merge.pTSchema == NULL) {
- return code;
- }
- tsdbRowMergerAdd(&merge, &fRow, NULL);
+ tsdbRowMergerAdd(pMerger, &fRow, NULL);
}
- doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
+ doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
}
- if (merge.pTSchema == NULL) {
- return code;
- }
-
- code = tsdbRowMergerGetRow(&merge, &pTSRow);
+ code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -2420,7 +2454,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
- tsdbRowMergerClear(&merge);
+ tsdbRowMergerClear_rv(pMerger);
return code;
}
@@ -2514,8 +2548,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
return false;
}
- TSDBKEY k = {.ts = ts, .version = ver};
- if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pReader->order,
+ if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pReader->order,
&pReader->verRange)) {
return false;
}
@@ -2547,7 +2580,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
pScanInfo->uid, pReader->idStr);
int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
- pLBlockReader->pInfo, false, pReader->idStr, false);
+ pLBlockReader->pInfo, false, pReader->idStr, false, pReader->status.pLDataIter);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
@@ -2555,11 +2588,6 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}
-static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
- TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
- return TSDBROW_TS(&row);
-}
-
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
@@ -2585,15 +2613,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SRow* pTSRow = NULL;
- SRowMerger merge = {0};
-
- code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
+ code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
- code = tsdbRowMergerGetRow(&merge, &pTSRow);
+ doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
+ code = tsdbRowMergerGetRow(&pReader->status.merger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -2601,7 +2627,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
- tsdbRowMergerClear(&merge);
+ tsdbRowMergerClear_rv(&pReader->status.merger);
return code;
}
}
@@ -2905,12 +2931,12 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader)
}
}
-static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
+static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pTableList) {
SReaderStatus* pStatus = &pReader->status;
pBlockNum->numOfBlocks = 0;
pBlockNum->numOfLastFiles = 0;
- size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
+ size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
while (1) {
@@ -2933,13 +2959,14 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
}
if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
- code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
+ code = doLoadFileBlock(pReader, pIndexList, pBlockNum, pTableList);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pIndexList);
return code;
}
if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
+// ASSERT(taosArrayGetSize(pTableList) > 0);
break;
}
}
@@ -2979,18 +3006,18 @@ static void resetTableListIndex(SReaderStatus* pStatus) {
pList->currentIndex = 0;
uint64_t uid = pList->tableUidList[0];
- pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
+ pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
}
static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
pOrderedCheckInfo->currentIndex += 1;
- if (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap)) {
+ if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) {
pStatus->pTableIter = NULL;
return false;
}
uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
- pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
+ pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
return (pStatus->pTableIter != NULL);
}
@@ -3000,7 +3027,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
STableUidList* pUidList = &pStatus->uidList;
int32_t code = TSDB_CODE_SUCCESS;
- if (taosHashGetSize(pStatus->pTableMap) == 0) {
+ if (tSimpleHashGetSize(pStatus->pTableMap) == 0) {
return TSDB_CODE_SUCCESS;
}
@@ -3010,8 +3037,12 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
// load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
- bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
- if (!hasVal) {
+ // reset the index in last block when handing a new file
+ doCleanupTableScanInfo(pScanInfo);
+ pStatus->mapDataCleaned = true;
+
+ bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
+ if (!hasDataInLastFile) {
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
@@ -3162,7 +3193,7 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
goto _end;
}
- int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
+ int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
@@ -3172,14 +3203,13 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
}
SBlockIdx* pBlockIdx = NULL;
- int32_t i = 0;
for (int32_t i = 0; i < num; ++i) {
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
if (pBlockIdx->suid != pReader->suid) {
continue;
}
- STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
+ STableBlockScanInfo** p = tSimpleHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
if (p == NULL) {
continue;
}
@@ -3225,13 +3255,13 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) {
taosArrayClear(pBlockLoadInfo->aSttBlk);
continue;
}
- for (int32_t i = 0; i < size; ++i) {
- SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
+ for (int32_t j = 0; j < size; ++j) {
+ SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j);
pReader->rowsNum += p->nRow;
}
} else {
- for (int32_t i = 0; i < size; ++i) {
- SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
+ for (int32_t j = 0; j < size; ++j) {
+ SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j);
uint64_t s = p->suid;
if (s < pReader->suid) {
continue;
@@ -3301,13 +3331,6 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
STableUidList* pUidList = &pStatus->uidList;
while (1) {
- // if (pStatus->pTableIter == NULL) {
- // pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
- // if (pStatus->pTableIter == NULL) {
- // return TSDB_CODE_SUCCESS;
- // }
- // }
-
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
initMemDataIterator(*pBlockScanInfo, pReader);
@@ -3335,7 +3358,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
if (pBlockInfo) {
- STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
+ STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pScanInfo) {
lastKey = pScanInfo->lastKey;
}
@@ -3352,20 +3375,24 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
SBlockNumber num = {0};
- int32_t code = moveToNextFile(pReader, &num);
+ SArray* pTableList = taosArrayInit(40, POINTER_BYTES);
+
+ int32_t code = moveToNextFile(pReader, &num, pTableList);
if (code != TSDB_CODE_SUCCESS) {
+ taosArrayDestroy(pTableList);
return code;
}
// all data files are consumed, try data in buffer
if (num.numOfBlocks + num.numOfLastFiles == 0) {
pReader->status.loadFromFile = false;
+ taosArrayDestroy(pTableList);
return code;
}
// initialize the block iterator for a new fileset
if (num.numOfBlocks > 0) {
- code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
+ code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList);
} else { // no block data, only last block exists
tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order);
@@ -3374,6 +3401,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
// set the correct start position according to the query time window
initBlockDumpInfo(pReader, pBlockIter);
+ taosArrayDestroy(pTableList);
return code;
}
@@ -3537,7 +3565,7 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}
-bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
+bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, SVersionRange* pVerRange) {
if (pDelList == NULL) {
return false;
}
@@ -3549,29 +3577,29 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
if (asc) {
if (*index >= num - 1) {
TSDBKEY* last = taosArrayGetLast(pDelList);
- ASSERT(pKey->ts >= last->ts);
+ ASSERT(key >= last->ts);
- if (pKey->ts > last->ts) {
+ if (key > last->ts) {
return false;
- } else if (pKey->ts == last->ts) {
+ } else if (key == last->ts) {
TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
- return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer &&
+ return (prev->version >= ver && prev->version <= pVerRange->maxVer &&
prev->version >= pVerRange->minVer);
}
} else {
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);
- if (pKey->ts < pCurrent->ts) {
+ if (key < pCurrent->ts) {
return false;
}
- if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
+ if (pCurrent->ts <= key && pNext->ts >= key && pCurrent->version >= ver &&
pVerRange->maxVer >= pCurrent->version) {
return true;
}
- while (pNext->ts <= pKey->ts && (*index) < num - 1) {
+ while (pNext->ts <= key && (*index) < num - 1) {
(*index) += 1;
if ((*index) < num - 1) {
@@ -3583,7 +3611,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
continue;
}
- if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
+ if (pCurrent->ts <= key && pNext->ts >= key && pCurrent->version >= ver &&
pVerRange->maxVer >= pCurrent->version) {
return true;
}
@@ -3596,10 +3624,10 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
if (*index <= 0) {
TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
- if (pKey->ts < pFirst->ts) {
+ if (key < pFirst->ts) {
return false;
- } else if (pKey->ts == pFirst->ts) {
- return pFirst->version >= pKey->version;
+ } else if (key == pFirst->ts) {
+ return pFirst->version >= ver;
} else {
ASSERT(0);
}
@@ -3607,15 +3635,15 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);
- if (pKey->ts > pCurrent->ts) {
+ if (key > pCurrent->ts) {
return false;
}
- if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
+ if (pPrev->ts <= key && pCurrent->ts >= key && pPrev->version >= ver) {
return true;
}
- while (pPrev->ts >= pKey->ts && (*index) > 1) {
+ while (pPrev->ts >= key && (*index) > 1) {
(*index) += step;
if ((*index) >= 1) {
@@ -3627,7 +3655,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
continue;
}
- if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
+ if (pPrev->ts <= key && pCurrent->ts >= key && pPrev->version >= ver) {
return true;
}
}
@@ -3655,7 +3683,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
// it is a valid data version
if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) &&
- (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
+ (!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->order, &pReader->verRange))) {
return pRow;
}
@@ -3674,14 +3702,15 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
}
if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer &&
- (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
+ (!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->order, &pReader->verRange))) {
return pRow;
}
}
}
-int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
- STsdbReader* pReader) {
+int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader) {
+ SRowMerger* pMerger = &pReader->status.merger;
+
while (1) {
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
if (!pIter->hasVal) {
@@ -3760,10 +3789,10 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
return code;
}
-int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
- SRowMerger* pMerger) {
+int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
+ SRowMerger* pMerger = &pReader->status.merger;
bool asc = ASCENDING_TRAVERSE(pReader->order);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
int32_t step = asc ? 1 : -1;
@@ -3801,8 +3830,8 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) {
- TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
- tsdbRowMergerAdd(pMerger, &fRow1, NULL);
+ TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
+ tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else {
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
@@ -3842,7 +3871,6 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
}
}
- SRowMerger merge = {0};
terrno = 0;
int32_t code = 0;
@@ -3854,8 +3882,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return terrno;
}
- STSchema* ps = (pReader->pSchema != NULL)? pReader->pSchema:pTSchema;
- code = tsdbRowMergerInit(&merge, ps, ¤t, pTSchema);
+ code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pTSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -3865,28 +3892,28 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return terrno;
}
- tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
+ tsdbRowMergerAdd(&pReader->status.merger,pNextRow, pTSchema1);
} else { // let's merge rows in file block
- code = tsdbRowMergerInit(&merge, NULL, ¤t, pReader->pSchema);
+ code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- tsdbRowMergerAdd(&merge, pNextRow, NULL);
+ tsdbRowMergerAdd(&pReader->status.merger,pNextRow, NULL);
}
- code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(¤t), pDelList, &merge, pReader);
+ code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(¤t), pDelList, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- code = tsdbRowMergerGetRow(&merge, &pResRow->pTSRow);
+ code = tsdbRowMergerGetRow(&pReader->status.merger, &pResRow->pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResRow->type = TSDBROW_ROW_FMT;
- tsdbRowMergerClear(&merge);
+ tsdbRowMergerClear_rv(&pReader->status.merger);
*freeTSRow = true;
return TSDB_CODE_SUCCESS;
@@ -3894,7 +3921,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
SRow** pTSRow) {
- SRowMerger merge = {0};
+ SRowMerger* pMerger = &pReader->status.merger;
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
@@ -3909,46 +3936,43 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
}
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
- int32_t code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
+ int32_t code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
- pReader);
+ code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- tsdbRowMergerAdd(&merge, pRow, pSchema);
+ tsdbRowMergerAdd(&pReader->status.merger,pRow, pSchema);
code =
- doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
+ doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
- int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
- if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
+ int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
+ if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) {
return code;
}
- code =
- doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
+ code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
- tsdbRowMergerAdd(&merge, piRow, piSchema);
- code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
- pReader);
+ tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
+ code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
- int32_t code = tsdbRowMergerGetRow(&merge, pTSRow);
- tsdbRowMergerClear(&merge);
+ int32_t code = tsdbRowMergerGetRow(pMerger, pTSRow);
+ tsdbRowMergerClear_rv(pMerger);
return code;
}
@@ -4073,11 +4097,12 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t code = TSDB_CODE_SUCCESS;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
- if (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
- SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
- ((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
+// ASSERT (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID);// {
+// SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
+// ((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
+ ((int64_t*)pReader->status.pPrimaryTsCol->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
i += 1;
- }
+// }
SColVal cv = {0};
int32_t numOfInputCols = pBlockData->nColData;
@@ -4163,10 +4188,12 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
// TODO refactor: with createDataBlockScanInfo
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
- int32_t size = taosHashGetSize(pReader->status.pTableMap);
+ int32_t size = tSimpleHashGetSize(pReader->status.pTableMap);
STableBlockScanInfo** p = NULL;
- while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
+ int32_t iter = 0;
+
+ while ((p = tSimpleHashIterate(pReader->status.pTableMap, p, &iter)) != NULL) {
clearBlockScanInfo(*p);
}
@@ -4184,7 +4211,7 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
pReader->status.uidList.tableUidList = (uint64_t*)p1;
}
- taosHashClear(pReader->status.pTableMap);
+ tSimpleHashClear(pReader->status.pTableMap);
STableUidList* pUidList = &pReader->status.uidList;
pUidList->currentIndex = 0;
@@ -4205,7 +4232,7 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
pInfo->lastKeyInStt = ekey;
}
- taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
+ tSimpleHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
}
return TDB_CODE_SUCCESS;
@@ -4327,6 +4354,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
}
+ if (pReader->pSchema != NULL) {
+ tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema);
+ }
+
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
if (pReader->pSchemaMap == NULL) {
tsdbError("failed init schema hash for reader %s", pReader->idStr);
@@ -4351,6 +4382,12 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
goto _err;
}
+ pReader->status.pLDataIter = taosMemoryCalloc(pVnode->config.sttTrigger, sizeof(SLDataIter));
+ if (pReader->status.pLDataIter == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ goto _err;
+ }
+
pReader->suspended = true;
if (countOnly) {
@@ -4367,29 +4404,38 @@ _err:
return code;
}
+static void clearSharedPtr(STsdbReader* p) {
+ p->status.pLDataIter = NULL;
+ p->status.pTableMap = NULL;
+ p->status.uidList.tableUidList = NULL;
+ p->pReadSnap = NULL;
+ p->pSchema = NULL;
+ p->pSchemaMap = NULL;
+}
+
+static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
+ pDst->status.pTableMap = pSrc->status.pTableMap;
+ pDst->status.pLDataIter = pSrc->status.pLDataIter;
+ pDst->status.uidList = pSrc->status.uidList;
+ pDst->pSchema = pSrc->pSchema;
+ pDst->pSchemaMap = pSrc->pSchemaMap;
+ pDst->pReadSnap = pSrc->pReadSnap;
+}
+
void tsdbReaderClose(STsdbReader* pReader) {
if (pReader == NULL) {
return;
}
tsdbAcquireReader(pReader);
+
{
if (pReader->innerReader[0] != NULL || pReader->innerReader[1] != NULL) {
STsdbReader* p = pReader->innerReader[0];
-
- p->status.pTableMap = NULL;
- p->status.uidList.tableUidList = NULL;
- p->pReadSnap = NULL;
- p->pSchema = NULL;
- p->pSchemaMap = NULL;
+ clearSharedPtr(p);
p = pReader->innerReader[1];
-
- p->status.pTableMap = NULL;
- p->status.uidList.tableUidList = NULL;
- p->pReadSnap = NULL;
- p->pSchema = NULL;
- p->pSchemaMap = NULL;
+ clearSharedPtr(p);
tsdbReaderClose(pReader->innerReader[0]);
tsdbReaderClose(pReader->innerReader[1]);
@@ -4413,7 +4459,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
tBlockDataDestroy(&pReader->status.fileBlockData);
cleanupDataBlockIterator(&pReader->status.blockIter);
- size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
+ size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
if (pReader->status.pTableMap != NULL) {
destroyAllBlockScanInfo(pReader->status.pTableMap);
clearBlockScanInfoBuf(&pReader->blockInfoBuf);
@@ -4440,7 +4486,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
tsdbUninitReaderLock(pReader);
- taosMemoryFree(pReader->status.uidList.tableUidList);
+ taosMemoryFreeClear(pReader->status.pLDataIter);
+ taosMemoryFreeClear(pReader->status.uidList.tableUidList);
SIOCostSummary* pCost = &pReader->cost;
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
@@ -4467,6 +4514,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
pCost->initDelSkylineIterTime, pReader->idStr);
taosMemoryFree(pReader->idStr);
+
+ tsdbRowMergerCleanup_rv(&pReader->status.merger);
taosMemoryFree(pReader->pSchema);
tSimpleHashCleanup(pReader->pSchemaMap);
@@ -4495,8 +4544,9 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
+ int32_t iter = 0;
- while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
+ while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
@@ -4517,8 +4567,9 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
} else {
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
+ int32_t iter = 0;
- while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
+ while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
@@ -4607,7 +4658,7 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
// restore reader's state
// task snapshot
- int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
+ int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
if (numOfTables > 0) {
qTrace("tsdb/reader: %p, take snapshot", pReader);
code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
@@ -4626,18 +4677,10 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
// we need only one row
pPrevReader->capacity = 1;
- pPrevReader->status.pTableMap = pReader->status.pTableMap;
- pPrevReader->status.uidList = pReader->status.uidList;
- pPrevReader->pSchema = pReader->pSchema;
- pPrevReader->pSchemaMap = pReader->pSchemaMap;
- pPrevReader->pReadSnap = pReader->pReadSnap;
+ setSharedPtr(pPrevReader, pReader);
pNextReader->capacity = 1;
- pNextReader->status.pTableMap = pReader->status.pTableMap;
- pNextReader->status.uidList = pReader->status.uidList;
- pNextReader->pSchema = pReader->pSchema;
- pNextReader->pSchemaMap = pReader->pSchemaMap;
- pNextReader->pReadSnap = pReader->pReadSnap;
+ setSharedPtr(pNextReader, pReader);
code = doOpenReaderImpl(pPrevReader);
if (code != TSDB_CODE_SUCCESS) {
@@ -4694,7 +4737,7 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
*hasNext = false;
SReaderStatus* pStatus = &pReader->status;
- if (taosHashGetSize(pStatus->pTableMap) == 0) {
+ if (tSimpleHashGetSize(pStatus->pTableMap) == 0) {
return code;
}
@@ -4947,11 +4990,11 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
return code;
}
-STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id) {
- STableBlockScanInfo** p = taosHashGet(pTableMap, &uid, sizeof(uid));
+STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id) {
+ STableBlockScanInfo** p = tSimpleHashGet(pTableMap, &uid, sizeof(uid));
if (p == NULL || *p == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
- int32_t size = taosHashGetSize(pTableMap);
+ int32_t size = tSimpleHashGetSize(pTableMap);
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id);
return NULL;
}
@@ -5037,7 +5080,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
tsdbDataFReaderClose(&pReader->pFileReader);
- int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
+ int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap);
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(pBlockIter, pReader->order);
@@ -5108,7 +5151,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->numOfFiles += 1;
- int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
+ int32_t numOfTables = (int32_t)tSimpleHashGetSize(pStatus->pTableMap);
int defaultRows = 4096;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
@@ -5172,7 +5215,8 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
tsdbReaderResume(pReader);
}
- pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
+ int32_t iter = 0;
+ pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, NULL, &iter);
while (pStatus->pTableIter != NULL) {
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
@@ -5194,7 +5238,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
}
// current table is exhausted, let's try the next table
- pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
+ pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, pStatus->pTableIter, &iter);
}
tsdbReleaseReader(pReader);
diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c
index 8cbbcb751c..023ef3cd85 100644
--- a/source/dnode/vnode/src/tsdb/tsdbUtil.c
+++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c
@@ -712,124 +712,163 @@ _exit:
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow);
- SColVal *pColVal = &(SColVal){0};
+ SColVal * pColVal = &(SColVal){0};
STColumn *pTColumn;
int32_t iCol, jCol = 1;
if (NULL == pTSchema) {
pTSchema = pMerger->pTSchema;
}
- ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
- for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
- pTColumn = &pMerger->pTSchema->columns[iCol];
- if (pTSchema->columns[jCol].colId < pTColumn->colId) {
- ++jCol;
- --iCol;
- continue;
- } else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
- continue;
- }
+ if (taosArrayGetSize(pMerger->pArray) == 0) {
+ // ts
+ jCol = 0;
+ pTColumn = &pTSchema->columns[jCol++];
- tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
-
- if (key.version > pMerger->version) {
- if (!COL_VAL_IS_NONE(pColVal)) {
- if (IS_VAR_DATA_TYPE(pColVal->type)) {
- SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
- if (!COL_VAL_IS_NULL(pColVal)) {
- code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
- if (code) return code;
-
- pTColVal->value.nData = pColVal->value.nData;
- if (pTColVal->value.nData) {
- memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
- }
- pTColVal->flag = 0;
- } else {
- tFree(pTColVal->value.pData);
- taosArraySet(pMerger->pArray, iCol, pColVal);
- }
- } else {
- taosArraySet(pMerger->pArray, iCol, pColVal);
- }
- }
- } else if (key.version < pMerger->version) {
- SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
- if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
- if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
- code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
- if (code) return code;
-
- tColVal->value.nData = pColVal->value.nData;
- if (pColVal->value.nData) {
- memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
- }
- tColVal->flag = 0;
- } else {
- taosArraySet(pMerger->pArray, iCol, pColVal);
- }
- }
- } else {
- ASSERT(0 && "dup versions not allowed");
- }
- }
-
- pMerger->version = key.version;
- return code;
-}
-/*
-int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
- int32_t code = 0;
- TSDBKEY key = TSDBROW_KEY(pRow);
- SColVal *pColVal = &(SColVal){0};
- STColumn *pTColumn;
-
- pMerger->pTSchema = pTSchema;
- pMerger->version = key.version;
-
- pMerger->pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
- if (pMerger->pArray == NULL) {
- code = TSDB_CODE_OUT_OF_MEMORY;
- goto _exit;
- }
-
- // ts
- pTColumn = &pTSchema->columns[0];
-
- ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
-
- *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts});
- if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
- code = TSDB_CODE_OUT_OF_MEMORY;
- goto _exit;
- }
-
- // other
- for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
- tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
- if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
- uint8_t *pVal = pColVal->value.pData;
-
- pColVal->value.pData = NULL;
- code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
- if (code) goto _exit;
-
- if (pColVal->value.nData) {
- memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
- }
- }
+ ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
+ *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts});
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
- goto _exit;
+ return code;
+ // goto _exit;
+ }
+
+ // other
+ for (iCol = 1; jCol < pTSchema->numOfCols && iCol < pMerger->pTSchema->numOfCols; ++iCol) {
+ pTColumn = &pMerger->pTSchema->columns[iCol];
+ if (pTSchema->columns[jCol].colId < pTColumn->colId) {
+ ++jCol;
+ --iCol;
+ continue;
+ } else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
+ taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
+ continue;
+ }
+
+ tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
+ if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
+ uint8_t *pVal = pColVal->value.pData;
+
+ pColVal->value.pData = NULL;
+ code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
+ if (code) {
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
+
+ if (pColVal->value.nData) {
+ memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
+ }
+ }
+
+ if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ return code;
+ }
+ }
+
+ for (; iCol < pMerger->pTSchema->numOfCols; ++iCol) {
+ pTColumn = &pMerger->pTSchema->columns[iCol];
+ taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
+ }
+
+ pMerger->version = key.version;
+ return 0;
+ } else {
+ ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
+
+ for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
+ pTColumn = &pMerger->pTSchema->columns[iCol];
+ if (pTSchema->columns[jCol].colId < pTColumn->colId) {
+ ++jCol;
+ --iCol;
+ continue;
+ } else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
+ continue;
+ }
+
+ tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
+
+ if (key.version > pMerger->version) {
+ if (!COL_VAL_IS_NONE(pColVal)) {
+ if (IS_VAR_DATA_TYPE(pColVal->type)) {
+ SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
+ if (!COL_VAL_IS_NULL(pColVal)) {
+ code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
+ if (code) return code;
+
+ pTColVal->value.nData = pColVal->value.nData;
+ if (pTColVal->value.nData) {
+ memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
+ }
+ pTColVal->flag = 0;
+ } else {
+ tFree(pTColVal->value.pData);
+ taosArraySet(pMerger->pArray, iCol, pColVal);
+ }
+ } else {
+ taosArraySet(pMerger->pArray, iCol, pColVal);
+ }
+ }
+ } else if (key.version < pMerger->version) {
+ SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
+ if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
+ if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
+ code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
+ if (code) return code;
+
+ tColVal->value.nData = pColVal->value.nData;
+ if (pColVal->value.nData) {
+ memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
+ }
+ tColVal->flag = 0;
+ } else {
+ taosArraySet(pMerger->pArray, iCol, pColVal);
+ }
+ }
+ } else {
+ ASSERT(0 && "dup versions not allowed");
+ }
+ }
+
+ pMerger->version = key.version;
+ return code;
+ }
+}
+
+int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema) {
+ pMerger->pTSchema = pSchema;
+ pMerger->pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
+ if (pMerger->pArray == NULL) {
+ return TSDB_CODE_OUT_OF_MEMORY;
+ } else {
+ return TSDB_CODE_SUCCESS;
+ }
+}
+
+void tsdbRowMergerClear_rv(SRowMerger* pMerger) {
+ for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
+ SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
+ if (IS_VAR_DATA_TYPE(pTColVal->type)) {
+ tFree(pTColVal->value.pData);
}
}
-_exit:
- return code;
+ taosArrayClear(pMerger->pArray);
}
-*/
+
+void tsdbRowMergerCleanup_rv(SRowMerger* pMerger) {
+ int32_t numOfCols = taosArrayGetSize(pMerger->pArray);
+ for (int32_t iCol = 1; iCol < numOfCols; iCol++) {
+ SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
+ if (IS_VAR_DATA_TYPE(pTColVal->type)) {
+ tFree(pTColVal->value.pData);
+ }
+ }
+
+ taosArrayDestroy(pMerger->pArray);
+}
+
void tsdbRowMergerClear(SRowMerger *pMerger) {
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
diff --git a/source/util/src/tsimplehash.c b/source/util/src/tsimplehash.c
index ec1991923f..4c7983a983 100644
--- a/source/util/src/tsimplehash.c
+++ b/source/util/src/tsimplehash.c
@@ -361,10 +361,6 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke
return TSDB_CODE_SUCCESS;
}
-static void destroyItems(void* pItem) {
- taosMemoryFree(*(void**)pItem);
-}
-
void tSimpleHashClear(SSHashObj *pHashObj) {
if (!pHashObj || taosHashTableEmpty(pHashObj)) {
return;