diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 83ec28898c..9a627d5cd6 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -151,7 +151,8 @@ typedef struct STableDataBlocks { typedef struct { STableMeta *pTableMeta; - SVgroupsInfo *pVgroupInfo; + SArray *vgroupIdList; +// SVgroupsInfo *pVgroupsInfo; } STableMetaVgroupInfo; typedef struct SInsertStatementParam { @@ -415,7 +416,8 @@ int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo); int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows); extern int32_t sentinel; extern SHashObj *tscVgroupMap; -extern SHashObj *tscTableMetaInfo; +extern SHashObj *tscTableMetaMap; +extern SCacheObj *tscVgroupListBuf; extern int tscObjRef; extern void *tscTmr; diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index d1a325be35..641f62f22b 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -920,7 +920,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) { } else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) { pRes->code = tscProcessShowCreateDatabase(pSql); } else if (pCmd->command == TSDB_SQL_RESET_CACHE) { - taosHashClear(tscTableMetaInfo); + taosHashClear(tscTableMetaMap); pRes->code = TSDB_CODE_SUCCESS; } else if (pCmd->command == TSDB_SQL_SERV_VERSION) { pRes->code = tscProcessServerVer(pSql); diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index cccc81274d..0386850f63 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -444,7 +444,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) { uint32_t size = tscGetTableMetaMaxSize(); STableMeta* tableMeta = calloc(1, size); - taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1); + taosHashGetClone(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, tableMeta); tstrncpy(schema->sTableName, tableName, strlen(tableName)+1); schema->precision = tableMeta->tableInfo.precision; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index c0627f4c31..36f252a9ba 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -80,8 +80,8 @@ static void getColumnName(tSqlExprItem* pItem, char* resultFieldName, char* rawN static int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSqlExprItem* pItem, bool finalResult, SUdfInfo* pUdfInfo); -static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, - int8_t type, char* fieldName, SExprInfo* pSqlExpr); +static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pColList, int16_t bytes, + int8_t type, char* fieldName, SExprInfo* pSqlExpr); static uint8_t convertRelationalOperator(SStrToken *pToken); @@ -8113,6 +8113,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } } + pTableMeta = calloc(1, maxSize); plist = taosArrayInit(4, POINTER_BYTES); @@ -8128,9 +8129,16 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { size_t len = strlen(name); memset(pTableMeta, 0, maxSize); - taosHashGetClone(tscTableMetaInfo, name, len, NULL, pTableMeta, -1); + taosHashGetClone(tscTableMetaMap, name, len, NULL, pTableMeta); if (pTableMeta->id.uid > 0) { + tscDebug("0x%"PRIx64" retrieve table meta %s from local buf", pSql->self, name); + + // avoid mem leak, may should update pTableMeta + const char* pTableName = tNameGetTableName(pname); + size_t nameLen = strlen(pTableName); + + void* pVgroupIdList = NULL; if (pTableMeta->tableType == TSDB_CHILD_TABLE) { code = tscCreateTableMetaFromSTableMeta(pTableMeta, name, pSql->pBuf); @@ -8142,23 +8150,33 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { } } else if (pTableMeta->tableType == TSDB_SUPER_TABLE) { // the vgroup list of super table is not kept in local buffer, so here need retrieve it from the mnode each time - char* t = strdup(name); - taosArrayPush(pVgroupList, &t); + void* pv = taosCacheAcquireByKey(tscVgroupListBuf, pTableName, nameLen); + if (pv == NULL) { + char* t = strdup(name); + taosArrayPush(pVgroupList, &t); + tscDebug("0x%"PRIx64" failed to retrieve stable %s vgroup id list in cache, try fetch from mnode", pSql->self, pTableName); + } else { + tFilePage* pdata = (tFilePage*) pv; + pVgroupIdList = taosArrayInit(pdata->num, sizeof(int32_t)); + if (pVgroupIdList == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + taosArrayAddBatch(pVgroupIdList, pdata->data, pdata->num); + taosCacheRelease(tscVgroupListBuf, &pv, false); + } } - //STableMeta* pMeta = tscTableMetaDup(pTableMeta); - //STableMetaVgroupInfo p = { .pTableMeta = pMeta }; - - //const char* px = tNameGetTableName(pname); - //taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo)); - // avoid mem leak, may should update pTableMeta - const char* px = tNameGetTableName(pname); - if (taosHashGet(pCmd->pTableMetaMap, px, strlen(px)) == NULL) { + if (taosHashGet(pCmd->pTableMetaMap, pTableName, nameLen) == NULL) { STableMeta* pMeta = tscTableMetaDup(pTableMeta); - STableMetaVgroupInfo p = { .pTableMeta = pMeta, .pVgroupInfo = NULL}; - taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo)); + STableMetaVgroupInfo tvi = { .pTableMeta = pMeta, .vgroupIdList = pVgroupIdList}; + taosHashPut(pCmd->pTableMetaMap, pTableName, nameLen, &tvi, sizeof(STableMetaVgroupInfo)); } - } else { // add to the retrieve table meta array list. + } else { + // Add to the retrieve table meta array list. + // If the tableMeta is missing, the cached vgroup list for the corresponding super table will be ignored. + tscDebug("0x%"PRIx64" failed to retrieve table meta %s from local buf", pSql->self, name); + char* t = strdup(name); taosArrayPush(plist, &t); } @@ -8278,16 +8296,37 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod pTableMetaInfo->pTableMeta = tscTableMetaDup(p->pTableMeta); assert(pTableMetaInfo->pTableMeta != NULL); - if (p->pVgroupInfo != NULL) { - pTableMetaInfo->vgroupList = tscVgroupsInfoDup(p->pVgroupInfo); - } + if (p->vgroupIdList != NULL) { + size_t s = taosArrayGetSize(p->vgroupIdList); - if (code != TSDB_CODE_SUCCESS) { - return code; + size_t vgroupsz = sizeof(SVgroupInfo) * s + sizeof(SVgroupsInfo); + pTableMetaInfo->vgroupList = calloc(1, vgroupsz); + if (pTableMetaInfo->vgroupList == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + pTableMetaInfo->vgroupList->numOfVgroups = (int32_t) s; + for(int32_t j = 0; j < s; ++j) { + int32_t* id = taosArrayGet(p->vgroupIdList, j); + + // check if current buffer contains the vgroup info. If not, add it + SNewVgroupInfo existVgroupInfo = {.inUse = -1,}; + taosHashGetClone(tscVgroupMap, id, sizeof(*id), NULL, &existVgroupInfo); + + assert(existVgroupInfo.inUse >= 0); + SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j]; + + pVgroup->numOfEps = existVgroupInfo.numOfEps; + pVgroup->vgId = existVgroupInfo.vgId; + for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) { + pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port; + pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN); + } + } } } - return TSDB_CODE_SUCCESS; + return code; } static STableMeta* extractTempTableMetaFromSubquery(SQueryInfo* pUpstream) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e975dd7b06..63bb9ee214 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -157,7 +157,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) { assert(vgId > 0); SNewVgroupInfo vgroupInfo = {.vgId = -1}; - taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); + taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo); assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0); tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps); @@ -344,6 +344,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { rpcFreeCont(rpcMsg->pCont); return; } + assert(pSql->self == handle); STscObj *pObj = pSql->pTscObj; @@ -614,7 +615,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; SNewVgroupInfo vgroupInfo = {0}; - taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); + taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); tscDebug("0x%"PRIx64" submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps); @@ -687,7 +688,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab vgId = pTableMeta->vgId; SNewVgroupInfo vgroupInfo = {0}; - taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); + taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); } @@ -1582,7 +1583,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; SNewVgroupInfo vgroupInfo = {.vgId = -1}; - taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); + taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); assert(vgroupInfo.vgId > 0); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); @@ -1809,34 +1810,6 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_SUCCESS; } -int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { -#if 0 - SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); - - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - STableInfoMsg *pInfoMsg = (STableInfoMsg *)pCmd->payload; - - int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pInfoMsg->tableFname); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; - } - - pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0); - - char *pMsg = (char *)pInfoMsg + sizeof(STableInfoMsg); - - if (pCmd->autoCreated && pCmd->tagData.dataLen != 0) { - pMsg = serializeTagData(&pCmd->tagData, pMsg); - } - - pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg); - pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META; -#endif - - return TSDB_CODE_SUCCESS; -} - /** * multi table meta req pkg format: * |SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ...... @@ -1996,20 +1969,17 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) { } // update the vgroupInfo if needed -static void doUpdateVgroupInfo(STableMeta *pTableMeta, SVgroupMsg *pVgroupMsg) { - if (pTableMeta->vgId > 0) { - int32_t vgId = pTableMeta->vgId; - assert(pTableMeta->tableType != TSDB_SUPER_TABLE); +static void doUpdateVgroupInfo(int32_t vgId, SVgroupMsg *pVgroupMsg) { + assert(vgId > 0); - SNewVgroupInfo vgroupInfo = {.inUse = -1}; - taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); + SNewVgroupInfo vgroupInfo = {.inUse = -1}; + taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo); - // vgroup info exists, compare with it - if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) { - vgroupInfo = createNewVgroupInfo(pVgroupMsg); - taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); - tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); - } + // vgroup info exists, compare with it + if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) { + vgroupInfo = createNewVgroupInfo(pVgroupMsg); + taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); + tscDebug("add/update new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); } } @@ -2022,18 +1992,18 @@ static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMet if (updateSTable) { STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg); uint32_t size = tscGetTableMetaSize(pSupTableMeta); - int32_t code = taosHashPut(tscTableMetaInfo, pTableMeta->sTableName, len, pSupTableMeta, size); + int32_t code = taosHashPut(tscTableMetaMap, pTableMeta->sTableName, len, pSupTableMeta, size); assert(code == TSDB_CODE_SUCCESS); tfree(pSupTableMeta); } CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta); - taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta)); + taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta)); tfree(cMeta); } else { uint32_t s = tscGetTableMetaSize(pTableMeta); - taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s); + taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s); } } @@ -2058,7 +2028,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { assert(strncmp(pMetaMsg->tableFname, name, tListLen(pMetaMsg->tableFname)) == 0); doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, true); - doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup); + if (pTableMeta->tableType != TSDB_SUPER_TABLE) { + doUpdateVgroupInfo(pTableMeta->vgId, &pMetaMsg->vgroup); + } tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s, numOfCols:%d, numOfTags:%d", pSql->self, pTableMeta->id.uid, pTableMeta->id.tid, tNameGetTableName(&pTableMetaInfo->name), pTableMeta->tableInfo.numOfColumns, @@ -2068,6 +2040,37 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } +static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, int32_t* size, uint64_t id) { + SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg; + + pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); + *size = (int32_t)(sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg)); + + SArray* vgroupIdList = taosArrayInit(pVgroupMsg->numOfVgroups, sizeof(int32_t)); + + if (pVgroupMsg->numOfVgroups <= 0) { + tscDebug("0x%" PRIx64 " empty vgroup id list, no corresponding tables for stable:%s", id, name); + } else { + // just init, no need to lock + for (int32_t j = 0; j < pVgroupMsg->numOfVgroups; ++j) { + SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j]; + vmsg->vgId = htonl(vmsg->vgId); + for (int32_t k = 0; k < vmsg->numOfEps; ++k) { + vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port); + } + + taosArrayPush(vgroupIdList, &vmsg->vgId); + + if (taosHashGet(pSet, &vmsg->vgId, sizeof(vmsg->vgId)) == NULL) { + taosHashPut(pSet, &vmsg->vgId, sizeof(vmsg->vgId), "", 0); + doUpdateVgroupInfo(vmsg->vgId, vmsg); + } + } + } + + return vgroupIdList; +} + static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t id) { SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg; pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); @@ -2092,24 +2095,14 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port); } - SNewVgroupInfo newVi = createNewVgroupInfo(vmsg); - pVgroup->numOfEps = newVi.numOfEps; - pVgroup->vgId = newVi.vgId; + pVgroup->numOfEps = vmsg->numOfEps; + pVgroup->vgId = vmsg->vgId; for (int32_t k = 0; k < vmsg->numOfEps; ++k) { - pVgroup->epAddr[k].port = newVi.ep[k].port; - pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN); + pVgroup->epAddr[k].port = vmsg->epAddr[k].port; + pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); } - // check if current buffer contains the vgroup info. - // If not, add it - SNewVgroupInfo existVgroupInfo = {.inUse = -1}; - taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo)); - - if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) || - (existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it - taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi)); - tscDebug("0x%" PRIx64 " add new VgroupInfo, vgId:%d, total cached:%d", id, newVi.vgId, (int32_t)taosHashGetSize(tscVgroupMap)); - } + doUpdateVgroupInfo(pVgroup->vgId, vmsg); } } @@ -2187,6 +2180,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { char* buf = NULL; char* pMsg = pMultiMeta->meta; + + // decompresss the message payload if (pMultiMeta->compressed) { buf = malloc(pMultiMeta->rawLen - sizeof(SMultiTableMeta)); int32_t len = tsDecompressString(pMultiMeta->meta, pMultiMeta->contLen - sizeof(SMultiTableMeta), 1, @@ -2245,7 +2240,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { // for each vgroup, only update the information once. int64_t vgId = pMetaMsg->vgroup.vgId; if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) { - doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup); + doUpdateVgroupInfo(vgId, &pMetaMsg->vgroup); taosHashPut(pSet, &vgId, sizeof(vgId), "", 0); } @@ -2263,11 +2258,26 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { assert(p != NULL); int32_t size = 0; - if (p->pVgroupInfo!= NULL) { - tscVgroupInfoClear(p->pVgroupInfo); - //tfree(p->pTableMeta); + if (p->vgroupIdList!= NULL) { + taosArrayDestroy(p->vgroupIdList); } - p->pVgroupInfo = createVgroupInfoFromMsg(pMsg, &size, pSql->self); + + char tableName[TSDB_TABLE_FNAME_LEN] = {0}; + tstrncpy(tableName, name, TSDB_TABLE_NAME_LEN); + p->vgroupIdList = createVgroupIdListFromMsg(pMsg, pSet, tableName, &size, pSql->self); + + int32_t numOfVgId = taosArrayGetSize(p->vgroupIdList); + int32_t s = sizeof(tFilePage) + numOfVgId * sizeof(int32_t); + + tFilePage* idList = calloc(1, s); + idList->num = numOfVgId; + memcpy(idList->data, TARRAY_GET_START(p->vgroupIdList), numOfVgId * sizeof(int32_t)); + + void* idListInst = taosCachePut(tscVgroupListBuf, tableName, strlen(tableName), idList, s, 5000); + taosCacheRelease(tscVgroupListBuf, (void*) &idListInst, false); + + tfree(idList); + pMsg += size; } @@ -2503,7 +2513,7 @@ int tscProcessDropDbRsp(SSqlObj *pSql) { //TODO LOCK DB WHEN MODIFY IT //pSql->pTscObj->db[0] = 0; - taosHashClear(tscTableMetaInfo); + taosHashClear(tscTableMetaMap); return 0; } @@ -2514,8 +2524,8 @@ int tscProcessDropTableRsp(SSqlObj *pSql) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pTableMetaInfo->name, name); - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); - tscDebug("0x%"PRIx64" remove table meta after drop table:%s, numOfRemain:%d", pSql->self, name, (int32_t) taosHashGetSize(tscTableMetaInfo)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + tscDebug("0x%"PRIx64" remove table meta after drop table:%s, numOfRemain:%d", pSql->self, name, (int32_t) taosHashGetSize(tscTableMetaMap)); tfree(pTableMetaInfo->pTableMeta); return 0; @@ -2530,11 +2540,11 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { tscDebug("0x%"PRIx64" remove tableMeta in hashMap after alter-table: %s", pSql->self, name); bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); tfree(pTableMetaInfo->pTableMeta); if (isSuperTable) { // if it is a super table, iterate the hashTable and remove all the childTableMeta - taosHashClear(tscTableMetaInfo); + taosHashClear(tscTableMetaMap); } return 0; @@ -2801,7 +2811,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool tNameExtractFullName(&pTableMetaInfo->name, name); size_t len = strlen(name); - taosHashGetClone(tscTableMetaInfo, name, len, NULL, pTableMetaInfo->pTableMeta, -1); + taosHashGetClone(tscTableMetaMap, name, len, NULL, pTableMetaInfo->pTableMeta); // TODO resize the tableMeta assert(size < 80 * TSDB_MAX_COLUMNS); @@ -2914,7 +2924,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { // remove stored tableMeta info in hash table size_t len = strlen(name); - taosHashRemove(tscTableMetaInfo, name, len); + taosHashRemove(tscTableMetaMap, name, len); return getTableMetaFromMnode(pSql, pTableMetaInfo, false); } @@ -2966,8 +2976,6 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) { tscDebug("0x%"PRIx64" svgroupRid from %" PRId64 " to %" PRId64 , pSql->self, pSql->svgroupRid, pNew->self); pSql->svgroupRid = pNew->self; - - tscDebug("0x%"PRIx64" new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql->self, pNew, pNewQueryInfo->numOfTables); pNew->fp = tscTableMetaCallBack; @@ -3010,7 +3018,6 @@ void tscInitMsgsFp() { tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg; tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg; -// tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg; tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg; tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index da5bdf669f..502ef22d4b 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -206,7 +206,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pTableMetaInfo->name, name); - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); tfree(pTableMetaInfo->pTableMeta); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index b72bd78b1b..b98ffd7638 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3125,7 +3125,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name); - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } pParentObj->res.code = TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 7b8f24a093..c04765b065 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -19,15 +19,12 @@ #include "trpc.h" #include "tnote.h" #include "ttimer.h" -#include "tutil.h" #include "tsched.h" #include "tscLog.h" -#include "tscUtil.h" #include "tsclient.h" #include "tglobal.h" #include "tconfig.h" #include "ttimezone.h" -#include "tlocale.h" #include "qScript.h" // global, not configurable @@ -36,8 +33,10 @@ int32_t sentinel = TSC_VAR_NOT_RELEASE; -SHashObj *tscVgroupMap; // hash map to keep the global vgroup info -SHashObj *tscTableMetaInfo; // table meta info +SHashObj *tscVgroupMap; // hash map to keep the vgroup info from mnode +SHashObj *tscTableMetaMap; // table meta info buffer +SCacheObj *tscVgroupListBuf; // super table vgroup list information, only survives 5 seconds for each super table vgroup list + int32_t tscObjRef = -1; void *tscTmr; void *tscQhandle; @@ -45,17 +44,21 @@ int32_t tscRefId = -1; int32_t tscNumOfObj = 0; // number of sqlObj in current process. static void *tscCheckDiskUsageTmr; void *tscRpcCache; // cache to keep rpc obj -int32_t tscNumOfThreads = 1; // num of rpc threads -char tscLogFileName[12] = "taoslog"; -int tscLogFileNum = 10; -static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently -static pthread_once_t tscinit = PTHREAD_ONCE_INIT; +int32_t tscNumOfThreads = 1; // num of rpc threads +char tscLogFileName[12] = "taoslog"; +int tscLogFileNum = 10; + +static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently +static pthread_once_t tscinit = PTHREAD_ONCE_INIT; + +// pthread_once can not return result code, so result code is set to a global variable. static volatile int tscInitRes = 0; void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) { taosGetDisk(); taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); } + void tscFreeRpcObj(void *param) { assert(param); SRpcObj *pRpcObj = (SRpcObj *)(param); @@ -67,10 +70,9 @@ void tscReleaseRpc(void *param) { if (param == NULL) { return; } - pthread_mutex_lock(&rpcObjMutex); - taosCacheRelease(tscRpcCache, (void *)¶m, false); - pthread_mutex_unlock(&rpcObjMutex); -} + + taosCacheRelease(tscRpcCache, (void *)¶m, false); +} int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj) { pthread_mutex_lock(&rpcObjMutex); @@ -80,7 +82,7 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry *ppRpcObj = pRpcObj; pthread_mutex_unlock(&rpcObjMutex); return 0; - } + } SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -104,7 +106,8 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry pthread_mutex_unlock(&rpcObjMutex); tscError("failed to init connection to TDengine"); return -1; - } + } + pRpcObj = taosCachePut(tscRpcCache, rpcObj.key, strlen(rpcObj.key), &rpcObj, sizeof(rpcObj), 1000*5); if (pRpcObj == NULL) { rpcClose(rpcObj.pDnodeConn); @@ -118,7 +121,7 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry } void taos_init_imp(void) { - char temp[128] = {0}; + char temp[128] = {0}; errno = TSDB_CODE_SUCCESS; srand(taosGetTimestampSec()); @@ -151,36 +154,41 @@ void taos_init_imp(void) { rpcInit(); scriptEnvPoolInit(); + tscDebug("starting to initialize TAOS client ..."); tscDebug("Local End Point is:%s", tsLocalEp); } taosSetCoreDump(); tscInitMsgsFp(); - int queueSize = tsMaxConnections*2; double factor = (tscEmbedded == 0)? 2.0:4.0; tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); if (tscNumOfThreads < 2) { tscNumOfThreads = 2; } + + int32_t queueSize = tsMaxConnections*2; tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc"); if (NULL == tscQhandle) { - tscError("failed to init scheduler"); + tscError("failed to init task queue"); tscInitRes = -1; return; } + tscDebug("client task queue is initialized, numOfWorkers: %d", tscNumOfThreads); + tscTmr = taosTmrInit(tsMaxConnections * 2, 200, 60000, "TSC"); if(0 == tscEmbedded){ taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); } - if (tscTableMetaInfo == NULL) { - tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); - tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - tscTableMetaInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - tscDebug("TableMeta:%p", tscTableMetaInfo); + if (tscTableMetaMap == NULL) { + tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); + tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + tscTableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + tscVgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list"); + tscDebug("TableMeta:%p, vgroup:%p is initialized", tscTableMetaMap, tscVgroupMap); } int refreshTime = 5; @@ -189,14 +197,17 @@ void taos_init_imp(void) { tscRefId = taosOpenRef(200, tscCloseTscObj); - // in other language APIs, taos_cleanup is not available yet. - // So, to make sure taos_cleanup will be invoked to clean up the allocated - // resource to suppress the valgrind warning. + // In the APIs of other program language, taos_cleanup is not available yet. + // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. atexit(taos_cleanup); + tscDebug("client is initialized successfully"); } -int taos_init() { pthread_once(&tscinit, taos_init_imp); return tscInitRes;} +int taos_init() { + pthread_once(&tscinit, taos_init_imp); + return tscInitRes; +} // this function may be called by user or system, or by both simultaneously. void taos_cleanup(void) { @@ -205,11 +216,13 @@ void taos_cleanup(void) { if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) { return; } + if (tscEmbedded == 0) { scriptEnvPoolCleanup(); } - taosHashCleanup(tscTableMetaInfo); - tscTableMetaInfo = NULL; + + taosHashCleanup(tscTableMetaMap); + tscTableMetaMap = NULL; taosHashCleanup(tscVgroupMap); tscVgroupMap = NULL; @@ -236,6 +249,9 @@ void taos_cleanup(void) { pthread_mutex_destroy(&rpcObjMutex); } + taosCacheCleanup(tscVgroupListBuf); + tscVgroupListBuf = NULL; + if (tscEmbedded == 0) { rpcCleanup(); taosCloseLog(); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 0d69fe173f..4454844ea0 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1388,7 +1388,7 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) { if (pCmd->pTableMetaMap != NULL) { STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL); while (p) { - tscVgroupInfoClear(p->pVgroupInfo); + taosArrayDestroy(p->vgroupIdList); tfree(p->pTableMeta); p = taosHashIterate(pCmd->pTableMetaMap, p); } @@ -1522,7 +1522,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pDataBlock->tableName, name); - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } if (!pDataBlock->cloned) { @@ -3365,7 +3365,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) { if (removeMeta) { char name[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pTableMetaInfo->name, name); - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); } tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); @@ -4360,7 +4360,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, v assert(pChild != NULL && buf != NULL); STableMeta* p = buf; - taosHashGetClone(tscTableMetaInfo, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p, -1); + taosHashGetClone(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p); // tableMeta exists, build child table meta according to the super table meta // the uid need to be checked in addition to the general name of the super table. @@ -4374,7 +4374,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, v memcpy(pChild->schema, p->schema, sizeof(SSchema) *total); return TSDB_CODE_SUCCESS; } else { // super table has been removed, current tableMeta is also expired. remove it here - taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); return -1; } } diff --git a/src/query/src/qScript.c b/src/query/src/qScript.c index 261164a84c..74ddf5f548 100644 --- a/src/query/src/qScript.c +++ b/src/query/src/qScript.c @@ -342,6 +342,7 @@ int32_t scriptEnvPoolInit() { env->lua_state = createLuaEnv(); tdListAppend(pool->scriptEnvs, (void *)(&env)); } + pool->mSize = size; pool->cSize = size; return 0; diff --git a/src/util/inc/hash.h b/src/util/inc/hash.h index 616b844c13..a53aa602c1 100644 --- a/src/util/inc/hash.h +++ b/src/util/inc/hash.h @@ -123,10 +123,9 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); * @param keyLen * @param fp * @param d - * @param dsize * @return */ -void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize); +void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d); /** * remove item with the specified key diff --git a/src/util/src/hash.c b/src/util/src/hash.c index d7bee9b67c..2e18f36a17 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -294,10 +294,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da } void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { - return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL, 0); + return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL); } -void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize) { +void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d) { if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) { return NULL; } diff --git a/src/vnode/src/vnodeMgmt.c b/src/vnode/src/vnodeMgmt.c index 8b17d3a5f2..e14b5a385e 100644 --- a/src/vnode/src/vnodeMgmt.c +++ b/src/vnode/src/vnodeMgmt.c @@ -93,7 +93,7 @@ static void vnodeIncRef(void *ptNode) { void *vnodeAcquire(int32_t vgId) { SVnodeObj *pVnode = NULL; if (tsVnodesHash != NULL) { - taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, &pVnode, sizeof(void *)); + taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, &pVnode); } if (pVnode == NULL) {