diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index ebd5de1ab3..c858bd5867 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -92,7 +92,7 @@ typedef struct SMergeTsCtx { }SMergeTsCtx; typedef struct SVgroupTableInfo { - SVgroupInfo vgInfo; + SVgroupMsg vgInfo; SArray *itemList; // SArray } SVgroupTableInfo; @@ -174,7 +174,9 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo); bool tscIsInsertData(char* sqlstr); -int tscAllocPayload(SSqlCmd* pCmd, int size); +// the memory is not reset in case of fast allocate payload function +int32_t tscAllocPayloadFast(SSqlCmd *pCmd, size_t size); +int32_t tscAllocPayload(SSqlCmd* pCmd, int size); TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes); @@ -288,7 +290,11 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo); SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo); void* tscVgroupInfoClear(SVgroupsInfo *pInfo); + +#if 0 void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src); +#endif + /** * The create object function must be successful expect for the out of memory issue. * diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b8eb0a5286..ff796cdcbf 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -234,7 +234,6 @@ typedef struct STableDataBlocks { typedef struct { STableMeta *pTableMeta; SArray *vgroupIdList; -// SVgroupsInfo *pVgroupsInfo; } STableMetaVgroupInfo; typedef struct SInsertStatementParam { @@ -286,20 +285,14 @@ typedef struct { int32_t resColumnId; } SSqlCmd; -typedef struct SResRec { - int numOfRows; - int numOfTotal; -} SResRec; - typedef struct { int32_t numOfRows; // num of results in current retrieval - int64_t numOfRowsGroup; // num of results of current group int64_t numOfTotal; // num of total results int64_t numOfClauseTotal; // num of total result in current subclause char * pRsp; int32_t rspType; int32_t rspLen; - uint64_t qId; + uint64_t qId; // query id of SQInfo int64_t useconds; int64_t offset; // offset value from vnode during projection query of stable int32_t row; @@ -307,8 +300,6 @@ typedef struct { int16_t precision; bool completed; int32_t code; - int32_t numOfGroups; - SResRec * pGroupRec; char * data; TAOS_ROW tsrow; TAOS_ROW urow; @@ -316,8 +307,7 @@ typedef struct { char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) SColumnIndex* pColumnIndex; - TAOS_FIELD* final; - SArithmeticSupport *pArithSup; // support the arithmetic expression calculation on agg functions + TAOS_FIELD* final; struct SGlobalMerger *pMerger; } SSqlRes; @@ -377,7 +367,6 @@ typedef struct SSqlObj { tsem_t rspSem; SSqlCmd cmd; SSqlRes res; - bool isBind; SSubqueryState subState; struct SSqlObj **pSubs; diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index bbddc4bff9..d0ac0ccf4e 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -1491,7 +1491,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { pSql->signature = pSql; pSql->pTscObj = pObj; pSql->maxRetry = TSDB_MAX_REPLICA; - pSql->isBind = true; pStmt->pSql = pSql; pStmt->last = STMT_INIT; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 87b6b07652..e4728a410a 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -8685,7 +8685,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod if (p->vgroupIdList != NULL) { size_t s = taosArrayGetSize(p->vgroupIdList); - size_t vgroupsz = sizeof(SVgroupInfo) * s + sizeof(SVgroupsInfo); + size_t vgroupsz = sizeof(SVgroupMsg) * s + sizeof(SVgroupsInfo); pTableMetaInfo->vgroupList = calloc(1, vgroupsz); if (pTableMetaInfo->vgroupList == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -8700,14 +8700,11 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod taosHashGetClone(tscVgroupMap, id, sizeof(*id), NULL, &existVgroupInfo); assert(existVgroupInfo.inUse >= 0); - SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j]; + SVgroupMsg *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j]; pVgroup->numOfEps = existVgroupInfo.numOfEps; pVgroup->vgId = existVgroupInfo.vgId; - for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) { - pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port; - pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN); - } + memcpy(&pVgroup->epAddr, &existVgroupInfo.ep, sizeof(pVgroup->epAddr)); } } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index bda2dbb2cc..b87ec92ff1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -73,7 +73,7 @@ static int32_t removeDupVgid(int32_t *src, int32_t sz) { return ret; } -static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { +static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo) { assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); // Issue the query to one of the vnode among a vgroup randomly. @@ -93,6 +93,7 @@ static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { existed = true; } } + assert(existed); } @@ -723,7 +724,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab int32_t index = pTableMetaInfo->vgroupIndex; assert(index >= 0); - SVgroupInfo* pVgroupInfo = NULL; + SVgroupMsg* pVgroupInfo = NULL; if (pTableMetaInfo->vgroupList && pTableMetaInfo->vgroupList->numOfVgroups > 0) { assert(index < pTableMetaInfo->vgroupList->numOfVgroups); pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index]; @@ -861,8 +862,8 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, (*pMsg) += sizeof(SSqlExpr); for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log - pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType); - pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen); + pSqlExpr->param[j].nType = htonl(pExpr->param[j].nType); + pSqlExpr->param[j].nLen = htonl(pExpr->param[j].nLen); if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { memcpy((*pMsg), pExpr->param[j].pz, pExpr->param[j].nLen); @@ -880,17 +881,22 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; + SQueryInfo *pQueryInfo = NULL; + STableMeta *pTableMeta = NULL; + STableMetaInfo *pTableMetaInfo = NULL; + int32_t code = TSDB_CODE_SUCCESS; int32_t size = tscEstimateQueryMsgSize(pSql); + assert(size > 0); - if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { + if (TSDB_CODE_SUCCESS != tscAllocPayloadFast(pCmd, size)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_TSC_INVALID_OPERATION; // todo add test for this } - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; + pQueryInfo = tscGetQueryInfo(pCmd); + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + pTableMeta = pTableMetaInfo->pTableMeta; SQueryAttr query = {{0}}; tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql); @@ -941,14 +947,13 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->pointInterpQuery = query.pointInterpQuery; pQueryMsg->needReverseScan = query.needReverseScan; pQueryMsg->stateWindow = query.stateWindow; - pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->sqlstrLen = htonl(sqlLen); pQueryMsg->sw.gap = htobe64(query.sw.gap); pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX); pQueryMsg->secondStageOutput = htonl(query.numOfExpr2); - pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number + pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); @@ -968,7 +973,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tableCols[i].type = htons(pCol->type); //pQueryMsg->tableCols[i].flist.numOfFilters = htons(pCol->flist.numOfFilters); pQueryMsg->tableCols[i].flist.numOfFilters = 0; - + pQueryMsg->tableCols[i].flist.filterInfo = 0; // append the filter information after the basic column information //serializeColFilterInfo(pCol->flist.filterInfo, pCol->flist.numOfFilters, &pMsg); } @@ -981,6 +986,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += pCond->len; } + } else { + pQueryMsg->colCondLen = 0; } for (int32_t i = 0; i < query.numOfOutput; ++i) { @@ -1060,6 +1067,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += pCond->len; } + } else { + pQueryMsg->tagCondLen = 0; } if (pQueryInfo->bufLen > 0) { @@ -1089,6 +1098,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tsBuf.tsOrder = htonl(pQueryInfo->tsBuf->tsOrder); pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen); pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks); + } else { + pQueryMsg->tsBuf.tsLen = 0; + pQueryMsg->tsBuf.tsNumOfBlocks = 0; } int32_t numOfOperator = (int32_t) taosArrayGetSize(queryOperator); @@ -1126,6 +1138,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += pUdfInfo->contLen; } + } else { + pQueryMsg->udfContentOffset = 0; + pQueryMsg->udfContentLen = 0; } memcpy(pMsg, pSql->sqlstr, sqlLen); @@ -2146,7 +2161,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t *size = (int32_t)(sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg)); - size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo); + size_t vgroupsz = sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo); SVgroupsInfo *pVgroupInfo = calloc(1, vgroupsz); assert(pVgroupInfo != NULL); @@ -2156,7 +2171,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t } else { for (int32_t j = 0; j < pVgroupInfo->numOfVgroups; ++j) { // just init, no need to lock - SVgroupInfo *pVgroup = &pVgroupInfo->vgroups[j]; + SVgroupMsg *pVgroup = &pVgroupInfo->vgroups[j]; SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j]; vmsg->vgId = htonl(vmsg->vgId); @@ -2168,7 +2183,8 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t pVgroup->vgId = vmsg->vgId; for (int32_t k = 0; k < vmsg->numOfEps; ++k) { pVgroup->epAddr[k].port = vmsg->epAddr[k].port; - pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); + tstrncpy(pVgroup->epAddr[k].fqdn, vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); +// pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); } doUpdateVgroupInfo(pVgroup->vgId, vmsg); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index edc3dbfc82..275042a238 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -623,13 +623,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid); // set the tag column id for executor to extract correct tag value -#ifndef _TD_NINGSI_60 - pExpr->base.param[0] = (tVariant) {.i64 = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)}; -#else - pExpr->base.param[0].i64 = colId; - pExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT; - pExpr->base.param[0].nLen = sizeof(int64_t); -#endif + tVariant* pVariant = &pExpr->base.param[0]; + + pVariant->i64 = colId; + pVariant->nType = TSDB_DATA_TYPE_BIGINT; + pVariant->nLen = sizeof(int64_t); + pExpr->base.numOfParams = 1; } @@ -748,10 +747,11 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr SVgroupTableInfo info = {{0}}; for (int32_t m = 0; m < pvg->numOfVgroups; ++m) { if (tt->vgId == pvg->vgroups[m].vgId) { - tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]); + memcpy(&info.vgInfo, &pvg->vgroups[m], sizeof(info.vgInfo)); break; } } + assert(info.vgInfo.numOfEps != 0); vgTables = taosArrayInit(4, sizeof(STableIdInfo)); @@ -2463,7 +2463,7 @@ static void doConcurrentlySendSubQueries(SSqlObj* pSql) { SSubqueryState *pState = &pSql->subState; // concurrently sent the query requests. - const int32_t MAX_REQUEST_PER_TASK = 8; + const int32_t MAX_REQUEST_PER_TASK = 4; int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK; assert(numOfTasks >= 1); @@ -2550,13 +2550,14 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->pExtMemBuffer = pMemoryBuf; trs->pOrderDescriptor = pDesc; - trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage)); + trs->localBuffer = (tFilePage *)malloc(nBufferSize + sizeof(tFilePage)); if (trs->localBuffer == NULL) { tscError("0x%"PRIx64" failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno)); tfree(trs); break; } - + + trs->localBuffer->num = 0; trs->subqueryIndex = i; trs->pParentSql = pSql; @@ -2651,7 +2652,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32 int32_t subqueryIndex = trsupport->subqueryIndex; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; + SVgroupMsg* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]); @@ -2879,7 +2880,6 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p pParentSql->res.precision = pSql->res.precision; pParentSql->res.numOfRows = 0; pParentSql->res.row = 0; - pParentSql->res.numOfGroups = 0; tscFreeRetrieveSup(pSql); @@ -2930,7 +2930,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR SSubqueryState* pState = &pParentSql->subState; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; + SVgroupMsg *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; if (pParentSql->res.code != TSDB_CODE_SUCCESS) { trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; @@ -3058,7 +3058,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { assert(pQueryInfo->numOfTables == 1); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex]; + SVgroupMsg* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex]; // stable query killed or other subquery failed, all query stopped if (pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -3404,7 +3404,6 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { return; } -// tscRestoreFuncForSTableQuery(pQueryInfo); int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList); assert(numOfRes * rowSize > 0); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index fe3e330aa9..2bd601d812 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1347,14 +1347,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) { tfree(pRes->buffer); tfree(pRes->urow); - tfree(pRes->pGroupRec); tfree(pRes->pColumnIndex); - - if (pRes->pArithSup != NULL) { - tfree(pRes->pArithSup->data); - tfree(pRes->pArithSup); - } - tfree(pRes->final); pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free @@ -2087,32 +2080,35 @@ bool tscIsInsertData(char* sqlstr) { } while (1); } -int tscAllocPayload(SSqlCmd* pCmd, int size) { +int32_t tscAllocPayloadFast(SSqlCmd *pCmd, size_t size) { if (pCmd->payload == NULL) { assert(pCmd->allocSize == 0); - pCmd->payload = (char*)calloc(1, size); - if (pCmd->payload == NULL) { + pCmd->payload = malloc(size); + pCmd->allocSize = (uint32_t) size; + } else if (pCmd->allocSize < size) { + char* tmp = realloc(pCmd->payload, size); + if (tmp == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pCmd->allocSize = size; - } else { - if (pCmd->allocSize < (uint32_t)size) { - char* b = realloc(pCmd->payload, size); - if (b == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } + pCmd->payload = tmp; + pCmd->allocSize = (uint32_t) size; + } - pCmd->payload = b; - pCmd->allocSize = size; - } + assert(pCmd->allocSize >= size); + return TSDB_CODE_SUCCESS; +} +int32_t tscAllocPayload(SSqlCmd* pCmd, int size) { + assert(size > 0); + + int32_t code = tscAllocPayloadFast(pCmd, (size_t) size); + if (code == TSDB_CODE_SUCCESS) { memset(pCmd->payload, 0, pCmd->allocSize); } - assert(pCmd->allocSize >= (uint32_t)size && size > 0); - return TSDB_CODE_SUCCESS; + return code; } TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) { @@ -3369,11 +3365,11 @@ void tscFreeVgroupTableInfo(SArray* pVgroupTables) { size_t num = taosArrayGetSize(pVgroupTables); for (size_t i = 0; i < num; i++) { SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); - +#if 0 for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { tfree(pInfo->vgInfo.epAddr[j].fqdn); } - +#endif taosArrayDestroy(pInfo->itemList); } @@ -3387,9 +3383,9 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) { assert(size > index); SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTable, index); - for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { - tfree(pInfo->vgInfo.epAddr[j].fqdn); - } +// for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { +// tfree(pInfo->vgInfo.epAddr[j].fqdn); +// } taosArrayDestroy(pInfo->itemList); taosArrayRemove(pVgroupTable, index); @@ -3399,9 +3395,12 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) { memset(info, 0, sizeof(SVgroupTableInfo)); info->vgInfo = pInfo->vgInfo; + +#if 0 for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn); } +#endif if (pInfo->itemList) { info->itemList = taosArrayDup(pInfo->itemList); @@ -3464,13 +3463,9 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, SName* name, STableM } pTableMetaInfo->pTableMeta = pTableMeta; - if (pTableMetaInfo->pTableMeta == NULL) { - pTableMetaInfo->tableMetaSize = 0; - } else { - pTableMetaInfo->tableMetaSize = tscGetTableMetaSize(pTableMeta); - } + pTableMetaInfo->tableMetaSize = (pTableMetaInfo->pTableMeta == NULL)? 0:tscGetTableMetaSize(pTableMeta); + pTableMetaInfo->tableMetaCapacity = (size_t)(pTableMetaInfo->tableMetaSize); - if (vgroupList != NULL) { pTableMetaInfo->vgroupList = tscVgroupInfoClone(vgroupList); @@ -3718,8 +3713,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } - pNewQueryInfo->numOfFillVal = pQueryInfo->fieldsInfo.numOfOutput; + pNewQueryInfo->numOfFillVal = pQueryInfo->fieldsInfo.numOfOutput; memcpy(pNewQueryInfo->fillVal, pQueryInfo->fillVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); } @@ -3760,7 +3755,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables); - } else { // transfer the ownership of pTableMeta to the newly create sql object. STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, 0); if (pPrevInfo->pTableMeta && pPrevInfo->pTableMeta->tableType < 0) { @@ -3770,8 +3764,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t STableMeta* pPrevTableMeta = tscTableMetaDup(pPrevInfo->pTableMeta); SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList; - pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList, - pTableMetaInfo->pVgroupTables); + pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pPrevTableMeta, pVgroupsInfo, + pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables); } // this case cannot be happened @@ -4415,8 +4409,8 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) { return NULL; } - size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupList->numOfVgroups; - SVgroupsInfo* pNew = calloc(1, size); + size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupList->numOfVgroups; + SVgroupsInfo* pNew = malloc(size); if (pNew == NULL) { return NULL; } @@ -4424,15 +4418,15 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) { pNew->numOfVgroups = vgroupList->numOfVgroups; for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) { - SVgroupInfo* pNewVInfo = &pNew->vgroups[i]; + SVgroupMsg* pNewVInfo = &pNew->vgroups[i]; - SVgroupInfo* pvInfo = &vgroupList->vgroups[i]; + SVgroupMsg* pvInfo = &vgroupList->vgroups[i]; pNewVInfo->vgId = pvInfo->vgId; pNewVInfo->numOfEps = pvInfo->numOfEps; for(int32_t j = 0; j < pvInfo->numOfEps; ++j) { - pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn); pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port; + tstrncpy(pNewVInfo->epAddr[j].fqdn, pvInfo->epAddr[j].fqdn, TSDB_FQDN_LEN); } } @@ -4444,8 +4438,9 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) { return NULL; } +#if 0 for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) { - SVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i]; + SVgroupMsg* pVgroupInfo = &vgroupList->vgroups[i]; for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) { tfree(pVgroupInfo->epAddr[j].fqdn); @@ -4456,10 +4451,11 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) { } } +#endif tfree(vgroupList); return NULL; } - +# if 0 void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) { dst->vgId = src->vgId; dst->numOfEps = src->numOfEps; @@ -4472,6 +4468,8 @@ void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) { } } +#endif + char* serializeTagData(STagData* pTagData, char* pMsg) { int32_t n = (int32_t) strlen(pTagData->name); *(int32_t*) pMsg = htonl(n); @@ -4612,11 +4610,12 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) { SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo) { assert(pVgroupsInfo != NULL); - size_t size = sizeof(SVgroupInfo) * pVgroupsInfo->numOfVgroups + sizeof(SVgroupsInfo); + size_t size = sizeof(SVgroupMsg) * pVgroupsInfo->numOfVgroups + sizeof(SVgroupsInfo); SVgroupsInfo* pInfo = calloc(1, size); pInfo->numOfVgroups = pVgroupsInfo->numOfVgroups; for (int32_t m = 0; m < pVgroupsInfo->numOfVgroups; ++m) { - tscSVgroupInfoCopy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m]); + memcpy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m], sizeof(SVgroupMsg)); +// tscSVgroupInfoCopy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m]); } return pInfo; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 8f5269c158..bb93c52142 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -766,27 +766,16 @@ typedef struct SSTableVgroupMsg { int32_t numOfTables; } SSTableVgroupMsg, SSTableVgroupRspMsg; -typedef struct { - int32_t vgId; - int8_t numOfEps; - SEpAddr1 epAddr[TSDB_MAX_REPLICA]; -} SVgroupInfo; - typedef struct { int32_t vgId; int8_t numOfEps; SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; } SVgroupMsg; -typedef struct { - int32_t numOfVgroups; - SVgroupInfo vgroups[]; -} SVgroupsInfo; - typedef struct { int32_t numOfVgroups; SVgroupMsg vgroups[]; -} SVgroupsMsg; +} SVgroupsMsg, SVgroupsInfo; typedef struct STableMetaMsg { int32_t contLen; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 31db6492f6..19ca8e7ed8 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -86,11 +86,18 @@ typedef struct SResultRow { char *key; // start key of current result row } SResultRow; +typedef struct SResultRowCell { + uint64_t groupId; + SResultRow *pRow; +} SResultRowCell; + typedef struct SGroupResInfo { int32_t totalGroup; int32_t currentGroup; int32_t index; SArray* pRows; // SArray + bool ordered; + int32_t position; } SGroupResInfo; /** @@ -284,8 +291,9 @@ typedef struct SQueryRuntimeEnv { SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not + SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer - SResultRowPool* pool; // window result object pool + SResultRowPool* pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. char** prevRow; SArray* prevResult; // intermediate result, SArray diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 0202e53b3e..54a5423219 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -544,6 +544,8 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult // add a new result set for a new group taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult, POINTER_BYTES); + SResultRowCell cell = {.groupId = tableGroupId, .pRow = pResult}; + taosArrayPush(pRuntimeEnv->pResultRowArrayList, &cell); } else { pResult = *p1; } @@ -2107,9 +2109,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->pQueryAttr = pQueryAttr; pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables * 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t) + POINTER_BYTES); pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); + pRuntimeEnv->pResultRowArrayList = taosArrayInit(numOfTables, sizeof(SResultRowCell)); pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQueryAttr->numOfCols + pQueryAttr->srcRowSize); pRuntimeEnv->tagVal = malloc(pQueryAttr->tagLen); @@ -2384,6 +2387,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool); taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult); + taosArrayDestroy(pRuntimeEnv->pResultRowArrayList); pRuntimeEnv->prevResult = NULL; } @@ -4808,7 +4812,6 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; pQueryAttr->tsdb = tsdb; - if (tsdb != NULL) { int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery); if (code != TSDB_CODE_SUCCESS) { @@ -6379,6 +6382,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { if (!pRuntimeEnv->pQueryAttr->stableQuery) { sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); } + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { @@ -7600,8 +7604,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pMsg += sizeof(SSqlExpr); for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { - pExprMsg->param[j].nType = htons(pExprMsg->param[j].nType); - pExprMsg->param[j].nLen = htons(pExprMsg->param[j].nLen); + pExprMsg->param[j].nType = htonl(pExprMsg->param[j].nType); + pExprMsg->param[j].nLen = htonl(pExprMsg->param[j].nLen); if (pExprMsg->param[j].nType == TSDB_DATA_TYPE_BINARY) { pExprMsg->param[j].pz = pMsg; @@ -7648,8 +7652,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pMsg += sizeof(SSqlExpr); for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { - pExprMsg->param[j].nType = htons(pExprMsg->param[j].nType); - pExprMsg->param[j].nLen = htons(pExprMsg->param[j].nLen); + pExprMsg->param[j].nType = htonl(pExprMsg->param[j].nType); + pExprMsg->param[j].nLen = htonl(pExprMsg->param[j].nLen); if (pExprMsg->param[j].nType == TSDB_DATA_TYPE_BINARY) { pExprMsg->param[j].pz = pMsg; @@ -8648,7 +8652,6 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* SArray* prevResult = NULL; if (prevResultLen > 0) { prevResult = interResFromBinary(param->prevResult, prevResultLen); - pRuntimeEnv->prevResult = prevResult; } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 4caf351799..bc27e094db 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -436,13 +436,13 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void * } STableQueryInfo** pList = supporter->pTableQueryInfo; - - SResultRowInfo *pWindowResInfo1 = &(pList[left]->resInfo); - SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos); + SResultRow* pWindowRes1 = pList[left]->resInfo.pResult[leftPos]; +// SResultRow * pWindowRes1 = getResultRow(&(pList[left]->resInfo), leftPos); TSKEY leftTimestamp = pWindowRes1->win.skey; - SResultRowInfo *pWindowResInfo2 = &(pList[right]->resInfo); - SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos); +// SResultRowInfo *pWindowResInfo2 = &(pList[right]->resInfo); +// SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos); + SResultRow* pWindowRes2 = pList[right]->resInfo.pResult[rightPos]; TSKEY rightTimestamp = pWindowRes2->win.skey; if (leftTimestamp == rightTimestamp) { @@ -456,7 +456,77 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void * } } -static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, +int32_t tsAscOrder(const void* p1, const void* p2) { + SResultRowCell* pc1 = (SResultRowCell*) p1; + SResultRowCell* pc2 = (SResultRowCell*) p2; + + if (pc1->groupId == pc2->groupId) { + if (pc1->pRow->win.skey == pc2->pRow->win.skey) { + return 0; + } else { + return (pc1->pRow->win.skey < pc2->pRow->win.skey)? -1:1; + } + } else { + return (pc1->groupId < pc2->groupId)? -1:1; + } +} + +int32_t tsDescOrder(const void* p1, const void* p2) { + SResultRowCell* pc1 = (SResultRowCell*) p1; + SResultRowCell* pc2 = (SResultRowCell*) p2; + + if (pc1->groupId == pc2->groupId) { + if (pc1->pRow->win.skey == pc2->pRow->win.skey) { + return 0; + } else { + return (pc1->pRow->win.skey < pc2->pRow->win.skey)? 1:-1; + } + } else { + return (pc1->groupId < pc2->groupId)? -1:1; + } +} + +void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) { + __compar_fn_t fn = NULL; + if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) { + fn = tsAscOrder; + } else { + fn = tsDescOrder; + } + + taosArraySort(pRuntimeEnv->pResultRowArrayList, fn); +} + +static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) { + if (!pGroupResInfo->ordered) { + orderTheResultRows(pRuntimeEnv); + pGroupResInfo->ordered = true; + } + + if (pGroupResInfo->pRows == NULL) { + pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES); + } + + size_t len = taosArrayGetSize(pRuntimeEnv->pResultRowArrayList); + for(; pGroupResInfo->position < len; ++pGroupResInfo->position) { + SResultRowCell* pResultRowCell = taosArrayGet(pRuntimeEnv->pResultRowArrayList, pGroupResInfo->position); + if (pResultRowCell->groupId != groupId) { + break; + } + + int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pResultRowCell->pRow, rowCellInfoOffset); + if (num <= 0) { + continue; + } + + taosArrayPush(pGroupResInfo->pRows, &pResultRowCell->pRow); + pResultRowCell->pRow->numOfRows = (uint32_t) num; + } + + return TSDB_CODE_SUCCESS; +} + +static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, int32_t* rowCellInfoOffset) { bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr); @@ -562,12 +632,7 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu int64_t st = taosGetTimestampUs(); while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { - SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup); - - int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, offset); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } + mergeIntoGroupResultImplRv(pRuntimeEnv, pGroupResInfo, pGroupResInfo->currentGroup, offset); // this group generates at least one result, return results if (taosArrayGetSize(pGroupResInfo->pRows) > 0) { @@ -583,7 +648,6 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_QID(pRuntimeEnv), pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime); -// pQInfo->summary.firstStageMergeTime += elapsedTime; return TSDB_CODE_SUCCESS; } diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 51801c843c..8ce5e7ade8 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -100,7 +100,7 @@ static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *k } static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version) { - STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; + STable* pDTable = (pTable->pSuper != NULL) ? pTable->pSuper : pTable; // for performance purpose STSchema* pSchema = NULL; STSchema* pTSchema = NULL; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index c1b935e0ee..14c5a04ece 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -288,8 +288,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j); STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable }; - info.tableId = ((STable*)(pKeyInfo->pTable))->tableId; - assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE || info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE)); @@ -2218,7 +2216,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO SBlock* pBlock = pTableCheck->pCompInfo->blocks; sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks; - char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks); + char* buf = malloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks); if (buf == NULL) { cleanBlockOrderSupporter(&sup, numOfQualTables); return TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -3618,8 +3616,6 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC for(int32_t i = 0; i < size; ++i) { STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i); - assert(((STable*)pKeyInfo->pTable)->type == TSDB_CHILD_TABLE); - tsdbRefTable(pKeyInfo->pTable); STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey}; diff --git a/src/util/inc/tlosertree.h b/src/util/inc/tlosertree.h index 4c731625dd..58f2ca8c5c 100644 --- a/src/util/inc/tlosertree.h +++ b/src/util/inc/tlosertree.h @@ -26,7 +26,7 @@ typedef int (*__merge_compare_fn_t)(const void *, const void *, void *param); typedef struct SLoserTreeNode { int32_t index; - void * pData; + void *pData; } SLoserTreeNode; typedef struct SLoserTreeInfo { @@ -34,8 +34,7 @@ typedef struct SLoserTreeInfo { int32_t totalEntries; __merge_compare_fn_t comparFn; void * param; - - SLoserTreeNode *pNode; + SLoserTreeNode *pNode; } SLoserTreeInfo; uint32_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); diff --git a/src/util/src/hash.c b/src/util/src/hash.c index a22ce34a0e..6577a0a0f4 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -741,17 +741,19 @@ void taosHashTableResize(SHashObj *pHashObj) { } SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { - SHashNode *pNewNode = calloc(1, sizeof(SHashNode) + keyLen + dsize); + SHashNode *pNewNode = malloc(sizeof(SHashNode) + keyLen + dsize); if (pNewNode == NULL) { uError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } - pNewNode->keyLen = (uint32_t)keyLen; + pNewNode->keyLen = (uint32_t)keyLen; pNewNode->hashVal = hashVal; pNewNode->dataLen = (uint32_t) dsize; - pNewNode->count = 1; + pNewNode->count = 1; + pNewNode->removed = 0; + pNewNode->next = NULL; memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize); memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen); diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c index 2d6c513cb5..007ce06829 100644 --- a/src/util/src/tarray.c +++ b/src/util/src/tarray.c @@ -24,11 +24,12 @@ void* taosArrayInit(size_t size, size_t elemSize) { size = TARRAY_MIN_SIZE; } - SArray* pArray = calloc(1, sizeof(SArray)); + SArray* pArray = malloc(sizeof(SArray)); if (pArray == NULL) { return NULL; } + pArray->size = 0; pArray->pData = calloc(size, elemSize); if (pArray->pData == NULL) { free(pArray); diff --git a/src/util/src/tlosertree.c b/src/util/src/tlosertree.c index e793548407..0f104c4b63 100644 --- a/src/util/src/tlosertree.c +++ b/src/util/src/tlosertree.c @@ -90,12 +90,13 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { SLoserTreeNode kLeaf = pTree->pNode[idx]; while (parentId > 0) { - if (pTree->pNode[parentId].index == -1) { + SLoserTreeNode* pCur = &pTree->pNode[parentId]; + if (pCur->index == -1) { pTree->pNode[parentId] = kLeaf; return; } - int32_t ret = pTree->comparFn(&pTree->pNode[parentId], &kLeaf, pTree->param); + int32_t ret = pTree->comparFn(pCur, &kLeaf, pTree->param); if (ret < 0) { SLoserTreeNode t = pTree->pNode[parentId]; pTree->pNode[parentId] = kLeaf;