TD-2616
This commit is contained in:
parent
e7d5499372
commit
44b170427d
|
@ -2503,7 +2503,7 @@ bool tscSetSqlOwner(SSqlObj* pSql) {
|
||||||
SSqlRes* pRes = &pSql->res;
|
SSqlRes* pRes = &pSql->res;
|
||||||
|
|
||||||
// set the sql object owner
|
// set the sql object owner
|
||||||
uint64_t threadId = taosGetPthreadId();
|
uint64_t threadId = taosGetSelfPthreadId();
|
||||||
if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) {
|
if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) {
|
||||||
pRes->code = TSDB_CODE_QRY_IN_EXEC;
|
pRes->code = TSDB_CODE_QRY_IN_EXEC;
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -29,12 +29,13 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// TAOS_OS_FUNC_SEMPHONE_PTHREAD
|
// TAOS_OS_FUNC_SEMPHONE_PTHREAD
|
||||||
bool taosCheckPthreadValid(pthread_t thread);
|
bool taosCheckPthreadValid(pthread_t thread);
|
||||||
int64_t taosGetPthreadId();
|
int64_t taosGetSelfPthreadId();
|
||||||
void taosResetPthread(pthread_t *thread);
|
int64_t taosGetPthreadId(pthread_t thread);
|
||||||
bool taosComparePthread(pthread_t first, pthread_t second);
|
void taosResetPthread(pthread_t* thread);
|
||||||
|
bool taosComparePthread(pthread_t first, pthread_t second);
|
||||||
int32_t taosGetPId();
|
int32_t taosGetPId();
|
||||||
int32_t taosGetCurrentAPPName(char *name, int32_t* len);
|
int32_t taosGetCurrentAPPName(char* name, int32_t* len);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,8 @@ int tsem_wait(tsem_t* sem) {
|
||||||
#ifndef TAOS_OS_FUNC_SEMPHONE_PTHREAD
|
#ifndef TAOS_OS_FUNC_SEMPHONE_PTHREAD
|
||||||
|
|
||||||
bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; }
|
bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; }
|
||||||
int64_t taosGetPthreadId() { return (int64_t)pthread_self(); }
|
int64_t taosGetSelfPthreadId() { return (int64_t)pthread_self(); }
|
||||||
|
int64_t taosGetPthreadId(pthread_t thread) { return (int64_t)thread; }
|
||||||
void taosResetPthread(pthread_t *thread) { *thread = 0; }
|
void taosResetPthread(pthread_t *thread) { *thread = 0; }
|
||||||
bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; }
|
bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; }
|
||||||
int32_t taosGetPId() { return getpid(); }
|
int32_t taosGetPId() { return getpid(); }
|
||||||
|
|
|
@ -25,14 +25,16 @@ bool taosCheckPthreadValid(pthread_t thread) { return thread.p != NULL; }
|
||||||
|
|
||||||
void taosResetPthread(pthread_t *thread) { thread->p = 0; }
|
void taosResetPthread(pthread_t *thread) { thread->p = 0; }
|
||||||
|
|
||||||
int64_t taosGetPthreadId() {
|
int64_t taosGetPthreadId(pthread_t thread) {
|
||||||
#ifdef PTW32_VERSION
|
#ifdef PTW32_VERSION
|
||||||
return pthread_getw32threadid_np(pthread_self());
|
return pthread_getw32threadid_np(thread);
|
||||||
#else
|
#else
|
||||||
return (int64_t)pthread_self();
|
return (int64_t)thread;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t taosGetSelfPthreadId() { return taosGetPthreadId(pthread_self()); }
|
||||||
|
|
||||||
bool taosComparePthread(pthread_t first, pthread_t second) {
|
bool taosComparePthread(pthread_t first, pthread_t second) {
|
||||||
return first.p == second.p;
|
return first.p == second.p;
|
||||||
}
|
}
|
||||||
|
|
|
@ -7250,7 +7250,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
|
||||||
|
|
||||||
// clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is
|
// clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is
|
||||||
// put into task to be executed.
|
// put into task to be executed.
|
||||||
assert(pQInfo->owner == taosGetPthreadId());
|
assert(pQInfo->owner == taosGetSelfPthreadId());
|
||||||
pQInfo->owner = 0;
|
pQInfo->owner = 0;
|
||||||
|
|
||||||
pthread_mutex_unlock(&pQInfo->lock);
|
pthread_mutex_unlock(&pQInfo->lock);
|
||||||
|
@ -7263,7 +7263,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) {
|
||||||
bool qTableQuery(qinfo_t qinfo) {
|
bool qTableQuery(qinfo_t qinfo) {
|
||||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||||
assert(pQInfo && pQInfo->signature == pQInfo);
|
assert(pQInfo && pQInfo->signature == pQInfo);
|
||||||
int64_t threadId = taosGetPthreadId();
|
int64_t threadId = taosGetSelfPthreadId();
|
||||||
|
|
||||||
int64_t curOwner = 0;
|
int64_t curOwner = 0;
|
||||||
if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) {
|
if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) {
|
||||||
|
|
|
@ -272,7 +272,7 @@ static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType)
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcLockCache(int64_t *lockedBy) {
|
static void rpcLockCache(int64_t *lockedBy) {
|
||||||
int64_t tid = taosGetPthreadId();
|
int64_t tid = taosGetSelfPthreadId();
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
|
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
|
||||||
if (++i % 100 == 0) {
|
if (++i % 100 == 0) {
|
||||||
|
@ -282,7 +282,7 @@ static void rpcLockCache(int64_t *lockedBy) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcUnlockCache(int64_t *lockedBy) {
|
static void rpcUnlockCache(int64_t *lockedBy) {
|
||||||
int64_t tid = taosGetPthreadId();
|
int64_t tid = taosGetSelfPthreadId();
|
||||||
if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
|
if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
|
||||||
assert(false);
|
assert(false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1604,7 +1604,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcLockConn(SRpcConn *pConn) {
|
static void rpcLockConn(SRpcConn *pConn) {
|
||||||
int64_t tid = taosGetPthreadId();
|
int64_t tid = taosGetSelfPthreadId();
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) {
|
while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) {
|
||||||
if (++i % 1000 == 0) {
|
if (++i % 1000 == 0) {
|
||||||
|
@ -1614,7 +1614,7 @@ static void rpcLockConn(SRpcConn *pConn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcUnlockConn(SRpcConn *pConn) {
|
static void rpcUnlockConn(SRpcConn *pConn) {
|
||||||
int64_t tid = taosGetPthreadId();
|
int64_t tid = taosGetSelfPthreadId();
|
||||||
if (atomic_val_compare_exchange_64(&(pConn->lockedBy), tid, 0) != tid) {
|
if (atomic_val_compare_exchange_64(&(pConn->lockedBy), tid, 0) != tid) {
|
||||||
assert(false);
|
assert(false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -478,9 +478,7 @@ static void syncAddArbitrator(SSyncNode *pNode) {
|
||||||
|
|
||||||
static void syncFreeNode(void *param) {
|
static void syncFreeNode(void *param) {
|
||||||
SSyncNode *pNode = param;
|
SSyncNode *pNode = param;
|
||||||
|
sDebug("vgId:%d, node is freed, refCount:%d", pNode->vgId, pNode->refCount);
|
||||||
int32_t refCount = atomic_sub_fetch_32(&pNode->refCount, 1);
|
|
||||||
sDebug("vgId:%d, syncnode is freed, refCount:%d", pNode->vgId, refCount);
|
|
||||||
|
|
||||||
pthread_mutex_destroy(&pNode->mutex);
|
pthread_mutex_destroy(&pNode->mutex);
|
||||||
tfree(pNode->pRecv);
|
tfree(pNode->pRecv);
|
||||||
|
@ -491,10 +489,10 @@ static void syncFreeNode(void *param) {
|
||||||
SSyncNode *syncAcquireNode(int64_t rid) {
|
SSyncNode *syncAcquireNode(int64_t rid) {
|
||||||
SSyncNode *pNode = taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode *pNode = taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
sDebug("failed to acquire syncnode from refId:%" PRId64, rid);
|
sDebug("failed to acquire node from refId:%" PRId64, rid);
|
||||||
} else {
|
} else {
|
||||||
int32_t refCount = atomic_add_fetch_32(&pNode->refCount, 1);
|
int32_t refCount = atomic_add_fetch_32(&pNode->refCount, 1);
|
||||||
sTrace("vgId:%d, acquire syncnode refId:%" PRId64 ", refCount:%d", pNode->vgId, rid, refCount);
|
sTrace("vgId:%d, acquire node refId:%" PRId64 ", refCount:%d", pNode->vgId, rid, refCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pNode;
|
return pNode;
|
||||||
|
@ -502,16 +500,14 @@ SSyncNode *syncAcquireNode(int64_t rid) {
|
||||||
|
|
||||||
void syncReleaseNode(SSyncNode *pNode) {
|
void syncReleaseNode(SSyncNode *pNode) {
|
||||||
int32_t refCount = atomic_sub_fetch_32(&pNode->refCount, 1);
|
int32_t refCount = atomic_sub_fetch_32(&pNode->refCount, 1);
|
||||||
sTrace("vgId:%d, dec syncnode refId:%" PRId64 " refCount:%d", pNode->vgId, pNode->rid, refCount);
|
sTrace("vgId:%d, release node refId:%" PRId64 ", refCount:%d", pNode->vgId, pNode->rid, refCount);
|
||||||
|
|
||||||
taosReleaseRef(tsNodeRefId, pNode->rid);
|
taosReleaseRef(tsNodeRefId, pNode->rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncFreePeer(void *param) {
|
static void syncFreePeer(void *param) {
|
||||||
SSyncPeer *pPeer = param;
|
SSyncPeer *pPeer = param;
|
||||||
|
sDebug("%s, peer is freed, refCount:%d", pPeer->id, pPeer->refCount);
|
||||||
int32_t refCount = atomic_sub_fetch_32(&pPeer->refCount, 1);
|
|
||||||
sDebug("%s, peer is freed, refCount:%d", pPeer->id, refCount);
|
|
||||||
|
|
||||||
syncReleaseNode(pPeer->pSyncNode);
|
syncReleaseNode(pPeer->pSyncNode);
|
||||||
tfree(pPeer);
|
tfree(pPeer);
|
||||||
|
@ -531,7 +527,7 @@ SSyncPeer *syncAcquirePeer(int64_t rid) {
|
||||||
|
|
||||||
void syncReleasePeer(SSyncPeer *pPeer) {
|
void syncReleasePeer(SSyncPeer *pPeer) {
|
||||||
int32_t refCount = atomic_sub_fetch_32(&pPeer->refCount, 1);
|
int32_t refCount = atomic_sub_fetch_32(&pPeer->refCount, 1);
|
||||||
sTrace("%s, dec peer refId:%" PRId64 ", refCount:%d", pPeer->id, pPeer->rid, refCount);
|
sTrace("%s, release peer refId:%" PRId64 ", refCount:%d", pPeer->id, pPeer->rid, refCount);
|
||||||
|
|
||||||
taosReleaseRef(tsPeerRefId, pPeer->rid);
|
taosReleaseRef(tsPeerRefId, pPeer->rid);
|
||||||
}
|
}
|
||||||
|
@ -879,14 +875,14 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
|
||||||
int32_t ret = pthread_create(&thread, &thattr, syncRetrieveData, (void *)pPeer->rid);
|
int32_t ret = pthread_create(&thread, &thattr, syncRetrieveData, (void *)pPeer->rid);
|
||||||
pthread_attr_destroy(&thattr);
|
pthread_attr_destroy(&thattr);
|
||||||
|
|
||||||
if (ret != 0) {
|
if (ret < 0) {
|
||||||
sError("%s, failed to create sync thread since %s", pPeer->id, strerror(errno));
|
sError("%s, failed to create sync retrieve thread since %s", pPeer->id, strerror(errno));
|
||||||
|
syncReleasePeer(pPeer);
|
||||||
} else {
|
} else {
|
||||||
pPeer->sstatus = TAOS_SYNC_STATUS_START;
|
pPeer->sstatus = TAOS_SYNC_STATUS_START;
|
||||||
sDebug("%s, thread is created to retrieve data, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
|
sDebug("%s, sync retrieve thread:0x%08" PRIx64 " create successfully, rid:%" PRId64 ", set sstatus:%s", pPeer->id,
|
||||||
|
taosGetPthreadId(thread), pPeer->rid, syncStatus[pPeer->sstatus]);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncReleasePeer(pPeer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncNotStarted(void *param, void *tmrId) {
|
static void syncNotStarted(void *param, void *tmrId) {
|
||||||
|
@ -1154,19 +1150,19 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
|
||||||
|
|
||||||
(void)syncAcquirePeer(pPeer->rid);
|
(void)syncAcquirePeer(pPeer->rid);
|
||||||
|
|
||||||
int32_t ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, (void *)pPeer->rid);
|
int32_t ret = pthread_create(&thread, &thattr, (void *)syncRestoreData, (void *)pPeer->rid);
|
||||||
pthread_attr_destroy(&thattr);
|
pthread_attr_destroy(&thattr);
|
||||||
|
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
SSyncNode *pNode = pPeer->pSyncNode;
|
SSyncNode *pNode = pPeer->pSyncNode;
|
||||||
nodeSStatus = TAOS_SYNC_STATUS_INIT;
|
nodeSStatus = TAOS_SYNC_STATUS_INIT;
|
||||||
sError("%s, failed to create sync thread, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
|
sError("%s, failed to create sync restore thread, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
|
||||||
taosClose(pPeer->syncFd);
|
taosClose(pPeer->syncFd);
|
||||||
|
syncReleasePeer(pPeer);
|
||||||
} else {
|
} else {
|
||||||
sInfo("%s, sync connection is up", pPeer->id);
|
sInfo("%s, sync restore thread:0x%08" PRIx64 " create successfully, rid:%" PRId64, pPeer->id,
|
||||||
|
taosGetPthreadId(thread), pPeer->rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncReleasePeer(pPeer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
|
static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
|
||||||
|
|
|
@ -353,12 +353,16 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
|
||||||
void *syncRestoreData(void *param) {
|
void *syncRestoreData(void *param) {
|
||||||
int64_t rid = (int64_t)param;
|
int64_t rid = (int64_t)param;
|
||||||
SSyncPeer *pPeer = syncAcquirePeer(rid);
|
SSyncPeer *pPeer = syncAcquirePeer(rid);
|
||||||
if (pPeer == NULL) return NULL;
|
if (pPeer == NULL) {
|
||||||
|
sError("failed to restore data, invalid peer rid:%" PRId64, rid);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
SSyncNode *pNode = pPeer->pSyncNode;
|
SSyncNode *pNode = pPeer->pSyncNode;
|
||||||
|
|
||||||
taosBlockSIGPIPE();
|
taosBlockSIGPIPE();
|
||||||
__sync_fetch_and_add(&tsSyncNum, 1);
|
__sync_fetch_and_add(&tsSyncNum, 1);
|
||||||
|
sInfo("%s, start to restore data, sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
|
||||||
|
|
||||||
(*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING);
|
(*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING);
|
||||||
|
|
||||||
|
@ -380,11 +384,14 @@ void *syncRestoreData(void *param) {
|
||||||
(*pNode->notifyRole)(pNode->vgId, nodeRole);
|
(*pNode->notifyRole)(pNode->vgId, nodeRole);
|
||||||
|
|
||||||
nodeSStatus = TAOS_SYNC_STATUS_INIT;
|
nodeSStatus = TAOS_SYNC_STATUS_INIT;
|
||||||
sInfo("%s, sync over, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
|
sInfo("%s, restore data over, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
|
||||||
|
|
||||||
taosClose(pPeer->syncFd);
|
taosClose(pPeer->syncFd);
|
||||||
syncCloseRecvBuffer(pNode);
|
syncCloseRecvBuffer(pNode);
|
||||||
__sync_fetch_and_sub(&tsSyncNum, 1);
|
__sync_fetch_and_sub(&tsSyncNum, 1);
|
||||||
|
|
||||||
|
// The ref is obtained in both the create thread and the current thread, so it is released twice
|
||||||
|
syncReleasePeer(pPeer);
|
||||||
syncReleasePeer(pPeer);
|
syncReleasePeer(pPeer);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -194,7 +194,7 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
sTrace("sfd:%d, read to the end of file, ret:%d", sfd, ret);
|
sDebug("sfd:%d, read to the end of file, ret:%d", sfd, ret);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,7 +253,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);
|
sDebug("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);
|
||||||
|
|
||||||
int32_t wsize = code;
|
int32_t wsize = code;
|
||||||
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
|
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
|
||||||
|
@ -466,10 +466,15 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
|
||||||
void *syncRetrieveData(void *param) {
|
void *syncRetrieveData(void *param) {
|
||||||
int64_t rid = (int64_t)param;
|
int64_t rid = (int64_t)param;
|
||||||
SSyncPeer *pPeer = syncAcquirePeer(rid);
|
SSyncPeer *pPeer = syncAcquirePeer(rid);
|
||||||
if (pPeer == NULL) return NULL;
|
if (pPeer == NULL) {
|
||||||
|
sError("failed to retrieve data, invalid peer rid:%" PRId64, rid);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
SSyncNode *pNode = pPeer->pSyncNode;
|
SSyncNode *pNode = pPeer->pSyncNode;
|
||||||
|
|
||||||
taosBlockSIGPIPE();
|
taosBlockSIGPIPE();
|
||||||
|
sInfo("%s, start to retrieve data, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
|
||||||
|
|
||||||
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves);
|
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves);
|
||||||
|
|
||||||
|
@ -496,7 +501,11 @@ void *syncRetrieveData(void *param) {
|
||||||
|
|
||||||
pPeer->fileChanged = 0;
|
pPeer->fileChanged = 0;
|
||||||
taosClose(pPeer->syncFd);
|
taosClose(pPeer->syncFd);
|
||||||
|
|
||||||
|
// The ref is obtained in both the create thread and the current thread, so it is released twice
|
||||||
|
syncReleasePeer(pPeer);
|
||||||
syncReleasePeer(pPeer);
|
syncReleasePeer(pPeer);
|
||||||
|
|
||||||
|
sInfo("%s, sync retrieve data over, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -364,7 +364,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
|
||||||
ptm = localtime_r(&curTime, &Tm);
|
ptm = localtime_r(&curTime, &Tm);
|
||||||
|
|
||||||
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
|
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
|
||||||
ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetPthreadId());
|
ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId());
|
||||||
len += sprintf(buffer + len, "%s", flags);
|
len += sprintf(buffer + len, "%s", flags);
|
||||||
|
|
||||||
va_start(argpointer, format);
|
va_start(argpointer, format);
|
||||||
|
@ -450,7 +450,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
|
||||||
ptm = localtime_r(&curTime, &Tm);
|
ptm = localtime_r(&curTime, &Tm);
|
||||||
|
|
||||||
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
|
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
|
||||||
ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetPthreadId());
|
ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId());
|
||||||
len += sprintf(buffer + len, "%s", flags);
|
len += sprintf(buffer + len, "%s", flags);
|
||||||
|
|
||||||
va_start(argpointer, format);
|
va_start(argpointer, format);
|
||||||
|
|
|
@ -249,7 +249,7 @@ void taosNotePrint(SNoteObj *pNote, const char *const format, ...) {
|
||||||
curTime = timeSecs.tv_sec;
|
curTime = timeSecs.tv_sec;
|
||||||
ptm = localtime_r(&curTime, &Tm);
|
ptm = localtime_r(&curTime, &Tm);
|
||||||
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
|
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%08" PRIx64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
|
||||||
ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetPthreadId());
|
ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId());
|
||||||
va_start(argpointer, format);
|
va_start(argpointer, format);
|
||||||
len += vsnprintf(buffer + len, MAX_NOTE_LINE_SIZE - len, format, argpointer);
|
len += vsnprintf(buffer + len, MAX_NOTE_LINE_SIZE - len, format, argpointer);
|
||||||
va_end(argpointer);
|
va_end(argpointer);
|
||||||
|
|
|
@ -447,7 +447,7 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosLockList(int64_t *lockedBy) {
|
static void taosLockList(int64_t *lockedBy) {
|
||||||
int64_t tid = taosGetPthreadId();
|
int64_t tid = taosGetSelfPthreadId();
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
|
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
|
||||||
if (++i % 100 == 0) {
|
if (++i % 100 == 0) {
|
||||||
|
@ -457,7 +457,7 @@ static void taosLockList(int64_t *lockedBy) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosUnlockList(int64_t *lockedBy) {
|
static void taosUnlockList(int64_t *lockedBy) {
|
||||||
int64_t tid = taosGetPthreadId();
|
int64_t tid = taosGetSelfPthreadId();
|
||||||
if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
|
if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
|
||||||
assert(false);
|
assert(false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,7 +119,7 @@ static void timerDecRef(tmr_obj_t* timer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void lockTimerList(timer_list_t* list) {
|
static void lockTimerList(timer_list_t* list) {
|
||||||
int64_t tid = taosGetPthreadId();
|
int64_t tid = taosGetSelfPthreadId();
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
|
while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
|
||||||
if (++i % 1000 == 0) {
|
if (++i % 1000 == 0) {
|
||||||
|
@ -129,7 +129,7 @@ static void lockTimerList(timer_list_t* list) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void unlockTimerList(timer_list_t* list) {
|
static void unlockTimerList(timer_list_t* list) {
|
||||||
int64_t tid = taosGetPthreadId();
|
int64_t tid = taosGetSelfPthreadId();
|
||||||
if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
|
if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
|
||||||
assert(false);
|
assert(false);
|
||||||
tmrError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
|
tmrError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
|
||||||
|
@ -257,7 +257,7 @@ static bool removeFromWheel(tmr_obj_t* timer) {
|
||||||
|
|
||||||
static void processExpiredTimer(void* handle, void* arg) {
|
static void processExpiredTimer(void* handle, void* arg) {
|
||||||
tmr_obj_t* timer = (tmr_obj_t*)handle;
|
tmr_obj_t* timer = (tmr_obj_t*)handle;
|
||||||
timer->executedBy = taosGetPthreadId();
|
timer->executedBy = taosGetSelfPthreadId();
|
||||||
uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
|
uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
|
||||||
if (state == TIMER_STATE_WAITING) {
|
if (state == TIMER_STATE_WAITING) {
|
||||||
const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
|
const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
|
||||||
|
@ -406,7 +406,7 @@ static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (timer->executedBy == taosGetPthreadId()) {
|
if (timer->executedBy == taosGetSelfPthreadId()) {
|
||||||
// taosTmrReset is called in the timer callback, should do nothing in this
|
// taosTmrReset is called in the timer callback, should do nothing in this
|
||||||
// case to avoid dead lock. note taosTmrReset must be the last statement
|
// case to avoid dead lock. note taosTmrReset must be the last statement
|
||||||
// of the callback funtion, will be a bug otherwise.
|
// of the callback funtion, will be a bug otherwise.
|
||||||
|
|
|
@ -50,6 +50,24 @@ $d1_first = $rows
|
||||||
sql select * from log.dn2
|
sql select * from log.dn2
|
||||||
$d2_first = $rows
|
$d2_first = $rows
|
||||||
|
|
||||||
|
$x = 0
|
||||||
|
show4:
|
||||||
|
$x = $x + 1
|
||||||
|
sleep 1000
|
||||||
|
if $x == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql show mnodes
|
||||||
|
print dnode1 ==> $data2_1
|
||||||
|
print dnode2 ==> $data2_2
|
||||||
|
if $data2_1 != master then
|
||||||
|
goto show4
|
||||||
|
endi
|
||||||
|
if $data2_2 != slave then
|
||||||
|
goto show4
|
||||||
|
endi
|
||||||
|
|
||||||
sleep 3000
|
sleep 3000
|
||||||
sql select * from log.dn1
|
sql select * from log.dn1
|
||||||
$d1_second = $rows
|
$d1_second = $rows
|
||||||
|
|
Loading…
Reference in New Issue