diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index be82eb64a8..1b30c4ffca 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -422,7 +422,6 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField int32_t bytes = pInfo->pSqlExpr->resBytes; char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row; - if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) { int32_t realLen = varDataLen(pData); assert(realLen <= bytes - VARSTR_HEADER_SIZE); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 9d80c7ed50..8166f7ee5e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1385,6 +1385,11 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum return numOfTotalColumns; } +static void tscInsertPrimaryTSSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* pIndex) { + SColumnIndex tsCol = {.tableIndex = pIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscColumnListInsert(pQueryInfo->colList, &tsCol); +} + int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExprItem* pItem) { const char* msg0 = "invalid column name"; const char* msg1 = "tag for normal table query is not allowed"; @@ -1427,6 +1432,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t addProjectQueryCol(pQueryInfo, startPos, &index, pItem); } + + tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); } else { return TSDB_CODE_TSC_INVALID_SQL; } @@ -1499,8 +1506,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col switch (optr) { case TK_COUNT: { - if (pItem->pNode->pParam != NULL && pItem->pNode->pParam->nExpr != 1) { /* more than one parameter for count() function */ + if (pItem->pNode->pParam != NULL && pItem->pNode->pParam->nExpr != 1) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -1551,11 +1558,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } } else { // count(*) is equalled to count(primary_timestamp_key) index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false); } - + + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); + memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName)); getColumnName(pItem, pExpr->aliasName, sizeof(pExpr->aliasName) - 1); @@ -1570,9 +1578,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } // the time stamp may be always needed - if (index.tableIndex > 0 && index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { - SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscColumnListInsert(pQueryInfo->colList, &tsCol); + if (index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { + tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); } return TSDB_CODE_SUCCESS; @@ -1682,10 +1689,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i])); } } - - SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscColumnListInsert(pQueryInfo->colList, &tsCol); - + + tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); return TSDB_CODE_SUCCESS; } case TK_FIRST: diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 100c8c77fd..706eb0c95c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4564,7 +4564,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // TODO handle the limit offset problem if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) { - // skipBlocks(pRuntimeEnv); if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { pQInfo->tableIndex++; continue; @@ -4799,7 +4798,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; - if (!isTopBottomQuery(pQuery) && pQuery->limit.offset > 0) { // no need to execute, since the output will be ignore. + if (!pRuntimeEnv->topBotQuery && pQuery->limit.offset > 0) { // no need to execute, since the output will be ignore. return; } @@ -5639,6 +5638,20 @@ static int compareTableIdInfo(const void* a, const void* b) { static void freeQInfo(SQInfo *pQInfo); +static void calResultBufSize(SQuery* pQuery) { + const int32_t RESULT_MSG_MIN_SIZE = 1024 * (1024 + 512); // bytes + const int32_t RESULT_MSG_MIN_ROWS = 8192; + const float RESULT_THRESHOLD_RATIO = 0.85; + + int32_t numOfRes = RESULT_MSG_MIN_SIZE / pQuery->rowSize; + if (numOfRes < RESULT_MSG_MIN_ROWS) { + numOfRes = RESULT_MSG_MIN_ROWS; + } + + pQuery->rec.capacity = numOfRes; + pQuery->rec.threshold = numOfRes * RESULT_THRESHOLD_RATIO; +} + static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols) { int16_t numOfCols = pQueryMsg->numOfCols; @@ -5672,8 +5685,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQuery->fillType = pQueryMsg->fillType; pQuery->numOfTags = pQueryMsg->numOfTags; pQuery->tagColList = pTagCols; - - // todo do not allocate ?? + pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); if (pQuery->colList == NULL) { goto _cleanup; @@ -5703,9 +5715,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, goto _cleanup; } - // set the output buffer capacity - pQuery->rec.capacity = 4096; - pQuery->rec.threshold = 4000; + calResultBufSize(pQuery); for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { assert(pExprs[col].interBytes >= pExprs[col].bytes); @@ -5967,7 +5977,6 @@ static void freeQInfo(SQInfo *pQInfo) { } tfree(pQuery->sdata); - tfree(pQuery); qDebug("QInfo:%p QInfo is freed", pQInfo); @@ -6503,7 +6512,8 @@ void* qOpenQueryMgmt(int32_t vgId) { } static void queryMgmtKillQueryFn(void* handle) { - qKillQuery(handle); + void** h = (void**) handle; + qKillQuery(*h); } void qQueryMgmtNotifyClosed(void* pQMgmt) { diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index def1fd9383..5cb45a6d26 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -374,7 +374,7 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand if (chandle == NULL) return -1; - return (int)send(pFdObj->fd, data, (size_t)len, 0); + return taosWriteMsg(pFdObj->fd, data, len); } static void taosReportBrokenLink(SFdObj *pFdObj) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 7249e7472c..2195760e88 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -604,7 +604,6 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo int16_t* colIds = pQueryHandle->defaultLoadColumn->pData; int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, QH_GET_NUM_OF_COLS(pQueryHandle)); -// int32_t ret = tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo); if (ret == TSDB_CODE_SUCCESS) { SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; @@ -1534,7 +1533,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {{0}, 0}; SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); @@ -1542,7 +1541,6 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { colInfo.info = pCol->info; colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes); taosArrayPush(pSecQueryHandle->pColumns, &colInfo); - pSecQueryHandle->statis[i].colId = colInfo.info.colId; } size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo); @@ -1564,7 +1562,8 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo); - + pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn); + bool ret = tsdbNextDataBlock((void*) pSecQueryHandle); assert(ret); @@ -1811,22 +1810,26 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta int64_t stime = taosGetTimestampUs(); tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL); - // todo opt perf + int16_t* colIds = pHandle->defaultLoadColumn->pData; + size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle); + memset(pHandle->statis, 0, numOfCols * sizeof(SDataStatis)); for(int32_t i = 0; i < numOfCols; ++i) { - SDataStatis* st = &pHandle->statis[i]; - int32_t colId = st->colId; - - memset(st, 0, sizeof(SDataStatis)); - st->colId = colId; + pHandle->statis[i].colId = colIds[i]; } tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols); - - *pBlockStatis = pHandle->statis; - + + // always load the first primary timestamp column data + SDataStatis* pPrimaryColStatis = &pHandle->statis[0]; + assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX); + + pPrimaryColStatis->numOfNull = 0; + pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst; + pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast; + //update the number of NULL data rows - for(int32_t i = 0; i < numOfCols; ++i) { + for(int32_t i = 1; i < numOfCols; ++i) { if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; } @@ -1842,6 +1845,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta int64_t elapsed = taosGetTimestampUs() - stime; pHandle->cost.statisInfoLoadTime += elapsed; + *pBlockStatis = pHandle->statis; return TSDB_CODE_SUCCESS; }