From f263dac186d1a15280186b7ded023ab2d0ec93f1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sun, 10 Jul 2022 19:29:26 +0800 Subject: [PATCH 1/5] fix: fix taosc crash issue --- include/util/tlockfree.h | 2 +- source/libs/scheduler/inc/schInt.h | 44 +++++++++++++++++++++++++++-- source/libs/scheduler/src/schDbg.c | 1 + source/libs/scheduler/src/schJob.c | 5 ++++ source/libs/scheduler/src/schTask.c | 4 +-- source/util/src/tlockfree.c | 4 ++- 6 files changed, 54 insertions(+), 6 deletions(-) diff --git a/include/util/tlockfree.h b/include/util/tlockfree.h index 44e43f81cf..638499cc60 100644 --- a/include/util/tlockfree.h +++ b/include/util/tlockfree.h @@ -69,7 +69,7 @@ typedef void (*_ref_fn_t)(const void *pObj); #define T_REF_VAL_GET(x) (x)->_ref.val // single writer multiple reader lock -typedef volatile int32_t SRWLatch; +typedef volatile int64_t SRWLatch; void taosInitRWLatch(SRWLatch *pLatch); void taosWLockLatch(SRWLatch *pLatch); diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 052fdefa61..4b5aac60ea 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -54,6 +54,11 @@ typedef enum { SCH_OP_GET_STATUS, } SCH_OP_TYPE; +typedef struct SSchDebug { + bool lockEnable; + bool apiEnable; +} SSchDebug; + typedef struct SSchTrans { void *pTrans; void *pHandle; @@ -356,8 +361,41 @@ extern SSchedulerMgmt schMgmt; #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0) #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(code); goto _return; } } while (0) -#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) -#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) +#define SCH_LOCK_DEBUG(...) do { if (gSCHDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0) + +#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 + +#define SCH_LOCK(type, _lock) do { \ + if (SCH_READ == (type)) { \ + assert(atomic_load_32((_lock)) >= 0); \ + SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosRLockLatch(_lock); \ + SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) > 0); \ + } else { \ + assert(atomic_load_32((_lock)) >= 0); \ + SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosWLockLatch(_lock); \ + SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ + } \ +} while (0) + +#define SCH_UNLOCK(type, _lock) do { \ + if (SCH_READ == (type)) { \ + assert(atomic_load_32((_lock)) > 0); \ + SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosRUnLockLatch(_lock); \ + SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) >= 0); \ + } else { \ + assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ + SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosWUnLockLatch(_lock); \ + SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) >= 0); \ + } \ +} while (0) void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); @@ -435,6 +473,8 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); +extern SSchDebug gSCHDebug; + #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/schDbg.c b/source/libs/scheduler/src/schDbg.c index 7f013b8f32..5ecc27ff6e 100644 --- a/source/libs/scheduler/src/schDbg.c +++ b/source/libs/scheduler/src/schDbg.c @@ -17,6 +17,7 @@ #include "schInt.h" tsem_t schdRspSem; +SSchDebug gSCHDebug = {.lockEnable = true}; void schdExecCallback(SExecResult* pResult, void* param, int32_t code) { if (code) { diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index d2f9624eee..bba75db376 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -543,9 +543,12 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) { if (rsp->tbFName[0]) { + SCH_LOCK(SCH_WRITE, &pJob->resLock); + if (NULL == pJob->execRes.res) { pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo)); if (NULL == pJob->execRes.res) { + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } } @@ -557,6 +560,8 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) { taosArrayPush((SArray *)pJob->execRes.res, &tbInfo); pJob->execRes.msgType = TDMT_SCH_QUERY; + + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index e1e4ed8769..23c542b670 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -263,7 +263,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i); int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1); - SCH_LOCK(SCH_WRITE, &parent->lock); + SCH_LOCK_TASK(parent); SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE, .taskId = pTask->taskId, .schedId = schMgmt.sId, @@ -272,7 +272,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { .fetchMsgType = SCH_FETCH_TYPE(pTask), }; qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); - SCH_UNLOCK(SCH_WRITE, &parent->lock); + SCH_UNLOCK_TASK(parent); if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) { SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId); diff --git a/source/util/src/tlockfree.c b/source/util/src/tlockfree.c index a755a67cc8..55f0211476 100644 --- a/source/util/src/tlockfree.c +++ b/source/util/src/tlockfree.c @@ -17,8 +17,10 @@ #include "tlockfree.h" #define TD_RWLATCH_WRITE_FLAG 0x40000000 +#define TD_RWLATCH_REENTRANT_FLAG 0x4000000000000000 void taosInitRWLatch(SRWLatch *pLatch) { *pLatch = 0; } +void taosInitReentrantRWLatch(SRWLatch *pLatch) { *pLatch = 0x4000000000000000; } void taosWLockLatch(SRWLatch *pLatch) { SRWLatch oLatch, nLatch; @@ -90,4 +92,4 @@ void taosRLockLatch(SRWLatch *pLatch) { } } -void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_32(pLatch, 1); } \ No newline at end of file +void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_32(pLatch, 1); } From e4446354e9782766b78163359ab26ca7857b98f7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 10 Jul 2022 20:12:14 +0800 Subject: [PATCH 2/5] fix(query): update the timerange. --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorimpl.c | 1 + source/libs/executor/src/timewindowoperator.c | 1 + 3 files changed, 3 insertions(+) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 72efaa165d..20b03716f4 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -552,6 +552,7 @@ typedef struct SFillOperatorInfo { STimeWindow win; SNode* pCondition; SArray* pColMatchColInfo; + int32_t primaryTsCol; } SFillOperatorInfo; typedef struct SGroupbyOperatorInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1de02e3545..2549d3128e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4006,6 +4006,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SResultInfo* pResultInfo = &pOperator->resultInfo; initResultSizeInfo(pOperator, 4096); + pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId; int32_t numOfOutputCols = 0; SArray* pColMatchColInfo = diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 78775073a4..947d10dcb4 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4502,6 +4502,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { } size_t rows = pRes->info.rows; + blockDataUpdateTsWindow(pRes, iaInfo->primaryTsIndex); pOperator->resultInfo.totalRows += rows; return (rows == 0) ? NULL : pRes; } From def96ee38ff39591ecf340dfcf9fc09982227b5c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 11 Jul 2022 10:43:08 +0800 Subject: [PATCH 3/5] fix: fix taos crash and deak lock issue --- include/util/tlockfree.h | 11 +++--- source/libs/catalog/inc/catalogInt.h | 34 ++++++++--------- source/libs/qworker/inc/qwInt.h | 32 ++++++++-------- source/libs/scheduler/inc/schInt.h | 34 ++++++++--------- source/libs/scheduler/src/schDbg.c | 2 +- source/libs/scheduler/src/schJob.c | 4 +- source/libs/scheduler/src/schTask.c | 1 + source/util/src/tlockfree.c | 51 +++++++++++++++++++------ tests/system-test/2-query/queryQnode.py | 6 +-- 9 files changed, 101 insertions(+), 74 deletions(-) diff --git a/include/util/tlockfree.h b/include/util/tlockfree.h index 638499cc60..54a90d7b71 100644 --- a/include/util/tlockfree.h +++ b/include/util/tlockfree.h @@ -71,11 +71,12 @@ typedef void (*_ref_fn_t)(const void *pObj); // single writer multiple reader lock typedef volatile int64_t SRWLatch; -void taosInitRWLatch(SRWLatch *pLatch); -void taosWLockLatch(SRWLatch *pLatch); -void taosWUnLockLatch(SRWLatch *pLatch); -void taosRLockLatch(SRWLatch *pLatch); -void taosRUnLockLatch(SRWLatch *pLatch); +void taosInitRWLatch(SRWLatch *pLatch); +void taosInitReentrantRWLatch(SRWLatch *pLatch); +void taosWLockLatch(SRWLatch *pLatch); +void taosWUnLockLatch(SRWLatch *pLatch); +void taosRLockLatch(SRWLatch *pLatch); +void taosRUnLockLatch(SRWLatch *pLatch); int32_t taosWTryLockLatch(SRWLatch *pLatch); // copy on read diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9d0e3871cc..453f30d151 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -480,37 +480,35 @@ typedef struct SCtgOperation { #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 -#define CTG_IS_LOCKED(_lock) atomic_load_32((_lock)) - #define CTG_LOCK(type, _lock) do { \ if (CTG_READ == (type)) { \ - assert(atomic_load_32((_lock)) >= 0); \ - CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) >= 0); \ + CTG_LOCK_DEBUG("CTG RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ taosRLockLatch(_lock); \ - CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - assert(atomic_load_32((_lock)) > 0); \ + CTG_LOCK_DEBUG("CTG RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) > 0); \ } else { \ - assert(atomic_load_32((_lock)) >= 0); \ - CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) >= 0); \ + CTG_LOCK_DEBUG("CTG WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ taosWLockLatch(_lock); \ - CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ + CTG_LOCK_DEBUG("CTG WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ } \ } while (0) #define CTG_UNLOCK(type, _lock) do { \ if (CTG_READ == (type)) { \ - assert(atomic_load_32((_lock)) > 0); \ - CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) > 0); \ + CTG_LOCK_DEBUG("CTG RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ taosRUnLockLatch(_lock); \ - CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - assert(atomic_load_32((_lock)) >= 0); \ + CTG_LOCK_DEBUG("CTG RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) >= 0); \ } else { \ - assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ - CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ + CTG_LOCK_DEBUG("CTG WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ taosWUnLockLatch(_lock); \ - CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - assert(atomic_load_32((_lock)) >= 0); \ + CTG_LOCK_DEBUG("CTG WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) >= 0); \ } \ } while (0) diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 539643c390..b35e0e2fc4 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -316,34 +316,34 @@ typedef struct SQWorkerMgmt { #define QW_LOCK(type, _lock) \ do { \ if (QW_READ == (type)) { \ - assert(atomic_load_32((_lock)) >= 0); \ - QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) >= 0); \ + QW_LOCK_DEBUG("QW RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ taosRLockLatch(_lock); \ - QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - assert(atomic_load_32((_lock)) > 0); \ + QW_LOCK_DEBUG("QW RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) > 0); \ } else { \ - assert(atomic_load_32((_lock)) >= 0); \ - QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) >= 0); \ + QW_LOCK_DEBUG("QW WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ taosWLockLatch(_lock); \ - QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ + QW_LOCK_DEBUG("QW WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ } \ } while (0) #define QW_UNLOCK(type, _lock) \ do { \ if (QW_READ == (type)) { \ - assert(atomic_load_32((_lock)) > 0); \ - QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) > 0); \ + QW_LOCK_DEBUG("QW RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ taosRUnLockLatch(_lock); \ - QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - assert(atomic_load_32((_lock)) >= 0); \ + QW_LOCK_DEBUG("QW RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) >= 0); \ } else { \ - assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ - QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ + QW_LOCK_DEBUG("QW WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ taosWUnLockLatch(_lock); \ - QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - assert(atomic_load_32((_lock)) >= 0); \ + QW_LOCK_DEBUG("QW WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) >= 0); \ } \ } while (0) diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 4b5aac60ea..79adfaebb3 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -191,7 +191,7 @@ typedef struct SSchTaskProfile { typedef struct SSchTask { uint64_t taskId; // task id - SRWLatch lock; // task lock + SRWLatch lock; // task reentrant lock int32_t maxExecTimes; // task may exec times int32_t execId; // task current execute try index SSchLevel *level; // level @@ -367,33 +367,33 @@ extern SSchedulerMgmt schMgmt; #define SCH_LOCK(type, _lock) do { \ if (SCH_READ == (type)) { \ - assert(atomic_load_32((_lock)) >= 0); \ - SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64(_lock) >= 0); \ + SCH_LOCK_DEBUG("SCH RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ taosRLockLatch(_lock); \ - SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - assert(atomic_load_32((_lock)) > 0); \ + SCH_LOCK_DEBUG("SCH RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64(_lock) > 0); \ } else { \ - assert(atomic_load_32((_lock)) >= 0); \ - SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64(_lock) >= 0); \ + SCH_LOCK_DEBUG("SCH WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ taosWLockLatch(_lock); \ - SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ + SCH_LOCK_DEBUG("SCH WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64(_lock) & TD_RWLATCH_WRITE_FLAG_COPY); \ } \ } while (0) #define SCH_UNLOCK(type, _lock) do { \ if (SCH_READ == (type)) { \ - assert(atomic_load_32((_lock)) > 0); \ - SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) > 0); \ + SCH_LOCK_DEBUG("SCH RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ taosRUnLockLatch(_lock); \ - SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - assert(atomic_load_32((_lock)) >= 0); \ + SCH_LOCK_DEBUG("SCH RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) >= 0); \ } else { \ - assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ - SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY); \ + SCH_LOCK_DEBUG("SCH WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ taosWUnLockLatch(_lock); \ - SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - assert(atomic_load_32((_lock)) >= 0); \ + SCH_LOCK_DEBUG("SCH WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \ + assert(atomic_load_64((_lock)) >= 0); \ } \ } while (0) diff --git a/source/libs/scheduler/src/schDbg.c b/source/libs/scheduler/src/schDbg.c index 5ecc27ff6e..a6398522d3 100644 --- a/source/libs/scheduler/src/schDbg.c +++ b/source/libs/scheduler/src/schDbg.c @@ -17,7 +17,7 @@ #include "schInt.h" tsem_t schdRspSem; -SSchDebug gSCHDebug = {.lockEnable = true}; +SSchDebug gSCHDebug = {0}; void schdExecCallback(SExecResult* pResult, void* param, int32_t code) { if (code) { diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index bba75db376..394095785d 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -337,14 +337,14 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_SET_JOB_TYPE(pJob, plan->subplanType); SSchTask task = {0}; - SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel, levelNum)); - SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task); if (NULL == pTask) { SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel, levelNum)); + SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask)); if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 23c542b670..a6621d279d 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -60,6 +60,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + taosInitReentrantRWLatch(&pTask->lock); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); diff --git a/source/util/src/tlockfree.c b/source/util/src/tlockfree.c index 55f0211476..3cab16ee83 100644 --- a/source/util/src/tlockfree.c +++ b/source/util/src/tlockfree.c @@ -20,7 +20,7 @@ #define TD_RWLATCH_REENTRANT_FLAG 0x4000000000000000 void taosInitRWLatch(SRWLatch *pLatch) { *pLatch = 0; } -void taosInitReentrantRWLatch(SRWLatch *pLatch) { *pLatch = 0x4000000000000000; } +void taosInitReentrantRWLatch(SRWLatch *pLatch) { *pLatch = TD_RWLATCH_REENTRANT_FLAG; } void taosWLockLatch(SRWLatch *pLatch) { SRWLatch oLatch, nLatch; @@ -28,8 +28,14 @@ void taosWLockLatch(SRWLatch *pLatch) { // Set write flag while (1) { - oLatch = atomic_load_32(pLatch); + oLatch = atomic_load_64(pLatch); if (oLatch & TD_RWLATCH_WRITE_FLAG) { + if (oLatch & TD_RWLATCH_REENTRANT_FLAG) { + nLatch = (((oLatch >> 32) + 1) << 32) | (oLatch & 0xFFFFFFFF); + if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break; + + continue; + } nLoops++; if (nLoops > 1000) { sched_yield(); @@ -39,14 +45,14 @@ void taosWLockLatch(SRWLatch *pLatch) { } nLatch = oLatch | TD_RWLATCH_WRITE_FLAG; - if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) break; + if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break; } // wait for all reads end nLoops = 0; while (1) { - oLatch = atomic_load_32(pLatch); - if (oLatch == TD_RWLATCH_WRITE_FLAG) break; + oLatch = atomic_load_64(pLatch); + if (0 == (oLatch & 0xFFFFFFF)) break; nLoops++; if (nLoops > 1000) { sched_yield(); @@ -55,29 +61,50 @@ void taosWLockLatch(SRWLatch *pLatch) { } } +// no reentrant int32_t taosWTryLockLatch(SRWLatch *pLatch) { SRWLatch oLatch, nLatch; - oLatch = atomic_load_32(pLatch); - if (oLatch) { + oLatch = atomic_load_64(pLatch); + if (oLatch << 2) { return -1; } nLatch = oLatch | TD_RWLATCH_WRITE_FLAG; - if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) { + if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) { return 0; } return -1; } -void taosWUnLockLatch(SRWLatch *pLatch) { atomic_store_32(pLatch, 0); } +void taosWUnLockLatch(SRWLatch *pLatch) { + SRWLatch oLatch, nLatch, wLatch; + + while (1) { + oLatch = atomic_load_64(pLatch); + + if (0 == (oLatch & TD_RWLATCH_REENTRANT_FLAG)) { + atomic_store_64(pLatch, 0); + break; + } + + wLatch = ((oLatch << 2) >> 34); + if (wLatch) { + nLatch = ((--wLatch) << 32) | TD_RWLATCH_REENTRANT_FLAG | TD_RWLATCH_WRITE_FLAG; + } else { + nLatch = TD_RWLATCH_REENTRANT_FLAG; + } + + if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break; + } +} void taosRLockLatch(SRWLatch *pLatch) { SRWLatch oLatch, nLatch; int32_t nLoops = 0; while (1) { - oLatch = atomic_load_32(pLatch); + oLatch = atomic_load_64(pLatch); if (oLatch & TD_RWLATCH_WRITE_FLAG) { nLoops++; if (nLoops > 1000) { @@ -88,8 +115,8 @@ void taosRLockLatch(SRWLatch *pLatch) { } nLatch = oLatch + 1; - if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) break; + if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break; } } -void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_32(pLatch, 1); } +void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_64(pLatch, 1); } diff --git a/tests/system-test/2-query/queryQnode.py b/tests/system-test/2-query/queryQnode.py index 8b893a93d7..3fdc09478d 100644 --- a/tests/system-test/2-query/queryQnode.py +++ b/tests/system-test/2-query/queryQnode.py @@ -32,9 +32,9 @@ class TDTestCase: # # --------------- main frame ------------------- # - clientCfgDict = {'queryPolicy': '1','debugFlag': 135} + clientCfgDict = {'queryPolicy': '1','debugFlag': 143} clientCfgDict["queryPolicy"] = '1' - clientCfgDict["debugFlag"] = 131 + clientCfgDict["debugFlag"] = 143 updatecfgDict = {'clientCfg': {}} updatecfgDict = {'debugFlag': 143} @@ -480,4 +480,4 @@ class TDTestCase: # add case with filename # tdCases.addWindows(__file__, TDTestCase()) -tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file +tdCases.addLinux(__file__, TDTestCase()) From b824dc71a091b2650236c4fc7c73c7f62c93e39a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 11 Jul 2022 11:23:15 +0800 Subject: [PATCH 4/5] fix: fix invalid time range issue --- source/libs/catalog/src/ctgAsync.c | 6 ++++-- source/libs/scalar/src/filter.c | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 455f2bd6a7..e77df8f7f2 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -789,9 +789,13 @@ _return: int32_t ctgCallUserCb(void* param) { SCtgJob* pJob = (SCtgJob*)param; + + qDebug("QID:0x%" PRIx64 " ctg start to call user cb with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode)); (*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode); + qDebug("QID:0x%" PRIx64 " ctg end to call user cb", pJob->queryId); + taosRemoveRef(gCtgMgmt.jobPool, pJob->refId); return TSDB_CODE_SUCCESS; @@ -822,8 +826,6 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { _return: - qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(code)); - pJob->jobResCode = code; //taosSsleep(2); diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 42121e8813..0348f13191 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3623,7 +3623,8 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) { return DEAL_RES_CONTINUE; } - if (FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP) && node->opType >= OP_TYPE_NOT_EQUAL) { + if (FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP) && + (node->opType >= OP_TYPE_NOT_EQUAL) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) { stat->scalarMode = true; return DEAL_RES_CONTINUE; } From 0fb06cbc4cec5a48fd7f90e1ffd345fb7b24f43c Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Mon, 11 Jul 2022 11:40:44 +0800 Subject: [PATCH 5/5] fix: set capacity for newly created datablock --- source/common/src/tdatablock.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 3bb829f77a..1fa37d5ef0 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -463,6 +463,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 pDst->info = pBlock->info; pDst->info.rows = 0; + pDst->info.capacity = 0; size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0};