From 130d4cd20c03877e8ee1daca57ab81cd8050a350 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Thu, 16 Apr 2020 11:08:39 +0800 Subject: [PATCH 1/3] [td-98] add the group by support for super table query --- src/client/inc/tscUtil.h | 5 +- src/client/inc/tsclient.h | 10 +-- src/client/src/tscAsync.c | 4 +- src/client/src/tscPrepare.c | 4 +- src/client/src/tscSQLParser.c | 27 +++--- src/client/src/tscSecondaryMerge.c | 2 +- src/client/src/tscServer.c | 86 +++++++++--------- src/client/src/tscSql.c | 7 +- src/client/src/tscStream.c | 1 - src/client/src/tscSub.c | 2 +- src/client/src/tscSubquery.c | 134 ++++++++++++++--------------- src/client/src/tscUtil.c | 36 ++++---- src/inc/taosmsg.h | 14 ++- src/mnode/src/mgmtTable.c | 26 ++++-- 14 files changed, 193 insertions(+), 165 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 6e8559538d..57cf821eb1 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -200,8 +200,9 @@ int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQuer STableMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index); void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache); -STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, SArray* vgroupList, - int16_t numOfTags, int16_t* tags); +STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, + SVgroupsInfo* vgroupList, int16_t numOfTags, int16_t* tags); + STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo); int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); void tscFreeSubqueryInfo(SSqlCmd* pCmd); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 0d684b72dc..bcd6f54799 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -40,6 +40,8 @@ extern "C" { // forward declaration struct SSqlInfo; +typedef SCMSTableVgroupRspMsg SVgroupsInfo; + typedef struct SSqlGroupbyExpr { int16_t tableIndex; int16_t numOfGroupCols; @@ -70,14 +72,12 @@ typedef struct STableMeta { typedef struct STableMetaInfo { STableMeta * pTableMeta; // table meta, cached in client side and acquried by name -// SSuperTableMeta *pMetricMeta; // metricmeta - SArray* vgroupIdList; - + SVgroupsInfo* vgroupList; /* * 1. keep the vnode index during the multi-vnode super table projection query * 2. keep the vnode index for multi-vnode insertion */ - int32_t vnodeIndex; + int32_t dnodeIndex; char name[TSDB_TABLE_ID_LEN]; // (super) table name int16_t numOfTags; // total required tags in query, including groupby tags int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection @@ -210,7 +210,6 @@ typedef struct STableDataBlocks { } STableDataBlocks; typedef struct SDataBlockList { - int32_t idx; uint32_t nSize; uint32_t nAlloc; STableDataBlocks **pData; @@ -257,7 +256,6 @@ typedef struct { union { bool existsCheck; // check if the table exists or not - bool inStream; // denote if current sql is executed in stream or not bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta int8_t dataSourceType; // load data from file or not }; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 25f5856877..610027151f 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -443,12 +443,12 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vnodeIndex >= 0 && pSql->param != NULL); + assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->dnodeIndex >= 0 && pSql->param != NULL); SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param; SSqlObj * pParObj = trs->pParentSqlObj; - assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vnodeIndex && + assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->dnodeIndex && tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0); tscTrace("%p get metricMeta during super table query successfully", pSql); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index caa14d082b..ab5c3f9f72 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -408,7 +408,7 @@ static int insertStmtReset(STscStmt* pStmt) { pCmd->batchSize = 0; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - pTableMetaInfo->vnodeIndex = 0; + pTableMetaInfo->dnodeIndex = 0; return TSDB_CODE_SUCCESS; } @@ -438,7 +438,7 @@ static int insertStmtExecute(STscStmt* stmt) { } // set the next sent data vnode index in data block arraylist - pTableMetaInfo->vnodeIndex = 1; + pTableMetaInfo->dnodeIndex = 1; } else { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 5657b6b47d..c541a35d12 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2470,7 +2470,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* const char* msg8 = "not allowed column type for group by"; const char* msg9 = "tags not allowed for table query"; - // todo : handle two meter situation + // todo : handle two tables situation STableMetaInfo* pTableMetaInfo = NULL; if (pList == NULL) { @@ -2493,7 +2493,6 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* SSQLToken token = {pVar->nLen, pVar->nType, pVar->pz}; SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&token, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); } @@ -2523,13 +2522,13 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* return invalidSqlErrMsg(pQueryInfo->msg, msg9); } - int32_t relIndex = index.columnIndex; - if (index.columnIndex != TSDB_TBNAME_COLUMN_INDEX) { - relIndex -= tscGetNumOfColumns(pTableMeta); - } +// int32_t relIndex = index.columnIndex; +// if (index.columnIndex != TSDB_TBNAME_COLUMN_INDEX) { +// relIndex -= tscGetNumOfColumns(pTableMeta); +// } pQueryInfo->groupbyExpr.columnInfo[i] = - (SColIndex){.colIndex = relIndex, .flag = TSDB_COL_TAG, .colId = pSchema->colId}; // relIndex; + (SColIndex){.colIndex = index.columnIndex, .flag = TSDB_COL_TAG, .colId = pSchema->colId}; // relIndex; addRequiredTagColumn(pQueryInfo, pQueryInfo->groupbyExpr.columnInfo[i].colIndex, index.tableIndex); } else { // check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by @@ -5095,9 +5094,8 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) { bytes = TSDB_TABLE_NAME_LEN; name = TSQL_TBNAME_L; } else { - colIndex = (TSDB_COL_IS_TAG(pColIndex->flag)) ? tscGetNumOfColumns(pTableMetaInfo->pTableMeta) + pColIndex->colIndex - : pColIndex->colIndex; - +// colIndex = (TSDB_COL_IS_TAG(pColIndex->flag)) ? tscGetNumOfColumns(pTableMetaInfo->pTableMeta) + pColIndex->colIndex +// : pColIndex->colIndex; type = pSchema[colIndex].type; bytes = pSchema[colIndex].bytes; name = pSchema[colIndex].name; @@ -5108,11 +5106,14 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) { SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs, TSDB_FUNC_TAG, &index, type, bytes, bytes); - + + memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName)); + strncpy(pExpr->aliasName, name, TSDB_COL_NAME_LEN); + pExpr->colInfo.flag = TSDB_COL_TAG; // NOTE: tag column does not add to source column list - SColumnList ids = {0}; + SColumnList ids = getColumnList(1, 0, pColIndex->colIndex); insertResultField(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs-1, &ids, bytes, type, name, pExpr); } else { // if this query is "group by" normal column, interval is not allowed @@ -5693,7 +5694,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr); if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { - int32_t code = tscGetSTableVgroupInfo(pSql, index); + code = tscGetSTableVgroupInfo(pSql, index); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index d1ec5088d3..65259205ab 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -636,7 +636,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity); - size_t numOfSubs = taosArrayGetSize(pTableMetaInfo->vgroupIdList); + size_t numOfSubs = pTableMetaInfo->vgroupList->numOfDnodes; for (int32_t i = 0; i < numOfSubs; ++i) { (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel); (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ebac676b77..d379b54439 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -413,7 +413,7 @@ int tscProcessSql(SSqlObj *pSql) { type = pQueryInfo->type; - // for hearbeat, numOfTables == 0; + // for heartbeat, numOfTables == 0; assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0); } @@ -424,19 +424,6 @@ int tscProcessSql(SSqlObj *pSql) { pSql->res.code = TSDB_CODE_OTHERS; return pSql->res.code; } - - // temp -// pSql->ipList = tscMgmtIpList; -// if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { -// pSql->index = pTableMetaInfo->pTableMeta->index; -// } else { // it must be the parent SSqlObj for super table query -// if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) { -// int32_t idx = pTableMetaInfo->vnodeIndex; -// -// SVnodeSidList *pSidList = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); -// pSql->index = pSidList->index; -// } -// } } else if (pSql->cmd.command < TSDB_SQL_LOCAL) { pSql->ipList = tscMgmtIpList; } else { // local handler @@ -522,8 +509,17 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pRetrieveMsg->free = htons(pQueryInfo->type); pMsg += sizeof(pQueryInfo->type); - STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta; - pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); + // todo valid the vgroupId at the client side + if (UTIL_TABLE_IS_SUPERTABLE(pQueryInfo->pTableMetaInfo[0])) { + SVgroupsInfo* pVgroupInfo = pQueryInfo->pTableMetaInfo[0]->vgroupList; + assert(pVgroupInfo->dnodeVgroups->numOfVgroups == 1); // todo fix me + + pRetrieveMsg->header.vgId = htonl(pVgroupInfo->dnodeVgroups[0].vgId[0]); + } else { + STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta; + pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); + } + pMsg += sizeof(SRetrieveTableMsg); pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen); @@ -584,7 +580,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { #if 0 SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->dnodeIndex); int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids; int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg); @@ -655,21 +651,22 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->head.vgId = htonl(pTableMeta->vgId); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); } else { // query super table - if (pTableMetaInfo->vnodeIndex < 0) { - tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vnodeIndex); + + if (pTableMetaInfo->dnodeIndex < 0) { + tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->dnodeIndex); return -1; } - pSql->ipList.numOfIps = taosArrayGetSize(pTableMetaInfo->vgroupIdList); + pSql->ipList.numOfIps = 1; // todo fix me pSql->ipList.port = tsDnodeShellPort; pSql->ipList.inUse = 0; - - for(int32_t i = 0; i < pSql->ipList.numOfIps; ++i) { - pSql->ipList.ip[i] = *(uint32_t*) taosArrayGet(pTableMetaInfo->vgroupIdList, i); - } + + // todo extract method + STableDnodeVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->dnodeVgroups[pTableMetaInfo->dnodeIndex]; + pSql->ipList.ip[0] = pVgroupInfo->ipAddr.ip; #if 0 - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->dnodeIndex); uint32_t vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode; numOfTables = pVnodeSidList->numOfSids; @@ -679,9 +676,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } #endif - uint32_t vnodeId = 1; - tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables); - pQueryMsg->head.vgId = htonl(vnodeId); + tscTrace("%p query on super table, numOfVgroup:%d, dnodeIndex:%d", pSql, pVgroupInfo->numOfVgroups, + pTableMetaInfo->dnodeIndex); + + pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId[0]); numOfTables = 1; } @@ -859,7 +857,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t numOfBlocks = 0; if (pQueryInfo->tsBuf != NULL) { - STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vnodeIndex); + STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->dnodeIndex); assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL); // this query should not be sent // todo refactor @@ -1851,7 +1849,6 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { } for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { - pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId); pMetaMsg->vpeerDesc[i].ip = htonl(pMetaMsg->vpeerDesc[i].ip); pMetaMsg->vpeerDesc[i].dnodeId = htonl(pMetaMsg->vpeerDesc[i].dnodeId); } @@ -2116,21 +2113,30 @@ _error_clean: free(sizes); free(metricMetaList); #endif + SSqlRes* pRes = &pSql->res; - SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pSql->res.pRsp; + SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp; pStableVgroup->numOfDnodes = htonl(pStableVgroup->numOfDnodes); - SSqlObj* pparent = pSql->param; - assert(pparent != NULL); + // master sqlObj locates in param + SSqlObj* parent = pSql->param; + assert(parent != NULL); - SSqlCmd* pCmd = &pparent->cmd; + SSqlCmd* pCmd = &parent->cmd; STableMetaInfo* pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - pInfo->vgroupIdList = taosArrayInit(pStableVgroup->numOfDnodes, sizeof(int32_t)); - // todo opt performance - for(int32_t i = 0; i < pStableVgroup->numOfDnodes; ++i) { - int32_t ip = htonl(pStableVgroup->dnodeIps[i]); - taosArrayPush(pInfo->vgroupIdList, &ip); + pInfo->vgroupList = malloc(pRes->rspLen); + memcpy(pInfo->vgroupList, pStableVgroup, pRes->rspLen); + + for(int32_t i = 0; i < pInfo->vgroupList->numOfDnodes; ++i) { + STableDnodeVgroupInfo* pVgroups = &pInfo->vgroupList->dnodeVgroups[i]; + pVgroups->numOfVgroups = htonl(pVgroups->numOfVgroups); + pVgroups->ipAddr.ip = htonl(pVgroups->ipAddr.ip); + pVgroups->ipAddr.port = htons(pVgroups->ipAddr.port); + + for(int32_t j = 0; j < pVgroups->numOfVgroups; ++j) { + pVgroups->vgId[j] = htonl(pVgroups->vgId[j]); + } } return pSql->res.code; @@ -2492,7 +2498,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { // bool required = false; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); - if (pQueryInfo->pTableMetaInfo[0]->vgroupIdList != NULL) { + if (pQueryInfo->pTableMetaInfo[0]->vgroupList != NULL) { return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 41cfb77c5d..641d0b449c 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -852,10 +852,9 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { } } else { // if no free resource msg is sent to vnode, we free this object immediately. - bool free = tscShouldFreeAsyncSqlObj(pSql); - if (free) { - assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL)); - + STscObj* pTscObj = pSql->pTscObj; + + if (pTscObj->pSql != pSql) { tscFreeSqlObj(pSql); tscTrace("%p sql result is freed by app", pSql); } else { diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index f586db3d08..aabaa9330a 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -517,7 +517,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p return NULL; } - pSql->cmd.inStream = 1; // 1 means sql in stream, allowed the sliding clause. pRes->code = tscToSQLCmd(pSql, &SQLInfo); SQLInfoDestroy(&SQLInfo); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index b7d01941d6..2f7f5ebb1c 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -382,7 +382,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { pSql->cmd.command = TSDB_SQL_SELECT; pQueryInfo->type = type; - tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vnodeIndex = 0; + tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->dnodeIndex = 0; } tscDoQuery(pSql); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 0589a6359b..9f975a4cbe 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -341,8 +341,8 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { tscPrintSelectClause(pNew, 0); - tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, 0, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, + tscTrace("%p subquery:%p tableIndex:%d, dnodeIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + pSql, pNew, 0, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); } @@ -457,7 +457,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { assert(pQueryInfo->numOfTables == 1); // for subquery, only one metermetaInfo STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vnodeIndex); + tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->dnodeIndex); tsBufDestory(pBuf); } @@ -478,9 +478,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { // for projection query, need to try next vnode // int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; int32_t totalVnode = 0; - if ((++pTableMetaInfo->vnodeIndex) < totalVnode) { + if ((++pTableMetaInfo->dnodeIndex) < totalVnode) { tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, - pTableMetaInfo->vnodeIndex - 1, pTableMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotal); + pTableMetaInfo->dnodeIndex - 1, pTableMetaInfo->dnodeIndex, totalVnode, pRes->numOfTotal); pSql->cmd.command = TSDB_SQL_SELECT; pSql->fp = tscJoinQueryCallback; @@ -542,7 +542,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { assert(pQueryInfo->numOfTables == 1); // for projection query, need to try next vnode if current vnode is exhausted -// if ((++pTableMetaInfo->vnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) { +// if ((++pTableMetaInfo->dnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) { // pSupporter->pState->numOfCompleted = 0; // pSupporter->pState->numOfTotal = 1; // @@ -609,7 +609,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { // STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { -// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes && +// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->dnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes && // (!tscHasReachLimitation(pQueryInfo, pRes))) { // numOfFetch++; // } @@ -647,8 +647,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if (pRes1->row >= pRes1->numOfRows) { - tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1, - pSupporter->subqueryIndex, pTableMetaInfo->vnodeIndex); + tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, dnodeIndex:%d", pSql, pSql1, + pSupporter->subqueryIndex, pTableMetaInfo->dnodeIndex); tscResetForNextRetrieve(pRes1); pSql1->fp = joinRetrieveCallback; @@ -785,11 +785,11 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); /** - * if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of + * if the query is a continue query (dnodeIndex > 0 for projection query) for next vnode, do the retrieval of * data instead of returning to its invoker */ - if (pTableMetaInfo->vnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { -// assert(pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes); + if (pTableMetaInfo->dnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { +// assert(pTableMetaInfo->dnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes); pSupporter->pState->numOfCompleted = 0; // reset the record value pSql->fp = joinRetrieveCallback; // continue retrieve data @@ -897,14 +897,14 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, + pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); tscPrintSelectClause(pNew, 0); tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, + pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); tscPrintSelectClause(pNew, 0); @@ -1005,7 +1005,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - pSql->numOfSubs = taosArrayGetSize(pTableMetaInfo->vgroupIdList); + pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfDnodes; assert(pSql->numOfSubs > 0); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); @@ -1111,6 +1111,7 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { } static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows); +static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows); static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) { // set no disk space error info @@ -1132,10 +1133,10 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES pthread_mutex_unlock(&trsupport->queryMutex); - tscRetrieveFromDnodeCallBack(trsupport, tres, trsupport->pState->code); + tscHandleSubqueryError(trsupport, tres, trsupport->pState->code); } -static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { +void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { SSqlObj *pPObj = trsupport->pParentSqlObj; int32_t subqueryIndex = trsupport->subqueryIndex; @@ -1144,9 +1145,9 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && pPObj->numOfSubs == pState->numOfTotal); - /* retrieved in subquery failed. OR query cancelled in retrieve phase. */ + // retrieved in subquery failed. OR query cancelled in retrieve phase. if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) { - pState->code = -(int)pPObj->res.code; + pState->code = pPObj->res.code; /* * kill current sub-query connection, which may retrieve data from vnodes; @@ -1179,7 +1180,7 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql); if (pNew == NULL) { - tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry", + tscError("%p sub:%p failed to create new subquery sqlObj due to out of memory, abort retry", trsupport->pParentSqlObj, pSql); pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; @@ -1235,10 +1236,13 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p SSubqueryState* pState = trsupport->pState; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; + // data in from current vnode is stored in cache and disk -// uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems; -// tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip, -// pSvd->vnode, numOfRowsFromSubquery, idx); + uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems; + tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, + pTableMetaInfo->vgroupList->dnodeVgroups[0].ipAddr.ip, pTableMetaInfo->vgroupList->dnodeVgroups[0].vgId[0], + numOfRowsFromSubquery, idx); tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); @@ -1326,7 +1330,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR pthread_mutex_lock(&trsupport->queryMutex); if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) { - return tscHandleSubRetrievalError(trsupport, pSql, numOfRows); + return tscHandleSubqueryError(trsupport, pSql, numOfRows); } SSqlRes * pRes = &pSql->res; @@ -1352,7 +1356,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR return; } - #ifdef _DEBUG_VIEW printf("received data from vnode: %d rows\n", pRes->numOfRows); SSrcColumnInfo colInfo[256] = {0}; @@ -1360,6 +1363,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR tscGetSrcColumnInfo(colInfo, pQueryInfo); tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo); #endif + if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, tsAvailTmpDirGB, tsMinimalTmpDirGB); @@ -1371,8 +1375,11 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR pRes->numOfRows, pQueryInfo->groupbyExpr.orderType); if (ret < 0) { // set no disk space error info, and abort retry tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); + } else if (pRes->completed) { tscAllDataRetrievedFromDnode(trsupport, pSql); + return; + } else { // continue fetch data from dnode pthread_mutex_unlock(&trsupport->queryMutex); taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); @@ -1380,6 +1387,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR } else { // all data has been retrieved to client tscAllDataRetrievedFromDnode(trsupport, pSql); } + pthread_mutex_unlock(&trsupport->queryMutex); } @@ -1393,9 +1401,9 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1); - // launch subquery for each vnode, so the subquery index equals to the vnodeIndex. + // launch subquery for each vnode, so the subquery index equals to the dnodeIndex. STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); - pTableMetaInfo->vnodeIndex = trsupport->subqueryIndex; + pTableMetaInfo->dnodeIndex = trsupport->subqueryIndex; pSql->pSubs[trsupport->subqueryIndex] = pNew; } @@ -1404,37 +1412,34 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu } void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { - SRetrieveSupport *trsupport = (SRetrieveSupport *)param; + SRetrieveSupport *trsupport = (SRetrieveSupport *) param; SSqlObj* pParentSql = trsupport->pParentSqlObj; - SSqlObj* pSql = (SSqlObj *)tres; + SSqlObj* pSql = (SSqlObj *) tres; -// STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); -// int32_t idx = pTableMetaInfo->vnodeIndex; - - SVnodeSidList *vnodeInfo = NULL; - SVnodeDesc * pSvd = NULL; -// if (pTableMetaInfo->pMetricMeta != NULL) { -// vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); -// pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; -// } + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); + STableDnodeVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->dnodeVgroups[0]; SSubqueryState* pState = trsupport->pState; assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && pParentSql->numOfSubs == pState->numOfTotal); if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) { - // metric query is killed, Note: code must be less than 0 + + // stable query is killed, abort further retry trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { - code = -(int)(pParentSql->res.code); + code = pParentSql->res.code; } else { code = pState->code; } - tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", pParentSql, pSql, - trsupport->subqueryIndex, code); + + tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%s", pParentSql, pSql, + trsupport->subqueryIndex, tstrerror(code)); } /* @@ -1442,51 +1447,40 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack * function to abort current and remain retrieve process. * - * NOTE: threadsafe is required. + * NOTE: thread safe is required. */ if (code != TSDB_CODE_SUCCESS) { if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) { - tscTrace("%p sub:%p reach the max retry count,set global code:%d", pParentSql, pSql, code); + tscTrace("%p sub:%p reach the max retry times, set global code:%d", pParentSql, pSql, code); atomic_val_compare_exchange_32(&pState->code, 0, code); - } else { // does not reach the maximum retry count, go on + } else { // does not reach the maximum retry time, go on tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry); SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql); if (pNew == NULL) { - tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d", - trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vgId : -1, trsupport->subqueryIndex); + tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d", + trsupport->pParentSqlObj, pSql, pVgroupInfo->vgId[0], trsupport->subqueryIndex); - pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY; + pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; } else { -// SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); -// assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL); + SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL); + tscProcessSql(pNew); return; } } } - if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort - if (vnodeInfo != NULL) { - tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, - vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId, - trsupport->subqueryIndex, pState->code); - } else { - tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql, - trsupport->subqueryIndex, pState->code); - } - - tscRetrieveFromDnodeCallBack(param, tres, pState->code); + if (pState->code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query + tscTrace("%p sub:%p query failed,ip:%u,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, + pVgroupInfo->ipAddr.ip, pVgroupInfo->vgId[0], trsupport->subqueryIndex, pState->code); + + tscHandleSubqueryError(param, tres, pState->code); } else { // success, proceed to retrieve data from dnode - if (vnodeInfo != NULL) { - tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, - vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId, - trsupport->subqueryIndex); - } else { - tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, - trsupport->subqueryIndex); - } + tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, + pVgroupInfo->ipAddr.ip, pVgroupInfo->vgId[0], trsupport->subqueryIndex); taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7ad7e65c51..e63d8ccf14 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1178,8 +1178,9 @@ SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functi pExprInfo->pExprs[index] = pExpr; pExpr->functionId = functionId; + int16_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - + // set the correct column index if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) { pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX; @@ -1190,7 +1191,6 @@ SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functi // tag columns require the column index revised. if (pColIndex->columnIndex >= numOfCols) { - pColIndex->columnIndex -= numOfCols; pExpr->colInfo.flag = TSDB_COL_TAG; } else { if (pColIndex->columnIndex != TSDB_TBNAME_COLUMN_INDEX) { @@ -1916,7 +1916,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) { } STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, - SArray* vgroupList, int16_t numOfTags, int16_t* tags) { + SVgroupsInfo* vgroupList, int16_t numOfTags, int16_t* tags) { void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES); if (pAlloc == NULL) { return NULL; @@ -1937,7 +1937,12 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST pTableMetaInfo->numOfTags = numOfTags; if (vgroupList != NULL) { - pTableMetaInfo->vgroupIdList = taosArrayClone(vgroupList); + assert(vgroupList->numOfDnodes == 1); // todo fix me + size_t size = sizeof(SVgroupsInfo) + (sizeof(STableDnodeVgroupInfo) + + vgroupList->dnodeVgroups[0].numOfVgroups * sizeof(int32_t)) * vgroupList->numOfDnodes; + + pTableMetaInfo->vgroupList = malloc(size); + memcpy(pTableMetaInfo->vgroupList, vgroupList, size); } if (tags != NULL) { @@ -1952,7 +1957,7 @@ STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) { return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, 0, NULL); } -void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) { +void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) { if (index < 0 || index >= pQueryInfo->numOfTables) { return; } @@ -1975,7 +1980,7 @@ void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool int32_t index = pQueryInfo->numOfTables; while (index >= 0) { - doRemoveMeterMetaInfo(pQueryInfo, --index, removeFromCache); + doRemoveTableMetaInfo(pQueryInfo, --index, removeFromCache); } tfree(pQueryInfo->pTableMetaInfo); @@ -1987,6 +1992,7 @@ void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) } taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); + tfree(pTableMetaInfo->vgroupList); // taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pMetricMeta), removeFromCache); } @@ -2014,7 +2020,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNew->sqlstr = strdup(pSql->sqlstr); if (pNew->sqlstr == NULL) { - tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->vnodeIndex); + tscError("%p new subquery failed, tableIndex:%d, dnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->dnodeIndex); free(pNew); return NULL; @@ -2058,7 +2064,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { - tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->vnodeIndex); + tscError("%p new subquery failed, tableIndex:%d, dnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->dnodeIndex); tscFreeSqlObj(pNew); return NULL; } @@ -2128,7 +2134,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void // pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key); // } - pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupIdList, pTableMetaInfo->numOfTags, + pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->numOfTags, pTableMetaInfo->tagColumnIndex); } else { // transfer the ownership of pTableMeta/pMetricMeta to the newly create sql object. // STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); @@ -2149,13 +2155,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void tscTrace( "%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d," "fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64, - pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, + pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime, pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit); tscPrintSelectClause(pNew, 0); } else { - tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vnodeIndex); + tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->dnodeIndex); } return pNew; @@ -2252,7 +2258,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { // int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; // return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && -// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vnodeIndex < totalVnode - 1); +// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->dnodeIndex < totalVnode - 1); } void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { @@ -2271,9 +2277,9 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { int32_t totalVnode = 0; // int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; - while (++pTableMetaInfo->vnodeIndex < totalVnode) { + while (++pTableMetaInfo->dnodeIndex < totalVnode) { tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, - pTableMetaInfo->vnodeIndex - 1, pTableMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause); + pTableMetaInfo->dnodeIndex - 1, pTableMetaInfo->dnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause); /* * update the limit and offset value for the query on the next vnode, @@ -2292,7 +2298,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql, - pTableMetaInfo->vnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); + pTableMetaInfo->dnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); /* * For project query with super table join, the numOfSub is equalled to the number of all subqueries. diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index c580bdecdc..99a1d0f0ef 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -188,6 +188,11 @@ extern char *taosMsg[]; #pragma pack(push, 1) +typedef struct { + uint32_t ip; + uint16_t port; +} SIpAddr; + typedef struct { int32_t numOfVnodes; } SMsgDesc; @@ -469,7 +474,6 @@ typedef struct { int16_t numOfGroupCols; // num of group by columns int16_t orderByIdx; int16_t orderType; // used in group by xx order by xxx - uint64_t groupbyTagIds; int64_t limit; int64_t offset; uint16_t queryType; // denote another query process @@ -616,9 +620,15 @@ typedef struct SCMSTableVgroupMsg { char tableId[TSDB_TABLE_ID_LEN]; } SCMSTableVgroupMsg; +typedef struct { + SIpAddr ipAddr; + int32_t numOfVgroups; + int32_t vgId[]; +} STableDnodeVgroupInfo; + typedef struct { int32_t numOfDnodes; - uint32_t dnodeIps[]; + STableDnodeVgroupInfo dnodeVgroups[]; } SCMSTableVgroupRspMsg; typedef struct { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index e6d597f5f5..9f106ed651 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -1103,12 +1103,26 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { if (pRsp == NULL) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); return; - } - - pRsp->numOfDnodes = htonl(1); - pRsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp)); + } + + int32_t numOfVgroups = 1; + int32_t numOfDnodes = 1; + + pRsp->numOfDnodes = htonl(numOfDnodes); + STableDnodeVgroupInfo* pVgroupInfo = pRsp->dnodeVgroups; + pVgroupInfo->ipAddr.ip = htonl(inet_addr(tsPrivateIp)); + + pVgroupInfo->ipAddr.port = htons(0); // todo fix it + pVgroupInfo->numOfVgroups = htonl(numOfVgroups); // todo fix it + int32_t* vgIdList = pVgroupInfo->vgId; + + for(int32_t i = 0; i < numOfVgroups; ++i) { + vgIdList[i] = htonl(2); // todo fix it + } + + assert(numOfDnodes == 1); // this size is valid only when numOfDnodes equals 1 + int32_t msgLen = sizeof(SCMSTableVgroupRspMsg) + sizeof(STableDnodeVgroupInfo) + numOfVgroups * sizeof(int32_t); - int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); SRpcMsg rpcRsp = {0}; rpcRsp.handle = pMsg->thandle; rpcRsp.pCont = pRsp; @@ -1510,7 +1524,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { } else { pMeta->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].privateIp); } - pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); +// pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId); } pMeta->numOfVpeers = pVgroup->numOfVnodes; From b6f64b3fbb28e66f25a91cabae4c6e65bdd86fea Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Thu, 16 Apr 2020 11:10:11 +0800 Subject: [PATCH 2/3] [td-98] add qsort and move to algo file --- src/query/inc/qextbuffer.h | 30 ++- src/query/inc/queryExecutor.h | 4 +- src/query/src/qast.c | 2 +- src/query/src/qextbuffer.c | 10 +- src/query/src/queryExecutor.c | 230 ++++++++++---------- src/util/inc/talgo.h | 30 +++ src/util/inc/tskiplist.h | 2 +- src/util/src/hash.c | 2 - src/util/src/talgo.c | 213 ++++++++++++++++++ src/util/src/tskiplist.c | 2 +- src/util/src/tutil.c | 69 ------ src/util/tests/skiplistTest.cpp | 12 +- src/vnode/tsdb/inc/tsdb.h | 13 +- src/vnode/tsdb/inc/tsdbMain.h | 1 + src/vnode/tsdb/src/tsdbMain.c | 2 +- src/vnode/tsdb/src/tsdbMeta.c | 32 ++- src/vnode/tsdb/src/tsdbRead.c | 368 ++++++++++++++++++-------------- tests/examples/c/demo.c | 22 +- 18 files changed, 643 insertions(+), 401 deletions(-) create mode 100644 src/util/inc/talgo.h create mode 100644 src/util/src/talgo.c diff --git a/src/query/inc/qextbuffer.h b/src/query/inc/qextbuffer.h index 598b809d92..f34deda6c1 100644 --- a/src/query/inc/qextbuffer.h +++ b/src/query/inc/qextbuffer.h @@ -19,9 +19,14 @@ extern "C" { #endif + #include "os.h" #include "taosmsg.h" + +#include "tarray.h" #include "tutil.h" +#include "dataformat.h" +#include "talgo.h" #define DEFAULT_PAGE_SIZE 16384 // 16k larger than the SHistoInfo #define MIN_BUFFER_SIZE (1 << 19) @@ -55,12 +60,12 @@ typedef struct tFlushoutData { tFlushoutInfo *pFlushoutInfo; } tFlushoutData; -typedef struct SFileInfo { +typedef struct SExtFileInfo { uint32_t nFileSize; // in pages uint32_t pageSize; uint32_t numOfElemsInFile; tFlushoutData flushoutData; -} SFileInfo; +} SExtFileInfo; typedef struct tFilePage { uint64_t numOfElems; @@ -109,26 +114,17 @@ typedef struct tExtMemBuffer { char * path; FILE * file; - SFileInfo fileMeta; + SExtFileInfo fileMeta; SColumnModel * pColumnModel; EXT_BUFFER_FLUSH_MODEL flushModel; } tExtMemBuffer; -typedef struct tTagSchema { - struct SSchema *pSchema; - int32_t numOfCols; - int32_t colOffset[]; -} tTagSchema; - -typedef struct tSidSet { - int32_t numOfSids; - int32_t numOfSubSet; - STableIdInfo **pTableIdList; - int32_t * starterPos; // position of each subgroup, generated according to - SColumnModel *pColumnModel; - SColumnOrderInfo orderIdx; -} tSidSet; +//typedef struct tTagSchema { +// struct SSchema *pSchema; +// int32_t numOfCols; +// int32_t colOffset[]; +//} tTagSchema; /** * diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 0302732995..fb8a908910 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -39,7 +39,7 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int typedef struct SSqlGroupbyExpr { int16_t tableIndex; int16_t numOfGroupCols; - SColIndex columnInfo[TSDB_MAX_TAGS]; // group by columns information + SColIndex* columnInfo; // group by columns information int16_t orderIndex; // order by column index int16_t orderType; // order by type: asc/desc } SSqlGroupbyExpr; @@ -171,7 +171,7 @@ typedef struct SQInfo { int32_t pointsInterpo; int32_t code; // error code to returned to client sem_t dataReady; - SArray* pTableIdList; // table id list + SArray* pTableList; // table id list void* tsdb; SQueryRuntimeEnv runtimeEnv; diff --git a/src/query/src/qast.c b/src/query/src/qast.c index ffc47d2abe..809a5202f2 100644 --- a/src/query/src/qast.c +++ b/src/query/src/qast.c @@ -869,7 +869,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S if (pQueryInfo->colIndex == 0 && pQueryInfo->optr != TSDB_RELATION_LIKE) { SQueryCond cond = {0}; - /*int32_t ret = */setQueryCond(pQueryInfo, &cond); + /*int32_t ret = */ setQueryCond(pQueryInfo, &cond); tQueryOnSkipList(pSkipList, &cond, pQueryInfo->q.nType, result); } else { /* Brutal force scan the whole skip list to find the appropriate result, diff --git a/src/query/src/qextbuffer.c b/src/query/src/qextbuffer.c index 0195ae8b26..754970993b 100644 --- a/src/query/src/qextbuffer.c +++ b/src/query/src/qextbuffer.c @@ -43,7 +43,7 @@ tExtMemBuffer* createExtMemBuffer(int32_t inMemSize, int32_t elemSize, SColumnMo pMemBuffer->path = strdup(name); pTrace("create tmp file:%s", pMemBuffer->path); - SFileInfo *pFMeta = &pMemBuffer->fileMeta; + SExtFileInfo *pFMeta = &pMemBuffer->fileMeta; pFMeta->pageSize = DEFAULT_PAGE_SIZE; @@ -63,7 +63,7 @@ void* destoryExtMemBuffer(tExtMemBuffer *pMemBuffer) { } // release flush out info link - SFileInfo *pFileMeta = &pMemBuffer->fileMeta; + SExtFileInfo *pFileMeta = &pMemBuffer->fileMeta; if (pFileMeta->flushoutData.nAllocSize != 0 && pFileMeta->flushoutData.pFlushoutInfo != NULL) { tfree(pFileMeta->flushoutData.pFlushoutInfo); } @@ -97,7 +97,7 @@ void* destoryExtMemBuffer(tExtMemBuffer *pMemBuffer) { /* * alloc more memory for flush out info entries. */ -static bool allocFlushoutInfoEntries(SFileInfo *pFileMeta) { +static bool allocFlushoutInfoEntries(SExtFileInfo *pFileMeta) { pFileMeta->flushoutData.nAllocSize = pFileMeta->flushoutData.nAllocSize << 1; tFlushoutInfo *tmp = (tFlushoutInfo *)realloc(pFileMeta->flushoutData.pFlushoutInfo, @@ -208,7 +208,7 @@ int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRow } static bool tExtMemBufferUpdateFlushoutInfo(tExtMemBuffer *pMemBuffer) { - SFileInfo *pFileMeta = &pMemBuffer->fileMeta; + SExtFileInfo *pFileMeta = &pMemBuffer->fileMeta; if (pMemBuffer->flushModel == MULTIPLE_APPEND_MODEL) { if (pFileMeta->flushoutData.nLength == pFileMeta->flushoutData.nAllocSize && !allocFlushoutInfoEntries(pFileMeta)) { @@ -238,7 +238,7 @@ static bool tExtMemBufferUpdateFlushoutInfo(tExtMemBuffer *pMemBuffer) { } static void tExtMemBufferClearFlushoutInfo(tExtMemBuffer *pMemBuffer) { - SFileInfo *pFileMeta = &pMemBuffer->fileMeta; + SExtFileInfo *pFileMeta = &pMemBuffer->fileMeta; pFileMeta->flushoutData.nLength = 0; memset(pFileMeta->flushoutData.pFlushoutInfo, 0, sizeof(tFlushoutInfo) * pFileMeta->flushoutData.nAllocSize); diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 9607157bda..a507bf3184 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -92,9 +92,7 @@ enum { TS_JOIN_TAG_NOT_EQUALS = 2, }; -static int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, - int32_t end); - +static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, int32_t end); static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo); @@ -2187,7 +2185,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { num = 128; } else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table - size_t s = taosArrayGetSize(pQInfo->pTableIdList); + size_t s = taosArrayGetSize(pQInfo->pTableList); num = MAX(s, INITIAL_RESULT_ROWS_VALUE); } else { // for super table query, one page for each subset num = 1;//pQInfo->pSidSet->numOfSubSet; @@ -2255,7 +2253,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void // get one queried meter assert(0); - // SMeterObj *pMeter = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[0]->sid); + // SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pTableIdList[0]->sid); pRuntimeEnv->pTSBuf = param; pRuntimeEnv->cur.vnodeIndex = -1; @@ -2272,7 +2270,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void // return ret; // } - // tSidSetSort(pQInfo->pSidSet); + // createTableGroup(pQInfo->pSidSet); int32_t size = getInitialPageNum(pQInfo); int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize); @@ -2303,7 +2301,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void SArray *sa = taosArrayInit(1, POINTER_BYTES); // for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) { - // SMeterObj *p1 = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[i]->sid); + // SMeterObj *p1 = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pTableIdList[i]->sid); // taosArrayPush(sa, &p1); // } @@ -2312,7 +2310,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void taosArrayPush(cols, &pQuery->colList[i]); } - pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(NULL, &cond, sa, cols); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, sa, cols); // metric query do not invoke interpolation, it will be done at the second-stage merge if (!isPointInterpoQuery(pQuery)) { @@ -2333,18 +2331,18 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void */ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { if (pQInfo != NULL) { - // assert(taosHashGetSize(pQInfo->pTableIdList) >= 1); + // assert(taosHashGetSize(pQInfo->pTableList) >= 1); } #if 0 - if (pQInfo == NULL || pQInfo->numOfMeters == 1) { + if (pQInfo == NULL || pQInfo->numOfTables == 1) { atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1); dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode, pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries); } else { int32_t num = 0; - for (int32_t i = 0; i < pQInfo->numOfMeters; ++i) { - SMeterObj *pMeter = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[i]->sid); + for (int32_t i = 0; i < pQInfo->numOfTables; ++i) { + SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pTableIdList[i]->sid); atomic_fetch_sub_32(&(pMeter->numOfQueries), 1); if (pMeter->numOfQueries > 0) { @@ -2358,9 +2356,9 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { * in order to reduce log output, for all meters of which numOfQueries count are 0, * we do not output corresponding information */ - num = pQInfo->numOfMeters - num; + num = pQInfo->numOfTables - num; dTrace("QInfo:%p metric query is over, dec query ref for %d meters, numOfQueries on %d meters are 0", pQInfo, - pQInfo->numOfMeters, num); + pQInfo->numOfTables, num); } #endif } @@ -2683,9 +2681,9 @@ static void doSetTagValueInParam(SColumnModel *pTagSchema, int32_t tagColIdx, vo #endif } -void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, void *pMeterSidInfo) { +void vnodeSetTagValueInParam(STableGroupList *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, void *pMeterSidInfo) { SQuery * pQuery = pRuntimeEnv->pQuery; - SColumnModel *pTagSchema = pSidSet->pColumnModel; + SColumnModel *pTagSchema = NULL;//pSidSet->pColumnModel; SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase; if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) { @@ -2909,37 +2907,35 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) return leftTimestamp > rightTimestamp ? 1 : -1; } -int32_t mergeResultsToGroup(SQInfo *pQInfo) { -// SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; -// SQuery * pQuery = pRuntimeEnv->pQuery; +int32_t mergeIntoGroupResult(SQInfo *pQInfo) { + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; // int64_t st = taosGetTimestampMs(); -// int32_t ret = TSDB_CODE_SUCCESS; + int32_t ret = TSDB_CODE_SUCCESS; - // while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) { - // int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx]; - // int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1]; - // - // assert(0); - // // ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end); - // if (ret < 0) { // not enough disk space to save the data into disk - // return -1; - // } - // - // pQInfo->subgroupIdx += 1; - // - // // this group generates at least one result, return results - // if (ret > 0) { - // break; - // } - // - // assert(pQInfo->numOfGroupResultPages == 0); - // dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1); - // } - // - // dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", - // GET_QINFO_ADDR(pQuery), - // pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st); +// while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) { + int32_t start = 0;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx]; + int32_t end = taosArrayGetSize(pQInfo->pTableList) - 1;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1]; + + ret = mergeIntoGroupResultImpl(pQInfo, pQInfo->pTableDataInfo, start, end); + if (ret < 0) { // not enough disk space to save the data into disk + return -1; + } + + pQInfo->subgroupIdx += 1; + + // this group generates at least one result, return results +// if (ret > 0) { +// break; +// } + + assert(pQInfo->numOfGroupResultPages == 0); + dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1); +// } + +// dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", +// GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st); return TSDB_CODE_SUCCESS; } @@ -2949,7 +2945,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { pQInfo->numOfGroupResultPages = 0; // current results of group has been sent to client, try next group - if (mergeResultsToGroup(pQInfo) != TSDB_CODE_SUCCESS) { + if (mergeIntoGroupResult(pQInfo) != TSDB_CODE_SUCCESS) { return; // failed to save data in the disk } @@ -3019,27 +3015,27 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW return maxOutput; } -UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, int32_t end) { +int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, int32_t end) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pQInfo->runtimeEnv.pQuery; + SQuery *pQuery = pRuntimeEnv->pQuery; tFilePage ** buffer = (tFilePage **)pQuery->sdata; int32_t * posList = calloc((end - start), sizeof(int32_t)); STableDataInfo **pTableList = malloc(POINTER_BYTES * (end - start)); // todo opt for the case of one table per group - int32_t numOfMeters = 0; + int32_t numOfTables = 0; for (int32_t i = start; i < end; ++i) { int32_t tid = pTableDataInfo[i].pTableQInfo->tid; SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, tid); if (list.size > 0 && pTableDataInfo[i].pTableQInfo->windowResInfo.size > 0) { - pTableList[numOfMeters] = &pTableDataInfo[i]; - numOfMeters += 1; + pTableList[numOfTables] = &pTableDataInfo[i]; + numOfTables += 1; } } - if (numOfMeters == 0) { + if (numOfTables == 0) { tfree(posList); tfree(pTableList); @@ -3050,14 +3046,13 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf SCompSupporter cs = {pTableList, posList, pQInfo}; SLoserTreeInfo *pTree = NULL; - tLoserTreeCreate(&pTree, numOfMeters, &cs, tableResultComparFn); + tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); SResultInfo *pResultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo)); setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery); - resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); + int64_t lastTimestamp = -1; - int64_t startt = taosGetTimestampMs(); while (1) { @@ -3078,7 +3073,7 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf cs.position[pos] = -1; // all input sources are exhausted - if (--numOfMeters == 0) { + if (--numOfTables == 0) { break; } } @@ -3086,14 +3081,13 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf if (ts == lastTimestamp) { // merge with the last one doMerge(pRuntimeEnv, ts, pWindowRes, true); } else { // copy data to disk buffer - assert(0); - // if (buffer[0]->numOfElems == pQuery->pointsToRead) { - // if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) { - // return -1; - // } + if (buffer[0]->numOfElems == pQuery->rec.capacity) { + if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) { + return -1; + } - // resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); - // } + resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); + } doMerge(pRuntimeEnv, ts, pWindowRes, false); buffer[0]->numOfElems += 1; @@ -3106,7 +3100,7 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf cs.position[pos] = -1; // all input sources are exhausted - if (--numOfMeters == 0) { + if (--numOfTables == 0) { break; } } @@ -3117,8 +3111,8 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf if (buffer[0]->numOfElems != 0) { // there are data in buffer if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) { - // dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), - // pQInfo->extBufFile); + dError("QInfo:%p failed to flush data into temp file, abort query", pQInfo); + tfree(pTree); tfree(pTableList); tfree(posList); @@ -3264,7 +3258,7 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { } if (isIntervalQuery(pQuery)) { - size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); + size_t numOfTables = taosArrayGetSize(pQInfo->pTableList); for (int32_t i = 0; i < numOfTables; ++i) { STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; @@ -3563,7 +3557,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } if (pRuntimeEnv->pSecQueryHandle != NULL) { - pRuntimeEnv->pSecQueryHandle = tsdbQueryByTableId(pQInfo->tsdb, &cond, pQInfo->pTableIdList, cols); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, pQInfo->pTableList, cols); } taosArrayDestroy(cols); @@ -3694,8 +3688,7 @@ void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, STableQueryInfo *pTableQ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; - pQuery->window.skey = pTableQueryInfo->win.skey; - pQuery->window.ekey = pTableQueryInfo->win.ekey; + pQuery->window = pTableQueryInfo->win; pQuery->lastKey = pTableQueryInfo->lastKey; assert(((pQuery->lastKey >= pQuery->window.skey) && QUERY_IS_ASC_QUERY(pQuery)) || @@ -3802,7 +3795,7 @@ void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSK pTableQueryInfo->lastKey = key; } else { pQuery->window.skey = key; - STimeWindow win = {.skey = key, pQuery->window.ekey}; + STimeWindow win = {.skey = key, .ekey = pQuery->window.ekey}; // for too small query range, no data in this interval. if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey < pQuery->window.skey)) || @@ -4200,7 +4193,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) taosArrayPush(cols, &pQuery->colList[i]); } - pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, pQInfo->pTableList, cols); taosArrayDestroy(cols); pQInfo->tsdb = tsdb; @@ -4301,7 +4294,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) return TSDB_CODE_SUCCESS; } -static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, tSidSet *pSidset) { +static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STableGroupList *pSidset) { if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) { return false; } @@ -4342,7 +4335,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { SQuery * pQuery = pRuntimeEnv->pQuery; int64_t st = taosGetTimestampMs(); - size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); + size_t numOfTables = taosArrayGetSize(pQInfo->pTableList); tsdb_query_handle_t *pQueryHandle = pRuntimeEnv->pQueryHandle; while (tsdbNextDataBlock(pQueryHandle)) { @@ -4500,7 +4493,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { setQueryStatus(pQuery, QUERY_COMPLETED); #if 0 -// tSidSet *pTableIdList = pSupporter->pSidSet; +// STableGroupList *pTableIdList = pSupporter->pSidSet; int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode; @@ -4599,7 +4592,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { resetCtxOutputBuf(pRuntimeEnv); resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); - while (pSupporter->meterIdx < pSupporter->numOfMeters) { + while (pSupporter->meterIdx < pSupporter->numOfTables) { int32_t k = pSupporter->meterIdx; if (isQueryKilled(pQInfo)) { @@ -4743,7 +4736,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pQuery->pointsOffset = pQuery->pointsToRead; dTrace( - "QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d," + "QInfo %p vid:%d, numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d," "next skey:%" PRId64 ", offset:%" PRId64, pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->size, pQInfo->size, pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset); @@ -4754,7 +4747,7 @@ static void createTableDataInfo(SQInfo* pQInfo) { SQuery* pQuery = pQInfo->runtimeEnv.pQuery; // todo make sure the table are added the reference count to gauranteed that all involved tables are valid - int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); + int32_t numOfTables = taosArrayGetSize(pQInfo->pTableList); if (pQInfo->pTableDataInfo == NULL) { pQInfo->pTableDataInfo = (STableDataInfo *)calloc(1, sizeof(STableDataInfo) * numOfTables); @@ -4766,7 +4759,7 @@ static void createTableDataInfo(SQInfo* pQInfo) { int32_t groupId = 0; for (int32_t i = 0; i < numOfTables; ++i) { // load all meter meta info - STableId *id = taosArrayGet(pQInfo->pTableIdList, i); + STableId *id = taosArrayGet(pQInfo->pTableList, i); STableDataInfo *pInfo = &pQInfo->pTableDataInfo[i]; setTableDataInfo(pInfo, i, groupId); @@ -4777,7 +4770,7 @@ static void createTableDataInfo(SQInfo* pQInfo) { static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); + size_t numOfTables = taosArrayGetSize(pQInfo->pTableList); for (int32_t i = 0; i < numOfTables; ++i) { STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; @@ -4816,7 +4809,7 @@ static void doRestoreContext(SQInfo* pQInfo) { static void doCloseAllTimeWindowAfterScan(SQInfo* pQInfo) { SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); + size_t numOfTables = taosArrayGetSize(pQInfo->pTableList); if (isIntervalQuery(pQuery)) { for (int32_t i = 0; i < numOfTables; ++i) { @@ -4898,7 +4891,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) { // assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); - if (mergeResultsToGroup(pQInfo) == TSDB_CODE_SUCCESS) { + if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) { copyResToQueryResultBuf(pQInfo, pQuery); #ifdef _DEBUG_VIEW @@ -4910,7 +4903,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { } // handle the limitation of output buffer - dTrace("QInfo:%p points returned:%d, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); + dTrace("QInfo:%p points returned:%d, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); } /* @@ -5147,13 +5140,13 @@ static void tableQueryImpl(SQInfo* pQInfo) { // record the total elapsed time pQInfo->elapsedTime += (taosGetTimestampUs() - st); - assert(taosArrayGetSize(pQInfo->pTableIdList) == 1); + assert(taosArrayGetSize(pQInfo->pTableList) == 1); /* check if query is killed or not */ if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed", pQInfo); } else { - STableId* pTableId = taosArrayGet(pQInfo->pTableIdList, 0); + STableId* pTableId = taosArrayGet(pQInfo->pTableList, 0); dTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); } @@ -5182,7 +5175,7 @@ static void stableQueryImpl(SQInfo* pQInfo) { // taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType); if (pQuery->rec.rows == 0) { - int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); + int32_t numOfTables = taosArrayGetSize(pQInfo->pTableList); dTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, numOfTables, pQuery->rec.total); // vnodePrintQueryStatistics(pSupporter); } @@ -5276,7 +5269,7 @@ static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** p * @return */ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr, - char** tagCond) { + char** tagCond, SColIndex** groupbyCols) { pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); @@ -5399,17 +5392,26 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pMsg = createTableIdList(pQueryMsg, pMsg, pTableIdList); if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns -// if (pQueryMsg->numOfGroupCols > 0) { -// pQueryMsg->groupbyTagIds = (uint64_t) & (pTagSchema[pQueryMsg->numOfTagsCols]); -// } else { -// pQueryMsg->groupbyTagIds = 0; -// } + *groupbyCols = malloc(pQueryMsg->numOfGroupCols*sizeof(SColIndex)); + + for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) { + (*groupbyCols)[i].colId = *(int16_t*) pMsg; + pMsg += sizeof((*groupbyCols)[i].colId); + + (*groupbyCols)[i].colIndex = *(int16_t*) pMsg; + pMsg += sizeof((*groupbyCols)[i].colIndex); + + (*groupbyCols)[i].flag = *(int16_t*) pMsg; + pMsg += sizeof((*groupbyCols)[i].flag); + + memcpy((*groupbyCols)[i].name, pMsg, tListLen(groupbyCols[i]->name)); + pMsg += tListLen((*groupbyCols)[i].name); + } + pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx); pQueryMsg->orderType = htons(pQueryMsg->orderType); pMsg += sizeof(SColIndex) * pQueryMsg->numOfGroupCols; - } else { - pQueryMsg->groupbyTagIds = 0; } pQueryMsg->interpoType = htons(pQueryMsg->interpoType); @@ -5570,26 +5572,23 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct return TSDB_CODE_SUCCESS; } -static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t *code) { +static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex* pColIndex, int32_t *code) { if (pQueryMsg->numOfGroupCols == 0) { return NULL; } // using group by tag columns - SSqlGroupbyExpr *pGroupbyExpr = - (SSqlGroupbyExpr *)malloc(sizeof(SSqlGroupbyExpr) + pQueryMsg->numOfGroupCols * sizeof(SColIndex)); + SSqlGroupbyExpr *pGroupbyExpr = (SSqlGroupbyExpr *)calloc(1, sizeof(SSqlGroupbyExpr)); if (pGroupbyExpr == NULL) { *code = TSDB_CODE_SERV_OUT_OF_MEMORY; return NULL; } - SColIndex *pGroupbyColInfo = (SColIndex *)pQueryMsg->groupbyTagIds; - pGroupbyExpr->numOfGroupCols = pQueryMsg->numOfGroupCols; pGroupbyExpr->orderType = pQueryMsg->orderType; pGroupbyExpr->orderIndex = pQueryMsg->orderByIdx; - memcpy(pGroupbyExpr->columnInfo, pGroupbyColInfo, sizeof(SColIndex) * pGroupbyExpr->numOfGroupCols); + pGroupbyExpr->columnInfo = pColIndex; return pGroupbyExpr; } @@ -5711,7 +5710,7 @@ static void doUpdateExprColumnIndex(SQuery* pQuery) { } static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, - SArray *pTableIdList) { + SArray *pTableList) { SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { goto _clean_pQInfo_memory; @@ -5809,7 +5808,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - pQInfo->pTableIdList = pTableIdList; + pQInfo->pTableList = pTableList; pQuery->pos = -1; @@ -5920,7 +5919,7 @@ static void freeQInfo(SQInfo *pQInfo) { teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); if (pQInfo->pTableDataInfo != NULL) { - // size_t num = taosHashGetSize(pQInfo->pTableIdList); + // size_t num = taosHashGetSize(pQInfo->pTableList); for (int32_t j = 0; j < 0; ++j) { destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); } @@ -5959,7 +5958,7 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQuery->pGroupbyExpr); tfree(pQuery); - taosArrayDestroy(pQInfo->pTableIdList); + taosArrayDestroy(pQInfo->pTableList); dTrace("QInfo:%p QInfo is freed", pQInfo); @@ -6032,7 +6031,9 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) char* tagCond = NULL; SArray *pTableIdList = NULL; SSqlFuncExprMsg** pExprMsg = NULL; - if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond)) != TSDB_CODE_SUCCESS) { + SColIndex* pGroupColIndex = NULL; + + if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &pGroupColIndex)) != TSDB_CODE_SUCCESS) { return code; } @@ -6054,32 +6055,35 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) goto _query_over; } - SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, &code); + SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, pGroupColIndex, &code); if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) { goto _query_over; } bool isSTableQuery = false; - SArray* res = NULL; + SArray* pGroupList = NULL; + if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) { isSTableQuery = true; STableId* id = taosArrayGet(pTableIdList, 0); - id->uid = -1; + id->uid = -1; //todo fix me - res = taosArrayInit(8, sizeof(STableId)); - - /*int32_t ret =*/ tsdbQueryTags(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, res); - if (taosArrayGetSize(res) == 0) { // no qualified table in stable query in this vnode + /*int32_t ret =*/ tsdbQueryTags(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &pGroupList, pGroupColIndex, pQueryMsg->numOfGroupCols); + if (taosArrayGetSize(pGroupList) == 0) { // no qualified tables no need to do query code = TSDB_CODE_SUCCESS; goto _query_over; } } else { assert(taosArrayGetSize(pTableIdList) == 1); - res = pTableIdList; + + STableId* id = taosArrayGet(pTableIdList, 0); + if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &pGroupList)) != TSDB_CODE_SUCCESS) { + goto _query_over; + } } - (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, res); + (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, pGroupList); if ((*pQInfo) == NULL) { code = TSDB_CODE_SERV_OUT_OF_MEMORY; } diff --git a/src/util/inc/talgo.h b/src/util/inc/talgo.h new file mode 100644 index 0000000000..e89795c086 --- /dev/null +++ b/src/util/inc/talgo.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TALGO_H +#define TDENGINE_TALGO_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void *param); + +void tqsort(void *src, size_t numOfElem, size_t size, const void* param, __ext_compar_fn_t comparFn); + +#ifdef __cplusplus +} +#endif +#endif // TDENGINE_TALGO_H diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index dc67d6dad5..176135cf92 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -159,7 +159,7 @@ void *tSkipListDestroy(SSkipList *pSkipList); * @param level * @param headSize */ -void tSkipListRandNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize); +void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize); /** * put the skip list node into the skip list. diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 9cad14e8c7..23f7544ad5 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -461,9 +461,7 @@ void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) { pNode->next = NULL; pNode->prev = NULL; - pTrace("key:%s %p remove from hash table", pNode->key, pNode); tfree(pNode); - __unlock(pHashObj->lock); } diff --git a/src/util/src/talgo.c b/src/util/src/talgo.c new file mode 100644 index 0000000000..1004d5ab67 --- /dev/null +++ b/src/util/src/talgo.c @@ -0,0 +1,213 @@ + +#include "os.h" +#include "tutil.h" +#include "talgo.h" + +#define doswap(__left, __right, __size, __buf) do {\ + memcpy((__buf), (__left), (__size));\ + memcpy((__left), (__right),(__size));\ + memcpy((__right), (__buf), (__size));\ +} while (0); + +#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx)) + +static void median(void *src, size_t size, size_t s, size_t e, const void *param, __ext_compar_fn_t comparFn, void* buf) { + int32_t mid = ((e - s) >> 1u) + s; + + if (comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, s), param) == 1) { + doswap(elePtrAt(src, size, mid), elePtrAt(src, size, s), size, buf); + } + + if (comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, e), param) == 1) { + doswap(elePtrAt(src, size, mid), elePtrAt(src, size, s), size, buf); + doswap(elePtrAt(src, size, mid), elePtrAt(src, size, e), size, buf); + } else if (comparFn(elePtrAt(src, size, s), elePtrAt(src, size, e), param) == 1) { + doswap(elePtrAt(src, size, s), elePtrAt(src, size, e), size, buf); + } + + assert(comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, s), param) <= 0 && comparFn(elePtrAt(src, size, s), elePtrAt(src, size, e), param) <= 0); + +#ifdef _DEBUG_VIEW + tTagsPrints(src[s], pOrderDesc->pColumnModel, &pOrderDesc->orderIdx); + tTagsPrints(src[mid], pOrderDesc->pColumnModel, &pOrderDesc->orderIdx); + tTagsPrints(src[e], pOrderDesc->pColumnModel, &pOrderDesc->orderIdx); +#endif +} + +static void tInsertSort(void *src, size_t size, int32_t s, int32_t e, const void *param, __ext_compar_fn_t comparFn, + void* buf) { + for (int32_t i = s + 1; i <= e; ++i) { + for (int32_t j = i; j > s; --j) { + if (comparFn(elePtrAt(src, size, j), elePtrAt(src, size, j - 1), param) == -1) { + doswap(elePtrAt(src, size, j), elePtrAt(src, size, j - 1), size, buf); + } else { + break; + } + } + } +} + +void tqsortImpl(void *src, int32_t start, int32_t end, size_t size, const void *param, __ext_compar_fn_t comparFn, + void* buf) { + // short array sort, incur another sort procedure instead of quick sort process + const int32_t THRESHOLD_SIZE = 6; + if (end - start + 1 <= THRESHOLD_SIZE) { + tInsertSort(src, size, start, end, param, comparFn, buf); + return; + } + + median(src, size, start, end, param, comparFn, buf); + + int32_t s = start, e = end; + int32_t endRightS = end, startLeftS = start; + + while (s < e) { + while (e > s) { + int32_t ret = comparFn(elePtrAt(src, size, e), elePtrAt(src, size, s), param); + if (ret < 0) { + break; + } + + //move the data that equals to pivotal value to the right end of the list + if (ret == 0 && e != endRightS) { + doswap(elePtrAt(src, size, e), elePtrAt(src, size, endRightS), size, buf); + endRightS--; + } + + e--; + } + + if (e != s) { + doswap(elePtrAt(src, size, e), elePtrAt(src, size, s), size, buf); + } + + while (s < e) { + int32_t ret = comparFn(elePtrAt(src, size, s), elePtrAt(src, size, e), param); + if (ret > 0) { + break; + } + + if (ret == 0 && s != startLeftS) { + doswap(elePtrAt(src, size, s), elePtrAt(src, size, startLeftS), size, buf); + startLeftS++; + } + s++; + } + + if (e != s) { + doswap(elePtrAt(src, size, s), elePtrAt(src, size, e), size, buf); + } + } + + int32_t rightPartStart = e + 1; + if (endRightS != end && e < end) { + int32_t left = rightPartStart; + int32_t right = end; + + while (right > endRightS && left <= endRightS) { + doswap(elePtrAt(src, size, left), elePtrAt(src, size, right), size, buf); + + left++; + right--; + } + + rightPartStart += (end - endRightS); + } + + int32_t leftPartEnd = e - 1; + if (startLeftS != end && s > start) { + int32_t left = start; + int32_t right = leftPartEnd; + + while (left < startLeftS && right >= startLeftS) { + doswap(elePtrAt(src, size, left), elePtrAt(src, size, right), size, buf); + + left++; + right--; + } + + leftPartEnd -= (startLeftS - start); + } + + if (leftPartEnd > start) { + tqsortImpl(src, size, start, leftPartEnd, param, comparFn, buf); + } + + if (rightPartStart < end) { + tqsortImpl(src, size, rightPartStart, end, param, comparFn, buf); + } +} + +void tqsort(void *src, size_t numOfElem, size_t size, const void* param, __ext_compar_fn_t comparFn) { + char *buf = calloc(1, size); // prepare the swap buffer + tqsortImpl(src, 0, numOfElem - 1, size, param, comparFn, buf); + tfree(buf); +} + +void * taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, int (*compar)(const void *, const void *), int flags) { + // TODO: need to check the correctness of this function + int l = 0; + int r = nmemb; + int idx = 0; + int comparison; + + if (flags == TD_EQ) { + return bsearch(key, base, nmemb, size, compar); + } else if (flags == TD_GE) { + if ((*compar)(key, elePtrAt(base, size, 0)) <= 0) return elePtrAt(base, size, 0); + if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) > 0) return NULL; + + while (l < r) { + idx = (l + r) / 2; + comparison = (*compar)(key, elePtrAt(base, size, idx)); + if (comparison < 0) { + r = idx; + } else if (comparison > 0) { + l = idx + 1; + } else { + return elePtrAt(base, size, idx); + } + } + + if ((*compar)(key, elePtrAt(base, size, idx)) < 0) { + return elePtrAt(base, size, idx); + } else { + if (idx + 1 > nmemb - 1) { + return NULL; + } else { + return elePtrAt(base, size, idx + 1); + } + } + } else if (flags == TD_LE) { + if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) >= 0) return elePtrAt(base, size, nmemb - 1); + if ((*compar)(key, elePtrAt(base, size, 0)) < 0) return NULL; + + while (l < r) { + idx = (l + r) / 2; + comparison = (*compar)(key, elePtrAt(base, size, idx)); + if (comparison < 0) { + r = idx; + } else if (comparison > 0) { + l = idx + 1; + } else { + return elePtrAt(base, size, idx); + } + } + + if ((*compar)(key, elePtrAt(base, size, idx)) > 0) { + return elePtrAt(base, size, idx); + } else { + if (idx == 0) { + return NULL; + } else { + return elePtrAt(base, size, idx - 1); + } + } + + } else { + assert(0); + return NULL; + } + + return NULL; +} diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index beb831ea67..867309c163 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -190,7 +190,7 @@ void *tSkipListDestroy(SSkipList *pSkipList) { return NULL; } -void tSkipListRandNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize) { +void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize) { if (pSkipList == NULL) { return; } diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 5f4c903279..7496ad482b 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -618,72 +618,3 @@ char *taosCharsetReplace(char *charsetstr) { return strdup(charsetstr); } - -#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx)) -void * taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, int (*compar)(const void *, const void *), int flags) { - // TODO: need to check the correctness of this function - int l = 0; - int r = nmemb; - int idx = 0; - int comparison; - - if (flags == TD_EQ) { - return bsearch(key, base, nmemb, size, compar); - } else if (flags == TD_GE) { - if ((*compar)(key, elePtrAt(base, size, 0)) <= 0) return elePtrAt(base, size, 0); - if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) > 0) return NULL; - - while (l < r) { - idx = (l + r) / 2; - comparison = (*compar)(key, elePtrAt(base, size, idx)); - if (comparison < 0) { - r = idx; - } else if (comparison > 0) { - l = idx + 1; - } else { - return elePtrAt(base, size, idx); - } - } - - if ((*compar)(key, elePtrAt(base, size, idx)) < 0) { - return elePtrAt(base, size, idx); - } else { - if (idx + 1 > nmemb - 1) { - return NULL; - } else { - return elePtrAt(base, size, idx + 1); - } - } - } else if (flags == TD_LE) { - if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) >= 0) return elePtrAt(base, size, nmemb - 1); - if ((*compar)(key, elePtrAt(base, size, 0)) < 0) return NULL; - - while (l < r) { - idx = (l + r) / 2; - comparison = (*compar)(key, elePtrAt(base, size, idx)); - if (comparison < 0) { - r = idx; - } else if (comparison > 0) { - l = idx + 1; - } else { - return elePtrAt(base, size, idx); - } - } - - if ((*compar)(key, elePtrAt(base, size, idx)) > 0) { - return elePtrAt(base, size, idx); - } else { - if (idx == 0) { - return NULL; - } else { - return elePtrAt(base, size, idx - 1); - } - } - - } else { - assert(0); - return NULL; - } - - return NULL; -} diff --git a/src/util/tests/skiplistTest.cpp b/src/util/tests/skiplistTest.cpp index 8bfa3a43c0..3713e71a01 100644 --- a/src/util/tests/skiplistTest.cpp +++ b/src/util/tests/skiplistTest.cpp @@ -28,7 +28,7 @@ void doubleSkipListTest() { int32_t level = 0; int32_t size = 0; - tSkipListRandNodeInfo(pSkipList, &level, &size); + tSkipListNewNodeInfo(pSkipList, &level, &size); auto d = (SSkipListNode*)calloc(1, size + sizeof(double) * 2); d->level = level; @@ -81,7 +81,7 @@ void randKeyTest() { int32_t level = 0; int32_t s = 0; - tSkipListRandNodeInfo(pSkipList, &level, &s); + tSkipListNewNodeInfo(pSkipList, &level, &s); auto d = (SSkipListNode*)calloc(1, s + sizeof(int32_t) * 2); d->level = level; @@ -112,7 +112,7 @@ void stringKeySkiplistTest() { int32_t level = 0; int32_t headsize = 0; - tSkipListRandNodeInfo(pSkipList, &level, &headsize); + tSkipListNewNodeInfo(pSkipList, &level, &headsize); auto pNode = (SSkipListNode*)calloc(1, headsize + max_key_size + sizeof(double)); pNode->level = level; @@ -124,7 +124,7 @@ void stringKeySkiplistTest() { tSkipListPut(pSkipList, pNode); - tSkipListRandNodeInfo(pSkipList, &level, &headsize); + tSkipListNewNodeInfo(pSkipList, &level, &headsize); pNode = (SSkipListNode*)calloc(1, headsize + max_key_size + sizeof(double)); pNode->level = level; @@ -164,7 +164,7 @@ void stringKeySkiplistTest() { int32_t total = 10000; for (int32_t i = 0; i < total; ++i) { int32_t n = sprintf(k, "abc_%d_%d", i, i); - tSkipListRandNodeInfo(pSkipList, &level, &headsize); + tSkipListNewNodeInfo(pSkipList, &level, &headsize); auto pNode = (SSkipListNode*)calloc(1, headsize + 20 + sizeof(double)); pNode->level = level; @@ -222,7 +222,7 @@ void skiplistPerformanceTest() { char* p = total; for (int32_t i = 0; i < size; ++i) { - tSkipListRandNodeInfo(pSkipList, &level, &headsize); + tSkipListNewNodeInfo(pSkipList, &level, &headsize); SSkipListNode* d = (SSkipListNode*)p; p += headsize + sizeof(double) * 2; diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 4f86066755..e385239dba 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -181,8 +181,10 @@ int32_t tsdbInsertData(tsdb_repo_t *pRepo, SSubmitMsg *pMsg); typedef void* tsdb_query_handle_t; // Use void to hide implementation details -// typedef struct { -// } SColumnFilterInfo; +typedef struct STableGroupList { // qualified table object list in group + SArray* pGroupList; + int32_t numOfTables; +} STableGroupList; // query condition to build vnode iterator typedef struct STsdbQueryCond { @@ -233,7 +235,7 @@ typedef void *tsdbpos_t; * @param pTableList table sid list * @return */ -tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo); +tsdb_query_handle_t *tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo); /** * move to next block @@ -335,7 +337,10 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle); * @param pTagCond. tag query condition * */ -int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char *pTagCond, size_t len, SArray* list); +int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, SArray** pGroupList, + SColIndex* pColIndex, int32_t numOfCols); + +int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, SArray** pGroupList); /** * clean up the query handle diff --git a/src/vnode/tsdb/inc/tsdbMain.h b/src/vnode/tsdb/inc/tsdbMain.h index 077bdf45c3..d9fed4327b 100644 --- a/src/vnode/tsdb/inc/tsdbMain.h +++ b/src/vnode/tsdb/inc/tsdbMain.h @@ -100,6 +100,7 @@ typedef struct { STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables); int32_t tsdbFreeMeta(STsdbMeta *pMeta); STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable); +STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable); // ---- Operation on STable #define TSDB_TABLE_ID(pTable) ((pTable)->tableId) diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 6873c69c2a..2b26eec53a 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -704,7 +704,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable pTable->mem->keyLast = 0; } - tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize); + tSkipListNewNodeInfo(pTable->mem->pData, &level, &headSize); TSKEY key = dataRowKey(row); // printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints); diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 0eb6dde1d0..68f06673e7 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -105,8 +105,9 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { if (pTable == NULL) return -1; if (pTable->type == TSDB_SUPER_TABLE) { - pTable->pIndex = - tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, 0, getTagIndexKey); + STColumn* pColSchema = schemaColAt(pTable->tagSchema, 0); + pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes, + 1, 0, 0, getTagIndexKey); } tsdbAddTableToMeta(pMeta, pTable, false); @@ -201,6 +202,18 @@ STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) { } } +STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) { + if (pTable->type == TSDB_SUPER_TABLE) { + return pTable->tagSchema; + } else if (pTable->type == TSDB_CHILD_TABLE) { + STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid); + if (pSuper == NULL) return NULL; + return pSuper->tagSchema; + } else { + return NULL; + } +} + int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { if (tsdbCheckTableCfg(pCfg) < 0) return -1; @@ -222,8 +235,11 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { super->schema = tdDupSchema(pCfg->schema); super->tagSchema = tdDupSchema(pCfg->tagSchema); super->tagVal = tdDataRowDup(pCfg->tagValues); - super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, - 0, 0, getTagIndexKey); // Allow duplicate key, no lock + + // index the first tag column + STColumn* pColSchema = schemaColAt(super->tagSchema, 0); + super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes, + 1, 0, 0, getTagIndexKey); // Allow duplicate key, no lock if (super->pIndex == NULL) { tdFreeSchema(super->schema); @@ -411,11 +427,11 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { int32_t level = 0; int32_t headSize = 0; - // first tag column - STColumn* s = schemaColAt(pSTable->tagSchema, 0); + tSkipListNewNodeInfo(pSTable->pIndex, &level, &headSize); - tSkipListRandNodeInfo(pSTable->pIndex, &level, &headSize); - SSkipListNode* pNode = calloc(1, headSize + s->bytes + POINTER_BYTES); + // NOTE: do not allocate the space for key, since in each skip list node, only keep the pointer to pTable, not the + // actual key value, and the key value will be retrieved during query through the pTable and getTagIndexKey function + SSkipListNode* pNode = calloc(1, headSize + POINTER_BYTES); pNode->level = level; SSkipList* list = pSTable->pIndex; diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index bf45c2d0af..8d48b1ec48 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -15,10 +15,12 @@ #include "os.h" +#include "talgo.h" #include "tlog.h" #include "tutil.h" #include "../../../query/inc/qast.h" +#include "../../../query/inc/qextbuffer.h" #include "../../../query/inc/tlosertree.h" #include "../../../query/inc/tsqlfunction.h" #include "tsdb.h" @@ -141,9 +143,8 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { pCompBlockLoadInfo->fileListIndex = -1; } -tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, SArray* idList, SArray* pColumnInfo) { +tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, SArray* groupList, SArray* pColumnInfo) { // todo 1. filter not exist table - // todo 2. add the reference count for each table that is involved in query STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); @@ -156,26 +157,26 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond pQueryHandle->isFirstSlot = true; pQueryHandle->cur.fid = -1; - size_t size = taosArrayGetSize(idList); + size_t size = taosArrayGetSize(groupList); assert(size >= 1); pQueryHandle->pTableCheckInfo = taosArrayInit(size, sizeof(STableCheckInfo)); for (int32_t i = 0; i < size; ++i) { - STableId id = *(STableId*) taosArrayGet(idList, i); - - STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid); - if (pTable == NULL) { - dError("%p failed to get table, error uid:%" PRIu64, pQueryHandle, id.uid); - continue; - } - - STableCheckInfo info = { - .lastKey = pQueryHandle->window.skey, - .tableId = id, - .pTableObj = pTable, - }; + SArray* group = *(SArray**)taosArrayGet(groupList, i); - taosArrayPush(pQueryHandle->pTableCheckInfo, &info); + size_t gsize = taosArrayGetSize(group); + for (int32_t j = 0; j < gsize; ++j) { + STable* pTable = *(STable**)taosArrayGet(group, j); + assert(pTable != NULL); + + STableCheckInfo info = { + .lastKey = pQueryHandle->window.skey, + .tableId = pTable->tableId, + .pTableObj = pTable, + }; + + taosArrayPush(pQueryHandle->pTableCheckInfo, &info); + } } dTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo)); @@ -208,7 +209,8 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond } static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { - assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); + size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); + assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1); pHandle->cur.fid = -1; STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); @@ -312,7 +314,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo SCompIdx* compIndex = &pQueryHandle->compIndex[pCheckInfo->tableId.tid]; if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file - assert(0); + continue;//no data blocks in the file belongs to pCheckInfo->pTable } else { if (pCheckInfo->compSize < compIndex->len) { assert(compIndex->len > 0); @@ -488,61 +490,6 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock return pQueryHandle->realNumOfRows > 0; } -//bool moveToNextBlock(STsdbQueryHandle* pQueryHandle, int32_t step) { -// SQueryFilePos* cur = &pQueryHandle->cur; -// -// if (pQueryHandle->cur.fid >= 0) { -// /* -// * 1. ascending order. The last data block of data file -// * 2. descending order. The first block of file -// */ -// STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex); -// int32_t tid = pCheckInfo->tableId.tid; -// -// if ((step == QUERY_ASC_FORWARD_STEP && -// (pQueryHandle->cur.slot == pQueryHandle->compIndex[tid].numOfSuperBlocks - 1)) || -// (step == QUERY_DESC_FORWARD_STEP && (pQueryHandle->cur.slot == 0))) { -// // temporarily keep the position value, in case of no data qualified when move forwards(backwards) -// // SQueryFilePos save = pQueryHandle->cur; -// pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter); -// -// int32_t fid = -1; -// int32_t numOfBlocks = 0; -// -// if (pQueryHandle->pFileGroup != NULL) { -// if ((fid = getFileCompInfo(pQueryHandle, &numOfBlocks, 1)) < 0) { -// } else { -// cur->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->numOfBlocks - 1; -// cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->pBlock[cur->slot].numOfPoints - 1; -// -// SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot]; -// cur->fid = pQueryHandle->pFileGroup->fileId; -// assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0); -// -// if (pBlock->keyFirst > pQueryHandle->window.ekey) { // done -// return false; -// } -// -// return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); -// } -// } else { // check data in cache -// pQueryHandle->cur.fid = -1; -// return hasMoreDataInCache(pQueryHandle); -// } -// } else { // next block in the same file -// cur->slot += step; -// -// SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot]; -// cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1; -// return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); -// } -// } else { // data in cache -// return hasMoreDataInCache(pQueryHandle); -// } -// -// return false; -//} - static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { int firstPos, lastPos, midPos = -1; int numOfPoints; @@ -732,71 +679,6 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { return midPos; } -//static bool getQualifiedDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t type) { -// STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb); -// int32_t fid = getFileIdFromKey(pCheckInfo->lastKey); -// -// tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, TSDB_FGROUP_ITER_FORWARD); -// tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); -// pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter); -// -// SQueryFilePos* cur = &pQueryHandle->cur; -// -// int32_t tid = pCheckInfo->tableId.tid; -// int32_t numOfBlocks = 0; -// -// while (pQueryHandle->pFileGroup != NULL) { -// if (getFileCompInfo(pQueryHandle, &numOfBlocks, 1) != TSDB_CODE_SUCCESS) { -// break; -// } -// -// assert(pCheckInfo->numOfBlocks >= 0); -// -// // no data block in current file, try next -// if (pCheckInfo->numOfBlocks > 0) { -// cur->fid = pQueryHandle->pFileGroup->fileId; -// break; -// } -// -// dTrace("%p no data block in file, fid:%d, tid:%d, try next, %p", pQueryHandle, pQueryHandle->pFileGroup->fileId, -// tid, pQueryHandle->qinfo); -// -// pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter); -// } -// -// if (pCheckInfo->numOfBlocks == 0) { -// return false; -// } -// -// cur->slot = 0; // always start from the first slot -// SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot]; -// return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); -//} - -//static UNUSED_FUNC bool hasMoreDataForSingleTable(STsdbQueryHandle* pHandle) { -// assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); -// -// STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb); -// STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); -// -// if (!pCheckInfo->checkFirstFileBlock) { -// pCheckInfo->checkFirstFileBlock = true; -// -// if (pFileHandle != NULL) { -// bool found = getQualifiedDataBlock(pHandle, pCheckInfo, 1); -// if (found) { -// return true; -// } -// } -// -// // no data in file, try cache -// pHandle->cur.fid = -1; -// return hasMoreDataInCache(pHandle); -// } else { // move to next data block in file or in cache -// return moveToNextBlock(pHandle, 1); -// } -//} - static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) { tfree(pSupporter->numOfBlocksPerMeter); tfree(pSupporter->blockIndexArray); @@ -862,23 +744,26 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO } int32_t cnt = 0; - int32_t numOfQualMeters = 0; + int32_t numOfQualTables = 0; for (int32_t j = 0; j < numOfTables; ++j) { STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j); - + if (pTableCheck->numOfBlocks <= 0) { + continue; + } + SCompBlock* pBlock = pTableCheck->pCompInfo->blocks; - sup.numOfBlocksPerMeter[numOfQualMeters] = pTableCheck->numOfBlocks; + sup.numOfBlocksPerMeter[numOfQualTables] = pTableCheck->numOfBlocks; char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks); if (buf == NULL) { - cleanBlockOrderSupporter(&sup, numOfQualMeters); + cleanBlockOrderSupporter(&sup, numOfQualTables); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - sup.pDataBlockInfo[numOfQualMeters] = (STableBlockInfo*)buf; + sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf; for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) { - STableBlockInfo* pBlockInfoEx = &sup.pDataBlockInfo[numOfQualMeters][k]; + STableBlockInfo* pBlockInfoEx = &sup.pDataBlockInfo[numOfQualTables][k]; pBlockInfoEx->pBlock.compBlock = &pBlock[k]; pBlockInfoEx->pBlock.fields = NULL; @@ -889,13 +774,13 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO cnt++; } - numOfQualMeters++; + numOfQualTables++; } - dTrace("%p create data blocks info struct completed", pQueryHandle); + dTrace("%p create data blocks info struct completed, %d blocks in %d tables", pQueryHandle, cnt, numOfQualTables); - assert(cnt <= numOfBlocks && numOfQualMeters <= numOfTables); // the pMeterDataInfo[j]->numOfBlocks may be 0 - sup.numOfTables = numOfQualMeters; + assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pMeterDataInfo[j]->numOfBlocks may be 0 + sup.numOfTables = numOfQualTables; SLoserTreeInfo* pTree = NULL; uint8_t ret = tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar); @@ -1256,11 +1141,11 @@ static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) { SSkipListIterator* iter = tSkipListCreateIter(pTable->pIndex); while (tSkipListIterNext(iter)) { SSkipListNode* pNode = tSkipListIterGet(iter); - STable* t = *(STable**)SL_GET_NODE_DATA(pNode); - - taosArrayPush(list, &t->tableId); + + STable* t = *(STable**)SL_GET_NODE_DATA(pNode); + taosArrayPush(list, t); } - + return TSDB_CODE_SUCCESS; } @@ -1348,6 +1233,132 @@ void filterPrepare(void* expr, void* param) { tVariantTypeSetType(&pInfo->q, pInfo->sch.type); } +int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size) { + switch (type) { + case TSDB_DATA_TYPE_INT: DEFAULT_COMP(GET_INT32_VAL(f1), GET_INT32_VAL(f2)); + case TSDB_DATA_TYPE_DOUBLE: DEFAULT_COMP(GET_DOUBLE_VAL(f1), GET_DOUBLE_VAL(f2)); + case TSDB_DATA_TYPE_FLOAT: DEFAULT_COMP(GET_FLOAT_VAL(f1), GET_FLOAT_VAL(f2)); + case TSDB_DATA_TYPE_BIGINT: DEFAULT_COMP(GET_INT64_VAL(f1), GET_INT64_VAL(f2)); + case TSDB_DATA_TYPE_SMALLINT: DEFAULT_COMP(GET_INT16_VAL(f1), GET_INT16_VAL(f2)); + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_BOOL: DEFAULT_COMP(GET_INT8_VAL(f1), GET_INT8_VAL(f2)); + case TSDB_DATA_TYPE_NCHAR: { + int32_t ret = wcsncmp((wchar_t*) f1, (wchar_t*) f2, size/TSDB_NCHAR_SIZE); + if (ret == 0) { + return ret; + } + return (ret < 0) ? -1 : 1; + } + default: { + int32_t ret = strncmp(f1, f2, (size_t)size); + if (ret == 0) { + return ret; + } + + return (ret < 0) ? -1 : 1; + } + } +} + +typedef struct STableGroupSupporter { + int32_t numOfCols; + SColIndex* pCols; + STSchema* pTagSchema; +} STableGroupSupporter; + +int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { + STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param; + + STable *pTable1 = *(STable **) p1; + STable *pTable2 = *(STable **) p2; + + for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) { + SColIndex* pColIndex = &pTableGroupSupp->pCols[i]; + int32_t colIndex = pColIndex->colIndex; + + char * f1 = NULL; + char * f2 = NULL; + int32_t type = 0; + int32_t bytes = 0; + + if (colIndex == -1) { // table name, todo fix me +// f1 = s1->tags; +// f2 = s2->tags; + type = TSDB_DATA_TYPE_BINARY; + bytes = TSDB_TABLE_NAME_LEN; + } else { + f1 = dataRowTuple(pTable1->tagVal); + f2 = dataRowTuple(pTable2->tagVal); + + type = schemaColAt(pTableGroupSupp->pTagSchema, colIndex)->type; + bytes = schemaColAt(pTableGroupSupp->pTagSchema, colIndex)->bytes; + } + + int32_t ret = doCompare(f1, f2, type, bytes); + if (ret == 0) { + continue; + } else { + return ret; + } + } + + return 0; +} + +void createTableGroupImpl(SArray* pGroups, STable** pTables, size_t numOfTables, STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) { + SArray* g = taosArrayInit(16, POINTER_BYTES); + taosArrayPush(g, &pTables[0]); + + for (int32_t i = 1; i < numOfTables; ++i) { + int32_t ret = compareFn(&pTables[i - 1], &pTables[i], pSupp); + assert(ret == 0 || ret == -1); + + if (ret == 0) { + taosArrayPush(g, &pTables[i]); + } else { + taosArrayPush(pGroups, &g); // current group is ended, start a new group + + g = taosArrayInit(16, POINTER_BYTES); + taosArrayPush(g, &pTables[i]); + } + } +} + +SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) { + assert(pTableList != NULL && taosArrayGetSize(pTableList) > 0); + SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); + + size_t size = taosArrayGetSize(pTableList); + if (size == 0) { + pTrace("no qualified tables"); + return pTableGroup; + } + + if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table + taosArrayPush(pTableGroup, pTableList); + pTrace("all %d tables belong to one group", size); + +#ifdef _DEBUG_VIEW + tSidSetDisplay(pTableGroup); +#endif + } else { + STableGroupSupporter *pSupp = (STableGroupSupporter *) calloc(1, sizeof(STableGroupSupporter)); + pSupp->numOfCols = numOfOrderCols; + pSupp->pTagSchema = pTagSchema; + pSupp->pCols = pCols; + + tqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn); + createTableGroupImpl(pTableGroup, pTableList->pData, size, pSupp, tableGroupComparFn); + +#ifdef _DEBUG_VIEW + tSidSetDisplay(pTableGroup); +#endif + tfree(pSupp); + } + + return pTableGroup; +} + bool tSkipListNodeFilterCallback(const void* pNode, void* param) { tQueryInfo* pInfo = (tQueryInfo*)param; @@ -1419,13 +1430,29 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) return TSDB_CODE_SUCCESS; } -int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, SArray* res) { - if (pTagCond == NULL || len == 0) { // no condition, all tables created according to this stable are involved - return getAllTableIdList(tsdb, uid, res); - } - +int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, SArray** pGroupList, + SColIndex* pColIndex, int32_t numOfCols) { + STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); - assert(pSTable != NULL); + if (pSTable == NULL) { + dError("failed to get stable, uid:%" PRIu64, uid); + return TSDB_CODE_INVALID_TABLE_ID; + } + + SArray* res = taosArrayInit(8, POINTER_BYTES); + STSchema* pTagSchema = tsdbGetTableTagSchema(tsdbGetMeta(tsdb), pSTable); + + if (pTagCond == NULL || len == 0) { // no tags condition, all tables created according to this stable are involved + int32_t ret = getAllTableIdList(tsdb, uid, res); + if (ret != TSDB_CODE_SUCCESS) { + taosArrayDestroy(res); + return ret; + } + + *pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); + taosArrayDestroy(res); + return ret; + } tExprNode* pExprNode = NULL; int32_t ret = TSDB_CODE_SUCCESS; @@ -1433,12 +1460,33 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size // failed to build expression, no result, return immediately if ((ret = exprTreeFromBinary(pTagCond, len, &pExprNode) != TSDB_CODE_SUCCESS) || (pExprNode == NULL)) { dError("stable:%" PRIu64 ", failed to deserialize expression tree, error exists", uid); + taosArrayDestroy(res); return ret; } - return doQueryTableList(pSTable, res, pExprNode); + doQueryTableList(pSTable, res, pExprNode); + *pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); + + taosArrayDestroy(res); + return ret; } +int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, SArray** pGroupList) { + STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); + if (pTable == NULL) { + return TSDB_CODE_INVALID_TABLE_ID; + } + + //todo assert table type, add the table ref count + + *pGroupList = taosArrayInit(1, POINTER_BYTES); + SArray* group = taosArrayInit(1, POINTER_BYTES); + + taosArrayPush(group, &pTable); + taosArrayPush(*pGroupList, &group); + + return TSDB_CODE_SUCCESS; +} void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) { STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle; if (pQueryHandle == NULL) { diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index e06dbedfca..d32170873a 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -75,17 +75,17 @@ int main(int argc, char *argv[]) { doQuery(taos, "create database if not exists test"); doQuery(taos, "use test"); - doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);"); - doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);"); - doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);"); - doQuery(taos, "insert into tm0 values('2020-1-1 1:1:3', 3);"); - doQuery(taos, "insert into tm0 values('2020-1-1 1:1:4', 4);"); - doQuery(taos, "insert into tm0 values('2020-1-1 1:1:5', 5);"); - doQuery(taos, "insert into tm0 values('2020-1-1 1:1:6', 6);"); - doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);"); - doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);"); - doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);"); - doQuery(taos, "select * from tm0;"); +// doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);"); +// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);"); +// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);"); +// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:3', 3);"); +// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:4', 4);"); +// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:5', 5);"); +// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:6', 6);"); +// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);"); +// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);"); +// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);"); + doQuery(taos, "select sum(k),count(*) from m1 group by a"); taos_close(taos); return 0; From 53b8867aabc7595998b89d6e213abe28001ca73d Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Thu, 16 Apr 2020 11:18:04 +0800 Subject: [PATCH 3/3] [td-98] refactor qsort and bsearch functions --- src/util/inc/talgo.h | 30 +++++++++++++++++++++++++++++- src/util/inc/tutil.h | 8 -------- src/util/src/talgo.c | 20 +++++++++++++++++--- src/vnode/tsdb/src/tsdbFile.c | 3 ++- src/vnode/tsdb/src/tsdbMain.c | 1 + src/vnode/tsdb/src/tsdbRead.c | 2 +- 6 files changed, 50 insertions(+), 14 deletions(-) diff --git a/src/util/inc/talgo.h b/src/util/inc/talgo.h index e89795c086..d5e089b687 100644 --- a/src/util/inc/talgo.h +++ b/src/util/inc/talgo.h @@ -20,9 +20,37 @@ extern "C" { #endif +#define TD_EQ 0x1 +#define TD_GT 0x2 +#define TD_LT 0x4 +#define TD_GE (TD_EQ | TD_GT) +#define TD_LE (TD_EQ | TD_LT) + typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void *param); -void tqsort(void *src, size_t numOfElem, size_t size, const void* param, __ext_compar_fn_t comparFn); +/** + * quick sort, with the compare function requiring additional parameters support + * + * @param src + * @param numOfElem + * @param size + * @param param + * @param comparFn + */ +void taosqsort(void *src, size_t numOfElem, size_t size, const void* param, __ext_compar_fn_t comparFn); + +/** + * binary search, with range support + * + * @param key + * @param base + * @param nmemb + * @param size + * @param fn + * @param flags + * @return + */ +void *taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, __compar_fn_t fn, int flags); #ifdef __cplusplus } diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index c4c802a688..81cf177e73 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -170,14 +170,6 @@ uint32_t ip2uint(const char *const ip_addr); void taosSetAllocMode(int mode, const char* path, bool autoDump); void taosDumpMemoryLeak(); -#define TD_EQ 0x1 -#define TD_GT 0x2 -#define TD_LT 0x4 -#define TD_GE (TD_EQ | TD_GT) -#define TD_LE (TD_EQ | TD_LT) -void *taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, - int (*compar)(const void *, const void *), int flags); - #ifdef TAOS_MEM_CHECK void * taos_malloc(size_t size, const char *file, uint32_t line); diff --git a/src/util/src/talgo.c b/src/util/src/talgo.c index 1004d5ab67..7a682cd466 100644 --- a/src/util/src/talgo.c +++ b/src/util/src/talgo.c @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ #include "os.h" #include "tutil.h" @@ -47,7 +61,7 @@ static void tInsertSort(void *src, size_t size, int32_t s, int32_t e, const void } } -void tqsortImpl(void *src, int32_t start, int32_t end, size_t size, const void *param, __ext_compar_fn_t comparFn, +static void tqsortImpl(void *src, int32_t start, int32_t end, size_t size, const void *param, __ext_compar_fn_t comparFn, void* buf) { // short array sort, incur another sort procedure instead of quick sort process const int32_t THRESHOLD_SIZE = 6; @@ -138,13 +152,13 @@ void tqsortImpl(void *src, int32_t start, int32_t end, size_t size, const void * } } -void tqsort(void *src, size_t numOfElem, size_t size, const void* param, __ext_compar_fn_t comparFn) { +void taosqsort(void *src, size_t numOfElem, size_t size, const void* param, __ext_compar_fn_t comparFn) { char *buf = calloc(1, size); // prepare the swap buffer tqsortImpl(src, 0, numOfElem - 1, size, param, comparFn, buf); tfree(buf); } -void * taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, int (*compar)(const void *, const void *), int flags) { +void * taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, __compar_fn_t compar, int flags) { // TODO: need to check the correctness of this function int l = 0; int r = nmemb; diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index d025144ba9..7576717dd1 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -22,8 +22,9 @@ #include #include -#include "tutil.h" #include "tsdbMain.h" +#include "tutil.h" +#include "talgo.h" const char *tsdbFileSuffix[] = { ".head", // TSDB_FILE_TYPE_HEAD diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 2b26eec53a..f558982419 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -15,6 +15,7 @@ // #include "taosdef.h" // #include "disk.h" #include "os.h" +#include "talgo.h" #include "tsdb.h" #include "tsdbMain.h" diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 8d48b1ec48..495cf8c7f5 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -1347,7 +1347,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC pSupp->pTagSchema = pTagSchema; pSupp->pCols = pCols; - tqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn); + taosqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn); createTableGroupImpl(pTableGroup, pTableList->pData, size, pSupp, tableGroupComparFn); #ifdef _DEBUG_VIEW