stmt query
This commit is contained in:
parent
5935c9cda4
commit
54fe93f77e
|
@ -46,11 +46,12 @@ typedef struct SStmtTableCache {
|
|||
void* boundTags;
|
||||
} SStmtTableCache;
|
||||
|
||||
typedef struct SQueryFields {
|
||||
typedef struct SStmtQueryResInfo {
|
||||
TAOS_FIELD* fields;
|
||||
TAOS_FIELD* userFields;
|
||||
uint32_t numOfCols;
|
||||
} SQueryFields;
|
||||
int32_t precision;
|
||||
} SStmtQueryResInfo;
|
||||
|
||||
typedef struct SStmtBindInfo {
|
||||
bool needParse;
|
||||
|
@ -72,17 +73,17 @@ typedef struct SStmtExecInfo {
|
|||
} SStmtExecInfo;
|
||||
|
||||
typedef struct SStmtSQLInfo {
|
||||
STMT_TYPE type;
|
||||
STMT_STATUS status;
|
||||
bool autoCreate;
|
||||
uint64_t runTimes;
|
||||
SHashObj* pTableCache; //SHash<SStmtTableCache>
|
||||
SQuery* pQuery;
|
||||
char* sqlStr;
|
||||
int32_t sqlLen;
|
||||
SArray* nodeList;
|
||||
SQueryPlan* pQueryPlan;
|
||||
SQueryFields fields;
|
||||
STMT_TYPE type;
|
||||
STMT_STATUS status;
|
||||
bool autoCreate;
|
||||
uint64_t runTimes;
|
||||
SHashObj* pTableCache; //SHash<SStmtTableCache>
|
||||
SQuery* pQuery;
|
||||
char* sqlStr;
|
||||
int32_t sqlLen;
|
||||
SArray* nodeList;
|
||||
SQueryPlan* pQueryPlan;
|
||||
SStmtQueryResInfo queryRes;
|
||||
} SStmtSQLInfo;
|
||||
|
||||
typedef struct STscStmt {
|
||||
|
|
|
@ -74,17 +74,44 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
|
|||
}
|
||||
|
||||
int32_t stmtBackupQueryFields(STscStmt* pStmt) {
|
||||
SQueryFields *pFields = &pStmt->sql.fields;
|
||||
int32_t size = pFields->numOfCols * sizeof(TAOS_FIELD);
|
||||
SStmtQueryResInfo *pRes = &pStmt->sql.queryRes;
|
||||
pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
|
||||
pRes->precision = pStmt->exec.pRequest->body.resInfo.precision;
|
||||
|
||||
pFields->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
|
||||
pFields->fields = taosMemoryMalloc(size);
|
||||
pFields->userFields = taosMemoryMalloc(size);
|
||||
if (NULL == pFields->fields || NULL == pFields->userFields) {
|
||||
int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
|
||||
pRes->fields = taosMemoryMalloc(size);
|
||||
pRes->userFields = taosMemoryMalloc(size);
|
||||
if (NULL == pRes->fields || NULL == pRes->userFields) {
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
|
||||
}
|
||||
memcpy(pFields->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
|
||||
memcpy(pFields->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
|
||||
memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
|
||||
memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
|
||||
SStmtQueryResInfo *pRes = &pStmt->sql.queryRes;
|
||||
int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
|
||||
|
||||
pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols;
|
||||
pStmt->exec.pRequest->body.resInfo.precision = pRes->precision;
|
||||
|
||||
if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
|
||||
pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size);
|
||||
if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
|
||||
}
|
||||
memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
|
||||
}
|
||||
|
||||
if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
|
||||
pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size);
|
||||
if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
|
||||
}
|
||||
memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -235,6 +262,8 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
|
|||
}
|
||||
|
||||
int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
|
||||
taosMemoryFree(pStmt->sql.queryRes.fields);
|
||||
taosMemoryFree(pStmt->sql.queryRes.userFields);
|
||||
taosMemoryFree(pStmt->sql.sqlStr);
|
||||
qDestroyQuery(pStmt->sql.pQuery);
|
||||
qDestroyQueryPlan(pStmt->sql.pQueryPlan);
|
||||
|
@ -497,6 +526,8 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) {
|
|||
pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
|
||||
pStmt->exec.pRequest->body.pDag = NULL;
|
||||
STMT_ERR_RET(stmtBackupQueryFields(pStmt));
|
||||
} else {
|
||||
STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
|
||||
}
|
||||
|
||||
STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx, pStmt->exec.pRequest->requestId));
|
||||
|
|
|
@ -47,7 +47,7 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode);
|
|||
typedef void (*MndCleanupFp)(SMnode *pMnode);
|
||||
typedef int32_t (*ShowRetrieveFp)(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
|
||||
typedef struct SQWorkerMgmt SQHandle;
|
||||
typedef struct SQWorker SQHandle;
|
||||
|
||||
typedef struct {
|
||||
const char *name;
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SQWorkerMgmt SQHandle;
|
||||
typedef struct SQWorker SQHandle;
|
||||
|
||||
typedef struct SQnode {
|
||||
int32_t qndId;
|
||||
|
|
|
@ -52,7 +52,7 @@ typedef struct STsdb STsdb;
|
|||
typedef struct STQ STQ;
|
||||
typedef struct SVState SVState;
|
||||
typedef struct SVBufPool SVBufPool;
|
||||
typedef struct SQWorkerMgmt SQHandle;
|
||||
typedef struct SQWorker SQHandle;
|
||||
|
||||
#define VNODE_META_DIR "meta"
|
||||
#define VNODE_TSDB_DIR "tsdb"
|
||||
|
|
|
@ -206,6 +206,8 @@ static int32_t cpdMergeCond(SNode** pDst, SNode** pSrc) {
|
|||
if (NULL == pLogicCond) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
||||
pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
||||
pLogicCond->condType = LOGIC_COND_TYPE_AND;
|
||||
int32_t code = nodesListMakeAppend(&pLogicCond->pParameterList, *pSrc);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -23,6 +23,7 @@ extern "C" {
|
|||
#include "qworker.h"
|
||||
#include "tlockfree.h"
|
||||
#include "ttimer.h"
|
||||
#include "tref.h"
|
||||
|
||||
#define QW_DEFAULT_SCHEDULER_NUMBER 10000
|
||||
#define QW_DEFAULT_TASK_NUMBER 10000
|
||||
|
@ -85,6 +86,11 @@ typedef struct SQWMsg {
|
|||
SQWConnInfo connInfo;
|
||||
} SQWMsg;
|
||||
|
||||
typedef struct SQWHbParam {
|
||||
int32_t qwrId;
|
||||
int64_t refId;
|
||||
} SQWHbParam;
|
||||
|
||||
typedef struct SQWHbInfo {
|
||||
SSchedulerHbRsp rsp;
|
||||
SQWConnInfo connInfo;
|
||||
|
@ -137,7 +143,8 @@ typedef struct SQWSchStatus {
|
|||
} SQWSchStatus;
|
||||
|
||||
// Qnode/Vnode level task management
|
||||
typedef struct SQWorkerMgmt {
|
||||
typedef struct SQWorker {
|
||||
int64_t refId;
|
||||
SQWorkerCfg cfg;
|
||||
int8_t nodeType;
|
||||
int32_t nodeId;
|
||||
|
@ -148,9 +155,15 @@ typedef struct SQWorkerMgmt {
|
|||
SHashObj *schHash; // key: schedulerId, value: SQWSchStatus
|
||||
SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx
|
||||
SMsgCb msgCb;
|
||||
} SQWorker;
|
||||
|
||||
typedef struct SQWorkerMgmt {
|
||||
SRWLatch lock;
|
||||
int32_t qwRef;
|
||||
int32_t qwNum;
|
||||
} SQWorkerMgmt;
|
||||
|
||||
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId
|
||||
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId
|
||||
#define QW_IDS() sId, qId, tId, rId
|
||||
#define QW_FPARAMS() mgmt, QW_IDS()
|
||||
|
||||
|
@ -209,13 +222,13 @@ typedef struct SQWorkerMgmt {
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__)
|
||||
#define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__)
|
||||
#define QW_ELOG(_param, ...) qError("QW:%p " _param, mgmt, __VA_ARGS__)
|
||||
#define QW_DLOG(_param, ...) qDebug("QW:%p " _param, mgmt, __VA_ARGS__)
|
||||
|
||||
#define QW_DUMP(param, ...) \
|
||||
#define QW_DUMP(_param, ...) \
|
||||
do { \
|
||||
if (gQWDebug.dumpEnable) { \
|
||||
qDebug("QW:%p " param, mgmt, __VA_ARGS__); \
|
||||
qDebug("QW:%p " _param, mgmt, __VA_ARGS__); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
@ -282,6 +295,14 @@ typedef struct SQWorkerMgmt {
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
|
||||
extern SQWorkerMgmt gQwMgmt;
|
||||
|
||||
FORCE_INLINE SQWorker *qwAcquire(int64_t refId) { return (SQWorker *)taosAcquireRef(atomic_load_32(&gQwMgmt.qwRef), refId); }
|
||||
|
||||
FORCE_INLINE int32_t qwRelease(int64_t refId) { return taosReleaseRef(gQwMgmt.qwRef, refId); }
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -28,7 +28,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
|||
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
|
||||
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
|
||||
|
||||
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code);
|
||||
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code);
|
||||
|
@ -41,10 +41,10 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code);
|
|||
int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
|
||||
void qwFreeFetchRsp(void *msg);
|
||||
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
|
||||
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
|
||||
int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
|
||||
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
|
||||
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
|
||||
int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn);
|
||||
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SQWConnInfo *pConn);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -10,6 +10,11 @@
|
|||
#include "tname.h"
|
||||
|
||||
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true};
|
||||
SQWorkerMgmt gQwMgmt = {
|
||||
.lock = 0,
|
||||
.qwRef = -1,
|
||||
.qwNum = 0,
|
||||
};
|
||||
|
||||
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
|
||||
if (!gQWDebug.statusEnable) {
|
||||
|
@ -98,7 +103,7 @@ _return:
|
|||
|
||||
void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {}
|
||||
|
||||
void qwDbgDumpMgmtInfo(SQWorkerMgmt *mgmt) {
|
||||
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
|
||||
if (!gQWDebug.dumpEnable) {
|
||||
return;
|
||||
}
|
||||
|
@ -186,7 +191,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) {
|
||||
int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) {
|
||||
SQWSchStatus newSch = {0};
|
||||
newSch.tasksHash =
|
||||
taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
|
@ -213,7 +218,7 @@ int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
|
||||
int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
|
||||
while (true) {
|
||||
QW_LOCK(rwType, &mgmt->schLock);
|
||||
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
|
||||
|
@ -240,15 +245,15 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
|
||||
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
|
||||
return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD);
|
||||
}
|
||||
|
||||
int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
|
||||
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
|
||||
return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR);
|
||||
}
|
||||
|
||||
void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); }
|
||||
void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); }
|
||||
|
||||
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
|
@ -384,7 +389,7 @@ int32_t qwAddTaskCtx(QW_FPARAMS_DEF) { QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), fal
|
|||
|
||||
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx); }
|
||||
|
||||
void qwReleaseTaskCtx(SQWorkerMgmt *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); }
|
||||
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); }
|
||||
|
||||
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
|
||||
// Note: free/kill may in RC
|
||||
|
@ -606,7 +611,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
|
||||
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
|
||||
int32_t taskNum = 0;
|
||||
|
||||
hbInfo->connInfo = sch->hbConnInfo;
|
||||
|
@ -1262,7 +1267,7 @@ _return:
|
|||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int32_t qwProcessHbLinkBroken(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
||||
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
||||
int32_t code = 0;
|
||||
SSchedulerHbRsp rsp = {0};
|
||||
SQWSchStatus * sch = NULL;
|
||||
|
@ -1288,7 +1293,7 @@ int32_t qwProcessHbLinkBroken(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq
|
|||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
||||
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
||||
int32_t code = 0;
|
||||
SSchedulerHbRsp rsp = {0};
|
||||
SQWSchStatus * sch = NULL;
|
||||
|
@ -1333,7 +1338,20 @@ _return:
|
|||
}
|
||||
|
||||
void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param;
|
||||
SQWHbParam* hbParam = (SQWHbParam*)param;
|
||||
if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) {
|
||||
taosMemoryFree(param);
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t refId = hbParam->refId;
|
||||
SQWorker *mgmt = qwAcquire(refId);
|
||||
if (NULL == mgmt) {
|
||||
QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
|
||||
taosMemoryFree(param);
|
||||
return;
|
||||
}
|
||||
|
||||
SQWSchStatus *sch = NULL;
|
||||
int32_t taskNum = 0;
|
||||
SQWHbInfo * rspList = NULL;
|
||||
|
@ -1347,6 +1365,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
if (schNum <= 0) {
|
||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
|
||||
qwRelease(refId);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1355,6 +1374,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||
QW_ELOG("calloc %d SQWHbInfo failed", schNum);
|
||||
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
|
||||
qwRelease(refId);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1396,18 +1416,72 @@ _return:
|
|||
taosMemoryFreeClear(rspList);
|
||||
|
||||
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
|
||||
qwRelease(refId);
|
||||
}
|
||||
|
||||
void qwCloseRef(void) {
|
||||
taosWLockLatch(&gQwMgmt.lock);
|
||||
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
|
||||
taosCloseRef(gQwMgmt.qwRef);
|
||||
gQwMgmt.qwRef= -1;
|
||||
}
|
||||
taosWUnLockLatch(&gQwMgmt.lock);
|
||||
}
|
||||
|
||||
void qwDestroyImpl(void *pMgmt) {
|
||||
SQWorker *mgmt = (SQWorker *)pMgmt;
|
||||
|
||||
taosTmrStopA(&mgmt->hbTimer);
|
||||
taosTmrCleanUp(mgmt->timer);
|
||||
|
||||
// TODO STOP ALL QUERY
|
||||
|
||||
// TODO FREE ALL
|
||||
|
||||
taosHashCleanup(mgmt->ctxHash);
|
||||
taosHashCleanup(mgmt->schHash);
|
||||
|
||||
taosMemoryFree(mgmt);
|
||||
|
||||
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
||||
|
||||
qwCloseRef();
|
||||
}
|
||||
|
||||
int32_t qwOpenRef(void) {
|
||||
taosWLockLatch(&gQwMgmt.lock);
|
||||
if (gQwMgmt.qwRef < 0) {
|
||||
gQwMgmt.qwRef= taosOpenRef(100, qwDestroyImpl);
|
||||
if (gQwMgmt.qwRef < 0) {
|
||||
taosWUnLockLatch(&gQwMgmt.lock);
|
||||
qError("init qworker ref failed");
|
||||
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
taosWUnLockLatch(&gQwMgmt.lock);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
|
||||
if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) {
|
||||
qError("invalid param to init qworker");
|
||||
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SQWorkerMgmt *mgmt = taosMemoryCalloc(1, sizeof(SQWorkerMgmt));
|
||||
atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
|
||||
|
||||
int32_t code = qwOpenRef();
|
||||
if (code) {
|
||||
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
|
||||
if (NULL == mgmt) {
|
||||
qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
|
||||
qError("calloc %d failed", (int32_t)sizeof(SQWorker));
|
||||
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
||||
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -1449,16 +1523,30 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
|
|||
QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, mgmt, mgmt->timer);
|
||||
if (NULL == mgmt->hbTimer) {
|
||||
qError("start hb timer failed");
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
mgmt->nodeType = nodeType;
|
||||
mgmt->nodeId = nodeId;
|
||||
mgmt->msgCb = *pMsgCb;
|
||||
|
||||
mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
|
||||
if (mgmt->refId < 0) {
|
||||
qError("taosAddRef qw failed, error:%s", tstrerror(terrno));
|
||||
QW_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
SQWHbParam *param = taosMemoryMalloc(sizeof(SQWHbParam));
|
||||
if (NULL == param) {
|
||||
qError("malloc hb param failed, error:%s", tstrerror(terrno));
|
||||
QW_ERR_JRET(terrno);
|
||||
}
|
||||
param->qwrId = gQwMgmt.qwRef;
|
||||
param->refId = mgmt->refId;
|
||||
|
||||
mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void*)param, mgmt->timer);
|
||||
if (NULL == mgmt->hbTimer) {
|
||||
qError("start hb timer failed");
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
*qWorkerMgmt = mgmt;
|
||||
|
||||
qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);
|
||||
|
@ -1467,13 +1555,17 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
|
|||
|
||||
_return:
|
||||
|
||||
taosHashCleanup(mgmt->schHash);
|
||||
taosHashCleanup(mgmt->ctxHash);
|
||||
|
||||
taosTmrCleanUp(mgmt->timer);
|
||||
|
||||
taosMemoryFreeClear(mgmt);
|
||||
if (mgmt->refId >= 0) {
|
||||
qwRelease(mgmt->refId);
|
||||
} else {
|
||||
taosHashCleanup(mgmt->schHash);
|
||||
taosHashCleanup(mgmt->ctxHash);
|
||||
taosTmrCleanUp(mgmt->timer);
|
||||
taosMemoryFreeClear(mgmt);
|
||||
|
||||
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
||||
}
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
|
@ -1482,22 +1574,14 @@ void qWorkerDestroy(void **qWorkerMgmt) {
|
|||
return;
|
||||
}
|
||||
|
||||
SQWorkerMgmt *mgmt = *qWorkerMgmt;
|
||||
SQWorker *mgmt = *qWorkerMgmt;
|
||||
|
||||
taosTmrStopA(&mgmt->hbTimer);
|
||||
taosTmrCleanUp(mgmt->timer);
|
||||
|
||||
// TODO STOP ALL QUERY
|
||||
|
||||
// TODO FREE ALL
|
||||
|
||||
taosHashCleanup(mgmt->ctxHash);
|
||||
taosHashCleanup(mgmt->schHash);
|
||||
|
||||
taosMemoryFreeClear(*qWorkerMgmt);
|
||||
if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) {
|
||||
qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
|
||||
int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
|
||||
/*
|
||||
SQWSchStatus *sch = NULL;
|
||||
int32_t taskNum = 0;
|
||||
|
@ -1544,7 +1628,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
|
||||
int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
|
||||
SQWSchStatus *sch = NULL;
|
||||
|
||||
/*
|
||||
|
@ -1557,7 +1641,7 @@ int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, ui
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) {
|
||||
int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) {
|
||||
SQWSchStatus * sch = NULL;
|
||||
SQWTaskStatus *task = NULL;
|
||||
int32_t code = 0;
|
||||
|
@ -1584,7 +1668,7 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
|
||||
int32_t qwCancelTask(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
|
||||
SQWSchStatus * sch = NULL;
|
||||
SQWTaskStatus *task = NULL;
|
||||
int32_t code = 0;
|
||||
|
|
|
@ -319,7 +319,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn) {
|
||||
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SQWConnInfo *pConn) {
|
||||
SSchedulerHbReq req = {0};
|
||||
req.header.vgId = mgmt->nodeId;
|
||||
req.sId = sId;
|
||||
|
@ -362,7 +362,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t code = 0;
|
||||
SSubQueryMsg *msg = pMsg->pCont;
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -404,7 +404,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
|
||||
bool needStop = false;
|
||||
SQWTaskCtx *handles = NULL;
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -435,7 +435,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SResReadyReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -477,7 +477,7 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
msg->sId = htobe64(msg->sId);
|
||||
uint64_t sId = msg->sId;
|
||||
|
||||
|
@ -498,7 +498,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
SResFetchReq *msg = pMsg->pCont;
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -538,7 +538,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
int32_t code = 0;
|
||||
STaskCancelReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
|
@ -578,7 +578,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t code = 0;
|
||||
STaskDropReq *msg = pMsg->pCont;
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -620,7 +620,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t code = 0;
|
||||
SSchedulerHbReq req = {0};
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
if (NULL == pMsg->pCont) {
|
||||
QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
|
||||
|
|
|
@ -188,8 +188,6 @@ CaseCtrl gCaseCtrl = {
|
|||
.caseRunIdx = -1,
|
||||
// .caseRunNum = -1,
|
||||
|
||||
.bindColTypeNum = tListLen(bindColTypeList),
|
||||
.bindColTypeList = bindColTypeList,
|
||||
.caseIdx = 22,
|
||||
.caseNum = 1,
|
||||
.caseRunNum = 1,
|
||||
|
@ -318,7 +316,7 @@ void generateInsertSQL(BindData *data) {
|
|||
len += sprintf(data->sql + len, "ubigdata");
|
||||
break;
|
||||
default:
|
||||
printf("invalid col type:%d", data->pBind[c].buffer_type);
|
||||
printf("!!!invalid col type:%d", data->pBind[c].buffer_type);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -336,7 +334,7 @@ void generateInsertSQL(BindData *data) {
|
|||
len += sprintf(data->sql + len, ")");
|
||||
|
||||
if (gCaseCtrl.printStmtSql) {
|
||||
printf("SQL: %s\n", data->sql);
|
||||
printf("\tSQL: %s\n", data->sql);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -358,7 +356,7 @@ void bpAppendOperatorParam(BindData *data, int32_t *len, int32_t dataType) {
|
|||
}
|
||||
break;
|
||||
default:
|
||||
printf("invalid paramNum:%d\n", pInfo->paramNum);
|
||||
printf("!!!invalid paramNum:%d\n", pInfo->paramNum);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -414,7 +412,7 @@ void generateQuerySQL(BindData *data, int32_t tblIdx) {
|
|||
len += sprintf(data->sql + len, "ubigdata");
|
||||
break;
|
||||
default:
|
||||
printf("invalid col type:%d", data->pBind[c].buffer_type);
|
||||
printf("!!!invalid col type:%d", data->pBind[c].buffer_type);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -423,7 +421,7 @@ void generateQuerySQL(BindData *data, int32_t tblIdx) {
|
|||
}
|
||||
|
||||
if (gCaseCtrl.printStmtSql) {
|
||||
printf("SQL: %s\n", data->sql);
|
||||
printf("\tSTMT SQL: %s\n", data->sql);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -551,7 +549,7 @@ int32_t prepareColData(BindData *data, int32_t bindIdx, int32_t rowIdx, int32_t
|
|||
data->pBind[bindIdx].is_null = data->isNull ? (data->isNull + rowIdx) : NULL;
|
||||
break;
|
||||
default:
|
||||
printf("invalid col type:%d", dataType);
|
||||
printf("!!!invalid col type:%d", dataType);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -709,7 +707,7 @@ void bpFetchRows(TAOS_RES *result, bool printr, int32_t *rows) {
|
|||
if (printr) {
|
||||
memset(temp, 0, sizeof(temp));
|
||||
taos_print_row(temp, row, fields, num_fields);
|
||||
printf("[%s]\n", temp);
|
||||
printf("\t[%s]\n", temp);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -718,7 +716,7 @@ void bpExecQuery(TAOS * taos, char* sql, bool printr, int32_t *rows) {
|
|||
TAOS_RES *result = taos_query(taos, sql);
|
||||
int code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to query table, reason:%s\n", taos_errstr(result));
|
||||
printf("!!!failed to query table, reason:%s\n", taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
exit(1);
|
||||
}
|
||||
|
@ -791,7 +789,7 @@ int32_t bpAppendValueString(char *buf, int type, void *value, int32_t valueLen,
|
|||
break;
|
||||
|
||||
default:
|
||||
printf("invalid data type:%d\n", type);
|
||||
printf("!!!invalid data type:%d\n", type);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -803,13 +801,13 @@ int32_t bpBindParam(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
|
|||
if (gCurCase->bindRowNum > 1) {
|
||||
if (0 == (n++%2)) {
|
||||
if (taos_stmt_bind_param_batch(stmt, bind)) {
|
||||
printf("taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < gCurCase->bindColNum; ++i) {
|
||||
if (taos_stmt_bind_single_param_batch(stmt, bind++, i)) {
|
||||
printf("taos_stmt_bind_single_param_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_bind_single_param_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -817,12 +815,12 @@ int32_t bpBindParam(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
|
|||
} else {
|
||||
if (0 == (n++%2)) {
|
||||
if (taos_stmt_bind_param_batch(stmt, bind)) {
|
||||
printf("taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
} else {
|
||||
if (taos_stmt_bind_param(stmt, bind)) {
|
||||
printf("taos_stmt_bind_param error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_bind_param error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -834,12 +832,12 @@ int32_t bpBindParam(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
|
|||
void bpCheckIsInsert(TAOS_STMT *stmt, int32_t insert) {
|
||||
int32_t isInsert = 0;
|
||||
if (taos_stmt_is_insert(stmt, &isInsert)) {
|
||||
printf("taos_stmt_is_insert error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_is_insert error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if (insert != isInsert) {
|
||||
printf("is insert failed\n");
|
||||
printf("!!!is insert failed\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -847,12 +845,12 @@ void bpCheckIsInsert(TAOS_STMT *stmt, int32_t insert) {
|
|||
void bpCheckParamNum(TAOS_STMT *stmt) {
|
||||
int32_t num = 0;
|
||||
if (taos_stmt_num_params(stmt, &num)) {
|
||||
printf("taos_stmt_num_params error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_num_params error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if (gCurCase->bindColNum != num) {
|
||||
printf("is insert failed\n");
|
||||
printf("!!!is insert failed\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -861,7 +859,7 @@ void bpCheckAffectedRows(TAOS_STMT *stmt, int32_t times) {
|
|||
int32_t rows = taos_stmt_affected_rows(stmt);
|
||||
int32_t insertNum = gCurCase->rowNum * gCurCase->tblNum * times;
|
||||
if (insertNum != rows) {
|
||||
printf("affected rows %d mis-match with insert num %d\n", rows, insertNum);
|
||||
printf("!!!affected rows %d mis-match with insert num %d\n", rows, insertNum);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -869,7 +867,7 @@ void bpCheckAffectedRows(TAOS_STMT *stmt, int32_t times) {
|
|||
void bpCheckAffectedRowsOnce(TAOS_STMT *stmt, int32_t expectedNum) {
|
||||
int32_t rows = taos_stmt_affected_rows_once(stmt);
|
||||
if (expectedNum != rows) {
|
||||
printf("affected rows %d mis-match with expected num %d\n", rows, expectedNum);
|
||||
printf("!!!affected rows %d mis-match with expected num %d\n", rows, expectedNum);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -904,16 +902,16 @@ void bpCheckQueryResult(TAOS_STMT *stmt, TAOS *taos, char *stmtSql, TAOS_MULTI_B
|
|||
}
|
||||
|
||||
if (gCaseCtrl.printQuerySql) {
|
||||
printf("Query SQL: %s\n", sql);
|
||||
printf("\tQuery SQL: %s\n", sql);
|
||||
}
|
||||
|
||||
bpExecQuery(taos, sql, gCaseCtrl.printRes, &sqlResNum);
|
||||
if (sqlResNum != stmtResNum) {
|
||||
printf("sql res num %d mis-match stmt res num %d\n", sqlResNum, stmtResNum);
|
||||
printf("!!!sql res num %d mis-match stmt res num %d\n", sqlResNum, stmtResNum);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
printf("sql res num match stmt res num %d\n", stmtResNum);
|
||||
printf("***sql res num match stmt res num %d\n", stmtResNum);
|
||||
}
|
||||
|
||||
/* prepare [settbname [bind add]] exec */
|
||||
|
@ -923,7 +921,7 @@ int insertMBSETest1(TAOS_STMT *stmt, TAOS *taos) {
|
|||
|
||||
int code = taos_stmt_prepare(stmt, data.sql, 0);
|
||||
if (code != 0){
|
||||
printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -936,7 +934,7 @@ int insertMBSETest1(TAOS_STMT *stmt, TAOS *taos) {
|
|||
sprintf(buf, "t%d", t);
|
||||
code = taos_stmt_set_tbname(stmt, buf);
|
||||
if (code != 0){
|
||||
printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -951,14 +949,14 @@ int insertMBSETest1(TAOS_STMT *stmt, TAOS *taos) {
|
|||
}
|
||||
|
||||
if (taos_stmt_add_batch(stmt)) {
|
||||
printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -978,7 +976,7 @@ int insertMBSETest2(TAOS_STMT *stmt, TAOS *taos) {
|
|||
|
||||
int code = taos_stmt_prepare(stmt, data.sql, 0);
|
||||
if (code != 0){
|
||||
printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -993,7 +991,7 @@ int insertMBSETest2(TAOS_STMT *stmt, TAOS *taos) {
|
|||
sprintf(buf, "t%d", t);
|
||||
code = taos_stmt_set_tbname(stmt, buf);
|
||||
if (code != 0){
|
||||
printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -1007,14 +1005,14 @@ int insertMBSETest2(TAOS_STMT *stmt, TAOS *taos) {
|
|||
}
|
||||
|
||||
if (taos_stmt_add_batch(stmt)) {
|
||||
printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -1033,7 +1031,7 @@ int insertMBMETest1(TAOS_STMT *stmt, TAOS *taos) {
|
|||
|
||||
int code = taos_stmt_prepare(stmt, data.sql, 0);
|
||||
if (code != 0){
|
||||
printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -1046,7 +1044,7 @@ int insertMBMETest1(TAOS_STMT *stmt, TAOS *taos) {
|
|||
sprintf(buf, "t%d", t);
|
||||
code = taos_stmt_set_tbname(stmt, buf);
|
||||
if (code != 0){
|
||||
printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -1061,13 +1059,13 @@ int insertMBMETest1(TAOS_STMT *stmt, TAOS *taos) {
|
|||
}
|
||||
|
||||
if (taos_stmt_add_batch(stmt)) {
|
||||
printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -1087,7 +1085,7 @@ int insertMBMETest2(TAOS_STMT *stmt, TAOS *taos) {
|
|||
|
||||
int code = taos_stmt_prepare(stmt, data.sql, 0);
|
||||
if (code != 0){
|
||||
printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -1100,7 +1098,7 @@ int insertMBMETest2(TAOS_STMT *stmt, TAOS *taos) {
|
|||
sprintf(buf, "t%d", t);
|
||||
code = taos_stmt_set_tbname(stmt, buf);
|
||||
if (code != 0){
|
||||
printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -1115,12 +1113,12 @@ int insertMBMETest2(TAOS_STMT *stmt, TAOS *taos) {
|
|||
}
|
||||
|
||||
if (taos_stmt_add_batch(stmt)) {
|
||||
printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -1141,7 +1139,7 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos) {
|
|||
|
||||
int code = taos_stmt_prepare(stmt, data.sql, 0);
|
||||
if (code != 0){
|
||||
printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -1154,7 +1152,7 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos) {
|
|||
sprintf(buf, "t%d", t);
|
||||
code = taos_stmt_set_tbname(stmt, buf);
|
||||
if (code != 0){
|
||||
printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -1169,7 +1167,7 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos) {
|
|||
sprintf(buf, "t%d", t);
|
||||
code = taos_stmt_set_tbname(stmt, buf);
|
||||
if (code != 0){
|
||||
printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -1179,12 +1177,12 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos) {
|
|||
}
|
||||
|
||||
if (taos_stmt_add_batch(stmt)) {
|
||||
printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -1206,7 +1204,7 @@ int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos) {
|
|||
|
||||
int code = taos_stmt_prepare(stmt, data.sql, 0);
|
||||
if (code != 0){
|
||||
printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -1221,7 +1219,7 @@ int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos) {
|
|||
sprintf(buf, "t%d", t);
|
||||
code = taos_stmt_set_tbname(stmt, buf);
|
||||
if (code != 0){
|
||||
printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -1235,12 +1233,12 @@ int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos) {
|
|||
}
|
||||
|
||||
if (taos_stmt_add_batch(stmt)) {
|
||||
printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -1264,7 +1262,7 @@ int insertMPMETest1(TAOS_STMT *stmt, TAOS *taos) {
|
|||
|
||||
int code = taos_stmt_prepare(stmt, data.sql, 0);
|
||||
if (code != 0){
|
||||
printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -1277,7 +1275,7 @@ int insertMPMETest1(TAOS_STMT *stmt, TAOS *taos) {
|
|||
sprintf(buf, "t%d", t);
|
||||
code = taos_stmt_set_tbname(stmt, buf);
|
||||
if (code != 0){
|
||||
printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -1292,13 +1290,13 @@ int insertMPMETest1(TAOS_STMT *stmt, TAOS *taos) {
|
|||
}
|
||||
|
||||
if (taos_stmt_add_batch(stmt)) {
|
||||
printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -1328,7 +1326,7 @@ int querySUBTTest1(TAOS_STMT *stmt, TAOS *taos) {
|
|||
|
||||
int code = taos_stmt_prepare(stmt, data.sql, 0);
|
||||
if (code != 0){
|
||||
printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -1344,12 +1342,12 @@ int querySUBTTest1(TAOS_STMT *stmt, TAOS *taos) {
|
|||
}
|
||||
|
||||
if (taos_stmt_add_batch(stmt)) {
|
||||
printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
@ -4249,10 +4247,10 @@ void prepareCheckResultImpl(TAOS * taos, char *tname, bool printr, int expec
|
|||
|
||||
if (rows == expected) {
|
||||
if (!silent) {
|
||||
printf("%d rows are fetched as expected from %s\n", rows, tname);
|
||||
printf("***%d rows are fetched as expected from %s\n", rows, tname);
|
||||
}
|
||||
} else {
|
||||
printf("!!!expect %d rows, but %d rows are fetched from %s\n", expected, rows, tname);
|
||||
printf("!!!expect rows %d mis-match rows %d fetched from %s\n", expected, rows, tname);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
@ -4302,7 +4300,7 @@ int sql_perf1(TAOS *taos) {
|
|||
result = taos_query(taos, sql[i]);
|
||||
int code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to query table, reason:%s\n", taos_errstr(result));
|
||||
printf("%d failed to query table, reason:%s\n", taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
exit(1);
|
||||
}
|
||||
|
@ -4539,7 +4537,7 @@ void generateCreateTableSQL(char *buf, int32_t tblIdx, int32_t colNum, int32_t *
|
|||
blen += sprintf(buf + blen, ")");
|
||||
|
||||
if (gCaseCtrl.printCreateTblSql) {
|
||||
printf("Create Table SQL:%s\n", buf);
|
||||
printf("\tCreate Table SQL:%s\n", buf);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4553,7 +4551,7 @@ void prepare(TAOS *taos, int32_t colNum, int32_t *colList, int prepareStb) {
|
|||
result = taos_query(taos, "create database demo keep 36500");
|
||||
code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to create database, reason:%s\n", taos_errstr(result));
|
||||
printf("!!!failed to create database, reason:%s\n", taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
exit(1);
|
||||
}
|
||||
|
@ -4570,7 +4568,7 @@ void prepare(TAOS *taos, int32_t colNum, int32_t *colList, int prepareStb) {
|
|||
result = taos_query(taos, buf);
|
||||
code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to create table, reason:%s\n", taos_errstr(result));
|
||||
printf("!!!failed to create table, reason:%s\n", taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
exit(1);
|
||||
}
|
||||
|
@ -4583,7 +4581,7 @@ void prepare(TAOS *taos, int32_t colNum, int32_t *colList, int prepareStb) {
|
|||
result = taos_query(taos, buf);
|
||||
code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to create table, reason:%s\n", taos_errstr(result));
|
||||
printf("!!!failed to create table, reason:%s\n", taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
exit(1);
|
||||
}
|
||||
|
@ -4654,7 +4652,7 @@ int32_t runCase(TAOS *taos, int32_t caseIdx, int32_t caseRunIdx, bool silent) {
|
|||
|
||||
stmt = taos_stmt_init(taos);
|
||||
if (NULL == stmt) {
|
||||
printf("taos_stmt_init failed, error:%s\n", taos_stmt_errstr(stmt));
|
||||
printf("!!!taos_stmt_init failed, error:%s\n", taos_stmt_errstr(stmt));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue