Merge pull request #20427 from taosdata/fix/main_bugfix_wxy
fix: last redundant read stt file
This commit is contained in:
commit
e98b32a69f
|
@ -98,7 +98,8 @@ extern char *tsSvrCrashReportUri;
|
||||||
|
|
||||||
// query buffer management
|
// query buffer management
|
||||||
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
|
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
|
||||||
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node
|
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node
|
||||||
|
extern int32_t tsCacheLazyLoadThreshold; // cost threshold for last/last_row loading cache as much as possible
|
||||||
|
|
||||||
// query client
|
// query client
|
||||||
extern int32_t tsQueryPolicy;
|
extern int32_t tsQueryPolicy;
|
||||||
|
@ -145,10 +146,10 @@ extern char tsUdfdResFuncs[];
|
||||||
extern char tsUdfdLdLibPath[];
|
extern char tsUdfdLdLibPath[];
|
||||||
|
|
||||||
// schemaless
|
// schemaless
|
||||||
extern char tsSmlChildTableName[];
|
extern char tsSmlChildTableName[];
|
||||||
extern char tsSmlTagName[];
|
extern char tsSmlTagName[];
|
||||||
//extern bool tsSmlDataFormat;
|
// extern bool tsSmlDataFormat;
|
||||||
//extern int32_t tsSmlBatchSize;
|
// extern int32_t tsSmlBatchSize;
|
||||||
|
|
||||||
// wal
|
// wal
|
||||||
extern int64_t tsWalFsyncDataSizeLimit;
|
extern int64_t tsWalFsyncDataSizeLimit;
|
||||||
|
|
|
@ -1136,6 +1136,7 @@ typedef struct {
|
||||||
int64_t numOfInsertSuccessReqs;
|
int64_t numOfInsertSuccessReqs;
|
||||||
int64_t numOfBatchInsertReqs;
|
int64_t numOfBatchInsertReqs;
|
||||||
int64_t numOfBatchInsertSuccessReqs;
|
int64_t numOfBatchInsertSuccessReqs;
|
||||||
|
int32_t numOfCachedTables;
|
||||||
} SVnodeLoad;
|
} SVnodeLoad;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -55,6 +55,8 @@ void *taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle);
|
||||||
size_t taosLRUCacheGetUsage(SLRUCache *cache);
|
size_t taosLRUCacheGetUsage(SLRUCache *cache);
|
||||||
size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache);
|
size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache);
|
||||||
|
|
||||||
|
int32_t taosLRUCacheGetElems(SLRUCache *cache);
|
||||||
|
|
||||||
void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity);
|
void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity);
|
||||||
size_t taosLRUCacheGetCapacity(SLRUCache *cache);
|
size_t taosLRUCacheGetCapacity(SLRUCache *cache);
|
||||||
|
|
||||||
|
|
|
@ -233,6 +233,7 @@ static const SSysDbTableSchema vgroupsSchema[] = {
|
||||||
{.name = "v4_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
{.name = "v4_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||||
{.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||||
{.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
{.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||||
|
{.name = "cacheTables", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||||
{.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
|
{.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
|
||||||
// {.name = "compact_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
// {.name = "compact_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
|
@ -154,6 +154,7 @@ char tsTagFilterCache = 0;
|
||||||
// positive value (in MB)
|
// positive value (in MB)
|
||||||
int32_t tsQueryBufferSize = -1;
|
int32_t tsQueryBufferSize = -1;
|
||||||
int64_t tsQueryBufferSizeBytes = -1;
|
int64_t tsQueryBufferSizeBytes = -1;
|
||||||
|
int32_t tsCacheLazyLoadThreshold = 500;
|
||||||
|
|
||||||
int32_t tsDiskCfgNum = 0;
|
int32_t tsDiskCfgNum = 0;
|
||||||
SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0};
|
SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0};
|
||||||
|
@ -497,6 +498,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1;
|
||||||
|
|
||||||
|
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1;
|
||||||
|
|
||||||
GRANT_CFG_ADD;
|
GRANT_CFG_ADD;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -824,6 +827,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
|
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsCacheLazyLoadThreshold = cfgGetItem(pCfg, "cacheLazyLoadThreshold")->i32;
|
||||||
|
|
||||||
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
|
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
|
||||||
|
|
||||||
GRANT_CFG_GET;
|
GRANT_CFG_GET;
|
||||||
|
|
|
@ -1070,7 +1070,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
||||||
if (tEncodeI64(&encoder, pload->totalStorage) < 0) return -1;
|
if (tEncodeI64(&encoder, pload->totalStorage) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1;
|
if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
|
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, reserved) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
@ -1148,7 +1149,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
||||||
if (tDecodeI64(&decoder, &vload.totalStorage) < 0) return -1;
|
if (tDecodeI64(&decoder, &vload.totalStorage) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
|
if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
|
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, (int32_t*)&reserved) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
||||||
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
|
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
|
||||||
|
|
|
@ -359,6 +359,7 @@ typedef struct {
|
||||||
int8_t replica;
|
int8_t replica;
|
||||||
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
|
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
|
||||||
void* pTsma;
|
void* pTsma;
|
||||||
|
int32_t numOfCachedTables;
|
||||||
} SVgObj;
|
} SVgObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -412,6 +412,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
if (pVgroup != NULL) {
|
if (pVgroup != NULL) {
|
||||||
if (pVload->syncState == TAOS_SYNC_STATE_LEADER) {
|
if (pVload->syncState == TAOS_SYNC_STATE_LEADER) {
|
||||||
pVgroup->cacheUsage = pVload->cacheUsage;
|
pVgroup->cacheUsage = pVload->cacheUsage;
|
||||||
|
pVgroup->numOfCachedTables = pVload->numOfCachedTables;
|
||||||
pVgroup->numOfTables = pVload->numOfTables;
|
pVgroup->numOfTables = pVload->numOfTables;
|
||||||
pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
|
pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
|
||||||
pVgroup->totalStorage = pVload->totalStorage;
|
pVgroup->totalStorage = pVload->totalStorage;
|
||||||
|
@ -440,7 +441,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
if (roleChanged) {
|
if (roleChanged) {
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||||
if (pDb != NULL && pDb->stateTs != curMs) {
|
if (pDb != NULL && pDb->stateTs != curMs) {
|
||||||
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, pDb->stateTs, curMs);
|
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
|
||||||
|
pDb->stateTs, curMs);
|
||||||
pDb->stateTs = curMs;
|
pDb->stateTs = curMs;
|
||||||
}
|
}
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
|
|
@ -803,6 +803,9 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
||||||
int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
|
int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&cacheUsage, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&cacheUsage, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->numOfCachedTables, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false);
|
||||||
|
|
||||||
|
|
|
@ -198,9 +198,10 @@ int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32
|
||||||
void *tsdbCacherowsReaderClose(void *pReader);
|
void *tsdbCacherowsReaderClose(void *pReader);
|
||||||
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
||||||
|
|
||||||
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
|
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
|
||||||
size_t tsdbCacheGetCapacity(SVnode *pVnode);
|
size_t tsdbCacheGetCapacity(SVnode *pVnode);
|
||||||
size_t tsdbCacheGetUsage(SVnode *pVnode);
|
size_t tsdbCacheGetUsage(SVnode *pVnode);
|
||||||
|
int32_t tsdbCacheGetElems(SVnode *pVnode);
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
typedef struct SMetaTableInfo {
|
typedef struct SMetaTableInfo {
|
||||||
|
@ -264,7 +265,7 @@ int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||||
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList);
|
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||||
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||||
|
|
||||||
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char* id);
|
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
|
||||||
int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret);
|
int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret);
|
||||||
|
|
||||||
int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
||||||
|
|
|
@ -706,6 +706,7 @@ typedef struct SMergeTree {
|
||||||
bool destroyLoadInfo;
|
bool destroyLoadInfo;
|
||||||
SSttBlockLoadInfo *pLoadInfo;
|
SSttBlockLoadInfo *pLoadInfo;
|
||||||
const char *idStr;
|
const char *idStr;
|
||||||
|
bool ignoreEarlierTs;
|
||||||
} SMergeTree;
|
} SMergeTree;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -748,9 +749,10 @@ struct SDiskDataBuilder {
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||||
bool destroyLoadInfo, const char *idStr);
|
bool destroyLoadInfo, const char *idStr, bool strictTimeRange);
|
||||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||||
|
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
|
||||||
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
||||||
void tMergeTreeClose(SMergeTree *pMTree);
|
void tMergeTreeClose(SMergeTree *pMTree);
|
||||||
|
|
||||||
|
|
|
@ -632,11 +632,16 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
||||||
}
|
}
|
||||||
|
|
||||||
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
|
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
|
||||||
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
|
||||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL);
|
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
|
||||||
state->pMergeTree = &state->mergeTree;
|
state->pMergeTree = &state->mergeTree;
|
||||||
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||||
if (!hasVal) {
|
if (!hasVal) {
|
||||||
|
if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) {
|
||||||
|
*pIgnoreEarlierTs = true;
|
||||||
|
*ppRow = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
state->state = SFSLASTNEXTROW_FILESET;
|
state->state = SFSLASTNEXTROW_FILESET;
|
||||||
goto _next_fileset;
|
goto _next_fileset;
|
||||||
}
|
}
|
||||||
|
@ -644,16 +649,13 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
||||||
}
|
}
|
||||||
case SFSLASTNEXTROW_BLOCKROW: {
|
case SFSLASTNEXTROW_BLOCKROW: {
|
||||||
bool hasVal = false;
|
bool hasVal = false;
|
||||||
do {
|
state->row = tMergeTreeGetRow(&state->mergeTree);
|
||||||
state->row = tMergeTreeGetRow(&state->mergeTree);
|
*ppRow = &state->row;
|
||||||
*ppRow = &state->row;
|
hasVal = tMergeTreeNext(&state->mergeTree);
|
||||||
hasVal = tMergeTreeNext(&state->mergeTree);
|
|
||||||
} while (TSDBROW_TS(&state->row) <= state->lastTs && hasVal);
|
|
||||||
|
|
||||||
if (TSDBROW_TS(&state->row) <= state->lastTs) {
|
if (TSDBROW_TS(&state->row) <= state->lastTs) {
|
||||||
*pIgnoreEarlierTs = true;
|
*pIgnoreEarlierTs = true;
|
||||||
state->state = SFSLASTNEXTROW_FILESET;
|
*ppRow = NULL;
|
||||||
goto _next_fileset;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
*pIgnoreEarlierTs = false;
|
*pIgnoreEarlierTs = false;
|
||||||
|
@ -835,7 +837,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
|
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
|
||||||
if (block.maxKey.ts <= state->lastTs) {
|
if (block.maxKey.ts <= state->lastTs) {
|
||||||
*pIgnoreEarlierTs = true;
|
*pIgnoreEarlierTs = true;
|
||||||
goto _next_fileset;
|
if (state->pBlockData) {
|
||||||
|
tBlockDataDestroy(state->pBlockData);
|
||||||
|
state->pBlockData = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppRow = NULL;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
*pIgnoreEarlierTs = false;
|
*pIgnoreEarlierTs = false;
|
||||||
tBlockDataReset(state->pBlockData);
|
tBlockDataReset(state->pBlockData);
|
||||||
|
@ -1724,6 +1732,15 @@ size_t tsdbCacheGetUsage(SVnode *pVnode) {
|
||||||
return usage;
|
return usage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheGetElems(SVnode *pVnode) {
|
||||||
|
int32_t elems = 0;
|
||||||
|
if (pVnode->pTsdb != NULL) {
|
||||||
|
elems = taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
return elems;
|
||||||
|
}
|
||||||
|
|
||||||
static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) {
|
static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) {
|
||||||
struct {
|
struct {
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
|
|
|
@ -332,6 +332,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
|
|
||||||
// retrieve the only one last row of all tables in the uid list.
|
// retrieve the only one last row of all tables in the uid list.
|
||||||
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
|
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
for (int32_t i = 0; i < pr->numOfTables; ++i) {
|
for (int32_t i = 0; i < pr->numOfTables; ++i) {
|
||||||
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
||||||
|
|
||||||
|
@ -407,7 +408,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasNotNullRow) {
|
if (hasNotNullRow) {
|
||||||
pr->lastTs = minTs;
|
double cost = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
if (cost > tsCacheLazyLoadThreshold) {
|
||||||
|
pr->lastTs = minTs;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -417,7 +421,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
if (hasRes) {
|
if (hasRes) {
|
||||||
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
|
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
||||||
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
||||||
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
||||||
|
|
|
@ -29,9 +29,11 @@ struct SLDataIter {
|
||||||
STimeWindow timeWindow;
|
STimeWindow timeWindow;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
SSttBlockLoadInfo *pBlockLoadInfo;
|
SSttBlockLoadInfo *pBlockLoadInfo;
|
||||||
|
bool ignoreEarlierTs;
|
||||||
};
|
};
|
||||||
|
|
||||||
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfSttTrigger) {
|
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
|
||||||
|
int32_t numOfSttTrigger) {
|
||||||
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
|
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
|
||||||
if (pLoadInfo == NULL) {
|
if (pLoadInfo == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -162,7 +164,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
|
||||||
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
|
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
|
||||||
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
|
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
|
||||||
|
|
||||||
tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow, idStr);
|
tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow,
|
||||||
|
idStr);
|
||||||
return &pInfo->blockData[pInfo->currentLoadBlockIndex];
|
return &pInfo->blockData[pInfo->currentLoadBlockIndex];
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -263,7 +266,7 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint
|
||||||
|
|
||||||
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
|
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
|
||||||
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||||
const char *idStr) {
|
const char *idStr, bool strictTimeRange) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
||||||
|
@ -340,6 +343,17 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
if ((*pIter)->iSttBlk != -1) {
|
if ((*pIter)->iSttBlk != -1) {
|
||||||
(*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
|
(*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
|
||||||
(*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
|
(*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
|
||||||
|
|
||||||
|
if ((!backward) && ((strictTimeRange && (*pIter)->pSttBlk->minKey >= (*pIter)->timeWindow.ekey) ||
|
||||||
|
(!strictTimeRange && (*pIter)->pSttBlk->minKey > (*pIter)->timeWindow.ekey))) {
|
||||||
|
(*pIter)->pSttBlk = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (backward && ((strictTimeRange && (*pIter)->pSttBlk->maxKey <= (*pIter)->timeWindow.skey) ||
|
||||||
|
(!strictTimeRange && (*pIter)->pSttBlk->maxKey < (*pIter)->timeWindow.skey))) {
|
||||||
|
(*pIter)->pSttBlk = NULL;
|
||||||
|
(*pIter)->ignoreEarlierTs = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -421,7 +435,7 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
|
||||||
pBlockData->aUid != NULL) {
|
pBlockData->aUid != NULL) {
|
||||||
i = binarySearchForStartRowIndex((uint64_t *)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward);
|
i = binarySearchForStartRowIndex((uint64_t *)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward);
|
||||||
if (i == -1) {
|
if (i == -1) {
|
||||||
tsdbDebug("failed to find the data in pBlockData, uid:%"PRIu64" , %s", pIter->uid, idStr);
|
tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
|
||||||
pIter->iRow = -1;
|
pIter->iRow = -1;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -508,7 +522,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// set start row index
|
// set start row index
|
||||||
pIter->iRow = pIter->backward? pBlockData->nRow-1:0;
|
pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -551,7 +565,7 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||||
bool destroyLoadInfo, const char *idStr) {
|
bool destroyLoadInfo, const char *idStr, bool strictTimeRange) {
|
||||||
pMTree->backward = backward;
|
pMTree->backward = backward;
|
||||||
pMTree->pIter = NULL;
|
pMTree->pIter = NULL;
|
||||||
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
|
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
@ -569,11 +583,12 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
|
|
||||||
pMTree->pLoadInfo = pBlockLoadInfo;
|
pMTree->pLoadInfo = pBlockLoadInfo;
|
||||||
pMTree->destroyLoadInfo = destroyLoadInfo;
|
pMTree->destroyLoadInfo = destroyLoadInfo;
|
||||||
|
pMTree->ignoreEarlierTs = false;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
||||||
struct SLDataIter *pIter = NULL;
|
struct SLDataIter *pIter = NULL;
|
||||||
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
|
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
|
||||||
&pMTree->pLoadInfo[i], pMTree->idStr);
|
&pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
@ -583,6 +598,9 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
taosArrayPush(pMTree->pIterList, &pIter);
|
taosArrayPush(pMTree->pIterList, &pIter);
|
||||||
tMergeTreeAddIter(pMTree, pIter);
|
tMergeTreeAddIter(pMTree, pIter);
|
||||||
} else {
|
} else {
|
||||||
|
if (!pMTree->ignoreEarlierTs) {
|
||||||
|
pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
|
||||||
|
}
|
||||||
tLDataIterClose(pIter);
|
tLDataIterClose(pIter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -596,6 +614,8 @@ _end:
|
||||||
|
|
||||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }
|
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }
|
||||||
|
|
||||||
|
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; }
|
||||||
|
|
||||||
bool tMergeTreeNext(SMergeTree *pMTree) {
|
bool tMergeTreeNext(SMergeTree *pMTree) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (pMTree->pIter) {
|
if (pMTree->pIter) {
|
||||||
|
|
|
@ -315,11 +315,11 @@ static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBuf->numOfTables > 0) {
|
if (pBuf->numOfTables > 0) {
|
||||||
STableBlockScanInfo **p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData);
|
STableBlockScanInfo** p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData);
|
||||||
taosMemoryFree(*p);
|
taosMemoryFree(*p);
|
||||||
pBuf->numOfTables /= pBuf->numPerBucket;
|
pBuf->numOfTables /= pBuf->numPerBucket;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket;
|
int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket;
|
||||||
int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket;
|
int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket;
|
||||||
if (pBuf->pData == NULL) {
|
if (pBuf->pData == NULL) {
|
||||||
|
@ -919,7 +919,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
||||||
pBlockNum->numOfBlocks += 1;
|
pBlockNum->numOfBlocks += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pScanInfo->pBlockList != NULL )&& (taosArrayGetSize(pScanInfo->pBlockList) > 0)) {
|
if ((pScanInfo->pBlockList != NULL) && (taosArrayGetSize(pScanInfo->pBlockList) > 0)) {
|
||||||
numOfQTable += 1;
|
numOfQTable += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1798,7 +1798,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
|
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
|
||||||
if (!hasVal) { // the next value will be the accessed key in stt
|
if (!hasVal) { // the next value will be the accessed key in stt
|
||||||
pScanInfo->lastKeyInStt += step;
|
pScanInfo->lastKeyInStt += step;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -2481,7 +2481,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
||||||
pScanInfo->uid, pReader->idStr);
|
pScanInfo->uid, pReader->idStr);
|
||||||
int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
|
int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
|
||||||
pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
|
pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
|
||||||
pLBlockReader->pInfo, false, pReader->idStr);
|
pLBlockReader->pInfo, false, pReader->idStr, false);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -3512,7 +3512,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
CHECK_FILEBLOCK_STATE* state) {
|
CHECK_FILEBLOCK_STATE* state) {
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||||
|
|
||||||
*state = CHECK_FILEBLOCK_QUIT;
|
*state = CHECK_FILEBLOCK_QUIT;
|
||||||
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
|
||||||
|
@ -3927,7 +3927,8 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
pReader->status.uidList.tableUidList = (uint64_t*)taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num);
|
pReader->status.uidList.tableUidList =
|
||||||
|
(uint64_t*)taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashClear(pReader->status.pTableMap);
|
taosHashClear(pReader->status.pTableMap);
|
||||||
|
|
|
@ -382,6 +382,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||||
pLoad->syncRestore = state.restored;
|
pLoad->syncRestore = state.restored;
|
||||||
pLoad->syncCanRead = state.canRead;
|
pLoad->syncCanRead = state.canRead;
|
||||||
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
||||||
|
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
|
||||||
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
||||||
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
|
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
|
||||||
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
||||||
|
|
|
@ -580,6 +580,16 @@ static size_t taosLRUCacheShardGetUsage(SLRUCacheShard *shard) {
|
||||||
return usage;
|
return usage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t taosLRUCacheShardGetElems(SLRUCacheShard *shard) {
|
||||||
|
int32_t elems = 0;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&shard->mutex);
|
||||||
|
elems = shard->table.elems;
|
||||||
|
taosThreadMutexUnlock(&shard->mutex);
|
||||||
|
|
||||||
|
return elems;
|
||||||
|
}
|
||||||
|
|
||||||
static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) {
|
static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) {
|
||||||
size_t usage = 0;
|
size_t usage = 0;
|
||||||
|
|
||||||
|
@ -755,6 +765,16 @@ size_t taosLRUCacheGetUsage(SLRUCache *cache) {
|
||||||
return usage;
|
return usage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t taosLRUCacheGetElems(SLRUCache *cache) {
|
||||||
|
int32_t elems = 0;
|
||||||
|
|
||||||
|
for (int i = 0; i < cache->numShards; ++i) {
|
||||||
|
elems += taosLRUCacheShardGetElems(&cache->shards[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return elems;
|
||||||
|
}
|
||||||
|
|
||||||
size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache) {
|
size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache) {
|
||||||
size_t usage = 0;
|
size_t usage = 0;
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ class TDTestCase:
|
||||||
tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
|
tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
|
||||||
|
|
||||||
tdSql.query("select count(*) from information_schema.ins_columns")
|
tdSql.query("select count(*) from information_schema.ins_columns")
|
||||||
tdSql.checkData(0, 0, 271)
|
tdSql.checkData(0, 0, 272)
|
||||||
|
|
||||||
tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
|
tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
|
||||||
tdSql.checkRows(14)
|
tdSql.checkRows(14)
|
||||||
|
|
Loading…
Reference in New Issue