Merge branch 'fix/TD-17260' into enh/stopquery_a

This commit is contained in:
dapan1121 2022-07-12 15:00:44 +08:00
commit cfa989e6ba
6 changed files with 62 additions and 92 deletions

View File

@ -69,10 +69,9 @@ typedef void (*_ref_fn_t)(const void *pObj);
#define T_REF_VAL_GET(x) (x)->_ref.val #define T_REF_VAL_GET(x) (x)->_ref.val
// single writer multiple reader lock // single writer multiple reader lock
typedef volatile int64_t SRWLatch; typedef volatile int32_t SRWLatch;
void taosInitRWLatch(SRWLatch *pLatch); void taosInitRWLatch(SRWLatch *pLatch);
void taosInitReentrantRWLatch(SRWLatch *pLatch);
void taosWLockLatch(SRWLatch *pLatch); void taosWLockLatch(SRWLatch *pLatch);
void taosWUnLockLatch(SRWLatch *pLatch); void taosWUnLockLatch(SRWLatch *pLatch);
void taosRLockLatch(SRWLatch *pLatch); void taosRLockLatch(SRWLatch *pLatch);

View File

@ -482,33 +482,33 @@ typedef struct SCtgOperation {
#define CTG_LOCK(type, _lock) do { \ #define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \ if (CTG_READ == (type)) { \
assert(atomic_load_64((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \ taosRLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) > 0); \ assert(atomic_load_32((_lock)) > 0); \
} else { \ } else { \
assert(atomic_load_64((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \ taosWLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \ } \
} while (0) } while (0)
#define CTG_UNLOCK(type, _lock) do { \ #define CTG_UNLOCK(type, _lock) do { \
if (CTG_READ == (type)) { \ if (CTG_READ == (type)) { \
assert(atomic_load_64((_lock)) > 0); \ assert(atomic_load_32((_lock)) > 0); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \ taosRUnLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
} else { \ } else { \
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \ taosWUnLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
} \ } \
} while (0) } while (0)

View File

@ -316,34 +316,34 @@ typedef struct SQWorkerMgmt {
#define QW_LOCK(type, _lock) \ #define QW_LOCK(type, _lock) \
do { \ do { \
if (QW_READ == (type)) { \ if (QW_READ == (type)) { \
assert(atomic_load_64((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \ taosRLockLatch(_lock); \
QW_LOCK_DEBUG("QW RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) > 0); \ assert(atomic_load_32((_lock)) > 0); \
} else { \ } else { \
assert(atomic_load_64((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \ taosWLockLatch(_lock); \
QW_LOCK_DEBUG("QW WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \ } \
} while (0) } while (0)
#define QW_UNLOCK(type, _lock) \ #define QW_UNLOCK(type, _lock) \
do { \ do { \
if (QW_READ == (type)) { \ if (QW_READ == (type)) { \
assert(atomic_load_64((_lock)) > 0); \ assert(atomic_load_32((_lock)) > 0); \
QW_LOCK_DEBUG("QW RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \ taosRUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
} else { \ } else { \
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
QW_LOCK_DEBUG("QW WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \ taosWUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
} \ } \
} while (0) } while (0)

View File

@ -368,33 +368,33 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOCK(type, _lock) do { \ #define SCH_LOCK(type, _lock) do { \
if (SCH_READ == (type)) { \ if (SCH_READ == (type)) { \
assert(atomic_load_64(_lock) >= 0); \ assert(atomic_load_32(_lock) >= 0); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \ taosRLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64(_lock) > 0); \ assert(atomic_load_32(_lock) > 0); \
} else { \ } else { \
assert(atomic_load_64(_lock) >= 0); \ assert(atomic_load_32(_lock) >= 0); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \ taosWLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64(_lock) & TD_RWLATCH_WRITE_FLAG_COPY); \ assert(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \ } \
} while (0) } while (0)
#define SCH_UNLOCK(type, _lock) do { \ #define SCH_UNLOCK(type, _lock) do { \
if (SCH_READ == (type)) { \ if (SCH_READ == (type)) { \
assert(atomic_load_64((_lock)) > 0); \ assert(atomic_load_32((_lock)) > 0); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \ taosRUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
} else { \ } else { \
assert(atomic_load_64((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY); \ assert(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \ taosWUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \ assert(atomic_load_32((_lock)) >= 0); \
} \ } \
} while (0) } while (0)

View File

@ -58,7 +58,6 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) { if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
taosInitReentrantRWLatch(&pTask->lock);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
@ -260,7 +259,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i); SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1); int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
SCH_LOCK_TASK(parent); SCH_LOCK(SCH_WRITE, &parent->planLock);
SDownstreamSourceNode source = { SDownstreamSourceNode source = {
.type = QUERY_NODE_DOWNSTREAM_SOURCE, .type = QUERY_NODE_DOWNSTREAM_SOURCE,
.taskId = pTask->taskId, .taskId = pTask->taskId,
@ -270,7 +269,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
.fetchMsgType = SCH_FETCH_TYPE(pTask), .fetchMsgType = SCH_FETCH_TYPE(pTask),
}; };
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK_TASK(parent); SCH_UNLOCK(SCH_WRITE, &parent->planLock);
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) { if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId); SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);

View File

@ -17,10 +17,8 @@
#include "tlockfree.h" #include "tlockfree.h"
#define TD_RWLATCH_WRITE_FLAG 0x40000000 #define TD_RWLATCH_WRITE_FLAG 0x40000000
#define TD_RWLATCH_REENTRANT_FLAG 0x4000000000000000
void taosInitRWLatch(SRWLatch *pLatch) { *pLatch = 0; } void taosInitRWLatch(SRWLatch *pLatch) { *pLatch = 0; }
void taosInitReentrantRWLatch(SRWLatch *pLatch) { *pLatch = TD_RWLATCH_REENTRANT_FLAG; }
void taosWLockLatch(SRWLatch *pLatch) { void taosWLockLatch(SRWLatch *pLatch) {
SRWLatch oLatch, nLatch; SRWLatch oLatch, nLatch;
@ -28,14 +26,8 @@ void taosWLockLatch(SRWLatch *pLatch) {
// Set write flag // Set write flag
while (1) { while (1) {
oLatch = atomic_load_64(pLatch); oLatch = atomic_load_32(pLatch);
if (oLatch & TD_RWLATCH_WRITE_FLAG) { if (oLatch & TD_RWLATCH_WRITE_FLAG) {
if (oLatch & TD_RWLATCH_REENTRANT_FLAG) {
nLatch = (((oLatch >> 32) + 1) << 32) | (oLatch & 0xFFFFFFFF);
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
continue;
}
nLoops++; nLoops++;
if (nLoops > 1000) { if (nLoops > 1000) {
sched_yield(); sched_yield();
@ -45,14 +37,14 @@ void taosWLockLatch(SRWLatch *pLatch) {
} }
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG; nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break; if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) break;
} }
// wait for all reads end // wait for all reads end
nLoops = 0; nLoops = 0;
while (1) { while (1) {
oLatch = atomic_load_64(pLatch); oLatch = atomic_load_32(pLatch);
if (0 == (oLatch & 0xFFFFFFF)) break; if (0 == oLatch) break;
nLoops++; nLoops++;
if (nLoops > 1000) { if (nLoops > 1000) {
sched_yield(); sched_yield();
@ -64,47 +56,27 @@ void taosWLockLatch(SRWLatch *pLatch) {
// no reentrant // no reentrant
int32_t taosWTryLockLatch(SRWLatch *pLatch) { int32_t taosWTryLockLatch(SRWLatch *pLatch) {
SRWLatch oLatch, nLatch; SRWLatch oLatch, nLatch;
oLatch = atomic_load_64(pLatch); oLatch = atomic_load_32(pLatch);
if (oLatch << 2) { if (oLatch) {
return -1; return -1;
} }
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG; nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) { if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) {
return 0; return 0;
} }
return -1; return -1;
} }
void taosWUnLockLatch(SRWLatch *pLatch) { void taosWUnLockLatch(SRWLatch *pLatch) { atomic_store_32(pLatch, 0); }
SRWLatch oLatch, nLatch, wLatch;
while (1) {
oLatch = atomic_load_64(pLatch);
if (0 == (oLatch & TD_RWLATCH_REENTRANT_FLAG)) {
atomic_store_64(pLatch, 0);
break;
}
wLatch = ((oLatch << 2) >> 34);
if (wLatch) {
nLatch = ((--wLatch) << 32) | TD_RWLATCH_REENTRANT_FLAG | TD_RWLATCH_WRITE_FLAG;
} else {
nLatch = TD_RWLATCH_REENTRANT_FLAG;
}
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
}
}
void taosRLockLatch(SRWLatch *pLatch) { void taosRLockLatch(SRWLatch *pLatch) {
SRWLatch oLatch, nLatch; SRWLatch oLatch, nLatch;
int32_t nLoops = 0; int32_t nLoops = 0;
while (1) { while (1) {
oLatch = atomic_load_64(pLatch); oLatch = atomic_load_32(pLatch);
if (oLatch & TD_RWLATCH_WRITE_FLAG) { if (oLatch & TD_RWLATCH_WRITE_FLAG) {
nLoops++; nLoops++;
if (nLoops > 1000) { if (nLoops > 1000) {
@ -115,8 +87,8 @@ void taosRLockLatch(SRWLatch *pLatch) {
} }
nLatch = oLatch + 1; nLatch = oLatch + 1;
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break; if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) break;
} }
} }
void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_64(pLatch, 1); } void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_32(pLatch, 1); }