From 54fe93f77e526d0d830fd711ef4fe1d7cf6be686 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 28 Apr 2022 17:09:21 +0800 Subject: [PATCH 1/7] stmt query --- source/client/inc/clientStmt.h | 27 ++-- source/client/src/clientStmt.c | 47 +++++-- source/dnode/mnode/impl/inc/mndInt.h | 2 +- source/dnode/qnode/inc/qndInt.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/libs/planner/src/planOptimizer.c | 2 + source/libs/qworker/inc/qworkerInt.h | 33 ++++- source/libs/qworker/inc/qworkerMsg.h | 6 +- source/libs/qworker/src/qworker.c | 168 ++++++++++++++++++------ source/libs/qworker/src/qworkerMsg.c | 18 +-- tests/script/api/batchprepare.c | 126 +++++++++--------- 11 files changed, 285 insertions(+), 148 deletions(-) diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index ab27d7f00c..43e886faf5 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -46,11 +46,12 @@ typedef struct SStmtTableCache { void* boundTags; } SStmtTableCache; -typedef struct SQueryFields { +typedef struct SStmtQueryResInfo { TAOS_FIELD* fields; TAOS_FIELD* userFields; uint32_t numOfCols; -} SQueryFields; + int32_t precision; +} SStmtQueryResInfo; typedef struct SStmtBindInfo { bool needParse; @@ -72,17 +73,17 @@ typedef struct SStmtExecInfo { } SStmtExecInfo; typedef struct SStmtSQLInfo { - STMT_TYPE type; - STMT_STATUS status; - bool autoCreate; - uint64_t runTimes; - SHashObj* pTableCache; //SHash - SQuery* pQuery; - char* sqlStr; - int32_t sqlLen; - SArray* nodeList; - SQueryPlan* pQueryPlan; - SQueryFields fields; + STMT_TYPE type; + STMT_STATUS status; + bool autoCreate; + uint64_t runTimes; + SHashObj* pTableCache; //SHash + SQuery* pQuery; + char* sqlStr; + int32_t sqlLen; + SArray* nodeList; + SQueryPlan* pQueryPlan; + SStmtQueryResInfo queryRes; } SStmtSQLInfo; typedef struct STscStmt { diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index ca6a11a668..aad7012056 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -74,17 +74,44 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) { } int32_t stmtBackupQueryFields(STscStmt* pStmt) { - SQueryFields *pFields = &pStmt->sql.fields; - int32_t size = pFields->numOfCols * sizeof(TAOS_FIELD); + SStmtQueryResInfo *pRes = &pStmt->sql.queryRes; + pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols; + pRes->precision = pStmt->exec.pRequest->body.resInfo.precision; - pFields->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols; - pFields->fields = taosMemoryMalloc(size); - pFields->userFields = taosMemoryMalloc(size); - if (NULL == pFields->fields || NULL == pFields->userFields) { + int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD); + pRes->fields = taosMemoryMalloc(size); + pRes->userFields = taosMemoryMalloc(size); + if (NULL == pRes->fields || NULL == pRes->userFields) { STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY); } - memcpy(pFields->fields, pStmt->exec.pRequest->body.resInfo.fields, size); - memcpy(pFields->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size); + memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size); + memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size); + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtRestoreQueryFields(STscStmt* pStmt) { + SStmtQueryResInfo *pRes = &pStmt->sql.queryRes; + int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD); + + pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols; + pStmt->exec.pRequest->body.resInfo.precision = pRes->precision; + + if (NULL == pStmt->exec.pRequest->body.resInfo.fields) { + pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size); + if (NULL == pStmt->exec.pRequest->body.resInfo.fields) { + STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY); + } + memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size); + } + + if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) { + pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size); + if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) { + STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY); + } + memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size); + } return TSDB_CODE_SUCCESS; } @@ -235,6 +262,8 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) { } int32_t stmtCleanSQLInfo(STscStmt* pStmt) { + taosMemoryFree(pStmt->sql.queryRes.fields); + taosMemoryFree(pStmt->sql.queryRes.userFields); taosMemoryFree(pStmt->sql.sqlStr); qDestroyQuery(pStmt->sql.pQuery); qDestroyQueryPlan(pStmt->sql.pQueryPlan); @@ -497,6 +526,8 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) { pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag; pStmt->exec.pRequest->body.pDag = NULL; STMT_ERR_RET(stmtBackupQueryFields(pStmt)); + } else { + STMT_ERR_RET(stmtRestoreQueryFields(pStmt)); } STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx, pStmt->exec.pRequest->requestId)); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 5083104039..b96444bebc 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -47,7 +47,7 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); typedef int32_t (*ShowRetrieveFp)(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); -typedef struct SQWorkerMgmt SQHandle; +typedef struct SQWorker SQHandle; typedef struct { const char *name; diff --git a/source/dnode/qnode/inc/qndInt.h b/source/dnode/qnode/inc/qndInt.h index 307bf3efb8..c18a43c4fb 100644 --- a/source/dnode/qnode/inc/qndInt.h +++ b/source/dnode/qnode/inc/qndInt.h @@ -29,7 +29,7 @@ extern "C" { #endif -typedef struct SQWorkerMgmt SQHandle; +typedef struct SQWorker SQHandle; typedef struct SQnode { int32_t qndId; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b5066799ca..b11f9def7c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -52,7 +52,7 @@ typedef struct STsdb STsdb; typedef struct STQ STQ; typedef struct SVState SVState; typedef struct SVBufPool SVBufPool; -typedef struct SQWorkerMgmt SQHandle; +typedef struct SQWorker SQHandle; #define VNODE_META_DIR "meta" #define VNODE_TSDB_DIR "tsdb" diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 042b914927..52d1980166 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -206,6 +206,8 @@ static int32_t cpdMergeCond(SNode** pDst, SNode** pSrc) { if (NULL == pLogicCond) { return TSDB_CODE_OUT_OF_MEMORY; } + pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL; + pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; pLogicCond->condType = LOGIC_COND_TYPE_AND; int32_t code = nodesListMakeAppend(&pLogicCond->pParameterList, *pSrc); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index de73f1db0b..4f2f3febaf 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -23,6 +23,7 @@ extern "C" { #include "qworker.h" #include "tlockfree.h" #include "ttimer.h" +#include "tref.h" #define QW_DEFAULT_SCHEDULER_NUMBER 10000 #define QW_DEFAULT_TASK_NUMBER 10000 @@ -85,6 +86,11 @@ typedef struct SQWMsg { SQWConnInfo connInfo; } SQWMsg; +typedef struct SQWHbParam { + int32_t qwrId; + int64_t refId; +} SQWHbParam; + typedef struct SQWHbInfo { SSchedulerHbRsp rsp; SQWConnInfo connInfo; @@ -137,7 +143,8 @@ typedef struct SQWSchStatus { } SQWSchStatus; // Qnode/Vnode level task management -typedef struct SQWorkerMgmt { +typedef struct SQWorker { + int64_t refId; SQWorkerCfg cfg; int8_t nodeType; int32_t nodeId; @@ -148,9 +155,15 @@ typedef struct SQWorkerMgmt { SHashObj *schHash; // key: schedulerId, value: SQWSchStatus SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx SMsgCb msgCb; +} SQWorker; + +typedef struct SQWorkerMgmt { + SRWLatch lock; + int32_t qwRef; + int32_t qwNum; } SQWorkerMgmt; -#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId +#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId #define QW_IDS() sId, qId, tId, rId #define QW_FPARAMS() mgmt, QW_IDS() @@ -209,13 +222,13 @@ typedef struct SQWorkerMgmt { } \ } while (0) -#define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__) -#define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__) +#define QW_ELOG(_param, ...) qError("QW:%p " _param, mgmt, __VA_ARGS__) +#define QW_DLOG(_param, ...) qDebug("QW:%p " _param, mgmt, __VA_ARGS__) -#define QW_DUMP(param, ...) \ +#define QW_DUMP(_param, ...) \ do { \ if (gQWDebug.dumpEnable) { \ - qDebug("QW:%p " param, mgmt, __VA_ARGS__); \ + qDebug("QW:%p " _param, mgmt, __VA_ARGS__); \ } \ } while (0) @@ -282,6 +295,14 @@ typedef struct SQWorkerMgmt { } \ } while (0) + +extern SQWorkerMgmt gQwMgmt; + +FORCE_INLINE SQWorker *qwAcquire(int64_t refId) { return (SQWorker *)taosAcquireRef(atomic_load_32(&gQwMgmt.qwRef), refId); } + +FORCE_INLINE int32_t qwRelease(int64_t refId) { return taosReleaseRef(gQwMgmt.qwRef, refId); } + + #ifdef __cplusplus } #endif diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index fb1950d16b..fbd9ce91bc 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -28,7 +28,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); -int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); +int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code); @@ -41,10 +41,10 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); -int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); +int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); -int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn); +int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SQWConnInfo *pConn); #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index b51bc5083e..c94dd29ea1 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -10,6 +10,11 @@ #include "tname.h" SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true}; +SQWorkerMgmt gQwMgmt = { + .lock = 0, + .qwRef = -1, + .qwNum = 0, +}; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { if (!gQWDebug.statusEnable) { @@ -98,7 +103,7 @@ _return: void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {} -void qwDbgDumpMgmtInfo(SQWorkerMgmt *mgmt) { +void qwDbgDumpMgmtInfo(SQWorker *mgmt) { if (!gQWDebug.dumpEnable) { return; } @@ -186,7 +191,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) { return TSDB_CODE_SUCCESS; } -int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) { +int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) { SQWSchStatus newSch = {0}; newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -213,7 +218,7 @@ int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) { return TSDB_CODE_SUCCESS; } -int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) { +int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) { while (true) { QW_LOCK(rwType, &mgmt->schLock); *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); @@ -240,15 +245,15 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, return TSDB_CODE_SUCCESS; } -int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { +int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD); } -int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { +int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR); } -void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } +void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) { char id[sizeof(qId) + sizeof(tId)] = {0}; @@ -384,7 +389,7 @@ int32_t qwAddTaskCtx(QW_FPARAMS_DEF) { QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), fal int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx); } -void qwReleaseTaskCtx(SQWorkerMgmt *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); } +void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); } void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) { // Note: free/kill may in RC @@ -606,7 +611,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { QW_RET(code); } -int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { +int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t taskNum = 0; hbInfo->connInfo = sch->hbConnInfo; @@ -1262,7 +1267,7 @@ _return: QW_RET(TSDB_CODE_SUCCESS); } -int32_t qwProcessHbLinkBroken(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { +int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; SQWSchStatus * sch = NULL; @@ -1288,7 +1293,7 @@ int32_t qwProcessHbLinkBroken(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq QW_RET(TSDB_CODE_SUCCESS); } -int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { +int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; SQWSchStatus * sch = NULL; @@ -1333,7 +1338,20 @@ _return: } void qwProcessHbTimerEvent(void *param, void *tmrId) { - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param; + SQWHbParam* hbParam = (SQWHbParam*)param; + if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) { + taosMemoryFree(param); + return; + } + + int64_t refId = hbParam->refId; + SQWorker *mgmt = qwAcquire(refId); + if (NULL == mgmt) { + QW_DLOG("qwAcquire %" PRIx64 "failed", refId); + taosMemoryFree(param); + return; + } + SQWSchStatus *sch = NULL; int32_t taskNum = 0; SQWHbInfo * rspList = NULL; @@ -1347,6 +1365,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { if (schNum <= 0) { QW_UNLOCK(QW_READ, &mgmt->schLock); taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); + qwRelease(refId); return; } @@ -1355,6 +1374,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { QW_UNLOCK(QW_READ, &mgmt->schLock); QW_ELOG("calloc %d SQWHbInfo failed", schNum); taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); + qwRelease(refId); return; } @@ -1396,18 +1416,72 @@ _return: taosMemoryFreeClear(rspList); taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); + qwRelease(refId); } +void qwCloseRef(void) { + taosWLockLatch(&gQwMgmt.lock); + if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { + taosCloseRef(gQwMgmt.qwRef); + gQwMgmt.qwRef= -1; + } + taosWUnLockLatch(&gQwMgmt.lock); +} + +void qwDestroyImpl(void *pMgmt) { + SQWorker *mgmt = (SQWorker *)pMgmt; + + taosTmrStopA(&mgmt->hbTimer); + taosTmrCleanUp(mgmt->timer); + + // TODO STOP ALL QUERY + + // TODO FREE ALL + + taosHashCleanup(mgmt->ctxHash); + taosHashCleanup(mgmt->schHash); + + taosMemoryFree(mgmt); + + atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + + qwCloseRef(); +} + +int32_t qwOpenRef(void) { + taosWLockLatch(&gQwMgmt.lock); + if (gQwMgmt.qwRef < 0) { + gQwMgmt.qwRef= taosOpenRef(100, qwDestroyImpl); + if (gQwMgmt.qwRef < 0) { + taosWUnLockLatch(&gQwMgmt.lock); + qError("init qworker ref failed"); + QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + taosWUnLockLatch(&gQwMgmt.lock); + + return TSDB_CODE_SUCCESS; +} + + int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) { qError("invalid param to init qworker"); QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } - int32_t code = 0; - SQWorkerMgmt *mgmt = taosMemoryCalloc(1, sizeof(SQWorkerMgmt)); + atomic_add_fetch_32(&gQwMgmt.qwNum, 1); + + int32_t code = qwOpenRef(); + if (code) { + atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + QW_RET(code); + } + + SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker)); if (NULL == mgmt) { - qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt)); + qError("calloc %d failed", (int32_t)sizeof(SQWorker)); + atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1449,16 +1523,30 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, mgmt, mgmt->timer); - if (NULL == mgmt->hbTimer) { - qError("start hb timer failed"); - QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - mgmt->nodeType = nodeType; mgmt->nodeId = nodeId; mgmt->msgCb = *pMsgCb; + mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt); + if (mgmt->refId < 0) { + qError("taosAddRef qw failed, error:%s", tstrerror(terrno)); + QW_ERR_JRET(terrno); + } + + SQWHbParam *param = taosMemoryMalloc(sizeof(SQWHbParam)); + if (NULL == param) { + qError("malloc hb param failed, error:%s", tstrerror(terrno)); + QW_ERR_JRET(terrno); + } + param->qwrId = gQwMgmt.qwRef; + param->refId = mgmt->refId; + + mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void*)param, mgmt->timer); + if (NULL == mgmt->hbTimer) { + qError("start hb timer failed"); + QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + *qWorkerMgmt = mgmt; qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt); @@ -1467,13 +1555,17 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW _return: - taosHashCleanup(mgmt->schHash); - taosHashCleanup(mgmt->ctxHash); - - taosTmrCleanUp(mgmt->timer); - - taosMemoryFreeClear(mgmt); + if (mgmt->refId >= 0) { + qwRelease(mgmt->refId); + } else { + taosHashCleanup(mgmt->schHash); + taosHashCleanup(mgmt->ctxHash); + taosTmrCleanUp(mgmt->timer); + taosMemoryFreeClear(mgmt); + atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + } + QW_RET(code); } @@ -1482,22 +1574,14 @@ void qWorkerDestroy(void **qWorkerMgmt) { return; } - SQWorkerMgmt *mgmt = *qWorkerMgmt; + SQWorker *mgmt = *qWorkerMgmt; - taosTmrStopA(&mgmt->hbTimer); - taosTmrCleanUp(mgmt->timer); - - // TODO STOP ALL QUERY - - // TODO FREE ALL - - taosHashCleanup(mgmt->ctxHash); - taosHashCleanup(mgmt->schHash); - - taosMemoryFreeClear(*qWorkerMgmt); + if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) { + qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId); + } } -int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) { +int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) { /* SQWSchStatus *sch = NULL; int32_t taskNum = 0; @@ -1544,7 +1628,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs return TSDB_CODE_SUCCESS; } -int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { +int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { SQWSchStatus *sch = NULL; /* @@ -1557,7 +1641,7 @@ int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, ui return TSDB_CODE_SUCCESS; } -int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) { +int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) { SQWSchStatus * sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -1584,7 +1668,7 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t QW_RET(code); } -int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { +int32_t qwCancelTask(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { SQWSchStatus * sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 64df4f02bf..454db7fb95 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -319,7 +319,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { return TSDB_CODE_SUCCESS; } -int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn) { +int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SQWConnInfo *pConn) { SSchedulerHbReq req = {0}; req.header.vgId = mgmt->nodeId; req.sId = sId; @@ -362,7 +362,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; SSubQueryMsg *msg = pMsg->pCont; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -404,7 +404,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont; bool needStop = false; SQWTaskCtx *handles = NULL; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -435,7 +435,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SResReadyReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -477,7 +477,7 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; msg->sId = htobe64(msg->sId); uint64_t sId = msg->sId; @@ -498,7 +498,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } SResFetchReq *msg = pMsg->pCont; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -538,7 +538,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -578,7 +578,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; STaskDropReq *msg = pMsg->pCont; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -620,7 +620,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; SSchedulerHbReq req = {0}; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (NULL == pMsg->pCont) { QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index b1ab5253ad..28d27050f0 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -188,8 +188,6 @@ CaseCtrl gCaseCtrl = { .caseRunIdx = -1, // .caseRunNum = -1, - .bindColTypeNum = tListLen(bindColTypeList), - .bindColTypeList = bindColTypeList, .caseIdx = 22, .caseNum = 1, .caseRunNum = 1, @@ -318,7 +316,7 @@ void generateInsertSQL(BindData *data) { len += sprintf(data->sql + len, "ubigdata"); break; default: - printf("invalid col type:%d", data->pBind[c].buffer_type); + printf("!!!invalid col type:%d", data->pBind[c].buffer_type); exit(1); } } @@ -336,7 +334,7 @@ void generateInsertSQL(BindData *data) { len += sprintf(data->sql + len, ")"); if (gCaseCtrl.printStmtSql) { - printf("SQL: %s\n", data->sql); + printf("\tSQL: %s\n", data->sql); } } @@ -358,7 +356,7 @@ void bpAppendOperatorParam(BindData *data, int32_t *len, int32_t dataType) { } break; default: - printf("invalid paramNum:%d\n", pInfo->paramNum); + printf("!!!invalid paramNum:%d\n", pInfo->paramNum); exit(1); } } @@ -414,7 +412,7 @@ void generateQuerySQL(BindData *data, int32_t tblIdx) { len += sprintf(data->sql + len, "ubigdata"); break; default: - printf("invalid col type:%d", data->pBind[c].buffer_type); + printf("!!!invalid col type:%d", data->pBind[c].buffer_type); exit(1); } @@ -423,7 +421,7 @@ void generateQuerySQL(BindData *data, int32_t tblIdx) { } if (gCaseCtrl.printStmtSql) { - printf("SQL: %s\n", data->sql); + printf("\tSTMT SQL: %s\n", data->sql); } } @@ -551,7 +549,7 @@ int32_t prepareColData(BindData *data, int32_t bindIdx, int32_t rowIdx, int32_t data->pBind[bindIdx].is_null = data->isNull ? (data->isNull + rowIdx) : NULL; break; default: - printf("invalid col type:%d", dataType); + printf("!!!invalid col type:%d", dataType); exit(1); } @@ -709,7 +707,7 @@ void bpFetchRows(TAOS_RES *result, bool printr, int32_t *rows) { if (printr) { memset(temp, 0, sizeof(temp)); taos_print_row(temp, row, fields, num_fields); - printf("[%s]\n", temp); + printf("\t[%s]\n", temp); } } } @@ -718,7 +716,7 @@ void bpExecQuery(TAOS * taos, char* sql, bool printr, int32_t *rows) { TAOS_RES *result = taos_query(taos, sql); int code = taos_errno(result); if (code != 0) { - printf("failed to query table, reason:%s\n", taos_errstr(result)); + printf("!!!failed to query table, reason:%s\n", taos_errstr(result)); taos_free_result(result); exit(1); } @@ -791,7 +789,7 @@ int32_t bpAppendValueString(char *buf, int type, void *value, int32_t valueLen, break; default: - printf("invalid data type:%d\n", type); + printf("!!!invalid data type:%d\n", type); exit(1); } } @@ -803,13 +801,13 @@ int32_t bpBindParam(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { if (gCurCase->bindRowNum > 1) { if (0 == (n++%2)) { if (taos_stmt_bind_param_batch(stmt, bind)) { - printf("taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } else { for (int32_t i = 0; i < gCurCase->bindColNum; ++i) { if (taos_stmt_bind_single_param_batch(stmt, bind++, i)) { - printf("taos_stmt_bind_single_param_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_bind_single_param_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -817,12 +815,12 @@ int32_t bpBindParam(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { } else { if (0 == (n++%2)) { if (taos_stmt_bind_param_batch(stmt, bind)) { - printf("taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } else { if (taos_stmt_bind_param(stmt, bind)) { - printf("taos_stmt_bind_param error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_bind_param error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -834,12 +832,12 @@ int32_t bpBindParam(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { void bpCheckIsInsert(TAOS_STMT *stmt, int32_t insert) { int32_t isInsert = 0; if (taos_stmt_is_insert(stmt, &isInsert)) { - printf("taos_stmt_is_insert error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_is_insert error:%s\n", taos_stmt_errstr(stmt)); exit(1); } if (insert != isInsert) { - printf("is insert failed\n"); + printf("!!!is insert failed\n"); exit(1); } } @@ -847,12 +845,12 @@ void bpCheckIsInsert(TAOS_STMT *stmt, int32_t insert) { void bpCheckParamNum(TAOS_STMT *stmt) { int32_t num = 0; if (taos_stmt_num_params(stmt, &num)) { - printf("taos_stmt_num_params error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_num_params error:%s\n", taos_stmt_errstr(stmt)); exit(1); } if (gCurCase->bindColNum != num) { - printf("is insert failed\n"); + printf("!!!is insert failed\n"); exit(1); } } @@ -861,7 +859,7 @@ void bpCheckAffectedRows(TAOS_STMT *stmt, int32_t times) { int32_t rows = taos_stmt_affected_rows(stmt); int32_t insertNum = gCurCase->rowNum * gCurCase->tblNum * times; if (insertNum != rows) { - printf("affected rows %d mis-match with insert num %d\n", rows, insertNum); + printf("!!!affected rows %d mis-match with insert num %d\n", rows, insertNum); exit(1); } } @@ -869,7 +867,7 @@ void bpCheckAffectedRows(TAOS_STMT *stmt, int32_t times) { void bpCheckAffectedRowsOnce(TAOS_STMT *stmt, int32_t expectedNum) { int32_t rows = taos_stmt_affected_rows_once(stmt); if (expectedNum != rows) { - printf("affected rows %d mis-match with expected num %d\n", rows, expectedNum); + printf("!!!affected rows %d mis-match with expected num %d\n", rows, expectedNum); exit(1); } } @@ -904,16 +902,16 @@ void bpCheckQueryResult(TAOS_STMT *stmt, TAOS *taos, char *stmtSql, TAOS_MULTI_B } if (gCaseCtrl.printQuerySql) { - printf("Query SQL: %s\n", sql); + printf("\tQuery SQL: %s\n", sql); } bpExecQuery(taos, sql, gCaseCtrl.printRes, &sqlResNum); if (sqlResNum != stmtResNum) { - printf("sql res num %d mis-match stmt res num %d\n", sqlResNum, stmtResNum); + printf("!!!sql res num %d mis-match stmt res num %d\n", sqlResNum, stmtResNum); exit(1); } - printf("sql res num match stmt res num %d\n", stmtResNum); + printf("***sql res num match stmt res num %d\n", stmtResNum); } /* prepare [settbname [bind add]] exec */ @@ -923,7 +921,7 @@ int insertMBSETest1(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -936,7 +934,7 @@ int insertMBSETest1(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -951,14 +949,14 @@ int insertMBSETest1(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -978,7 +976,7 @@ int insertMBSETest2(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -993,7 +991,7 @@ int insertMBSETest2(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1007,14 +1005,14 @@ int insertMBSETest2(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1033,7 +1031,7 @@ int insertMBMETest1(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1046,7 +1044,7 @@ int insertMBMETest1(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1061,13 +1059,13 @@ int insertMBMETest1(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1087,7 +1085,7 @@ int insertMBMETest2(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1100,7 +1098,7 @@ int insertMBMETest2(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1115,12 +1113,12 @@ int insertMBMETest2(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1141,7 +1139,7 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1154,7 +1152,7 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1169,7 +1167,7 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1179,12 +1177,12 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1206,7 +1204,7 @@ int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1221,7 +1219,7 @@ int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1235,12 +1233,12 @@ int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1264,7 +1262,7 @@ int insertMPMETest1(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1277,7 +1275,7 @@ int insertMPMETest1(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1292,13 +1290,13 @@ int insertMPMETest1(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1328,7 +1326,7 @@ int querySUBTTest1(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1344,12 +1342,12 @@ int querySUBTTest1(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -4249,10 +4247,10 @@ void prepareCheckResultImpl(TAOS * taos, char *tname, bool printr, int expec if (rows == expected) { if (!silent) { - printf("%d rows are fetched as expected from %s\n", rows, tname); + printf("***%d rows are fetched as expected from %s\n", rows, tname); } } else { - printf("!!!expect %d rows, but %d rows are fetched from %s\n", expected, rows, tname); + printf("!!!expect rows %d mis-match rows %d fetched from %s\n", expected, rows, tname); exit(1); } } @@ -4302,7 +4300,7 @@ int sql_perf1(TAOS *taos) { result = taos_query(taos, sql[i]); int code = taos_errno(result); if (code != 0) { - printf("failed to query table, reason:%s\n", taos_errstr(result)); + printf("%d failed to query table, reason:%s\n", taos_errstr(result)); taos_free_result(result); exit(1); } @@ -4539,7 +4537,7 @@ void generateCreateTableSQL(char *buf, int32_t tblIdx, int32_t colNum, int32_t * blen += sprintf(buf + blen, ")"); if (gCaseCtrl.printCreateTblSql) { - printf("Create Table SQL:%s\n", buf); + printf("\tCreate Table SQL:%s\n", buf); } } @@ -4553,7 +4551,7 @@ void prepare(TAOS *taos, int32_t colNum, int32_t *colList, int prepareStb) { result = taos_query(taos, "create database demo keep 36500"); code = taos_errno(result); if (code != 0) { - printf("failed to create database, reason:%s\n", taos_errstr(result)); + printf("!!!failed to create database, reason:%s\n", taos_errstr(result)); taos_free_result(result); exit(1); } @@ -4570,7 +4568,7 @@ void prepare(TAOS *taos, int32_t colNum, int32_t *colList, int prepareStb) { result = taos_query(taos, buf); code = taos_errno(result); if (code != 0) { - printf("failed to create table, reason:%s\n", taos_errstr(result)); + printf("!!!failed to create table, reason:%s\n", taos_errstr(result)); taos_free_result(result); exit(1); } @@ -4583,7 +4581,7 @@ void prepare(TAOS *taos, int32_t colNum, int32_t *colList, int prepareStb) { result = taos_query(taos, buf); code = taos_errno(result); if (code != 0) { - printf("failed to create table, reason:%s\n", taos_errstr(result)); + printf("!!!failed to create table, reason:%s\n", taos_errstr(result)); taos_free_result(result); exit(1); } @@ -4654,7 +4652,7 @@ int32_t runCase(TAOS *taos, int32_t caseIdx, int32_t caseRunIdx, bool silent) { stmt = taos_stmt_init(taos); if (NULL == stmt) { - printf("taos_stmt_init failed, error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_init failed, error:%s\n", taos_stmt_errstr(stmt)); exit(1); } From c4cfcef6e941ec4328bfa47d7ad60887e96776ba Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 28 Apr 2022 20:24:21 +0800 Subject: [PATCH 2/7] stmt query --- include/libs/nodes/querynodes.h | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 28 +++++++++++ source/libs/nodes/src/nodesUtilFuncs.c | 67 ++++++++++++++++++++++++-- source/libs/parser/src/parTranslater.c | 54 ++++++++++++++++++--- tests/script/api/batchprepare.c | 32 ++++++++---- 5 files changed, 162 insertions(+), 20 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 238c4c8538..0dfab984c7 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -88,6 +88,7 @@ typedef struct SValueNode { double d; char* p; } datum; + int64_t typeData; char unit; } SValueNode; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0e6ec4f945..723b256cfd 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1747,23 +1747,51 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) { break; case TSDB_DATA_TYPE_BOOL: code = tjsonGetBoolValue(pJson, jkValueDatum, &pNode->datum.b); + *(bool*)&pNode->typeData = pNode->datum.b; break; case TSDB_DATA_TYPE_TINYINT: + code = tjsonGetBigIntValue(pJson, jkValueDatum, &pNode->datum.i); + *(int8_t*)&pNode->typeData = pNode->datum.i; + break; case TSDB_DATA_TYPE_SMALLINT: + code = tjsonGetBigIntValue(pJson, jkValueDatum, &pNode->datum.i); + *(int16_t*)&pNode->typeData = pNode->datum.i; + break; case TSDB_DATA_TYPE_INT: + code = tjsonGetBigIntValue(pJson, jkValueDatum, &pNode->datum.i); + *(int32_t*)&pNode->typeData = pNode->datum.i; + break; case TSDB_DATA_TYPE_BIGINT: + code = tjsonGetBigIntValue(pJson, jkValueDatum, &pNode->datum.i); + *(int64_t*)&pNode->typeData = pNode->datum.i; + break; case TSDB_DATA_TYPE_TIMESTAMP: code = tjsonGetBigIntValue(pJson, jkValueDatum, &pNode->datum.i); + *(int64_t*)&pNode->typeData = pNode->datum.i; break; case TSDB_DATA_TYPE_UTINYINT: + code = tjsonGetUBigIntValue(pJson, jkValueDatum, &pNode->datum.u); + *(uint8_t*)&pNode->typeData = pNode->datum.u; + break; case TSDB_DATA_TYPE_USMALLINT: + code = tjsonGetUBigIntValue(pJson, jkValueDatum, &pNode->datum.u); + *(uint16_t*)&pNode->typeData = pNode->datum.u; + break; case TSDB_DATA_TYPE_UINT: + code = tjsonGetUBigIntValue(pJson, jkValueDatum, &pNode->datum.u); + *(uint32_t*)&pNode->typeData = pNode->datum.u; + break; case TSDB_DATA_TYPE_UBIGINT: code = tjsonGetUBigIntValue(pJson, jkValueDatum, &pNode->datum.u); + *(uint64_t*)&pNode->typeData = pNode->datum.u; break; case TSDB_DATA_TYPE_FLOAT: + code = tjsonGetDoubleValue(pJson, jkValueDatum, &pNode->datum.d); + *(float*)&pNode->typeData = pNode->datum.d; + break; case TSDB_DATA_TYPE_DOUBLE: code = tjsonGetDoubleValue(pJson, jkValueDatum, &pNode->datum.d); + *(double*)&pNode->typeData = pNode->datum.d; break; case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 351b50133c..9280cd7067 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -867,21 +867,18 @@ void nodesClearList(SNodeList* pList) { void* nodesGetValueFromNode(SValueNode* pNode) { switch (pNode->node.resType.type) { case TSDB_DATA_TYPE_BOOL: - return (void*)&pNode->datum.b; case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_TIMESTAMP: - return (void*)&pNode->datum.i; case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UBIGINT: - return (void*)&pNode->datum.u; case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_DOUBLE: - return (void*)&pNode->datum.d; + return (void*)&pNode->typeData; case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: @@ -893,6 +890,68 @@ void* nodesGetValueFromNode(SValueNode* pNode) { return NULL; } +int32_t nodesSetValueNodeValue(SValueNode* pNode, void *value) { + switch (pNode->node.resType.type) { + case TSDB_DATA_TYPE_BOOL: + pNode->datum.b = *(bool*)value; + *(bool*)pNode->typeData = pNode->datum.b; + break; + case TSDB_DATA_TYPE_TINYINT: + pNode->datum.i = *(int8_t*)value; + *(int8_t*)pNode->typeData = pNode->datum.i; + break; + case TSDB_DATA_TYPE_SMALLINT: + pNode->datum.i = *(int16_t*)value; + *(int16_t*)pNode->typeData = pNode->datum.i; + break; + case TSDB_DATA_TYPE_INT: + pNode->datum.i = *(int32_t*)value; + *(int32_t*)pNode->typeData = pNode->datum.i; + break; + case TSDB_DATA_TYPE_BIGINT: + pNode->datum.i = *(int64_t*)value; + *(int64_t*)pNode->typeData = pNode->datum.i; + break; + case TSDB_DATA_TYPE_TIMESTAMP: + pNode->datum.i = *(int64_t*)value; + *(int64_t*)pNode->typeData = pNode->datum.i; + break; + case TSDB_DATA_TYPE_UTINYINT: + pNode->datum.u = *(int8_t*)value; + *(int8_t*)pNode->typeData = pNode->datum.u; + break; + case TSDB_DATA_TYPE_USMALLINT: + pNode->datum.u = *(int16_t*)value; + *(int16_t*)pNode->typeData = pNode->datum.u; + break; + case TSDB_DATA_TYPE_UINT: + pNode->datum.u = *(int32_t*)value; + *(int32_t*)pNode->typeData = pNode->datum.u; + break; + case TSDB_DATA_TYPE_UBIGINT: + pNode->datum.u = *(uint64_t*)value; + *(uint64_t*)pNode->typeData = pNode->datum.u; + break; + case TSDB_DATA_TYPE_FLOAT: + pNode->datum.d = *(float*)value; + *(float*)pNode->typeData = pNode->datum.d; + break; + case TSDB_DATA_TYPE_DOUBLE: + pNode->datum.d = *(double*)value; + *(double*)pNode->typeData = pNode->datum.d; + break; + case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_VARCHAR: + case TSDB_DATA_TYPE_VARBINARY: + pNode->datum.p = (char*)value; + break; + default: + return TSDB_CODE_QRY_APP_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + char* nodesGetStrValueFromNode(SValueNode* pNode) { switch (pNode->node.resType.type) { case TSDB_DATA_TYPE_BOOL: { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 463764e713..18612a2209 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -473,27 +473,66 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { break; case TSDB_DATA_TYPE_BOOL: pVal->datum.b = (0 == strcasecmp(pVal->literal, "true")); + *(bool*)&pVal->typeData = pVal->datum.b; break; - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_TINYINT:{ + char* endPtr = NULL; + pVal->datum.i = strtoll(pVal->literal, &endPtr, 10); + *(int8_t*)&pVal->typeData = pVal->datum.i; + break; + } + case TSDB_DATA_TYPE_SMALLINT:{ + char* endPtr = NULL; + pVal->datum.i = strtoll(pVal->literal, &endPtr, 10); + *(int16_t*)&pVal->typeData = pVal->datum.i; + break; + } + case TSDB_DATA_TYPE_INT:{ + char* endPtr = NULL; + pVal->datum.i = strtoll(pVal->literal, &endPtr, 10); + *(int32_t*)&pVal->typeData = pVal->datum.i; + break; + } case TSDB_DATA_TYPE_BIGINT: { char* endPtr = NULL; pVal->datum.i = strtoll(pVal->literal, &endPtr, 10); + *(int64_t*)&pVal->typeData = pVal->datum.i; + break; + } + case TSDB_DATA_TYPE_UTINYINT:{ + char* endPtr = NULL; + pVal->datum.u = strtoull(pVal->literal, &endPtr, 10); + *(uint8_t*)&pVal->typeData = pVal->datum.u; + break; + } + case TSDB_DATA_TYPE_USMALLINT:{ + char* endPtr = NULL; + pVal->datum.u = strtoull(pVal->literal, &endPtr, 10); + *(uint16_t*)&pVal->typeData = pVal->datum.u; + break; + } + case TSDB_DATA_TYPE_UINT:{ + char* endPtr = NULL; + pVal->datum.u = strtoull(pVal->literal, &endPtr, 10); + *(uint32_t*)&pVal->typeData = pVal->datum.u; break; } - case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_USMALLINT: - case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UBIGINT: { char* endPtr = NULL; pVal->datum.u = strtoull(pVal->literal, &endPtr, 10); + *(uint64_t*)&pVal->typeData = pVal->datum.u; + break; + } + case TSDB_DATA_TYPE_FLOAT:{ + char* endPtr = NULL; + pVal->datum.d = strtold(pVal->literal, &endPtr); + *(float*)&pVal->typeData = pVal->datum.d; break; } - case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_DOUBLE: { char* endPtr = NULL; pVal->datum.d = strtold(pVal->literal, &endPtr); + *(double*)&pVal->typeData = pVal->datum.d; break; } case TSDB_DATA_TYPE_VARCHAR: @@ -511,6 +550,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { TSDB_CODE_SUCCESS) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal); } + *(int64_t*)&pVal->typeData = pVal->datum.i; break; } case TSDB_DATA_TYPE_NCHAR: diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 28d27050f0..1e5ab62348 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -11,7 +11,8 @@ int32_t shortColList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT}; int32_t fullColList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_BOOL, TSDB_DATA_TYPE_TINYINT, TSDB_DATA_TYPE_UTINYINT, TSDB_DATA_TYPE_SMALLINT, TSDB_DATA_TYPE_USMALLINT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_UINT, TSDB_DATA_TYPE_BIGINT, TSDB_DATA_TYPE_UBIGINT, TSDB_DATA_TYPE_FLOAT, TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_BINARY, TSDB_DATA_TYPE_NCHAR}; -int32_t bindColTypeList[] = {TSDB_DATA_TYPE_TIMESTAMP}; +int32_t bindColTypeList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_FLOAT}; +int32_t optrIdxList[] = {3, 5, 2}; typedef struct { char* oper; @@ -161,6 +162,8 @@ typedef struct { int32_t bindRowNum; //row num for once bind int32_t bindColTypeNum; int32_t* bindColTypeList; + int32_t optrIdxListNum; + int32_t* optrIdxList; int32_t runTimes; int32_t caseIdx; // static case idx int32_t caseNum; // num in static case list @@ -180,6 +183,8 @@ CaseCtrl gCaseCtrl = { .bindRowNum = 0, // .bindColTypeNum = 0, // .bindColTypeList = NULL, +// .optrIdxListNum = 0, +// .optrIdxList = NULL, .checkParamNum = false, .printRes = true, .runTimes = 0, @@ -188,6 +193,11 @@ CaseCtrl gCaseCtrl = { .caseRunIdx = -1, // .caseRunNum = -1, + + .optrIdxListNum = tListLen(optrIdxList), + .optrIdxList = optrIdxList, + .bindColTypeNum = tListLen(bindColTypeList), + .bindColTypeList = bindColTypeList, .caseIdx = 22, .caseNum = 1, .caseRunNum = 1, @@ -338,15 +348,19 @@ void generateInsertSQL(BindData *data) { } } -void bpAppendOperatorParam(BindData *data, int32_t *len, int32_t dataType) { +void bpAppendOperatorParam(BindData *data, int32_t *len, int32_t dataType, int32_t idx) { OperInfo *pInfo = NULL; - - if (TSDB_DATA_TYPE_VARCHAR == dataType || TSDB_DATA_TYPE_NCHAR == dataType) { - pInfo = &operInfo[varoperatorList[rand() % tListLen(varoperatorList)]]; - } else { - pInfo = &operInfo[operatorList[rand() % tListLen(operatorList)]]; - } + if (gCaseCtrl.optrIdxListNum > 0) { + pInfo = &operInfo[gCaseCtrl.optrIdxList[idx]]; + } else { + if (TSDB_DATA_TYPE_VARCHAR == dataType || TSDB_DATA_TYPE_NCHAR == dataType) { + pInfo = &operInfo[varoperatorList[rand() % tListLen(varoperatorList)]]; + } else { + pInfo = &operInfo[operatorList[rand() % tListLen(operatorList)]]; + } + } + switch (pInfo->paramNum) { case 2: if (pInfo->enclose) { @@ -416,7 +430,7 @@ void generateQuerySQL(BindData *data, int32_t tblIdx) { exit(1); } - bpAppendOperatorParam(data, &len, data->pBind[c].buffer_type); + bpAppendOperatorParam(data, &len, data->pBind[c].buffer_type, c); } } From 92c555f2571d39c83e23f5095b19c22274336c24 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 29 Apr 2022 09:20:37 +0800 Subject: [PATCH 3/7] stmt query --- include/libs/nodes/querynodes.h | 1 + source/libs/nodes/src/nodesUtilFuncs.c | 24 ++++++++++++------------ source/libs/scalar/src/scalar.c | 6 +++--- tests/script/api/batchprepare.c | 26 ++++++++++++-------------- 4 files changed, 28 insertions(+), 29 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 0dfab984c7..83b9a94803 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -313,6 +313,7 @@ bool nodesIsTimeorderQuery(const SNode* pQuery); bool nodesIsTimelineQuery(const SNode* pQuery); void* nodesGetValueFromNode(SValueNode* pNode); +int32_t nodesSetValueNodeValue(SValueNode* pNode, void *value); char* nodesGetStrValueFromNode(SValueNode* pNode); char* getFillModeString(EFillMode mode); void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 9280cd7067..ea2eace24b 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -894,51 +894,51 @@ int32_t nodesSetValueNodeValue(SValueNode* pNode, void *value) { switch (pNode->node.resType.type) { case TSDB_DATA_TYPE_BOOL: pNode->datum.b = *(bool*)value; - *(bool*)pNode->typeData = pNode->datum.b; + *(bool*)&pNode->typeData = pNode->datum.b; break; case TSDB_DATA_TYPE_TINYINT: pNode->datum.i = *(int8_t*)value; - *(int8_t*)pNode->typeData = pNode->datum.i; + *(int8_t*)&pNode->typeData = pNode->datum.i; break; case TSDB_DATA_TYPE_SMALLINT: pNode->datum.i = *(int16_t*)value; - *(int16_t*)pNode->typeData = pNode->datum.i; + *(int16_t*)&pNode->typeData = pNode->datum.i; break; case TSDB_DATA_TYPE_INT: pNode->datum.i = *(int32_t*)value; - *(int32_t*)pNode->typeData = pNode->datum.i; + *(int32_t*)&pNode->typeData = pNode->datum.i; break; case TSDB_DATA_TYPE_BIGINT: pNode->datum.i = *(int64_t*)value; - *(int64_t*)pNode->typeData = pNode->datum.i; + *(int64_t*)&pNode->typeData = pNode->datum.i; break; case TSDB_DATA_TYPE_TIMESTAMP: pNode->datum.i = *(int64_t*)value; - *(int64_t*)pNode->typeData = pNode->datum.i; + *(int64_t*)&pNode->typeData = pNode->datum.i; break; case TSDB_DATA_TYPE_UTINYINT: pNode->datum.u = *(int8_t*)value; - *(int8_t*)pNode->typeData = pNode->datum.u; + *(int8_t*)&pNode->typeData = pNode->datum.u; break; case TSDB_DATA_TYPE_USMALLINT: pNode->datum.u = *(int16_t*)value; - *(int16_t*)pNode->typeData = pNode->datum.u; + *(int16_t*)&pNode->typeData = pNode->datum.u; break; case TSDB_DATA_TYPE_UINT: pNode->datum.u = *(int32_t*)value; - *(int32_t*)pNode->typeData = pNode->datum.u; + *(int32_t*)&pNode->typeData = pNode->datum.u; break; case TSDB_DATA_TYPE_UBIGINT: pNode->datum.u = *(uint64_t*)value; - *(uint64_t*)pNode->typeData = pNode->datum.u; + *(uint64_t*)&pNode->typeData = pNode->datum.u; break; case TSDB_DATA_TYPE_FLOAT: pNode->datum.d = *(float*)value; - *(float*)pNode->typeData = pNode->datum.d; + *(float*)&pNode->typeData = pNode->datum.d; break; case TSDB_DATA_TYPE_DOUBLE: pNode->datum.d = *(double*)value; - *(double*)pNode->typeData = pNode->datum.d; + *(double*)&pNode->typeData = pNode->datum.d; break; case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index a6656dc87d..e2059c7d06 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -599,7 +599,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1); memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData)); } else { - memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes); + nodesSetValueNodeValue(res, output.columnData->pData); } } @@ -639,7 +639,7 @@ EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) { res->datum.p = output.columnData->pData; output.columnData->pData = NULL; } else { - memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes); + nodesSetValueNodeValue(res, output.columnData->pData); } nodesDestroyNode(*pNode); @@ -681,7 +681,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) { res->datum.p = output.columnData->pData; output.columnData->pData = NULL; } else { - memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes); + nodesSetValueNodeValue(res, output.columnData->pData); } } diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 1e5ab62348..d7f928ccd9 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -11,8 +11,8 @@ int32_t shortColList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT}; int32_t fullColList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_BOOL, TSDB_DATA_TYPE_TINYINT, TSDB_DATA_TYPE_UTINYINT, TSDB_DATA_TYPE_SMALLINT, TSDB_DATA_TYPE_USMALLINT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_UINT, TSDB_DATA_TYPE_BIGINT, TSDB_DATA_TYPE_UBIGINT, TSDB_DATA_TYPE_FLOAT, TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_BINARY, TSDB_DATA_TYPE_NCHAR}; -int32_t bindColTypeList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_FLOAT}; -int32_t optrIdxList[] = {3, 5, 2}; +int32_t bindColTypeList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_NCHAR, TSDB_DATA_TYPE_SMALLINT}; +int32_t optrIdxList[] = {4, 11, 1}; typedef struct { char* oper; @@ -33,7 +33,7 @@ OperInfo operInfo[] = { {"like", 2, false}, {"not like", 2, false}, {"match", 2, false}, - {"nmake", 2, false}, + {"nmatch", 2, false}, }; int32_t operatorList[] = {0, 1, 2, 3, 4, 5, 6, 7}; @@ -140,9 +140,7 @@ CaseCfg gCase[] = { {"insert:MPME1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, false, false, insertMPMETest1, 10, 10, 2, 12, 0, 1, -1}, // 22 - //{"query:SUBT-FULL", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, querySUBTTest1, 10, 10, 1, 3, 0, 1, 2}, - - {"query:SUBT-FULL", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, querySUBTTest1, 1, 10, 1, 3, 0, 1, 2}, + {"query:SUBT-FULL", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, querySUBTTest1, 10, 10, 1, 3, 0, 1, 2}, }; @@ -181,10 +179,10 @@ CaseCtrl gCaseCtrl = { .rowNum = 0, .bindColNum = 0, .bindRowNum = 0, -// .bindColTypeNum = 0, -// .bindColTypeList = NULL, -// .optrIdxListNum = 0, -// .optrIdxList = NULL, + .bindColTypeNum = 0, + .bindColTypeList = NULL, + .optrIdxListNum = 0, + .optrIdxList = NULL, .checkParamNum = false, .printRes = true, .runTimes = 0, @@ -194,10 +192,10 @@ CaseCtrl gCaseCtrl = { // .caseRunNum = -1, - .optrIdxListNum = tListLen(optrIdxList), - .optrIdxList = optrIdxList, - .bindColTypeNum = tListLen(bindColTypeList), - .bindColTypeList = bindColTypeList, +// .optrIdxListNum = tListLen(optrIdxList), +// .optrIdxList = optrIdxList, +// .bindColTypeNum = tListLen(bindColTypeList), +// .bindColTypeList = bindColTypeList, .caseIdx = 22, .caseNum = 1, .caseRunNum = 1, From a4d4cd2a84c89cc7a5a8c1217b23ca122835a6e1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 29 Apr 2022 11:31:39 +0800 Subject: [PATCH 4/7] stmt query --- source/libs/scalar/src/scalar.c | 10 ++++++- source/libs/scheduler/inc/schedulerInt.h | 2 ++ source/libs/scheduler/src/scheduler.c | 35 ++++++++++++++++++------ 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index ba71c4ae3c..f5fab814ff 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -75,7 +75,15 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) { if (valueNode->node.resType.type != type) { out.columnData->info.type = type; - out.columnData->info.bytes = tDataTypes[type].bytes; + if (IS_VAR_DATA_TYPE(type)) { + if (IS_VAR_DATA_TYPE(valueNode->node.resType.type)) { + out.columnData->info.bytes = valueNode->node.resType.bytes * TSDB_NCHAR_SIZE; + } else { + out.columnData->info.bytes = 64 * TSDB_NCHAR_SIZE; + } + } else { + out.columnData->info.bytes = tDataTypes[type].bytes; + } code = doConvertDataType(valueNode, &out); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 0f6961018c..f4d6872969 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -85,7 +85,9 @@ typedef struct SSchedulerMgmt { uint64_t taskId; // sequential taksId uint64_t sId; // schedulerId SSchedulerCfg cfg; + SRWLatch lock; int32_t jobRef; + int32_t jobNum; SSchedulerStat stat; SHashObj *hbConnections; } SSchedulerMgmt; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 31d2da9380..aa4e598d7d 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -21,7 +21,9 @@ #include "tref.h" #include "trpc.h" -SSchedulerMgmt schMgmt = {0}; +SSchedulerMgmt schMgmt = { + .jobRef = -1, +}; FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); } @@ -70,6 +72,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql, int64_t startTs, bool syncSchedule) { int32_t code = 0; + int64_t refId = -1; SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); if (NULL == pJob) { qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); @@ -114,15 +117,17 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray tsem_init(&pJob->rspSem, 0, 0); - int64_t refId = taosAddRef(schMgmt.jobRef, pJob); + refId = taosAddRef(schMgmt.jobRef, pJob); if (refId < 0) { SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); SCH_ERR_JRET(terrno); } + atomic_add_fetch_32(&schMgmt.jobNum, 1); + if (NULL == schAcquireJob(refId)) { SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId); - SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } pJob->refId = refId; @@ -137,7 +142,11 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray _return: - schFreeJobImpl(pJob); + if (refId < 0) { + schFreeJobImpl(pJob); + } else { + taosRemoveRef(schMgmt.jobRef, refId); + } SCH_RET(code); } @@ -2239,6 +2248,15 @@ int32_t schCancelJob(SSchJob *pJob) { // TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST } +void schCloseJobRef(void) { + SCH_LOCK(SCH_WRITE, &schMgmt.lock); + if (atomic_load_32(&schMgmt.jobNum) <= 0 && schMgmt.jobRef >= 0) { + taosCloseRef(schMgmt.jobRef); + schMgmt.jobRef = -1; + } + SCH_UNLOCK(SCH_WRITE, &schMgmt.lock); +} + void schFreeJobImpl(void *job) { if (NULL == job) { return; @@ -2284,6 +2302,10 @@ void schFreeJobImpl(void *job) { taosMemoryFreeClear(pJob); qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob); + + atomic_sub_fetch_32(&schMgmt.jobNum, 1); + + schCloseJobRef(); } static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, @@ -2732,7 +2754,7 @@ void schedulerFreeTaskList(SArray *taskList) { } void schedulerDestroy(void) { - if (schMgmt.jobRef) { + if (schMgmt.jobRef >= 0) { SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0); int64_t refId = 0; @@ -2745,9 +2767,6 @@ void schedulerDestroy(void) { pJob = taosIterateRef(schMgmt.jobRef, refId); } - - taosCloseRef(schMgmt.jobRef); - schMgmt.jobRef = 0; } if (schMgmt.hbConnections) { From 529dc19a559bec9668f6916d59e030a793d47360 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 29 Apr 2022 11:43:14 +0800 Subject: [PATCH 5/7] fix ref issue --- source/libs/scheduler/inc/schedulerInt.h | 1 + source/libs/scheduler/src/scheduler.c | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index f4d6872969..5906ee8970 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -86,6 +86,7 @@ typedef struct SSchedulerMgmt { uint64_t sId; // schedulerId SSchedulerCfg cfg; SRWLatch lock; + bool exit; int32_t jobRef; int32_t jobNum; SSchedulerStat stat; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index aa4e598d7d..af276ba9cd 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -2249,6 +2249,10 @@ int32_t schCancelJob(SSchJob *pJob) { } void schCloseJobRef(void) { + if (!atomic_load_8((int8_t*)&schMgmt.exit)) { + return; + } + SCH_LOCK(SCH_WRITE, &schMgmt.lock); if (atomic_load_32(&schMgmt.jobNum) <= 0 && schMgmt.jobRef >= 0) { taosCloseRef(schMgmt.jobRef); @@ -2390,7 +2394,7 @@ _return: } int32_t schedulerInit(SSchedulerCfg *cfg) { - if (schMgmt.jobRef) { + if (schMgmt.jobRef >= 0) { qError("scheduler already initialized"); return TSDB_CODE_QRY_INVALID_INPUT; } @@ -2754,6 +2758,8 @@ void schedulerFreeTaskList(SArray *taskList) { } void schedulerDestroy(void) { + atomic_store_8((int8_t*)&schMgmt.exit, 1); + if (schMgmt.jobRef >= 0) { SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0); int64_t refId = 0; From dd13924dbcbfe260207840d18b8c428ca03da015 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 29 Apr 2022 14:35:54 +0800 Subject: [PATCH 6/7] stmt query --- source/client/src/clientStmt.c | 6 +++++- source/libs/planner/src/planner.c | 32 ++++++++++++++++++++++++++----- source/libs/scalar/src/filter.c | 1 + tests/script/api/batchprepare.c | 10 +++++----- 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index aad7012056..5ddffa0cbd 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -540,7 +540,11 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) { } if (colIdx < 0) { - qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen); + int32_t code = qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen); + if (code) { + tscError("qBindStmtColsValue failed, error:%s", tstrerror(code)); + STMT_ERR_RET(code); + } } else { if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) { tscError("bind column index not in sequence"); diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 70a969584a..6336377279 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -104,8 +104,9 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) { pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes; return TSDB_CODE_SUCCESS; } + int32_t inputSize = (NULL != pParam->length ? *(pParam->length) : tDataTypes[pParam->buffer_type].bytes); pVal->node.resType.type = pParam->buffer_type; - pVal->node.resType.bytes = NULL != pParam->length ? *(pParam->length) : tDataTypes[pParam->buffer_type].bytes; + pVal->node.resType.bytes = inputSize; switch (pParam->buffer_type) { case TSDB_DATA_TYPE_BOOL: pVal->datum.b = *((bool*)pParam->buffer); @@ -130,7 +131,6 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) { break; case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: - case TSDB_DATA_TYPE_NCHAR: pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1); if (NULL == pVal->datum.p) { return TSDB_CODE_OUT_OF_MEMORY; @@ -138,6 +138,21 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) { varDataSetLen(pVal->datum.p, pVal->node.resType.bytes); strncpy(varDataVal(pVal->datum.p), (const char*)pParam->buffer, pVal->node.resType.bytes); break; + case TSDB_DATA_TYPE_NCHAR: { + pVal->node.resType.bytes *= TSDB_NCHAR_SIZE; + pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1); + if (NULL == pVal->datum.p) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t output = 0; + if (!taosMbsToUcs4(pParam->buffer, inputSize, (TdUcs4*)varDataVal(pVal->datum.p), pVal->node.resType.bytes, &output)) { + return errno; + } + varDataSetLen(pVal->datum.p, output); + pVal->node.resType.bytes = output; + break; + } case TSDB_DATA_TYPE_TIMESTAMP: pVal->datum.i = *((int64_t*)pParam->buffer); break; @@ -181,13 +196,20 @@ static EDealRes updatePlanQueryId(SNode* pNode, void* pContext) { int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId) { int32_t size = taosArrayGetSize(pPlan->pPlaceholderValues); - + int32_t code = 0; + if (colIdx < 0) { for (int32_t i = 0; i < size; ++i) { - setValueByBindParam((SValueNode*)taosArrayGetP(pPlan->pPlaceholderValues, i), pParams + i); + code = setValueByBindParam((SValueNode*)taosArrayGetP(pPlan->pPlaceholderValues, i), pParams + i); + if (code) { + return code; + } } } else { - setValueByBindParam((SValueNode*)taosArrayGetP(pPlan->pPlaceholderValues, colIdx), pParams); + code = setValueByBindParam((SValueNode*)taosArrayGetP(pPlan->pPlaceholderValues, colIdx), pParams); + if (code) { + return code; + } } if (colIdx < 0 || ((colIdx + 1) == size)) { diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 864894fa42..80e5669cc2 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3765,6 +3765,7 @@ int32_t filterInitFromNode(SNode* pNode, SFilterInfo **pInfo, uint32_t options) FLT_ERR_JRET(fltReviseNodes(info, &pNode, &stat)); info->scalarMode = stat.scalarMode; + fltDebug("scalar mode: %d", info->scalarMode); if (!info->scalarMode) { FLT_ERR_JRET(fltInitFromNode(pNode, info, options)); diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index d7f928ccd9..f14885b72e 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -11,8 +11,8 @@ int32_t shortColList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT}; int32_t fullColList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_BOOL, TSDB_DATA_TYPE_TINYINT, TSDB_DATA_TYPE_UTINYINT, TSDB_DATA_TYPE_SMALLINT, TSDB_DATA_TYPE_USMALLINT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_UINT, TSDB_DATA_TYPE_BIGINT, TSDB_DATA_TYPE_UBIGINT, TSDB_DATA_TYPE_FLOAT, TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_BINARY, TSDB_DATA_TYPE_NCHAR}; -int32_t bindColTypeList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_NCHAR, TSDB_DATA_TYPE_SMALLINT}; -int32_t optrIdxList[] = {4, 11, 1}; +int32_t bindColTypeList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_NCHAR, TSDB_DATA_TYPE_BOOL}; +int32_t optrIdxList[] = {2, 11, 6}; typedef struct { char* oper; @@ -216,10 +216,10 @@ CaseCtrl gCaseCtrl = { .checkParamNum = false, .printRes = true, .runTimes = 0, - .caseIdx = -1, - .caseNum = -1, + .caseIdx = 2, + .caseNum = 1, .caseRunIdx = -1, - .caseRunNum = -1, + .caseRunNum = 1, }; #endif From 6973e3555474715d09407bcfb29443499e2277a8 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 29 Apr 2022 15:13:44 +0800 Subject: [PATCH 7/7] fix mem leak --- source/libs/qworker/inc/qworkerInt.h | 9 ++++--- source/libs/qworker/src/qworker.c | 39 +++++++++++++++++++++------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 4f2f3febaf..a2b1353093 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -87,6 +87,7 @@ typedef struct SQWMsg { } SQWMsg; typedef struct SQWHbParam { + bool inUse; int32_t qwrId; int64_t refId; } SQWHbParam; @@ -158,9 +159,11 @@ typedef struct SQWorker { } SQWorker; typedef struct SQWorkerMgmt { - SRWLatch lock; - int32_t qwRef; - int32_t qwNum; + SRWLatch lock; + int32_t qwRef; + int32_t qwNum; + SQWHbParam param[1024]; + int32_t paramIdx; } SQWorkerMgmt; #define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c94dd29ea1..e84f387dbe 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1340,7 +1340,6 @@ _return: void qwProcessHbTimerEvent(void *param, void *tmrId) { SQWHbParam* hbParam = (SQWHbParam*)param; if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) { - taosMemoryFree(param); return; } @@ -1463,6 +1462,28 @@ int32_t qwOpenRef(void) { return TSDB_CODE_SUCCESS; } +void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { + int32_t paramIdx = 0; + int32_t newParamIdx = 0; + + while (true) { + paramIdx = atomic_load_32(&gQwMgmt.paramIdx); + if (paramIdx == tListLen(gQwMgmt.param)) { + newParamIdx = 0; + } else { + newParamIdx = paramIdx + 1; + } + + if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) { + break; + } + } + + gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef; + gQwMgmt.param[paramIdx].refId = refId; + + *pParam = &gQwMgmt.param[paramIdx]; +} int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) { @@ -1470,7 +1491,10 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } - atomic_add_fetch_32(&gQwMgmt.qwNum, 1); + int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1); + if (1 == qwNum) { + memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param)); + } int32_t code = qwOpenRef(); if (code) { @@ -1533,14 +1557,9 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW QW_ERR_JRET(terrno); } - SQWHbParam *param = taosMemoryMalloc(sizeof(SQWHbParam)); - if (NULL == param) { - qError("malloc hb param failed, error:%s", tstrerror(terrno)); - QW_ERR_JRET(terrno); - } - param->qwrId = gQwMgmt.qwRef; - param->refId = mgmt->refId; - + SQWHbParam *param = NULL; + qwSetHbParam(mgmt->refId, ¶m); + mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void*)param, mgmt->timer); if (NULL == mgmt->hbTimer) { qError("start hb timer failed");