Merge pull request #28756 from taosdata/fix/main/tsbs_perf_optimization
fix tsbs perf issue
This commit is contained in:
commit
e8d663809d
|
@ -102,6 +102,7 @@ typedef struct SCatalogReq {
|
||||||
bool svrVerRequired;
|
bool svrVerRequired;
|
||||||
bool forceUpdate;
|
bool forceUpdate;
|
||||||
bool cloned;
|
bool cloned;
|
||||||
|
bool forceFetchViewMeta;
|
||||||
} SCatalogReq;
|
} SCatalogReq;
|
||||||
|
|
||||||
typedef struct SMetaRes {
|
typedef struct SMetaRes {
|
||||||
|
|
|
@ -186,11 +186,25 @@ static int32_t tBufferGetI16(SBufferReader *reader, int16_t *value) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tBufferGetI32(SBufferReader *reader, int32_t *value) {
|
static int32_t tBufferGetI32(SBufferReader *reader, int32_t *value) {
|
||||||
return tBufferGet(reader, sizeof(*value), value);
|
if (reader->offset + sizeof(int32_t) > reader->buffer->size) {
|
||||||
|
return TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
}
|
||||||
|
if (value) {
|
||||||
|
*value = *(int32_t*)BR_PTR(reader);
|
||||||
|
}
|
||||||
|
reader->offset += sizeof(int32_t);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tBufferGetI64(SBufferReader *reader, int64_t *value) {
|
static int32_t tBufferGetI64(SBufferReader *reader, int64_t *value) {
|
||||||
return tBufferGet(reader, sizeof(*value), value);
|
if (reader->offset + sizeof(int64_t) > reader->buffer->size) {
|
||||||
|
return TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
}
|
||||||
|
if (value) {
|
||||||
|
*value = *(int64_t*)BR_PTR(reader);
|
||||||
|
}
|
||||||
|
reader->offset += sizeof(int64_t);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tBufferGetU8(SBufferReader *reader, uint8_t *value) { return tBufferGet(reader, sizeof(*value), value); }
|
static int32_t tBufferGetU8(SBufferReader *reader, uint8_t *value) { return tBufferGet(reader, sizeof(*value), value); }
|
||||||
|
|
|
@ -972,7 +972,7 @@ static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinR
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((writer->brinBlock->numOfRecords) >= writer->config->maxRow) {
|
if ((writer->brinBlock->numOfRecords) >= 256) {
|
||||||
TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
|
TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -836,6 +836,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
|
||||||
pList = &pReader->status.uidList;
|
pList = &pReader->status.uidList;
|
||||||
|
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
|
int32_t j = 0;
|
||||||
while (i < TARRAY2_SIZE(pBlkArray)) {
|
while (i < TARRAY2_SIZE(pBlkArray)) {
|
||||||
pBrinBlk = &pBlkArray->data[i];
|
pBrinBlk = &pBlkArray->data[i];
|
||||||
if (pBrinBlk->maxTbid.suid < pReader->info.suid) {
|
if (pBrinBlk->maxTbid.suid < pReader->info.suid) {
|
||||||
|
@ -851,7 +852,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
|
||||||
(pBrinBlk->minTbid.suid <= pReader->info.suid) && (pBrinBlk->maxTbid.suid >= pReader->info.suid), code, lino,
|
(pBrinBlk->minTbid.suid <= pReader->info.suid) && (pBrinBlk->maxTbid.suid >= pReader->info.suid), code, lino,
|
||||||
_end, TSDB_CODE_INTERNAL_ERROR);
|
_end, TSDB_CODE_INTERNAL_ERROR);
|
||||||
|
|
||||||
if (pBrinBlk->maxTbid.suid == pReader->info.suid && pBrinBlk->maxTbid.uid < pList->tableUidList[0]) {
|
if (pBrinBlk->maxTbid.suid == pReader->info.suid && pBrinBlk->maxTbid.uid < pList->tableUidList[j]) {
|
||||||
i += 1;
|
i += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -864,6 +865,14 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
|
||||||
TSDB_CHECK_NULL(p1, code, lino, _end, terrno);
|
TSDB_CHECK_NULL(p1, code, lino, _end, terrno);
|
||||||
|
|
||||||
i += 1;
|
i += 1;
|
||||||
|
if (pBrinBlk->maxTbid.suid == pReader->info.suid) {
|
||||||
|
while (j < numOfTables && pList->tableUidList[j] < pBrinBlk->maxTbid.uid) {
|
||||||
|
j++;
|
||||||
|
}
|
||||||
|
if (j >= numOfTables) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
et2 = taosGetTimestampUs();
|
et2 = taosGetTimestampUs();
|
||||||
|
|
|
@ -271,6 +271,7 @@ typedef struct SCtgViewsCtx {
|
||||||
SArray* pNames;
|
SArray* pNames;
|
||||||
SArray* pResList;
|
SArray* pResList;
|
||||||
SArray* pFetchs;
|
SArray* pFetchs;
|
||||||
|
bool forceFetch;
|
||||||
} SCtgViewsCtx;
|
} SCtgViewsCtx;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
|
|
@ -20,6 +20,11 @@
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
|
typedef struct SCtgViewTaskParam {
|
||||||
|
bool forceFetch;
|
||||||
|
SArray* pTableReqs;
|
||||||
|
} SCtgViewTaskParam;
|
||||||
|
|
||||||
void ctgIsTaskDone(SCtgJob* pJob, CTG_TASK_TYPE type, bool* done) {
|
void ctgIsTaskDone(SCtgJob* pJob, CTG_TASK_TYPE type, bool* done) {
|
||||||
SCtgTask* pTask = NULL;
|
SCtgTask* pTask = NULL;
|
||||||
|
|
||||||
|
@ -500,7 +505,7 @@ int32_t ctgInitGetTbTagTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
||||||
|
|
||||||
int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
||||||
SCtgTask task = {0};
|
SCtgTask task = {0};
|
||||||
|
SCtgViewTaskParam* p = param;
|
||||||
task.type = CTG_TASK_GET_VIEW;
|
task.type = CTG_TASK_GET_VIEW;
|
||||||
task.taskId = taskIdx;
|
task.taskId = taskIdx;
|
||||||
task.pJob = pJob;
|
task.pJob = pJob;
|
||||||
|
@ -511,7 +516,8 @@ int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SCtgViewsCtx* ctx = task.taskCtx;
|
SCtgViewsCtx* ctx = task.taskCtx;
|
||||||
ctx->pNames = param;
|
ctx->pNames = p->pTableReqs;
|
||||||
|
ctx->forceFetch = p->forceFetch;
|
||||||
ctx->pResList = taosArrayInit(pJob->viewNum, sizeof(SMetaRes));
|
ctx->pResList = taosArrayInit(pJob->viewNum, sizeof(SMetaRes));
|
||||||
if (NULL == ctx->pResList) {
|
if (NULL == ctx->pResList) {
|
||||||
qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->viewNum,
|
qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->viewNum,
|
||||||
|
@ -849,13 +855,12 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
||||||
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
|
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
|
||||||
int32_t tbTagNum = (int32_t)taosArrayGetSize(pReq->pTableTag);
|
int32_t tbTagNum = (int32_t)taosArrayGetSize(pReq->pTableTag);
|
||||||
int32_t viewNum = (int32_t)ctgGetTablesReqNum(pReq->pView);
|
int32_t viewNum = (int32_t)ctgGetTablesReqNum(pReq->pView);
|
||||||
int32_t tbTsmaNum = (int32_t)taosArrayGetSize(pReq->pTableTSMAs);
|
int32_t tbTsmaNum = tsQuerySmaOptimize ? (int32_t)taosArrayGetSize(pReq->pTableTSMAs) : 0;
|
||||||
int32_t tsmaNum = (int32_t)taosArrayGetSize(pReq->pTSMAs);
|
int32_t tsmaNum = (int32_t)taosArrayGetSize(pReq->pTSMAs);
|
||||||
int32_t tbNameNum = (int32_t)ctgGetTablesReqNum(pReq->pTableName);
|
int32_t tbNameNum = (int32_t)ctgGetTablesReqNum(pReq->pTableName);
|
||||||
|
|
||||||
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
|
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
|
||||||
userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum + viewNum + tbTsmaNum + tbNameNum;
|
userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum + viewNum + tbTsmaNum + tbNameNum;
|
||||||
|
|
||||||
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
|
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
|
||||||
if (NULL == *job) {
|
if (NULL == *job) {
|
||||||
ctgError("failed to calloc, size:%d,QID:0x%" PRIx64, (int32_t)sizeof(SCtgJob), pConn->requestId);
|
ctgError("failed to calloc, size:%d,QID:0x%" PRIx64, (int32_t)sizeof(SCtgJob), pConn->requestId);
|
||||||
|
@ -1014,7 +1019,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
||||||
}
|
}
|
||||||
|
|
||||||
if (viewNum > 0) {
|
if (viewNum > 0) {
|
||||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_VIEW, pReq->pView, NULL));
|
SCtgViewTaskParam param = {.forceFetch = pReq->forceFetchViewMeta, .pTableReqs = pReq->pView};
|
||||||
|
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_VIEW, ¶m, NULL));
|
||||||
}
|
}
|
||||||
if (tbTsmaNum > 0) {
|
if (tbTsmaNum > 0) {
|
||||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL));
|
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL));
|
||||||
|
@ -3712,16 +3718,14 @@ int32_t ctgLaunchGetViewsTask(SCtgTask* pTask) {
|
||||||
bool tbMetaDone = false;
|
bool tbMetaDone = false;
|
||||||
SName* pName = NULL;
|
SName* pName = NULL;
|
||||||
|
|
||||||
/*
|
|
||||||
ctgIsTaskDone(pJob, CTG_TASK_GET_TB_META_BATCH, &tbMetaDone);
|
ctgIsTaskDone(pJob, CTG_TASK_GET_TB_META_BATCH, &tbMetaDone);
|
||||||
if (tbMetaDone) {
|
if (tbMetaDone && !pCtx->forceFetch) {
|
||||||
CTG_ERR_RET(ctgBuildViewNullRes(pTask, pCtx));
|
CTG_ERR_RET(ctgBuildViewNullRes(pTask, pCtx));
|
||||||
TSWAP(pTask->res, pCtx->pResList);
|
TSWAP(pTask->res, pCtx->pResList);
|
||||||
|
|
||||||
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
|
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
|
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
|
||||||
int32_t fetchIdx = 0;
|
int32_t fetchIdx = 0;
|
||||||
|
|
|
@ -3037,37 +3037,36 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
|
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
|
||||||
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
|
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
|
||||||
|
|
||||||
#if 0
|
if (pCtx->order == TSDB_ORDER_ASC && !pCtx->hasPrimaryKey) {
|
||||||
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
||||||
|
bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
|
||||||
|
char* data = isNull ? NULL : colDataGetData(pInputCol, i);
|
||||||
|
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||||
|
numOfElems++;
|
||||||
|
|
||||||
|
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||||
|
int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else if (!pCtx->hasPrimaryKey && pCtx->order == TSDB_ORDER_DESC) {
|
||||||
// the optimized version only valid if all tuples in one block are monotonious increasing or descreasing.
|
// the optimized version only valid if all tuples in one block are monotonious increasing or descreasing.
|
||||||
// this assumption is NOT always works if project operator exists in downstream.
|
// this assumption is NOT always works if project operator exists in downstream.
|
||||||
if (blockDataOrder == TSDB_ORDER_ASC) {
|
|
||||||
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
|
||||||
numOfElems++;
|
|
||||||
|
|
||||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
|
||||||
doSaveLastrow(pCtx, data, i, cts, pInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else { // descending order
|
|
||||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||||
char* data = colDataGetData(pInputCol, i);
|
bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
|
||||||
|
char* data = isNull ? NULL : colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
|
|
||||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||||
doSaveLastrow(pCtx, data, i, cts, pInfo);
|
int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
#else
|
|
||||||
|
|
||||||
int64_t* pts = (int64_t*)pInput->pPTS->pData;
|
int64_t* pts = (int64_t*)pInput->pPTS->pData;
|
||||||
int from = -1;
|
int from = -1;
|
||||||
int32_t i = -1;
|
int32_t i = -1;
|
||||||
|
@ -3091,7 +3090,7 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
}
|
||||||
|
|
||||||
SET_VAL(pResInfo, numOfElems, 1);
|
SET_VAL(pResInfo, numOfElems, 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -115,6 +115,7 @@ typedef struct SParseMetaCache {
|
||||||
SHashObj* pTableName; // key is tbFUid, elements is STableMeta*(append with tbName)
|
SHashObj* pTableName; // key is tbFUid, elements is STableMeta*(append with tbName)
|
||||||
SArray* pDnodes; // element is SEpSet
|
SArray* pDnodes; // element is SEpSet
|
||||||
bool dnodeRequired;
|
bool dnodeRequired;
|
||||||
|
bool forceFetchViewMeta;
|
||||||
} SParseMetaCache;
|
} SParseMetaCache;
|
||||||
|
|
||||||
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
|
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
|
||||||
|
|
|
@ -810,7 +810,7 @@ static int32_t collectMetaKeyFromShowCreateView(SCollectMetaKeyCxt* pCxt, SShowC
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache);
|
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache);
|
||||||
}
|
}
|
||||||
|
pCxt->pMetaCache->forceFetchViewMeta = true;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -888,6 +888,7 @@ static int32_t collectMetaKeyFromCreateViewStmt(SCollectMetaKeyCxt* pCxt, SCreat
|
||||||
static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropViewStmt* pStmt) {
|
static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropViewStmt* pStmt) {
|
||||||
int32_t code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName,
|
int32_t code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName,
|
||||||
pStmt->viewName, AUTH_TYPE_ALTER, pCxt->pMetaCache);
|
pStmt->viewName, AUTH_TYPE_ALTER, pCxt->pMetaCache);
|
||||||
|
pCxt->pMetaCache->forceFetchViewMeta = true;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -817,6 +817,7 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
pCatalogReq->dNodeRequired = pMetaCache->dnodeRequired;
|
pCatalogReq->dNodeRequired = pMetaCache->dnodeRequired;
|
||||||
|
pCatalogReq->forceFetchViewMeta = pMetaCache->forceFetchViewMeta;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -204,6 +204,7 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order, SLogicNode* pNode
|
||||||
// case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
// case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
||||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||||
case QUERY_NODE_LOGIC_PLAN_SORT:
|
case QUERY_NODE_LOGIC_PLAN_SORT:
|
||||||
|
case QUERY_NODE_LOGIC_PLAN_FILL:
|
||||||
if (pNode == pNodeForcePropagate) {
|
if (pNode == pNodeForcePropagate) {
|
||||||
pNode->outputTsOrder = order;
|
pNode->outputTsOrder = order;
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue