[td-5407]<enhance>: add a local buffer for super table vgroup id list to improve the query performance.
This commit is contained in:
parent
e422c53df7
commit
1073ac5160
|
@ -151,7 +151,8 @@ typedef struct STableDataBlocks {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STableMeta *pTableMeta;
|
STableMeta *pTableMeta;
|
||||||
SVgroupsInfo *pVgroupInfo;
|
SArray *vgroupIdList;
|
||||||
|
// SVgroupsInfo *pVgroupsInfo;
|
||||||
} STableMetaVgroupInfo;
|
} STableMetaVgroupInfo;
|
||||||
|
|
||||||
typedef struct SInsertStatementParam {
|
typedef struct SInsertStatementParam {
|
||||||
|
@ -415,7 +416,8 @@ int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo);
|
||||||
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
|
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
|
||||||
extern int32_t sentinel;
|
extern int32_t sentinel;
|
||||||
extern SHashObj *tscVgroupMap;
|
extern SHashObj *tscVgroupMap;
|
||||||
extern SHashObj *tscTableMetaInfo;
|
extern SHashObj *tscTableMetaMap;
|
||||||
|
extern SCacheObj *tscVgroupListBuf;
|
||||||
|
|
||||||
extern int tscObjRef;
|
extern int tscObjRef;
|
||||||
extern void *tscTmr;
|
extern void *tscTmr;
|
||||||
|
|
|
@ -920,7 +920,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
|
||||||
} else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) {
|
} else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) {
|
||||||
pRes->code = tscProcessShowCreateDatabase(pSql);
|
pRes->code = tscProcessShowCreateDatabase(pSql);
|
||||||
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
|
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
|
||||||
taosHashClear(tscTableMetaInfo);
|
taosHashClear(tscTableMetaMap);
|
||||||
pRes->code = TSDB_CODE_SUCCESS;
|
pRes->code = TSDB_CODE_SUCCESS;
|
||||||
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
|
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
|
||||||
pRes->code = tscProcessServerVer(pSql);
|
pRes->code = tscProcessServerVer(pSql);
|
||||||
|
|
|
@ -444,7 +444,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
|
||||||
|
|
||||||
uint32_t size = tscGetTableMetaMaxSize();
|
uint32_t size = tscGetTableMetaMaxSize();
|
||||||
STableMeta* tableMeta = calloc(1, size);
|
STableMeta* tableMeta = calloc(1, size);
|
||||||
taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1);
|
taosHashGetClone(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, tableMeta);
|
||||||
|
|
||||||
tstrncpy(schema->sTableName, tableName, strlen(tableName)+1);
|
tstrncpy(schema->sTableName, tableName, strlen(tableName)+1);
|
||||||
schema->precision = tableMeta->tableInfo.precision;
|
schema->precision = tableMeta->tableInfo.precision;
|
||||||
|
|
|
@ -80,8 +80,8 @@ static void getColumnName(tSqlExprItem* pItem, char* resultFieldName, char* rawN
|
||||||
|
|
||||||
static int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSqlExprItem* pItem,
|
static int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSqlExprItem* pItem,
|
||||||
bool finalResult, SUdfInfo* pUdfInfo);
|
bool finalResult, SUdfInfo* pUdfInfo);
|
||||||
static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes,
|
static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pColList, int16_t bytes,
|
||||||
int8_t type, char* fieldName, SExprInfo* pSqlExpr);
|
int8_t type, char* fieldName, SExprInfo* pSqlExpr);
|
||||||
|
|
||||||
static uint8_t convertRelationalOperator(SStrToken *pToken);
|
static uint8_t convertRelationalOperator(SStrToken *pToken);
|
||||||
|
|
||||||
|
@ -8113,6 +8113,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableMeta = calloc(1, maxSize);
|
pTableMeta = calloc(1, maxSize);
|
||||||
|
|
||||||
plist = taosArrayInit(4, POINTER_BYTES);
|
plist = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
@ -8128,9 +8129,16 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
|
|
||||||
size_t len = strlen(name);
|
size_t len = strlen(name);
|
||||||
memset(pTableMeta, 0, maxSize);
|
memset(pTableMeta, 0, maxSize);
|
||||||
taosHashGetClone(tscTableMetaInfo, name, len, NULL, pTableMeta, -1);
|
taosHashGetClone(tscTableMetaMap, name, len, NULL, pTableMeta);
|
||||||
|
|
||||||
if (pTableMeta->id.uid > 0) {
|
if (pTableMeta->id.uid > 0) {
|
||||||
|
tscDebug("0x%"PRIx64" retrieve table meta %s from local buf", pSql->self, name);
|
||||||
|
|
||||||
|
// avoid mem leak, may should update pTableMeta
|
||||||
|
const char* pTableName = tNameGetTableName(pname);
|
||||||
|
size_t nameLen = strlen(pTableName);
|
||||||
|
|
||||||
|
void* pVgroupIdList = NULL;
|
||||||
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
|
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
|
||||||
code = tscCreateTableMetaFromSTableMeta(pTableMeta, name, pSql->pBuf);
|
code = tscCreateTableMetaFromSTableMeta(pTableMeta, name, pSql->pBuf);
|
||||||
|
|
||||||
|
@ -8142,23 +8150,33 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
}
|
}
|
||||||
} else if (pTableMeta->tableType == TSDB_SUPER_TABLE) {
|
} else if (pTableMeta->tableType == TSDB_SUPER_TABLE) {
|
||||||
// the vgroup list of super table is not kept in local buffer, so here need retrieve it from the mnode each time
|
// the vgroup list of super table is not kept in local buffer, so here need retrieve it from the mnode each time
|
||||||
char* t = strdup(name);
|
void* pv = taosCacheAcquireByKey(tscVgroupListBuf, pTableName, nameLen);
|
||||||
taosArrayPush(pVgroupList, &t);
|
if (pv == NULL) {
|
||||||
|
char* t = strdup(name);
|
||||||
|
taosArrayPush(pVgroupList, &t);
|
||||||
|
tscDebug("0x%"PRIx64" failed to retrieve stable %s vgroup id list in cache, try fetch from mnode", pSql->self, pTableName);
|
||||||
|
} else {
|
||||||
|
tFilePage* pdata = (tFilePage*) pv;
|
||||||
|
pVgroupIdList = taosArrayInit(pdata->num, sizeof(int32_t));
|
||||||
|
if (pVgroupIdList == NULL) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayAddBatch(pVgroupIdList, pdata->data, pdata->num);
|
||||||
|
taosCacheRelease(tscVgroupListBuf, &pv, false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//STableMeta* pMeta = tscTableMetaDup(pTableMeta);
|
if (taosHashGet(pCmd->pTableMetaMap, pTableName, nameLen) == NULL) {
|
||||||
//STableMetaVgroupInfo p = { .pTableMeta = pMeta };
|
|
||||||
|
|
||||||
//const char* px = tNameGetTableName(pname);
|
|
||||||
//taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo));
|
|
||||||
// avoid mem leak, may should update pTableMeta
|
|
||||||
const char* px = tNameGetTableName(pname);
|
|
||||||
if (taosHashGet(pCmd->pTableMetaMap, px, strlen(px)) == NULL) {
|
|
||||||
STableMeta* pMeta = tscTableMetaDup(pTableMeta);
|
STableMeta* pMeta = tscTableMetaDup(pTableMeta);
|
||||||
STableMetaVgroupInfo p = { .pTableMeta = pMeta, .pVgroupInfo = NULL};
|
STableMetaVgroupInfo tvi = { .pTableMeta = pMeta, .vgroupIdList = pVgroupIdList};
|
||||||
taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo));
|
taosHashPut(pCmd->pTableMetaMap, pTableName, nameLen, &tvi, sizeof(STableMetaVgroupInfo));
|
||||||
}
|
}
|
||||||
} else { // add to the retrieve table meta array list.
|
} else {
|
||||||
|
// Add to the retrieve table meta array list.
|
||||||
|
// If the tableMeta is missing, the cached vgroup list for the corresponding super table will be ignored.
|
||||||
|
tscDebug("0x%"PRIx64" failed to retrieve table meta %s from local buf", pSql->self, name);
|
||||||
|
|
||||||
char* t = strdup(name);
|
char* t = strdup(name);
|
||||||
taosArrayPush(plist, &t);
|
taosArrayPush(plist, &t);
|
||||||
}
|
}
|
||||||
|
@ -8278,16 +8296,37 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
|
||||||
pTableMetaInfo->pTableMeta = tscTableMetaDup(p->pTableMeta);
|
pTableMetaInfo->pTableMeta = tscTableMetaDup(p->pTableMeta);
|
||||||
assert(pTableMetaInfo->pTableMeta != NULL);
|
assert(pTableMetaInfo->pTableMeta != NULL);
|
||||||
|
|
||||||
if (p->pVgroupInfo != NULL) {
|
if (p->vgroupIdList != NULL) {
|
||||||
pTableMetaInfo->vgroupList = tscVgroupsInfoDup(p->pVgroupInfo);
|
size_t s = taosArrayGetSize(p->vgroupIdList);
|
||||||
}
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
size_t vgroupsz = sizeof(SVgroupInfo) * s + sizeof(SVgroupsInfo);
|
||||||
return code;
|
pTableMetaInfo->vgroupList = calloc(1, vgroupsz);
|
||||||
|
if (pTableMetaInfo->vgroupList == NULL) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTableMetaInfo->vgroupList->numOfVgroups = (int32_t) s;
|
||||||
|
for(int32_t j = 0; j < s; ++j) {
|
||||||
|
int32_t* id = taosArrayGet(p->vgroupIdList, j);
|
||||||
|
|
||||||
|
// check if current buffer contains the vgroup info. If not, add it
|
||||||
|
SNewVgroupInfo existVgroupInfo = {.inUse = -1,};
|
||||||
|
taosHashGetClone(tscVgroupMap, id, sizeof(*id), NULL, &existVgroupInfo);
|
||||||
|
|
||||||
|
assert(existVgroupInfo.inUse >= 0);
|
||||||
|
SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j];
|
||||||
|
|
||||||
|
pVgroup->numOfEps = existVgroupInfo.numOfEps;
|
||||||
|
pVgroup->vgId = existVgroupInfo.vgId;
|
||||||
|
for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) {
|
||||||
|
pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port;
|
||||||
|
pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static STableMeta* extractTempTableMetaFromSubquery(SQueryInfo* pUpstream) {
|
static STableMeta* extractTempTableMetaFromSubquery(SQueryInfo* pUpstream) {
|
||||||
|
|
|
@ -157,7 +157,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) {
|
||||||
assert(vgId > 0);
|
assert(vgId > 0);
|
||||||
|
|
||||||
SNewVgroupInfo vgroupInfo = {.vgId = -1};
|
SNewVgroupInfo vgroupInfo = {.vgId = -1};
|
||||||
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo);
|
||||||
assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0);
|
assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0);
|
||||||
|
|
||||||
tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
|
tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
|
||||||
|
@ -344,6 +344,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
rpcFreeCont(rpcMsg->pCont);
|
rpcFreeCont(rpcMsg->pCont);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pSql->self == handle);
|
assert(pSql->self == handle);
|
||||||
|
|
||||||
STscObj *pObj = pSql->pTscObj;
|
STscObj *pObj = pSql->pTscObj;
|
||||||
|
@ -614,7 +615,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||||
|
|
||||||
SNewVgroupInfo vgroupInfo = {0};
|
SNewVgroupInfo vgroupInfo = {0};
|
||||||
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
|
||||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps);
|
tscDebug("0x%"PRIx64" submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps);
|
||||||
|
@ -687,7 +688,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab
|
||||||
vgId = pTableMeta->vgId;
|
vgId = pTableMeta->vgId;
|
||||||
|
|
||||||
SNewVgroupInfo vgroupInfo = {0};
|
SNewVgroupInfo vgroupInfo = {0};
|
||||||
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
|
||||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1582,7 +1583,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
|
||||||
STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
|
STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
|
||||||
|
|
||||||
SNewVgroupInfo vgroupInfo = {.vgId = -1};
|
SNewVgroupInfo vgroupInfo = {.vgId = -1};
|
||||||
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
|
||||||
assert(vgroupInfo.vgId > 0);
|
assert(vgroupInfo.vgId > 0);
|
||||||
|
|
||||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
||||||
|
@ -1809,34 +1810,6 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|
||||||
#if 0
|
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
|
||||||
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
|
|
||||||
|
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
|
||||||
STableInfoMsg *pInfoMsg = (STableInfoMsg *)pCmd->payload;
|
|
||||||
|
|
||||||
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pInfoMsg->tableFname);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
|
|
||||||
|
|
||||||
char *pMsg = (char *)pInfoMsg + sizeof(STableInfoMsg);
|
|
||||||
|
|
||||||
if (pCmd->autoCreated && pCmd->tagData.dataLen != 0) {
|
|
||||||
pMsg = serializeTagData(&pCmd->tagData, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
|
|
||||||
pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* multi table meta req pkg format:
|
* multi table meta req pkg format:
|
||||||
* |SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
|
* |SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
|
||||||
|
@ -1996,20 +1969,17 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the vgroupInfo if needed
|
// update the vgroupInfo if needed
|
||||||
static void doUpdateVgroupInfo(STableMeta *pTableMeta, SVgroupMsg *pVgroupMsg) {
|
static void doUpdateVgroupInfo(int32_t vgId, SVgroupMsg *pVgroupMsg) {
|
||||||
if (pTableMeta->vgId > 0) {
|
assert(vgId > 0);
|
||||||
int32_t vgId = pTableMeta->vgId;
|
|
||||||
assert(pTableMeta->tableType != TSDB_SUPER_TABLE);
|
|
||||||
|
|
||||||
SNewVgroupInfo vgroupInfo = {.inUse = -1};
|
SNewVgroupInfo vgroupInfo = {.inUse = -1};
|
||||||
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo);
|
||||||
|
|
||||||
// vgroup info exists, compare with it
|
// vgroup info exists, compare with it
|
||||||
if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) {
|
if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) {
|
||||||
vgroupInfo = createNewVgroupInfo(pVgroupMsg);
|
vgroupInfo = createNewVgroupInfo(pVgroupMsg);
|
||||||
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
|
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
|
||||||
tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
tscDebug("add/update new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2022,18 +1992,18 @@ static void doAddTableMetaToLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMet
|
||||||
if (updateSTable) {
|
if (updateSTable) {
|
||||||
STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg);
|
STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg);
|
||||||
uint32_t size = tscGetTableMetaSize(pSupTableMeta);
|
uint32_t size = tscGetTableMetaSize(pSupTableMeta);
|
||||||
int32_t code = taosHashPut(tscTableMetaInfo, pTableMeta->sTableName, len, pSupTableMeta, size);
|
int32_t code = taosHashPut(tscTableMetaMap, pTableMeta->sTableName, len, pSupTableMeta, size);
|
||||||
assert(code == TSDB_CODE_SUCCESS);
|
assert(code == TSDB_CODE_SUCCESS);
|
||||||
|
|
||||||
tfree(pSupTableMeta);
|
tfree(pSupTableMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta);
|
CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta);
|
||||||
taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta));
|
taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta));
|
||||||
tfree(cMeta);
|
tfree(cMeta);
|
||||||
} else {
|
} else {
|
||||||
uint32_t s = tscGetTableMetaSize(pTableMeta);
|
uint32_t s = tscGetTableMetaSize(pTableMeta);
|
||||||
taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s);
|
taosHashPut(tscTableMetaMap, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2058,7 +2028,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
||||||
assert(strncmp(pMetaMsg->tableFname, name, tListLen(pMetaMsg->tableFname)) == 0);
|
assert(strncmp(pMetaMsg->tableFname, name, tListLen(pMetaMsg->tableFname)) == 0);
|
||||||
|
|
||||||
doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, true);
|
doAddTableMetaToLocalBuf(pTableMeta, pMetaMsg, true);
|
||||||
doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup);
|
if (pTableMeta->tableType != TSDB_SUPER_TABLE) {
|
||||||
|
doUpdateVgroupInfo(pTableMeta->vgId, &pMetaMsg->vgroup);
|
||||||
|
}
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s, numOfCols:%d, numOfTags:%d", pSql->self,
|
tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s, numOfCols:%d, numOfTags:%d", pSql->self,
|
||||||
pTableMeta->id.uid, pTableMeta->id.tid, tNameGetTableName(&pTableMetaInfo->name), pTableMeta->tableInfo.numOfColumns,
|
pTableMeta->id.uid, pTableMeta->id.tid, tNameGetTableName(&pTableMetaInfo->name), pTableMeta->tableInfo.numOfColumns,
|
||||||
|
@ -2068,6 +2040,37 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SArray* createVgroupIdListFromMsg(char* pMsg, SHashObj* pSet, char* name, int32_t* size, uint64_t id) {
|
||||||
|
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg;
|
||||||
|
|
||||||
|
pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
|
||||||
|
*size = (int32_t)(sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg));
|
||||||
|
|
||||||
|
SArray* vgroupIdList = taosArrayInit(pVgroupMsg->numOfVgroups, sizeof(int32_t));
|
||||||
|
|
||||||
|
if (pVgroupMsg->numOfVgroups <= 0) {
|
||||||
|
tscDebug("0x%" PRIx64 " empty vgroup id list, no corresponding tables for stable:%s", id, name);
|
||||||
|
} else {
|
||||||
|
// just init, no need to lock
|
||||||
|
for (int32_t j = 0; j < pVgroupMsg->numOfVgroups; ++j) {
|
||||||
|
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
|
||||||
|
vmsg->vgId = htonl(vmsg->vgId);
|
||||||
|
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
||||||
|
vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(vgroupIdList, &vmsg->vgId);
|
||||||
|
|
||||||
|
if (taosHashGet(pSet, &vmsg->vgId, sizeof(vmsg->vgId)) == NULL) {
|
||||||
|
taosHashPut(pSet, &vmsg->vgId, sizeof(vmsg->vgId), "", 0);
|
||||||
|
doUpdateVgroupInfo(vmsg->vgId, vmsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return vgroupIdList;
|
||||||
|
}
|
||||||
|
|
||||||
static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t id) {
|
static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t id) {
|
||||||
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg;
|
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)pMsg;
|
||||||
pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
|
pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
|
||||||
|
@ -2092,24 +2095,14 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
|
||||||
vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
||||||
}
|
}
|
||||||
|
|
||||||
SNewVgroupInfo newVi = createNewVgroupInfo(vmsg);
|
pVgroup->numOfEps = vmsg->numOfEps;
|
||||||
pVgroup->numOfEps = newVi.numOfEps;
|
pVgroup->vgId = vmsg->vgId;
|
||||||
pVgroup->vgId = newVi.vgId;
|
|
||||||
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
||||||
pVgroup->epAddr[k].port = newVi.ep[k].port;
|
pVgroup->epAddr[k].port = vmsg->epAddr[k].port;
|
||||||
pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN);
|
pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if current buffer contains the vgroup info.
|
doUpdateVgroupInfo(pVgroup->vgId, vmsg);
|
||||||
// If not, add it
|
|
||||||
SNewVgroupInfo existVgroupInfo = {.inUse = -1};
|
|
||||||
taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo));
|
|
||||||
|
|
||||||
if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) ||
|
|
||||||
(existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
|
||||||
taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi));
|
|
||||||
tscDebug("0x%" PRIx64 " add new VgroupInfo, vgId:%d, total cached:%d", id, newVi.vgId, (int32_t)taosHashGetSize(tscVgroupMap));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2187,6 +2180,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
||||||
|
|
||||||
char* buf = NULL;
|
char* buf = NULL;
|
||||||
char* pMsg = pMultiMeta->meta;
|
char* pMsg = pMultiMeta->meta;
|
||||||
|
|
||||||
|
// decompresss the message payload
|
||||||
if (pMultiMeta->compressed) {
|
if (pMultiMeta->compressed) {
|
||||||
buf = malloc(pMultiMeta->rawLen - sizeof(SMultiTableMeta));
|
buf = malloc(pMultiMeta->rawLen - sizeof(SMultiTableMeta));
|
||||||
int32_t len = tsDecompressString(pMultiMeta->meta, pMultiMeta->contLen - sizeof(SMultiTableMeta), 1,
|
int32_t len = tsDecompressString(pMultiMeta->meta, pMultiMeta->contLen - sizeof(SMultiTableMeta), 1,
|
||||||
|
@ -2245,7 +2240,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
||||||
// for each vgroup, only update the information once.
|
// for each vgroup, only update the information once.
|
||||||
int64_t vgId = pMetaMsg->vgroup.vgId;
|
int64_t vgId = pMetaMsg->vgroup.vgId;
|
||||||
if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) {
|
if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) {
|
||||||
doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup);
|
doUpdateVgroupInfo(vgId, &pMetaMsg->vgroup);
|
||||||
taosHashPut(pSet, &vgId, sizeof(vgId), "", 0);
|
taosHashPut(pSet, &vgId, sizeof(vgId), "", 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2263,11 +2258,26 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
||||||
assert(p != NULL);
|
assert(p != NULL);
|
||||||
|
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
if (p->pVgroupInfo!= NULL) {
|
if (p->vgroupIdList!= NULL) {
|
||||||
tscVgroupInfoClear(p->pVgroupInfo);
|
taosArrayDestroy(p->vgroupIdList);
|
||||||
//tfree(p->pTableMeta);
|
|
||||||
}
|
}
|
||||||
p->pVgroupInfo = createVgroupInfoFromMsg(pMsg, &size, pSql->self);
|
|
||||||
|
char tableName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
|
tstrncpy(tableName, name, TSDB_TABLE_NAME_LEN);
|
||||||
|
p->vgroupIdList = createVgroupIdListFromMsg(pMsg, pSet, tableName, &size, pSql->self);
|
||||||
|
|
||||||
|
int32_t numOfVgId = taosArrayGetSize(p->vgroupIdList);
|
||||||
|
int32_t s = sizeof(tFilePage) + numOfVgId * sizeof(int32_t);
|
||||||
|
|
||||||
|
tFilePage* idList = calloc(1, s);
|
||||||
|
idList->num = numOfVgId;
|
||||||
|
memcpy(idList->data, TARRAY_GET_START(p->vgroupIdList), numOfVgId * sizeof(int32_t));
|
||||||
|
|
||||||
|
void* idListInst = taosCachePut(tscVgroupListBuf, tableName, strlen(tableName), idList, s, 5000);
|
||||||
|
taosCacheRelease(tscVgroupListBuf, (void*) &idListInst, false);
|
||||||
|
|
||||||
|
tfree(idList);
|
||||||
|
|
||||||
pMsg += size;
|
pMsg += size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2503,7 +2513,7 @@ int tscProcessDropDbRsp(SSqlObj *pSql) {
|
||||||
//TODO LOCK DB WHEN MODIFY IT
|
//TODO LOCK DB WHEN MODIFY IT
|
||||||
//pSql->pTscObj->db[0] = 0;
|
//pSql->pTscObj->db[0] = 0;
|
||||||
|
|
||||||
taosHashClear(tscTableMetaInfo);
|
taosHashClear(tscTableMetaMap);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2514,8 +2524,8 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
|
||||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
tNameExtractFullName(&pTableMetaInfo->name, name);
|
tNameExtractFullName(&pTableMetaInfo->name, name);
|
||||||
|
|
||||||
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||||
tscDebug("0x%"PRIx64" remove table meta after drop table:%s, numOfRemain:%d", pSql->self, name, (int32_t) taosHashGetSize(tscTableMetaInfo));
|
tscDebug("0x%"PRIx64" remove table meta after drop table:%s, numOfRemain:%d", pSql->self, name, (int32_t) taosHashGetSize(tscTableMetaMap));
|
||||||
|
|
||||||
tfree(pTableMetaInfo->pTableMeta);
|
tfree(pTableMetaInfo->pTableMeta);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -2530,11 +2540,11 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
|
||||||
tscDebug("0x%"PRIx64" remove tableMeta in hashMap after alter-table: %s", pSql->self, name);
|
tscDebug("0x%"PRIx64" remove tableMeta in hashMap after alter-table: %s", pSql->self, name);
|
||||||
|
|
||||||
bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
|
bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
|
||||||
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||||
tfree(pTableMetaInfo->pTableMeta);
|
tfree(pTableMetaInfo->pTableMeta);
|
||||||
|
|
||||||
if (isSuperTable) { // if it is a super table, iterate the hashTable and remove all the childTableMeta
|
if (isSuperTable) { // if it is a super table, iterate the hashTable and remove all the childTableMeta
|
||||||
taosHashClear(tscTableMetaInfo);
|
taosHashClear(tscTableMetaMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -2801,7 +2811,7 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
|
||||||
tNameExtractFullName(&pTableMetaInfo->name, name);
|
tNameExtractFullName(&pTableMetaInfo->name, name);
|
||||||
|
|
||||||
size_t len = strlen(name);
|
size_t len = strlen(name);
|
||||||
taosHashGetClone(tscTableMetaInfo, name, len, NULL, pTableMetaInfo->pTableMeta, -1);
|
taosHashGetClone(tscTableMetaMap, name, len, NULL, pTableMetaInfo->pTableMeta);
|
||||||
|
|
||||||
// TODO resize the tableMeta
|
// TODO resize the tableMeta
|
||||||
assert(size < 80 * TSDB_MAX_COLUMNS);
|
assert(size < 80 * TSDB_MAX_COLUMNS);
|
||||||
|
@ -2914,7 +2924,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
|
||||||
|
|
||||||
// remove stored tableMeta info in hash table
|
// remove stored tableMeta info in hash table
|
||||||
size_t len = strlen(name);
|
size_t len = strlen(name);
|
||||||
taosHashRemove(tscTableMetaInfo, name, len);
|
taosHashRemove(tscTableMetaMap, name, len);
|
||||||
|
|
||||||
return getTableMetaFromMnode(pSql, pTableMetaInfo, false);
|
return getTableMetaFromMnode(pSql, pTableMetaInfo, false);
|
||||||
}
|
}
|
||||||
|
@ -2966,8 +2976,6 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
|
||||||
tscDebug("0x%"PRIx64" svgroupRid from %" PRId64 " to %" PRId64 , pSql->self, pSql->svgroupRid, pNew->self);
|
tscDebug("0x%"PRIx64" svgroupRid from %" PRId64 " to %" PRId64 , pSql->self, pSql->svgroupRid, pNew->self);
|
||||||
|
|
||||||
pSql->svgroupRid = pNew->self;
|
pSql->svgroupRid = pNew->self;
|
||||||
|
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql->self, pNew, pNewQueryInfo->numOfTables);
|
tscDebug("0x%"PRIx64" new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql->self, pNew, pNewQueryInfo->numOfTables);
|
||||||
|
|
||||||
pNew->fp = tscTableMetaCallBack;
|
pNew->fp = tscTableMetaCallBack;
|
||||||
|
@ -3010,7 +3018,6 @@ void tscInitMsgsFp() {
|
||||||
|
|
||||||
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
|
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
|
||||||
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
|
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
|
||||||
// tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
|
|
||||||
tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
|
tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
|
||||||
tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg;
|
tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg;
|
||||||
|
|
||||||
|
|
|
@ -206,7 +206,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
|
||||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
tNameExtractFullName(&pTableMetaInfo->name, name);
|
tNameExtractFullName(&pTableMetaInfo->name, name);
|
||||||
|
|
||||||
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||||
|
|
||||||
tfree(pTableMetaInfo->pTableMeta);
|
tfree(pTableMetaInfo->pTableMeta);
|
||||||
|
|
||||||
|
|
|
@ -3125,7 +3125,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
|
||||||
for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) {
|
for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) {
|
||||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name);
|
tNameExtractFullName(pParentObj->cmd.insertParam.pTableNameList[i], name);
|
||||||
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||||
}
|
}
|
||||||
|
|
||||||
pParentObj->res.code = TSDB_CODE_SUCCESS;
|
pParentObj->res.code = TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -19,15 +19,12 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "tnote.h"
|
#include "tnote.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
|
||||||
#include "tsched.h"
|
#include "tsched.h"
|
||||||
#include "tscLog.h"
|
#include "tscLog.h"
|
||||||
#include "tscUtil.h"
|
|
||||||
#include "tsclient.h"
|
#include "tsclient.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "tconfig.h"
|
#include "tconfig.h"
|
||||||
#include "ttimezone.h"
|
#include "ttimezone.h"
|
||||||
#include "tlocale.h"
|
|
||||||
#include "qScript.h"
|
#include "qScript.h"
|
||||||
|
|
||||||
// global, not configurable
|
// global, not configurable
|
||||||
|
@ -36,8 +33,10 @@
|
||||||
|
|
||||||
int32_t sentinel = TSC_VAR_NOT_RELEASE;
|
int32_t sentinel = TSC_VAR_NOT_RELEASE;
|
||||||
|
|
||||||
SHashObj *tscVgroupMap; // hash map to keep the global vgroup info
|
SHashObj *tscVgroupMap; // hash map to keep the vgroup info from mnode
|
||||||
SHashObj *tscTableMetaInfo; // table meta info
|
SHashObj *tscTableMetaMap; // table meta info buffer
|
||||||
|
SCacheObj *tscVgroupListBuf; // super table vgroup list information, only survives 5 seconds for each super table vgroup list
|
||||||
|
|
||||||
int32_t tscObjRef = -1;
|
int32_t tscObjRef = -1;
|
||||||
void *tscTmr;
|
void *tscTmr;
|
||||||
void *tscQhandle;
|
void *tscQhandle;
|
||||||
|
@ -45,17 +44,21 @@ int32_t tscRefId = -1;
|
||||||
int32_t tscNumOfObj = 0; // number of sqlObj in current process.
|
int32_t tscNumOfObj = 0; // number of sqlObj in current process.
|
||||||
static void *tscCheckDiskUsageTmr;
|
static void *tscCheckDiskUsageTmr;
|
||||||
void *tscRpcCache; // cache to keep rpc obj
|
void *tscRpcCache; // cache to keep rpc obj
|
||||||
int32_t tscNumOfThreads = 1; // num of rpc threads
|
int32_t tscNumOfThreads = 1; // num of rpc threads
|
||||||
char tscLogFileName[12] = "taoslog";
|
char tscLogFileName[12] = "taoslog";
|
||||||
int tscLogFileNum = 10;
|
int tscLogFileNum = 10;
|
||||||
static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently
|
|
||||||
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
|
static pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently
|
||||||
|
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
|
||||||
|
|
||||||
|
// pthread_once can not return result code, so result code is set to a global variable.
|
||||||
static volatile int tscInitRes = 0;
|
static volatile int tscInitRes = 0;
|
||||||
|
|
||||||
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) {
|
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) {
|
||||||
taosGetDisk();
|
taosGetDisk();
|
||||||
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
|
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscFreeRpcObj(void *param) {
|
void tscFreeRpcObj(void *param) {
|
||||||
assert(param);
|
assert(param);
|
||||||
SRpcObj *pRpcObj = (SRpcObj *)(param);
|
SRpcObj *pRpcObj = (SRpcObj *)(param);
|
||||||
|
@ -67,9 +70,8 @@ void tscReleaseRpc(void *param) {
|
||||||
if (param == NULL) {
|
if (param == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
pthread_mutex_lock(&rpcObjMutex);
|
|
||||||
taosCacheRelease(tscRpcCache, (void *)¶m, false);
|
taosCacheRelease(tscRpcCache, (void *)¶m, false);
|
||||||
pthread_mutex_unlock(&rpcObjMutex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj) {
|
int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj) {
|
||||||
|
@ -105,6 +107,7 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry
|
||||||
tscError("failed to init connection to TDengine");
|
tscError("failed to init connection to TDengine");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRpcObj = taosCachePut(tscRpcCache, rpcObj.key, strlen(rpcObj.key), &rpcObj, sizeof(rpcObj), 1000*5);
|
pRpcObj = taosCachePut(tscRpcCache, rpcObj.key, strlen(rpcObj.key), &rpcObj, sizeof(rpcObj), 1000*5);
|
||||||
if (pRpcObj == NULL) {
|
if (pRpcObj == NULL) {
|
||||||
rpcClose(rpcObj.pDnodeConn);
|
rpcClose(rpcObj.pDnodeConn);
|
||||||
|
@ -118,7 +121,7 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry
|
||||||
}
|
}
|
||||||
|
|
||||||
void taos_init_imp(void) {
|
void taos_init_imp(void) {
|
||||||
char temp[128] = {0};
|
char temp[128] = {0};
|
||||||
|
|
||||||
errno = TSDB_CODE_SUCCESS;
|
errno = TSDB_CODE_SUCCESS;
|
||||||
srand(taosGetTimestampSec());
|
srand(taosGetTimestampSec());
|
||||||
|
@ -151,36 +154,41 @@ void taos_init_imp(void) {
|
||||||
rpcInit();
|
rpcInit();
|
||||||
|
|
||||||
scriptEnvPoolInit();
|
scriptEnvPoolInit();
|
||||||
|
|
||||||
tscDebug("starting to initialize TAOS client ...");
|
tscDebug("starting to initialize TAOS client ...");
|
||||||
tscDebug("Local End Point is:%s", tsLocalEp);
|
tscDebug("Local End Point is:%s", tsLocalEp);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosSetCoreDump();
|
taosSetCoreDump();
|
||||||
tscInitMsgsFp();
|
tscInitMsgsFp();
|
||||||
int queueSize = tsMaxConnections*2;
|
|
||||||
|
|
||||||
double factor = (tscEmbedded == 0)? 2.0:4.0;
|
double factor = (tscEmbedded == 0)? 2.0:4.0;
|
||||||
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
|
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
|
||||||
if (tscNumOfThreads < 2) {
|
if (tscNumOfThreads < 2) {
|
||||||
tscNumOfThreads = 2;
|
tscNumOfThreads = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t queueSize = tsMaxConnections*2;
|
||||||
tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc");
|
tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc");
|
||||||
if (NULL == tscQhandle) {
|
if (NULL == tscQhandle) {
|
||||||
tscError("failed to init scheduler");
|
tscError("failed to init task queue");
|
||||||
tscInitRes = -1;
|
tscInitRes = -1;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tscDebug("client task queue is initialized, numOfWorkers: %d", tscNumOfThreads);
|
||||||
|
|
||||||
tscTmr = taosTmrInit(tsMaxConnections * 2, 200, 60000, "TSC");
|
tscTmr = taosTmrInit(tsMaxConnections * 2, 200, 60000, "TSC");
|
||||||
if(0 == tscEmbedded){
|
if(0 == tscEmbedded){
|
||||||
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
|
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscTableMetaInfo == NULL) {
|
if (tscTableMetaMap == NULL) {
|
||||||
tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj);
|
tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj);
|
||||||
tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||||
tscTableMetaInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
tscTableMetaMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
tscDebug("TableMeta:%p", tscTableMetaInfo);
|
tscVgroupListBuf = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, NULL, "stable-vgroup-list");
|
||||||
|
tscDebug("TableMeta:%p, vgroup:%p is initialized", tscTableMetaMap, tscVgroupMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
int refreshTime = 5;
|
int refreshTime = 5;
|
||||||
|
@ -189,14 +197,17 @@ void taos_init_imp(void) {
|
||||||
|
|
||||||
tscRefId = taosOpenRef(200, tscCloseTscObj);
|
tscRefId = taosOpenRef(200, tscCloseTscObj);
|
||||||
|
|
||||||
// in other language APIs, taos_cleanup is not available yet.
|
// In the APIs of other program language, taos_cleanup is not available yet.
|
||||||
// So, to make sure taos_cleanup will be invoked to clean up the allocated
|
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
|
||||||
// resource to suppress the valgrind warning.
|
|
||||||
atexit(taos_cleanup);
|
atexit(taos_cleanup);
|
||||||
|
|
||||||
tscDebug("client is initialized successfully");
|
tscDebug("client is initialized successfully");
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_init() { pthread_once(&tscinit, taos_init_imp); return tscInitRes;}
|
int taos_init() {
|
||||||
|
pthread_once(&tscinit, taos_init_imp);
|
||||||
|
return tscInitRes;
|
||||||
|
}
|
||||||
|
|
||||||
// this function may be called by user or system, or by both simultaneously.
|
// this function may be called by user or system, or by both simultaneously.
|
||||||
void taos_cleanup(void) {
|
void taos_cleanup(void) {
|
||||||
|
@ -205,11 +216,13 @@ void taos_cleanup(void) {
|
||||||
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
|
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscEmbedded == 0) {
|
if (tscEmbedded == 0) {
|
||||||
scriptEnvPoolCleanup();
|
scriptEnvPoolCleanup();
|
||||||
}
|
}
|
||||||
taosHashCleanup(tscTableMetaInfo);
|
|
||||||
tscTableMetaInfo = NULL;
|
taosHashCleanup(tscTableMetaMap);
|
||||||
|
tscTableMetaMap = NULL;
|
||||||
|
|
||||||
taosHashCleanup(tscVgroupMap);
|
taosHashCleanup(tscVgroupMap);
|
||||||
tscVgroupMap = NULL;
|
tscVgroupMap = NULL;
|
||||||
|
@ -236,6 +249,9 @@ void taos_cleanup(void) {
|
||||||
pthread_mutex_destroy(&rpcObjMutex);
|
pthread_mutex_destroy(&rpcObjMutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosCacheCleanup(tscVgroupListBuf);
|
||||||
|
tscVgroupListBuf = NULL;
|
||||||
|
|
||||||
if (tscEmbedded == 0) {
|
if (tscEmbedded == 0) {
|
||||||
rpcCleanup();
|
rpcCleanup();
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
|
|
|
@ -1388,7 +1388,7 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) {
|
||||||
if (pCmd->pTableMetaMap != NULL) {
|
if (pCmd->pTableMetaMap != NULL) {
|
||||||
STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL);
|
STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL);
|
||||||
while (p) {
|
while (p) {
|
||||||
tscVgroupInfoClear(p->pVgroupInfo);
|
taosArrayDestroy(p->vgroupIdList);
|
||||||
tfree(p->pTableMeta);
|
tfree(p->pTableMeta);
|
||||||
p = taosHashIterate(pCmd->pTableMetaMap, p);
|
p = taosHashIterate(pCmd->pTableMetaMap, p);
|
||||||
}
|
}
|
||||||
|
@ -1522,7 +1522,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
|
||||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
tNameExtractFullName(&pDataBlock->tableName, name);
|
tNameExtractFullName(&pDataBlock->tableName, name);
|
||||||
|
|
||||||
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pDataBlock->cloned) {
|
if (!pDataBlock->cloned) {
|
||||||
|
@ -3365,7 +3365,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) {
|
||||||
if (removeMeta) {
|
if (removeMeta) {
|
||||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
tNameExtractFullName(&pTableMetaInfo->name, name);
|
tNameExtractFullName(&pTableMetaInfo->name, name);
|
||||||
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||||
}
|
}
|
||||||
|
|
||||||
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
|
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
|
||||||
|
@ -4360,7 +4360,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, v
|
||||||
assert(pChild != NULL && buf != NULL);
|
assert(pChild != NULL && buf != NULL);
|
||||||
|
|
||||||
STableMeta* p = buf;
|
STableMeta* p = buf;
|
||||||
taosHashGetClone(tscTableMetaInfo, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p, -1);
|
taosHashGetClone(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p);
|
||||||
|
|
||||||
// tableMeta exists, build child table meta according to the super table meta
|
// tableMeta exists, build child table meta according to the super table meta
|
||||||
// the uid need to be checked in addition to the general name of the super table.
|
// the uid need to be checked in addition to the general name of the super table.
|
||||||
|
@ -4374,7 +4374,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, v
|
||||||
memcpy(pChild->schema, p->schema, sizeof(SSchema) *total);
|
memcpy(pChild->schema, p->schema, sizeof(SSchema) *total);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else { // super table has been removed, current tableMeta is also expired. remove it here
|
} else { // super table has been removed, current tableMeta is also expired. remove it here
|
||||||
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -342,6 +342,7 @@ int32_t scriptEnvPoolInit() {
|
||||||
env->lua_state = createLuaEnv();
|
env->lua_state = createLuaEnv();
|
||||||
tdListAppend(pool->scriptEnvs, (void *)(&env));
|
tdListAppend(pool->scriptEnvs, (void *)(&env));
|
||||||
}
|
}
|
||||||
|
|
||||||
pool->mSize = size;
|
pool->mSize = size;
|
||||||
pool->cSize = size;
|
pool->cSize = size;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -123,10 +123,9 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
|
||||||
* @param keyLen
|
* @param keyLen
|
||||||
* @param fp
|
* @param fp
|
||||||
* @param d
|
* @param d
|
||||||
* @param dsize
|
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize);
|
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* remove item with the specified key
|
* remove item with the specified key
|
||||||
|
|
|
@ -294,10 +294,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
|
||||||
}
|
}
|
||||||
|
|
||||||
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||||
return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL, 0);
|
return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize) {
|
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d) {
|
||||||
if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
|
if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,7 @@ static void vnodeIncRef(void *ptNode) {
|
||||||
void *vnodeAcquire(int32_t vgId) {
|
void *vnodeAcquire(int32_t vgId) {
|
||||||
SVnodeObj *pVnode = NULL;
|
SVnodeObj *pVnode = NULL;
|
||||||
if (tsVnodesHash != NULL) {
|
if (tsVnodesHash != NULL) {
|
||||||
taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, &pVnode, sizeof(void *));
|
taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, &pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue