From 5e5fe11f05d759980826f6c3fc542ccba9799f54 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Sep 2021 14:07:45 +0800 Subject: [PATCH 01/21] [td-255] remove unused attributes of object. --- src/client/inc/tsclient.h | 15 ++------------- src/client/src/tscPrepare.c | 1 - src/client/src/tscSubquery.c | 1 - src/client/src/tscUtil.c | 17 +++-------------- 4 files changed, 5 insertions(+), 29 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b8eb0a5286..ff796cdcbf 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -234,7 +234,6 @@ typedef struct STableDataBlocks { typedef struct { STableMeta *pTableMeta; SArray *vgroupIdList; -// SVgroupsInfo *pVgroupsInfo; } STableMetaVgroupInfo; typedef struct SInsertStatementParam { @@ -286,20 +285,14 @@ typedef struct { int32_t resColumnId; } SSqlCmd; -typedef struct SResRec { - int numOfRows; - int numOfTotal; -} SResRec; - typedef struct { int32_t numOfRows; // num of results in current retrieval - int64_t numOfRowsGroup; // num of results of current group int64_t numOfTotal; // num of total results int64_t numOfClauseTotal; // num of total result in current subclause char * pRsp; int32_t rspType; int32_t rspLen; - uint64_t qId; + uint64_t qId; // query id of SQInfo int64_t useconds; int64_t offset; // offset value from vnode during projection query of stable int32_t row; @@ -307,8 +300,6 @@ typedef struct { int16_t precision; bool completed; int32_t code; - int32_t numOfGroups; - SResRec * pGroupRec; char * data; TAOS_ROW tsrow; TAOS_ROW urow; @@ -316,8 +307,7 @@ typedef struct { char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) SColumnIndex* pColumnIndex; - TAOS_FIELD* final; - SArithmeticSupport *pArithSup; // support the arithmetic expression calculation on agg functions + TAOS_FIELD* final; struct SGlobalMerger *pMerger; } SSqlRes; @@ -377,7 +367,6 @@ typedef struct SSqlObj { tsem_t rspSem; SSqlCmd cmd; SSqlRes res; - bool isBind; SSubqueryState subState; struct SSqlObj **pSubs; diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index bbddc4bff9..d0ac0ccf4e 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -1491,7 +1491,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { pSql->signature = pSql; pSql->pTscObj = pObj; pSql->maxRetry = TSDB_MAX_REPLICA; - pSql->isBind = true; pStmt->pSql = pSql; pStmt->last = STMT_INIT; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index edc3dbfc82..5be623dc94 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2879,7 +2879,6 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p pParentSql->res.precision = pSql->res.precision; pParentSql->res.numOfRows = 0; pParentSql->res.row = 0; - pParentSql->res.numOfGroups = 0; tscFreeRetrieveSup(pSql); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index fe3e330aa9..d66eb64fb5 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1347,14 +1347,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) { tfree(pRes->buffer); tfree(pRes->urow); - tfree(pRes->pGroupRec); tfree(pRes->pColumnIndex); - - if (pRes->pArithSup != NULL) { - tfree(pRes->pArithSup->data); - tfree(pRes->pArithSup); - } - tfree(pRes->final); pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free @@ -3464,13 +3457,9 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, SName* name, STableM } pTableMetaInfo->pTableMeta = pTableMeta; - if (pTableMetaInfo->pTableMeta == NULL) { - pTableMetaInfo->tableMetaSize = 0; - } else { - pTableMetaInfo->tableMetaSize = tscGetTableMetaSize(pTableMeta); - } + pTableMetaInfo->tableMetaSize = (pTableMetaInfo->pTableMeta == NULL)? 0:tscGetTableMetaSize(pTableMeta); + pTableMetaInfo->tableMetaCapacity = (size_t)(pTableMetaInfo->tableMetaSize); - if (vgroupList != NULL) { pTableMetaInfo->vgroupList = tscVgroupInfoClone(vgroupList); @@ -3718,8 +3707,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } - pNewQueryInfo->numOfFillVal = pQueryInfo->fieldsInfo.numOfOutput; + pNewQueryInfo->numOfFillVal = pQueryInfo->fieldsInfo.numOfOutput; memcpy(pNewQueryInfo->fillVal, pQueryInfo->fillVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); } From f86ab16b8798a315136de0034976917589b8688a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Sep 2021 14:58:12 +0800 Subject: [PATCH 02/21] [td-255] refactor for experiment purpose. --- src/client/inc/tscUtil.h | 6 ++++- src/client/src/tscSQLParser.c | 18 +++++++-------- src/client/src/tscServer.c | 22 ++++++++++++------- src/client/src/tscSql.c | 4 ++-- src/client/src/tscSubquery.c | 37 +++++++++++++++++-------------- src/client/src/tscUtil.c | 41 ++++++++++++++++++++--------------- src/inc/taosmsg.h | 12 +++++----- 7 files changed, 81 insertions(+), 59 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index ebd5de1ab3..cf2aadc107 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -92,7 +92,7 @@ typedef struct SMergeTsCtx { }SMergeTsCtx; typedef struct SVgroupTableInfo { - SVgroupInfo vgInfo; + SVgroupMsg vgInfo; SArray *itemList; // SArray } SVgroupTableInfo; @@ -288,7 +288,11 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo); SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo); void* tscVgroupInfoClear(SVgroupsInfo *pInfo); + +#if 0 void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src); +#endif + /** * The create object function must be successful expect for the out of memory issue. * diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 87b6b07652..20d14958c8 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -8685,7 +8685,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod if (p->vgroupIdList != NULL) { size_t s = taosArrayGetSize(p->vgroupIdList); - size_t vgroupsz = sizeof(SVgroupInfo) * s + sizeof(SVgroupsInfo); + size_t vgroupsz = sizeof(SVgroupMsg) * s + sizeof(SVgroupsInfo); pTableMetaInfo->vgroupList = calloc(1, vgroupsz); if (pTableMetaInfo->vgroupList == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -8700,14 +8700,14 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod taosHashGetClone(tscVgroupMap, id, sizeof(*id), NULL, &existVgroupInfo); assert(existVgroupInfo.inUse >= 0); - SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j]; - - pVgroup->numOfEps = existVgroupInfo.numOfEps; - pVgroup->vgId = existVgroupInfo.vgId; - for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) { - pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port; - pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN); - } + SVgroupMsg *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j]; + memcpy(pVgroup, &existVgroupInfo, sizeof(SVgroupMsg)); +// pVgroup->numOfEps = existVgroupInfo.numOfEps; +// pVgroup->vgId = existVgroupInfo.vgId; +// for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) { +// pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port; +// pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN); +// } } } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9d523f2730..b713eeb858 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -73,7 +73,7 @@ static int32_t removeDupVgid(int32_t *src, int32_t sz) { return ret; } -static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { +static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo) { assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); // Issue the query to one of the vnode among a vgroup randomly. @@ -93,6 +93,7 @@ static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { existed = true; } } + assert(existed); } @@ -723,7 +724,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab int32_t index = pTableMetaInfo->vgroupIndex; assert(index >= 0); - SVgroupInfo* pVgroupInfo = NULL; + SVgroupMsg* pVgroupInfo = NULL; if (pTableMetaInfo->vgroupList && pTableMetaInfo->vgroupList->numOfVgroups > 0) { assert(index < pTableMetaInfo->vgroupList->numOfVgroups); pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index]; @@ -880,6 +881,10 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; + SQueryInfo *pQueryInfo = NULL; + STableMeta *pTableMeta = NULL; + STableMetaInfo *pTableMetaInfo = NULL; + int32_t code = TSDB_CODE_SUCCESS; int32_t size = tscEstimateQueryMsgSize(pSql); @@ -888,9 +893,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_TSC_INVALID_OPERATION; // todo add test for this } - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; + pQueryInfo = tscGetQueryInfo(pCmd); + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + pTableMeta = pTableMetaInfo->pTableMeta; SQueryAttr query = {{0}}; tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql); @@ -2146,7 +2151,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t *size = (int32_t)(sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg)); - size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo); + size_t vgroupsz = sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo); SVgroupsInfo *pVgroupInfo = calloc(1, vgroupsz); assert(pVgroupInfo != NULL); @@ -2156,7 +2161,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t } else { for (int32_t j = 0; j < pVgroupInfo->numOfVgroups; ++j) { // just init, no need to lock - SVgroupInfo *pVgroup = &pVgroupInfo->vgroups[j]; + SVgroupMsg *pVgroup = &pVgroupInfo->vgroups[j]; SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j]; vmsg->vgId = htonl(vmsg->vgId); @@ -2168,7 +2173,8 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t pVgroup->vgId = vmsg->vgId; for (int32_t k = 0; k < vmsg->numOfEps; ++k) { pVgroup->epAddr[k].port = vmsg->epAddr[k].port; - pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); + tstrncpy(pVgroup->epAddr[k].fqdn, vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); +// pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); } doUpdateVgroupInfo(pVgroup->vgId, vmsg); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 5fdaad0d66..faa1c2ff41 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -588,8 +588,8 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; tscDebug("0x%"PRIx64" send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql->self, sqlCmd[pCmd->command]); - tscBuildAndSendRequest(pSql, NULL); - return false; +// tscBuildAndSendRequest(pSql, NULL); +// return false; } return true; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 5be623dc94..1a88270b27 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -623,13 +623,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid); // set the tag column id for executor to extract correct tag value -#ifndef _TD_NINGSI_60 - pExpr->base.param[0] = (tVariant) {.i64 = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)}; -#else - pExpr->base.param[0].i64 = colId; - pExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT; - pExpr->base.param[0].nLen = sizeof(int64_t); -#endif + tVariant* pVariant = &pExpr->base.param[0]; + + pVariant->i64 = colId; + pVariant->nType = TSDB_DATA_TYPE_BIGINT; + pVariant->nLen = sizeof(int64_t); + pExpr->base.numOfParams = 1; } @@ -748,10 +747,12 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr SVgroupTableInfo info = {{0}}; for (int32_t m = 0; m < pvg->numOfVgroups; ++m) { if (tt->vgId == pvg->vgroups[m].vgId) { - tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]); + memcpy(&info.vgInfo, &pvg->vgroups[m], sizeof(info.vgInfo)); +// tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]); break; } } + assert(info.vgInfo.numOfEps != 0); vgTables = taosArrayInit(4, sizeof(STableIdInfo)); @@ -2459,7 +2460,7 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) { tfree(p); } -static void doConcurrentlySendSubQueries(SSqlObj* pSql) { +static UNUSED_FUNC void doConcurrentlySendSubQueries(SSqlObj* pSql) { SSubqueryState *pState = &pSql->subState; // concurrently sent the query requests. @@ -2550,13 +2551,14 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->pExtMemBuffer = pMemoryBuf; trs->pOrderDescriptor = pDesc; - trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage)); + trs->localBuffer = (tFilePage *)malloc(nBufferSize + sizeof(tFilePage)); if (trs->localBuffer == NULL) { tscError("0x%"PRIx64" failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno)); tfree(trs); break; } - + + trs->localBuffer->num = 0; trs->subqueryIndex = i; trs->pParentSql = pSql; @@ -2577,6 +2579,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscDebug("0x%"PRIx64" sub:0x%"PRIx64" create subquery success. orderOfSub:%d", pSql->self, pNew->self, trs->subqueryIndex); + + tfree(trs->localBuffer); + tfree(trs); } if (i < pState->numOfSub) { @@ -2594,7 +2599,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return pRes->code; } - doConcurrentlySendSubQueries(pSql); + pSql->fp(pSql->param, pSql, 0); +// doConcurrentlySendSubQueries(pSql); return TSDB_CODE_SUCCESS; } @@ -2651,7 +2657,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32 int32_t subqueryIndex = trsupport->subqueryIndex; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; + SVgroupMsg* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]); @@ -2929,7 +2935,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR SSubqueryState* pState = &pParentSql->subState; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; + SVgroupMsg *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; if (pParentSql->res.code != TSDB_CODE_SUCCESS) { trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; @@ -3057,7 +3063,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { assert(pQueryInfo->numOfTables == 1); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex]; + SVgroupMsg* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex]; // stable query killed or other subquery failed, all query stopped if (pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -3403,7 +3409,6 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { return; } -// tscRestoreFuncForSTableQuery(pQueryInfo); int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList); assert(numOfRes * rowSize > 0); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d66eb64fb5..a83d3e62f5 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3362,11 +3362,11 @@ void tscFreeVgroupTableInfo(SArray* pVgroupTables) { size_t num = taosArrayGetSize(pVgroupTables); for (size_t i = 0; i < num; i++) { SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); - +#if 0 for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { tfree(pInfo->vgInfo.epAddr[j].fqdn); } - +#endif taosArrayDestroy(pInfo->itemList); } @@ -3380,9 +3380,9 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) { assert(size > index); SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTable, index); - for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { - tfree(pInfo->vgInfo.epAddr[j].fqdn); - } +// for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { +// tfree(pInfo->vgInfo.epAddr[j].fqdn); +// } taosArrayDestroy(pInfo->itemList); taosArrayRemove(pVgroupTable, index); @@ -3392,9 +3392,12 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) { memset(info, 0, sizeof(SVgroupTableInfo)); info->vgInfo = pInfo->vgInfo; + +#if 0 for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn); } +#endif if (pInfo->itemList) { info->itemList = taosArrayDup(pInfo->itemList); @@ -3615,7 +3618,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; - pNew->sqlstr = strdup(pSql->sqlstr); tsem_init(&pNew->rspSem, 0, 0); SSqlCmd* pnCmd = &pNew->cmd; @@ -3749,7 +3751,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables); - } else { // transfer the ownership of pTableMeta to the newly create sql object. STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, 0); if (pPrevInfo->pTableMeta && pPrevInfo->pTableMeta->tableType < 0) { @@ -3759,8 +3760,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t STableMeta* pPrevTableMeta = tscTableMetaDup(pPrevInfo->pTableMeta); SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList; - pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList, - pTableMetaInfo->pVgroupTables); + pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pPrevTableMeta, pVgroupsInfo, + pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables); } // this case cannot be happened @@ -4404,7 +4405,7 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) { return NULL; } - size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupList->numOfVgroups; + size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupList->numOfVgroups; SVgroupsInfo* pNew = calloc(1, size); if (pNew == NULL) { return NULL; @@ -4413,15 +4414,16 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) { pNew->numOfVgroups = vgroupList->numOfVgroups; for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) { - SVgroupInfo* pNewVInfo = &pNew->vgroups[i]; + SVgroupMsg* pNewVInfo = &pNew->vgroups[i]; - SVgroupInfo* pvInfo = &vgroupList->vgroups[i]; + SVgroupMsg* pvInfo = &vgroupList->vgroups[i]; pNewVInfo->vgId = pvInfo->vgId; pNewVInfo->numOfEps = pvInfo->numOfEps; for(int32_t j = 0; j < pvInfo->numOfEps; ++j) { - pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn); +// pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn); pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port; + tstrncpy(pNewVInfo->epAddr[j].fqdn, pvInfo->epAddr[j].fqdn, TSDB_FQDN_LEN); } } @@ -4433,8 +4435,9 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) { return NULL; } +#if 0 for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) { - SVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i]; + SVgroupMsg* pVgroupInfo = &vgroupList->vgroups[i]; for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) { tfree(pVgroupInfo->epAddr[j].fqdn); @@ -4445,10 +4448,11 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) { } } +#endif tfree(vgroupList); return NULL; } - +# if 0 void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) { dst->vgId = src->vgId; dst->numOfEps = src->numOfEps; @@ -4461,6 +4465,8 @@ void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) { } } +#endif + char* serializeTagData(STagData* pTagData, char* pMsg) { int32_t n = (int32_t) strlen(pTagData->name); *(int32_t*) pMsg = htonl(n); @@ -4601,11 +4607,12 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) { SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo) { assert(pVgroupsInfo != NULL); - size_t size = sizeof(SVgroupInfo) * pVgroupsInfo->numOfVgroups + sizeof(SVgroupsInfo); + size_t size = sizeof(SVgroupMsg) * pVgroupsInfo->numOfVgroups + sizeof(SVgroupsInfo); SVgroupsInfo* pInfo = calloc(1, size); pInfo->numOfVgroups = pVgroupsInfo->numOfVgroups; for (int32_t m = 0; m < pVgroupsInfo->numOfVgroups; ++m) { - tscSVgroupInfoCopy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m]); + memcpy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m], sizeof(SVgroupMsg)); +// tscSVgroupInfoCopy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m]); } return pInfo; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 8f5269c158..616ee1d972 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -766,11 +766,11 @@ typedef struct SSTableVgroupMsg { int32_t numOfTables; } SSTableVgroupMsg, SSTableVgroupRspMsg; -typedef struct { - int32_t vgId; - int8_t numOfEps; - SEpAddr1 epAddr[TSDB_MAX_REPLICA]; -} SVgroupInfo; +//typedef struct { +// int32_t vgId; +// int8_t numOfEps; +// SEpAddr1 epAddr[TSDB_MAX_REPLICA]; +//} SVgroupInfo; typedef struct { int32_t vgId; @@ -780,7 +780,7 @@ typedef struct { typedef struct { int32_t numOfVgroups; - SVgroupInfo vgroups[]; + SVgroupMsg vgroups[]; } SVgroupsInfo; typedef struct { From 626cd4cc58810249bb65d53209543bd4bff537fb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Sep 2021 17:06:07 +0800 Subject: [PATCH 03/21] [td-255] merge develop. --- src/client/src/tscUtil.c | 1 + src/util/src/tarray.c | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a83d3e62f5..75b7aecb21 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3618,6 +3618,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; + pNew->sqlstr = strdup(pSql->sqlstr); tsem_init(&pNew->rspSem, 0, 0); SSqlCmd* pnCmd = &pNew->cmd; diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c index 2d6c513cb5..007ce06829 100644 --- a/src/util/src/tarray.c +++ b/src/util/src/tarray.c @@ -24,11 +24,12 @@ void* taosArrayInit(size_t size, size_t elemSize) { size = TARRAY_MIN_SIZE; } - SArray* pArray = calloc(1, sizeof(SArray)); + SArray* pArray = malloc(sizeof(SArray)); if (pArray == NULL) { return NULL; } + pArray->size = 0; pArray->pData = calloc(size, elemSize); if (pArray->pData == NULL) { free(pArray); From 258c221a44a2dd7db74a57362d58c483a40f55e0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Sep 2021 17:11:00 +0800 Subject: [PATCH 04/21] [td-255] refactor code. --- src/client/src/tscSQLParser.c | 11 ++++------- src/client/src/tscSql.c | 4 ++-- src/client/src/tscSubquery.c | 7 +------ src/inc/taosmsg.h | 6 ------ 4 files changed, 7 insertions(+), 21 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 20d14958c8..e4728a410a 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -8701,13 +8701,10 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod assert(existVgroupInfo.inUse >= 0); SVgroupMsg *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j]; - memcpy(pVgroup, &existVgroupInfo, sizeof(SVgroupMsg)); -// pVgroup->numOfEps = existVgroupInfo.numOfEps; -// pVgroup->vgId = existVgroupInfo.vgId; -// for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) { -// pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port; -// pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN); -// } + + pVgroup->numOfEps = existVgroupInfo.numOfEps; + pVgroup->vgId = existVgroupInfo.vgId; + memcpy(&pVgroup->epAddr, &existVgroupInfo.ep, sizeof(pVgroup->epAddr)); } } } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index faa1c2ff41..5fdaad0d66 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -588,8 +588,8 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; tscDebug("0x%"PRIx64" send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql->self, sqlCmd[pCmd->command]); -// tscBuildAndSendRequest(pSql, NULL); -// return false; + tscBuildAndSendRequest(pSql, NULL); + return false; } return true; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 1a88270b27..8a52bc776d 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -748,7 +748,6 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr for (int32_t m = 0; m < pvg->numOfVgroups; ++m) { if (tt->vgId == pvg->vgroups[m].vgId) { memcpy(&info.vgInfo, &pvg->vgroups[m], sizeof(info.vgInfo)); -// tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]); break; } } @@ -2579,9 +2578,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscDebug("0x%"PRIx64" sub:0x%"PRIx64" create subquery success. orderOfSub:%d", pSql->self, pNew->self, trs->subqueryIndex); - - tfree(trs->localBuffer); - tfree(trs); } if (i < pState->numOfSub) { @@ -2599,8 +2595,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return pRes->code; } - pSql->fp(pSql->param, pSql, 0); -// doConcurrentlySendSubQueries(pSql); + doConcurrentlySendSubQueries(pSql); return TSDB_CODE_SUCCESS; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 616ee1d972..3c1d89134c 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -766,12 +766,6 @@ typedef struct SSTableVgroupMsg { int32_t numOfTables; } SSTableVgroupMsg, SSTableVgroupRspMsg; -//typedef struct { -// int32_t vgId; -// int8_t numOfEps; -// SEpAddr1 epAddr[TSDB_MAX_REPLICA]; -//} SVgroupInfo; - typedef struct { int32_t vgId; int8_t numOfEps; From c0aff8098d64cb807682a8ce450d1d9ca87aee2a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Sep 2021 17:16:56 +0800 Subject: [PATCH 05/21] [td-255]refactor. --- src/client/src/tscUtil.c | 3 +-- src/inc/taosmsg.h | 7 +------ 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 75b7aecb21..a9b74ecb86 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -4407,7 +4407,7 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) { } size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupList->numOfVgroups; - SVgroupsInfo* pNew = calloc(1, size); + SVgroupsInfo* pNew = malloc(size); if (pNew == NULL) { return NULL; } @@ -4422,7 +4422,6 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) { pNewVInfo->numOfEps = pvInfo->numOfEps; for(int32_t j = 0; j < pvInfo->numOfEps; ++j) { -// pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn); pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port; tstrncpy(pNewVInfo->epAddr[j].fqdn, pvInfo->epAddr[j].fqdn, TSDB_FQDN_LEN); } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 3c1d89134c..bb93c52142 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -775,12 +775,7 @@ typedef struct { typedef struct { int32_t numOfVgroups; SVgroupMsg vgroups[]; -} SVgroupsInfo; - -typedef struct { - int32_t numOfVgroups; - SVgroupMsg vgroups[]; -} SVgroupsMsg; +} SVgroupsMsg, SVgroupsInfo; typedef struct STableMetaMsg { int32_t contLen; From cc68a1414c47caf3c9bd7b77f5e4bf2c726c5df2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Sep 2021 17:52:30 +0800 Subject: [PATCH 06/21] [td-255] update the threshold of concurrent launch query to be 4. --- src/client/src/tscSubquery.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 8a52bc776d..275042a238 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2459,11 +2459,11 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) { tfree(p); } -static UNUSED_FUNC void doConcurrentlySendSubQueries(SSqlObj* pSql) { +static void doConcurrentlySendSubQueries(SSqlObj* pSql) { SSubqueryState *pState = &pSql->subState; // concurrently sent the query requests. - const int32_t MAX_REQUEST_PER_TASK = 8; + const int32_t MAX_REQUEST_PER_TASK = 4; int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK; assert(numOfTasks >= 1); From 267e6e0ba6d292398eddfd09912250f2184bfcaa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 Sep 2021 14:03:00 +0800 Subject: [PATCH 07/21] [td-255] avoid memset the allocated memory to improve the query performance. --- src/client/inc/tscUtil.h | 4 +++- src/client/src/tscServer.c | 3 ++- src/client/src/tscUtil.c | 30 ++++++++++++++++-------------- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index cf2aadc107..c858bd5867 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -174,7 +174,9 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo); bool tscIsInsertData(char* sqlstr); -int tscAllocPayload(SSqlCmd* pCmd, int size); +// the memory is not reset in case of fast allocate payload function +int32_t tscAllocPayloadFast(SSqlCmd *pCmd, size_t size); +int32_t tscAllocPayload(SSqlCmd* pCmd, int size); TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b713eeb858..6133bc4a9c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -887,8 +887,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t size = tscEstimateQueryMsgSize(pSql); + assert(size > 0); - if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { + if (TSDB_CODE_SUCCESS != tscAllocPayloadFast(pCmd, size)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_TSC_INVALID_OPERATION; // todo add test for this } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a9b74ecb86..b68bf6d83a 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2080,32 +2080,34 @@ bool tscIsInsertData(char* sqlstr) { } while (1); } -int tscAllocPayload(SSqlCmd* pCmd, int size) { +int32_t tscAllocPayloadFast(SSqlCmd *pCmd, size_t size) { if (pCmd->payload == NULL) { assert(pCmd->allocSize == 0); - pCmd->payload = (char*)calloc(1, size); - if (pCmd->payload == NULL) { + pCmd->payload = malloc(size); + } else if (pCmd->allocSize < size) { + char* tmp = realloc(pCmd->payload, size); + if (tmp == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + pCmd->payload = tmp; pCmd->allocSize = size; - } else { - if (pCmd->allocSize < (uint32_t)size) { - char* b = realloc(pCmd->payload, size); - if (b == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } + } - pCmd->payload = b; - pCmd->allocSize = size; - } + assert(pCmd->allocSize >= size); + return TSDB_CODE_SUCCESS; +} +int32_t tscAllocPayload(SSqlCmd* pCmd, int size) { + assert(size > 0); + + int32_t code = tscAllocPayloadFast(pCmd, (size_t) size); + if (code == TSDB_CODE_SUCCESS) { memset(pCmd->payload, 0, pCmd->allocSize); } - assert(pCmd->allocSize >= (uint32_t)size && size > 0); - return TSDB_CODE_SUCCESS; + return code; } TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) { From f10ef79e5ef0d754e0c821bf223494cf9384ac19 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 Sep 2021 14:06:17 +0800 Subject: [PATCH 08/21] [td-255] fix a typo. --- src/client/src/tscUtil.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b68bf6d83a..6a75ff5f09 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2085,6 +2085,7 @@ int32_t tscAllocPayloadFast(SSqlCmd *pCmd, size_t size) { assert(pCmd->allocSize == 0); pCmd->payload = malloc(size); + pCmd->allocSize = size; } else if (pCmd->allocSize < size) { char* tmp = realloc(pCmd->payload, size); if (tmp == NULL) { From da545a9974c3f0891892c69f392a7dfb1ce81913 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 Sep 2021 17:20:03 +0800 Subject: [PATCH 09/21] [td-6563] refactor the code to improve the client-side performance. --- src/client/src/tscServer.c | 8 +++++--- src/query/src/qExecutor.c | 8 ++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 6133bc4a9c..788574a837 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -862,8 +862,8 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, (*pMsg) += sizeof(SSqlExpr); for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log - pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType); - pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen); + pSqlExpr->param[j].nType = htonl(pExpr->param[j].nType); + pSqlExpr->param[j].nLen = htonl(pExpr->param[j].nLen); if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { memcpy((*pMsg), pExpr->param[j].pz, pExpr->param[j].nLen); @@ -954,7 +954,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX); pQueryMsg->secondStageOutput = htonl(query.numOfExpr2); - pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number + pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); @@ -987,6 +987,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += pCond->len; } + } else { + pQueryMsg->colCondLen = 0; } for (int32_t i = 0; i < query.numOfOutput; ++i) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 8fefed51c8..727837012b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -7600,8 +7600,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pMsg += sizeof(SSqlExpr); for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { - pExprMsg->param[j].nType = htons(pExprMsg->param[j].nType); - pExprMsg->param[j].nLen = htons(pExprMsg->param[j].nLen); + pExprMsg->param[j].nType = htonl(pExprMsg->param[j].nType); + pExprMsg->param[j].nLen = htonl(pExprMsg->param[j].nLen); if (pExprMsg->param[j].nType == TSDB_DATA_TYPE_BINARY) { pExprMsg->param[j].pz = pMsg; @@ -7648,8 +7648,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pMsg += sizeof(SSqlExpr); for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { - pExprMsg->param[j].nType = htons(pExprMsg->param[j].nType); - pExprMsg->param[j].nLen = htons(pExprMsg->param[j].nLen); + pExprMsg->param[j].nType = htonl(pExprMsg->param[j].nType); + pExprMsg->param[j].nLen = htonl(pExprMsg->param[j].nLen); if (pExprMsg->param[j].nType == TSDB_DATA_TYPE_BINARY) { pExprMsg->param[j].pz = pMsg; From f14fd21aa560b7780efbc1c16920d2ee52544f71 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 Sep 2021 19:31:42 +0800 Subject: [PATCH 10/21] [td-6563]remove duplicated code. --- src/tsdb/src/tsdbRead.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index c1b935e0ee..747b22a7a8 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -288,8 +288,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j); STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable }; - info.tableId = ((STable*)(pKeyInfo->pTable))->tableId; - assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE || info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE)); From 7ed1aa261822f00cc0d0c98ef99b94c5d88c1cbb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 Sep 2021 19:35:29 +0800 Subject: [PATCH 11/21] [td-6563] --- src/tsdb/src/tsdbRead.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 747b22a7a8..5a91d8f790 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -2216,7 +2216,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO SBlock* pBlock = pTableCheck->pCompInfo->blocks; sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks; - char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks); + char* buf = malloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks); if (buf == NULL) { cleanBlockOrderSupporter(&sup, numOfQualTables); return TSDB_CODE_TDB_OUT_OF_MEMORY; From a4260aeb32b2a64f2ac305e4555d9512fa17a2e1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 Sep 2021 10:31:34 +0800 Subject: [PATCH 12/21] [td-6563] --- src/tsdb/inc/tsdbMeta.h | 2 +- src/tsdb/src/tsdbRead.c | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 51801c843c..8ce5e7ade8 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -100,7 +100,7 @@ static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *k } static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version) { - STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; + STable* pDTable = (pTable->pSuper != NULL) ? pTable->pSuper : pTable; // for performance purpose STSchema* pSchema = NULL; STSchema* pTSchema = NULL; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 5a91d8f790..14c5a04ece 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -3616,8 +3616,6 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC for(int32_t i = 0; i < size; ++i) { STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i); - assert(((STable*)pKeyInfo->pTable)->type == TSDB_CHILD_TABLE); - tsdbRefTable(pKeyInfo->pTable); STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey}; From 616f32e018a7ca40b9617177f7ef7cb74be456a9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 Sep 2021 15:36:17 +0800 Subject: [PATCH 13/21] [td-6563] remove the calloc to improve the query performance. --- src/util/src/hash.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/util/src/hash.c b/src/util/src/hash.c index a22ce34a0e..6577a0a0f4 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -741,17 +741,19 @@ void taosHashTableResize(SHashObj *pHashObj) { } SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { - SHashNode *pNewNode = calloc(1, sizeof(SHashNode) + keyLen + dsize); + SHashNode *pNewNode = malloc(sizeof(SHashNode) + keyLen + dsize); if (pNewNode == NULL) { uError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } - pNewNode->keyLen = (uint32_t)keyLen; + pNewNode->keyLen = (uint32_t)keyLen; pNewNode->hashVal = hashVal; pNewNode->dataLen = (uint32_t) dsize; - pNewNode->count = 1; + pNewNode->count = 1; + pNewNode->removed = 0; + pNewNode->next = NULL; memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize); memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen); From ad0b62328cfa15773dc6a2684024135dba316f82 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 Sep 2021 15:45:32 +0800 Subject: [PATCH 14/21] [td-6563] --- src/util/inc/tlosertree.h | 5 ++--- src/util/src/tlosertree.c | 5 +++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/util/inc/tlosertree.h b/src/util/inc/tlosertree.h index 4c731625dd..58f2ca8c5c 100644 --- a/src/util/inc/tlosertree.h +++ b/src/util/inc/tlosertree.h @@ -26,7 +26,7 @@ typedef int (*__merge_compare_fn_t)(const void *, const void *, void *param); typedef struct SLoserTreeNode { int32_t index; - void * pData; + void *pData; } SLoserTreeNode; typedef struct SLoserTreeInfo { @@ -34,8 +34,7 @@ typedef struct SLoserTreeInfo { int32_t totalEntries; __merge_compare_fn_t comparFn; void * param; - - SLoserTreeNode *pNode; + SLoserTreeNode *pNode; } SLoserTreeInfo; uint32_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); diff --git a/src/util/src/tlosertree.c b/src/util/src/tlosertree.c index e793548407..0f104c4b63 100644 --- a/src/util/src/tlosertree.c +++ b/src/util/src/tlosertree.c @@ -90,12 +90,13 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { SLoserTreeNode kLeaf = pTree->pNode[idx]; while (parentId > 0) { - if (pTree->pNode[parentId].index == -1) { + SLoserTreeNode* pCur = &pTree->pNode[parentId]; + if (pCur->index == -1) { pTree->pNode[parentId] = kLeaf; return; } - int32_t ret = pTree->comparFn(&pTree->pNode[parentId], &kLeaf, pTree->param); + int32_t ret = pTree->comparFn(pCur, &kLeaf, pTree->param); if (ret < 0) { SLoserTreeNode t = pTree->pNode[parentId]; pTree->pNode[parentId] = kLeaf; From 671c99683b0282ff88febeb96581420f80852409 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 Sep 2021 15:48:38 +0800 Subject: [PATCH 15/21] [td-6563] --- src/query/src/qExecutor.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 727837012b..ddc14f4cef 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2107,7 +2107,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->pQueryAttr = pQueryAttr; pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables * 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t) + POINTER_BYTES); pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); From aa79a68baaf3bf15da0091a5c30614856f76e6bd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 Sep 2021 19:33:10 +0800 Subject: [PATCH 16/21] [td-6563] --- src/query/src/qUtil.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 4caf351799..539c292bb3 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -436,13 +436,13 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void * } STableQueryInfo** pList = supporter->pTableQueryInfo; - - SResultRowInfo *pWindowResInfo1 = &(pList[left]->resInfo); - SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos); + SResultRow* pWindowRes1 = pList[left]->resInfo.pResult[leftPos]; +// SResultRow * pWindowRes1 = getResultRow(&(pList[left]->resInfo), leftPos); TSKEY leftTimestamp = pWindowRes1->win.skey; - SResultRowInfo *pWindowResInfo2 = &(pList[right]->resInfo); - SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos); +// SResultRowInfo *pWindowResInfo2 = &(pList[right]->resInfo); +// SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos); + SResultRow* pWindowRes2 = pList[right]->resInfo.pResult[rightPos]; TSKEY rightTimestamp = pWindowRes2->win.skey; if (leftTimestamp == rightTimestamp) { From 7152a19728f0c98992b99cc373bcee8daf6aaa2b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Sep 2021 14:19:50 +0800 Subject: [PATCH 17/21] [td-6563] refactor the group result merge function. --- src/client/src/tscServer.c | 5 +++ src/query/inc/qExecutor.h | 10 ++++- src/query/src/qExecutor.c | 5 ++- src/query/src/qUtil.c | 85 +++++++++++++++++++++++++++++++++++--- 4 files changed, 97 insertions(+), 8 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 788574a837..3a57d333ad 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1097,6 +1097,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tsBuf.tsOrder = htonl(pQueryInfo->tsBuf->tsOrder); pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen); pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks); + } else { + pQueryMsg->tsBuf.tsLen = 0; + pQueryMsg->tsBuf.tsNumOfBlocks = 0; } int32_t numOfOperator = (int32_t) taosArrayGetSize(queryOperator); @@ -1134,6 +1137,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += pUdfInfo->contLen; } + } else { + pQueryMsg->udfContentOffset = 0; } memcpy(pMsg, pSql->sqlstr, sqlLen); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 31db6492f6..19ca8e7ed8 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -86,11 +86,18 @@ typedef struct SResultRow { char *key; // start key of current result row } SResultRow; +typedef struct SResultRowCell { + uint64_t groupId; + SResultRow *pRow; +} SResultRowCell; + typedef struct SGroupResInfo { int32_t totalGroup; int32_t currentGroup; int32_t index; SArray* pRows; // SArray + bool ordered; + int32_t position; } SGroupResInfo; /** @@ -284,8 +291,9 @@ typedef struct SQueryRuntimeEnv { SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not + SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer - SResultRowPool* pool; // window result object pool + SResultRowPool* pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. char** prevRow; SArray* prevResult; // intermediate result, SArray diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index ddc14f4cef..5ca2e58cc9 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -544,6 +544,8 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult // add a new result set for a new group taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult, POINTER_BYTES); + SResultRowCell cell = {.groupId = tableGroupId, .pRow = pResult}; + taosArrayPush(pRuntimeEnv->pResultRowArrayList, &cell); } else { pResult = *p1; } @@ -2110,6 +2112,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables * 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t) + POINTER_BYTES); pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); + pRuntimeEnv->pResultRowArrayList = taosArrayInit(numOfTables, sizeof(SResultRowCell)); pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQueryAttr->numOfCols + pQueryAttr->srcRowSize); pRuntimeEnv->tagVal = malloc(pQueryAttr->tagLen); @@ -6379,6 +6382,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { if (!pRuntimeEnv->pQueryAttr->stableQuery) { sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); } + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { @@ -8647,7 +8651,6 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* SArray* prevResult = NULL; if (prevResultLen > 0) { prevResult = interResFromBinary(param->prevResult, prevResultLen); - pRuntimeEnv->prevResult = prevResult; } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 539c292bb3..961b388c39 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -456,7 +456,79 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void * } } -static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, +int32_t tsAscOrder(const void* p1, const void* p2) { + SResultRowCell* pc1 = (SResultRowCell*) p1; + SResultRowCell* pc2 = (SResultRowCell*) p2; + + if (pc1->groupId == pc2->groupId) { + if (pc1->pRow->win.skey == pc2->pRow->win.skey) { + return 0; + } else { + return (pc1->pRow->win.skey < pc2->pRow->win.skey)? -1:1; + } + } else { + return (pc1->groupId < pc2->groupId)? -1:1; + } +} + +int32_t tsDescOrder(const void* p1, const void* p2) { + SResultRowCell* pc1 = (SResultRowCell*) p1; + SResultRowCell* pc2 = (SResultRowCell*) p2; + + if (pc1->groupId == pc2->groupId) { + if (pc1->pRow->win.skey == pc2->pRow->win.skey) { + return 0; + } else { + return (pc1->pRow->win.skey < pc2->pRow->win.skey)? 1:-1; + } + } else { + return (pc1->groupId < pc2->groupId)? -1:1; + } +} + +void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) { + __compar_fn_t fn = NULL; + if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) { + fn = tsAscOrder; + } else { + fn = tsDescOrder; + } + + taosArraySort(pRuntimeEnv->pResultRowArrayList, fn); +} + +static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) { + if (!pGroupResInfo->ordered) { + orderTheResultRows(pRuntimeEnv); + pGroupResInfo->ordered = true; + } + + if (pGroupResInfo->pRows == NULL) { + pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES); + } + + size_t len = taosArrayGetSize(pRuntimeEnv->pResultRowArrayList); + for(; pGroupResInfo->position < len; ++pGroupResInfo->position) { + + SResultRowCell* pResultRowCell = taosArrayGet(pRuntimeEnv->pResultRowArrayList, pGroupResInfo->position); + if (pResultRowCell->groupId != groupId) { + break; + } + + int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pResultRowCell->pRow, rowCellInfoOffset); + if (num <= 0) { + continue; + } + + taosArrayPush(pGroupResInfo->pRows, &pResultRowCell->pRow); + pResultRowCell->pRow->numOfRows = (uint32_t) num; + + } + + return TSDB_CODE_SUCCESS; +} + +static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, int32_t* rowCellInfoOffset) { bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr); @@ -562,12 +634,13 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu int64_t st = taosGetTimestampUs(); while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { - SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup); +// SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup); - int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, offset); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } + mergeIntoGroupResultImplRv(pRuntimeEnv, pGroupResInfo, pGroupResInfo->currentGroup, offset); +// int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, offset); +// if (ret != TSDB_CODE_SUCCESS) { +// return ret; +// } // this group generates at least one result, return results if (taosArrayGetSize(pGroupResInfo->pRows) > 0) { From c6be1bb809536182f7d4f27c0d8267b3b25c9354 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Sep 2021 14:26:32 +0800 Subject: [PATCH 18/21] [td-6563]fix memory leak problem. --- src/query/src/qExecutor.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5ca2e58cc9..677add2c8d 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2387,6 +2387,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool); taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult); + taosArrayDestroy(pRuntimeEnv->pResultRowArrayList); pRuntimeEnv->prevResult = NULL; } From 394896cb1d2c2e1c48dd876e8a57c507c355b54c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Sep 2021 15:42:38 +0800 Subject: [PATCH 19/21] [td-6563] --- src/client/src/tscServer.c | 3 +-- src/query/src/qExecutor.c | 1 - src/query/src/qUtil.c | 9 --------- 3 files changed, 1 insertion(+), 12 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 3a57d333ad..c583e566b9 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -947,7 +947,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->pointInterpQuery = query.pointInterpQuery; pQueryMsg->needReverseScan = query.needReverseScan; pQueryMsg->stateWindow = query.stateWindow; - pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->sqlstrLen = htonl(sqlLen); pQueryMsg->sw.gap = htobe64(query.sw.gap); @@ -974,7 +973,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tableCols[i].type = htons(pCol->type); //pQueryMsg->tableCols[i].flist.numOfFilters = htons(pCol->flist.numOfFilters); pQueryMsg->tableCols[i].flist.numOfFilters = 0; - + pQueryMsg->tableCols[i].flist.filterInfo = 0; // append the filter information after the basic column information //serializeColFilterInfo(pCol->flist.filterInfo, pCol->flist.numOfFilters, &pMsg); } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 677add2c8d..cf5142d359 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4812,7 +4812,6 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; pQueryAttr->tsdb = tsdb; - if (tsdb != NULL) { int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery); if (code != TSDB_CODE_SUCCESS) { diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 961b388c39..bc27e094db 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -509,7 +509,6 @@ static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupR size_t len = taosArrayGetSize(pRuntimeEnv->pResultRowArrayList); for(; pGroupResInfo->position < len; ++pGroupResInfo->position) { - SResultRowCell* pResultRowCell = taosArrayGet(pRuntimeEnv->pResultRowArrayList, pGroupResInfo->position); if (pResultRowCell->groupId != groupId) { break; @@ -522,7 +521,6 @@ static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupR taosArrayPush(pGroupResInfo->pRows, &pResultRowCell->pRow); pResultRowCell->pRow->numOfRows = (uint32_t) num; - } return TSDB_CODE_SUCCESS; @@ -634,13 +632,7 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu int64_t st = taosGetTimestampUs(); while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { -// SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup); - mergeIntoGroupResultImplRv(pRuntimeEnv, pGroupResInfo, pGroupResInfo->currentGroup, offset); -// int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, offset); -// if (ret != TSDB_CODE_SUCCESS) { -// return ret; -// } // this group generates at least one result, return results if (taosArrayGetSize(pGroupResInfo->pRows) > 0) { @@ -656,7 +648,6 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_QID(pRuntimeEnv), pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime); -// pQInfo->summary.firstStageMergeTime += elapsedTime; return TSDB_CODE_SUCCESS; } From 6c1eb1ee92f5849463c5e65985b6a1c01a158692 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Sep 2021 15:07:50 +0800 Subject: [PATCH 20/21] [td-6563]fix compiler error. --- src/client/src/tscUtil.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 6a75ff5f09..2bd601d812 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2085,7 +2085,7 @@ int32_t tscAllocPayloadFast(SSqlCmd *pCmd, size_t size) { assert(pCmd->allocSize == 0); pCmd->payload = malloc(size); - pCmd->allocSize = size; + pCmd->allocSize = (uint32_t) size; } else if (pCmd->allocSize < size) { char* tmp = realloc(pCmd->payload, size); if (tmp == NULL) { @@ -2093,7 +2093,7 @@ int32_t tscAllocPayloadFast(SSqlCmd *pCmd, size_t size) { } pCmd->payload = tmp; - pCmd->allocSize = size; + pCmd->allocSize = (uint32_t) size; } assert(pCmd->allocSize >= size); From c90d48a763548d040a22fea244985e2110d7b817 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Sep 2021 15:45:01 +0800 Subject: [PATCH 21/21] [td-6563]set the value to be 0 if the corresponding attributes are missing in query message. --- src/client/src/tscServer.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index c583e566b9..e23cd88bb7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1067,6 +1067,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += pCond->len; } + } else { + pQueryMsg->tagCondLen = 0; } if (pQueryInfo->bufLen > 0) { @@ -1138,6 +1140,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } } else { pQueryMsg->udfContentOffset = 0; + pQueryMsg->udfContentLen = 0; } memcpy(pMsg, pSql->sqlstr, sqlLen);