stmt query

This commit is contained in:
dapan1121 2022-04-29 11:31:39 +08:00
parent 18b6c99e09
commit a4d4cd2a84
3 changed files with 38 additions and 9 deletions

View File

@ -75,7 +75,15 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
if (valueNode->node.resType.type != type) { if (valueNode->node.resType.type != type) {
out.columnData->info.type = type; out.columnData->info.type = type;
if (IS_VAR_DATA_TYPE(type)) {
if (IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
out.columnData->info.bytes = valueNode->node.resType.bytes * TSDB_NCHAR_SIZE;
} else {
out.columnData->info.bytes = 64 * TSDB_NCHAR_SIZE;
}
} else {
out.columnData->info.bytes = tDataTypes[type].bytes; out.columnData->info.bytes = tDataTypes[type].bytes;
}
code = doConvertDataType(valueNode, &out); code = doConvertDataType(valueNode, &out);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -85,7 +85,9 @@ typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId uint64_t sId; // schedulerId
SSchedulerCfg cfg; SSchedulerCfg cfg;
SRWLatch lock;
int32_t jobRef; int32_t jobRef;
int32_t jobNum;
SSchedulerStat stat; SSchedulerStat stat;
SHashObj *hbConnections; SHashObj *hbConnections;
} SSchedulerMgmt; } SSchedulerMgmt;

View File

@ -21,7 +21,9 @@
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
SSchedulerMgmt schMgmt = {0}; SSchedulerMgmt schMgmt = {
.jobRef = -1,
};
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); } FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); }
@ -70,6 +72,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql, int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
int64_t startTs, bool syncSchedule) { int64_t startTs, bool syncSchedule) {
int32_t code = 0; int32_t code = 0;
int64_t refId = -1;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) { if (NULL == pJob) {
qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
@ -114,15 +117,17 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray
tsem_init(&pJob->rspSem, 0, 0); tsem_init(&pJob->rspSem, 0, 0);
int64_t refId = taosAddRef(schMgmt.jobRef, pJob); refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) { if (refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
SCH_ERR_JRET(terrno); SCH_ERR_JRET(terrno);
} }
atomic_add_fetch_32(&schMgmt.jobNum, 1);
if (NULL == schAcquireJob(refId)) { if (NULL == schAcquireJob(refId)) {
SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId); SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
} }
pJob->refId = refId; pJob->refId = refId;
@ -137,7 +142,11 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray
_return: _return:
if (refId < 0) {
schFreeJobImpl(pJob); schFreeJobImpl(pJob);
} else {
taosRemoveRef(schMgmt.jobRef, refId);
}
SCH_RET(code); SCH_RET(code);
} }
@ -2239,6 +2248,15 @@ int32_t schCancelJob(SSchJob *pJob) {
// TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST // TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
} }
void schCloseJobRef(void) {
SCH_LOCK(SCH_WRITE, &schMgmt.lock);
if (atomic_load_32(&schMgmt.jobNum) <= 0 && schMgmt.jobRef >= 0) {
taosCloseRef(schMgmt.jobRef);
schMgmt.jobRef = -1;
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.lock);
}
void schFreeJobImpl(void *job) { void schFreeJobImpl(void *job) {
if (NULL == job) { if (NULL == job) {
return; return;
@ -2284,6 +2302,10 @@ void schFreeJobImpl(void *job) {
taosMemoryFreeClear(pJob); taosMemoryFreeClear(pJob);
qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob); qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
atomic_sub_fetch_32(&schMgmt.jobNum, 1);
schCloseJobRef();
} }
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
@ -2732,7 +2754,7 @@ void schedulerFreeTaskList(SArray *taskList) {
} }
void schedulerDestroy(void) { void schedulerDestroy(void) {
if (schMgmt.jobRef) { if (schMgmt.jobRef >= 0) {
SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0); SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
int64_t refId = 0; int64_t refId = 0;
@ -2745,9 +2767,6 @@ void schedulerDestroy(void) {
pJob = taosIterateRef(schMgmt.jobRef, refId); pJob = taosIterateRef(schMgmt.jobRef, refId);
} }
taosCloseRef(schMgmt.jobRef);
schMgmt.jobRef = 0;
} }
if (schMgmt.hbConnections) { if (schMgmt.hbConnections) {