Merge pull request #22277 from taosdata/fix/TD-25400
enh(tsdb/cache/read): lazy init table map hash and entries
This commit is contained in:
commit
c24b52ad82
|
@ -841,48 +841,6 @@ typedef enum {
|
|||
READ_MODE_ALL,
|
||||
} EReadMode;
|
||||
|
||||
typedef struct STsdbReaderInfo {
|
||||
uint64_t suid;
|
||||
STSchema *pSchema;
|
||||
EReadMode readMode;
|
||||
uint64_t rowsNum;
|
||||
STimeWindow window;
|
||||
SVersionRange verRange;
|
||||
int16_t order;
|
||||
} STsdbReaderInfo;
|
||||
|
||||
typedef struct {
|
||||
SArray *pTombData;
|
||||
} STableLoadInfo;
|
||||
|
||||
struct SDataFileReader;
|
||||
|
||||
typedef struct SCacheRowsReader {
|
||||
STsdb *pTsdb;
|
||||
STsdbReaderInfo info;
|
||||
TdThreadMutex readerMutex;
|
||||
SVnode *pVnode;
|
||||
STSchema *pSchema;
|
||||
STSchema *pCurrSchema;
|
||||
uint64_t uid;
|
||||
char **transferBuf; // todo remove it soon
|
||||
int32_t numOfCols;
|
||||
SArray *pCidList;
|
||||
int32_t *pSlotIds;
|
||||
int32_t type;
|
||||
int32_t tableIndex; // currently returned result tables
|
||||
STableKeyInfo *pTableList; // table id list
|
||||
int32_t numOfTables;
|
||||
uint64_t *uidList;
|
||||
SSHashObj *pTableMap;
|
||||
SArray *pLDataIterArray;
|
||||
struct SDataFileReader *pFileReader;
|
||||
STFileSet *pCurFileSet;
|
||||
STsdbReadSnap *pReadSnap;
|
||||
char *idstr;
|
||||
int64_t lastTs;
|
||||
} SCacheRowsReader;
|
||||
|
||||
typedef struct {
|
||||
TSKEY ts;
|
||||
int8_t dirty;
|
||||
|
@ -892,14 +850,10 @@ typedef struct {
|
|||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(STsdb *pTsdb);
|
||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
|
||||
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype);
|
||||
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype);
|
||||
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup);
|
||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
|
||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
|
||||
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
||||
|
||||
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle);
|
||||
|
@ -909,8 +863,6 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
|||
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
|
||||
// int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
|
||||
|
||||
// ========== inline functions ==========
|
||||
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
||||
TSDBKEY *pKey1 = (TSDBKEY *)p1;
|
||||
|
|
|
@ -1020,10 +1020,10 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
|||
code = tsdbCacheLoadFromRocks(pTsdb, uid, pLastArray, remainCols, pr, ltype);
|
||||
|
||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
}
|
||||
|
||||
if (remainCols) {
|
||||
taosArrayDestroy(remainCols);
|
||||
if (remainCols) {
|
||||
taosArrayDestroy(remainCols);
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -1592,10 +1592,51 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static void freeTableInfoFunc(void *param) {
|
||||
void **p = (void **)param;
|
||||
taosMemoryFreeClear(*p);
|
||||
}
|
||||
|
||||
static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) {
|
||||
if (!pReader->pTableMap) {
|
||||
pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
|
||||
tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc);
|
||||
}
|
||||
|
||||
STableLoadInfo *pInfo = NULL;
|
||||
STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
|
||||
if (!ppInfo) {
|
||||
pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
|
||||
tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
|
||||
|
||||
return pInfo;
|
||||
}
|
||||
|
||||
return *ppInfo;
|
||||
}
|
||||
|
||||
static uint64_t *getUidList(SCacheRowsReader *pReader) {
|
||||
if (!pReader->uidList) {
|
||||
int32_t numOfTables = pReader->numOfTables;
|
||||
|
||||
pReader->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
|
||||
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
uint64_t uid = pReader->pTableList[i].uid;
|
||||
pReader->uidList[i] = uid;
|
||||
}
|
||||
|
||||
taosSort(pReader->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
|
||||
}
|
||||
|
||||
return pReader->uidList;
|
||||
}
|
||||
|
||||
static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsReader *pReader, void *pFileReader,
|
||||
bool isFile) {
|
||||
int32_t code = 0;
|
||||
uint64_t *uidList = pReader->uidList;
|
||||
uint64_t *uidList = getUidList(pReader);
|
||||
int32_t numOfTables = pReader->numOfTables;
|
||||
int64_t suid = pReader->info.suid;
|
||||
|
||||
|
@ -1618,7 +1659,7 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
|
|||
}
|
||||
|
||||
uint64_t uid = uidList[j];
|
||||
STableLoadInfo *pInfo = *(STableLoadInfo **)tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
|
||||
STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
|
||||
if (pInfo->pTombData == NULL) {
|
||||
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
|
||||
}
|
||||
|
@ -1660,13 +1701,16 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
|
|||
}
|
||||
|
||||
if (newTable) {
|
||||
pInfo = *(STableLoadInfo **)tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid));
|
||||
pInfo = getTableLoadInfo(pReader, uid);
|
||||
if (pInfo->pTombData == NULL) {
|
||||
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
|
||||
}
|
||||
}
|
||||
|
||||
if (record.version <= pReader->info.verRange.maxVer) {
|
||||
tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
|
||||
TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);
|
||||
|
||||
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
|
||||
taosArrayPush(pInfo->pTombData, &delData);
|
||||
}
|
||||
|
@ -1792,12 +1836,10 @@ struct CacheNextRowIter;
|
|||
|
||||
typedef struct SFSNextRowIter {
|
||||
SFSNEXTROWSTATES state; // [input]
|
||||
STsdb *pTsdb; // [input]
|
||||
SBlockIdx *pBlockIdxExp; // [input]
|
||||
STSchema *pTSchema; // [input]
|
||||
tb_uid_t suid;
|
||||
tb_uid_t uid;
|
||||
int32_t nFileSet;
|
||||
int32_t iFileSet;
|
||||
STFileSet *pFileSet;
|
||||
TFileSetArray *aDFileSet;
|
||||
|
@ -1828,10 +1870,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
int nCols) {
|
||||
SFSNextRowIter *state = (SFSNextRowIter *)iter;
|
||||
int32_t code = 0;
|
||||
STsdb *pTsdb = state->pr->pTsdb;
|
||||
|
||||
if (SFSNEXTROW_FS == state->state) {
|
||||
state->nFileSet = TARRAY2_SIZE(state->aDFileSet);
|
||||
state->iFileSet = state->nFileSet;
|
||||
state->iFileSet = TARRAY2_SIZE(state->aDFileSet);
|
||||
|
||||
state->state = SFSNEXTROW_FILESET;
|
||||
}
|
||||
|
@ -1850,7 +1892,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
STFileObj **pFileObj = state->pFileSet->farr;
|
||||
if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
|
||||
if (state->pFileSet != state->pr->pCurFileSet) {
|
||||
SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.tsdbPageSize};
|
||||
SDataFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
|
||||
const char *filesName[4] = {0};
|
||||
if (pFileObj[0] != NULL) {
|
||||
conf.files[0].file = *pFileObj[0]->f;
|
||||
|
@ -1880,6 +1922,11 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
state->pr->pCurFileSet = state->pFileSet;
|
||||
|
||||
loadDataTomb(state->pr, state->pr->pFileReader);
|
||||
|
||||
int32_t code = tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
if (!state->pIndexList) {
|
||||
|
@ -1887,12 +1934,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
} else {
|
||||
taosArrayClear(state->pIndexList);
|
||||
}
|
||||
const TBrinBlkArray *pBlkArray = NULL;
|
||||
|
||||
int32_t code = tsdbDataFileReadBrinBlk(state->pr->pFileReader, &pBlkArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
|
||||
|
||||
for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
|
||||
SBrinBlk *pBrinBlk = &pBlkArray->data[i];
|
||||
|
@ -1920,8 +1963,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
state->pr->pCurFileSet = state->pFileSet;
|
||||
}
|
||||
|
||||
code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
|
||||
state->pr, state->lastTs, aCols, nCols);
|
||||
code = lastIterOpen(&state->lastIter, state->pFileSet, pTsdb, state->pTSchema, state->suid, state->uid, state->pr,
|
||||
state->lastTs, aCols, nCols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -2341,7 +2384,6 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
|||
|
||||
pIter->fsState.pRowIter = pIter;
|
||||
pIter->fsState.state = SFSNEXTROW_FS;
|
||||
pIter->fsState.pTsdb = pTsdb;
|
||||
pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
|
||||
pIter->fsState.pBlockIdxExp = &pIter->idx;
|
||||
pIter->fsState.pTSchema = pTSchema;
|
||||
|
@ -2458,14 +2500,17 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
|
|||
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
|
||||
|
||||
uint64_t uid = pIter->idx.uid;
|
||||
STableLoadInfo *pInfo = *(STableLoadInfo **)tSimpleHashGet(pIter->pr->pTableMap, &uid, sizeof(uid));
|
||||
SArray *pTombData = pInfo->pTombData;
|
||||
if (pTombData) {
|
||||
taosArrayAddAll(pTombData, pIter->pMemDelData);
|
||||
|
||||
code = tsdbBuildDeleteSkyline(pTombData, 0, (int32_t)(TARRAY_SIZE(pTombData) - 1), pIter->pSkyline);
|
||||
STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
|
||||
if (pInfo->pTombData == NULL) {
|
||||
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
|
||||
}
|
||||
|
||||
taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData);
|
||||
|
||||
size_t delSize = TARRAY_SIZE(pInfo->pTombData);
|
||||
if (delSize > 0) {
|
||||
code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
|
||||
}
|
||||
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "tcommon.h"
|
||||
#include "tsdb.h"
|
||||
#include "tsdbDataFileRW.h"
|
||||
#include "tsdbReadUtil.h"
|
||||
|
||||
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
|
||||
|
||||
|
@ -133,21 +134,6 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t uidComparFunc(const void* p1, const void* p2) {
|
||||
uint64_t pu1 = *(uint64_t*)p1;
|
||||
uint64_t pu2 = *(uint64_t*)p2;
|
||||
if (pu1 == pu2) {
|
||||
return 0;
|
||||
} else {
|
||||
return (pu1 < pu2) ? -1 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
static void freeTableInfoFunc(void* param) {
|
||||
void** p = (void**)param;
|
||||
taosMemoryFreeClear(*p);
|
||||
}
|
||||
|
||||
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) {
|
||||
*pReader = NULL;
|
||||
|
@ -173,27 +159,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
|||
p->pTableList = pTableIdList;
|
||||
p->numOfTables = numOfTables;
|
||||
|
||||
p->pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
if (p->pTableMap == NULL) {
|
||||
tsdbCacherowsReaderClose(p);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
p->uidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
|
||||
if (p->uidList == NULL) {
|
||||
tsdbCacherowsReaderClose(p);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
uint64_t uid = p->pTableList[i].uid;
|
||||
p->uidList[i] = uid;
|
||||
STableLoadInfo* pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
|
||||
tSimpleHashPut(p->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
|
||||
}
|
||||
|
||||
tSimpleHashSetFreeFp(p->pTableMap, freeTableInfoFunc);
|
||||
|
||||
taosSort(p->uidList, numOfTables, sizeof(uint64_t), uidComparFunc);
|
||||
|
||||
int32_t code = setTableSchema(p, suid, idstr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbCacherowsReaderClose(p);
|
||||
|
@ -216,14 +181,6 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
|||
}
|
||||
}
|
||||
|
||||
SVnodeCfg* pCfg = &((SVnode*)pVnode)->config;
|
||||
int32_t numOfStt = pCfg->sttTrigger;
|
||||
p->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||
if (p->pLDataIterArray == NULL) {
|
||||
tsdbCacherowsReaderClose(p);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
p->idstr = taosStrdup(idstr);
|
||||
taosThreadMutexInit(&p->readerMutex, NULL);
|
||||
|
||||
|
@ -250,9 +207,11 @@ void* tsdbCacherowsReaderClose(void* pReader) {
|
|||
|
||||
taosMemoryFree(p->pCurrSchema);
|
||||
|
||||
int64_t loadBlocks = 0;
|
||||
double elapse = 0;
|
||||
destroySttBlockReader(p->pLDataIterArray, &loadBlocks, &elapse);
|
||||
if (p->pLDataIterArray) {
|
||||
int64_t loadBlocks = 0;
|
||||
double elapse = 0;
|
||||
destroySttBlockReader(p->pLDataIterArray, &loadBlocks, &elapse);
|
||||
}
|
||||
|
||||
if (p->pFileReader) {
|
||||
tsdbDataFileReaderClose(&p->pFileReader);
|
||||
|
@ -318,7 +277,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
|
||||
bool hasRes = false;
|
||||
SArray* pLastCols = NULL;
|
||||
|
||||
void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
|
||||
if (pRes == NULL) {
|
||||
|
@ -327,57 +285,47 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
}
|
||||
|
||||
for (int32_t j = 0; j < pr->numOfCols; ++j) {
|
||||
pRes[j] =
|
||||
taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[/*-1 == slotIds[j] ? 0 : */ slotIds[j]].bytes +
|
||||
VARSTR_HEADER_SIZE);
|
||||
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE);
|
||||
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
|
||||
p->ts = INT64_MIN;
|
||||
}
|
||||
|
||||
pLastCols = taosArrayInit(pr->numOfCols, sizeof(SLastCol));
|
||||
if (pLastCols == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pr->numOfCols; ++i) {
|
||||
int32_t slotId = slotIds[i];
|
||||
struct STColumn* pCol = &pr->pSchema->columns[slotId];
|
||||
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
|
||||
}
|
||||
taosArrayPush(pLastCols, &p);
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pr->readerMutex);
|
||||
code = tsdbTakeReadSnap2((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
|
||||
int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
|
||||
STableKeyInfo* pTableList = pr->pTableList;
|
||||
|
||||
// retrieve the only one last row of all tables in the uid list.
|
||||
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
|
||||
SArray* pLastCols = taosArrayInit(pr->numOfCols, sizeof(SLastCol));
|
||||
if (pLastCols == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pr->numOfCols; ++i) {
|
||||
int32_t slotId = slotIds[i];
|
||||
struct STColumn* pCol = &pr->pSchema->columns[slotId];
|
||||
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
|
||||
}
|
||||
taosArrayPush(pLastCols, &p);
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int64_t totalLastTs = INT64_MAX;
|
||||
|
||||
for (int32_t i = 0; i < pr->numOfTables; ++i) {
|
||||
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
||||
tb_uid_t uid = pTableList[i].uid;
|
||||
|
||||
tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
|
||||
// tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
|
||||
if (TARRAY_SIZE(pRow) <= 0) {
|
||||
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
|
||||
if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
// taosArrayClear(pRow);
|
||||
continue;
|
||||
}
|
||||
SLastCol* pColVal = taosArrayGet(pRow, 0);
|
||||
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
// taosArrayClear(pRow);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -400,9 +348,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
p->ts = pColVal->ts;
|
||||
if (k == 0) {
|
||||
if (TARRAY_SIZE(pTableUidList) == 0) {
|
||||
taosArrayPush(pTableUidList, &pKeyInfo->uid);
|
||||
taosArrayPush(pTableUidList, &uid);
|
||||
} else {
|
||||
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
|
||||
taosArraySet(pTableUidList, 0, &uid);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -437,32 +385,25 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
}
|
||||
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
// taosArrayClear(pRow);
|
||||
}
|
||||
|
||||
if (hasRes) {
|
||||
saveOneRow(pLastCols, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
||||
}
|
||||
|
||||
taosArrayDestroyEx(pLastCols, freeItem);
|
||||
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
||||
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
||||
tb_uid_t uid = pr->pTableList[i].uid;
|
||||
tb_uid_t uid = pTableList[i].uid;
|
||||
|
||||
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
|
||||
if (TARRAY_SIZE(pRow) <= 0) {
|
||||
if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
// taosArrayClear(pRow);
|
||||
continue;
|
||||
}
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
||||
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
// taosArrayClear(pRow);
|
||||
continue;
|
||||
}
|
||||
|
||||
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
// taosArrayClear(pRow);
|
||||
|
||||
taosArrayPush(pTableUidList, &uid);
|
||||
|
||||
|
@ -478,11 +419,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
_end:
|
||||
tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
|
||||
|
||||
int64_t loadBlocks = 0;
|
||||
double elapse = 0;
|
||||
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse);
|
||||
pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
taosThreadMutexUnlock(&pr->readerMutex);
|
||||
|
||||
if (pRes != NULL) {
|
||||
|
@ -492,9 +428,7 @@ _end:
|
|||
}
|
||||
|
||||
taosMemoryFree(pRes);
|
||||
// taosArrayDestroyEx(pRow, freeItem);
|
||||
taosArrayDestroy(pRow);
|
||||
taosArrayDestroyEx(pLastCols, freeItem);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -759,7 +759,6 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
|
|||
|
||||
pMTree->ignoreEarlierTs = false;
|
||||
|
||||
// todo handle other level of stt files, here only deal with the first level stt
|
||||
int32_t size = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->size;
|
||||
if (size == 0) {
|
||||
goto _end;
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
#include "tsdbUtil2.h"
|
||||
#include "tsimplehash.h"
|
||||
|
||||
static int32_t uidComparFunc(const void* p1, const void* p2) {
|
||||
int32_t uidComparFunc(const void* p1, const void* p2) {
|
||||
uint64_t pu1 = *(uint64_t*)p1;
|
||||
uint64_t pu2 = *(uint64_t*)p2;
|
||||
if (pu1 == pu2) {
|
||||
|
|
|
@ -36,6 +36,16 @@ typedef enum {
|
|||
EXTERNAL_ROWS_NEXT = 0x3,
|
||||
} EContentData;
|
||||
|
||||
typedef struct STsdbReaderInfo {
|
||||
uint64_t suid;
|
||||
STSchema* pSchema;
|
||||
EReadMode readMode;
|
||||
uint64_t rowsNum;
|
||||
STimeWindow window;
|
||||
SVersionRange verRange;
|
||||
int16_t order;
|
||||
} STsdbReaderInfo;
|
||||
|
||||
typedef struct SBlockInfoBuf {
|
||||
int32_t currentIndex;
|
||||
SArray* pData;
|
||||
|
@ -215,6 +225,8 @@ typedef struct SBrinRecordIter {
|
|||
SBrinRecord record;
|
||||
} SBrinRecordIter;
|
||||
|
||||
int32_t uidComparFunc(const void* p1, const void* p2);
|
||||
|
||||
STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id);
|
||||
|
||||
SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
|
||||
|
@ -241,6 +253,41 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM
|
|||
int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
|
||||
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
|
||||
|
||||
typedef struct {
|
||||
SArray* pTombData;
|
||||
} STableLoadInfo;
|
||||
|
||||
struct SDataFileReader;
|
||||
|
||||
typedef struct SCacheRowsReader {
|
||||
STsdb* pTsdb;
|
||||
STsdbReaderInfo info;
|
||||
TdThreadMutex readerMutex;
|
||||
SVnode* pVnode;
|
||||
STSchema* pSchema;
|
||||
STSchema* pCurrSchema;
|
||||
uint64_t uid;
|
||||
char** transferBuf; // todo remove it soon
|
||||
int32_t numOfCols;
|
||||
SArray* pCidList;
|
||||
int32_t* pSlotIds;
|
||||
int32_t type;
|
||||
int32_t tableIndex; // currently returned result tables
|
||||
STableKeyInfo* pTableList; // table id list
|
||||
int32_t numOfTables;
|
||||
uint64_t* uidList;
|
||||
SSHashObj* pTableMap;
|
||||
SArray* pLDataIterArray;
|
||||
struct SDataFileReader* pFileReader;
|
||||
STFileSet* pCurFileSet;
|
||||
const TBrinBlkArray* pBlkArray;
|
||||
STsdbReadSnap* pReadSnap;
|
||||
char* idstr;
|
||||
int64_t lastTs;
|
||||
} SCacheRowsReader;
|
||||
|
||||
int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue