refactor(query): do some internal refactor.
This commit is contained in:
parent
59b7435170
commit
04f087793e
|
@ -813,7 +813,6 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
pRequest->metric.syntaxEnd = taosGetTimestampUs();
|
pRequest->metric.syntaxEnd = taosGetTimestampUs();
|
||||||
|
|
||||||
if (!updateMetaForce) {
|
if (!updateMetaForce) {
|
||||||
STscObj *pTscObj = pRequest->pTscObj;
|
|
||||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||||
if (NULL == pRequest->pQuery->pRoot) {
|
if (NULL == pRequest->pQuery->pRoot) {
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
||||||
|
|
|
@ -662,8 +662,7 @@ typedef struct {
|
||||||
} SSkmInfo;
|
} SSkmInfo;
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pBlockLoadInfo, STSchema *pSchema,
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pBlockLoadInfo, bool destroyLoadInfo, const char *idStr);
|
||||||
int16_t *pCols, int32_t numOfCols, const char *idStr);
|
|
||||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||||
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
||||||
|
|
|
@ -420,6 +420,7 @@ typedef enum {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SFSLASTNEXTROWSTATES state; // [input]
|
SFSLASTNEXTROWSTATES state; // [input]
|
||||||
STsdb *pTsdb; // [input]
|
STsdb *pTsdb; // [input]
|
||||||
|
STSchema *pTSchema;// [input]
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
int32_t nFileSet;
|
int32_t nFileSet;
|
||||||
|
@ -455,9 +456,10 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
||||||
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
|
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
|
struct SSttBlockLoadInfo* pLoadInfo = tCreateLastBlockLoadInfo(state->pTSchema, NULL, 0);
|
||||||
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
|
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
|
||||||
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
||||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL, NULL, NULL, 0, NULL);
|
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, pLoadInfo, true, NULL);
|
||||||
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||||
if (!hasVal) {
|
if (!hasVal) {
|
||||||
state->state = SFSLASTNEXTROW_FILESET;
|
state->state = SFSLASTNEXTROW_FILESET;
|
||||||
|
@ -892,6 +894,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
||||||
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
|
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
|
||||||
pIter->fsLastState.pTsdb = pTsdb;
|
pIter->fsLastState.pTsdb = pTsdb;
|
||||||
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
|
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
|
||||||
|
pIter->fsLastState.pTSchema = pTSchema;
|
||||||
pIter->fsLastState.suid = suid;
|
pIter->fsLastState.suid = suid;
|
||||||
pIter->fsLastState.uid = uid;
|
pIter->fsLastState.uid = uid;
|
||||||
|
|
||||||
|
|
|
@ -474,8 +474,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, STSchema* pSchema,
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, bool destroyLoadInfo, const char* idStr) {
|
||||||
int16_t* pCols, int32_t numOfCols, const char* idStr) {
|
|
||||||
pMTree->backward = backward;
|
pMTree->backward = backward;
|
||||||
pMTree->pIter = NULL;
|
pMTree->pIter = NULL;
|
||||||
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
|
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
@ -488,22 +487,14 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SSttBlockLoadInfo* pLoadInfo = NULL;
|
pMTree->pLoadInfo = pBlockLoadInfo;
|
||||||
if (pBlockLoadInfo == NULL) {
|
pMTree->destroyLoadInfo = destroyLoadInfo;
|
||||||
ASSERT(0);
|
ASSERT(pMTree->pLoadInfo != NULL);
|
||||||
if (pMTree->pLoadInfo == NULL) {
|
|
||||||
pMTree->destroyLoadInfo = true;
|
|
||||||
pMTree->pLoadInfo = tCreateLastBlockLoadInfo(pSchema, pCols, numOfCols);
|
|
||||||
}
|
|
||||||
|
|
||||||
pLoadInfo = pMTree->pLoadInfo;
|
|
||||||
} else {
|
|
||||||
pLoadInfo = pBlockLoadInfo;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
||||||
struct SLDataIter* pIter = NULL;
|
struct SLDataIter* pIter = NULL;
|
||||||
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pLoadInfo[i]);
|
code =
|
||||||
|
tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pMTree->pLoadInfo[i]);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2000,8 +2000,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
||||||
|
|
||||||
int32_t code =
|
int32_t code =
|
||||||
tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
|
tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
|
||||||
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo,
|
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, false, pReader->idStr);
|
||||||
pReader->pSchema, pReader->suppInfo.colIds, pReader->suppInfo.numOfCols, pReader->idStr);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -487,6 +487,9 @@ int32_t ctgInitTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* tas
|
||||||
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp,
|
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp,
|
||||||
void* param) {
|
void* param) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
int32_t tbMetaNum = (int32_t)ctgGetTablesReqNum(pReq->pTableMeta);
|
int32_t tbMetaNum = (int32_t)ctgGetTablesReqNum(pReq->pTableMeta);
|
||||||
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
|
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
|
||||||
int32_t tbHashNum = (int32_t)ctgGetTablesReqNum(pReq->pTableHash);
|
int32_t tbHashNum = (int32_t)ctgGetTablesReqNum(pReq->pTableHash);
|
||||||
|
@ -634,8 +637,9 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
||||||
|
|
||||||
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
|
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId,
|
double el = (taosGetTimestampUs() - st)/1000.0;
|
||||||
taskNum, pReq->forceUpdate);
|
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d, elapsed time:%.2fms", pJob->queryId, pJob->refId,
|
||||||
|
taskNum, pReq->forceUpdate, el);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
Loading…
Reference in New Issue