refactor: do some internal refactor.
This commit is contained in:
parent
1145aabb94
commit
3bae30412c
|
@ -87,6 +87,7 @@ int32_t tqMetaOpen(STQ* pTq) {
|
||||||
.reader = handle.execHandle.pExecReader[i],
|
.reader = handle.execHandle.pExecReader[i],
|
||||||
.meta = pTq->pVnode->pMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
.pMsgCb = &pTq->pVnode->msgCb,
|
.pMsgCb = &pTq->pVnode->msgCb,
|
||||||
|
.vnode = pTq->pVnode,
|
||||||
};
|
};
|
||||||
handle.execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(handle.execHandle.execCol.qmsg, &reader);
|
handle.execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(handle.execHandle.execCol.qmsg, &reader);
|
||||||
ASSERT(handle.execHandle.execCol.task[i]);
|
ASSERT(handle.execHandle.execCol.task[i]);
|
||||||
|
|
|
@ -154,6 +154,7 @@ static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo *pIter, SArr
|
||||||
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
|
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
|
||||||
STSRow** pTSRow);
|
STSRow** pTSRow);
|
||||||
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData);
|
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData);
|
||||||
|
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr);
|
||||||
|
|
||||||
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
|
@ -373,7 +374,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
|
|
||||||
initReaderStatus(&pReader->status);
|
initReaderStatus(&pReader->status);
|
||||||
|
|
||||||
pReader->pTsdb = pVnode->pTsdb;
|
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions, idstr);
|
||||||
pReader->suid = pCond->suid;
|
pReader->suid = pCond->suid;
|
||||||
pReader->order = pCond->order;
|
pReader->order = pCond->order;
|
||||||
pReader->capacity = 4096;
|
pReader->capacity = 4096;
|
||||||
|
@ -2375,6 +2376,43 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr) {
|
||||||
|
if (VND_IS_RSMA(pVnode)) {
|
||||||
|
int level = 0;
|
||||||
|
int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
|
||||||
|
|
||||||
|
for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
|
||||||
|
SRetention* pRetention = retentions + level;
|
||||||
|
if (pRetention->keep <= 0) {
|
||||||
|
if (level > 0) {
|
||||||
|
--level;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if ((now - pRetention->keep) <= winSKey) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
++level;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vgId = TD_VID(pVnode);
|
||||||
|
const char* str = (idStr != NULL)? idStr:"";
|
||||||
|
|
||||||
|
if (level == TSDB_RETENTION_L0) {
|
||||||
|
tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L0, str);
|
||||||
|
return VND_RSMA0(pVnode);
|
||||||
|
} else if (level == TSDB_RETENTION_L1) {
|
||||||
|
tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L1, str);
|
||||||
|
return VND_RSMA1(pVnode);
|
||||||
|
} else {
|
||||||
|
tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L2, str);
|
||||||
|
return VND_RSMA2(pVnode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return VND_TSDB(pVnode);
|
||||||
|
}
|
||||||
|
|
||||||
// // todo not unref yet, since it is not support multi-group interpolation query
|
// // todo not unref yet, since it is not support multi-group interpolation query
|
||||||
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
|
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
|
||||||
// // filter the queried time stamp in the first place
|
// // filter the queried time stamp in the first place
|
||||||
|
@ -3280,8 +3318,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(0);
|
tsdbDebug("%p reset reader, suid:%"PRIu64", numOfTables:%d, query range:%"PRId64" - %"PRId64" in query %s", pReader, pReader->suid,
|
||||||
tsdbDebug("%p reset tsdbreader in query %s", pReader, numOfTables, pReader->idStr);
|
numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -305,9 +305,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
||||||
SNode* pTagIndexCond = (SNode*)pListInfo->pTagIndexCond;
|
SNode* pTagIndexCond = (SNode*)pListInfo->pTagIndexCond;
|
||||||
if (pScanNode->tableType == TSDB_SUPER_TABLE) {
|
if (pScanNode->tableType == TSDB_SUPER_TABLE) {
|
||||||
if (pTagIndexCond) {
|
if (pTagIndexCond) {
|
||||||
SIndexMetaArg metaArg = {
|
|
||||||
.metaEx = metaHandle, .idx = vnodeGetIdx(metaHandle), .ivtIdx = vnodeGetIvtIdx(metaHandle), .suid = tableUid};
|
|
||||||
|
|
||||||
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
||||||
// code = doFilterTag(pTagIndexCond, &metaArg, res);
|
// code = doFilterTag(pTagIndexCond, &metaArg, res);
|
||||||
code = TSDB_CODE_INDEX_REBUILDING;
|
code = TSDB_CODE_INDEX_REBUILDING;
|
||||||
|
|
Loading…
Reference in New Issue