diff --git a/cmake/cmake.version b/cmake/cmake.version
index fe35fbe7bd..a6bf90fa3c 100644
--- a/cmake/cmake.version
+++ b/cmake/cmake.version
@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
- SET(TD_VER_NUMBER "3.1.0.0.alpha")
+ SET(TD_VER_NUMBER "3.1.1.0.alpha")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md
index 365b36b14f..6eaa395087 100644
--- a/docs/en/28-releases/01-tdengine.md
+++ b/docs/en/28-releases/01-tdengine.md
@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t
import Release from "/components/ReleaseV3";
+## 3.1.0.0
+
+
+
## 3.0.7.1
diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md
index 52bb9c87a0..afdf2a76d3 100644
--- a/docs/zh/28-releases/01-tdengine.md
+++ b/docs/zh/28-releases/01-tdengine.md
@@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3";
+## 3.1.0.0
+
+
+
## 3.0.7.1
diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h
index 75c8eea83a..fa42248c69 100644
--- a/source/dnode/vnode/src/inc/tsdb.h
+++ b/source/dnode/vnode/src/inc/tsdb.h
@@ -841,48 +841,6 @@ typedef enum {
READ_MODE_ALL,
} EReadMode;
-typedef struct STsdbReaderInfo {
- uint64_t suid;
- STSchema *pSchema;
- EReadMode readMode;
- uint64_t rowsNum;
- STimeWindow window;
- SVersionRange verRange;
- int16_t order;
-} STsdbReaderInfo;
-
-typedef struct {
- SArray *pTombData;
-} STableLoadInfo;
-
-struct SDataFileReader;
-
-typedef struct SCacheRowsReader {
- STsdb *pTsdb;
- STsdbReaderInfo info;
- TdThreadMutex readerMutex;
- SVnode *pVnode;
- STSchema *pSchema;
- STSchema *pCurrSchema;
- uint64_t uid;
- char **transferBuf; // todo remove it soon
- int32_t numOfCols;
- SArray *pCidList;
- int32_t *pSlotIds;
- int32_t type;
- int32_t tableIndex; // currently returned result tables
- STableKeyInfo *pTableList; // table id list
- int32_t numOfTables;
- uint64_t *uidList;
- SSHashObj *pTableMap;
- SArray *pLDataIterArray;
- struct SDataFileReader *pFileReader;
- STFileSet *pCurFileSet;
- STsdbReadSnap *pReadSnap;
- char *idstr;
- int64_t lastTs;
-} SCacheRowsReader;
-
typedef struct {
TSKEY ts;
int8_t dirty;
@@ -892,14 +850,10 @@ typedef struct {
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
-int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype);
-int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype);
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup);
-int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
-int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle);
@@ -909,8 +863,6 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
-// int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
-
// ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;
diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c
index a5b5dea505..7d8cf5b678 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCache.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCache.c
@@ -1020,10 +1020,10 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, pr, ltype);
taosThreadMutexUnlock(&pTsdb->lruMutex);
- }
- if (remainCols) {
- taosArrayDestroy(remainCols);
+ if (remainCols) {
+ taosArrayDestroy(remainCols);
+ }
}
return code;
@@ -1592,10 +1592,51 @@ _err:
return code;
}
+static void freeTableInfoFunc(void *param) {
+ void **p = (void **)param;
+ taosMemoryFreeClear(*p);
+}
+
+static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
+ if (!pReader->pTableMap) {
+ pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
+
+ tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
+ }
+
+ STableLoadInfo *pInfo = NULL;
+ STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
+ if (!ppInfo) {
+ pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
+ tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
+
+ return pInfo;
+ }
+
+ return *ppInfo;
+}
+
+static uint64_t *getUidList(SCacheRowsReader *pReader) {
+ if (!pReader->uidList) {
+ int32_t numOfTables = pReader->numOfTables;
+
+ pReader->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
+
+ for (int32_t i = 0; i < numOfTables; ++i) {
+ uint64_t uid = pReader->pTableList[i].uid;
+ pReader->uidList[i] = uid;
+ }
+
+ taosSort(pReader->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
+ }
+
+ return pReader->uidList;
+}
+
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
bool isFile) {
int32_t code = 0;
- uint64_t *uidList = pReader->uidList;
+ uint64_t *uidList = getUidList(pReader);
int32_t numOfTables = pReader->numOfTables;
int64_t suid = pReader->info.suid;
@@ -1618,7 +1659,7 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
}
uint64_t uid = uidList[j];
- STableLoadInfo *pInfo = *(STableLoadInfo **)tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
+ STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
if (pInfo->pTombData == NULL) {
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
}
@@ -1660,13 +1701,16 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
}
if (newTable) {
- pInfo = *(STableLoadInfo **)tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
+ pInfo = getTableLoadInfo(pReader, uid);
if (pInfo->pTombData == NULL) {
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
}
}
if (record.version <= pReader->info.verRange.maxVer) {
+ tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
+ TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);
+
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pInfo->pTombData, &delData);
}
@@ -1792,12 +1836,10 @@ struct CacheNextRowIter;
typedef struct SFSNextRowIter {
SFSNEXTROWSTATES state; // [input]
- STsdb *pTsdb; // [input]
SBlockIdx *pBlockIdxExp; // [input]
STSchema *pTSchema; // [input]
tb_uid_t suid;
tb_uid_t uid;
- int32_t nFileSet;
int32_t iFileSet;
STFileSet *pFileSet;
TFileSetArray *aDFileSet;
@@ -1828,10 +1870,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
int nCols) {
SFSNextRowIter *state = (SFSNextRowIter *)iter;
int32_t code = 0;
+ STsdb *pTsdb = state->pr->pTsdb;
if (SFSNEXTROW_FS == state->state) {
- state->nFileSet = TARRAY2_SIZE(state->aDFileSet);
- state->iFileSet = state->nFileSet;
+ state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
state->state = SFSNEXTROW_FILESET;
}
@@ -1850,7 +1892,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
STFileObj **pFileObj = state->pFileSet->farr;
if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
if (state->pFileSet != state->pr->pCurFileSet) {
- SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.tsdbPageSize};
+ SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
const char *filesName[4] = {0};
if (pFileObj[0] != NULL) {
conf.files[0].file = *pFileObj[0]->f;
@@ -1880,6 +1922,11 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
state->pr->pCurFileSet = state->pFileSet;
loadDataTomb(state->pr, state->pr->pFileReader);
+
+ int32_t code = tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray);
+ if (code != TSDB_CODE_SUCCESS) {
+ goto _err;
+ }
}
if (!state->pIndexList) {
@@ -1887,12 +1934,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
} else {
taosArrayClear(state->pIndexList);
}
- const TBrinBlkArray *pBlkArray = NULL;
- int32_t code = tsdbDataFileReadBrinBlk(state->pr->pFileReader, &pBlkArray);
- if (code != TSDB_CODE_SUCCESS) {
- goto _err;
- }
+ const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
SBrinBlk *pBrinBlk = &pBlkArray->data[i];
@@ -1920,8 +1963,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
state->pr->pCurFileSet = state->pFileSet;
}
- code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
- state->pr, state->lastTs, aCols, nCols);
+ code = lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid, state->pr,
+ state->lastTs, aCols, nCols);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
@@ -2341,7 +2384,6 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsState.pRowIter = pIter;
pIter->fsState.state = SFSNEXTROW_FS;
- pIter->fsState.pTsdb = pTsdb;
pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
pIter->fsState.pBlockIdxExp = &pIter->idx;
pIter->fsState.pTSchema = pTSchema;
@@ -2458,14 +2500,17 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
uint64_t uid = pIter->idx.uid;
- STableLoadInfo *pInfo = *(STableLoadInfo **)tSimpleHashGet(pIter->pr->pTableMap, &uid, sizeof(uid));
- SArray *pTombData = pInfo->pTombData;
- if (pTombData) {
- taosArrayAddAll(pTombData, pIter->pMemDelData);
-
- code = tsdbBuildDeleteSkyline(pTombData, 0, (int32_t)(TARRAY_SIZE(pTombData) - 1), pIter->pSkyline);
+ STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
+ if (pInfo->pTombData == NULL) {
+ pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
}
+ taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData);
+
+ size_t delSize = TARRAY_SIZE(pInfo->pTombData);
+ if (delSize > 0) {
+ code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
+ }
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
}
diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
index f17041e98b..66c8cc06e2 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
@@ -18,6 +18,7 @@
#include "tcommon.h"
#include "tsdb.h"
#include "tsdbDataFileRW.h"
+#include "tsdbReadUtil.h"
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
@@ -133,21 +134,6 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
return TSDB_CODE_SUCCESS;
}
-static int32_t uidComparFunc(const void* p1, const void* p2) {
- uint64_t pu1 = *(uint64_t*)p1;
- uint64_t pu2 = *(uint64_t*)p2;
- if (pu1 == pu2) {
- return 0;
- } else {
- return (pu1 < pu2) ? -1 : 1;
- }
-}
-
-static void freeTableInfoFunc(void* param) {
- void** p = (void**)param;
- taosMemoryFreeClear(*p);
-}
-
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) {
*pReader = NULL;
@@ -173,27 +159,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
p->pTableList = pTableIdList;
p->numOfTables = numOfTables;
- p->pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
- if (p->pTableMap == NULL) {
- tsdbCacherowsReaderClose(p);
- return TSDB_CODE_OUT_OF_MEMORY;
- }
- p->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
- if (p->uidList == NULL) {
- tsdbCacherowsReaderClose(p);
- return TSDB_CODE_OUT_OF_MEMORY;
- }
- for (int32_t i = 0; i < numOfTables; ++i) {
- uint64_t uid = p->pTableList[i].uid;
- p->uidList[i] = uid;
- STableLoadInfo* pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
- tSimpleHashPut(p->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
- }
-
- tSimpleHashSetFreeFp(p->pTableMap, freeTableInfoFunc);
-
- taosSort(p->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
-
int32_t code = setTableSchema(p, suid, idstr);
if (code != TSDB_CODE_SUCCESS) {
tsdbCacherowsReaderClose(p);
@@ -216,14 +181,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
}
}
- SVnodeCfg* pCfg = &((SVnode*)pVnode)->config;
- int32_t numOfStt = pCfg->sttTrigger;
- p->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
- if (p->pLDataIterArray == NULL) {
- tsdbCacherowsReaderClose(p);
- return TSDB_CODE_OUT_OF_MEMORY;
- }
-
p->idstr = taosStrdup(idstr);
taosThreadMutexInit(&p->readerMutex, NULL);
@@ -250,9 +207,11 @@ void* tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree(p->pCurrSchema);
- int64_t loadBlocks = 0;
- double elapse = 0;
- destroySttBlockReader(p->pLDataIterArray, &loadBlocks, &elapse);
+ if (p->pLDataIterArray) {
+ int64_t loadBlocks = 0;
+ double elapse = 0;
+ destroySttBlockReader(p->pLDataIterArray, &loadBlocks, &elapse);
+ }
if (p->pFileReader) {
tsdbDataFileReaderClose(&p->pFileReader);
@@ -318,7 +277,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
int32_t code = TSDB_CODE_SUCCESS;
SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
bool hasRes = false;
- SArray* pLastCols = NULL;
void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
if (pRes == NULL) {
@@ -327,57 +285,47 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
for (int32_t j = 0; j < pr->numOfCols; ++j) {
- pRes[j] =
- taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[/*-1 == slotIds[j] ? 0 : */ slotIds[j]].bytes +
- VARSTR_HEADER_SIZE);
+ pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
p->ts = INT64_MIN;
}
- pLastCols = taosArrayInit(pr->numOfCols, sizeof(SLastCol));
- if (pLastCols == NULL) {
- code = TSDB_CODE_OUT_OF_MEMORY;
- goto _end;
- }
-
- for (int32_t i = 0; i < pr->numOfCols; ++i) {
- int32_t slotId = slotIds[i];
- struct STColumn* pCol = &pr->pSchema->columns[slotId];
- SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
-
- if (IS_VAR_DATA_TYPE(pCol->type)) {
- p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
- }
- taosArrayPush(pLastCols, &p);
- }
-
taosThreadMutexLock(&pr->readerMutex);
code = tsdbTakeReadSnap2((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
- int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
+ int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
+ STableKeyInfo* pTableList = pr->pTableList;
// retrieve the only one last row of all tables in the uid list.
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
+ SArray* pLastCols = taosArrayInit(pr->numOfCols, sizeof(SLastCol));
+ if (pLastCols == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _end;
+ }
+
+ for (int32_t i = 0; i < pr->numOfCols; ++i) {
+ int32_t slotId = slotIds[i];
+ struct STColumn* pCol = &pr->pSchema->columns[slotId];
+ SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
+
+ if (IS_VAR_DATA_TYPE(pCol->type)) {
+ p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
+ }
+ taosArrayPush(pLastCols, &p);
+ }
+
int64_t st = taosGetTimestampUs();
int64_t totalLastTs = INT64_MAX;
-
for (int32_t i = 0; i < pr->numOfTables; ++i) {
- STableKeyInfo* pKeyInfo = &pr->pTableList[i];
+ tb_uid_t uid = pTableList[i].uid;
- tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
- // tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
- if (TARRAY_SIZE(pRow) <= 0) {
+ tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
+ if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
taosArrayClearEx(pRow, freeItem);
- // taosArrayClear(pRow);
- continue;
- }
- SLastCol* pColVal = taosArrayGet(pRow, 0);
- if (COL_VAL_IS_NONE(&pColVal->colVal)) {
- taosArrayClearEx(pRow, freeItem);
- // taosArrayClear(pRow);
continue;
}
@@ -400,9 +348,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
p->ts = pColVal->ts;
if (k == 0) {
if (TARRAY_SIZE(pTableUidList) == 0) {
- taosArrayPush(pTableUidList, &pKeyInfo->uid);
+ taosArrayPush(pTableUidList, &uid);
} else {
- taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
+ taosArraySet(pTableUidList, 0, &uid);
}
}
@@ -437,32 +385,25 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
taosArrayClearEx(pRow, freeItem);
- // taosArrayClear(pRow);
}
if (hasRes) {
saveOneRow(pLastCols, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
}
+
+ taosArrayDestroyEx(pLastCols, freeItem);
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
- tb_uid_t uid = pr->pTableList[i].uid;
+ tb_uid_t uid = pTableList[i].uid;
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
- if (TARRAY_SIZE(pRow) <= 0) {
+ if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
taosArrayClearEx(pRow, freeItem);
- // taosArrayClear(pRow);
- continue;
- }
- SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
- if (COL_VAL_IS_NONE(&pColVal->colVal)) {
- taosArrayClearEx(pRow, freeItem);
- // taosArrayClear(pRow);
continue;
}
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
taosArrayClearEx(pRow, freeItem);
- // taosArrayClear(pRow);
taosArrayPush(pTableUidList, &uid);
@@ -478,11 +419,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
_end:
tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
- int64_t loadBlocks = 0;
- double elapse = 0;
- pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse);
- pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
-
taosThreadMutexUnlock(&pr->readerMutex);
if (pRes != NULL) {
@@ -492,9 +428,7 @@ _end:
}
taosMemoryFree(pRes);
- // taosArrayDestroyEx(pRow, freeItem);
taosArrayDestroy(pRow);
- taosArrayDestroyEx(pLastCols, freeItem);
return code;
}
diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
index d74584f844..1d0995427e 100644
--- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c
@@ -444,6 +444,13 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
pIter->pReader = pSttFileReader;
pIter->pBlockLoadInfo = pBlockLoadInfo;
+ if (pIter->pReader == NULL) {
+ tsdbError("stt file reader is null, %s", idStr);
+ pIter->pSttBlk = NULL;
+ pIter->iSttBlk = -1;
+ return TSDB_CODE_SUCCESS;
+ }
+
if (!pBlockLoadInfo->sttBlockLoaded) {
int64_t st = taosGetTimestampUs();
@@ -790,7 +797,6 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
pMTree->ignoreEarlierTs = false;
- // todo handle other level of stt files, here only deal with the first level stt
int32_t size = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->size;
if (size == 0) {
goto _end;
@@ -815,6 +821,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
taosArrayPush(pList, &pIter);
}
+ } else if (numOfIter > TARRAY2_SIZE(pSttLevel->fobjArr)){
+ int32_t inc = numOfIter - TARRAY2_SIZE(pSttLevel->fobjArr);
+ for (int i = 0; i < inc; ++i) {
+ SLDataIter *pIter = taosArrayPop(pList);
+ destroyLDataIter(pIter);
+ }
}
for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file
@@ -830,7 +842,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader);
if (code != TSDB_CODE_SUCCESS) {
- return code;
+ tsdbError("open stt file reader error. file name %s, code %s, %s", pSttLevel->fobjArr->data[i]->fname,
+ tstrerror(code), pMTree->idStr);
}
}
@@ -845,7 +858,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
-
+
bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
if (hasVal) {
tMergeTreeAddIter(pMTree, pIter);
diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c
index 635a74d8dd..d560f0d5af 100644
--- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c
+++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c
@@ -22,7 +22,7 @@
#include "tsdbUtil2.h"
#include "tsimplehash.h"
-static int32_t uidComparFunc(const void* p1, const void* p2) {
+int32_t uidComparFunc(const void* p1, const void* p2) {
uint64_t pu1 = *(uint64_t*)p1;
uint64_t pu2 = *(uint64_t*)p2;
if (pu1 == pu2) {
diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h
index 5c4737440d..e7a1d6b038 100644
--- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h
+++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h
@@ -36,6 +36,16 @@ typedef enum {
EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;
+typedef struct STsdbReaderInfo {
+ uint64_t suid;
+ STSchema* pSchema;
+ EReadMode readMode;
+ uint64_t rowsNum;
+ STimeWindow window;
+ SVersionRange verRange;
+ int16_t order;
+} STsdbReaderInfo;
+
typedef struct SBlockInfoBuf {
int32_t currentIndex;
SArray* pData;
@@ -215,6 +225,8 @@ typedef struct SBrinRecordIter {
SBrinRecord record;
} SBrinRecordIter;
+int32_t uidComparFunc(const void* p1, const void* p2);
+
STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id);
SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
@@ -241,6 +253,41 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM
int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
+typedef struct {
+ SArray* pTombData;
+} STableLoadInfo;
+
+struct SDataFileReader;
+
+typedef struct SCacheRowsReader {
+ STsdb* pTsdb;
+ STsdbReaderInfo info;
+ TdThreadMutex readerMutex;
+ SVnode* pVnode;
+ STSchema* pSchema;
+ STSchema* pCurrSchema;
+ uint64_t uid;
+ char** transferBuf; // todo remove it soon
+ int32_t numOfCols;
+ SArray* pCidList;
+ int32_t* pSlotIds;
+ int32_t type;
+ int32_t tableIndex; // currently returned result tables
+ STableKeyInfo* pTableList; // table id list
+ int32_t numOfTables;
+ uint64_t* uidList;
+ SSHashObj* pTableMap;
+ SArray* pLDataIterArray;
+ struct SDataFileReader* pFileReader;
+ STFileSet* pCurFileSet;
+ const TBrinBlkArray* pBlkArray;
+ STsdbReadSnap* pReadSnap;
+ char* idstr;
+ int64_t lastTs;
+} SCacheRowsReader;
+
+int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);
+
#ifdef __cplusplus
}
#endif
diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index 89572d1c06..586425ec1d 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -160,7 +160,7 @@
,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py
,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py
-,,n,system-test,python3 ./test.py -f 0-others/splitVGroup.py -N 5
+#,,n,system-test,python3 ./test.py -f 0-others/splitVGroup.py -N 5
,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_replica.py -N 3