From f86ab16b8798a315136de0034976917589b8688a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Sep 2021 14:58:12 +0800 Subject: [PATCH] [td-255] refactor for experiment purpose. --- src/client/inc/tscUtil.h | 6 ++++- src/client/src/tscSQLParser.c | 18 +++++++-------- src/client/src/tscServer.c | 22 ++++++++++++------- src/client/src/tscSql.c | 4 ++-- src/client/src/tscSubquery.c | 37 +++++++++++++++++-------------- src/client/src/tscUtil.c | 41 ++++++++++++++++++++--------------- src/inc/taosmsg.h | 12 +++++----- 7 files changed, 81 insertions(+), 59 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index ebd5de1ab3..cf2aadc107 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -92,7 +92,7 @@ typedef struct SMergeTsCtx { }SMergeTsCtx; typedef struct SVgroupTableInfo { - SVgroupInfo vgInfo; + SVgroupMsg vgInfo; SArray *itemList; // SArray } SVgroupTableInfo; @@ -288,7 +288,11 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo); SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo); void* tscVgroupInfoClear(SVgroupsInfo *pInfo); + +#if 0 void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src); +#endif + /** * The create object function must be successful expect for the out of memory issue. * diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 87b6b07652..20d14958c8 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -8685,7 +8685,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod if (p->vgroupIdList != NULL) { size_t s = taosArrayGetSize(p->vgroupIdList); - size_t vgroupsz = sizeof(SVgroupInfo) * s + sizeof(SVgroupsInfo); + size_t vgroupsz = sizeof(SVgroupMsg) * s + sizeof(SVgroupsInfo); pTableMetaInfo->vgroupList = calloc(1, vgroupsz); if (pTableMetaInfo->vgroupList == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -8700,14 +8700,14 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod taosHashGetClone(tscVgroupMap, id, sizeof(*id), NULL, &existVgroupInfo); assert(existVgroupInfo.inUse >= 0); - SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j]; - - pVgroup->numOfEps = existVgroupInfo.numOfEps; - pVgroup->vgId = existVgroupInfo.vgId; - for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) { - pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port; - pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN); - } + SVgroupMsg *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j]; + memcpy(pVgroup, &existVgroupInfo, sizeof(SVgroupMsg)); +// pVgroup->numOfEps = existVgroupInfo.numOfEps; +// pVgroup->vgId = existVgroupInfo.vgId; +// for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) { +// pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port; +// pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN); +// } } } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9d523f2730..b713eeb858 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -73,7 +73,7 @@ static int32_t removeDupVgid(int32_t *src, int32_t sz) { return ret; } -static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { +static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo) { assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); // Issue the query to one of the vnode among a vgroup randomly. @@ -93,6 +93,7 @@ static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { existed = true; } } + assert(existed); } @@ -723,7 +724,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab int32_t index = pTableMetaInfo->vgroupIndex; assert(index >= 0); - SVgroupInfo* pVgroupInfo = NULL; + SVgroupMsg* pVgroupInfo = NULL; if (pTableMetaInfo->vgroupList && pTableMetaInfo->vgroupList->numOfVgroups > 0) { assert(index < pTableMetaInfo->vgroupList->numOfVgroups); pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index]; @@ -880,6 +881,10 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; + SQueryInfo *pQueryInfo = NULL; + STableMeta *pTableMeta = NULL; + STableMetaInfo *pTableMetaInfo = NULL; + int32_t code = TSDB_CODE_SUCCESS; int32_t size = tscEstimateQueryMsgSize(pSql); @@ -888,9 +893,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_TSC_INVALID_OPERATION; // todo add test for this } - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; + pQueryInfo = tscGetQueryInfo(pCmd); + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + pTableMeta = pTableMetaInfo->pTableMeta; SQueryAttr query = {{0}}; tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql); @@ -2146,7 +2151,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t *size = (int32_t)(sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg)); - size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo); + size_t vgroupsz = sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo); SVgroupsInfo *pVgroupInfo = calloc(1, vgroupsz); assert(pVgroupInfo != NULL); @@ -2156,7 +2161,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t } else { for (int32_t j = 0; j < pVgroupInfo->numOfVgroups; ++j) { // just init, no need to lock - SVgroupInfo *pVgroup = &pVgroupInfo->vgroups[j]; + SVgroupMsg *pVgroup = &pVgroupInfo->vgroups[j]; SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j]; vmsg->vgId = htonl(vmsg->vgId); @@ -2168,7 +2173,8 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t pVgroup->vgId = vmsg->vgId; for (int32_t k = 0; k < vmsg->numOfEps; ++k) { pVgroup->epAddr[k].port = vmsg->epAddr[k].port; - pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); + tstrncpy(pVgroup->epAddr[k].fqdn, vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); +// pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); } doUpdateVgroupInfo(pVgroup->vgId, vmsg); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 5fdaad0d66..faa1c2ff41 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -588,8 +588,8 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; tscDebug("0x%"PRIx64" send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql->self, sqlCmd[pCmd->command]); - tscBuildAndSendRequest(pSql, NULL); - return false; +// tscBuildAndSendRequest(pSql, NULL); +// return false; } return true; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 5be623dc94..1a88270b27 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -623,13 +623,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid); // set the tag column id for executor to extract correct tag value -#ifndef _TD_NINGSI_60 - pExpr->base.param[0] = (tVariant) {.i64 = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)}; -#else - pExpr->base.param[0].i64 = colId; - pExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT; - pExpr->base.param[0].nLen = sizeof(int64_t); -#endif + tVariant* pVariant = &pExpr->base.param[0]; + + pVariant->i64 = colId; + pVariant->nType = TSDB_DATA_TYPE_BIGINT; + pVariant->nLen = sizeof(int64_t); + pExpr->base.numOfParams = 1; } @@ -748,10 +747,12 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr SVgroupTableInfo info = {{0}}; for (int32_t m = 0; m < pvg->numOfVgroups; ++m) { if (tt->vgId == pvg->vgroups[m].vgId) { - tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]); + memcpy(&info.vgInfo, &pvg->vgroups[m], sizeof(info.vgInfo)); +// tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]); break; } } + assert(info.vgInfo.numOfEps != 0); vgTables = taosArrayInit(4, sizeof(STableIdInfo)); @@ -2459,7 +2460,7 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) { tfree(p); } -static void doConcurrentlySendSubQueries(SSqlObj* pSql) { +static UNUSED_FUNC void doConcurrentlySendSubQueries(SSqlObj* pSql) { SSubqueryState *pState = &pSql->subState; // concurrently sent the query requests. @@ -2550,13 +2551,14 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->pExtMemBuffer = pMemoryBuf; trs->pOrderDescriptor = pDesc; - trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage)); + trs->localBuffer = (tFilePage *)malloc(nBufferSize + sizeof(tFilePage)); if (trs->localBuffer == NULL) { tscError("0x%"PRIx64" failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno)); tfree(trs); break; } - + + trs->localBuffer->num = 0; trs->subqueryIndex = i; trs->pParentSql = pSql; @@ -2577,6 +2579,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscDebug("0x%"PRIx64" sub:0x%"PRIx64" create subquery success. orderOfSub:%d", pSql->self, pNew->self, trs->subqueryIndex); + + tfree(trs->localBuffer); + tfree(trs); } if (i < pState->numOfSub) { @@ -2594,7 +2599,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return pRes->code; } - doConcurrentlySendSubQueries(pSql); + pSql->fp(pSql->param, pSql, 0); +// doConcurrentlySendSubQueries(pSql); return TSDB_CODE_SUCCESS; } @@ -2651,7 +2657,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32 int32_t subqueryIndex = trsupport->subqueryIndex; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; + SVgroupMsg* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]); @@ -2929,7 +2935,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR SSubqueryState* pState = &pParentSql->subState; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; + SVgroupMsg *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; if (pParentSql->res.code != TSDB_CODE_SUCCESS) { trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; @@ -3057,7 +3063,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { assert(pQueryInfo->numOfTables == 1); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex]; + SVgroupMsg* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex]; // stable query killed or other subquery failed, all query stopped if (pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -3403,7 +3409,6 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { return; } -// tscRestoreFuncForSTableQuery(pQueryInfo); int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList); assert(numOfRes * rowSize > 0); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d66eb64fb5..a83d3e62f5 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3362,11 +3362,11 @@ void tscFreeVgroupTableInfo(SArray* pVgroupTables) { size_t num = taosArrayGetSize(pVgroupTables); for (size_t i = 0; i < num; i++) { SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); - +#if 0 for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { tfree(pInfo->vgInfo.epAddr[j].fqdn); } - +#endif taosArrayDestroy(pInfo->itemList); } @@ -3380,9 +3380,9 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) { assert(size > index); SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTable, index); - for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { - tfree(pInfo->vgInfo.epAddr[j].fqdn); - } +// for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { +// tfree(pInfo->vgInfo.epAddr[j].fqdn); +// } taosArrayDestroy(pInfo->itemList); taosArrayRemove(pVgroupTable, index); @@ -3392,9 +3392,12 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) { memset(info, 0, sizeof(SVgroupTableInfo)); info->vgInfo = pInfo->vgInfo; + +#if 0 for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn); } +#endif if (pInfo->itemList) { info->itemList = taosArrayDup(pInfo->itemList); @@ -3615,7 +3618,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; - pNew->sqlstr = strdup(pSql->sqlstr); tsem_init(&pNew->rspSem, 0, 0); SSqlCmd* pnCmd = &pNew->cmd; @@ -3749,7 +3751,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables); - } else { // transfer the ownership of pTableMeta to the newly create sql object. STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, 0); if (pPrevInfo->pTableMeta && pPrevInfo->pTableMeta->tableType < 0) { @@ -3759,8 +3760,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t STableMeta* pPrevTableMeta = tscTableMetaDup(pPrevInfo->pTableMeta); SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList; - pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList, - pTableMetaInfo->pVgroupTables); + pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pPrevTableMeta, pVgroupsInfo, + pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables); } // this case cannot be happened @@ -4404,7 +4405,7 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) { return NULL; } - size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupList->numOfVgroups; + size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupList->numOfVgroups; SVgroupsInfo* pNew = calloc(1, size); if (pNew == NULL) { return NULL; @@ -4413,15 +4414,16 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) { pNew->numOfVgroups = vgroupList->numOfVgroups; for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) { - SVgroupInfo* pNewVInfo = &pNew->vgroups[i]; + SVgroupMsg* pNewVInfo = &pNew->vgroups[i]; - SVgroupInfo* pvInfo = &vgroupList->vgroups[i]; + SVgroupMsg* pvInfo = &vgroupList->vgroups[i]; pNewVInfo->vgId = pvInfo->vgId; pNewVInfo->numOfEps = pvInfo->numOfEps; for(int32_t j = 0; j < pvInfo->numOfEps; ++j) { - pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn); +// pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn); pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port; + tstrncpy(pNewVInfo->epAddr[j].fqdn, pvInfo->epAddr[j].fqdn, TSDB_FQDN_LEN); } } @@ -4433,8 +4435,9 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) { return NULL; } +#if 0 for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) { - SVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i]; + SVgroupMsg* pVgroupInfo = &vgroupList->vgroups[i]; for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) { tfree(pVgroupInfo->epAddr[j].fqdn); @@ -4445,10 +4448,11 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) { } } +#endif tfree(vgroupList); return NULL; } - +# if 0 void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) { dst->vgId = src->vgId; dst->numOfEps = src->numOfEps; @@ -4461,6 +4465,8 @@ void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) { } } +#endif + char* serializeTagData(STagData* pTagData, char* pMsg) { int32_t n = (int32_t) strlen(pTagData->name); *(int32_t*) pMsg = htonl(n); @@ -4601,11 +4607,12 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) { SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo) { assert(pVgroupsInfo != NULL); - size_t size = sizeof(SVgroupInfo) * pVgroupsInfo->numOfVgroups + sizeof(SVgroupsInfo); + size_t size = sizeof(SVgroupMsg) * pVgroupsInfo->numOfVgroups + sizeof(SVgroupsInfo); SVgroupsInfo* pInfo = calloc(1, size); pInfo->numOfVgroups = pVgroupsInfo->numOfVgroups; for (int32_t m = 0; m < pVgroupsInfo->numOfVgroups; ++m) { - tscSVgroupInfoCopy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m]); + memcpy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m], sizeof(SVgroupMsg)); +// tscSVgroupInfoCopy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m]); } return pInfo; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 8f5269c158..616ee1d972 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -766,11 +766,11 @@ typedef struct SSTableVgroupMsg { int32_t numOfTables; } SSTableVgroupMsg, SSTableVgroupRspMsg; -typedef struct { - int32_t vgId; - int8_t numOfEps; - SEpAddr1 epAddr[TSDB_MAX_REPLICA]; -} SVgroupInfo; +//typedef struct { +// int32_t vgId; +// int8_t numOfEps; +// SEpAddr1 epAddr[TSDB_MAX_REPLICA]; +//} SVgroupInfo; typedef struct { int32_t vgId; @@ -780,7 +780,7 @@ typedef struct { typedef struct { int32_t numOfVgroups; - SVgroupInfo vgroups[]; + SVgroupMsg vgroups[]; } SVgroupsInfo; typedef struct {