diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index ef635e6efc..6f5e3be8ab 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -20,6 +20,7 @@ #include "tlog.h" #include "tutil.h" #include "ttimer.h" +#include "tref.h" #include "tsocket.h" #include "tglobal.h" #include "taoserror.h" @@ -43,6 +44,7 @@ char tsNodeFqdn[TSDB_FQDN_LEN]; static ttpool_h tsTcpPool; static void * syncTmrCtrl = NULL; static void * vgIdHash; +static int tsSyncRefId = -1; // local functions static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); @@ -54,13 +56,13 @@ static int syncProcessPeerMsg(void *param, void *buffer); static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp); static void syncRemovePeer(SSyncPeer *pPeer); static void syncAddArbitrator(SSyncNode *pNode); -static void syncAddNodeRef(SSyncNode *pNode); -static void syncDecNodeRef(SSyncNode *pNode); +static void syncFreeNode(void *); static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode); static void syncMonitorFwdInfos(void *param, void *tmrId); static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); static void syncRestartPeer(SSyncPeer *pPeer); +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtyp); static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); char* syncRole[] = { @@ -106,6 +108,12 @@ int32_t syncInit() { return -1; } + tsSyncRefId = taosOpenRef(200, syncFreeNode); + if (tsSyncRefId < 0) { + syncCleanUp(); + return -1; + } + tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); sInfo("sync module initialized successfully"); @@ -128,6 +136,9 @@ void syncCleanUp() { vgIdHash = NULL; } + taosCloseRef(tsSyncRefId); + tsSyncRefId = -1; + sInfo("sync module is cleaned up"); } @@ -159,6 +170,12 @@ void *syncStart(const SSyncInfo *pInfo) { pNode->quorum = pCfg->quorum; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; + int ret = taosAddRef(tsSyncRefId, pNode); + if (ret < 0) { + syncFreeNode(pNode); + return NULL; + } + for (int i = 0; i < pCfg->replica; ++i) { const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); @@ -167,8 +184,6 @@ void *syncStart(const SSyncInfo *pInfo) { } } - syncAddNodeRef(pNode); - if (pNode->selfIndex < 0) { sInfo("vgId:%d, this node is not configured", pNode->vgId); terrno = TSDB_CODE_SYN_INVALID_CONFIG; @@ -210,7 +225,9 @@ void syncStop(void *param) { SSyncNode *pNode = param; SSyncPeer *pPeer; - if (pNode == NULL) return; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return; + sInfo("vgId:%d, cleanup sync", pNode->vgId); pthread_mutex_lock(&(pNode->mutex)); @@ -228,14 +245,17 @@ void syncStop(void *param) { pthread_mutex_unlock(&(pNode->mutex)); - syncDecNodeRef(pNode); + taosReleaseRef(tsSyncRefId, pNode); + taosRemoveRef(tsSyncRefId, pNode); } int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { SSyncNode *pNode = param; int i, j; - if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG; + sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, pNode->replica); @@ -298,105 +318,63 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { syncRole[nodeRole]); syncBroadcastStatus(pNode); + taosReleaseRef(tsSyncRefId, pNode); + return 0; } int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { SSyncNode *pNode = param; - SSyncPeer *pPeer; - SSyncHead *pSyncHead; - SWalHead * pWalHead = data; - int fwdLen; - int code = 0; - if (pNode == NULL) return 0; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return 0; - if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { - sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, - pWalHead->version, nodeVersion); - for (int i = 0; i < pNode->replica; ++i) { - pPeer = pNode->peerInfo[i]; - syncRestartConnection(pPeer); - } - return TSDB_CODE_SYN_INVALID_VERSION; - } + int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); - // always update version - nodeVersion = pWalHead->version; - sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], - qtype, pWalHead->version); - - if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; - - // only pkt from RPC or CQ can be forwarded - if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; - - // a hacker way to improve the performance - pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); - pSyncHead->type = TAOS_SMSG_FORWARD; - pSyncHead->pversion = 0; - pSyncHead->len = sizeof(SWalHead) + pWalHead->len; - fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head - - pthread_mutex_lock(&(pNode->mutex)); - - for (int i = 0; i < pNode->replica; ++i) { - pPeer = pNode->peerInfo[i]; - if (pPeer == NULL || pPeer->peerFd < 0) continue; - if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; - - if (pNode->quorum > 1 && code == 0) { - syncSaveFwdInfo(pNode, pWalHead->version, mhandle); - code = 1; - } - - int retLen = write(pPeer->peerFd, pSyncHead, fwdLen); - if (retLen == fwdLen) { - sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); - } else { - sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); - syncRestartConnection(pPeer); - } - } - - pthread_mutex_unlock(&(pNode->mutex)); + taosReleaseRef(tsSyncRefId, pNode); return code; } void syncConfirmForward(void *param, uint64_t version, int32_t code) { SSyncNode *pNode = param; - if (pNode == NULL) return; - if (pNode->quorum <= 1) return; + + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return; SSyncPeer *pPeer = pNode->pMaster; - if (pPeer == NULL) return; + if (pPeer && pNode->quorum > 1) { + char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; - char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; + SSyncHead *pHead = (SSyncHead *)msg; + pHead->type = TAOS_SMSG_FORWARD_RSP; + pHead->len = sizeof(SFwdRsp); - SSyncHead *pHead = (SSyncHead *)msg; - pHead->type = TAOS_SMSG_FORWARD_RSP; - pHead->len = sizeof(SFwdRsp); + SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); + pFwdRsp->version = version; + pFwdRsp->code = code; - SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); - pFwdRsp->version = version; - pFwdRsp->code = code; + int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); + int retLen = write(pPeer->peerFd, msg, msgLen); - int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); - int retLen = write(pPeer->peerFd, msg, msgLen); - - if (retLen == msgLen) { - sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); - } else { - sDebug("%s, failed to send forward ack, restart", pPeer->id); - syncRestartConnection(pPeer); + if (retLen == msgLen) { + sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); + } else { + sDebug("%s, failed to send forward ack, restart", pPeer->id); + syncRestartConnection(pPeer); + } } + + taosReleaseRef(tsSyncRefId, pNode); } void syncRecover(void *param) { SSyncNode *pNode = param; SSyncPeer *pPeer; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return; + // to do: add a few lines to check if recover is OK // if take this node to unsync state, the whole system may not work @@ -414,17 +392,24 @@ void syncRecover(void *param) { } pthread_mutex_unlock(&(pNode->mutex)); + + taosReleaseRef(tsSyncRefId, pNode); } int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { SSyncNode *pNode = param; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return -1; + pNodesRole->selfIndex = pNode->selfIndex; for (int i = 0; i < pNode->replica; ++i) { pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->role[i] = pNode->peerInfo[i]->role; } + taosReleaseRef(tsSyncRefId, pNode); + return 0; } @@ -457,22 +442,20 @@ static void syncAddArbitrator(SSyncNode *pNode) { pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo); } -static void syncAddNodeRef(SSyncNode *pNode) { atomic_add_fetch_8(&pNode->refCount, 1); } +static void syncFreeNode(void *param) { + SSyncNode *pNode = param; -static void syncDecNodeRef(SSyncNode *pNode) { - if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) { - pthread_mutex_destroy(&pNode->mutex); - taosTFree(pNode->pRecv); - taosTFree(pNode->pSyncFwds); - taosTFree(pNode); - } + pthread_mutex_destroy(&pNode->mutex); + taosTFree(pNode->pRecv); + taosTFree(pNode->pSyncFwds); + taosTFree(pNode); } void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } int syncDecPeerRef(SSyncPeer *pPeer) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { - syncDecNodeRef(pPeer->pSyncNode); + taosReleaseRef(tsSyncRefId, pPeer->pSyncNode); sDebug("%s, resource is freed", pPeer->id); taosTFree(pPeer->watchFd); @@ -529,7 +512,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); } - syncAddNodeRef(pNode); + taosAcquireRef(tsSyncRefId, pNode); return pPeer; } @@ -1122,7 +1105,7 @@ static void syncProcessBrokenLink(void *param) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; - syncAddNodeRef(pNode); + if (taosAcquireRef(tsSyncRefId, pNode) < 0) return; pthread_mutex_lock(&(pNode->mutex)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); @@ -1133,7 +1116,7 @@ static void syncProcessBrokenLink(void *param) { } pthread_mutex_unlock(&(pNode->mutex)); - syncDecNodeRef(pNode); + taosReleaseRef(tsSyncRefId, pNode); } static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { @@ -1202,22 +1185,90 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code static void syncMonitorFwdInfos(void *param, void *tmrId) { SSyncNode *pNode = param; + + int ret = taosAcquireRef(tsSyncRefId, pNode); + if ( ret < 0) return; + SSyncFwds *pSyncFwds = pNode->pSyncFwds; - if (pSyncFwds == NULL) return; - uint64_t time = taosGetTimestampMs(); + if (pSyncFwds) {; + uint64_t time = taosGetTimestampMs(); - if (pSyncFwds->fwds > 0) { - pthread_mutex_lock(&(pNode->mutex)); - for (int i = 0; i < pSyncFwds->fwds; ++i) { - SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; - if (time - pFwdInfo->time < 2000) break; - syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); + if (pSyncFwds->fwds > 0) { + pthread_mutex_lock(&(pNode->mutex)); + for (int i = 0; i < pSyncFwds->fwds; ++i) { + SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; + if (time - pFwdInfo->time < 2000) break; + syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + syncRemoveConfirmedFwdInfo(pNode); + pthread_mutex_unlock(&(pNode->mutex)); } - syncRemoveConfirmedFwdInfo(pNode); - pthread_mutex_unlock(&(pNode->mutex)); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); + taosReleaseRef(tsSyncRefId, pNode); } + +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { + SSyncPeer *pPeer; + SSyncHead *pSyncHead; + SWalHead * pWalHead = data; + int fwdLen; + int32_t code = 0; + + if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { + sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, + pWalHead->version, nodeVersion); + for (int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + syncRestartConnection(pPeer); + } + return TSDB_CODE_SYN_INVALID_VERSION; + } + + // always update version + nodeVersion = pWalHead->version; + sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], + qtype, pWalHead->version); + + if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; + + // only pkt from RPC or CQ can be forwarded + if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; + + // a hacker way to improve the performance + pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); + pSyncHead->type = TAOS_SMSG_FORWARD; + pSyncHead->pversion = 0; + pSyncHead->len = sizeof(SWalHead) + pWalHead->len; + fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head + + pthread_mutex_lock(&(pNode->mutex)); + + for (int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + if (pPeer == NULL || pPeer->peerFd < 0) continue; + if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; + + if (pNode->quorum > 1 && code == 0) { + syncSaveFwdInfo(pNode, pWalHead->version, mhandle); + code = 1; + } + + int retLen = write(pPeer->peerFd, pSyncHead, fwdLen); + if (retLen == fwdLen) { + sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); + } else { + sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); + syncRestartConnection(pPeer); + } + } + + pthread_mutex_unlock(&(pNode->mutex)); + + return code; +} + diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index f3bd91f038..a2affb8c4f 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -562,12 +562,12 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) { void tsdbRefTable(STable *pTable) { int32_t ref = T_REF_INC(pTable); UNUSED(ref); - // tsdbDebug("ref table %"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); + tsdbDebug("ref table %s uid %" PRIu64 " tid:%d, refCount:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), ref); } void tsdbUnRefTable(STable *pTable) { int32_t ref = T_REF_DEC(pTable); - tsdbDebug("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); + tsdbDebug("unref table %s uid:%"PRIu64" tid:%d, refCount:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), ref); if (ref == 0) { // tsdbDebug("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable)); @@ -745,7 +745,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper) { T_REF_INC(pTable); - tsdbTrace("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), + tsdbDebug("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable)); return pTable; @@ -889,7 +889,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro } if (lock) tsdbUnlockRepoMeta(pRepo); - tsdbDebug("vgId:%d table %s is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable)); + tsdbDebug("vgId:%d table %s uid %" PRIu64 " is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_UID(pTable)); tsdbUnRefTable(pTable); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 92305be714..90e16aee53 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -2122,8 +2122,8 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) { } // clear current group, unref unused table - for(int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pGroup, i); + for (int32_t i = 0; i < numOfTables; ++i) { + STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pGroup, i); // keyInfo.pTable may be NULL here. if (pKeyInfo->pTable != keyInfo.pTable) { diff --git a/src/util/inc/tref.h b/src/util/inc/tref.h index 6619ff407e..ead8e2eb90 100644 --- a/src/util/inc/tref.h +++ b/src/util/inc/tref.h @@ -21,15 +21,42 @@ extern "C" { #endif -int taosOpenRef(int max, void (*fp)(void *)); // return refId which will be used by other APIs -void taosCloseRef(int refId); -int taosListRef(); // return the number of references in system -int taosAddRef(int refId, void *p); -int taosAcquireRef(int refId, void *p); -void taosReleaseRef(int refId, void *p); +// open an instance, return refId which will be used by other APIs +int taosOpenRef(int max, void (*fp)(void *)); +// close the Ref instance +void taosCloseRef(int refId); + +// add ref, p is the pointer to resource or pointer ID +int taosAddRef(int refId, void *p); #define taosRemoveRef taosReleaseRef +// acquire ref, p is the pointer to resource or pointer ID +int taosAcquireRef(int refId, void *p); + +// release ref, p is the pointer to resource or pinter ID +void taosReleaseRef(int refId, void *p); + +// return the first if p is null, otherwise return the next after p +void *taosIterateRef(int refId, void *p); + +// return the number of references in system +int taosListRef(); + +/* sample code to iterate the refs + +void demoIterateRefs(int refId) { + + void *p = taosIterateRef(refId, NULL); + while (p) { + + // process P + + p = taosIterateRef(refId, p); + } +} + +*/ #ifdef __cplusplus } diff --git a/src/util/src/tref.c b/src/util/src/tref.c index 4c3b836340..23a7210e99 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -143,8 +143,6 @@ int taosAddRef(int refId, void *p) return TSDB_CODE_REF_INVALID_ID; } - uTrace("refId:%d p:%p try to add", refId, p); - pSet = tsRefSetList + refId; taosIncRefCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { @@ -203,8 +201,6 @@ int taosAcquireRef(int refId, void *p) return TSDB_CODE_REF_INVALID_ID; } - uTrace("refId:%d p:%p try to acquire", refId, p); - pSet = tsRefSetList + refId; taosIncRefCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { @@ -254,8 +250,6 @@ void taosReleaseRef(int refId, void *p) return; } - uTrace("refId:%d p:%p try to release", refId, p); - pSet = tsRefSetList + refId; if (pSet->state == TSDB_REF_STATE_EMPTY) { uTrace("refId:%d p:%p failed to release, cleaned", refId, p); @@ -305,6 +299,75 @@ void taosReleaseRef(int refId, void *p) if (released) taosDecRefCount(pSet); } +// if p is NULL, return the first p in hash list, otherwise, return the next after p +void *taosIterateRef(int refId, void *p) { + SRefNode *pNode = NULL; + SRefSet *pSet; + + if (refId < 0 || refId >= TSDB_REF_OBJECTS) { + uTrace("refId:%d p:%p failed to iterate, refId not valid", refId, p); + return NULL; + } + + pSet = tsRefSetList + refId; + taosIncRefCount(pSet); + if (pSet->state != TSDB_REF_STATE_ACTIVE) { + uTrace("refId:%d p:%p failed to iterate, not active", refId, p); + taosDecRefCount(pSet); + return NULL; + } + + int hash = 0; + if (p) { + hash = taosHashRef(pSet, p); + taosLockList(pSet->lockedBy+hash); + + pNode = pSet->nodeList[hash]; + while (pNode) { + if (pNode->p == p) break; + pNode = pNode->next; + } + + if (pNode == NULL) { + uError("refId:%d p:%p not there, quit", refId, p); + taosUnlockList(pSet->lockedBy+hash); + return NULL; + } + + // p is there + pNode = pNode->next; + if (pNode == NULL) { + taosUnlockList(pSet->lockedBy+hash); + hash++; + } + } + + if (pNode == NULL) { + for (; hash < pSet->max; ++hash) { + taosLockList(pSet->lockedBy+hash); + pNode = pSet->nodeList[hash]; + if (pNode) break; + taosUnlockList(pSet->lockedBy+hash); + } + } + + void *newP = NULL; + if (pNode) { + pNode->count++; // acquire it + newP = pNode->p; + taosUnlockList(pSet->lockedBy+hash); + uTrace("refId:%d p:%p is returned", refId, p); + } else { + uTrace("refId:%d p:%p the list is over", refId, p); + } + + if (p) taosReleaseRef(refId, p); // release the current one + + taosDecRefCount(pSet); + + return newP; +} + int taosListRef() { SRefSet *pSet; SRefNode *pNode; diff --git a/src/util/tests/trefTest.c b/src/util/tests/trefTest.c index 486f9f6d6d..09ffccd7b5 100644 --- a/src/util/tests/trefTest.c +++ b/src/util/tests/trefTest.c @@ -17,6 +17,19 @@ typedef struct { void **p; } SRefSpace; +void iterateRefs(int refId) { + int count = 0; + + void *p = taosIterateRef(refId, NULL); + while (p) { + // process P + count++; + p = taosIterateRef(refId, p); + } + + printf(" %d ", count); +} + void *takeRefActions(void *param) { SRefSpace *pSpace = (SRefSpace *)param; int code, id; @@ -44,6 +57,9 @@ void *takeRefActions(void *param) { usleep(id % 5 + 1); taosReleaseRef(pSpace->refId, pSpace->p[id]); } + + id = random() % pSpace->refNum; + iterateRefs(id); } for (int i=0; i < pSpace->refNum; ++i) { @@ -63,7 +79,7 @@ void *openRefSpace(void *param) { SRefSpace *pSpace = (SRefSpace *)param; printf("c"); - pSpace->refId = taosOpenRef(10000, myfree); + pSpace->refId = taosOpenRef(50, myfree); if (pSpace->refId < 0) { printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId)); diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index 294bc52a94..b14321a4ef 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -154,6 +154,7 @@ python3 ./test.py -f query/queryConnection.py python3 ./test.py -f query/queryCountCSVData.py python3 ./test.py -f query/natualInterval.py python3 ./test.py -f query/bug1471.py +python3 ./test.py -f query/dataLossTest.py #stream python3 ./test.py -f stream/metric_1.py diff --git a/tests/pytest/query/dataLossTest.py b/tests/pytest/query/dataLossTest.py new file mode 100644 index 0000000000..b29dc1fa9f --- /dev/null +++ b/tests/pytest/query/dataLossTest.py @@ -0,0 +1,76 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +import os +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * +import inspect + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + self.numberOfTables = 240 + self.numberOfRecords = 10000 + + def run(self): + tdSql.prepare() + + os.system("yes | taosdemo -t %d -n %d" % (self.numberOfTables, self.numberOfRecords)) + print("==============step1") + + tdSql.execute("use test") + sql = "select count(*) from meters" + tdSql.query(sql) + rows = tdSql.getData(0, 0) + print ("number of records: %d" % rows) + + newRows = rows + for i in range(10000): + print("kill taosd") + time.sleep(10) + os.system("sudo kill -9 $(pgrep taosd)") + tdDnodes.startWithoutSleep(1) + while True: + try: + tdSql.query(sql) + newRows = tdSql.getData(0, 0) + print("numer of records after kill taosd %d" % newRows) + time.sleep(10) + break + except Exception as e: + pass + continue + + if newRows < rows: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, sql, newRows, rows) + tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args) + break + + tdSql.query(sql) + tdSql.checkData(0, 0, rows) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 1ac492bb3a..757399b4a2 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -15,6 +15,7 @@ import sys import os import os.path import subprocess +from time import sleep from util.log import * @@ -210,6 +211,7 @@ class TDDnode: (self.index, self.cfgPath)) def getBuildPath(self): + buildPath = "" selfPath = os.path.dirname(os.path.realpath(__file__)) if ("community" in selfPath): @@ -256,6 +258,35 @@ class TDDnode: tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index)) time.sleep(5) + + def startWithoutSleep(self): + buildPath = self.getBuildPath() + + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + + binPath = buildPath + "/build/bin/taosd" + + if self.deployed == 0: + tdLog.exit("dnode:%d is not deployed" % (self.index)) + + if self.valgrind == 0: + cmd = "nohup %s -c %s > /dev/null 2>&1 & " % ( + binPath, self.cfgDir) + else: + valgrindCmdline = "valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes" + + cmd = "nohup %s %s -c %s 2>&1 & " % ( + valgrindCmdline, binPath, self.cfgDir) + + print(cmd) + + if os.system(cmd) != 0: + tdLog.exit(cmd) + self.running = 1 + tdLog.debug("dnode:%d is running with %s " % (self.index, cmd)) def stop(self): if self.valgrind == 0: @@ -425,6 +456,10 @@ class TDDnodes: def start(self, index): self.check(index) self.dnodes[index - 1].start() + + def startWithoutSleep(self, index): + self.check(index) + self.dnodes[index - 1].startWithoutSleep() def stop(self, index): self.check(index) diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 9abec354c6..b2ed6212fd 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -25,7 +25,7 @@ class TDSql: self.queryCols = 0 self.affectedRows = 0 - def init(self, cursor, log=True): + def init(self, cursor, log=False): self.cursor = cursor if (log):