Merge branch '3.0' into feature/3_liaohj
This commit is contained in:
commit
4f43d7df43
|
@ -69,13 +69,14 @@ typedef void (*_ref_fn_t)(const void *pObj);
|
||||||
#define T_REF_VAL_GET(x) (x)->_ref.val
|
#define T_REF_VAL_GET(x) (x)->_ref.val
|
||||||
|
|
||||||
// single writer multiple reader lock
|
// single writer multiple reader lock
|
||||||
typedef volatile int32_t SRWLatch;
|
typedef volatile int64_t SRWLatch;
|
||||||
|
|
||||||
void taosInitRWLatch(SRWLatch *pLatch);
|
void taosInitRWLatch(SRWLatch *pLatch);
|
||||||
void taosWLockLatch(SRWLatch *pLatch);
|
void taosInitReentrantRWLatch(SRWLatch *pLatch);
|
||||||
void taosWUnLockLatch(SRWLatch *pLatch);
|
void taosWLockLatch(SRWLatch *pLatch);
|
||||||
void taosRLockLatch(SRWLatch *pLatch);
|
void taosWUnLockLatch(SRWLatch *pLatch);
|
||||||
void taosRUnLockLatch(SRWLatch *pLatch);
|
void taosRLockLatch(SRWLatch *pLatch);
|
||||||
|
void taosRUnLockLatch(SRWLatch *pLatch);
|
||||||
int32_t taosWTryLockLatch(SRWLatch *pLatch);
|
int32_t taosWTryLockLatch(SRWLatch *pLatch);
|
||||||
|
|
||||||
// copy on read
|
// copy on read
|
||||||
|
|
|
@ -463,6 +463,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
|
||||||
|
|
||||||
pDst->info = pBlock->info;
|
pDst->info = pBlock->info;
|
||||||
pDst->info.rows = 0;
|
pDst->info.rows = 0;
|
||||||
|
pDst->info.capacity = 0;
|
||||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData colInfo = {0};
|
SColumnInfoData colInfo = {0};
|
||||||
|
|
|
@ -480,37 +480,35 @@ typedef struct SCtgOperation {
|
||||||
|
|
||||||
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
|
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
|
||||||
|
|
||||||
#define CTG_IS_LOCKED(_lock) atomic_load_32((_lock))
|
|
||||||
|
|
||||||
#define CTG_LOCK(type, _lock) do { \
|
#define CTG_LOCK(type, _lock) do { \
|
||||||
if (CTG_READ == (type)) { \
|
if (CTG_READ == (type)) { \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosRLockLatch(_lock); \
|
taosRLockLatch(_lock); \
|
||||||
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) > 0); \
|
assert(atomic_load_64((_lock)) > 0); \
|
||||||
} else { \
|
} else { \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosWLockLatch(_lock); \
|
taosWLockLatch(_lock); \
|
||||||
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CTG_UNLOCK(type, _lock) do { \
|
#define CTG_UNLOCK(type, _lock) do { \
|
||||||
if (CTG_READ == (type)) { \
|
if (CTG_READ == (type)) { \
|
||||||
assert(atomic_load_32((_lock)) > 0); \
|
assert(atomic_load_64((_lock)) > 0); \
|
||||||
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosRUnLockLatch(_lock); \
|
taosRUnLockLatch(_lock); \
|
||||||
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
} else { \
|
} else { \
|
||||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||||
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosWUnLockLatch(_lock); \
|
taosWUnLockLatch(_lock); \
|
||||||
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
CTG_LOCK_DEBUG("CTG WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
|
@ -789,9 +789,13 @@ _return:
|
||||||
|
|
||||||
int32_t ctgCallUserCb(void* param) {
|
int32_t ctgCallUserCb(void* param) {
|
||||||
SCtgJob* pJob = (SCtgJob*)param;
|
SCtgJob* pJob = (SCtgJob*)param;
|
||||||
|
|
||||||
|
qDebug("QID:0x%" PRIx64 " ctg start to call user cb with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode));
|
||||||
|
|
||||||
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode);
|
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode);
|
||||||
|
|
||||||
|
qDebug("QID:0x%" PRIx64 " ctg end to call user cb", pJob->queryId);
|
||||||
|
|
||||||
taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
|
taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -822,8 +826,6 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(code));
|
|
||||||
|
|
||||||
pJob->jobResCode = code;
|
pJob->jobResCode = code;
|
||||||
|
|
||||||
//taosSsleep(2);
|
//taosSsleep(2);
|
||||||
|
|
|
@ -555,6 +555,7 @@ typedef struct SFillOperatorInfo {
|
||||||
STimeWindow win;
|
STimeWindow win;
|
||||||
SNode* pCondition;
|
SNode* pCondition;
|
||||||
SArray* pColMatchColInfo;
|
SArray* pColMatchColInfo;
|
||||||
|
int32_t primaryTsCol;
|
||||||
} SFillOperatorInfo;
|
} SFillOperatorInfo;
|
||||||
|
|
||||||
typedef struct SGroupbyOperatorInfo {
|
typedef struct SGroupbyOperatorInfo {
|
||||||
|
|
|
@ -4043,6 +4043,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
|
|
||||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||||
initResultSizeInfo(pOperator, 4096);
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* pColMatchColInfo =
|
SArray* pColMatchColInfo =
|
||||||
|
|
|
@ -4502,6 +4502,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t rows = pRes->info.rows;
|
size_t rows = pRes->info.rows;
|
||||||
|
blockDataUpdateTsWindow(pRes, iaInfo->primaryTsIndex);
|
||||||
pOperator->resultInfo.totalRows += rows;
|
pOperator->resultInfo.totalRows += rows;
|
||||||
return (rows == 0) ? NULL : pRes;
|
return (rows == 0) ? NULL : pRes;
|
||||||
}
|
}
|
||||||
|
|
|
@ -316,34 +316,34 @@ typedef struct SQWorkerMgmt {
|
||||||
#define QW_LOCK(type, _lock) \
|
#define QW_LOCK(type, _lock) \
|
||||||
do { \
|
do { \
|
||||||
if (QW_READ == (type)) { \
|
if (QW_READ == (type)) { \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosRLockLatch(_lock); \
|
taosRLockLatch(_lock); \
|
||||||
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) > 0); \
|
assert(atomic_load_64((_lock)) > 0); \
|
||||||
} else { \
|
} else { \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosWLockLatch(_lock); \
|
taosWLockLatch(_lock); \
|
||||||
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define QW_UNLOCK(type, _lock) \
|
#define QW_UNLOCK(type, _lock) \
|
||||||
do { \
|
do { \
|
||||||
if (QW_READ == (type)) { \
|
if (QW_READ == (type)) { \
|
||||||
assert(atomic_load_32((_lock)) > 0); \
|
assert(atomic_load_64((_lock)) > 0); \
|
||||||
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosRUnLockLatch(_lock); \
|
taosRUnLockLatch(_lock); \
|
||||||
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
} else { \
|
} else { \
|
||||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||||
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
taosWUnLockLatch(_lock); \
|
taosWUnLockLatch(_lock); \
|
||||||
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
QW_LOCK_DEBUG("QW WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
assert(atomic_load_32((_lock)) >= 0); \
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
|
@ -3623,7 +3623,8 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP) && node->opType >= OP_TYPE_NOT_EQUAL) {
|
if (FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP) &&
|
||||||
|
(node->opType >= OP_TYPE_NOT_EQUAL) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) {
|
||||||
stat->scalarMode = true;
|
stat->scalarMode = true;
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,11 @@ typedef enum {
|
||||||
SCH_OP_GET_STATUS,
|
SCH_OP_GET_STATUS,
|
||||||
} SCH_OP_TYPE;
|
} SCH_OP_TYPE;
|
||||||
|
|
||||||
|
typedef struct SSchDebug {
|
||||||
|
bool lockEnable;
|
||||||
|
bool apiEnable;
|
||||||
|
} SSchDebug;
|
||||||
|
|
||||||
typedef struct SSchTrans {
|
typedef struct SSchTrans {
|
||||||
void *pTrans;
|
void *pTrans;
|
||||||
void *pHandle;
|
void *pHandle;
|
||||||
|
@ -186,7 +191,7 @@ typedef struct SSchTaskProfile {
|
||||||
|
|
||||||
typedef struct SSchTask {
|
typedef struct SSchTask {
|
||||||
uint64_t taskId; // task id
|
uint64_t taskId; // task id
|
||||||
SRWLatch lock; // task lock
|
SRWLatch lock; // task reentrant lock
|
||||||
int32_t maxExecTimes; // task may exec times
|
int32_t maxExecTimes; // task may exec times
|
||||||
int32_t execId; // task current execute try index
|
int32_t execId; // task current execute try index
|
||||||
SSchLevel *level; // level
|
SSchLevel *level; // level
|
||||||
|
@ -356,8 +361,41 @@ extern SSchedulerMgmt schMgmt;
|
||||||
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0)
|
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0)
|
||||||
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(code); goto _return; } } while (0)
|
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(code); goto _return; } } while (0)
|
||||||
|
|
||||||
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
|
#define SCH_LOCK_DEBUG(...) do { if (gSCHDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0)
|
||||||
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
|
|
||||||
|
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
|
||||||
|
|
||||||
|
#define SCH_LOCK(type, _lock) do { \
|
||||||
|
if (SCH_READ == (type)) { \
|
||||||
|
assert(atomic_load_64(_lock) >= 0); \
|
||||||
|
SCH_LOCK_DEBUG("SCH RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
taosRLockLatch(_lock); \
|
||||||
|
SCH_LOCK_DEBUG("SCH RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
assert(atomic_load_64(_lock) > 0); \
|
||||||
|
} else { \
|
||||||
|
assert(atomic_load_64(_lock) >= 0); \
|
||||||
|
SCH_LOCK_DEBUG("SCH WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
taosWLockLatch(_lock); \
|
||||||
|
SCH_LOCK_DEBUG("SCH WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
assert(atomic_load_64(_lock) & TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define SCH_UNLOCK(type, _lock) do { \
|
||||||
|
if (SCH_READ == (type)) { \
|
||||||
|
assert(atomic_load_64((_lock)) > 0); \
|
||||||
|
SCH_LOCK_DEBUG("SCH RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
taosRUnLockLatch(_lock); \
|
||||||
|
SCH_LOCK_DEBUG("SCH RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
|
} else { \
|
||||||
|
assert(atomic_load_64((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||||
|
SCH_LOCK_DEBUG("SCH WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
taosWUnLockLatch(_lock); \
|
||||||
|
SCH_LOCK_DEBUG("SCH WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
|
||||||
|
assert(atomic_load_64((_lock)) >= 0); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
|
||||||
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
|
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
|
||||||
|
@ -435,6 +473,8 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
|
||||||
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
|
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
|
||||||
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
|
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
|
||||||
|
|
||||||
|
extern SSchDebug gSCHDebug;
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "schInt.h"
|
#include "schInt.h"
|
||||||
|
|
||||||
tsem_t schdRspSem;
|
tsem_t schdRspSem;
|
||||||
|
SSchDebug gSCHDebug = {0};
|
||||||
|
|
||||||
void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
|
void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
|
@ -337,14 +337,14 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
||||||
SCH_SET_JOB_TYPE(pJob, plan->subplanType);
|
SCH_SET_JOB_TYPE(pJob, plan->subplanType);
|
||||||
|
|
||||||
SSchTask task = {0};
|
SSchTask task = {0};
|
||||||
SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel, levelNum));
|
|
||||||
|
|
||||||
SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
|
SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
|
||||||
if (NULL == pTask) {
|
if (NULL == pTask) {
|
||||||
SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
|
SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
|
||||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel, levelNum));
|
||||||
|
|
||||||
SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));
|
SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));
|
||||||
|
|
||||||
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
|
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
|
||||||
|
@ -543,9 +543,12 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
|
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
|
||||||
if (rsp->tbFName[0]) {
|
if (rsp->tbFName[0]) {
|
||||||
|
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
||||||
|
|
||||||
if (NULL == pJob->execRes.res) {
|
if (NULL == pJob->execRes.res) {
|
||||||
pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
|
pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
|
||||||
if (NULL == pJob->execRes.res) {
|
if (NULL == pJob->execRes.res) {
|
||||||
|
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||||
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -557,6 +560,8 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
|
||||||
|
|
||||||
taosArrayPush((SArray *)pJob->execRes.res, &tbInfo);
|
taosArrayPush((SArray *)pJob->execRes.res, &tbInfo);
|
||||||
pJob->execRes.msgType = TDMT_SCH_QUERY;
|
pJob->execRes.msgType = TDMT_SCH_QUERY;
|
||||||
|
|
||||||
|
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -60,6 +60,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
|
||||||
if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
|
if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
|
||||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
taosInitReentrantRWLatch(&pTask->lock);
|
||||||
|
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
|
||||||
|
|
||||||
|
@ -263,7 +264,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||||
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
|
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
|
||||||
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
|
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
|
||||||
|
|
||||||
SCH_LOCK(SCH_WRITE, &parent->lock);
|
SCH_LOCK_TASK(parent);
|
||||||
SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
|
SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
|
||||||
.taskId = pTask->taskId,
|
.taskId = pTask->taskId,
|
||||||
.schedId = schMgmt.sId,
|
.schedId = schMgmt.sId,
|
||||||
|
@ -272,7 +273,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||||
.fetchMsgType = SCH_FETCH_TYPE(pTask),
|
.fetchMsgType = SCH_FETCH_TYPE(pTask),
|
||||||
};
|
};
|
||||||
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
|
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
|
||||||
SCH_UNLOCK(SCH_WRITE, &parent->lock);
|
SCH_UNLOCK_TASK(parent);
|
||||||
|
|
||||||
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
|
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
|
||||||
SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
|
SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
|
||||||
|
|
|
@ -17,8 +17,10 @@
|
||||||
#include "tlockfree.h"
|
#include "tlockfree.h"
|
||||||
|
|
||||||
#define TD_RWLATCH_WRITE_FLAG 0x40000000
|
#define TD_RWLATCH_WRITE_FLAG 0x40000000
|
||||||
|
#define TD_RWLATCH_REENTRANT_FLAG 0x4000000000000000
|
||||||
|
|
||||||
void taosInitRWLatch(SRWLatch *pLatch) { *pLatch = 0; }
|
void taosInitRWLatch(SRWLatch *pLatch) { *pLatch = 0; }
|
||||||
|
void taosInitReentrantRWLatch(SRWLatch *pLatch) { *pLatch = TD_RWLATCH_REENTRANT_FLAG; }
|
||||||
|
|
||||||
void taosWLockLatch(SRWLatch *pLatch) {
|
void taosWLockLatch(SRWLatch *pLatch) {
|
||||||
SRWLatch oLatch, nLatch;
|
SRWLatch oLatch, nLatch;
|
||||||
|
@ -26,8 +28,14 @@ void taosWLockLatch(SRWLatch *pLatch) {
|
||||||
|
|
||||||
// Set write flag
|
// Set write flag
|
||||||
while (1) {
|
while (1) {
|
||||||
oLatch = atomic_load_32(pLatch);
|
oLatch = atomic_load_64(pLatch);
|
||||||
if (oLatch & TD_RWLATCH_WRITE_FLAG) {
|
if (oLatch & TD_RWLATCH_WRITE_FLAG) {
|
||||||
|
if (oLatch & TD_RWLATCH_REENTRANT_FLAG) {
|
||||||
|
nLatch = (((oLatch >> 32) + 1) << 32) | (oLatch & 0xFFFFFFFF);
|
||||||
|
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
nLoops++;
|
nLoops++;
|
||||||
if (nLoops > 1000) {
|
if (nLoops > 1000) {
|
||||||
sched_yield();
|
sched_yield();
|
||||||
|
@ -37,14 +45,14 @@ void taosWLockLatch(SRWLatch *pLatch) {
|
||||||
}
|
}
|
||||||
|
|
||||||
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
|
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
|
||||||
if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) break;
|
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for all reads end
|
// wait for all reads end
|
||||||
nLoops = 0;
|
nLoops = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
oLatch = atomic_load_32(pLatch);
|
oLatch = atomic_load_64(pLatch);
|
||||||
if (oLatch == TD_RWLATCH_WRITE_FLAG) break;
|
if (0 == (oLatch & 0xFFFFFFF)) break;
|
||||||
nLoops++;
|
nLoops++;
|
||||||
if (nLoops > 1000) {
|
if (nLoops > 1000) {
|
||||||
sched_yield();
|
sched_yield();
|
||||||
|
@ -53,29 +61,50 @@ void taosWLockLatch(SRWLatch *pLatch) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// no reentrant
|
||||||
int32_t taosWTryLockLatch(SRWLatch *pLatch) {
|
int32_t taosWTryLockLatch(SRWLatch *pLatch) {
|
||||||
SRWLatch oLatch, nLatch;
|
SRWLatch oLatch, nLatch;
|
||||||
oLatch = atomic_load_32(pLatch);
|
oLatch = atomic_load_64(pLatch);
|
||||||
if (oLatch) {
|
if (oLatch << 2) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
|
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
|
||||||
if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) {
|
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosWUnLockLatch(SRWLatch *pLatch) { atomic_store_32(pLatch, 0); }
|
void taosWUnLockLatch(SRWLatch *pLatch) {
|
||||||
|
SRWLatch oLatch, nLatch, wLatch;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
oLatch = atomic_load_64(pLatch);
|
||||||
|
|
||||||
|
if (0 == (oLatch & TD_RWLATCH_REENTRANT_FLAG)) {
|
||||||
|
atomic_store_64(pLatch, 0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
wLatch = ((oLatch << 2) >> 34);
|
||||||
|
if (wLatch) {
|
||||||
|
nLatch = ((--wLatch) << 32) | TD_RWLATCH_REENTRANT_FLAG | TD_RWLATCH_WRITE_FLAG;
|
||||||
|
} else {
|
||||||
|
nLatch = TD_RWLATCH_REENTRANT_FLAG;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void taosRLockLatch(SRWLatch *pLatch) {
|
void taosRLockLatch(SRWLatch *pLatch) {
|
||||||
SRWLatch oLatch, nLatch;
|
SRWLatch oLatch, nLatch;
|
||||||
int32_t nLoops = 0;
|
int32_t nLoops = 0;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
oLatch = atomic_load_32(pLatch);
|
oLatch = atomic_load_64(pLatch);
|
||||||
if (oLatch & TD_RWLATCH_WRITE_FLAG) {
|
if (oLatch & TD_RWLATCH_WRITE_FLAG) {
|
||||||
nLoops++;
|
nLoops++;
|
||||||
if (nLoops > 1000) {
|
if (nLoops > 1000) {
|
||||||
|
@ -86,8 +115,8 @@ void taosRLockLatch(SRWLatch *pLatch) {
|
||||||
}
|
}
|
||||||
|
|
||||||
nLatch = oLatch + 1;
|
nLatch = oLatch + 1;
|
||||||
if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) break;
|
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_32(pLatch, 1); }
|
void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_64(pLatch, 1); }
|
||||||
|
|
|
@ -32,9 +32,9 @@ class TDTestCase:
|
||||||
#
|
#
|
||||||
# --------------- main frame -------------------
|
# --------------- main frame -------------------
|
||||||
#
|
#
|
||||||
clientCfgDict = {'queryPolicy': '1','debugFlag': 135}
|
clientCfgDict = {'queryPolicy': '1','debugFlag': 143}
|
||||||
clientCfgDict["queryPolicy"] = '1'
|
clientCfgDict["queryPolicy"] = '1'
|
||||||
clientCfgDict["debugFlag"] = 131
|
clientCfgDict["debugFlag"] = 143
|
||||||
|
|
||||||
updatecfgDict = {'clientCfg': {}}
|
updatecfgDict = {'clientCfg': {}}
|
||||||
updatecfgDict = {'debugFlag': 143}
|
updatecfgDict = {'debugFlag': 143}
|
||||||
|
@ -480,4 +480,4 @@ class TDTestCase:
|
||||||
# add case with filename
|
# add case with filename
|
||||||
#
|
#
|
||||||
tdCases.addWindows(__file__, TDTestCase())
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
tdCases.addLinux(__file__, TDTestCase())
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
|
Loading…
Reference in New Issue