add a removed flag in pNode
This commit is contained in:
parent
dbd975b3a0
commit
4b09f05290
|
@ -1 +1 @@
|
||||||
Subproject commit 050667e5b4d0eafa5387e4283e713559b421203f
|
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
|
|
@ -1 +1 @@
|
||||||
Subproject commit ec77d9049a719dabfd1a7c1122a209e201861944
|
Subproject commit d598db167eb256fe67409b7bb3d0eb7fffc3ff8c
|
|
@ -371,10 +371,13 @@ void taosCloseTcpConnection(void *chandle) {
|
||||||
|
|
||||||
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
|
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
|
||||||
SFdObj *pFdObj = chandle;
|
SFdObj *pFdObj = chandle;
|
||||||
|
|
||||||
if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1;
|
if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1;
|
||||||
|
SThreadObj *pThreadObj = pFdObj->pThreadObj;
|
||||||
|
|
||||||
return taosWriteMsg(pFdObj->fd, data, len);
|
int ret = taosWriteMsg(pFdObj->fd, data, len);
|
||||||
|
tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret);
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosReportBrokenLink(SFdObj *pFdObj) {
|
static void taosReportBrokenLink(SFdObj *pFdObj) {
|
||||||
|
@ -409,7 +412,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
|
||||||
|
|
||||||
headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
|
headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
|
||||||
if (headLen != sizeof(SRpcHead)) {
|
if (headLen != sizeof(SRpcHead)) {
|
||||||
tDebug("%s %p read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
|
tDebug("%s %p read error, FD:%p headLen:%d", pThreadObj->label, pFdObj->thandle, pFdObj, headLen);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -420,7 +423,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
|
||||||
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
|
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
tTrace("TCP malloc mem:%p size:%d", buffer, size);
|
tTrace("%s %p read data, FD:%p fd:%d TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
msg = buffer + tsRpcOverhead;
|
msg = buffer + tsRpcOverhead;
|
||||||
|
@ -583,8 +586,8 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
|
||||||
|
|
||||||
pthread_mutex_unlock(&pThreadObj->mutex);
|
pthread_mutex_unlock(&pThreadObj->mutex);
|
||||||
|
|
||||||
tDebug("%s %p TCP connection is closed, FD:%p numOfFds:%d",
|
tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d",
|
||||||
pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);
|
pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pThreadObj->numOfFds);
|
||||||
|
|
||||||
tfree(pFdObj);
|
tfree(pFdObj);
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ typedef struct SHashNode {
|
||||||
uint32_t keyLen; // length of the key
|
uint32_t keyLen; // length of the key
|
||||||
uint32_t dataLen; // length of data
|
uint32_t dataLen; // length of data
|
||||||
int8_t count; // reference count
|
int8_t count; // reference count
|
||||||
|
int8_t removed; // flag to indicate removed
|
||||||
char data[];
|
char data[];
|
||||||
} SHashNode;
|
} SHashNode;
|
||||||
|
|
||||||
|
|
|
@ -76,7 +76,7 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
|
||||||
static FORCE_INLINE SHashNode *doSearchInEntryList(SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) {
|
static FORCE_INLINE SHashNode *doSearchInEntryList(SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) {
|
||||||
SHashNode *pNode = pe->next;
|
SHashNode *pNode = pe->next;
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if ((pNode->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0)) {
|
if ((pNode->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0) {
|
||||||
assert(pNode->hashVal == hashVal);
|
assert(pNode->hashVal == hashVal);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -222,7 +222,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
|
||||||
|
|
||||||
SHashNode* prev = NULL;
|
SHashNode* prev = NULL;
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if ((pNode->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0)) {
|
if ((pNode->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0) {
|
||||||
assert(pNode->hashVal == hashVal);
|
assert(pNode->hashVal == hashVal);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -361,7 +361,7 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe
|
||||||
SHashNode *prevNode = NULL;
|
SHashNode *prevNode = NULL;
|
||||||
|
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if ((pNode->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0))
|
if ((pNode->keyLen == keyLen) && (memcmp(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
prevNode = pNode;
|
prevNode = pNode;
|
||||||
|
@ -372,6 +372,7 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe
|
||||||
code = 0; // it is found
|
code = 0; // it is found
|
||||||
|
|
||||||
pNode->count--;
|
pNode->count--;
|
||||||
|
pNode->removed = 1;
|
||||||
if (pNode->count <= 0) {
|
if (pNode->count <= 0) {
|
||||||
if (prevNode) {
|
if (prevNode) {
|
||||||
prevNode->next = pNode->next;
|
prevNode->next = pNode->next;
|
||||||
|
@ -704,6 +705,11 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
|
||||||
|
|
||||||
if (pNode) {
|
if (pNode) {
|
||||||
pNode = pNode->next;
|
pNode = pNode->next;
|
||||||
|
while (pNode) {
|
||||||
|
if (pNode->removed == 0) break;
|
||||||
|
pNode = pNode->next;
|
||||||
|
}
|
||||||
|
|
||||||
pOld->count--;
|
pOld->count--;
|
||||||
if (pOld->count <=0) {
|
if (pOld->count <=0) {
|
||||||
if (prevNode) {
|
if (prevNode) {
|
||||||
|
@ -755,6 +761,11 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pNode = pe->next;
|
pNode = pe->next;
|
||||||
|
while (pNode) {
|
||||||
|
if (pNode->removed == 0) break;
|
||||||
|
pNode = pNode->next;
|
||||||
|
}
|
||||||
|
|
||||||
if (pNode) break;
|
if (pNode) break;
|
||||||
|
|
||||||
if (pHashObj->type == HASH_ENTRY_LOCK) {
|
if (pHashObj->type == HASH_ENTRY_LOCK) {
|
||||||
|
|
Loading…
Reference in New Issue