Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0
This commit is contained in:
commit
76a5560e1d
|
@ -253,6 +253,7 @@ typedef struct SQueryTableDataCond {
|
|||
STimeWindow twindows;
|
||||
int64_t startVersion;
|
||||
int64_t endVersion;
|
||||
bool notLoadData; // response the actual data, not only the rows in the attribute of info.row of ssdatablock
|
||||
} SQueryTableDataCond;
|
||||
|
||||
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
|
||||
|
|
|
@ -26,10 +26,10 @@
|
|||
|
||||
#undef TD_NEW_MSG_SEG
|
||||
#undef TD_DEF_MSG_TYPE
|
||||
#undef TD_CLOSE_MSG_TYPE
|
||||
#undef TD_CLOSE_MSG_SEG
|
||||
#define TD_NEW_MSG_SEG(TYPE) "null",
|
||||
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) MSG, MSG "-rsp",
|
||||
#define TD_CLOSE_MSG_TYPE(TYPE)
|
||||
#define TD_CLOSE_MSG_SEG(TYPE)
|
||||
|
||||
char *tMsgInfo[] = {
|
||||
|
||||
|
@ -37,20 +37,20 @@
|
|||
|
||||
#undef TD_NEW_MSG_SEG
|
||||
#undef TD_DEF_MSG_TYPE
|
||||
#undef TD_CLOSE_MSG_TYPE
|
||||
#undef TD_CLOSE_MSG_SEG
|
||||
#define TD_NEW_MSG_SEG(TYPE)
|
||||
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP)
|
||||
#define TD_CLOSE_MSG_TYPE(TYPE) TYPE,
|
||||
#define TD_CLOSE_MSG_SEG(TYPE) TYPE,
|
||||
int32_t tMsgRangeDict[] = {
|
||||
|
||||
#elif defined(TD_MSG_NUMBER_)
|
||||
|
||||
#undef TD_NEW_MSG_SEG
|
||||
#undef TD_DEF_MSG_TYPE
|
||||
#undef TD_CLOSE_MSG_TYPE
|
||||
#undef TD_CLOSE_MSG_SEG
|
||||
#define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM,
|
||||
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE##_NUM, TYPE##_RSP_NUM,
|
||||
#define TD_CLOSE_MSG_TYPE(TYPE)
|
||||
#define TD_CLOSE_MSG_SEG(TYPE)
|
||||
|
||||
enum {
|
||||
|
||||
|
@ -58,10 +58,10 @@
|
|||
|
||||
#undef TD_NEW_MSG_SEG
|
||||
#undef TD_DEF_MSG_TYPE
|
||||
#undef TD_CLOSE_MSG_TYPE
|
||||
#undef TD_CLOSE_MSG_SEG
|
||||
#define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM,
|
||||
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP)
|
||||
#define TD_CLOSE_MSG_TYPE(type)
|
||||
#define TD_CLOSE_MSG_SEG(type)
|
||||
|
||||
int32_t tMsgDict[] = {
|
||||
|
||||
|
@ -70,10 +70,10 @@
|
|||
|
||||
#undef TD_NEW_MSG_SEG
|
||||
#undef TD_DEF_MSG_TYPE
|
||||
#undef TD_CLOSE_MSG_TYPE
|
||||
#undef TD_CLOSE_MSG_SEG
|
||||
#define TD_NEW_MSG_SEG(TYPE) TYPE##_SEG_CODE,
|
||||
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP)
|
||||
#define TD_CLOSE_MSG_TYPE(TYPE)
|
||||
#define TD_CLOSE_MSG_SEG(TYPE)
|
||||
|
||||
enum {
|
||||
|
||||
|
@ -82,10 +82,10 @@
|
|||
|
||||
#undef TD_NEW_MSG_SEG
|
||||
#undef TD_DEF_MSG_TYPE
|
||||
#undef TD_CLOSE_MSG_TYPE
|
||||
#undef TD_CLOSE_MSG_SEG
|
||||
#define TD_NEW_MSG_SEG(TYPE) TYPE = ((TYPE##_SEG_CODE) << 8),
|
||||
#define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE, TYPE##_RSP,
|
||||
#define TD_CLOSE_MSG_TYPE(TYPE) TYPE,
|
||||
#define TD_CLOSE_MSG_SEG(TYPE) TYPE,
|
||||
|
||||
enum { // WARN: new msg should be appended to segment tail
|
||||
#endif
|
||||
|
@ -109,7 +109,7 @@
|
|||
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL)
|
||||
TD_CLOSE_MSG_TYPE(TDMT_END_DND_MSG)
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_DND_MSG)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
|
||||
|
@ -218,7 +218,7 @@
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_VIEW, "drop-view", SCMDropViewReq, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
||||
TD_CLOSE_MSG_TYPE(TDMT_END_MND_MSG)
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
|
||||
|
@ -268,7 +268,7 @@ TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_INDEX, "vnode-drop-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
|
||||
TD_CLOSE_MSG_TYPE(TDMT_END_VND_MSG)
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_VND_MSG)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8
|
||||
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL)
|
||||
|
@ -283,7 +283,7 @@ TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
|||
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SCH_TASK_NOTIFY, "task-notify", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL)
|
||||
TD_CLOSE_MSG_TYPE(TDMT_END_SCH_MSG)
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_SCH_MSG)
|
||||
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_STREAM_MSG) //4 << 8
|
||||
|
@ -301,11 +301,11 @@ TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
|||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_HTASK_DROP, "stream-htask-drop", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
|
||||
TD_CLOSE_MSG_TYPE(TDMT_END_STREAM_MSG)
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8
|
||||
TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL)
|
||||
TD_CLOSE_MSG_TYPE(TDMT_END_MON_MSG)
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_MON_MSG)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_SYNC_MSG) //6 << 8
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL)
|
||||
|
@ -337,7 +337,7 @@ TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
|||
TD_DEF_MSG_TYPE(TDMT_SYNC_PREP_SNAPSHOT_REPLY, "sync-prep-snapshot-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
|
||||
TD_CLOSE_MSG_TYPE(TDMT_END_SYNC_MSG)
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_SYNC_MSG)
|
||||
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) //7 << 8
|
||||
|
@ -348,7 +348,7 @@ TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
|
||||
TD_CLOSE_MSG_TYPE(TDMT_END_VND_STREAM_MSG)
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_VND_STREAM_MSG)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) //8 << 8
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp)
|
||||
|
@ -362,10 +362,10 @@ TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committedinfo", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
|
||||
TD_CLOSE_MSG_TYPE(TDMT_END_TMQ_MSG)
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_TMQ_MSG)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_MAX_MSG) // msg end mark
|
||||
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_MAX_MSG)
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -166,8 +166,7 @@ typedef void (*TsdReaderNotifyCbFn)(ETsdReaderNotifyType type, STsdReaderNotifyI
|
|||
|
||||
typedef struct TsdReader {
|
||||
int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
|
||||
SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly,
|
||||
SHashObj** pIgnoreTables);
|
||||
SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables);
|
||||
void (*tsdReaderClose)();
|
||||
void (*tsdSetReaderTaskId)(void *pReader, const char *pId);
|
||||
int32_t (*tsdSetQueryTableList)();
|
||||
|
|
|
@ -154,8 +154,7 @@ typedef struct STsdbReader STsdbReader;
|
|||
#define CACHESCAN_RETRIEVE_LAST 0x8
|
||||
|
||||
int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
||||
SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly,
|
||||
SHashObj **pIgnoreTables);
|
||||
SSDataBlock *pResBlock, void **ppReader, const char *idstr, SHashObj **pIgnoreTables);
|
||||
int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num);
|
||||
void tsdbReaderSetId2(STsdbReader *pReader, const char *idstr);
|
||||
void tsdbReaderClose2(STsdbReader *pReader);
|
||||
|
|
|
@ -676,8 +676,6 @@ struct SDelFWriter {
|
|||
};
|
||||
|
||||
#include "tarray2.h"
|
||||
// #include "tsdbFS2.h"
|
||||
// struct STFileSet;
|
||||
typedef struct STFileSet STFileSet;
|
||||
typedef TARRAY2(STFileSet *) TFileSetArray;
|
||||
|
||||
|
@ -786,20 +784,25 @@ typedef struct SBlockDataInfo {
|
|||
int32_t sttBlockIndex;
|
||||
} SBlockDataInfo;
|
||||
|
||||
typedef struct SSttBlockLoadInfo {
|
||||
SBlockDataInfo blockData[2]; // buffered block data
|
||||
int32_t statisBlockIndex; // buffered statistics block index
|
||||
void *statisBlock; // buffered statistics block data
|
||||
void *pSttStatisBlkArray;
|
||||
SArray *aSttBlk;
|
||||
int32_t currentLoadBlockIndex;
|
||||
STSchema *pSchema;
|
||||
int16_t *colIds;
|
||||
int32_t numOfCols;
|
||||
bool checkRemainingRow; // todo: no assign value?
|
||||
bool isLast;
|
||||
bool sttBlockLoaded;
|
||||
// todo: move away
|
||||
typedef struct {
|
||||
SArray *pUid;
|
||||
SArray *pFirstKey;
|
||||
SArray *pLastKey;
|
||||
SArray *pCount;
|
||||
} SSttTableRowsInfo;
|
||||
|
||||
typedef struct SSttBlockLoadInfo {
|
||||
SBlockDataInfo blockData[2]; // buffered block data
|
||||
SArray *aSttBlk;
|
||||
int32_t currentLoadBlockIndex;
|
||||
STSchema *pSchema;
|
||||
int16_t *colIds;
|
||||
int32_t numOfCols;
|
||||
bool checkRemainingRow; // todo: no assign value?
|
||||
bool isLast;
|
||||
bool sttBlockLoaded;
|
||||
SSttTableRowsInfo info;
|
||||
SSttBlockLoadCostInfo cost;
|
||||
} SSttBlockLoadInfo;
|
||||
|
||||
|
@ -888,27 +891,31 @@ typedef struct {
|
|||
_load_tomb_fn loadTombFn;
|
||||
void *pReader;
|
||||
void *idstr;
|
||||
bool rspRows; // response the rows in stt-file, if possible
|
||||
} SMergeTreeConf;
|
||||
|
||||
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf);
|
||||
typedef struct SSttDataInfoForTable {
|
||||
SArray* pTimeWindowList;
|
||||
int64_t numOfRows;
|
||||
} SSttDataInfoForTable;
|
||||
|
||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||
void tMergeTreePinSttBlock(SMergeTree *pMTree);
|
||||
void tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
|
||||
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
|
||||
void tMergeTreeClose(SMergeTree *pMTree);
|
||||
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable* pTableInfo);
|
||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||
void tMergeTreePinSttBlock(SMergeTree *pMTree);
|
||||
void tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
|
||||
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
|
||||
void tMergeTreeClose(SMergeTree *pMTree);
|
||||
|
||||
SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
|
||||
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost);
|
||||
void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost);
|
||||
|
||||
// tsdbCache ==============================================================================================
|
||||
typedef enum {
|
||||
READ_MODE_COUNT_ONLY = 0x1,
|
||||
READ_MODE_ALL,
|
||||
} EReadMode;
|
||||
READER_EXEC_DATA = 0x1,
|
||||
READER_EXEC_ROWS = 0x2,
|
||||
} EExecMode;
|
||||
|
||||
typedef struct {
|
||||
TSKEY ts;
|
||||
|
|
|
@ -238,10 +238,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
|||
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg);
|
||||
|
||||
int32_t tqRestartStreamTasks(STQ* pTq);
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
||||
int32_t tqScanWal(STQ* pTq);
|
||||
int32_t tqStartStreamTasks(STQ* pTq);
|
||||
|
||||
int tqCommit(STQ*);
|
||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||
|
|
|
@ -1877,7 +1877,7 @@ static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb
|
|||
.idstr = pr->idstr,
|
||||
};
|
||||
|
||||
code = tMergeTreeOpen2(&iter->mergeTree, &conf);
|
||||
code = tMergeTreeOpen2(&iter->mergeTree, &conf, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -435,7 +435,7 @@ _exit:
|
|||
tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", TD_VID(tsdb->pVnode),
|
||||
__func__, committer->ctx->fid, committer->ctx->minKey, committer->ctx->maxKey, committer->ctx->expLevel);
|
||||
}
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitFileSetEnd(SCommitter2 *committer) {
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
#include "tsdbFSet2.h"
|
||||
#include "tsdbUtil2.h"
|
||||
#include "tsdbMerge.h"
|
||||
#include "tsdbReadUtil.h"
|
||||
#include "tsdbSttFileRW.h"
|
||||
|
@ -52,15 +53,6 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList,
|
|||
return pLoadInfo;
|
||||
}
|
||||
|
||||
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pLoadCost) {
|
||||
for (int32_t i = 0; i < 1; ++i) {
|
||||
pLoadCost->blockElapsedTime += pLoadInfo[i].cost.blockElapsedTime;
|
||||
pLoadCost->loadBlocks += pLoadInfo[i].cost.loadBlocks;
|
||||
pLoadCost->loadStatisBlocks += pLoadInfo[i].cost.loadStatisBlocks;
|
||||
pLoadCost->statisElapsedTime += pLoadInfo[i].cost.statisElapsedTime;
|
||||
}
|
||||
}
|
||||
|
||||
void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||
if (pLoadInfo == NULL) {
|
||||
return NULL;
|
||||
|
@ -78,9 +70,11 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
|||
pInfo->sttBlockIndex = -1;
|
||||
pInfo->pin = false;
|
||||
|
||||
if (pLoadInfo->statisBlock != NULL) {
|
||||
tStatisBlockDestroy(pLoadInfo->statisBlock);
|
||||
taosMemoryFreeClear(pLoadInfo->statisBlock);
|
||||
if (pLoadInfo->info.pCount != NULL) {
|
||||
taosArrayDestroy(pLoadInfo->info.pUid);
|
||||
taosArrayDestroy(pLoadInfo->info.pFirstKey);
|
||||
taosArrayDestroy(pLoadInfo->info.pLastKey);
|
||||
taosArrayDestroy(pLoadInfo->info.pCount);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pLoadInfo->aSttBlk);
|
||||
|
@ -172,7 +166,7 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
|
|||
pInfo->cost.blockElapsedTime += el;
|
||||
pInfo->cost.loadBlocks += 1;
|
||||
|
||||
tsdbDebug("read last block, total load:%" PRId64 ", trigger by uid:%" PRIu64 ", stt-fileVer:%" PRId64
|
||||
tsdbDebug("read stt block, total load:%" PRId64 ", trigger by uid:%" PRIu64 ", stt-fileVer:%" PRId64
|
||||
", last block index:%d, entry:%d, rows:%d, uidRange:%" PRId64 "-%" PRId64 " tsRange:%" PRId64 "-%" PRId64
|
||||
" %p, elapsed time:%.2f ms, %s",
|
||||
pInfo->cost.loadBlocks, pIter->uid, pIter->cid, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
|
||||
|
@ -323,95 +317,77 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t suidComparFn(const void *target, const void *p2) {
|
||||
const uint64_t *targetUid = target;
|
||||
const uint64_t *uid2 = p2;
|
||||
if (*uid2 == (*targetUid)) {
|
||||
static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||
TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) {
|
||||
int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
|
||||
if (numOfBlocks <= 0) {
|
||||
return 0;
|
||||
} else {
|
||||
return (*targetUid) < (*uid2) ? -1 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
static bool existsFromSttBlkStatis(SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, uint64_t uid,
|
||||
SSttFileReader *pReader) {
|
||||
const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray;
|
||||
if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t i = 0;
|
||||
for (i = 0; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
|
||||
SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||
if (p->minTbid.suid <= suid && p->maxTbid.suid >= suid) {
|
||||
break;
|
||||
}
|
||||
int32_t startIndex = 0;
|
||||
while((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) {
|
||||
++startIndex;
|
||||
}
|
||||
|
||||
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
|
||||
return false;
|
||||
if (startIndex >= numOfBlocks || pStatisBlkArray->data[startIndex].minTbid.suid > suid) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
while (i < TARRAY2_SIZE(pStatisBlkArray)) {
|
||||
SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||
if (p->minTbid.suid > suid) {
|
||||
return false;
|
||||
int32_t endIndex = startIndex;
|
||||
while(endIndex < numOfBlocks && pStatisBlkArray->data[endIndex].minTbid.suid <= suid) {
|
||||
++endIndex;
|
||||
}
|
||||
|
||||
int32_t num = endIndex - startIndex;
|
||||
pBlockLoadInfo->cost.loadStatisBlocks += num;
|
||||
|
||||
STbStatisBlock block;
|
||||
tStatisBlockInit(&block);
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
for(int32_t k = startIndex; k < endIndex; ++k) {
|
||||
tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block);
|
||||
|
||||
int32_t i = 0;
|
||||
int32_t rows = TARRAY2_SIZE(block.suid);
|
||||
while (i < rows && block.suid->data[i] != suid) {
|
||||
++i;
|
||||
}
|
||||
|
||||
// if (pBlockLoadInfo->statisBlock == NULL) {
|
||||
// pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
|
||||
//
|
||||
// int64_t st = taosGetTimestampMs();
|
||||
// tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
|
||||
// pBlockLoadInfo->statisBlockIndex = i;
|
||||
//
|
||||
// double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
// pBlockLoadInfo->cost.loadStatisBlocks += 1;
|
||||
// pBlockLoadInfo->cost.statisElapsedTime += el;
|
||||
// } else if (pBlockLoadInfo->statisBlockIndex != i) {
|
||||
// tStatisBlockDestroy(pBlockLoadInfo->statisBlock);
|
||||
//
|
||||
// int64_t st = taosGetTimestampMs();
|
||||
// tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
|
||||
// pBlockLoadInfo->statisBlockIndex = i;
|
||||
//
|
||||
// double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
// pBlockLoadInfo->cost.loadStatisBlocks += 1;
|
||||
// pBlockLoadInfo->cost.statisElapsedTime += el;
|
||||
// }
|
||||
|
||||
STbStatisBlock* pBlock = pBlockLoadInfo->statisBlock;
|
||||
int32_t index = tarray2SearchIdx(pBlock->suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ);
|
||||
if (index == -1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t j = index;
|
||||
if (pBlock->uid->data[j] == uid) {
|
||||
return true;
|
||||
} else if (pBlock->uid->data[j] > uid) {
|
||||
while (j >= 0 && pBlock->suid->data[j] == suid) {
|
||||
if (pBlock->uid->data[j] == uid) {
|
||||
return true;
|
||||
} else {
|
||||
j -= 1;
|
||||
}
|
||||
// existed
|
||||
if (i < rows) {
|
||||
if (pBlockLoadInfo->info.pUid == NULL) {
|
||||
pBlockLoadInfo->info.pUid = taosArrayInit(rows, sizeof(int64_t));
|
||||
pBlockLoadInfo->info.pFirstKey = taosArrayInit(rows, sizeof(int64_t));
|
||||
pBlockLoadInfo->info.pLastKey = taosArrayInit(rows, sizeof(int64_t));
|
||||
pBlockLoadInfo->info.pCount = taosArrayInit(rows, sizeof(int64_t));
|
||||
}
|
||||
} else {
|
||||
j = index + 1;
|
||||
while (j < pBlock->suid->size && pBlock->suid->data[j] == suid) {
|
||||
if (pBlock->uid->data[j] == uid) {
|
||||
return true;
|
||||
} else {
|
||||
j += 1;
|
||||
|
||||
if (pStatisBlkArray->data[k].maxTbid.suid == suid) {
|
||||
taosArrayAddBatch(pBlockLoadInfo->info.pUid, &block.uid->data[i], rows - i);
|
||||
taosArrayAddBatch(pBlockLoadInfo->info.pFirstKey, &block.firstKey->data[i], rows - i);
|
||||
taosArrayAddBatch(pBlockLoadInfo->info.pLastKey, &block.lastKey->data[i], rows - i);
|
||||
taosArrayAddBatch(pBlockLoadInfo->info.pCount, &block.count->data[i], rows - i);
|
||||
} else {
|
||||
while (i < rows && block.suid->data[i] == suid) {
|
||||
taosArrayPush(pBlockLoadInfo->info.pUid, &block.uid->data[i]);
|
||||
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &block.firstKey->data[i]);
|
||||
taosArrayPush(pBlockLoadInfo->info.pLastKey, &block.lastKey->data[i]);
|
||||
taosArrayPush(pBlockLoadInfo->info.pCount, &block.count->data[i]);
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
i += 1;
|
||||
}
|
||||
|
||||
return false;
|
||||
tStatisBlockDestroy(&block);
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
pBlockLoadInfo->cost.statisElapsedTime += el;
|
||||
|
||||
tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid,
|
||||
|
@ -428,19 +404,28 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
|
|||
return code;
|
||||
}
|
||||
|
||||
// load the stt block info for each stt file block
|
||||
code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
|
||||
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pBlockLoadInfo->pSttStatisBlkArray);
|
||||
// load stt statistics block for all stt-blocks, to decide if the data of queried table exists in current stt file
|
||||
TStatisBlkArray *pStatisBlkArray = NULL;
|
||||
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
// load statistics block for all tables in current stt file
|
||||
code = loadSttStatisticsBlockData(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, suid, idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("failed to load stt statistics block data, code:%s, %s", tstrerror(code), idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
@ -448,19 +433,44 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t uidComparFn(const void* p1, const void* p2) {
|
||||
const uint64_t *pFirst = p1;
|
||||
const uint64_t *pVal = p2;
|
||||
|
||||
if (*pFirst == *pVal) {
|
||||
return 0;
|
||||
} else {
|
||||
return *pFirst < *pVal? -1:1;
|
||||
}
|
||||
}
|
||||
|
||||
static void setSttInfoForCurrentTable(SSttBlockLoadInfo *pLoadInfo, uint64_t uid, STimeWindow *pTimeWindow,
|
||||
int64_t *numOfRows) {
|
||||
if (pTimeWindow == NULL || taosArrayGetSize(pLoadInfo->info.pUid) == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t index = taosArraySearchIdx(pLoadInfo->info.pUid, &uid, uidComparFn, TD_EQ);
|
||||
if (index >= 0) {
|
||||
pTimeWindow->skey = *(int64_t *)taosArrayGet(pLoadInfo->info.pFirstKey, index);
|
||||
pTimeWindow->ekey = *(int64_t *)taosArrayGet(pLoadInfo->info.pLastKey, index);
|
||||
|
||||
*numOfRows += *(int64_t*) taosArrayGet(pLoadInfo->info.pCount, index);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t cid, int8_t backward,
|
||||
uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange,
|
||||
SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange,
|
||||
_load_tomb_fn loadTombFn, void *pReader1) {
|
||||
SMergeTreeConf *pConf, SSttBlockLoadInfo *pBlockLoadInfo, STimeWindow *pTimeWindow,
|
||||
int64_t *numOfRows, const char *idStr) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
pIter->uid = uid;
|
||||
pIter->uid = pConf->uid;
|
||||
pIter->cid = cid;
|
||||
pIter->backward = backward;
|
||||
pIter->verRange.minVer = pRange->minVer;
|
||||
pIter->verRange.maxVer = pRange->maxVer;
|
||||
pIter->timeWindow.skey = pTimeWindow->skey;
|
||||
pIter->timeWindow.ekey = pTimeWindow->ekey;
|
||||
pIter->verRange.minVer = pConf->verRange.minVer;
|
||||
pIter->verRange.maxVer = pConf->verRange.maxVer;
|
||||
pIter->timeWindow.skey = pConf->timewindow.skey;
|
||||
pIter->timeWindow.ekey = pConf->timewindow.ekey;
|
||||
pIter->pReader = pSttFileReader;
|
||||
pIter->pBlockLoadInfo = pBlockLoadInfo;
|
||||
|
||||
|
@ -473,34 +483,29 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32
|
|||
}
|
||||
|
||||
if (!pBlockLoadInfo->sttBlockLoaded) {
|
||||
code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, suid, loadTombFn, pReader1, idStr);
|
||||
code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, pConf->suid, pConf->loadTombFn, pConf->pReader, idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
// bool exists = existsFromSttBlkStatis(pBlockLoadInfo, suid, uid, pIter->pReader);
|
||||
// if (!exists) {
|
||||
// pIter->iSttBlk = -1;
|
||||
// pIter->pSttBlk = NULL;
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
setSttInfoForCurrentTable(pBlockLoadInfo, pConf->uid, pTimeWindow, numOfRows);
|
||||
|
||||
// find the start block, actually we could load the position to avoid repeatly searching for the start position when
|
||||
// the skey is updated.
|
||||
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
|
||||
pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
|
||||
pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, pConf->uid, backward);
|
||||
if (pIter->iSttBlk != -1) {
|
||||
pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
|
||||
pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;
|
||||
|
||||
if ((!backward) && ((strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
|
||||
(!strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) {
|
||||
if ((!backward) && ((pConf->strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
|
||||
(!pConf->strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) {
|
||||
pIter->pSttBlk = NULL;
|
||||
}
|
||||
|
||||
if (backward && ((strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) ||
|
||||
(!strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) {
|
||||
if (backward && ((pConf->strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) ||
|
||||
(!pConf->strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) {
|
||||
pIter->pSttBlk = NULL;
|
||||
pIter->ignoreEarlierTs = true;
|
||||
}
|
||||
|
@ -708,8 +713,6 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
|
|||
return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
|
||||
}
|
||||
|
||||
SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
|
||||
|
||||
// SMergeTree =================================================
|
||||
static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
|
||||
SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
|
||||
|
@ -737,7 +740,7 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
|
|||
return -1 * tLDataIterCmprFn(p1, p2);
|
||||
}
|
||||
|
||||
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
|
||||
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable* pSttDataInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
pMTree->pIter = NULL;
|
||||
|
@ -758,17 +761,16 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
|
|||
goto _end;
|
||||
}
|
||||
|
||||
// add the list/iter placeholder
|
||||
adjustLDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset);
|
||||
adjustSttDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset);
|
||||
|
||||
for (int32_t j = 0; j < numOfLevels; ++j) {
|
||||
SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j];
|
||||
SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
|
||||
SArray * pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
|
||||
|
||||
for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file
|
||||
SLDataIter *pIter = taosArrayGetP(pList, i);
|
||||
|
||||
SSttFileReader *pSttFileReader = pIter->pReader;
|
||||
SSttFileReader * pSttFileReader = pIter->pReader;
|
||||
SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;
|
||||
|
||||
// open stt file reader if not opened yet
|
||||
|
@ -790,10 +792,11 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
|
|||
|
||||
memset(pIter, 0, sizeof(SLDataIter));
|
||||
|
||||
STimeWindow w = {0};
|
||||
int64_t numOfRows = 0;
|
||||
|
||||
int64_t cid = pSttLevel->fobjArr->data[i]->f->cid;
|
||||
code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf->suid, pConf->uid, &pConf->timewindow,
|
||||
&pConf->verRange, pLoadInfo, pMTree->idStr, pConf->strictTimeRange, pConf->loadTombFn,
|
||||
pConf->pReader);
|
||||
code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows, pMTree->idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
|
@ -801,6 +804,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
|
|||
bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
|
||||
if (hasVal) {
|
||||
tMergeTreeAddIter(pMTree, pIter);
|
||||
|
||||
// let's record the time window for current table of uid in the stt files
|
||||
if (pSttDataInfo != NULL) {
|
||||
taosArrayPush(pSttDataInfo->pTimeWindowList, &w);
|
||||
pSttDataInfo->numOfRows += numOfRows;
|
||||
}
|
||||
} else {
|
||||
if (!pMTree->ignoreEarlierTs) {
|
||||
pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
|
||||
|
|
|
@ -25,6 +25,16 @@
|
|||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||
#define getCurrentKeyInSttBlock(_r) ((_r)->currentKey)
|
||||
|
||||
typedef struct {
|
||||
bool overlapWithNeighborBlock;
|
||||
bool hasDupTs;
|
||||
bool overlapWithDelInfo;
|
||||
bool overlapWithSttBlock;
|
||||
bool overlapWithKeyInBuf;
|
||||
bool partiallyRequired;
|
||||
bool moreThanCapcity;
|
||||
} SDataBlockToLoadInfo;
|
||||
|
||||
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
|
||||
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
|
||||
STsdbReader* pReader);
|
||||
|
@ -596,6 +606,13 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (pScanInfo->filesetWindow.skey > pRecord->firstKey) {
|
||||
pScanInfo->filesetWindow.skey = pRecord->firstKey;
|
||||
}
|
||||
if (pScanInfo->filesetWindow.ekey < pRecord->lastKey) {
|
||||
pScanInfo->filesetWindow.ekey = pRecord->lastKey;
|
||||
}
|
||||
|
||||
pBlockNum->numOfBlocks += 1;
|
||||
if (taosArrayGetSize(pTableScanInfoList) == 0) {
|
||||
taosArrayPush(pTableScanInfoList, &pScanInfo);
|
||||
|
@ -614,7 +631,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
tsdbDebug(
|
||||
"load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
|
||||
"load block of %d tables completed, blocks:%d in %d tables, stt-files:%d, block-info-size:%.2f Kb, elapsed "
|
||||
"time:%.2f ms %s",
|
||||
numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfSttFiles,
|
||||
sizeInDisk / 1000.0, el, pReader->idStr);
|
||||
|
@ -1226,78 +1243,6 @@ static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersio
|
|||
(pBlock->record.maxVer >= pVerRange->minVer) && (pBlock->record.minVer <= pVerRange->maxVer);
|
||||
}
|
||||
|
||||
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
|
||||
int32_t startIndex) {
|
||||
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
|
||||
|
||||
for (int32_t i = startIndex; i < num; i += 1) {
|
||||
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
|
||||
if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) {
|
||||
if (p->version >= pRecord->minVer) {
|
||||
return true;
|
||||
}
|
||||
} else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts
|
||||
if (p->version >= pRecord->minVer) {
|
||||
if (i < num - 1) {
|
||||
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
|
||||
if (pnext->ts >= pRecord->firstKey) {
|
||||
return true;
|
||||
}
|
||||
} else { // it must be the last point
|
||||
ASSERT(p->version == 0);
|
||||
}
|
||||
}
|
||||
} else { // (p->ts > pBlock->maxKey.ts) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) {
|
||||
if (pBlockScanInfo->delSkyline == NULL || (taosArrayGetSize(pBlockScanInfo->delSkyline) == 0)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// ts is not overlap
|
||||
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
|
||||
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
|
||||
if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// version is not overlap
|
||||
if (ASCENDING_TRAVERSE(order)) {
|
||||
return doCheckforDatablockOverlap(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex);
|
||||
} else {
|
||||
int32_t index = pBlockScanInfo->fileDelIndex;
|
||||
while (1) {
|
||||
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
|
||||
if (p->ts > pRecord->firstKey && index > 0) {
|
||||
index -= 1;
|
||||
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
|
||||
if (p->ts == pRecord->firstKey && p->version < pRecord->maxVer && index > 0) {
|
||||
index -= 1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return doCheckforDatablockOverlap(pBlockScanInfo, pRecord, index);
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
bool overlapWithNeighborBlock;
|
||||
bool hasDupTs;
|
||||
bool overlapWithDelInfo;
|
||||
bool overlapWithLastBlock;
|
||||
bool overlapWithKeyInBuf;
|
||||
bool partiallyRequired;
|
||||
bool moreThanCapcity;
|
||||
} SDataBlockToLoadInfo;
|
||||
|
||||
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo,
|
||||
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) {
|
||||
SBrinRecord rec = {0};
|
||||
|
@ -1318,7 +1263,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
|||
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
|
||||
int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
||||
pInfo->overlapWithLastBlock =
|
||||
pInfo->overlapWithSttBlock =
|
||||
!(pBlockInfo->record.lastKey < nextProcKeyInStt || pBlockInfo->record.firstKey > nextProcKeyInStt);
|
||||
}
|
||||
|
||||
|
@ -1340,15 +1285,15 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock
|
|||
|
||||
bool loadDataBlock =
|
||||
(info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf ||
|
||||
info.moreThanCapcity || info.overlapWithDelInfo || info.overlapWithLastBlock);
|
||||
info.moreThanCapcity || info.overlapWithDelInfo || info.overlapWithSttBlock);
|
||||
|
||||
// log the reason why load the datablock for profile
|
||||
if (loadDataBlock) {
|
||||
tsdbDebug("%p uid:%" PRIu64
|
||||
" need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, "
|
||||
"overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
|
||||
"overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithSttBlock:%d, %s",
|
||||
pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
|
||||
info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
|
||||
info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithSttBlock,
|
||||
pReader->idStr);
|
||||
}
|
||||
|
||||
|
@ -1360,7 +1305,7 @@ static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBloc
|
|||
SDataBlockToLoadInfo info = {0};
|
||||
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader);
|
||||
bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
|
||||
info.overlapWithDelInfo || info.overlapWithLastBlock);
|
||||
info.overlapWithDelInfo || info.overlapWithSttBlock);
|
||||
return isCleanFileBlock;
|
||||
}
|
||||
|
||||
|
@ -2115,27 +2060,34 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool initSttBlockReader(SSttBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
||||
// the last block reader has been initialized for this table.
|
||||
if (pLBlockReader->uid == pScanInfo->uid) {
|
||||
return hasDataInSttBlock(pLBlockReader);
|
||||
static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
||||
bool hasData = true;
|
||||
|
||||
// the stt block reader has been initialized for this table.
|
||||
if (pSttBlockReader->uid == pScanInfo->uid) {
|
||||
return hasDataInSttBlock(pSttBlockReader);
|
||||
}
|
||||
|
||||
if (pLBlockReader->uid != 0) {
|
||||
tMergeTreeClose(&pLBlockReader->mergeTree);
|
||||
if (pSttBlockReader->uid != 0) {
|
||||
tMergeTreeClose(&pSttBlockReader->mergeTree);
|
||||
}
|
||||
|
||||
pLBlockReader->uid = pScanInfo->uid;
|
||||
pSttBlockReader->uid = pScanInfo->uid;
|
||||
|
||||
STimeWindow w = pLBlockReader->window;
|
||||
if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
|
||||
// second time init stt block reader
|
||||
if (pScanInfo->cleanSttBlocks && pReader->info.execMode == READER_EXEC_ROWS) {
|
||||
return !pScanInfo->sttBlockReturned;
|
||||
}
|
||||
|
||||
STimeWindow w = pSttBlockReader->window;
|
||||
if (ASCENDING_TRAVERSE(pSttBlockReader->order)) {
|
||||
w.skey = pScanInfo->sttKeyInfo.nextProcKey;
|
||||
} else {
|
||||
w.ekey = pScanInfo->sttKeyInfo.nextProcKey;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey,
|
||||
tsdbDebug("init stt block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey,
|
||||
pScanInfo->uid, pReader->idStr);
|
||||
|
||||
SMergeTreeConf conf = {
|
||||
|
@ -2143,20 +2095,22 @@ static bool initSttBlockReader(SSttBlockReader* pLBlockReader, STableBlockScanIn
|
|||
.suid = pReader->info.suid,
|
||||
.pTsdb = pReader->pTsdb,
|
||||
.timewindow = w,
|
||||
.verRange = pLBlockReader->verRange,
|
||||
.verRange = pSttBlockReader->verRange,
|
||||
.strictTimeRange = false,
|
||||
.pSchema = pReader->info.pSchema,
|
||||
.pCurrentFileset = pReader->status.pCurrentFileset,
|
||||
.backward = (pLBlockReader->order == TSDB_ORDER_DESC),
|
||||
.backward = (pSttBlockReader->order == TSDB_ORDER_DESC),
|
||||
.pSttFileBlockIterArray = pReader->status.pLDataIterArray,
|
||||
.pCols = pReader->suppInfo.colId,
|
||||
.numOfCols = pReader->suppInfo.numOfCols,
|
||||
.loadTombFn = loadSttTombDataForAll,
|
||||
.pReader = pReader,
|
||||
.idstr = pReader->idStr,
|
||||
.rspRows = (pReader->info.execMode == READER_EXEC_ROWS),
|
||||
};
|
||||
|
||||
int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, &conf);
|
||||
SSttDataInfoForTable info = {.pTimeWindowList = taosArrayInit(4, sizeof(STimeWindow))};
|
||||
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
|
@ -2164,13 +2118,44 @@ static bool initSttBlockReader(SSttBlockReader* pLBlockReader, STableBlockScanIn
|
|||
initMemDataIterator(pScanInfo, pReader);
|
||||
initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
|
||||
|
||||
code = nextRowFromSttBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange);
|
||||
if (conf.rspRows) {
|
||||
pScanInfo->cleanSttBlocks =
|
||||
isCleanSttBlock(info.pTimeWindowList, &pReader->info.window, pScanInfo, pReader->info.order);
|
||||
|
||||
if (pScanInfo->cleanSttBlocks) {
|
||||
pScanInfo->numOfRowsInStt = info.numOfRows;
|
||||
pScanInfo->sttWindow.skey = INT64_MAX;
|
||||
pScanInfo->sttWindow.ekey = INT64_MIN;
|
||||
|
||||
// calculate the time window for data in stt files
|
||||
for(int32_t i = 0; i < taosArrayGetSize(info.pTimeWindowList); ++i) {
|
||||
STimeWindow* pWindow = taosArrayGet(info.pTimeWindowList, i);
|
||||
if (pScanInfo->sttWindow.skey > pWindow->skey) {
|
||||
pScanInfo->sttWindow.skey = pWindow->skey;
|
||||
}
|
||||
|
||||
if (pScanInfo->sttWindow.ekey < pWindow->ekey) {
|
||||
pScanInfo->sttWindow.ekey = pWindow->ekey;
|
||||
}
|
||||
}
|
||||
|
||||
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList)? STT_FILE_HAS_DATA:STT_FILE_NO_DATA;
|
||||
pScanInfo->sttKeyInfo.nextProcKey = ASCENDING_TRAVERSE(pReader->info.order)? pScanInfo->sttWindow.skey:pScanInfo->sttWindow.ekey;
|
||||
hasData = true;
|
||||
} else {
|
||||
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
||||
}
|
||||
} else {
|
||||
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
||||
}
|
||||
|
||||
taosArrayDestroy(info.pTimeWindowList);
|
||||
|
||||
int64_t el = taosGetTimestampUs() - st;
|
||||
pReader->cost.initSttBlockReader += (el / 1000.0);
|
||||
|
||||
tsdbDebug("init last block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr);
|
||||
return code;
|
||||
tsdbDebug("init stt block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr);
|
||||
return hasData;
|
||||
}
|
||||
|
||||
static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader) { return pSttBlockReader->mergeTree.pIter != NULL; }
|
||||
|
@ -2361,18 +2346,15 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf
|
|||
|
||||
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int32_t step = asc ? 1 : -1;
|
||||
double el = 0;
|
||||
|
||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||
|
||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
|
||||
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int32_t step = asc ? 1 : -1;
|
||||
double el = 0;
|
||||
SBrinRecord* pRecord = &pBlockInfo->record;
|
||||
|
||||
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
|
||||
SBrinRecord* pRecord = &pBlockInfo->record;
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
|
||||
STableBlockScanInfo* pBlockScanInfo = NULL;
|
||||
|
@ -2573,13 +2555,13 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) {
|
|||
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &winFid.skey, &winFid.ekey);
|
||||
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
pReader->status.bProcMemPreFileset = !(pReader->status.memTableMaxKey < pReader->status.prevFilesetStartKey ||
|
||||
pReader->status.bProcMemPreFileset = !(pReader->status.memTableMaxKey < pReader->status.prevFilesetStartKey ||
|
||||
(winFid.skey-1) < pReader->status.memTableMinKey);
|
||||
} else {
|
||||
pReader->status.bProcMemPreFileset = !( pReader->status.memTableMaxKey < (winFid.ekey+1) ||
|
||||
pReader->status.bProcMemPreFileset = !( pReader->status.memTableMaxKey < (winFid.ekey+1) ||
|
||||
pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey);
|
||||
}
|
||||
|
||||
|
||||
if (pReader->status.bProcMemPreFileset) {
|
||||
resetTableListIndex(&pReader->status);
|
||||
}
|
||||
|
@ -2589,9 +2571,9 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) {
|
|||
STsdReaderNotifyInfo info = {0};
|
||||
info.duration.filesetId = fid;
|
||||
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pReader->status.prevFilesetStartKey = winFid.skey;
|
||||
pReader->status.prevFilesetEndKey = winFid.ekey;
|
||||
}
|
||||
|
@ -2672,10 +2654,10 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt
|
|||
}
|
||||
|
||||
static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;
|
||||
STableUidList* pUidList = &pStatus->uidList;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STableUidList* pUidList = &pStatus->uidList;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (tSimpleHashGetSize(pStatus->pTableMap) == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2711,8 +2693,8 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
|||
continue;
|
||||
}
|
||||
|
||||
bool hasDataInLastFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||
if (!hasDataInLastFile) {
|
||||
bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||
if (!hasDataInSttFile) {
|
||||
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||
if (!hasNexTable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2721,12 +2703,37 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
|||
continue;
|
||||
}
|
||||
|
||||
// if only require the total rows, no need to load data from stt file if it is clean stt blocks
|
||||
if (pReader->info.execMode == READER_EXEC_ROWS && pScanInfo->cleanSttBlocks) {
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
|
||||
SDataBlockInfo* pInfo = &pResBlock->info;
|
||||
blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt);
|
||||
|
||||
pInfo->rows = pScanInfo->numOfRowsInStt;
|
||||
pInfo->id.uid = pScanInfo->uid;
|
||||
pInfo->dataLoad = 1;
|
||||
pInfo->window = pScanInfo->sttWindow;
|
||||
|
||||
setComposedBlockFlag(pReader, true);
|
||||
|
||||
pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
|
||||
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
||||
pScanInfo->lastProcKey = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
|
||||
pScanInfo->sttBlockReturned = true;
|
||||
|
||||
pSttBlockReader->mergeTree.pIter = NULL;
|
||||
|
||||
tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s",
|
||||
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||
pResBlock->info.rows, pReader->idStr);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
while (1) {
|
||||
bool hasBlockLData = hasDataInSttBlock(pSttBlockReader);
|
||||
|
||||
// no data in last block and block, no need to proceed.
|
||||
if (hasBlockLData == false) {
|
||||
// no data in stt block and block, no need to proceed.
|
||||
if (!hasDataInSttBlock(pSttBlockReader)) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -2771,14 +2778,13 @@ static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockSc
|
|||
}
|
||||
|
||||
static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
STableBlockScanInfo* pScanInfo = NULL;
|
||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
||||
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
|
||||
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
|
||||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
|
||||
|
@ -2836,13 +2842,14 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
|
||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||
|
||||
tsdbDebug("load data in last block firstly %s", pReader->idStr);
|
||||
tsdbDebug("load data in stt block firstly %s", pReader->idStr);
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
// let's load data from stt files
|
||||
// let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files
|
||||
pScanInfo->cleanSttBlocks = false;
|
||||
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||
|
||||
// no data in last block, no need to proceed.
|
||||
// no data in stt block, no need to proceed.
|
||||
while (hasDataInSttBlock(pSttBlockReader)) {
|
||||
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
|
||||
|
||||
|
@ -2880,147 +2887,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
|
||||
}
|
||||
|
||||
static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
LRUHandle* handle = NULL;
|
||||
int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
|
||||
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||
|
||||
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
|
||||
size_t num = taosArrayGetSize(aBlockIdx);
|
||||
if (num == 0) {
|
||||
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SBlockIdx* pBlockIdx = NULL;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
|
||||
if (pBlockIdx->suid != pReader->info.suid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STableBlockScanInfo** p = tSimpleHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STableBlockScanInfo* pScanInfo = *p;
|
||||
SDataBlk block = {0};
|
||||
// for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
|
||||
// tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
|
||||
// pReader->rowsNum += block.nRow;
|
||||
// }
|
||||
}
|
||||
#endif
|
||||
|
||||
_end:
|
||||
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t doSumSttBlockRows(STsdbReader* pReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
|
||||
SSttBlockLoadInfo* pBlockLoadInfo = NULL;
|
||||
#if 0
|
||||
for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file
|
||||
pBlockLoadInfo = &pSttBlockReader->pInfo[i];
|
||||
|
||||
code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
|
||||
if (size >= 1) {
|
||||
SSttBlk* pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
|
||||
SSttBlk* pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1);
|
||||
|
||||
// all identical
|
||||
if (pStart->suid == pEnd->suid) {
|
||||
if (pStart->suid != pReader->info.suid) {
|
||||
// no qualified stt block existed
|
||||
taosArrayClear(pBlockLoadInfo->aSttBlk);
|
||||
continue;
|
||||
}
|
||||
for (int32_t j = 0; j < size; ++j) {
|
||||
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j);
|
||||
pReader->rowsNum += p->nRow;
|
||||
}
|
||||
} else {
|
||||
for (int32_t j = 0; j < size; ++j) {
|
||||
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j);
|
||||
uint64_t s = p->suid;
|
||||
if (s < pReader->info.suid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (s == pReader->info.suid) {
|
||||
pReader->rowsNum += p->nRow;
|
||||
} else if (s > pReader->info.suid) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t readRowsCountFromFiles(STsdbReader* pReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
while (1) {
|
||||
bool hasNext = false;
|
||||
code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (!hasNext) { // no data files on disk
|
||||
break;
|
||||
}
|
||||
|
||||
// code = doSumFileBlockRows(pReader, pReader->pFileReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = doSumSttBlockRows(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
pReader->status.loadFromFile = false;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t readRowsCountFromMem(STsdbReader* pReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int64_t memNum = 0, imemNum = 0;
|
||||
if (pReader->pReadSnap->pMem != NULL) {
|
||||
tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum);
|
||||
}
|
||||
|
||||
if (pReader->pReadSnap->pIMem != NULL) {
|
||||
tsdbMemTableCountRows(pReader->pReadSnap->pIMem, pReader->status.pTableMap, &imemNum);
|
||||
}
|
||||
|
||||
pReader->rowsNum += memNum + imemNum;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t endKey) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
STableUidList* pUidList = &pStatus->uidList;
|
||||
|
@ -3149,7 +3015,7 @@ static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) {
|
|||
return TSDB_READ_RETURN;
|
||||
}
|
||||
|
||||
// all data blocks are checked in this last block file, now let's try the next file
|
||||
// all data blocks are checked in this stt file, now let's try the next file set
|
||||
ASSERT(pReader->status.pTableIter == NULL);
|
||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
|
||||
|
@ -3875,7 +3741,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
|
|||
}
|
||||
|
||||
if (row.type == TSDBROW_ROW_FMT) {
|
||||
int64_t ts = row.pTSRow->ts;;
|
||||
int64_t ts = row.pTSRow->ts;
|
||||
code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
|
||||
|
||||
if (freeTSRow) {
|
||||
|
@ -3986,14 +3852,14 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
|||
getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey);
|
||||
pReader->status.bProcMemFirstFileset = true;
|
||||
}
|
||||
|
||||
|
||||
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
|
||||
resetDataBlockIterator(&pStatus->blockIter, pReader->info.order);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pStatus->fileIter.numOfFiles == 0) {
|
||||
pStatus->loadFromFile = false;
|
||||
} else if (READ_MODE_COUNT_ONLY == pReader->info.readMode) {
|
||||
// } else if (READER_EXEC_DATA == pReader->info.readMode) {
|
||||
// DO NOTHING
|
||||
} else {
|
||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
|
@ -4034,8 +3900,7 @@ static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
|
|||
|
||||
// ====================================== EXPOSED APIs ======================================
|
||||
int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
|
||||
SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly,
|
||||
SHashObj** pIgnoreTables) {
|
||||
SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables) {
|
||||
STimeWindow window = pCond->twindows;
|
||||
SVnodeCfg* pConf = &(((SVnode*)pVnode)->config);
|
||||
|
||||
|
@ -4141,13 +4006,9 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
|
|||
}
|
||||
|
||||
pReader->flag = READER_STATUS_SUSPEND;
|
||||
|
||||
if (countOnly) {
|
||||
pReader->info.readMode = READ_MODE_COUNT_ONLY;
|
||||
}
|
||||
pReader->info.execMode = pCond->notLoadData? READER_EXEC_ROWS : READER_EXEC_DATA;
|
||||
|
||||
pReader->pIgnoreTables = pIgnoreTables;
|
||||
|
||||
tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64
|
||||
" in this query %s",
|
||||
pReader, numOfTables, pReader->info.window.skey, pReader->info.window.ekey, pReader->info.verRange.minVer,
|
||||
|
@ -4257,8 +4118,6 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
|||
SReaderStatus* pStatus = &pReader->status;
|
||||
STableBlockScanInfo* pBlockScanInfo = NULL;
|
||||
|
||||
pReader->status.suspendInvoked = true; // record the suspend status
|
||||
|
||||
if (pStatus->loadFromFile) {
|
||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
if (pBlockInfo != NULL) {
|
||||
|
@ -4383,32 +4242,6 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||
|
||||
if (pReader->status.loadFromFile == false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
code = readRowsCountFromFiles(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
code = readRowsCountFromMem(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
pBlock->info.rows = pReader->rowsNum;
|
||||
pBlock->info.id.uid = 0;
|
||||
pBlock->info.dataLoad = 0;
|
||||
pReader->rowsNum = 0;
|
||||
|
||||
return pBlock->info.rows > 0;
|
||||
}
|
||||
|
||||
static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -4421,7 +4254,7 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
|
|||
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
|
||||
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey;
|
||||
code = buildBlockFromBufferSequentially(pReader, endKey);
|
||||
code = buildBlockFromBufferSequentially(pReader, endKey);
|
||||
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) {
|
||||
return code;
|
||||
} else {
|
||||
|
@ -4489,9 +4322,6 @@ static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
|
|||
return code;
|
||||
}
|
||||
|
||||
if (READ_MODE_COUNT_ONLY == pReader->info.readMode) {
|
||||
return tsdbReadRowsCountOnly(pReader);
|
||||
}
|
||||
if (!pReader->bFilesetDelimited) {
|
||||
code = doTsdbNextDataBlockFilesFirst(pReader);
|
||||
} else {
|
||||
|
@ -4793,7 +4623,7 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) {
|
|||
}
|
||||
|
||||
SReaderStatus* pStatus = &pTReader->status;
|
||||
if (pStatus->composedDataBlock) {
|
||||
if (pStatus->composedDataBlock || pReader->info.execMode == READER_EXEC_ROWS) {
|
||||
return pTReader->resBlockInfo.pResBlock;
|
||||
}
|
||||
|
||||
|
@ -4830,9 +4660,9 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
|||
|
||||
pReader->info.order = pCond->order;
|
||||
pReader->type = TIMEWINDOW_RANGE_CONTAINED;
|
||||
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
||||
pStatus->loadFromFile = true;
|
||||
pStatus->pTableIter = NULL;
|
||||
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
|
||||
|
|
|
@ -22,15 +22,9 @@
|
|||
#include "tsdbUtil2.h"
|
||||
#include "tsimplehash.h"
|
||||
|
||||
int32_t uidComparFunc(const void* p1, const void* p2) {
|
||||
uint64_t pu1 = *(uint64_t*)p1;
|
||||
uint64_t pu2 = *(uint64_t*)p2;
|
||||
if (pu1 == pu2) {
|
||||
return 0;
|
||||
} else {
|
||||
return (pu1 < pu2) ? -1 : 1;
|
||||
}
|
||||
}
|
||||
#define INIT_TIMEWINDOW(_w) do { (_w)->skey = INT64_MAX; (_w)->ekey = INT64_MIN;} while(0);
|
||||
|
||||
static bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
|
||||
|
||||
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
||||
int32_t num = numOfTables / pBuf->numPerBucket;
|
||||
|
@ -61,6 +55,16 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t uidComparFunc(const void* p1, const void* p2) {
|
||||
uint64_t pu1 = *(uint64_t*)p1;
|
||||
uint64_t pu2 = *(uint64_t*)p2;
|
||||
if (pu1 == pu2) {
|
||||
return 0;
|
||||
} else {
|
||||
return (pu1 < pu2) ? -1 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
||||
if (numOfTables <= pBuf->numOfTables) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -153,6 +157,9 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
|
|||
STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
|
||||
|
||||
pScanInfo->uid = idList[j].uid;
|
||||
INIT_TIMEWINDOW(&pScanInfo->sttWindow);
|
||||
INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
|
||||
|
||||
pUidList->tableUidList[j] = idList[j].uid;
|
||||
|
||||
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {
|
||||
|
@ -243,6 +250,10 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
|
|||
taosArrayClear(pScanInfo->pBlockList);
|
||||
taosArrayClear(pScanInfo->pBlockIdxList);
|
||||
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
|
||||
pScanInfo->cleanSttBlocks = false;
|
||||
pScanInfo->numOfRowsInStt = 0;
|
||||
INIT_TIMEWINDOW(&pScanInfo->sttWindow);
|
||||
INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
|
||||
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
|
||||
}
|
||||
|
||||
|
@ -403,12 +414,10 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
|||
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i);
|
||||
|
||||
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
||||
|
||||
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i};
|
||||
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
|
||||
}
|
||||
taosArrayDestroy(pTableScanInfo->pBlockList);
|
||||
pTableScanInfo->pBlockList = NULL;
|
||||
pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList);
|
||||
|
||||
int64_t et = taosGetTimestampUs();
|
||||
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
|
||||
|
@ -457,8 +466,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
|||
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i);
|
||||
taosArrayDestroy(pTableScanInfo->pBlockList);
|
||||
pTableScanInfo->pBlockList = NULL;
|
||||
pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList);
|
||||
}
|
||||
|
||||
int64_t et = taosGetTimestampUs();
|
||||
|
@ -488,7 +496,7 @@ typedef enum {
|
|||
BLK_CHECK_QUIT = 0x2,
|
||||
} ETombBlkCheckEnum;
|
||||
|
||||
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, const SSttBlockLoadInfo* pBlockLoadInfo,
|
||||
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
|
||||
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j);
|
||||
static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j,
|
||||
ETombBlkCheckEnum* pRet) {
|
||||
|
@ -662,17 +670,17 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemT
|
|||
}
|
||||
}
|
||||
|
||||
int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid,
|
||||
const uint64_t* pUidList, int32_t numOfTables) {
|
||||
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
|
||||
TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
|
||||
int32_t numOfTables) {
|
||||
int32_t num = 0;
|
||||
|
||||
const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray;
|
||||
if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t i = 0;
|
||||
while((i < TARRAY2_SIZE(pStatisBlkArray)) && (pStatisBlkArray->data[i].minTbid.suid < suid)) {
|
||||
while((i < TARRAY2_SIZE(pStatisBlkArray)) && (pStatisBlkArray->data[i].maxTbid.suid < suid)) {
|
||||
++i;
|
||||
}
|
||||
|
||||
|
@ -681,64 +689,65 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo
|
|||
}
|
||||
|
||||
SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||
if (pBlockLoadInfo->statisBlock == NULL) {
|
||||
pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
|
||||
tStatisBlockInit(pBlockLoadInfo->statisBlock);
|
||||
}
|
||||
STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
|
||||
tStatisBlockInit(pStatisBlock);
|
||||
|
||||
int64_t st = taosGetTimestampMs();
|
||||
tsdbSttFileReadStatisBlock(pSttFileReader, p, pBlockLoadInfo->statisBlock);
|
||||
pBlockLoadInfo->statisBlockIndex = i;
|
||||
tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock);
|
||||
|
||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
pBlockLoadInfo->cost.loadStatisBlocks += 1;
|
||||
pBlockLoadInfo->cost.statisElapsedTime += el;
|
||||
|
||||
STbStatisBlock *pBlock = pBlockLoadInfo->statisBlock;
|
||||
|
||||
int32_t index = 0;
|
||||
while (index < TARRAY2_SIZE(pBlock->suid) && pBlock->suid->data[index] < suid) {
|
||||
while (index < TARRAY2_SIZE(pStatisBlock->suid) && pStatisBlock->suid->data[index] < suid) {
|
||||
++index;
|
||||
}
|
||||
|
||||
if (index >= TARRAY2_SIZE(pBlock->suid)) {
|
||||
if (index >= TARRAY2_SIZE(pStatisBlock->suid)) {
|
||||
tStatisBlockDestroy(pStatisBlock);
|
||||
taosMemoryFreeClear(pStatisBlock);
|
||||
return num;
|
||||
}
|
||||
|
||||
int32_t j = index;
|
||||
int32_t uidIndex = 0;
|
||||
while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex <= numOfTables) {
|
||||
while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) {
|
||||
p = &pStatisBlkArray->data[i];
|
||||
if (p->minTbid.suid > suid) {
|
||||
tStatisBlockDestroy(pStatisBlock);
|
||||
taosMemoryFreeClear(pStatisBlock);
|
||||
return num;
|
||||
}
|
||||
|
||||
uint64_t uid = pUidList[uidIndex];
|
||||
|
||||
if (pBlock->uid->data[j] == uid) {
|
||||
num += pBlock->count->data[j];
|
||||
if (pStatisBlock->uid->data[j] == uid) {
|
||||
num += pStatisBlock->count->data[j];
|
||||
uidIndex += 1;
|
||||
j += 1;
|
||||
loadNextStatisticsBlock(pSttFileReader, pBlockLoadInfo, pStatisBlkArray, pBlock->suid->size, &i, &j);
|
||||
} else if (pBlock->uid->data[j] < uid) {
|
||||
loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->suid->size, &i, &j);
|
||||
} else if (pStatisBlock->uid->data[j] < uid) {
|
||||
j += 1;
|
||||
loadNextStatisticsBlock(pSttFileReader, pBlockLoadInfo, pStatisBlkArray, pBlock->suid->size, &i, &j);
|
||||
loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->suid->size, &i, &j);
|
||||
} else {
|
||||
uidIndex += 1;
|
||||
}
|
||||
}
|
||||
|
||||
tStatisBlockDestroy(pStatisBlock);
|
||||
taosMemoryFreeClear(pStatisBlock);
|
||||
return num;
|
||||
}
|
||||
|
||||
// load next stt statistics block
|
||||
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, const SSttBlockLoadInfo* pBlockLoadInfo,
|
||||
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
|
||||
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) {
|
||||
if ((*j) >= numOfRows) {
|
||||
(*i) += 1;
|
||||
(*j) = 0;
|
||||
if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) {
|
||||
tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pBlockLoadInfo->statisBlock);
|
||||
tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -762,7 +771,7 @@ void doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t adjustLDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) {
|
||||
int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) {
|
||||
int32_t numOfLevels = pFileSet->lvlArr->size;
|
||||
|
||||
// add the list/iter placeholder
|
||||
|
@ -791,7 +800,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
|
|||
}
|
||||
|
||||
// add the list/iter placeholder
|
||||
adjustLDataIters(pSttFileBlockIterArray, pFileSet);
|
||||
adjustSttDataIters(pSttFileBlockIterArray, pFileSet);
|
||||
|
||||
for (int32_t j = 0; j < numOfLevels; ++j) {
|
||||
SSttLvl* pSttLevel = pFileSet->lvlArr->data[j];
|
||||
|
@ -819,7 +828,8 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
|
|||
}
|
||||
|
||||
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
|
||||
int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pIter->pBlockLoadInfo->pSttStatisBlkArray);
|
||||
TStatisBlkArray *pStatisBlkArray = NULL;
|
||||
int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), pstr);
|
||||
continue;
|
||||
|
@ -829,9 +839,214 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
|
|||
STsdbReader* pReader = pConf->pReader;
|
||||
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||
uint64_t* pUidList = pReader->status.uidList.tableUidList;
|
||||
numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pConf->suid, pUidList, numOfTables);
|
||||
numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, pConf->suid, pUidList,
|
||||
numOfTables);
|
||||
}
|
||||
}
|
||||
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
static bool overlapHelper(const STimeWindow* pLeft, TSKEY minKey, TSKEY maxKey) {
|
||||
return (pLeft->ekey >= minKey) && (pLeft->skey <= maxKey);
|
||||
}
|
||||
|
||||
static bool overlapWithTimeWindow(STimeWindow* p1, STimeWindow* pQueryWindow, STableBlockScanInfo* pBlockScanInfo,
|
||||
int32_t order) {
|
||||
// overlap with query window
|
||||
if (!(p1->skey >= pQueryWindow->skey && p1->ekey <= pQueryWindow->ekey)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
SIterInfo* pMemIter = &pBlockScanInfo->iter;
|
||||
SIterInfo* pIMemIter = &pBlockScanInfo->iiter;
|
||||
|
||||
// overlap with mem data
|
||||
if (pMemIter->hasVal) {
|
||||
STbData* pTbData = pMemIter->iter->pTbData;
|
||||
if (overlapHelper(p1, pTbData->minKey, pTbData->maxKey)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// overlap with imem data
|
||||
if (pIMemIter->hasVal) {
|
||||
STbData* pITbData = pIMemIter->iter->pTbData;
|
||||
if (overlapHelper(p1, pITbData->minKey, pITbData->maxKey)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// overlap with data file block
|
||||
STimeWindow* pFileWin = &pBlockScanInfo->filesetWindow;
|
||||
if ((taosArrayGetSize(pBlockScanInfo->pBlockIdxList) > 0) && overlapHelper(p1, pFileWin->skey, pFileWin->ekey)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// overlap with deletion skyline
|
||||
SBrinRecord record = {.firstKey = p1->skey, .lastKey = p1->ekey};
|
||||
if (overlapWithDelSkylineWithoutVer(pBlockScanInfo, &record, order)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t sortUidComparFn(const void* p1, const void* p2) {
|
||||
const STimeWindow* px1 = p1;
|
||||
const STimeWindow* px2 = p2;
|
||||
if (px1->skey == px2->skey) {
|
||||
return 0;
|
||||
} else {
|
||||
return px1->skey < px2->skey? -1:1;
|
||||
}
|
||||
}
|
||||
|
||||
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo *pScanInfo, int32_t order) {
|
||||
// check if it overlap with del skyline
|
||||
taosArraySort(pTimewindowList, sortUidComparFn);
|
||||
|
||||
int32_t num = taosArrayGetSize(pTimewindowList);
|
||||
if (num == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
STimeWindow* p = taosArrayGet(pTimewindowList, 0);
|
||||
if (overlapWithTimeWindow(p, pQueryWindow, pScanInfo, order)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < num - 1; ++i) {
|
||||
STimeWindow* p1 = taosArrayGet(pTimewindowList, i);
|
||||
STimeWindow* p2 = taosArrayGet(pTimewindowList, i + 1);
|
||||
|
||||
if (p1->ekey >= p2->skey) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool overlap = overlapWithTimeWindow(p2, pQueryWindow, pScanInfo, order);
|
||||
if (overlap) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool doCheckDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
|
||||
int32_t startIndex) {
|
||||
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
|
||||
|
||||
for (int32_t i = startIndex; i < num; i += 1) {
|
||||
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
|
||||
if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) {
|
||||
if (p->version >= pRecord->minVer) {
|
||||
return true;
|
||||
}
|
||||
} else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts
|
||||
if (p->version >= pRecord->minVer) {
|
||||
if (i < num - 1) {
|
||||
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
|
||||
if (pnext->ts >= pRecord->firstKey) {
|
||||
return true;
|
||||
}
|
||||
} else { // it must be the last point
|
||||
ASSERT(p->version == 0);
|
||||
}
|
||||
}
|
||||
} else { // (p->ts > pBlock->maxKey.ts) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool doCheckDatablockOverlapWithoutVersion(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
|
||||
int32_t startIndex) {
|
||||
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
|
||||
|
||||
for (int32_t i = startIndex; i < num; i += 1) {
|
||||
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
|
||||
if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) {
|
||||
return true;
|
||||
} else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts
|
||||
if (i < num - 1) {
|
||||
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
|
||||
if (pnext->ts >= pRecord->firstKey) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} else { // (p->ts > pBlock->maxKey.ts) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) {
|
||||
if (pBlockScanInfo->delSkyline == NULL || (taosArrayGetSize(pBlockScanInfo->delSkyline) == 0)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// ts is not overlap
|
||||
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
|
||||
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
|
||||
if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// version is not overlap
|
||||
if (ASCENDING_TRAVERSE(order)) {
|
||||
return doCheckDatablockOverlap(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex);
|
||||
} else {
|
||||
int32_t index = pBlockScanInfo->fileDelIndex;
|
||||
while (1) {
|
||||
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
|
||||
if (p->ts > pRecord->firstKey && index > 0) {
|
||||
index -= 1;
|
||||
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
|
||||
if (p->ts == pRecord->firstKey && p->version < pRecord->maxVer && index > 0) {
|
||||
index -= 1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return doCheckDatablockOverlap(pBlockScanInfo, pRecord, index);
|
||||
}
|
||||
}
|
||||
|
||||
bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) {
|
||||
if (pBlockScanInfo->delSkyline == NULL || (taosArrayGetSize(pBlockScanInfo->delSkyline) == 0)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// ts is not overlap
|
||||
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
|
||||
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
|
||||
if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// version is not overlap
|
||||
if (ASCENDING_TRAVERSE(order)) {
|
||||
return doCheckDatablockOverlapWithoutVersion(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex);
|
||||
} else {
|
||||
int32_t index = pBlockScanInfo->fileDelIndex;
|
||||
while (1) {
|
||||
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
|
||||
if (p->ts > pRecord->firstKey && index > 0) {
|
||||
index -= 1;
|
||||
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
|
||||
if (p->ts == pRecord->firstKey && index > 0) {
|
||||
index -= 1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return doCheckDatablockOverlapWithoutVersion(pBlockScanInfo, pRecord, index);
|
||||
}
|
||||
}
|
|
@ -40,8 +40,7 @@ typedef enum {
|
|||
typedef struct STsdbReaderInfo {
|
||||
uint64_t suid;
|
||||
STSchema* pSchema;
|
||||
EReadMode readMode;
|
||||
uint64_t rowsNum;
|
||||
EExecMode execMode;
|
||||
STimeWindow window;
|
||||
SVersionRange verRange;
|
||||
int16_t order;
|
||||
|
@ -75,6 +74,11 @@ typedef struct SSttKeyInfo {
|
|||
int64_t nextProcKey;
|
||||
} SSttKeyInfo;
|
||||
|
||||
// clean stt file blocks:
|
||||
// 1. not overlap with stt blocks in other stt files of the same fileset
|
||||
// 2. not overlap with delete skyline
|
||||
// 3. not overlap with in-memory data (mem/imem)
|
||||
// 4. not overlap with data file blocks
|
||||
typedef struct STableBlockScanInfo {
|
||||
uint64_t uid;
|
||||
TSKEY lastProcKey;
|
||||
|
@ -89,6 +93,11 @@ typedef struct STableBlockScanInfo {
|
|||
int32_t fileDelIndex; // file block delete index
|
||||
int32_t sttBlockDelIndex; // delete index for last block
|
||||
bool iterInit; // whether to initialize the in-memory skip list iterator or not
|
||||
bool cleanSttBlocks; // stt block is clean in current fileset
|
||||
bool sttBlockReturned; // result block returned alreay
|
||||
int64_t numOfRowsInStt;
|
||||
STimeWindow sttWindow; // timestamp window for current stt files
|
||||
STimeWindow filesetWindow; // timestamp window for current file set
|
||||
} STableBlockScanInfo;
|
||||
|
||||
typedef struct SResultBlockInfo {
|
||||
|
@ -146,6 +155,7 @@ typedef struct SBlockLoadSuppInfo {
|
|||
bool smaValid; // the sma on all queried columns are activated
|
||||
} SBlockLoadSuppInfo;
|
||||
|
||||
// each blocks in stt file not overlaps with in-memory/data-file/tomb-files, and not overlap with any other blocks in stt-file
|
||||
typedef struct SSttBlockReader {
|
||||
STimeWindow window;
|
||||
SVersionRange verRange;
|
||||
|
@ -187,7 +197,6 @@ typedef struct SFileBlockDumpInfo {
|
|||
} SFileBlockDumpInfo;
|
||||
|
||||
typedef struct SReaderStatus {
|
||||
bool suspendInvoked;
|
||||
bool loadFromFile; // check file stage
|
||||
bool composedDataBlock; // the returned data block is a composed block or not
|
||||
SSHashObj* pTableMap; // SHash<STableBlockScanInfo>
|
||||
|
@ -272,12 +281,17 @@ bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr);
|
|||
void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver);
|
||||
int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
|
||||
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
|
||||
int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid,
|
||||
const uint64_t* pUidList, int32_t numOfTables);
|
||||
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
|
||||
TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
|
||||
int32_t numOfTables);
|
||||
|
||||
void destroyLDataIter(SLDataIter* pIter);
|
||||
int32_t adjustLDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet);
|
||||
int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet);
|
||||
int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf,
|
||||
const char* pstr);
|
||||
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order);
|
||||
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
|
||||
|
||||
typedef struct {
|
||||
SArray* pTombData;
|
||||
} STableLoadInfo;
|
||||
|
|
|
@ -177,12 +177,15 @@ static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
|
|||
}
|
||||
|
||||
static int32_t vnodeAsyncCancelAllTasks(SVAsync *async) {
|
||||
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
|
||||
while (async->queue[i].next != &async->queue[i]) {
|
||||
SVATask *task = async->queue[i].next;
|
||||
task->prev->next = task->next;
|
||||
task->next->prev = task->prev;
|
||||
vnodeAsyncTaskDone(async, task);
|
||||
while (async->queue[0].next != &async->queue[0] || async->queue[1].next != &async->queue[1] ||
|
||||
async->queue[2].next != &async->queue[2]) {
|
||||
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
|
||||
while (async->queue[i].next != &async->queue[i]) {
|
||||
SVATask *task = async->queue[i].next;
|
||||
task->prev->next = task->next;
|
||||
task->next->prev = task->prev;
|
||||
vnodeAsyncTaskDone(async, task);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
|
|
@ -42,7 +42,7 @@ void initStorageAPI(SStorageAPI* pAPI) {
|
|||
|
||||
void initTsdbReaderAPI(TsdReader* pReader) {
|
||||
pReader->tsdReaderOpen = (int32_t(*)(void*, SQueryTableDataCond*, void*, int32_t, SSDataBlock*, void**, const char*,
|
||||
bool, SHashObj**))tsdbReaderOpen2;
|
||||
SHashObj**))tsdbReaderOpen2;
|
||||
pReader->tsdReaderClose = tsdbReaderClose2;
|
||||
|
||||
pReader->tsdNextDataBlock = tsdbNextDataBlock2;
|
||||
|
|
|
@ -1734,6 +1734,11 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
|
|||
pCond->endVersion = -1;
|
||||
pCond->skipRollup = readHandle->skipRollup;
|
||||
|
||||
// allowed read stt file optimization mode
|
||||
pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
|
||||
(pTableScanNode->scan.node.pConditions == NULL) &&
|
||||
(pTableScanNode->interval == 0);
|
||||
|
||||
int32_t j = 0;
|
||||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
|
||||
|
|
|
@ -1232,7 +1232,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
if (pScanBaseInfo->dataReader == NULL) {
|
||||
int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(
|
||||
pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, pScanInfo->pResBlock,
|
||||
(void**)&pScanBaseInfo->dataReader, id, false, NULL);
|
||||
(void**)&pScanBaseInfo->dataReader, id, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
|
||||
terrno = code;
|
||||
|
@ -1291,7 +1291,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
int32_t size = tableListGetSize(pTableListInfo);
|
||||
|
||||
pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL,
|
||||
(void**)&pInfo->dataReader, NULL, false, NULL);
|
||||
(void**)&pInfo->dataReader, NULL, NULL);
|
||||
|
||||
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
|
||||
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
|
||||
|
|
|
@ -889,7 +889,7 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
|||
ASSERT(pInfo->base.dataReader == NULL);
|
||||
|
||||
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
|
||||
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables);
|
||||
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->pIgnoreTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -1061,7 +1061,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
||||
// blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||
|
||||
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1183,7 +1182,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
|||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||
STsdbReader* pReader = NULL;
|
||||
int32_t code = pAPI->tsdReader.tsdReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
|
||||
(void**)&pReader, GET_TASKID(pTaskInfo), false, NULL);
|
||||
(void**)&pReader, GET_TASKID(pTaskInfo), NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
|
@ -3288,7 +3287,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
|||
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
|
||||
if (pInfo->bOnlyRetrieveBlock) {
|
||||
pInfo->bOnlyRetrieveBlock = false;
|
||||
}
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
|
@ -3311,7 +3310,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
|||
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
|
@ -3427,7 +3426,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
||||
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
||||
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
|
||||
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables);
|
||||
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->mSkipTables);
|
||||
if (pInfo->filesetDelimited) {
|
||||
pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader);
|
||||
}
|
||||
|
|
|
@ -2304,7 +2304,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
|
|||
void* pList = tableListGetInfo(pTableListInfo, 0);
|
||||
|
||||
code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock,
|
||||
(void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL);
|
||||
(void**)&pInfo->pHandle, pTaskInfo->id.str, NULL);
|
||||
cleanupQueryTableDataCond(&cond);
|
||||
if (code != 0) {
|
||||
goto _error;
|
||||
|
|
|
@ -34,6 +34,7 @@ typedef struct SHttpModule {
|
|||
SAsyncPool* asyncPool;
|
||||
TdThread thread;
|
||||
SHashObj* connStatusTable;
|
||||
int8_t quit;
|
||||
} SHttpModule;
|
||||
|
||||
typedef struct SHttpMsg {
|
||||
|
@ -166,7 +167,7 @@ _OVER:
|
|||
static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
|
||||
uint32_t ip = taosGetIpv4FromFqdn(server);
|
||||
if (ip == 0xffffffff) {
|
||||
tError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr());
|
||||
tError("http-report failed to resolving domain names: %s", server);
|
||||
return -1;
|
||||
}
|
||||
char buf[128] = {0};
|
||||
|
@ -190,19 +191,40 @@ static void httpDestroyMsg(SHttpMsg* msg) {
|
|||
taosMemoryFree(msg->cont);
|
||||
taosMemoryFree(msg);
|
||||
}
|
||||
|
||||
static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
|
||||
SHttpMsg *msg = NULL, *quitMsg = NULL;
|
||||
if (atomic_load_8(&http->quit) == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
while (!QUEUE_IS_EMPTY(&item->qmsg)) {
|
||||
queue* h = QUEUE_HEAD(&item->qmsg);
|
||||
QUEUE_REMOVE(h);
|
||||
msg = QUEUE_DATA(h, SHttpMsg, q);
|
||||
if (!msg->quit) {
|
||||
httpDestroyMsg(msg);
|
||||
} else {
|
||||
quitMsg = msg;
|
||||
}
|
||||
}
|
||||
if (quitMsg != NULL) {
|
||||
QUEUE_PUSH(&item->qmsg, &quitMsg->q);
|
||||
}
|
||||
}
|
||||
static void httpAsyncCb(uv_async_t* handle) {
|
||||
SAsyncItem* item = handle->data;
|
||||
SHttpModule* http = item->pThrd;
|
||||
|
||||
SHttpMsg *msg = NULL, *quitMsg = NULL;
|
||||
|
||||
queue wq;
|
||||
queue wq;
|
||||
QUEUE_INIT(&wq);
|
||||
|
||||
static int32_t BATCH_SIZE = 5;
|
||||
int32_t count = 0;
|
||||
|
||||
taosThreadMutexLock(&item->mtx);
|
||||
httpMayDiscardMsg(http, item);
|
||||
|
||||
while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BATCH_SIZE) {
|
||||
queue* h = QUEUE_HEAD(&item->qmsg);
|
||||
|
@ -497,9 +519,10 @@ static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }
|
|||
static void transHttpEnvInit() {
|
||||
httpRefMgt = taosOpenRef(1, transHttpDestroyHandle);
|
||||
|
||||
SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule));
|
||||
SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule));
|
||||
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
||||
http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
http->quit = 0;
|
||||
|
||||
uv_loop_init(http->loop);
|
||||
|
||||
|
@ -526,6 +549,8 @@ void transHttpEnvDestroy() {
|
|||
return;
|
||||
}
|
||||
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
|
||||
|
||||
atomic_store_8(&load->quit, 1);
|
||||
httpSendQuit();
|
||||
taosThreadJoin(load->thread, NULL);
|
||||
|
||||
|
|
Loading…
Reference in New Issue