[TD-2378]<enhance>: reduce table meta memory consumption.
This commit is contained in:
parent
b451e6859d
commit
8b705be302
|
@ -216,7 +216,7 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
|
|||
SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex);
|
||||
SQueryInfo *tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex);
|
||||
|
||||
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache);
|
||||
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo);
|
||||
|
||||
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
|
||||
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables);
|
||||
|
@ -276,7 +276,7 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
|
|||
bool hasMoreVnodesToTry(SSqlObj *pSql);
|
||||
bool hasMoreClauseToTry(SSqlObj* pSql);
|
||||
|
||||
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache);
|
||||
void tscFreeQueryInfo(SSqlCmd* pCmd);
|
||||
|
||||
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
|
||||
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
|
||||
|
@ -290,6 +290,14 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_
|
|||
char* serializeTagData(STagData* pTagData, char* pMsg);
|
||||
int32_t copyTagData(STagData* dst, const STagData* src);
|
||||
|
||||
STableMeta* createSuperTableMeta(STableMetaMsg* pChild);
|
||||
uint32_t tscGetTableMetaSize(STableMeta* pTableMeta);
|
||||
CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
|
||||
uint32_t tscGetTableMetaMaxSize();
|
||||
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name);
|
||||
STableMeta* tscTableMetaClone(STableMeta* pTableMeta);
|
||||
|
||||
|
||||
void* malloc_throw(size_t size);
|
||||
void* calloc_throw(size_t nmemb, size_t size);
|
||||
char* strdup_throw(const char* str);
|
||||
|
|
|
@ -105,7 +105,10 @@ SSchema tscGetTbnameColumnSchema();
|
|||
* @param size size of the table meta
|
||||
* @return
|
||||
*/
|
||||
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size);
|
||||
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg);
|
||||
|
||||
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
|
||||
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -56,23 +56,39 @@ typedef struct STableComInfo {
|
|||
int32_t rowSize;
|
||||
} STableComInfo;
|
||||
|
||||
typedef struct SCorVgroupInfo {
|
||||
int32_t version;
|
||||
int8_t inUse;
|
||||
int8_t numOfEps;
|
||||
SEpAddr1 epAddr[TSDB_MAX_REPLICA];
|
||||
} SCorVgroupInfo;
|
||||
typedef struct SNewVgroupInfo {
|
||||
int32_t vgId;
|
||||
int8_t inUse;
|
||||
int8_t numOfEps;
|
||||
SEpAddrMsg ep[TSDB_MAX_REPLICA];
|
||||
} SNewVgroupInfo;
|
||||
|
||||
typedef struct STableMeta {
|
||||
typedef struct CSuperTableMeta {
|
||||
STableComInfo tableInfo;
|
||||
uint8_t tableType;
|
||||
int16_t sversion;
|
||||
int16_t tversion;
|
||||
char sTableId[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t vgId;
|
||||
SCorVgroupInfo corVgroupInfo;
|
||||
char sTableName[TSDB_TABLE_FNAME_LEN];
|
||||
STableId id;
|
||||
// union {int64_t stableUid; SSchema* schema;};
|
||||
int32_t childList;
|
||||
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
|
||||
} CSuperTableMeta;
|
||||
|
||||
typedef struct CChildTableMeta {
|
||||
int32_t vgId;
|
||||
STableId id;
|
||||
uint8_t tableType;
|
||||
char sTableName[TSDB_TABLE_FNAME_LEN];
|
||||
} CChildTableMeta;
|
||||
|
||||
typedef struct STableMeta {
|
||||
int32_t vgId;
|
||||
STableId id;
|
||||
uint8_t tableType;
|
||||
char sTableName[TSDB_TABLE_FNAME_LEN];
|
||||
int16_t sversion;
|
||||
int16_t tversion;
|
||||
STableComInfo tableInfo;
|
||||
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
|
||||
} STableMeta;
|
||||
|
||||
|
@ -171,7 +187,7 @@ typedef struct SParamInfo {
|
|||
} SParamInfo;
|
||||
|
||||
typedef struct STableDataBlocks {
|
||||
char tableId[TSDB_TABLE_FNAME_LEN];
|
||||
char tableName[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t tsSource; // where does the UNIX timestamp come from, server or client
|
||||
bool ordered; // if current rows are ordered or not
|
||||
int64_t vgId; // virtual group id
|
||||
|
@ -249,7 +265,7 @@ typedef struct {
|
|||
int8_t submitSchema; // submit block is built with table schema
|
||||
STagData tagData; // NOTE: pTagData->data is used as a variant length array
|
||||
|
||||
STableMeta **pTableMetaList; // all involved tableMeta list of current insert sql statement.
|
||||
char **pTableNameList; // all involved tableMeta list of current insert sql statement.
|
||||
int32_t numOfTables;
|
||||
|
||||
SHashObj *pTableBlockHashList; // data block for each table
|
||||
|
@ -400,7 +416,7 @@ void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
|
|||
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
|
||||
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
|
||||
|
||||
void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
|
||||
void tscResetSqlCmdObj(SSqlCmd *pCmd);
|
||||
|
||||
/**
|
||||
* free query result of the sql object
|
||||
|
@ -414,7 +430,6 @@ void tscFreeSqlResult(SSqlObj *pSql);
|
|||
*/
|
||||
void tscFreeSqlObj(SSqlObj *pSql);
|
||||
void tscFreeRegisteredSqlObj(void *pSql);
|
||||
void tscFreeTableMetaHelper(void *pTableMeta);
|
||||
|
||||
void tscCloseTscObj(void *pObj);
|
||||
|
||||
|
@ -480,6 +495,8 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
|
|||
}
|
||||
|
||||
extern SCacheObj *tscMetaCache;
|
||||
extern SHashObj *tscHashMap;
|
||||
extern SHashObj *tscTableMetaInfo;
|
||||
|
||||
extern int tscObjRef;
|
||||
extern void *tscTmr;
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
#include "tnote.h"
|
||||
#include "trpc.h"
|
||||
#include "tcache.h"
|
||||
#include "tscLog.h"
|
||||
#include "tscSubquery.h"
|
||||
#include "tscLocalMerge.h"
|
||||
|
@ -423,7 +422,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
|||
|
||||
// check if it is a sub-query of super table query first, if true, enter another routine
|
||||
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
|
||||
tscDebug("%p update table meta in local cache, continue to process sql and send the corresponding query", pSql);
|
||||
tscDebug("%p update local table meta, continue to process sql and send the corresponding query", pSql);
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||
|
@ -440,7 +439,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
|||
return;
|
||||
} else { // continue to process normal async query
|
||||
if (pCmd->parseFinished) {
|
||||
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
|
||||
tscDebug("%p update local table meta, continue to process sql and send corresponding query", pSql);
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||
|
@ -455,7 +454,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
|||
if (pCmd->command == TSDB_SQL_SELECT) {
|
||||
tscDebug("%p redo parse sql string and proceed", pSql);
|
||||
pCmd->parseFinished = false;
|
||||
tscResetSqlCmdObj(pCmd, false);
|
||||
tscResetSqlCmdObj(pCmd);
|
||||
|
||||
code = tsParseSql(pSql, true);
|
||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
#include "taosmsg.h"
|
||||
|
||||
#include "taosdef.h"
|
||||
#include "tcache.h"
|
||||
#include "tname.h"
|
||||
#include "tscLog.h"
|
||||
#include "tscUtil.h"
|
||||
|
@ -571,7 +570,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
|
|||
|
||||
char fullName[TSDB_TABLE_FNAME_LEN * 2] = {0};
|
||||
extractDBName(pTableMetaInfo->name, fullName);
|
||||
extractTableName(pMeta->sTableId, param->sTableName);
|
||||
extractTableName(pMeta->sTableName, param->sTableName);
|
||||
snprintf(fullName + strlen(fullName), TSDB_TABLE_FNAME_LEN - strlen(fullName), ".%s", param->sTableName);
|
||||
extractTableName(pTableMetaInfo->name, param->buf);
|
||||
|
||||
|
@ -901,7 +900,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
|
|||
} else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) {
|
||||
pRes->code = tscProcessShowCreateDatabase(pSql);
|
||||
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
|
||||
taosCacheEmpty(tscMetaCache);
|
||||
taosHashEmpty(tscTableMetaInfo);
|
||||
pRes->code = TSDB_CODE_SUCCESS;
|
||||
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
|
||||
pRes->code = tscProcessServerVer(pSql);
|
||||
|
|
|
@ -1339,7 +1339,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
|
|||
if (sqlstr == NULL || pSql->parseRetry >= 1 || ret != TSDB_CODE_TSC_INVALID_SQL) {
|
||||
free(sqlstr);
|
||||
} else {
|
||||
tscResetSqlCmdObj(pCmd, true);
|
||||
tscResetSqlCmdObj(pCmd);
|
||||
free(pSql->sqlstr);
|
||||
pSql->sqlstr = sqlstr;
|
||||
pSql->parseRetry++;
|
||||
|
@ -1351,7 +1351,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
|
|||
SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
|
||||
ret = tscToSQLCmd(pSql, &SQLInfo);
|
||||
if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->parseRetry == 0 && SQLInfo.type == TSDB_SQL_NULL) {
|
||||
tscResetSqlCmdObj(pCmd, true);
|
||||
tscResetSqlCmdObj(pCmd);
|
||||
pSql->parseRetry++;
|
||||
ret = tscToSQLCmd(pSql, &SQLInfo);
|
||||
}
|
||||
|
@ -1439,7 +1439,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
|
|||
int32_t count = 0;
|
||||
int32_t maxRows = 0;
|
||||
|
||||
tfree(pCmd->pTableMetaList);
|
||||
tfree(pCmd->pTableNameList);
|
||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||
|
||||
if (pCmd->pTableBlockHashList == NULL) {
|
||||
|
|
|
@ -910,7 +910,7 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa
|
|||
* that are corresponding to the old name for the new table name.
|
||||
*/
|
||||
if (strlen(oldName) > 0 && strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) {
|
||||
tscClearTableMetaInfo(pTableMetaInfo, false);
|
||||
tscClearTableMetaInfo(pTableMetaInfo);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -130,19 +130,8 @@ SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void tscInitCorVgroupInfo(SCorVgroupInfo *corVgroupInfo, SVgroupMsg *pVgroupMsg) {
|
||||
corVgroupInfo->version = 0;
|
||||
corVgroupInfo->inUse = 0;
|
||||
corVgroupInfo->numOfEps = pVgroupMsg->numOfEps;
|
||||
|
||||
for (int32_t i = 0; i < pVgroupMsg->numOfEps; i++) {
|
||||
corVgroupInfo->epAddr[i].fqdn = strndup(pVgroupMsg->epAddr[i].fqdn, tListLen(pVgroupMsg->epAddr[0].fqdn));
|
||||
corVgroupInfo->epAddr[i].port = pVgroupMsg->epAddr[i].port;
|
||||
}
|
||||
}
|
||||
|
||||
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) {
|
||||
assert(pTableMetaMsg != NULL);
|
||||
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) {
|
||||
assert(pTableMetaMsg != NULL && pTableMetaMsg->numOfColumns >= 2 && pTableMetaMsg->numOfTags >= 0);
|
||||
|
||||
int32_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema);
|
||||
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize);
|
||||
|
@ -159,11 +148,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
|
|||
pTableMeta->id.tid = pTableMetaMsg->tid;
|
||||
pTableMeta->id.uid = pTableMetaMsg->uid;
|
||||
|
||||
tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMetaMsg->vgroup);
|
||||
|
||||
pTableMeta->sversion = pTableMetaMsg->sversion;
|
||||
pTableMeta->tversion = pTableMetaMsg->tversion;
|
||||
tstrncpy(pTableMeta->sTableId, pTableMetaMsg->sTableId, TSDB_TABLE_FNAME_LEN);
|
||||
tstrncpy(pTableMeta->sTableName, pTableMetaMsg->sTableName, TSDB_TABLE_FNAME_LEN);
|
||||
|
||||
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);
|
||||
|
||||
|
@ -172,13 +159,44 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
|
|||
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
|
||||
}
|
||||
|
||||
if (size != NULL) {
|
||||
*size = sizeof(STableMeta) + schemaSize;
|
||||
}
|
||||
|
||||
return pTableMeta;
|
||||
}
|
||||
|
||||
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src) {
|
||||
assert(pExisted != NULL && src != NULL);
|
||||
if (pExisted->numOfEps != src->numOfEps) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < pExisted->numOfEps; ++i) {
|
||||
if (pExisted->ep[i].port != src->epAddr[i].port) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (strncmp(pExisted->ep[i].fqdn, src->epAddr[i].fqdn, tListLen(pExisted->ep[i].fqdn)) != 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
|
||||
assert(pVgroupMsg != NULL);
|
||||
|
||||
SNewVgroupInfo info = {0};
|
||||
info.numOfEps = pVgroupMsg->numOfEps;
|
||||
info.vgId = pVgroupMsg->vgId;
|
||||
info.inUse = 0;
|
||||
|
||||
for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) {
|
||||
tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN);
|
||||
info.ep[i].port = pVgroupMsg->epAddr[i].port;
|
||||
}
|
||||
|
||||
return info;
|
||||
}
|
||||
|
||||
// todo refactor
|
||||
UNUSED_FUNC static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) {
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "tcache.h"
|
||||
#include "tcmdtype.h"
|
||||
#include "trpc.h"
|
||||
#include "tscLocalMerge.h"
|
||||
|
@ -85,7 +84,8 @@ static void tscEpSetHtons(SRpcEpSet *s) {
|
|||
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
|
||||
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < s1->numOfEps; i++) {
|
||||
if (s1->port[i] != s2->port[i]
|
||||
|| strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
|
||||
|
@ -93,6 +93,7 @@ bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
|
|||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
|
||||
// no need to update if equal
|
||||
SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
|
||||
|
@ -101,37 +102,38 @@ void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
|
|||
taosCorEndWrite(&pCorEpSet->version);
|
||||
}
|
||||
|
||||
static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SCorVgroupInfo *pVgroupInfo) {
|
||||
static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgroupInfo) {
|
||||
if (pVgroupInfo == NULL) { return;}
|
||||
taosCorBeginRead(&pVgroupInfo->version);
|
||||
int8_t inUse = pVgroupInfo->inUse;
|
||||
pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0;
|
||||
pEpSet->numOfEps = pVgroupInfo->numOfEps;
|
||||
for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
|
||||
tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, sizeof(pEpSet->fqdn[i]));
|
||||
pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
|
||||
tstrncpy(pEpSet->fqdn[i], pVgroupInfo->ep[i].fqdn, sizeof(pEpSet->fqdn[i]));
|
||||
pEpSet->port[i] = pVgroupInfo->ep[i].port;
|
||||
}
|
||||
taosCorEndRead(&pVgroupInfo->version);
|
||||
}
|
||||
|
||||
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
|
||||
SSqlCmd *pCmd = &pObj->cmd;
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
|
||||
SCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
|
||||
|
||||
taosCorBeginWrite(&pVgroupInfo->version);
|
||||
tscDebug("before: Endpoint in use: %d", pVgroupInfo->inUse);
|
||||
pVgroupInfo->inUse = pEpSet->inUse;
|
||||
pVgroupInfo->numOfEps = pEpSet->numOfEps;
|
||||
for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
|
||||
tfree(pVgroupInfo->epAddr[i].fqdn);
|
||||
pVgroupInfo->epAddr[i].fqdn = strndup(pEpSet->fqdn[i], tListLen(pEpSet->fqdn[i]));
|
||||
pVgroupInfo->epAddr[i].port = pEpSet->port[i];
|
||||
int32_t vgId = pTableMetaInfo->pTableMeta->vgId;
|
||||
|
||||
SNewVgroupInfo vgroupInfo = {.vgId = -1};
|
||||
taosHashGetClone(tscHashMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||
assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0);
|
||||
|
||||
tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
|
||||
vgroupInfo.inUse = pEpSet->inUse;
|
||||
vgroupInfo.numOfEps = pEpSet->numOfEps;
|
||||
for (int32_t i = 0; i < vgroupInfo.numOfEps; i++) {
|
||||
strncpy(vgroupInfo.ep[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN);
|
||||
vgroupInfo.ep[i].port = pEpSet->port[i];
|
||||
}
|
||||
|
||||
tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
|
||||
taosCorEndWrite(&pVgroupInfo->version);
|
||||
tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
|
||||
taosHashPut(tscHashMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||
}
|
||||
|
||||
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||
|
@ -303,7 +305,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (pEpSet) {
|
||||
if (pEpSet) { // todo update this
|
||||
if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
|
||||
if (pCmd->command < TSDB_SQL_MGMT) {
|
||||
tscUpdateVgroupInfo(pSql, pEpSet);
|
||||
|
@ -549,7 +551,10 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
// pSql->cmd.payloadLen is set during copying data into payload
|
||||
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &pTableMeta->corVgroupInfo);
|
||||
|
||||
SNewVgroupInfo vgroupInfo = {0};
|
||||
taosHashGetClone(tscHashMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
||||
|
||||
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, pTableMeta->vgId, pSql->cmd.numOfTablesInSubmit,
|
||||
pSql->epSet.numOfEps);
|
||||
|
@ -611,7 +616,10 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
|||
tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
|
||||
} else {
|
||||
vgId = pTableMeta->vgId;
|
||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &pTableMeta->corVgroupInfo);
|
||||
|
||||
SNewVgroupInfo vgroupInfo = {0};
|
||||
taosHashGetClone(tscHashMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
||||
}
|
||||
|
||||
pSql->epSet.inUse = rand()%pSql->epSet.numOfEps;
|
||||
|
@ -1447,10 +1455,14 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
|
|||
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
|
||||
pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
|
||||
|
||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
|
||||
|
||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &pTableMetaInfo->pTableMeta->corVgroupInfo);
|
||||
SNewVgroupInfo vgroupInfo = {.vgId = -1};
|
||||
taosHashGetClone(tscHashMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||
assert(vgroupInfo.vgId > 0);
|
||||
|
||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1808,19 +1820,42 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
|||
pSchema++;
|
||||
}
|
||||
|
||||
size_t size = 0;
|
||||
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
|
||||
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg);
|
||||
|
||||
// todo add one more function: taosAddDataIfNotExists();
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
||||
assert(pTableMetaInfo->pTableMeta == NULL);
|
||||
|
||||
pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscMetaCache, pTableMetaInfo->name,
|
||||
strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000);
|
||||
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
|
||||
// check if super table hashmap or not
|
||||
int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
|
||||
|
||||
if (pTableMetaInfo->pTableMeta == NULL) {
|
||||
free(pTableMeta);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
// super tableMeta data alreay exists, create it according to tableMeta and add it to hash map
|
||||
STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg);
|
||||
|
||||
uint32_t size = tscGetTableMetaSize(pSupTableMeta);
|
||||
int32_t code = taosHashPut(tscTableMetaInfo, pTableMeta->sTableName, len, pSupTableMeta, size);
|
||||
assert(code == TSDB_CODE_SUCCESS);
|
||||
|
||||
tfree(pSupTableMeta);
|
||||
|
||||
CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta);
|
||||
taosHashPut(tscTableMetaInfo, pTableMetaInfo->name, strlen(pTableMetaInfo->name), cMeta, sizeof(CChildTableMeta));
|
||||
tfree(cMeta);
|
||||
} else {
|
||||
uint32_t s = tscGetTableMetaSize(pTableMeta);
|
||||
taosHashPut(tscTableMetaInfo, pTableMetaInfo->name, strlen(pTableMetaInfo->name), pTableMeta, s);
|
||||
}
|
||||
|
||||
// update the vgroupInfo if needed
|
||||
int32_t vgId = pTableMeta->vgId;
|
||||
SNewVgroupInfo vgroupInfo = {.inUse = -1};
|
||||
taosHashGetClone(tscHashMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||
|
||||
if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, &pMetaMsg->vgroup)) ||
|
||||
(vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
||||
vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup);
|
||||
taosHashPut(tscHashMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
|
||||
tscDebug("add new VgroupInfo, vgId:%d, total:%d", vgId, (int32_t) taosHashGetSize(tscHashMap));
|
||||
}
|
||||
|
||||
tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
|
||||
|
@ -1831,8 +1866,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
|||
|
||||
/**
|
||||
* multi table meta rsp pkg format:
|
||||
* | STaosRsp | ieType | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
|
||||
* |...... 1B 1B 4B
|
||||
* | STaosRsp | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
|
||||
* |...... 1B 4B
|
||||
**/
|
||||
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
|
||||
#if 0
|
||||
|
@ -1986,14 +2021,10 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
|
|||
return pSql->res.code;
|
||||
}
|
||||
|
||||
/*
|
||||
* current process do not use the cache at all
|
||||
*/
|
||||
int tscProcessShowRsp(SSqlObj *pSql) {
|
||||
STableMetaMsg *pMetaMsg;
|
||||
SShowRsp * pShow;
|
||||
SSchema * pSchema;
|
||||
char key[20];
|
||||
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
@ -2018,20 +2049,10 @@ int tscProcessShowRsp(SSqlObj *pSql) {
|
|||
pSchema++;
|
||||
}
|
||||
|
||||
key[0] = pCmd->msgType + 'a';
|
||||
strcpy(key + 1, "showlist");
|
||||
tfree(pTableMetaInfo->pTableMeta);
|
||||
pTableMetaInfo->pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg);
|
||||
|
||||
if (pTableMetaInfo->pTableMeta != NULL) {
|
||||
taosCacheRelease(tscMetaCache, (void *)&(pTableMetaInfo->pTableMeta), false);
|
||||
}
|
||||
|
||||
size_t size = 0;
|
||||
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
|
||||
|
||||
pTableMetaInfo->pTableMeta = taosCachePut(tscMetaCache, key, strlen(key), (char *)pTableMeta, size,
|
||||
tsTableMetaKeepTimer * 1000);
|
||||
SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
|
||||
|
||||
if (pQueryInfo->colList == NULL) {
|
||||
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
|
||||
}
|
||||
|
@ -2054,12 +2075,9 @@ int tscProcessShowRsp(SSqlObj *pSql) {
|
|||
|
||||
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
|
||||
tscFieldInfoUpdateOffset(pQueryInfo);
|
||||
|
||||
tfree(pTableMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// TODO multithread problem
|
||||
static void createHBObj(STscObj* pObj) {
|
||||
if (pObj->hbrid != 0) {
|
||||
return;
|
||||
|
@ -2141,51 +2159,34 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
|
|||
|
||||
int tscProcessDropDbRsp(SSqlObj *pSql) {
|
||||
pSql->pTscObj->db[0] = 0;
|
||||
taosCacheEmpty(tscMetaCache);
|
||||
taosHashEmpty(tscTableMetaInfo);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tscProcessDropTableRsp(SSqlObj *pSql) {
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
||||
|
||||
STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
|
||||
if (pTableMeta == NULL) { /* not in cache, abort */
|
||||
return 0;
|
||||
}
|
||||
//The cached tableMeta is expired in this case, so clean it in hash table
|
||||
taosHashRemove(tscTableMetaInfo, pTableMetaInfo->name, strnlen(pTableMetaInfo->name, TSDB_TABLE_FNAME_LEN));
|
||||
tscDebug("%p remove table meta after drop table:%s, numOfRemain:%d", pSql, pTableMetaInfo->name,
|
||||
(int32_t) taosHashGetSize(tscTableMetaInfo));
|
||||
|
||||
/*
|
||||
* 1. if a user drops one table, which is the only table in a vnode, remove operation will incur vnode to be removed.
|
||||
* 2. Then, a user creates a new metric followed by a table with identical name of removed table but different schema,
|
||||
* here the table will reside in a new vnode.
|
||||
* The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
|
||||
* instead.
|
||||
*/
|
||||
tscDebug("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
|
||||
taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
|
||||
assert(pTableMetaInfo->pTableMeta == NULL);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
||||
|
||||
STableMeta *pTableMeta = taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
|
||||
if (pTableMeta == NULL) { /* not in cache, abort */
|
||||
return 0;
|
||||
}
|
||||
char* name = pTableMetaInfo->name;
|
||||
tscDebug("%p remove tableMeta in hashMap after alter-table: %s", pSql, name);
|
||||
|
||||
tscDebug("%p force release metermeta in cache after alter-table: %s", pSql, pTableMetaInfo->name);
|
||||
taosCacheRelease(tscMetaCache, (void **)&pTableMeta, true);
|
||||
bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
|
||||
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||
tfree(pTableMetaInfo->pTableMeta);
|
||||
|
||||
if (pTableMetaInfo->pTableMeta) {
|
||||
bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
|
||||
taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
|
||||
|
||||
if (isSuperTable) { // if it is a super table, reset whole query cache
|
||||
tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
|
||||
taosCacheEmpty(tscMetaCache);
|
||||
}
|
||||
if (isSuperTable) { // if it is a super table, iterate the hashTable and remove all the childTableMeta
|
||||
taosHashEmpty(tscTableMetaInfo);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -2195,6 +2196,7 @@ int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
|
|||
UNUSED(pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tscProcessShowCreateRsp(SSqlObj *pSql) {
|
||||
return tscLocalResultCommonBuilder(pSql, 1);
|
||||
}
|
||||
|
@ -2315,7 +2317,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
|
|||
|
||||
int32_t code = tscProcessSql(pNew);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify upper application that current process need to be terminated
|
||||
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify application that current process needs to be terminated
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -2323,21 +2325,29 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
|
|||
|
||||
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
|
||||
assert(strlen(pTableMetaInfo->name) != 0);
|
||||
tfree(pTableMetaInfo->pTableMeta);
|
||||
|
||||
// If this STableMetaInfo owns a table meta, release it first
|
||||
if (pTableMetaInfo->pTableMeta != NULL) {
|
||||
taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), false);
|
||||
}
|
||||
|
||||
pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscMetaCache, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
|
||||
if (pTableMetaInfo->pTableMeta != NULL) {
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
|
||||
tinfo.numOfTags, pTableMetaInfo->pTableMeta);
|
||||
uint32_t size = tscGetTableMetaMaxSize();
|
||||
pTableMetaInfo->pTableMeta = calloc(1, size);
|
||||
|
||||
pTableMetaInfo->pTableMeta->tableInfo.numOfColumns = -1;
|
||||
int32_t len = (int32_t) strlen(pTableMetaInfo->name);
|
||||
|
||||
taosHashGetClone(tscTableMetaInfo, pTableMetaInfo->name, len, NULL, pTableMetaInfo->pTableMeta, -1);
|
||||
|
||||
// TODO resize the tableMeta
|
||||
STableMeta* pMeta = pTableMetaInfo->pTableMeta;
|
||||
if (pMeta->id.uid > 0) {
|
||||
if (pMeta->tableType == TSDB_CHILD_TABLE) {
|
||||
int32_t code = tscCreateTableMetaFromCChildMeta(pTableMetaInfo->pTableMeta, pTableMetaInfo->name);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return getTableMetaFromMnode(pSql, pTableMetaInfo);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
return getTableMetaFromMnode(pSql, pTableMetaInfo);
|
||||
}
|
||||
|
||||
|
@ -2364,7 +2374,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
|
|||
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
|
||||
}
|
||||
|
||||
taosCacheRelease(tscMetaCache, (void **)&(pTableMetaInfo->pTableMeta), true);
|
||||
taosHashRemove(tscTableMetaInfo, pTableMetaInfo->name, strnlen(pTableMetaInfo->name, TSDB_TABLE_FNAME_LEN));
|
||||
return getTableMetaFromMnode(pSql, pTableMetaInfo);
|
||||
}
|
||||
|
||||
|
@ -2405,7 +2415,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
|
|||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
|
||||
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
|
||||
STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta);
|
||||
STableMeta* pTableMeta = tscTableMetaClone(pMInfo->pTableMeta);
|
||||
tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
|
||||
}
|
||||
|
||||
|
|
|
@ -909,7 +909,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
|
|||
|
||||
static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
|
||||
// must before clean the sqlcmd object
|
||||
tscResetSqlCmdObj(&pSql->cmd, false);
|
||||
tscResetSqlCmdObj(&pSql->cmd);
|
||||
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
|
|
|
@ -2231,7 +2231,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
|
|||
numOfFailed += 1;
|
||||
|
||||
// clean up tableMeta in cache
|
||||
tscFreeQueryInfo(&pSql->cmd, true);
|
||||
tscFreeQueryInfo(&pSql->cmd);
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
|
||||
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0);
|
||||
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
|
||||
|
@ -2243,15 +2243,16 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
|
|||
tscError("%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj,
|
||||
pParentObj->res.numOfRows, numOfFailed, numOfSub);
|
||||
|
||||
tscDebug("%p cleanup %d tableMeta in cache", pParentObj, pParentObj->cmd.numOfTables);
|
||||
tscDebug("%p cleanup %d tableMeta in hashTable", pParentObj, pParentObj->cmd.numOfTables);
|
||||
for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) {
|
||||
taosCacheRelease(tscMetaCache, (void**)&(pParentObj->cmd.pTableMetaList[i]), true);
|
||||
char* name = pParentObj->cmd.pTableNameList[i];
|
||||
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||
}
|
||||
|
||||
pParentObj->cmd.parseFinished = false;
|
||||
pParentObj->subState.numOfRemain = numOfFailed;
|
||||
|
||||
tscResetSqlCmdObj(&pParentObj->cmd, false);
|
||||
tscResetSqlCmdObj(&pParentObj->cmd);
|
||||
|
||||
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
|
||||
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
|
||||
|
|
|
@ -33,6 +33,7 @@
|
|||
// global, not configurable
|
||||
SCacheObj *tscMetaCache; // table meta cache
|
||||
SHashObj *tscHashMap; // hash map to keep the global vgroup info
|
||||
SHashObj *tscTableMetaInfo; // table meta info
|
||||
int tscObjRef = -1;
|
||||
void *tscTmr;
|
||||
void *tscQhandle;
|
||||
|
@ -131,9 +132,11 @@ void taos_init_imp(void) {
|
|||
|
||||
int64_t refreshTime = 10; // 10 seconds by default
|
||||
if (tscMetaCache == NULL) {
|
||||
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta");
|
||||
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
|
||||
tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj);
|
||||
tscHashMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
tscTableMetaInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
tscDebug("TableMeta:%p", tscTableMetaInfo);
|
||||
}
|
||||
|
||||
tscRefId = taosOpenRef(200, tscCloseTscObj);
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
#include "os.h"
|
||||
#include "qAst.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tcache.h"
|
||||
#include "tkey.h"
|
||||
#include "tmd5.h"
|
||||
#include "tscLocalMerge.h"
|
||||
|
@ -31,7 +30,7 @@
|
|||
#include "ttokendef.h"
|
||||
|
||||
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
|
||||
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
|
||||
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo);
|
||||
|
||||
SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) {
|
||||
if (pTagCond->pCond == NULL) {
|
||||
|
@ -379,17 +378,16 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
|
|||
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
|
||||
}
|
||||
|
||||
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) {
|
||||
void tscFreeQueryInfo(SSqlCmd* pCmd) {
|
||||
if (pCmd == NULL || pCmd->numOfClause == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
|
||||
char* addr = (char*)pCmd - offsetof(SSqlObj, cmd);
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
|
||||
|
||||
freeQueryInfoImpl(pQueryInfo);
|
||||
clearAllTableMetaInfo(pQueryInfo, (const char*)addr, removeFromCache);
|
||||
clearAllTableMetaInfo(pQueryInfo);
|
||||
tfree(pQueryInfo);
|
||||
}
|
||||
|
||||
|
@ -397,7 +395,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) {
|
|||
tfree(pCmd->pQueryInfo);
|
||||
}
|
||||
|
||||
void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) {
|
||||
void tscResetSqlCmdObj(SSqlCmd* pCmd) {
|
||||
pCmd->command = 0;
|
||||
pCmd->numOfCols = 0;
|
||||
pCmd->count = 0;
|
||||
|
@ -407,17 +405,17 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) {
|
|||
pCmd->autoCreated = 0;
|
||||
|
||||
for(int32_t i = 0; i < pCmd->numOfTables; ++i) {
|
||||
if (pCmd->pTableMetaList && pCmd->pTableMetaList[i]) {
|
||||
taosCacheRelease(tscMetaCache, (void**)&(pCmd->pTableMetaList[i]), false);
|
||||
if (pCmd->pTableNameList && pCmd->pTableNameList[i]) {
|
||||
tfree(pCmd->pTableNameList[i]);
|
||||
}
|
||||
}
|
||||
|
||||
pCmd->numOfTables = 0;
|
||||
tfree(pCmd->pTableMetaList);
|
||||
tfree(pCmd->pTableNameList);
|
||||
|
||||
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList);
|
||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||
tscFreeQueryInfo(pCmd, removeFromCache);
|
||||
tscFreeQueryInfo(pCmd);
|
||||
}
|
||||
|
||||
void tscFreeSqlResult(SSqlObj* pSql) {
|
||||
|
@ -468,17 +466,6 @@ void tscFreeRegisteredSqlObj(void *pSql) {
|
|||
tscDebug("%p free SqlObj, total in tscObj:%d, total:%d", pSql, num, total);
|
||||
}
|
||||
|
||||
void tscFreeTableMetaHelper(void *pTableMeta) {
|
||||
STableMeta* p = (STableMeta*) pTableMeta;
|
||||
|
||||
int32_t numOfEps1 = p->corVgroupInfo.numOfEps;
|
||||
assert(numOfEps1 >= 0 && numOfEps1 <= TSDB_MAX_REPLICA);
|
||||
|
||||
for(int32_t i = 0; i < numOfEps1; ++i) {
|
||||
tfree(p->corVgroupInfo.epAddr[i].fqdn);
|
||||
}
|
||||
}
|
||||
|
||||
void tscFreeSqlObj(SSqlObj* pSql) {
|
||||
if (pSql == NULL || pSql->signature != pSql) {
|
||||
return;
|
||||
|
@ -506,7 +493,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
|
|||
pSql->self = 0;
|
||||
|
||||
tscFreeSqlResult(pSql);
|
||||
tscResetSqlCmdObj(pCmd, false);
|
||||
tscResetSqlCmdObj(pCmd);
|
||||
|
||||
tfree(pCmd->tagData.data);
|
||||
pCmd->tagData.dataLen = 0;
|
||||
|
@ -529,7 +516,9 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
|
|||
|
||||
// free the refcount for metermeta
|
||||
if (pDataBlock->pTableMeta != NULL) {
|
||||
taosCacheRelease(tscMetaCache, (void**)&(pDataBlock->pTableMeta), false);
|
||||
tfree(pDataBlock->pTableMeta);
|
||||
|
||||
// taosCacheRelease(tscMetaCache, (void**)&(pDataBlock->pTableMeta), false);
|
||||
}
|
||||
|
||||
tfree(pDataBlock);
|
||||
|
@ -600,15 +589,16 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
|
|||
|
||||
// set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
|
||||
if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) {
|
||||
tstrncpy(pTableMetaInfo->name, pDataBlock->tableId, sizeof(pTableMetaInfo->name));
|
||||
tstrncpy(pTableMetaInfo->name, pDataBlock->tableName, sizeof(pTableMetaInfo->name));
|
||||
|
||||
if (pTableMetaInfo->pTableMeta != NULL) {
|
||||
taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false);
|
||||
tfree(pTableMetaInfo->pTableMeta);
|
||||
// taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false);
|
||||
}
|
||||
|
||||
pTableMetaInfo->pTableMeta = taosCacheTransfer(tscMetaCache, (void**)&pDataBlock->pTableMeta);
|
||||
pTableMetaInfo->pTableMeta = tscTableMetaClone(pDataBlock->pTableMeta);//taosCacheTransfer(tscMetaCache, (void**)&pDataBlock->pTableMeta);
|
||||
} else {
|
||||
assert(strncmp(pTableMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0);
|
||||
assert(strncmp(pTableMetaInfo->name, pDataBlock->tableName, tListLen(pDataBlock->tableName)) == 0);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -671,14 +661,10 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
|
|||
dataBuf->size = startOffset;
|
||||
dataBuf->tsSource = -1;
|
||||
|
||||
tstrncpy(dataBuf->tableId, name, sizeof(dataBuf->tableId));
|
||||
tstrncpy(dataBuf->tableName, name, sizeof(dataBuf->tableName));
|
||||
|
||||
/*
|
||||
* The table meta may be released since the table meta cache are completed clean by other thread
|
||||
* due to operation such as drop database. So here we add the reference count directly instead of invoke
|
||||
* taosGetDataFromCache, which may return NULL value.
|
||||
*/
|
||||
dataBuf->pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMeta);
|
||||
//Here we keep the tableMeta to avoid it to be remove by other threads.
|
||||
dataBuf->pTableMeta = tscTableMetaClone(pTableMeta);
|
||||
assert(initialSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
|
||||
|
||||
*dataBlocks = dataBuf;
|
||||
|
@ -786,13 +772,13 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
|||
|
||||
static void extractTableMeta(SSqlCmd* pCmd) {
|
||||
pCmd->numOfTables = (int32_t) taosHashGetSize(pCmd->pTableBlockHashList);
|
||||
pCmd->pTableMetaList = calloc(pCmd->numOfTables, POINTER_BYTES);
|
||||
pCmd->pTableNameList = calloc(pCmd->numOfTables, POINTER_BYTES);
|
||||
|
||||
STableDataBlocks **p1 = taosHashIterate(pCmd->pTableBlockHashList, NULL);
|
||||
int32_t i = 0;
|
||||
while(p1) {
|
||||
STableDataBlocks* pBlocks = *p1;
|
||||
pCmd->pTableMetaList[i++] = taosCacheTransfer(tscMetaCache, (void**) &pBlocks->pTableMeta);
|
||||
pCmd->pTableNameList[i++] = strndup(pBlocks->tableName, TSDB_TABLE_FNAME_LEN);
|
||||
p1 = taosHashIterate(pCmd->pTableBlockHashList, p1);
|
||||
}
|
||||
|
||||
|
@ -815,7 +801,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
|
|||
STableDataBlocks* dataBuf = NULL;
|
||||
|
||||
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
|
||||
INSERT_HEAD_SIZE, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
|
||||
INSERT_HEAD_SIZE, 0, pOneTableBlock->tableName, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
|
||||
taosHashCleanup(pVnodeDataBlockHashList);
|
||||
|
@ -849,7 +835,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
|
|||
tscSortRemoveDataBlockDupRows(pOneTableBlock);
|
||||
char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
|
||||
|
||||
tscDebug("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
|
||||
tscDebug("%p name:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableName,
|
||||
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
|
||||
|
||||
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
||||
|
@ -1823,14 +1809,12 @@ SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) {
|
|||
return pa;
|
||||
}
|
||||
|
||||
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
|
||||
tscDebug("%p unref %d tables in the tableMeta cache", address, pQueryInfo->numOfTables);
|
||||
|
||||
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo) {
|
||||
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
|
||||
|
||||
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
|
||||
tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
|
||||
tscClearTableMetaInfo(pTableMetaInfo);
|
||||
free(pTableMetaInfo);
|
||||
}
|
||||
|
||||
|
@ -1884,14 +1868,12 @@ STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) {
|
|||
return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL, NULL);
|
||||
}
|
||||
|
||||
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) {
|
||||
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo) {
|
||||
if (pTableMetaInfo == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (pTableMetaInfo->pTableMeta != NULL) {
|
||||
taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
|
||||
}
|
||||
tfree(pTableMetaInfo->pTableMeta);
|
||||
|
||||
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
|
||||
tscColumnListDestroy(pTableMetaInfo->tagColList);
|
||||
|
@ -2031,7 +2013,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
|
||||
pnCmd->numOfTables = 0;
|
||||
pnCmd->parseFinished = 1;
|
||||
pnCmd->pTableMetaList = NULL;
|
||||
pnCmd->pTableNameList = NULL;
|
||||
pnCmd->pTableBlockHashList = NULL;
|
||||
|
||||
if (tscAddSubqueryInfo(pnCmd) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2113,8 +2095,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
char* name = pTableMetaInfo->name;
|
||||
STableMetaInfo* pFinalInfo = NULL;
|
||||
|
||||
if (pPrevSql == NULL) { // get by name may failed due to the cache cleanup
|
||||
STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta);
|
||||
if (pPrevSql == NULL) {
|
||||
STableMeta* pTableMeta = tscTableMetaClone(pTableMetaInfo->pTableMeta);
|
||||
assert(pTableMeta != NULL);
|
||||
|
||||
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList,
|
||||
|
@ -2122,15 +2104,15 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
} else { // transfer the ownership of pTableMeta to the newly create sql object.
|
||||
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
|
||||
|
||||
STableMeta* pPrevTableMeta = taosCacheTransfer(tscMetaCache, (void**)&pPrevInfo->pTableMeta);
|
||||
|
||||
STableMeta* pPrevTableMeta = tscTableMetaClone(pPrevInfo->pTableMeta);
|
||||
SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList;
|
||||
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList,
|
||||
pTableMetaInfo->pVgroupTables);
|
||||
}
|
||||
|
||||
// this case cannot be happened
|
||||
if (pFinalInfo->pTableMeta == NULL) {
|
||||
tscError("%p new subquery failed since no tableMeta in cache, name:%s", pSql, name);
|
||||
tscError("%p new subquery failed since no tableMeta, name:%s", pSql, name);
|
||||
|
||||
if (pPrevSql != NULL) { // pass the previous error to client
|
||||
assert(pPrevSql->res.code != TSDB_CODE_SUCCESS);
|
||||
|
@ -2557,6 +2539,7 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
|
|||
for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) {
|
||||
tfree(pVgroupInfo->epAddr[j].fqdn);
|
||||
}
|
||||
|
||||
for(int32_t j = pVgroupInfo->numOfEps; j < TSDB_MAX_REPLICA; j++) {
|
||||
assert( pVgroupInfo->epAddr[j].fqdn == NULL );
|
||||
}
|
||||
|
@ -2610,3 +2593,87 @@ int32_t copyTagData(STagData* dst, const STagData* src) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
STableMeta* createSuperTableMeta(STableMetaMsg* pChild) {
|
||||
assert(pChild != NULL);
|
||||
int32_t total = pChild->numOfColumns + pChild->numOfTags;
|
||||
|
||||
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * total);
|
||||
pTableMeta->tableType = TSDB_SUPER_TABLE;
|
||||
pTableMeta->tableInfo.numOfTags = pChild->numOfTags;
|
||||
pTableMeta->tableInfo.numOfColumns = pChild->numOfColumns;
|
||||
pTableMeta->tableInfo.precision = pChild->precision;
|
||||
|
||||
pTableMeta->id.tid = 0;
|
||||
pTableMeta->id.uid = pChild->suid;
|
||||
pTableMeta->tversion = pChild->tversion;
|
||||
pTableMeta->sversion = pChild->sversion;
|
||||
|
||||
memcpy(pTableMeta->schema, pChild->schema, sizeof(SSchema) * total);
|
||||
|
||||
int32_t num = pTableMeta->tableInfo.numOfColumns;
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
|
||||
}
|
||||
|
||||
return pTableMeta;
|
||||
}
|
||||
|
||||
uint32_t tscGetTableMetaSize(STableMeta* pTableMeta) {
|
||||
assert(pTableMeta != NULL);
|
||||
|
||||
int32_t totalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags;
|
||||
return sizeof(STableMeta) + totalCols * sizeof(SSchema);
|
||||
}
|
||||
|
||||
CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta) {
|
||||
assert(pTableMeta != NULL);
|
||||
|
||||
CChildTableMeta* cMeta = calloc(1, sizeof(CChildTableMeta));
|
||||
cMeta->tableType = TSDB_CHILD_TABLE;
|
||||
cMeta->vgId = pTableMeta->vgId;
|
||||
cMeta->id = pTableMeta->id;
|
||||
tstrncpy(cMeta->sTableName, pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
|
||||
|
||||
return cMeta;
|
||||
}
|
||||
|
||||
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name) {
|
||||
assert(pChild != NULL);
|
||||
|
||||
uint32_t size = tscGetTableMetaMaxSize();
|
||||
STableMeta* p = calloc(1, size);
|
||||
|
||||
taosHashGetClone(tscTableMetaInfo, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p, -1);
|
||||
if (p->id.uid > 0) { // tableMeta exists, build child table meta and return
|
||||
pChild->sversion = p->sversion;
|
||||
pChild->tversion = p->tversion;
|
||||
|
||||
memcpy(&pChild->tableInfo, &p->tableInfo, sizeof(STableInfo));
|
||||
int32_t total = pChild->tableInfo.numOfColumns + pChild->tableInfo.numOfTags;
|
||||
|
||||
memcpy(pChild->schema, p->schema, sizeof(SSchema) *total);
|
||||
|
||||
tfree(p);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else { // super table has been removed, current tableMeta is also expired. remove it here
|
||||
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||
|
||||
tfree(p);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t tscGetTableMetaMaxSize() {
|
||||
return sizeof(STableMeta) + TSDB_MAX_COLUMNS * sizeof(SSchema);
|
||||
}
|
||||
|
||||
STableMeta* tscTableMetaClone(STableMeta* pTableMeta) {
|
||||
assert(pTableMeta != NULL);
|
||||
uint32_t size = tscGetTableMetaSize(pTableMeta);
|
||||
STableMeta* p = calloc(1, size);
|
||||
memcpy(p, pTableMeta, size);
|
||||
return p;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -716,7 +716,6 @@ typedef struct {
|
|||
typedef struct STableMetaMsg {
|
||||
int32_t contLen;
|
||||
char tableId[TSDB_TABLE_FNAME_LEN]; // table id
|
||||
char sTableId[TSDB_TABLE_FNAME_LEN];
|
||||
uint8_t numOfTags;
|
||||
uint8_t precision;
|
||||
uint8_t tableType;
|
||||
|
@ -726,6 +725,9 @@ typedef struct STableMetaMsg {
|
|||
int32_t tid;
|
||||
uint64_t uid;
|
||||
SVgroupMsg vgroup;
|
||||
|
||||
char sTableName[TSDB_TABLE_FNAME_LEN];
|
||||
uint64_t suid;
|
||||
SSchema schema[];
|
||||
} STableMetaMsg;
|
||||
|
||||
|
|
|
@ -2171,11 +2171,12 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
|
|||
pMeta->precision = pDb->cfg.precision;
|
||||
pMeta->tableType = pTable->info.type;
|
||||
tstrncpy(pMeta->tableId, pTable->info.tableId, TSDB_TABLE_FNAME_LEN);
|
||||
if (pTable->superTable != NULL) {
|
||||
tstrncpy(pMeta->sTableId, pTable->superTable->info.tableId, TSDB_TABLE_FNAME_LEN);
|
||||
}
|
||||
|
||||
if (pTable->info.type == TSDB_CHILD_TABLE && pTable->superTable != NULL) {
|
||||
if (pTable->info.type == TSDB_CHILD_TABLE) {
|
||||
assert(pTable->superTable != NULL);
|
||||
tstrncpy(pMeta->sTableName, pTable->superTable->info.tableId, TSDB_TABLE_FNAME_LEN);
|
||||
|
||||
pMeta->suid = pTable->superTable->uid;
|
||||
pMeta->sversion = htons(pTable->superTable->sversion);
|
||||
pMeta->tversion = htons(pTable->superTable->tversion);
|
||||
pMeta->numOfTags = (int8_t)pTable->superTable->numOfTags;
|
||||
|
|
|
@ -130,16 +130,14 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe
|
|||
|
||||
int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param);
|
||||
|
||||
void taosHashEmpty(SHashObj *pHashObj);
|
||||
|
||||
/**
|
||||
* clean up hash table
|
||||
* @param handle
|
||||
*/
|
||||
void taosHashCleanup(SHashObj *pHashObj);
|
||||
|
||||
/*
|
||||
void *SHashMutableIterator* taosHashCreateIter(SHashObj *pHashObj, void *);
|
||||
*/
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pHashObj
|
||||
|
|
|
@ -313,10 +313,10 @@ void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void
|
|||
}
|
||||
|
||||
if (d != NULL) {
|
||||
memcpy(d, GET_HASH_NODE_DATA(pNode), dsize);
|
||||
} else {
|
||||
data = GET_HASH_NODE_DATA(pNode);
|
||||
memcpy(d, GET_HASH_NODE_DATA(pNode), pNode->dataLen);
|
||||
}
|
||||
|
||||
data = GET_HASH_NODE_DATA(pNode);
|
||||
}
|
||||
|
||||
if (pHashObj->type == HASH_ENTRY_LOCK) {
|
||||
|
@ -472,38 +472,49 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
|
|||
return 0;
|
||||
}
|
||||
|
||||
void taosHashCleanup(SHashObj *pHashObj) {
|
||||
void taosHashEmpty(SHashObj *pHashObj) {
|
||||
if (pHashObj == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
uDebug("hash:%p cleanup hash table", pHashObj);
|
||||
|
||||
SHashNode *pNode, *pNext;
|
||||
|
||||
__wr_lock(&pHashObj->lock, pHashObj->type);
|
||||
|
||||
if (pHashObj->hashList) {
|
||||
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||
if (pEntry->num == 0) {
|
||||
assert(pEntry->next == 0);
|
||||
continue;
|
||||
}
|
||||
|
||||
pNode = pEntry->next;
|
||||
assert(pNode != NULL);
|
||||
|
||||
while (pNode) {
|
||||
pNext = pNode->next;
|
||||
FREE_HASH_NODE(pHashObj, pNode);
|
||||
|
||||
pNode = pNext;
|
||||
}
|
||||
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||
if (pEntry->num == 0) {
|
||||
assert(pEntry->next == 0);
|
||||
continue;
|
||||
}
|
||||
|
||||
free(pHashObj->hashList);
|
||||
pNode = pEntry->next;
|
||||
assert(pNode != NULL);
|
||||
|
||||
while (pNode) {
|
||||
pNext = pNode->next;
|
||||
FREE_HASH_NODE(pHashObj, pNode);
|
||||
|
||||
pNode = pNext;
|
||||
}
|
||||
|
||||
pEntry->num = 0;
|
||||
pEntry->next = NULL;
|
||||
}
|
||||
|
||||
pHashObj->size = 0;
|
||||
__wr_unlock(&pHashObj->lock, pHashObj->type);
|
||||
}
|
||||
|
||||
void taosHashCleanup(SHashObj *pHashObj) {
|
||||
if (pHashObj == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosHashEmpty(pHashObj);
|
||||
tfree(pHashObj->hashList);
|
||||
|
||||
// destroy mem block
|
||||
size_t memBlock = taosArrayGetSize(pHashObj->pMemBlock);
|
||||
|
|
Loading…
Reference in New Issue