diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index df3f87973f..7c6f02513e 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -102,6 +102,7 @@ typedef struct SCatalogReq { bool svrVerRequired; bool forceUpdate; bool cloned; + bool forceFetchViewMeta; } SCatalogReq; typedef struct SMetaRes { diff --git a/include/util/tbuffer.inc b/include/util/tbuffer.inc index 39090fb7fa..633517ca58 100644 --- a/include/util/tbuffer.inc +++ b/include/util/tbuffer.inc @@ -186,11 +186,25 @@ static int32_t tBufferGetI16(SBufferReader *reader, int16_t *value) { } static int32_t tBufferGetI32(SBufferReader *reader, int32_t *value) { - return tBufferGet(reader, sizeof(*value), value); + if (reader->offset + sizeof(int32_t) > reader->buffer->size) { + return TSDB_CODE_OUT_OF_RANGE; + } + if (value) { + *value = *(int32_t*)BR_PTR(reader); + } + reader->offset += sizeof(int32_t); + return 0; } static int32_t tBufferGetI64(SBufferReader *reader, int64_t *value) { - return tBufferGet(reader, sizeof(*value), value); + if (reader->offset + sizeof(int64_t) > reader->buffer->size) { + return TSDB_CODE_OUT_OF_RANGE; + } + if (value) { + *value = *(int64_t*)BR_PTR(reader); + } + reader->offset += sizeof(int64_t); + return 0; } static int32_t tBufferGetU8(SBufferReader *reader, uint8_t *value) { return tBufferGet(reader, sizeof(*value), value); } diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 720ba68414..f51ffe0c83 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -972,7 +972,7 @@ static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinR break; } - if ((writer->brinBlock->numOfRecords) >= writer->config->maxRow) { + if ((writer->brinBlock->numOfRecords) >= 256) { TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 527486270c..ac8e8505e4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -836,6 +836,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead pList = &pReader->status.uidList; int32_t i = 0; + int32_t j = 0; while (i < TARRAY2_SIZE(pBlkArray)) { pBrinBlk = &pBlkArray->data[i]; if (pBrinBlk->maxTbid.suid < pReader->info.suid) { @@ -851,7 +852,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead (pBrinBlk->minTbid.suid <= pReader->info.suid) && (pBrinBlk->maxTbid.suid >= pReader->info.suid), code, lino, _end, TSDB_CODE_INTERNAL_ERROR); - if (pBrinBlk->maxTbid.suid == pReader->info.suid && pBrinBlk->maxTbid.uid < pList->tableUidList[0]) { + if (pBrinBlk->maxTbid.suid == pReader->info.suid && pBrinBlk->maxTbid.uid < pList->tableUidList[j]) { i += 1; continue; } @@ -864,6 +865,14 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead TSDB_CHECK_NULL(p1, code, lino, _end, terrno); i += 1; + if (pBrinBlk->maxTbid.suid == pReader->info.suid) { + while (j < numOfTables && pList->tableUidList[j] < pBrinBlk->maxTbid.uid) { + j++; + } + if (j >= numOfTables) { + break; + } + } } et2 = taosGetTimestampUs(); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index e757163ba8..bfc5ca9142 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -271,6 +271,7 @@ typedef struct SCtgViewsCtx { SArray* pNames; SArray* pResList; SArray* pFetchs; + bool forceFetch; } SCtgViewsCtx; typedef enum { diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index c1dcdf2741..9bfb4102aa 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -20,6 +20,11 @@ #include "tref.h" #include "trpc.h" +typedef struct SCtgViewTaskParam { + bool forceFetch; + SArray* pTableReqs; +} SCtgViewTaskParam; + void ctgIsTaskDone(SCtgJob* pJob, CTG_TASK_TYPE type, bool* done) { SCtgTask* pTask = NULL; @@ -500,7 +505,7 @@ int32_t ctgInitGetTbTagTask(SCtgJob* pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) { SCtgTask task = {0}; - + SCtgViewTaskParam* p = param; task.type = CTG_TASK_GET_VIEW; task.taskId = taskIdx; task.pJob = pJob; @@ -511,7 +516,8 @@ int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) { } SCtgViewsCtx* ctx = task.taskCtx; - ctx->pNames = param; + ctx->pNames = p->pTableReqs; + ctx->forceFetch = p->forceFetch; ctx->pResList = taosArrayInit(pJob->viewNum, sizeof(SMetaRes)); if (NULL == ctx->pResList) { qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->viewNum, @@ -849,13 +855,12 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg); int32_t tbTagNum = (int32_t)taosArrayGetSize(pReq->pTableTag); int32_t viewNum = (int32_t)ctgGetTablesReqNum(pReq->pView); - int32_t tbTsmaNum = (int32_t)taosArrayGetSize(pReq->pTableTSMAs); + int32_t tbTsmaNum = tsQuerySmaOptimize ? (int32_t)taosArrayGetSize(pReq->pTableTSMAs) : 0; int32_t tsmaNum = (int32_t)taosArrayGetSize(pReq->pTSMAs); int32_t tbNameNum = (int32_t)ctgGetTablesReqNum(pReq->pTableName); int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum + viewNum + tbTsmaNum + tbNameNum; - *job = taosMemoryCalloc(1, sizeof(SCtgJob)); if (NULL == *job) { ctgError("failed to calloc, size:%d,QID:0x%" PRIx64, (int32_t)sizeof(SCtgJob), pConn->requestId); @@ -1014,7 +1019,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const } if (viewNum > 0) { - CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_VIEW, pReq->pView, NULL)); + SCtgViewTaskParam param = {.forceFetch = pReq->forceFetchViewMeta, .pTableReqs = pReq->pView}; + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_VIEW, ¶m, NULL)); } if (tbTsmaNum > 0) { CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL)); @@ -3712,16 +3718,14 @@ int32_t ctgLaunchGetViewsTask(SCtgTask* pTask) { bool tbMetaDone = false; SName* pName = NULL; - /* - ctgIsTaskDone(pJob, CTG_TASK_GET_TB_META_BATCH, &tbMetaDone); - if (tbMetaDone) { - CTG_ERR_RET(ctgBuildViewNullRes(pTask, pCtx)); - TSWAP(pTask->res, pCtx->pResList); + ctgIsTaskDone(pJob, CTG_TASK_GET_TB_META_BATCH, &tbMetaDone); + if (tbMetaDone && !pCtx->forceFetch) { + CTG_ERR_RET(ctgBuildViewNullRes(pTask, pCtx)); + TSWAP(pTask->res, pCtx->pResList); - CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); - return TSDB_CODE_SUCCESS; - } - */ + CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); + return TSDB_CODE_SUCCESS; + } int32_t dbNum = taosArrayGetSize(pCtx->pNames); int32_t fetchIdx = 0; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index c2e2e9c17c..83227dea9e 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3037,61 +3037,60 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { TSKEY startKey = getRowPTs(pInput->pPTS, 0); TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1); -#if 0 - int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; - - // the optimized version only valid if all tuples in one block are monotonious increasing or descreasing. - // this assumption is NOT always works if project operator exists in downstream. - if (blockDataOrder == TSDB_ORDER_ASC) { + if (pCtx->order == TSDB_ORDER_ASC && !pCtx->hasPrimaryKey) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { - char* data = colDataGetData(pInputCol, i); + bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL); + char* data = isNull ? NULL : colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); numOfElems++; if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { - doSaveLastrow(pCtx, data, i, cts, pInfo); + int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo); + if (code != TSDB_CODE_SUCCESS) return code; } break; } - } else { // descending order + } else if (!pCtx->hasPrimaryKey && pCtx->order == TSDB_ORDER_DESC) { + // the optimized version only valid if all tuples in one block are monotonious increasing or descreasing. + // this assumption is NOT always works if project operator exists in downstream. for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - char* data = colDataGetData(pInputCol, i); + bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL); + char* data = isNull ? NULL : colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); numOfElems++; if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { - doSaveLastrow(pCtx, data, i, cts, pInfo); + int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo); + if (code != TSDB_CODE_SUCCESS) return code; } break; } - } -#else + } else { + int64_t* pts = (int64_t*)pInput->pPTS->pData; + int from = -1; + int32_t i = -1; + while (funcInputGetNextRowIndex(pInput, from, false, &i, &from)) { + bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL); + char* data = isNull ? NULL : colDataGetData(pInputCol, i); + TSKEY cts = pts[i]; - int64_t* pts = (int64_t*)pInput->pPTS->pData; - int from = -1; - int32_t i = -1; - while (funcInputGetNextRowIndex(pInput, from, false, &i, &from)) { - bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL); - char* data = isNull ? NULL : colDataGetData(pInputCol, i); - TSKEY cts = pts[i]; - - numOfElems++; - char* pkData = NULL; - if (pCtx->hasPrimaryKey) { - pkData = colDataGetData(pkCol, i); - } - if (pResInfo->numOfRes == 0 || pInfo->ts < cts || - (pInfo->ts == pts[i] && pkCompareFn && pkCompareFn(pkData, pInfo->pkData) < 0)) { - int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo); - if (code != TSDB_CODE_SUCCESS) { - return code; + numOfElems++; + char* pkData = NULL; + if (pCtx->hasPrimaryKey) { + pkData = colDataGetData(pkCol, i); + } + if (pResInfo->numOfRes == 0 || pInfo->ts < cts || + (pInfo->ts == pts[i] && pkCompareFn && pkCompareFn(pkData, pInfo->pkData) < 0)) { + int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + pResInfo->numOfRes = 1; } - pResInfo->numOfRes = 1; } - } -#endif + } SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 857c7604a9..7298b04eb0 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -115,6 +115,7 @@ typedef struct SParseMetaCache { SHashObj* pTableName; // key is tbFUid, elements is STableMeta*(append with tbName) SArray* pDnodes; // element is SEpSet bool dnodeRequired; + bool forceFetchViewMeta; } SParseMetaCache; int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...); diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index eecc04658b..b78e10768f 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -810,7 +810,7 @@ static int32_t collectMetaKeyFromShowCreateView(SCollectMetaKeyCxt* pCxt, SShowC if (TSDB_CODE_SUCCESS == code) { code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache); } - + pCxt->pMetaCache->forceFetchViewMeta = true; return code; } @@ -888,6 +888,7 @@ static int32_t collectMetaKeyFromCreateViewStmt(SCollectMetaKeyCxt* pCxt, SCreat static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropViewStmt* pStmt) { int32_t code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, pStmt->viewName, AUTH_TYPE_ALTER, pCxt->pMetaCache); + pCxt->pMetaCache->forceFetchViewMeta = true; return code; } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index e35eea9e72..44e44982a3 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -817,6 +817,7 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog } #endif pCatalogReq->dNodeRequired = pMetaCache->dnodeRequired; + pCatalogReq->forceFetchViewMeta = pMetaCache->forceFetchViewMeta; return code; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 39024731ed..a1809ff137 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -204,6 +204,7 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order, SLogicNode* pNode // case QUERY_NODE_LOGIC_PLAN_WINDOW: case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_SORT: + case QUERY_NODE_LOGIC_PLAN_FILL: if (pNode == pNodeForcePropagate) { pNode->outputTsOrder = order; break;