TD-2367
This commit is contained in:
parent
b21f78d320
commit
be5940fe7e
|
@ -397,13 +397,14 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) {
|
||||||
|
|
||||||
if (pStable->vgHash == NULL) {
|
if (pStable->vgHash == NULL) {
|
||||||
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
mDebug("table:%s, create hash:%p", pStable->info.tableId, pStable->vgHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStable->vgHash != NULL) {
|
if (pStable->vgHash != NULL) {
|
||||||
if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) {
|
if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) {
|
||||||
taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
|
taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
|
||||||
mDebug("table:%s, vgId:%d is put into stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
|
mDebug("table:%s, vgId:%d is put into stable hash:%p, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
|
||||||
(int32_t)taosHashGetSize(pStable->vgHash));
|
pStable->vgHash, taosHashGetSize(pStable->vgHash));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -416,13 +417,14 @@ static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable)
|
||||||
SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId);
|
SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId);
|
||||||
if (pVgroup == NULL) {
|
if (pVgroup == NULL) {
|
||||||
taosHashRemove(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId));
|
taosHashRemove(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId));
|
||||||
mDebug("table:%s, vgId:%d is remove from stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
|
mDebug("table:%s, vgId:%d is remove from stable hash:%p sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
|
||||||
(int32_t)taosHashGetSize(pStable->vgHash));
|
pStable->vgHash, taosHashGetSize(pStable->vgHash));
|
||||||
}
|
}
|
||||||
mnodeDecVgroupRef(pVgroup);
|
mnodeDecVgroupRef(pVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mnodeDestroySuperTable(SSTableObj *pStable) {
|
static void mnodeDestroySuperTable(SSTableObj *pStable) {
|
||||||
|
mDebug("table:%s, is destroyed, stable hash:%p", pStable->info.tableId, pStable->vgHash);
|
||||||
if (pStable->vgHash != NULL) {
|
if (pStable->vgHash != NULL) {
|
||||||
taosHashCleanup(pStable->vgHash);
|
taosHashCleanup(pStable->vgHash);
|
||||||
pStable->vgHash = NULL;
|
pStable->vgHash = NULL;
|
||||||
|
@ -464,6 +466,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
|
||||||
SSTableObj *pNew = pRow->pObj;
|
SSTableObj *pNew = pRow->pObj;
|
||||||
SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId);
|
SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId);
|
||||||
if (pTable != NULL && pTable != pNew) {
|
if (pTable != NULL && pTable != pNew) {
|
||||||
|
mDebug("table:%s, will be updated, hash:%p sizeOfVgList:%d, new hash:%p sizeOfVgList:%d", pTable->info.tableId,
|
||||||
|
pTable->vgHash, taosHashGetSize(pTable->vgHash), pNew->vgHash, taosHashGetSize(pNew->vgHash));
|
||||||
|
|
||||||
void *oldTableId = pTable->info.tableId;
|
void *oldTableId = pTable->info.tableId;
|
||||||
void *oldSchema = pTable->schema;
|
void *oldSchema = pTable->schema;
|
||||||
void *oldVgHash = pTable->vgHash;
|
void *oldVgHash = pTable->vgHash;
|
||||||
|
@ -479,6 +484,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
|
||||||
free(pNew);
|
free(pNew);
|
||||||
free(oldTableId);
|
free(oldTableId);
|
||||||
free(oldSchema);
|
free(oldSchema);
|
||||||
|
|
||||||
|
mDebug("table:%s, update finished, hash:%p sizeOfVgList:%d", pTable->info.tableId, pTable->vgHash,
|
||||||
|
taosHashGetSize(pTable->vgHash));
|
||||||
}
|
}
|
||||||
|
|
||||||
mnodeDecTableRef(pTable);
|
mnodeDecTableRef(pTable);
|
||||||
|
@ -783,8 +791,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
|
if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
|
||||||
SSTableObj *pSTable = (SSTableObj *)pMsg->pTable;
|
SSTableObj *pSTable = (SSTableObj *)pMsg->pTable;
|
||||||
mInfo("msg:%p, app:%p table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d",
|
mInfo("msg:%p, app:%p table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d", pMsg,
|
||||||
pMsg, pMsg->rpcMsg.ahandle, pDrop->tableId, pSTable->uid, pSTable->numOfTables, (int32_t)taosHashGetSize(pSTable->vgHash));
|
pMsg->rpcMsg.ahandle, pDrop->tableId, pSTable->uid, pSTable->numOfTables, taosHashGetSize(pSTable->vgHash));
|
||||||
return mnodeProcessDropSuperTableMsg(pMsg);
|
return mnodeProcessDropSuperTableMsg(pMsg);
|
||||||
} else {
|
} else {
|
||||||
SCTableObj *pCTable = (SCTableObj *)pMsg->pTable;
|
SCTableObj *pCTable = (SCTableObj *)pMsg->pTable;
|
||||||
|
@ -925,7 +933,10 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
||||||
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
|
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
|
||||||
|
|
||||||
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
|
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
|
||||||
if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) {
|
mInfo("msg:%p, app:%p stable:%s will be dropped, hash:%p sizeOfVgList:%d", pMsg, pMsg->rpcMsg.ahandle,
|
||||||
|
pStable->info.tableId, pStable->vgHash, taosHashGetSize(pStable->vgHash));
|
||||||
|
|
||||||
|
if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) {
|
||||||
int32_t *pVgId = taosHashIterate(pStable->vgHash, NULL);
|
int32_t *pVgId = taosHashIterate(pStable->vgHash, NULL);
|
||||||
while (pVgId) {
|
while (pVgId) {
|
||||||
SVgObj *pVgroup = mnodeGetVgroup(*pVgId);
|
SVgObj *pVgroup = mnodeGetVgroup(*pVgId);
|
||||||
|
@ -938,8 +949,9 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
||||||
pDrop->uid = htobe64(pStable->uid);
|
pDrop->uid = htobe64(pStable->uid);
|
||||||
mnodeExtractTableName(pStable->info.tableId, pDrop->tableId);
|
mnodeExtractTableName(pStable->info.tableId, pDrop->tableId);
|
||||||
|
|
||||||
mInfo("msg:%p, app:%p stable:%s, send drop stable msg to vgId:%d", pMsg, pMsg->rpcMsg.ahandle,
|
mInfo("msg:%p, app:%p stable:%s, send drop stable msg to vgId:%d, hash:%p sizeOfVgList:%d", pMsg,
|
||||||
pStable->info.tableId, pVgroup->vgId);
|
pMsg->rpcMsg.ahandle, pStable->info.tableId, pVgroup->vgId, pStable->vgHash,
|
||||||
|
taosHashGetSize(pStable->vgHash));
|
||||||
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pVgroup);
|
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pVgroup);
|
||||||
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
|
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
|
||||||
dnodeSendMsgToDnode(&epSet, &rpcMsg);
|
dnodeSendMsgToDnode(&epSet, &rpcMsg);
|
||||||
|
@ -1482,8 +1494,8 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
pMsg->rpcRsp.rsp = pMeta;
|
pMsg->rpcRsp.rsp = pMeta;
|
||||||
|
|
||||||
mDebug("msg:%p, app:%p stable:%s, uid:%" PRIu64 " table meta is retrieved", pMsg, pMsg->rpcMsg.ahandle,
|
mDebug("msg:%p, app:%p stable:%s, uid:%" PRIu64 " table meta is retrieved, sizeOfVgList:%d numOfTables:%d", pMsg,
|
||||||
pTable->info.tableId, pTable->uid);
|
pMsg->rpcMsg.ahandle, pTable->info.tableId, pTable->uid, taosHashGetSize(pTable->vgHash), pTable->numOfTables);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1512,7 +1524,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
||||||
char *msg = (char *)pRsp + sizeof(SSTableVgroupRspMsg);
|
char *msg = (char *)pRsp + sizeof(SSTableVgroupRspMsg);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||||
char * stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
|
char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
|
||||||
SSTableObj *pTable = mnodeGetSuperTable(stableName);
|
SSTableObj *pTable = mnodeGetSuperTable(stableName);
|
||||||
if (pTable == NULL) {
|
if (pTable == NULL) {
|
||||||
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName);
|
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName);
|
||||||
|
@ -1533,6 +1545,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
||||||
msg += sizeof(SVgroupsMsg);
|
msg += sizeof(SVgroupsMsg);
|
||||||
} else {
|
} else {
|
||||||
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
||||||
|
mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsg, pMsg->rpcMsg.ahandle,
|
||||||
|
pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash));
|
||||||
|
|
||||||
int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL);
|
int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL);
|
||||||
int32_t vgSize = 0;
|
int32_t vgSize = 0;
|
||||||
|
|
|
@ -79,7 +79,7 @@ bool httpInitContexts() {
|
||||||
void httpCleanupContexts() {
|
void httpCleanupContexts() {
|
||||||
if (tsHttpServer.contextCache != NULL) {
|
if (tsHttpServer.contextCache != NULL) {
|
||||||
SCacheObj *cache = tsHttpServer.contextCache;
|
SCacheObj *cache = tsHttpServer.contextCache;
|
||||||
httpInfo("context cache is cleanuping, size:%" PRIzu "", taosHashGetSize(cache->pHashTable));
|
httpInfo("context cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
|
||||||
taosCacheCleanup(tsHttpServer.contextCache);
|
taosCacheCleanup(tsHttpServer.contextCache);
|
||||||
tsHttpServer.contextCache = NULL;
|
tsHttpServer.contextCache = NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,7 +107,7 @@ static void httpDestroySession(void *data) {
|
||||||
void httpCleanUpSessions() {
|
void httpCleanUpSessions() {
|
||||||
if (tsHttpServer.sessionCache != NULL) {
|
if (tsHttpServer.sessionCache != NULL) {
|
||||||
SCacheObj *cache = tsHttpServer.sessionCache;
|
SCacheObj *cache = tsHttpServer.sessionCache;
|
||||||
httpInfo("session cache is cleanuping, size:%" PRIzu "", taosHashGetSize(cache->pHashTable));
|
httpInfo("session cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
|
||||||
taosCacheCleanup(tsHttpServer.sessionCache);
|
taosCacheCleanup(tsHttpServer.sessionCache);
|
||||||
tsHttpServer.sessionCache = NULL;
|
tsHttpServer.sessionCache = NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,7 +82,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
|
||||||
* @param pHashObj
|
* @param pHashObj
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
size_t taosHashGetSize(const SHashObj *pHashObj);
|
int32_t taosHashGetSize(const SHashObj *pHashObj);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* put element into hash table, if the element with the same key exists, update it
|
* put element into hash table, if the element with the same key exists, update it
|
||||||
|
|
|
@ -189,7 +189,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
|
||||||
return pHashObj;
|
return pHashObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t taosHashGetSize(const SHashObj *pHashObj) { return (pHashObj == NULL) ? 0 : pHashObj->size; }
|
int32_t taosHashGetSize(const SHashObj *pHashObj) { return (int32_t)((pHashObj == NULL) ? 0 : pHashObj->size); }
|
||||||
|
|
||||||
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
|
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
|
||||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||||
|
|
Loading…
Reference in New Issue