diff --git a/include/client/taos.h b/include/client/taos.h index d31d5c582c..79f567fc9a 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -131,10 +131,10 @@ DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT setConfRet taos_set_config(const char *config); DLL_EXPORT int taos_init(void); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); -DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); -DLL_EXPORT void taos_close(TAOS *taos); +DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); +DLL_EXPORT void taos_close(TAOS *taos); -const char *taos_data_type(int type); +const char *taos_data_type(int type); DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); @@ -164,6 +164,7 @@ DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result DLL_EXPORT void taos_free_result(TAOS_RES *res); +DLL_EXPORT void taos_kill_query(TAOS *taos); DLL_EXPORT int taos_field_count(TAOS_RES *res); DLL_EXPORT int taos_num_fields(TAOS_RES *res); DLL_EXPORT int taos_affected_rows(TAOS_RES *res); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index ecb21335b9..be3d16ab0d 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -69,18 +69,20 @@ typedef struct SSchdFetchParam { int32_t* code; } SSchdFetchParam; -typedef void (*schedulerExecCallback)(SQueryResult* pResult, void* param, int32_t code); -typedef void (*schedulerFetchCallback)(void* pResult, void* param, int32_t code); +typedef void (*schedulerExecFp)(SQueryResult* pResult, void* param, int32_t code); +typedef void (*schedulerFetchFp)(void* pResult, void* param, int32_t code); +typedef bool (*schedulerChkKillFp)(void* param); typedef struct SSchedulerReq { - bool *reqKilled; SRequestConnInfo *pConn; SArray *pNodeList; SQueryPlan *pDag; const char *sql; int64_t startTs; - schedulerExecCallback fp; - void* cbParam; + schedulerExecFp execFp; + void* execParam; + schedulerChkKillFp chkKillFp; + void* chkKillParam; } SSchedulerReq; @@ -110,7 +112,7 @@ int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes) */ int32_t schedulerFetchRows(int64_t job, void **data); -void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param); +void schedulerAsyncFetchRows(int64_t job, schedulerFetchFp fp, void* param); int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index cca79186d0..3b21258fe1 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -66,7 +66,7 @@ enum { typedef struct SAppInstInfo SAppInstInfo; typedef struct { - char* key; + char* key; // statistics int32_t reportCnt; int32_t connKeyCnt; @@ -118,6 +118,7 @@ struct SAppInstInfo { uint64_t clusterId; void* pTransporter; SAppHbMgr* pAppHbMgr; + char* instKey; }; typedef struct SAppInfo { @@ -139,7 +140,7 @@ typedef struct STscObj { int8_t connType; int32_t acctId; uint32_t connId; - TAOS* id; // ref ID returned by taosAddRef + int64_t id; // ref ID returned by taosAddRef TdThreadMutex mutex; // used to protect the operation on db int32_t numOfReqs; // number of sqlObj bound to this connection SAppInstInfo* pAppInfo; @@ -183,7 +184,7 @@ typedef struct SRequestSendRecvBody { void* param; SDataBuf requestMsg; int64_t queryJob; // query job, created according to sql query DAG. - struct SQueryPlan* pDag; // the query dag, generated according to the sql statement. + int32_t subplanNum; SReqResultInfo resInfo; } SRequestSendRecvBody; @@ -300,6 +301,8 @@ void* createRequest(STscObj* pObj, int32_t type); void destroyRequest(SRequestObj* pRequest); SRequestObj* acquireRequest(int64_t rid); int32_t releaseRequest(int64_t rid); +int32_t removeRequest(int64_t rid); +void doDestroyRequest(void *p); char* getDbOfConnection(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); @@ -334,6 +337,8 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp); // cluster level SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); void appHbMgrCleanup(void); +void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr); +void closeAllRequests(SHashObj *pRequests); // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 8e0556125a..d150d68c8d 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -37,10 +37,12 @@ int32_t clientConnRefPool = -1; static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; -static void registerRequest(SRequestObj *pRequest) { - STscObj *pTscObj = acquireTscObj(*(int64_t *)pRequest->pTscObj->id); - - assert(pTscObj != NULL); +static int32_t registerRequest(SRequestObj *pRequest) { + STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id); + if (NULL == pTscObj) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return terrno; + } // connection has been released already, abort creating request. pRequest->self = taosAddRef(clientReqRefPool, pRequest); @@ -54,8 +56,10 @@ static void registerRequest(SRequestObj *pRequest) { int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1); tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64, - pRequest->self, *(int64_t *)pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); + pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); } + + return TSDB_CODE_SUCCESS; } static void deregisterRequest(SRequestObj *pRequest) { @@ -70,26 +74,23 @@ static void deregisterRequest(SRequestObj *pRequest) { int64_t duration = taosGetTimestampUs() - pRequest->metric.start; tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 " ms, current:%d, app current:%d", - pRequest->self, *(int64_t *)pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); - releaseTscObj(*(int64_t *)pTscObj->id); + pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); + releaseTscObj(pTscObj->id); } // todo close the transporter properly -void closeTransporter(STscObj *pTscObj) { - if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) { +void closeTransporter(SAppInstInfo *pAppInfo) { + if (pAppInfo == NULL || pAppInfo->pTransporter == NULL) { return; } - tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, *(int64_t *)pTscObj->id); - rpcClose(pTscObj->pAppInfo->pTransporter); + tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo); + rpcClose(pAppInfo->pTransporter); } static bool clientRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { - if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) { - return false; - } return true; } else { return false; @@ -123,26 +124,46 @@ void closeAllRequests(SHashObj *pRequests) { while (pIter != NULL) { int64_t *rid = pIter; - releaseRequest(*rid); + removeRequest(*rid); pIter = taosHashIterate(pRequests, pIter); } } +void destroyAppInst(SAppInstInfo *pAppInfo) { + tscDebug("destroy app inst mgr %p", pAppInfo); + + taosThreadMutexLock(&appInfo.mutex); + + hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr); + taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey)); + + taosThreadMutexUnlock(&appInfo.mutex); + + taosMemoryFreeClear(pAppInfo->instKey); + closeTransporter(pAppInfo); + + taosThreadMutexLock(&pAppInfo->qnodeMutex); + taosArrayDestroy(pAppInfo->pQnodeList); + taosThreadMutexUnlock(&pAppInfo->qnodeMutex); + + taosMemoryFree(pAppInfo); +} + void destroyTscObj(void *pObj) { STscObj *pTscObj = pObj; - SClientHbKey connKey = {.tscRid = *(int64_t *)pTscObj->id, .connType = pTscObj->connType}; + SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); closeAllRequests(pTscObj->pRequests); schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter); - if (0 == connNum) { - // TODO - // closeTransporter(pTscObj); - } - tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, *(int64_t *)pTscObj->id, + tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj, pTscObj->pAppInfo->numOfConns); + + if (0 == connNum) { + destroyAppInst(pTscObj->pAppInfo); + } taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFreeClear(pTscObj); } @@ -171,11 +192,12 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c } taosThreadMutexInit(&pObj->mutex, NULL); - pObj->id = taosMemoryMalloc(sizeof(int64_t)); - *(int64_t *)pObj->id = taosAddRef(clientConnRefPool, pObj); + pObj->id = taosAddRef(clientConnRefPool, pObj); pObj->schemalessType = 1; - tscDebug("connObj created, 0x%" PRIx64, *(int64_t *)pObj->id); + atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1); + + tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pObj->id, pObj); return pObj; } @@ -205,7 +227,10 @@ void *createRequest(STscObj *pObj, int32_t type) { pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE; tsem_init(&pRequest->body.rspSem, 0, 0); - registerRequest(pRequest); + if (registerRequest(pRequest)) { + doDestroyRequest(pRequest); + return NULL; + } return pRequest; } @@ -227,12 +252,16 @@ void doFreeReqResultInfo(SReqResultInfo *pResInfo) { } } -static void doDestroyRequest(void *p) { +SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); } + +int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); } + +int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); } + +void doDestroyRequest(void *p) { assert(p != NULL); SRequestObj *pRequest = (SRequestObj *)p; - assert(RID_VALID(pRequest->self)); - taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); if (pRequest->body.queryJob != 0) { @@ -244,14 +273,15 @@ static void doDestroyRequest(void *p) { taosMemoryFreeClear(pRequest->pDb); doFreeReqResultInfo(&pRequest->body.resInfo); - qDestroyQueryPlan(pRequest->body.pDag); taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->dbList); destroyQueryExecRes(&pRequest->body.resInfo.execRes); - deregisterRequest(pRequest); + if (pRequest->self) { + deregisterRequest(pRequest); + } taosMemoryFreeClear(pRequest); } @@ -260,13 +290,9 @@ void destroyRequest(SRequestObj *pRequest) { return; } - taosRemoveRef(clientReqRefPool, pRequest->self); + removeRequest(pRequest->self); } -SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); } - -int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); } - void taos_init_imp(void) { // In the APIs of other program language, taos_cleanup is not available yet. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 86562fea97..2de630e181 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -66,25 +66,31 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog if (rsp->vgVersion < 0) { code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); } else { - SDBVgInfo vgInfo = {0}; - vgInfo.vgVersion = rsp->vgVersion; - vgInfo.hashMethod = rsp->hashMethod; - vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (NULL == vgInfo.vgHash) { + SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo)); + if (NULL == vgInfo) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + vgInfo->vgVersion = rsp->vgVersion; + vgInfo->hashMethod = rsp->hashMethod; + vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == vgInfo->vgHash) { + taosMemoryFree(vgInfo); tscError("hash init[%d] failed", rsp->vgNum); return TSDB_CODE_TSC_OUT_OF_MEMORY; } for (int32_t j = 0; j < rsp->vgNum; ++j) { SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j); - if (taosHashPut(vgInfo.vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) { + if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) { tscError("hash push failed, errno:%d", errno); - taosHashCleanup(vgInfo.vgHash); + taosHashCleanup(vgInfo->vgHash); + taosMemoryFree(vgInfo); return TSDB_CODE_TSC_OUT_OF_MEMORY; } } - catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo); + catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, vgInfo); } if (code) { @@ -269,8 +275,11 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) int32_t rspNum = taosArrayGetSize(pRsp.rsps); + taosThreadMutexLock(&appInfo.mutex); + SAppInstInfo **pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); if (pInst == NULL || NULL == *pInst) { + taosThreadMutexUnlock(&appInfo.mutex); tscError("cluster not exist, key:%s", key); taosMemoryFreeClear(param); tFreeClientHbBatchRsp(&pRsp); @@ -294,6 +303,8 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) } } + taosThreadMutexUnlock(&appInfo.mutex); + tFreeClientHbBatchRsp(&pRsp); return code; @@ -320,7 +331,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { desc.reqRid = pRequest->self; desc.stableQuery = pRequest->stableQuery; taosGetFqdn(desc.fqdn); - desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0; + desc.subPlanNum = pRequest->body.subplanNum; if (desc.subPlanNum) { desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc)); @@ -790,22 +801,40 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { return pAppHbMgr; } +void hbFreeAppHbMgr(SAppHbMgr *pTarget) { + void *pIter = taosHashIterate(pTarget->activeInfo, NULL); + while (pIter != NULL) { + SClientHbReq *pOneReq = pIter; + tFreeClientHbReq(pOneReq); + pIter = taosHashIterate(pTarget->activeInfo, pIter); + } + taosHashCleanup(pTarget->activeInfo); + pTarget->activeInfo = NULL; + + taosMemoryFree(pTarget->key); + taosMemoryFree(pTarget); +} + +void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) { + taosThreadMutexLock(&clientHbMgr.lock); + int32_t mgrSize = taosArrayGetSize(clientHbMgr.appHbMgrs); + for (int32_t i = 0; i < mgrSize; ++i) { + SAppHbMgr *pItem = taosArrayGetP(clientHbMgr.appHbMgrs, i); + if (pItem == *pAppHbMgr) { + hbFreeAppHbMgr(*pAppHbMgr); + *pAppHbMgr = NULL; + taosArrayRemove(clientHbMgr.appHbMgrs, i); + break; + } + } + taosThreadMutexUnlock(&clientHbMgr.lock); +} + void appHbMgrCleanup(void) { int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); for (int i = 0; i < sz; i++) { SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); - - void *pIter = taosHashIterate(pTarget->activeInfo, NULL); - while (pIter != NULL) { - SClientHbReq *pOneReq = pIter; - tFreeClientHbReq(pOneReq); - pIter = taosHashIterate(pTarget->activeInfo, pIter); - } - taosHashCleanup(pTarget->activeInfo); - pTarget->activeInfo = NULL; - - taosMemoryFree(pTarget->key); - taosMemoryFree(pTarget); + hbFreeAppHbMgr(pTarget); } } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 489966b636..09fb73cfba 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -55,6 +55,18 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i return strdup(key); } +bool chkRequestKilled(void* param) { + bool killed = false; + SRequestObj* pRequest = acquireRequest((int64_t)param); + if (NULL == pRequest || pRequest->killed) { + killed = true; + } + + releaseRequest((int64_t)param); + + return killed; +} + static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType); @@ -122,6 +134,9 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); p->pAppHbMgr = appHbMgrInit(p, key); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); + p->instKey = key; + key = NULL; + tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, ip, port); pInst = &p; } @@ -609,58 +624,6 @@ _return: return code; } -int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { - tsem_init(&schdRspSem, 0, 0); - - SQueryResult res = {.code = 0, .numOfRows = 0}; - SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self}; - SSchedulerReq req = {.pConn = &conn, - .pNodeList = pNodeList, - .pDag = pDag, - .sql = pRequest->sqlstr, - .startTs = pRequest->metric.start, - .fp = schdExecCallback, - .cbParam = &res}; - - int32_t code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob); - - pRequest->body.resInfo.execRes = res.res; - - while (true) { - if (code != TSDB_CODE_SUCCESS) { - if (pRequest->body.queryJob != 0) { - schedulerFreeJob(pRequest->body.queryJob, 0); - } - - pRequest->code = code; - terrno = code; - return pRequest->code; - } else { - tsem_wait(&schdRspSem); - - if (res.code) { - code = res.code; - } else { - break; - } - } - } - - if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) { - pRequest->body.resInfo.numOfRows = res.numOfRows; - - if (pRequest->body.queryJob != 0) { - schedulerFreeJob(pRequest->body.queryJob, 0); - } - } - - pRequest->code = res.code; - terrno = res.code; - return pRequest->code; -} - int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; @@ -669,13 +632,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; SSchedulerReq req = {.pConn = &conn, - .pNodeList = pNodeList, - .pDag = pDag, - .sql = pRequest->sqlstr, - .startTs = pRequest->metric.start, - .fp = NULL, - .cbParam = NULL, - .reqKilled = &pRequest->killed}; + .pNodeList = pNodeList, + .pDag = pDag, + .sql = pRequest->sqlstr, + .startTs = pRequest->metric.start, + .execFp = NULL, + .execParam = NULL, + .chkKillFp = chkRequestKilled, + .chkKillParam = (void*)pRequest->self}; int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res); pRequest->body.resInfo.execRes = res.res; @@ -874,14 +838,18 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue } break; case QUERY_EXEC_MODE_SCHEDULE: { - SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); - code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pMnodeList); - if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { - SArray* pNodeList = NULL; - buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList); + SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); + SQueryPlan* pDag = NULL; + code = getPlan(pRequest, pQuery, &pDag, pMnodeList); + if (TSDB_CODE_SUCCESS == code) { + pRequest->body.subplanNum = pDag->numOfSubplans; + if (!pRequest->validateOnly) { + SArray* pNodeList = NULL; + buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList); - code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList); - taosArrayDestroy(pNodeList); + code = scheduleQuery(pRequest, pDag, pNodeList); + taosArrayDestroy(pNodeList); + } } taosArrayDestroy(pMnodeList); break; @@ -959,10 +927,13 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM .pUser = pRequest->pTscObj->user}; SAppInstInfo* pAppInfo = getAppInfo(pRequest); - code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pMnodeList); + SQueryPlan* pDag = NULL; + code = qCreateQueryPlan(&cxt, &pDag, pMnodeList); if (code) { tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); + } else { + pRequest->body.subplanNum = pDag->numOfSubplans; } if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { @@ -973,12 +944,13 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM .pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; SSchedulerReq req = {.pConn = &conn, .pNodeList = pNodeList, - .pDag = pRequest->body.pDag, + .pDag = pDag, .sql = pRequest->sqlstr, .startTs = pRequest->metric.start, - .fp = schedulerExecCb, - .cbParam = pRequest, - .reqKilled = &pRequest->killed}; + .execFp = schedulerExecCb, + .execParam = pRequest, + .chkKillFp = chkRequestKilled, + .chkKillParam = (void*)pRequest->self}; code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob); taosArrayDestroy(pNodeList); } else { @@ -1163,7 +1135,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t taos_close_internal(pTscObj); pTscObj = NULL; } else { - tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, *(int64_t*)pTscObj->id, + tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); destroyRequest(pRequest); } @@ -1326,7 +1298,9 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY); if (pObj) { - return pObj->id; + int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t)); + *rid = pObj->id; + return (TAOS*)rid; } return NULL; @@ -2001,17 +1975,26 @@ void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) { void syncQueryFn(void* param, void* res, int32_t code) { SSyncQueryParam* pParam = param; pParam->pRequest = res; - pParam->pRequest->code = code; + if (pParam->pRequest) { + pParam->pRequest->code = code; + } tsem_post(&pParam->sem); } void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) { - STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); + if (NULL == taos) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + fp(param, NULL, terrno); + return; + } + + int64_t rid = *(int64_t*)taos; + STscObj* pTscObj = acquireTscObj(rid); if (pTscObj == NULL || sql == NULL || NULL == fp) { terrno = TSDB_CODE_INVALID_PARA; if (pTscObj) { - releaseTscObj(*(int64_t*)taos); + releaseTscObj(rid); } else { terrno = TSDB_CODE_TSC_DISCONNECTED; } @@ -2023,6 +2006,7 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; + releaseTscObj(rid); fp(param, NULL, terrno); return; @@ -2032,6 +2016,7 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); if (code != TSDB_CODE_SUCCESS) { terrno = code; + releaseTscObj(rid); fp(param, NULL, terrno); return; } @@ -2040,6 +2025,7 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* pRequest->body.queryFp = fp; pRequest->body.param = param; doAsyncQuery(pRequest, false); + releaseTscObj(rid); } TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { @@ -2048,7 +2034,8 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { return NULL; } - STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); + int64_t rid = *(int64_t*)taos; + STscObj* pTscObj = acquireTscObj(rid); if (pTscObj == NULL || sql == NULL) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; @@ -2058,16 +2045,16 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); tsem_init(¶m->sem, 0, 0); - taosAsyncQueryImpl(taos, sql, syncQueryFn, param, validateOnly); + taosAsyncQueryImpl((TAOS*)&rid, sql, syncQueryFn, param, validateOnly); tsem_wait(¶m->sem); - releaseTscObj(*(int64_t*)taos); + releaseTscObj(rid); return param->pRequest; #else size_t sqlLen = strlen(sql); if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { - releaseTscObj(*(int64_t*)taos); + releaseTscObj(rid); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; return NULL; @@ -2075,7 +2062,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen, validateOnly); - releaseTscObj(*(int64_t*)taos); + releaseTscObj(rid); return pRes; #endif diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 365367360f..c8595ae29a 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -106,7 +106,9 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha STscObj *pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY); if (pObj) { - return pObj->id; + int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t)); + *rid = pObj->id; + return (TAOS*)rid; } return NULL; @@ -118,9 +120,9 @@ void taos_close_internal(void *taos) { } STscObj *pTscObj = (STscObj *)taos; - tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", *(int64_t *)pTscObj->id, pTscObj->numOfReqs); + tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs); - taosRemoveRef(clientConnRefPool, *(int64_t *)pTscObj->id); + taosRemoveRef(clientConnRefPool, pTscObj->id); } void taos_close(TAOS *taos) { @@ -192,6 +194,17 @@ void taos_free_result(TAOS_RES *res) { } } +void taos_kill_query(TAOS *taos) { + if (NULL == taos) { + return; + } + int64_t rid = *(int64_t*)taos; + + STscObj* pTscObj = acquireTscObj(rid); + closeAllRequests(pTscObj->pRequests); + releaseTscObj(rid); +} + int taos_field_count(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ_META(res)) { return 0; @@ -895,6 +908,12 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { } int taos_load_table_info(TAOS *taos, const char *tableNameList) { + if (NULL == taos) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return terrno; + } + + int64_t rid = *(int64_t*)taos; const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list int32_t code = 0; SRequestObj *pRequest = NULL; @@ -912,7 +931,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { return TSDB_CODE_TSC_INVALID_OPERATION; } - STscObj *pTscObj = acquireTscObj(*(int64_t *)taos); + STscObj *pTscObj = acquireTscObj(rid); if (pTscObj == NULL) { terrno = TSDB_CODE_TSC_DISCONNECTED; return terrno; @@ -957,7 +976,7 @@ _return: taosArrayDestroy(catalogReq.pTableMeta); destroyRequest(pRequest); - releaseTscObj(*(int64_t *)taos); + releaseTscObj(rid); return code; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 45a525d124..5c30df4ae2 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -77,7 +77,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) { tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i, - connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, *(int64_t*)pTscObj->id); + connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id); } pTscObj->connId = connectRsp.connId; @@ -87,11 +87,10 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { // update the appInstInfo pTscObj->pAppInfo->clusterId = connectRsp.clusterId; - atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); pTscObj->connType = connectRsp.connType; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, *(int64_t*)pTscObj->id, connectRsp.clusterId, connectRsp.connType); + hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 7d2bf019d2..1cb0e2d54b 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -309,7 +309,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { case SCHEMA_ACTION_ADD_COLUMN: { int n = sprintf(result, "alter stable `%s` add column ", action->alterSTable.sTableName); smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes); - TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery + TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery code = taos_errno(res); const char *errStr = taos_errstr(res); if (code != TSDB_CODE_SUCCESS) { @@ -323,7 +323,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { case SCHEMA_ACTION_ADD_TAG: { int n = sprintf(result, "alter stable `%s` add tag ", action->alterSTable.sTableName); smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes); - TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery + TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery code = taos_errno(res); const char *errStr = taos_errstr(res); if (code != TSDB_CODE_SUCCESS) { @@ -337,7 +337,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: { int n = sprintf(result, "alter stable `%s` modify column ", action->alterSTable.sTableName); smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes); - TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery + TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res)); @@ -350,7 +350,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { case SCHEMA_ACTION_CHANGE_TAG_SIZE: { int n = sprintf(result, "alter stable `%s` modify tag ", action->alterSTable.sTableName); smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes); - TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery + TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res)); @@ -405,7 +405,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { pos--; ++freeBytes; outBytes = snprintf(pos, freeBytes, ")"); - TAOS_RES *res = taos_query(info->taos->id, result); + TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res)); @@ -2436,7 +2436,13 @@ static void smlInsertCallback(void *param, void *res, int32_t code) { */ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) { - STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); + if (NULL == taos) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; + } + + int64_t rid = *(int64_t*)taos; + STscObj* pTscObj = acquireTscObj(rid); if (NULL == pTscObj) { terrno = TSDB_CODE_TSC_DISCONNECTED; uError("SML:taos_schemaless_insert invalid taos"); @@ -2445,7 +2451,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT); if(!request){ - releaseTscObj(*(int64_t*)taos); + releaseTscObj(rid); uError("SML:taos_schemaless_insert error request is null"); return NULL; } @@ -2533,6 +2539,6 @@ end: // ((STscObj *)taos)->schemalessType = 0; pTscObj->schemalessType = 1; uDebug("resultend:%s", request->msgBuf); - releaseTscObj(*(int64_t*)taos); + releaseTscObj(rid); return (TAOS_RES*)request; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 5e1ef23f1c..91ef292360 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -107,13 +107,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf const STraceId *trace = &pMsg->info.traceId; dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg); - int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL); - if (code != 0) { - if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to sync since %s", pVnode->vgId, pMsg, terrstr()); - vmSendRsp(pMsg, code); - } - + int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL); // no response here dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 7e31cc3144..1c5d74bb94 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -251,9 +251,6 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { static bool rpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { - if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) { - return false; - } return true; } else { return false; @@ -264,7 +261,7 @@ int32_t dmInitClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; - rpcInit.label = "DND"; + rpcInit.label = "DND-C"; rpcInit.numOfThreads = 1; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; @@ -298,7 +295,7 @@ int32_t dmInitServer(SDnode *pDnode) { SRpcInit rpcInit = {0}; strncpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn)); rpcInit.localPort = tsServerPort; - rpcInit.label = "DND"; + rpcInit.label = "DND-S"; rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = tsMaxShellConns; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index cb2f2aef07..fa9c8606bb 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -527,6 +527,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; + if (pMsg->msgType == TDMT_VND_QUERY || pMsg->msgType == TDMT_VND_QUERY_CONTINUE || + pMsg->msgType == TDMT_VND_QUERY_HEARTBEAT || pMsg->msgType == TDMT_VND_FETCH || + pMsg->msgType == TDMT_VND_DROP_TASK) { + return 0; + } if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d324a76438..e9b64389aa 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -73,7 +73,7 @@ static int32_t vnodeSetStandBy(SVnode *pVnode) { vInfo("vgId:%d, set standby success", TD_VID(pVnode)); return 0; } else { - vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr()); + vError("vgId:%d, failed to set standby after leader transfer since %s", TD_VID(pVnode), terrstr()); return -1; } } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 2b376bbce8..3a7ad4a2d6 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1127,12 +1127,14 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_ } *num = taosHashGetSize(pCtg->userCache); - if (*num > 0) { - *users = taosMemoryCalloc(*num, sizeof(SUserAuthVersion)); - if (NULL == *users) { - ctgError("calloc %d userAuthVersion failed", *num); - CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY); - } + if (*num <= 0) { + CTG_API_LEAVE(TSDB_CODE_SUCCESS); + } + + *users = taosMemoryCalloc(*num, sizeof(SUserAuthVersion)); + if (NULL == *users) { + ctgError("calloc %d userAuthVersion failed", *num); + CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY); } uint32_t i = 0; @@ -1144,6 +1146,11 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_ (*users)[i].user[len] = 0; (*users)[i].version = pAuth->version; ++i; + if (i >= *num) { + taosHashCancelIterate(pCtg->userCache, pAuth); + break; + } + pAuth = taosHashIterate(pCtg->userCache, pAuth); } diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 6184d13533..455f2bd6a7 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -825,6 +825,9 @@ _return: qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(code)); pJob->jobResCode = code; + + //taosSsleep(2); + //qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId); taosAsyncExec(ctgCallUserCb, pJob, NULL); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 9c9aa4001c..77c1e5b8b1 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1083,7 +1083,7 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si CTG_LOCK(CTG_WRITE, &slot->lock); if (NULL == slot->meta) { - qError("empty meta slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + qDebug("empty meta slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -1536,8 +1536,6 @@ void ctgClearAllInstance(void) { pIter = taosHashIterate(gCtgMgmt.pCluster, pIter); } - - taosHashClear(gCtgMgmt.pCluster); } void ctgFreeAllInstance(void) { @@ -1566,27 +1564,27 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { SCatalog* pCtg = msg->pCtg; if (NULL == dbInfo->vgHash) { - return TSDB_CODE_SUCCESS; + goto _return; } if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash)); - CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + CTG_ERR_JRET(TSDB_CODE_APP_ERROR); } bool newAdded = false; SDbVgVersion vgVersion = {.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable}; SCtgDBCache *dbCache = NULL; - CTG_ERR_RET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache)); + CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache)); if (NULL == dbCache) { ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%"PRIx64, dbFName, msg->dbId); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } SCtgVgCache *vgCache = &dbCache->vgCache; - CTG_ERR_RET(ctgWLockVgInfo(msg->pCtg, dbCache)); + CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache)); if (vgCache->vgInfo) { SDBVgInfo *vgInfo = vgCache->vgInfo; @@ -1595,14 +1593,14 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { ctgDebug("db vgVer is old, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion, vgInfo->vgVersion); ctgWUnlockVgInfo(dbCache); - return TSDB_CODE_SUCCESS; + goto _return; } if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable) { ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable); ctgWUnlockVgInfo(dbCache); - return TSDB_CODE_SUCCESS; + goto _return; } ctgFreeVgInfo(vgInfo); diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index 2cb6f0209f..9195747bee 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -19,7 +19,7 @@ #include "catalogInt.h" extern SCatalogMgmt gCtgMgmt; -SCtgDebug gCTGDebug = {.lockEnable = true, .apiEnable = true}; +SCtgDebug gCTGDebug = {0}; void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) { ASSERT(*(int32_t*)param == 1); diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 185c244e61..044ec90a7f 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1770,7 +1770,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN } int32_t code = TSDB_CODE_SUCCESS; - SSchema* pSchema = pDataBlock->pTableMeta->schema; + SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); bool isJson = false; STag* pTag = NULL; diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 75a9bc3809..cef8ff7075 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -936,6 +936,25 @@ EDealRes sclCalcWalker(SNode* pNode, void* pContext) { return DEAL_RES_ERROR; } +int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) { + SSDataBlock* pb = taosArrayGetP(pBlockList, 0); + SScalarParam *pLeft = taosMemoryCalloc(1, sizeof(SScalarParam)); + if (NULL == pLeft) { + sclError("calloc %d failed", (int32_t)sizeof(SScalarParam)); + SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + pLeft->numOfRows = pb->info.rows; + colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows); + + _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN); + OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC); + + taosMemoryFree(pLeft); + + return TSDB_CODE_SUCCESS; +} + int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) { if (NULL == pNode) { SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -983,9 +1002,14 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows); - colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL); - pDst->numOfRows = res->numOfRows; + if (1 == res->numOfRows) { + SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList)); + } else { + colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows); + colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL); + pDst->numOfRows = res->numOfRows; + } + taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 6b2570c5b7..5998ab6965 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -99,8 +99,8 @@ typedef struct SSchStat { typedef struct SSchResInfo { SQueryResult* queryRes; void** fetchRes; - schedulerExecCallback execFp; - schedulerFetchCallback fetchFp; + schedulerExecFp execFp; + schedulerFetchFp fetchFp; void* userParam; } SSchResInfo; @@ -204,39 +204,38 @@ typedef struct { } SSchOpStatus; typedef struct SSchJob { - int64_t refId; - uint64_t queryId; - SSchJobAttr attr; - int32_t levelNum; - int32_t taskNum; - SRequestConnInfo conn; - SArray *nodeList; // qnode/vnode list, SArray - SArray *levels; // starting from 0. SArray - SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler + int64_t refId; + uint64_t queryId; + SSchJobAttr attr; + int32_t levelNum; + int32_t taskNum; + SRequestConnInfo conn; + SArray *nodeList; // qnode/vnode list, SArray + SArray *levels; // starting from 0. SArray + SQueryPlan *pDag; - SArray *dataSrcTasks; // SArray - int32_t levelIdx; - SEpSet dataSrcEps; - SHashObj *taskList; - SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* - SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* - SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* - SHashObj *flowCtrl; // key is ep, element is SSchFlowControl + SArray *dataSrcTasks; // SArray + int32_t levelIdx; + SEpSet dataSrcEps; + SHashObj *taskList; + SHashObj *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask* + SHashObj *flowCtrl; // key is ep, element is SSchFlowControl - SExplainCtx *explainCtx; - int8_t status; - SQueryNodeAddr resNode; - tsem_t rspSem; - SSchOpStatus opStatus; - bool *reqKilled; - SSchTask *fetchTask; - int32_t errCode; - SRWLatch resLock; - SQueryExecRes execRes; - void *resData; //TODO free it or not - int32_t resNumOfRows; - SSchResInfo userRes; - const char *sql; + SExplainCtx *explainCtx; + int8_t status; + SQueryNodeAddr resNode; + tsem_t rspSem; + SSchOpStatus opStatus; + schedulerChkKillFp chkKillFp; + void* chkKillParam; + SSchTask *fetchTask; + int32_t errCode; + SRWLatch resLock; + SQueryExecRes execRes; + void *resData; //TODO free it or not + int32_t resNumOfRows; + SSchResInfo userRes; + const char *sql; SQueryProfileSummary summary; } SSchJob; @@ -334,13 +333,13 @@ extern SSchedulerMgmt schMgmt; #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) -void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); -void schCleanClusterHb(void* pTrans); +void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); +void schCleanClusterHb(void* pTrans); int32_t schLaunchTask(SSchJob *job, SSchTask *task); int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); SSchJob *schAcquireJob(int64_t refId); int32_t schReleaseJob(int64_t refId); -void schFreeFlowCtrl(SSchJob *pJob); +void schFreeFlowCtrl(SSchJob *pJob); int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel); int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask); int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); @@ -351,38 +350,40 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction); int32_t schCloneSMsgSendInfo(void *src, void **dst); int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob); -void schFreeJobImpl(void *job); +void schFreeJobImpl(void *job); int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx); int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask); int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans); int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code); -void schFreeRpcCtx(SRpcCtx *pCtx); +void schFreeRpcCtx(SRpcCtx *pCtx); int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp); -bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus); +bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus); int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask); int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp); int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp); -void schProcessOnDataFetched(SSchJob *job); +void schProcessOnDataFetched(SSchJob *job); int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask); -void schFreeRpcCtxVal(const void *arg); +void schFreeRpcCtxVal(const void *arg); int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb); int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx); int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync); -int32_t schExecJobImpl(SSchedulerReq *pReq, int64_t *job, SQueryResult* pRes, bool sync); +int32_t schExecJobImpl(SSchedulerReq *pReq, SSchJob *pJob, bool sync); int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus); int32_t schCancelJob(SSchJob *pJob); int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode); uint64_t schGenTaskId(void); -void schCloseJobRef(void); +void schCloseJobRef(void); int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes); int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob); int32_t schFetchRows(SSchJob *pJob); int32_t schAsyncFetchRows(SSchJob *pJob); int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx); int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList); -void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo); -char* schGetOpStr(SCH_OP_TYPE type); +void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo); +char* schGetOpStr(SCH_OP_TYPE type); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); +int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob); +int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 72809e1f93..bd3a944c3f 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -42,7 +42,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * return TSDB_CODE_SUCCESS; } -int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, bool syncSchedule) { +int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob) { int32_t code = 0; int64_t refId = -1; SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); @@ -54,12 +54,15 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, b pJob->attr.explainMode = pReq->pDag->explainInfo.mode; pJob->conn = *pReq->pConn; pJob->sql = pReq->sql; - pJob->reqKilled = pReq->reqKilled; - pJob->userRes.queryRes = pRes; - pJob->userRes.execFp = pReq->fp; - pJob->userRes.userParam = pReq->cbParam; - - if (pReq->pNodeList != NULL) { + pJob->pDag = pReq->pDag; + pJob->chkKillFp = pReq->chkKillFp; + pJob->chkKillParam = pReq->chkKillParam; + pJob->userRes.execFp = pReq->execFp; + pJob->userRes.userParam = pReq->execParam; + + if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) { + qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId); + } else { pJob->nodeList = taosArrayDup(pReq->pNodeList); } @@ -83,20 +86,6 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, b SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pJob->succTasks = - taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == pJob->succTasks) { - SCH_JOB_ELOG("taosHashInit %d succTasks failed", pReq->pDag->numOfSubplans); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - pJob->failTasks = - taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == pJob->failTasks) { - SCH_JOB_ELOG("taosHashInit %d failTasks failed", pReq->pDag->numOfSubplans); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - tsem_init(&pJob->rspSem, 0, 0); refId = taosAddRef(schMgmt.jobRef, pJob); @@ -194,7 +183,7 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { *pStatus = status; } - if (*pJob->reqKilled) { + if ((*pJob->chkKillFp)(pJob->chkKillParam)) { schUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING); schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED); @@ -547,8 +536,6 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { pJob->levelNum = levelNum; pJob->levelIdx = levelNum - 1; - pJob->subPlans = pDag->pSubplans; - SSchLevel level = {0}; SNodeListNode *plans = NULL; int32_t taskNum = 0; @@ -724,6 +711,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } +/* int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) { SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); @@ -801,6 +789,7 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { return TSDB_CODE_SUCCESS; } +*/ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { int8_t status = 0; @@ -1047,9 +1036,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) if (!needRetry) { SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode)); - if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { - SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved)); - } else { + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } @@ -1115,8 +1102,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_LOG_TASK_END_TS(pTask); - SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved)); - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask)); @@ -1150,8 +1135,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { pJob->fetchTask = pTask; - SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved)); - SCH_RET(schProcessOnJobPartialSuccess(pJob)); } @@ -1466,8 +1449,8 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { void schDropJobAllTasks(SSchJob *pJob) { schDropTaskInHashList(pJob, pJob->execTasks); - schDropTaskInHashList(pJob, pJob->succTasks); - schDropTaskInHashList(pJob, pJob->failTasks); +// schDropTaskInHashList(pJob, pJob->succTasks); +// schDropTaskInHashList(pJob, pJob->failTasks); } int32_t schCancelJob(SSchJob *pJob) { @@ -1491,8 +1474,6 @@ void schFreeJobImpl(void *job) { schDropJobAllTasks(pJob); - pJob->subPlans = NULL; // it is a reference to pDag->pSubplans - int32_t numOfLevels = taosArrayGetSize(pJob->levels); for (int32_t i = 0; i < numOfLevels; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); @@ -1509,8 +1490,8 @@ void schFreeJobImpl(void *job) { schFreeFlowCtrl(pJob); taosHashCleanup(pJob->execTasks); - taosHashCleanup(pJob->failTasks); - taosHashCleanup(pJob->succTasks); +// taosHashCleanup(pJob->failTasks); +// taosHashCleanup(pJob->succTasks); taosHashCleanup(pJob->taskList); taosArrayDestroy(pJob->levels); @@ -1521,6 +1502,8 @@ void schFreeJobImpl(void *job) { destroyQueryExecRes(&pJob->execRes); + qDestroyQueryPlan(pJob->pDag); + taosMemoryFreeClear(pJob->userRes.queryRes); taosMemoryFreeClear(pJob->resData); taosMemoryFree(pJob); @@ -1533,88 +1516,11 @@ void schFreeJobImpl(void *job) { } } -int32_t schExecJobImpl(SSchedulerReq *pReq, int64_t *job, SQueryResult* pRes, bool sync) { - if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) { - qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId); - } - - int32_t code = 0; - SSchJob *pJob = NULL; - SCH_ERR_JRET(schInitJob(pReq, &pJob, pRes, sync)); - - qDebug("QID:0x%" PRIx64 " sch job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId); - *job = pJob->refId; - - SCH_ERR_JRET(schBeginOperation(pJob, SCH_OP_EXEC, sync)); - - code = schLaunchJob(pJob); - - if (sync) { - SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); - tsem_wait(&pJob->rspSem); - - schEndOperation(pJob); - } else if (code) { - schPostJobRes(pJob, SCH_OP_EXEC); - } - - SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId); - - schReleaseJob(pJob->refId); - - SCH_RET(code); - -_return: - - if (!sync) { - pReq->fp(NULL, pReq->cbParam, code); - } - - schReleaseJob(pJob->refId); - - SCH_RET(code); -} - -int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes) { - int32_t code = 0; - - *pJob = 0; - - if (EXPLAIN_MODE_STATIC == pReq->pDag->explainInfo.mode) { - SCH_ERR_JRET(schExecStaticExplainJob(pReq, pJob, true)); - } else { - SCH_ERR_JRET(schExecJobImpl(pReq, pJob, NULL, true)); - } - -_return: - - if (*pJob) { - SSchJob *job = schAcquireJob(*pJob); - schSetJobQueryRes(job, pRes); - schReleaseJob(*pJob); - } - - return code; -} - -int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob) { - int32_t code = 0; - - *pJob = 0; - - if (EXPLAIN_MODE_STATIC == pReq->pDag->explainInfo.mode) { - SCH_RET(schExecStaticExplainJob(pReq, pJob, false)); - } - - SCH_ERR_RET(schExecJobImpl(pReq, pJob, NULL, false)); - - return code; -} - -int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) { +int32_t schLaunchStaticExplainJob(SSchedulerReq *pReq, SSchJob *pJob, bool sync) { qDebug("QID:0x%" PRIx64 " job started", pReq->pDag->queryId); int32_t code = 0; +/* SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); if (NULL == pJob) { qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob)); @@ -1625,10 +1531,10 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) { pJob->sql = pReq->sql; pJob->reqKilled = pReq->reqKilled; + pJob->pDag = pReq->pDag; pJob->attr.queryJob = true; pJob->attr.explainMode = pReq->pDag->explainInfo.mode; pJob->queryId = pReq->pDag->queryId; - pJob->subPlans = pReq->pDag->pSubplans; pJob->userRes.execFp = pReq->fp; pJob->userRes.userParam = pReq->cbParam; @@ -1637,11 +1543,14 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) { code = schBeginOperation(pJob, SCH_OP_EXEC, sync); if (code) { pReq->fp(NULL, pReq->cbParam, code); + schFreeJobImpl(pJob); SCH_ERR_RET(code); } - +*/ + SCH_ERR_JRET(qExecStaticExplain(pReq->pDag, (SRetrieveTableRsp **)&pJob->resData)); +/* int64_t refId = taosAddRef(schMgmt.jobRef, pJob); if (refId < 0) { SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); @@ -1656,10 +1565,10 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) { pJob->refId = refId; SCH_JOB_DLOG("job refId:0x%" PRIx64, pJob->refId); +*/ pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; - *job = pJob->refId; SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); if (!sync) { @@ -1668,7 +1577,7 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) { schEndOperation(pJob); } - schReleaseJob(pJob->refId); +// schReleaseJob(pJob->refId); SCH_RET(code); @@ -1676,7 +1585,7 @@ _return: schEndOperation(pJob); if (!sync) { - pReq->fp(NULL, pReq->cbParam, code); + pReq->execFp(NULL, pReq->execParam, code); } schFreeJobImpl(pJob); @@ -1715,3 +1624,39 @@ int32_t schAsyncFetchRows(SSchJob *pJob) { } +int32_t schExecJobImpl(SSchedulerReq *pReq, SSchJob *pJob, bool sync) { + int32_t code = 0; + + qDebug("QID:0x%" PRIx64 " sch job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId); + + SCH_ERR_JRET(schBeginOperation(pJob, SCH_OP_EXEC, sync)); + + if (EXPLAIN_MODE_STATIC == pReq->pDag->explainInfo.mode) { + code = schLaunchStaticExplainJob(pReq, pJob, sync); + } else { + code = schLaunchJob(pJob); + if (sync) { + SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); + tsem_wait(&pJob->rspSem); + + schEndOperation(pJob); + } else if (code) { + schPostJobRes(pJob, SCH_OP_EXEC); + } + } + + SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId); + + SCH_RET(code); + +_return: + + if (!sync) { + pReq->execFp(NULL, pReq->execParam, code); + } + + SCH_RET(code); +} + + + diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 57a405ffa3..74ddc89b40 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -67,33 +67,53 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { return TSDB_CODE_SUCCESS; } -int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes) { +int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId, SQueryResult *pRes) { qDebug("scheduler sync exec job start"); + + int32_t code = 0; + SSchJob *pJob = NULL; + SCH_ERR_JRET(schInitJob(pReq, &pJob)); + + *pJobId = pJob->refId; - if (NULL == pReq || NULL == pJob || NULL == pRes) { - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } - - SCH_RET(schExecJob(pReq, pJob, pRes)); -} - -int32_t schedulerAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob) { - qDebug("scheduler async exec job start"); - - int32_t code = 0; - if (NULL == pReq || NULL == pJob) { - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - - schAsyncExecJob(pReq, pJob); + SCH_ERR_JRET(schExecJobImpl(pReq, pJob, true)); _return: - if (code != TSDB_CODE_SUCCESS) { - pReq->fp(NULL, pReq->cbParam, code); + if (code && NULL == pJob) { + qDestroyQueryPlan(pReq->pDag); + } + + if (pJob) { + schSetJobQueryRes(pJob, pRes); + schReleaseJob(pJob->refId); } - SCH_RET(code); + return code; +} + +int32_t schedulerAsyncExecJob(SSchedulerReq *pReq, int64_t *pJobId) { + qDebug("scheduler async exec job start"); + + int32_t code = 0; + SSchJob *pJob = NULL; + SCH_ERR_JRET(schInitJob(pReq, &pJob)); + + *pJobId = pJob->refId; + + SCH_ERR_JRET(schExecJobImpl(pReq, pJob, false)); + +_return: + + if (code && NULL == pJob) { + qDestroyQueryPlan(pReq->pDag); + } + + if (pJob) { + schReleaseJob(pJob->refId); + } + + return code; } int32_t schedulerFetchRows(int64_t job, void **pData) { @@ -120,7 +140,7 @@ int32_t schedulerFetchRows(int64_t job, void **pData) { SCH_RET(code); } -void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param) { +void schedulerAsyncFetchRows(int64_t job, schedulerFetchFp fp, void* param) { qDebug("scheduler async fetch rows start"); int32_t code = 0; diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index e5cc3cd481..b372ee3ead 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -511,8 +511,8 @@ void* schtRunJobThread(void *aa) { req.pNodeList = qnodeList; req.pDag = &dag; req.sql = "select * from tb"; - req.fp = schtQueryCb; - req.cbParam = &queryDone; + req.execFp = schtQueryCb; + req.execParam = &queryDone; code = schedulerAsyncExecJob(&req, &queryJobRefId); assert(code == 0); @@ -663,8 +663,8 @@ TEST(queryTest, normalCase) { req.pNodeList = qnodeList; req.pDag = &dag; req.sql = "select * from tb"; - req.fp = schtQueryCb; - req.cbParam = &queryDone; + req.execFp = schtQueryCb; + req.execParam = &queryDone; code = schedulerAsyncExecJob(&req, &job); ASSERT_EQ(code, 0); @@ -767,8 +767,8 @@ TEST(queryTest, readyFirstCase) { req.pNodeList = qnodeList; req.pDag = &dag; req.sql = "select * from tb"; - req.fp = schtQueryCb; - req.cbParam = &queryDone; + req.execFp = schtQueryCb; + req.execParam = &queryDone; code = schedulerAsyncExecJob(&req, &job); ASSERT_EQ(code, 0); @@ -874,8 +874,8 @@ TEST(queryTest, flowCtrlCase) { req.pNodeList = qnodeList; req.pDag = &dag; req.sql = "select * from tb"; - req.fp = schtQueryCb; - req.cbParam = &queryDone; + req.execFp = schtQueryCb; + req.execParam = &queryDone; code = schedulerAsyncExecJob(&req, &job); ASSERT_EQ(code, 0); @@ -987,8 +987,8 @@ TEST(insertTest, normalCase) { req.pNodeList = qnodeList; req.pDag = &dag; req.sql = "insert into tb values(now,1)"; - req.fp = schtQueryCb; - req.cbParam = NULL; + req.execFp = schtQueryCb; + req.execParam = NULL; code = schedulerExecJob(&req, &insertJobRefId, &res); ASSERT_EQ(code, 0); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 8183b7fd9f..b06d541b75 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -253,7 +253,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq); do { \ if (id > 0) { \ tTrace("handle step1"); \ - SExHandle* exh2 = transAcquireExHandle(refMgt, id); \ + SExHandle* exh2 = transAcquireExHandle(id); \ if (exh2 == NULL || id != exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \ exh2 ? exh2->refId : 0, id); \ @@ -261,7 +261,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq); } \ } else if (id == 0) { \ tTrace("handle step2"); \ - SExHandle* exh2 = transAcquireExHandle(refMgt, id); \ + SExHandle* exh2 = transAcquireExHandle(id); \ if (exh2 == NULL || id == exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \ exh2 ? exh2->refId : 0); \ @@ -274,6 +274,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq); goto _return2; \ } \ } while (0) + int transInitBuffer(SConnBuffer* buf); int transClearBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf); @@ -387,13 +388,15 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b); */ void transThreadOnce(); -void transInitEnv(); +void transInit(); +void transCleanup(); + int32_t transOpenExHandleMgt(int size); -void transCloseExHandleMgt(int32_t mgt); -int64_t transAddExHandle(int32_t mgt, void* p); -int32_t transRemoveExHandle(int32_t mgt, int64_t refId); -SExHandle* transAcquireExHandle(int32_t mgt, int64_t refId); -int32_t transReleaseExHandle(int32_t mgt, int64_t refId); +void transCloseExHandleMgt(); +int64_t transAddExHandle(void* p); +int32_t transRemoveExHandle(int64_t refId); +SExHandle* transAcquireExHandle(int64_t refId); +int32_t transReleaseExHandle(int64_t refId); void transDestoryExHandle(void* handle); #ifdef __cplusplus diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index bd8195462e..936cfe870d 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -36,7 +36,7 @@ static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { return 0; } void* rpcOpen(const SRpcInit* pInit) { - transInitEnv(); + rpcInit(); SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) { @@ -82,8 +82,8 @@ void rpcClose(void* arg) { tInfo("start to close rpc"); SRpcInfo* pRpc = (SRpcInfo*)arg; (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); - transCloseExHandleMgt(pRpc->refMgt); taosMemoryFree(pRpc); + tInfo("finish to close rpc"); return; } @@ -141,7 +141,9 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { } void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } -int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } + +int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return 0; } + void rpcRefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); @@ -165,11 +167,11 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { } int32_t rpcInit() { - // impl later + transInit(); return 0; } void rpcCleanup(void) { - // impl later + transCleanup(); return; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 6390855e43..2836bc2949 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -15,9 +15,6 @@ #ifdef USE_UV #include "transComm.h" -static int32_t transSCliInst = 0; -static int32_t refMgt = 0; - typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -320,8 +317,9 @@ void cliHandleResp(SCliConn* conn) { if (CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs); - pCtx = pMsg->ctx; - transMsg.info.ahandle = pCtx->ahandle; + + pCtx = pMsg ? pMsg->ctx : NULL; + transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); } else { uint64_t ahandle = (uint64_t)pHead->ahandle; @@ -467,8 +465,7 @@ void* destroyConnPool(void* pool) { SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conn)) { - queue* h = QUEUE_HEAD(&connList->conn); - // QUEUE_REMOVE(h); + queue* h = QUEUE_HEAD(&connList->conn); SCliConn* c = QUEUE_DATA(h, SCliConn, conn); cliDestroyConn(c, true); } @@ -504,12 +501,13 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { } static void allocConnRef(SCliConn* conn, bool update) { if (update) { - transRemoveExHandle(refMgt, conn->refId); + transRemoveExHandle(conn->refId); + conn->refId = -1; } SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); exh->handle = conn; exh->pThrd = conn->hostThrd; - exh->refId = transAddExHandle(refMgt, exh); + exh->refId = transAddExHandle(exh); conn->refId = exh->refId; } static void addConnToPool(void* pool, SCliConn* conn) { @@ -603,14 +601,26 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); QUEUE_REMOVE(&conn->conn); QUEUE_INIT(&conn->conn); - transRemoveExHandle(refMgt, conn->refId); + transRemoveExHandle(conn->refId); + conn->refId = -1; + if (clear) { - uv_close((uv_handle_t*)conn->stream, cliDestroy); + if (!uv_is_closing((uv_handle_t*)conn->stream)) { + uv_close((uv_handle_t*)conn->stream, cliDestroy); + } else { + cliDestroy((uv_handle_t*)conn->stream); + } } } static void cliDestroy(uv_handle_t* handle) { + if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) { + return; + } + SCliConn* conn = handle->data; + transRemoveExHandle(conn->refId); taosMemoryFree(conn->ip); + conn->stream->data = NULL; taosMemoryFree(conn->stream); transCtxCleanup(&conn->ctx); transQueueDestroy(&conn->cliMsgs); @@ -736,7 +746,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { } static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { int64_t refId = (int64_t)(pMsg->msg.info.handle); - SExHandle* exh = transAcquireExHandle(refMgt, refId); + SExHandle* exh = transAcquireExHandle(refId); if (exh == NULL) { tDebug("%" PRId64 " already release", refId); } @@ -762,7 +772,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { SCliConn* conn = NULL; int64_t refId = (int64_t)(pMsg->msg.info.handle); if (refId != 0) { - SExHandle* exh = transAcquireExHandle(refMgt, refId); + SExHandle* exh = transAcquireExHandle(refId); if (exh == NULL) { *ignore = true; destroyCmsg(pMsg); @@ -770,7 +780,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { // assert(0); } else { conn = exh->handle; - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); } return conn; }; @@ -900,10 +910,6 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, } cli->pThreadObj[i] = pThrd; } - int ref = atomic_add_fetch_32(&transSCliInst, 1); - if (ref == 1) { - refMgt = transOpenExHandleMgt(50000); - } return cli; } @@ -972,7 +978,7 @@ void cliSendQuit(SCliThrd* thrd) { } void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { - uv_close(handle, NULL); + uv_close(handle, cliDestroy); } } @@ -1110,10 +1116,6 @@ void transCloseClient(void* arg) { } taosMemoryFree(cli->pThreadObj); taosMemoryFree(cli); - int ref = atomic_sub_fetch_32(&transSCliInst, 1); - if (ref == 0) { - transCloseExHandleMgt(refMgt); - } } void transRefCliHandle(void* handle) { if (handle == NULL) { @@ -1135,12 +1137,12 @@ void transUnrefCliHandle(void* handle) { } SCliThrd* transGetWorkThrdFromHandle(int64_t handle) { SCliThrd* pThrd = NULL; - SExHandle* exh = transAcquireExHandle(refMgt, handle); + SExHandle* exh = transAcquireExHandle(handle); if (exh == NULL) { return NULL; } pThrd = exh->pThrd; - transReleaseExHandle(refMgt, handle); + transReleaseExHandle(handle); return pThrd; } SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index fbe0951a46..85a45ec921 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -16,7 +16,9 @@ #include "transComm.h" -// static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; +static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; + +static int32_t refMgt; int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { T_MD5_CTX context; @@ -478,35 +480,47 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { return true; } -void transInitEnv() { - // +static void transInitEnv() { + refMgt = transOpenExHandleMgt(50000); uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); } +static void transDestroyEnv() { + // close ref + transCloseExHandleMgt(refMgt); +} +void transInit() { + // init env + taosThreadOnce(&transModuleInit, transInitEnv); +} +void transCleanup() { + // clean env + transDestroyEnv(); +} int32_t transOpenExHandleMgt(int size) { // added into once later return taosOpenRef(size, transDestoryExHandle); } -void transCloseExHandleMgt(int32_t mgt) { +void transCloseExHandleMgt() { // close ref - taosCloseRef(mgt); + taosCloseRef(refMgt); } -int64_t transAddExHandle(int32_t mgt, void* p) { +int64_t transAddExHandle(void* p) { // acquire extern handle - return taosAddRef(mgt, p); + return taosAddRef(refMgt, p); } -int32_t transRemoveExHandle(int32_t mgt, int64_t refId) { +int32_t transRemoveExHandle(int64_t refId) { // acquire extern handle - return taosRemoveRef(mgt, refId); + return taosRemoveRef(refMgt, refId); } -SExHandle* transAcquireExHandle(int32_t mgt, int64_t refId) { +SExHandle* transAcquireExHandle(int64_t refId) { // acquire extern handle - return (SExHandle*)taosAcquireRef(mgt, refId); + return (SExHandle*)taosAcquireRef(refMgt, refId); } -int32_t transReleaseExHandle(int32_t mgt, int64_t refId) { +int32_t transReleaseExHandle(int64_t refId) { // release extern handle - return taosReleaseRef(mgt, refId); + return taosReleaseRef(refMgt, refId); } void transDestoryExHandle(void* handle) { if (handle == NULL) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index c190950a17..dfc6de6442 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -19,9 +19,7 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; -static char* notify = "a"; -static int32_t tranSSvrInst = 0; -static int32_t refMgt = 0; +static char* notify = "a"; typedef struct { int notifyCount; // @@ -142,7 +140,6 @@ static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd); static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, uvHandleRegister, NULL}; -static int32_t exHandlesMgt; static void uvDestroyConn(uv_handle_t* handle); @@ -265,7 +262,7 @@ static void uvHandleReq(SSvrConn* pConn) { // 2. once send out data, cli conn released to conn pool immediately // 3. not mixed with persist transMsg.info.ahandle = (void*)pHead->ahandle; - transMsg.info.handle = (void*)transAcquireExHandle(refMgt, pConn->refId); + transMsg.info.handle = (void*)transAcquireExHandle(pConn->refId); transMsg.info.refId = pConn->refId; transMsg.info.traceId = pHead->traceId; @@ -283,7 +280,7 @@ static void uvHandleReq(SSvrConn* pConn) { pConnInfo->clientPort = ntohs(pConn->addr.sin_port); tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); - transReleaseExHandle(refMgt, pConn->refId); + transReleaseExHandle(pConn->refId); STrans* pTransInst = pConn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); @@ -336,7 +333,6 @@ void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnSendCb(uv_write_t* req, int status) { SSvrConn* conn = req->data; - // transClearBuffer(&conn->readBuf); if (status == 0) { tTrace("conn %p data already was written on stream", conn); if (!transQueueEmpty(&conn->srvMsgs)) { @@ -366,6 +362,7 @@ void uvOnSendCb(uv_write_t* req, int status) { } } } + transUnrefSrvHandle(conn); } else { tError("conn %p failed to write data, %s", conn, uv_err_name(status)); conn->broken = true; @@ -428,11 +425,15 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { } static void uvStartSendRespInternal(SSvrMsg* smsg) { + SSvrConn* pConn = smsg->pConn; + if (pConn->broken) { + return; + } + uv_buf_t wb; uvPrepareSendData(smsg, &wb); - SSvrConn* pConn = smsg->pConn; - // uv_timer_stop(&pConn->pTimer); + transRefSrvHandle(pConn); uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); } static void uvStartSendResp(SSvrMsg* smsg) { @@ -507,15 +508,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { SExHandle* exh1 = transMsg.info.handle; int64_t refId = transMsg.info.refId; - SExHandle* exh2 = transAcquireExHandle(refMgt, refId); + SExHandle* exh2 = transAcquireExHandle(refId); if (exh2 == NULL || exh1 != exh2) { tTrace("handle except msg %p, ignore it", exh1); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); destroySmsg(msg); continue; } msg->pConn = exh1->handle; - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); (*transAsyncHandle[msg->type])(msg, pThrd); } } @@ -757,8 +758,8 @@ static SSvrConn* createConn(void* hThrd) { SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); exh->handle = pConn; exh->pThrd = pThrd; - exh->refId = transAddExHandle(refMgt, exh); - transAcquireExHandle(refMgt, exh->refId); + exh->refId = transAddExHandle(exh); + transAcquireExHandle(exh->refId); pConn->refId = exh->refId; transRefSrvHandle(pConn); @@ -773,11 +774,12 @@ static void destroyConn(SSvrConn* conn, bool clear) { transDestroyBuffer(&conn->readBuf); if (clear) { - tTrace("conn %p to be destroyed", conn); - // uv_shutdown_t* req = taosMemoryMalloc(sizeof(uv_shutdown_t)); - uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); - // uv_close(conn->pTcp) - // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); + if (!uv_is_closing((uv_handle_t*)conn->pTcp)) { + tTrace("conn %p to be destroyed", conn); + uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); + } else { + uvDestroyConn((uv_handle_t*)conn->pTcp); + } } } static void destroyConnRegArg(SSvrConn* conn) { @@ -787,14 +789,14 @@ static void destroyConnRegArg(SSvrConn* conn) { } } static int reallocConnRef(SSvrConn* conn) { - transReleaseExHandle(refMgt, conn->refId); - transRemoveExHandle(refMgt, conn->refId); + transReleaseExHandle(conn->refId); + transRemoveExHandle(conn->refId); // avoid app continue to send msg on invalid handle SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); exh->handle = conn; exh->pThrd = conn->hostThrd; - exh->refId = transAddExHandle(refMgt, exh); - transAcquireExHandle(refMgt, exh->refId); + exh->refId = transAddExHandle(exh); + transAcquireExHandle(exh->refId); conn->refId = exh->refId; return 0; @@ -806,11 +808,10 @@ static void uvDestroyConn(uv_handle_t* handle) { } SWorkThrd* thrd = conn->hostThrd; - transReleaseExHandle(refMgt, conn->refId); - transRemoveExHandle(refMgt, conn->refId); + transReleaseExHandle(conn->refId); + transRemoveExHandle(conn->refId); tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn); - // uv_timer_stop(&conn->pTimer); transQueueDestroy(&conn->srvMsgs); QUEUE_REMOVE(&conn->queue); @@ -855,12 +856,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->port = port; uv_loop_init(srv->loop); - // taosThreadOnce(&transModuleInit, uvInitEnv); - int ref = atomic_add_fetch_32(&tranSSvrInst, 1); - if (ref == 1) { - refMgt = transOpenExHandleMgt(50000); - } - assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0)); #ifdef WINDOWS char pipeName[64]; @@ -1012,11 +1007,6 @@ void transCloseServer(void* arg) { taosMemoryFree(srv->pipe); taosMemoryFree(srv); - - int ref = atomic_sub_fetch_32(&tranSSvrInst, 1); - if (ref == 0) { - transCloseExHandleMgt(refMgt); - } } void transRefSrvHandle(void* handle) { @@ -1055,11 +1045,11 @@ void transReleaseSrvHandle(void* handle) { tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); return; _return1: tTrace("handle %p failed to send to release handle", exh); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); return; _return2: tTrace("handle %p failed to send to release handle", exh); @@ -1084,12 +1074,12 @@ void transSendResponse(const STransMsg* msg) { STraceId* trace = (STraceId*)&msg->info.traceId; tGTrace("conn %p start to send resp (1/2)", exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); return; _return1: tTrace("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); return; _return2: tTrace("handle %p failed to send resp", exh); @@ -1113,13 +1103,13 @@ void transRegisterMsg(const STransMsg* msg) { tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); return; _return1: tTrace("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); return; _return2: tTrace("handle %p failed to register brokenlink", exh); diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 0e7030b230..b31c39718c 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -915,7 +915,7 @@ int32_t prepareInsertData(BindData *data) { data->colNum = 0; data->colTypes = taosMemoryCalloc(30, sizeof(int32_t)); data->sql = taosMemoryCalloc(1, 1024); - data->pBind = taosMemoryCalloc((allRowNum/gCurCase->bindRowNum)*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND)); + data->pBind = taosMemoryCalloc((int32_t)(allRowNum/gCurCase->bindRowNum)*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND)); data->pTags = taosMemoryCalloc(gCurCase->tblNum*gCurCase->bindTagNum, sizeof(TAOS_MULTI_BIND)); data->tsData = taosMemoryMalloc(allRowNum * sizeof(int64_t)); data->boolData = taosMemoryMalloc(allRowNum * sizeof(bool)); @@ -932,7 +932,7 @@ int32_t prepareInsertData(BindData *data) { data->binaryData = taosMemoryMalloc(allRowNum * gVarCharSize); data->binaryLen = taosMemoryMalloc(allRowNum * sizeof(int32_t)); if (gCurCase->bindNullNum) { - data->isNull = taosMemoryCalloc(allRowNum, sizeof(char)); + data->isNull = taosMemoryCalloc((int32_t)allRowNum, sizeof(char)); } for (int32_t i = 0; i < allRowNum; ++i) { @@ -950,7 +950,7 @@ int32_t prepareInsertData(BindData *data) { data->doubleData[i] = (double)(i+1); memset(data->binaryData + gVarCharSize * i, 'a'+i%26, gVarCharLen); if (gCurCase->bindNullNum) { - data->isNull[i] = i % 2; + data->isNull[i] = (char)(i % 2); } data->binaryLen[i] = gVarCharLen; } @@ -979,7 +979,7 @@ int32_t prepareQueryCondData(BindData *data, int32_t tblIdx) { data->colNum = 0; data->colTypes = taosMemoryCalloc(30, sizeof(int32_t)); data->sql = taosMemoryCalloc(1, 1024); - data->pBind = taosMemoryCalloc(bindNum*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND)); + data->pBind = taosMemoryCalloc((int32_t)bindNum*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND)); data->tsData = taosMemoryMalloc(bindNum * sizeof(int64_t)); data->boolData = taosMemoryMalloc(bindNum * sizeof(bool)); data->tinyData = taosMemoryMalloc(bindNum * sizeof(int8_t)); @@ -995,7 +995,7 @@ int32_t prepareQueryCondData(BindData *data, int32_t tblIdx) { data->binaryData = taosMemoryMalloc(bindNum * gVarCharSize); data->binaryLen = taosMemoryMalloc(bindNum * sizeof(int32_t)); if (gCurCase->bindNullNum) { - data->isNull = taosMemoryCalloc(bindNum, sizeof(char)); + data->isNull = taosMemoryCalloc((int32_t)bindNum, sizeof(char)); } for (int32_t i = 0; i < bindNum; ++i) { @@ -1013,7 +1013,7 @@ int32_t prepareQueryCondData(BindData *data, int32_t tblIdx) { data->doubleData[i] = (double)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); memset(data->binaryData + gVarCharSize * i, 'a'+i%26, gVarCharLen); if (gCurCase->bindNullNum) { - data->isNull[i] = i % 2; + data->isNull[i] = (char)(i % 2); } data->binaryLen[i] = gVarCharLen; } @@ -1036,7 +1036,7 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) { data->colNum = 0; data->colTypes = taosMemoryCalloc(30, sizeof(int32_t)); data->sql = taosMemoryCalloc(1, 1024); - data->pBind = taosMemoryCalloc(bindNum*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND)); + data->pBind = taosMemoryCalloc((int32_t)bindNum*gCurCase->bindColNum, sizeof(TAOS_MULTI_BIND)); data->tsData = taosMemoryMalloc(bindNum * sizeof(int64_t)); data->boolData = taosMemoryMalloc(bindNum * sizeof(bool)); data->tinyData = taosMemoryMalloc(bindNum * sizeof(int8_t)); @@ -1052,7 +1052,7 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) { data->binaryData = taosMemoryMalloc(bindNum * gVarCharSize); data->binaryLen = taosMemoryMalloc(bindNum * sizeof(int32_t)); if (gCurCase->bindNullNum) { - data->isNull = taosMemoryCalloc(bindNum, sizeof(char)); + data->isNull = taosMemoryCalloc((int32_t)bindNum, sizeof(char)); } for (int32_t i = 0; i < bindNum; ++i) { @@ -1070,7 +1070,7 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) { data->doubleData[i] = (double)(tblIdx*gCurCase->rowNum + rand() % gCurCase->rowNum); memset(data->binaryData + gVarCharSize * i, 'a'+i%26, gVarCharLen); if (gCurCase->bindNullNum) { - data->isNull[i] = i % 2; + data->isNull[i] = (char)(i % 2); } data->binaryLen[i] = gVarCharLen; } @@ -1279,7 +1279,7 @@ void bpCheckQueryResult(TAOS_STMT *stmt, TAOS *taos, char *stmtSql, TAOS_MULTI_B } memcpy(&sql[len], p, (int64_t)s - (int64_t)p); - len += (int64_t)s - (int64_t)p; + len += (int32_t)((int64_t)s - (int64_t)p); if (bind[i].is_null && bind[i].is_null[0]) { bpAppendValueString(sql, TSDB_DATA_TYPE_NULL, NULL, 0, &len); @@ -2669,7 +2669,7 @@ int main(int argc, char *argv[]) { TAOS *taos = NULL; - srand(time(NULL)); + srand((unsigned int)time(NULL)); // connect to server if (argc < 2) { diff --git a/tests/script/api/makefile b/tests/script/api/makefile index 46a172cc3a..1f725f17c9 100644 --- a/tests/script/api/makefile +++ b/tests/script/api/makefile @@ -12,6 +12,7 @@ all: $(TARGET) exe: gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS) + gcc $(CFLAGS) ./stopquery.c -o $(ROOT)stopquery $(LFLAGS) clean: rm $(ROOT)batchprepare diff --git a/tests/script/api/stopquery.c b/tests/script/api/stopquery.c new file mode 100644 index 0000000000..4c7964c983 --- /dev/null +++ b/tests/script/api/stopquery.c @@ -0,0 +1,434 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// TAOS asynchronous API example +// this example opens multiple tables, insert/retrieve multiple tables +// it is used by TAOS internally for one performance testing +// to compiple: gcc -o asyncdemo asyncdemo.c -ltaos + +#include +#include +#include +#include +#include +#include +#include "taos.h" + + +int points = 5; +int numOfTables = 3; +int tablesInsertProcessed = 0; +int tablesSelectProcessed = 0; +int64_t st, et; + +char hostName[128]; +char dbName[128]; +char tbName[128]; +int32_t runTimes = 10000; + +typedef struct { + int id; + TAOS *taos; + char name[16]; + time_t timeStamp; + int value; + int rowsInserted; + int rowsTried; + int rowsRetrieved; +} STable; + +typedef struct SSP_CB_PARAM { + TAOS *taos; + bool fetch; + int32_t *end; +} SSP_CB_PARAM; + +#define CASE_ENTER() do { printf("enter case %s\n", __FUNCTION__); } while (0) +#define CASE_LEAVE() do { printf("leave case %s, runTimes %d\n", __FUNCTION__, runTimes); } while (0) + +static void sqExecSQL(TAOS *taos, char *command) { + int i; + int32_t code = -1; + + TAOS_RES *pSql = taos_query(taos, command); + code = taos_errno(pSql); + if (code != 0) { + fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql)); + taos_free_result(pSql); + taos_close(taos); + taos_cleanup(); + exit(EXIT_FAILURE); + } + + taos_free_result(pSql); +} + +static void sqExecSQLE(TAOS *taos, char *command) { + int i; + int32_t code = -1; + + TAOS_RES *pSql = taos_query(taos, command); + + taos_free_result(pSql); +} + + +void sqExit(char* prefix, const char* errMsg) { + fprintf(stderr, "%s error: %s\n", prefix, errMsg); + exit(1); +} + +void sqStopFetchCb(void *param, TAOS_RES *pRes, int numOfRows) { + SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; + taos_stop_query(pRes); + taos_free_result(pRes); + + *qParam->end = 1; +} + +void sqStopQueryCb(void *param, TAOS_RES *pRes, int code) { + SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; + if (code == 0 && pRes) { + if (qParam->fetch) { + taos_fetch_rows_a(pRes, sqStopFetchCb, param); + } else { + taos_stop_query(pRes); + taos_free_result(pRes); + *qParam->end = 1; + } + } else { + sqExit("select", taos_errstr(pRes)); + } +} + +void sqFreeFetchCb(void *param, TAOS_RES *pRes, int numOfRows) { + SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; + taos_free_result(pRes); + + *qParam->end = 1; +} + +void sqFreeQueryCb(void *param, TAOS_RES *pRes, int code) { + SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; + if (code == 0 && pRes) { + if (qParam->fetch) { + taos_fetch_rows_a(pRes, sqFreeFetchCb, param); + } else { + taos_free_result(pRes); + *qParam->end = 1; + } + } else { + sqExit("select", taos_errstr(pRes)); + } +} + + +void sqCloseFetchCb(void *param, TAOS_RES *pRes, int numOfRows) { + SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; + taos_close(qParam->taos); + + *qParam->end = 1; +} + +void sqCloseQueryCb(void *param, TAOS_RES *pRes, int code) { + SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; + if (code == 0 && pRes) { + if (qParam->fetch) { + taos_fetch_rows_a(pRes, sqFreeFetchCb, param); + } else { + taos_close(qParam->taos); + *qParam->end = 1; + } + } else { + sqExit("select", taos_errstr(pRes)); + } +} + +int sqSyncStopQuery(bool fetch) { + CASE_ENTER(); + for (int32_t i = 0; i < runTimes; ++i) { + char sql[1024] = {0}; + int32_t code = 0; + TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0); + if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL)); + + sprintf(sql, "reset query cache"); + sqExecSQL(taos, sql); + + sprintf(sql, "use %s", dbName); + sqExecSQL(taos, sql); + + sprintf(sql, "select * from %s", tbName); + TAOS_RES* pRes = taos_query(taos, sql); + code = taos_errno(pRes); + if (code) { + sqExit("taos_query", taos_errstr(pRes)); + } + + if (fetch) { + taos_fetch_row(pRes); + } + + taos_stop_query(pRes); + taos_free_result(pRes); + + taos_close(taos); + } + CASE_LEAVE(); +} + +int sqAsyncStopQuery(bool fetch) { + CASE_ENTER(); + for (int32_t i = 0; i < runTimes; ++i) { + char sql[1024] = {0}; + int32_t code = 0; + TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0); + if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL)); + + sprintf(sql, "reset query cache"); + sqExecSQL(taos, sql); + + sprintf(sql, "use %s", dbName); + sqExecSQL(taos, sql); + + sprintf(sql, "select * from %s", tbName); + + int32_t qEnd = 0; + SSP_CB_PARAM param = {0}; + param.fetch = fetch; + param.end = &qEnd; + taos_query_a(taos, sql, sqStopQueryCb, ¶m); + while (0 == qEnd) { + usleep(5000); + } + + taos_close(taos); + } + CASE_LEAVE(); +} + +int sqSyncFreeQuery(bool fetch) { + CASE_ENTER(); + for (int32_t i = 0; i < runTimes; ++i) { + char sql[1024] = {0}; + int32_t code = 0; + TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0); + if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL)); + + sprintf(sql, "reset query cache"); + sqExecSQL(taos, sql); + + sprintf(sql, "use %s", dbName); + sqExecSQL(taos, sql); + + sprintf(sql, "select * from %s", tbName); + TAOS_RES* pRes = taos_query(taos, sql); + code = taos_errno(pRes); + if (code) { + sqExit("taos_query", taos_errstr(pRes)); + } + + if (fetch) { + taos_fetch_row(pRes); + } + + taos_free_result(pRes); + taos_close(taos); + } + CASE_LEAVE(); +} + +int sqAsyncFreeQuery(bool fetch) { + CASE_ENTER(); + for (int32_t i = 0; i < runTimes; ++i) { + char sql[1024] = {0}; + int32_t code = 0; + TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0); + if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL)); + + sprintf(sql, "reset query cache"); + sqExecSQL(taos, sql); + + sprintf(sql, "use %s", dbName); + sqExecSQL(taos, sql); + + sprintf(sql, "select * from %s", tbName); + + int32_t qEnd = 0; + SSP_CB_PARAM param = {0}; + param.fetch = fetch; + param.end = &qEnd; + taos_query_a(taos, sql, sqFreeQueryCb, ¶m); + while (0 == qEnd) { + usleep(5000); + } + + taos_close(taos); + } + CASE_LEAVE(); +} + +int sqSyncCloseQuery(bool fetch) { + CASE_ENTER(); + for (int32_t i = 0; i < runTimes; ++i) { + char sql[1024] = {0}; + int32_t code = 0; + TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0); + if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL)); + + sprintf(sql, "reset query cache"); + sqExecSQL(taos, sql); + + sprintf(sql, "use %s", dbName); + sqExecSQL(taos, sql); + + sprintf(sql, "select * from %s", tbName); + TAOS_RES* pRes = taos_query(taos, sql); + code = taos_errno(pRes); + if (code) { + sqExit("taos_query", taos_errstr(pRes)); + } + + if (fetch) { + taos_fetch_row(pRes); + } + + taos_close(taos); + } + CASE_LEAVE(); +} + +int sqAsyncCloseQuery(bool fetch) { + CASE_ENTER(); + for (int32_t i = 0; i < runTimes; ++i) { + char sql[1024] = {0}; + int32_t code = 0; + TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0); + if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL)); + + sprintf(sql, "reset query cache"); + sqExecSQL(taos, sql); + + sprintf(sql, "use %s", dbName); + sqExecSQL(taos, sql); + + sprintf(sql, "select * from %s", tbName); + + int32_t qEnd = 0; + SSP_CB_PARAM param = {0}; + param.fetch = fetch; + param.end = &qEnd; + taos_query_a(taos, sql, sqFreeQueryCb, ¶m); + while (0 == qEnd) { + usleep(5000); + } + } + CASE_LEAVE(); +} + +void *syncQueryThreadFp(void *arg) { + SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg; + char sql[1024] = {0}; + int32_t code = 0; + TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0); + if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL)); + + qParam->taos = taos; + + sprintf(sql, "reset query cache"); + sqExecSQLE(taos, sql); + + sprintf(sql, "use %s", dbName); + sqExecSQLE(taos, sql); + + sprintf(sql, "select * from %s", tbName); + TAOS_RES* pRes = taos_query(taos, sql); + + if (qParam->fetch) { + taos_fetch_row(pRes); + } + + taos_free_result(pRes); +} + +void *closeThreadFp(void *arg) { + SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg; + while (true) { + if (qParam->taos) { + usleep(rand() % 10000); + taos_close(qParam->taos); + break; + } + usleep(1); + } +} + + +int sqConSyncCloseQuery(bool fetch) { + CASE_ENTER(); + pthread_t qid, cid; + for (int32_t i = 0; i < runTimes; ++i) { + SSP_CB_PARAM param = {0}; + param.fetch = fetch; + pthread_create(&qid, NULL, syncQueryThreadFp, (void*)¶m); + pthread_create(&cid, NULL, closeThreadFp, (void*)¶m); + + pthread_join(qid, NULL); + pthread_join(cid, NULL); + } + CASE_LEAVE(); +} + +void sqRunAllCase(void) { +/* + sqSyncStopQuery(false); + sqSyncStopQuery(true); + sqAsyncStopQuery(false); + sqAsyncStopQuery(true); + + sqSyncFreeQuery(false); + sqSyncFreeQuery(true); + sqAsyncFreeQuery(false); + sqAsyncFreeQuery(true); + + sqSyncCloseQuery(false); + sqSyncCloseQuery(true); + sqAsyncCloseQuery(false); + sqAsyncCloseQuery(true); +*/ + sqConSyncCloseQuery(false); + sqConSyncCloseQuery(true); + +} + + +int main(int argc, char *argv[]) { + if (argc != 4) { + printf("usage: %s server-ip dbname tablename\n", argv[0]); + exit(0); + } + + srand((unsigned int)time(NULL)); + + strcpy(hostName, argv[1]); + strcpy(dbName, argv[2]); + strcpy(tbName, argv[3]); + + sqRunAllCase(); + + return 0; +} + + diff --git a/tests/script/tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim b/tests/script/tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim index 78bca45f31..20eac3836c 100644 --- a/tests/script/tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim +++ b/tests/script/tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim @@ -124,7 +124,7 @@ if $data(5)[3] != 3 then return -1 endi -#sql reset query cache +sql reset query cache print =============== step4: select data sql show d1.tables diff --git a/tests/system-test/2-query/json_tag.py b/tests/system-test/2-query/json_tag.py index 6fb839fe2d..53dfc57458 100644 --- a/tests/system-test/2-query/json_tag.py +++ b/tests/system-test/2-query/json_tag.py @@ -199,7 +199,7 @@ class TDTestCase: tdSql.checkData(0, 0, "true") # test select json tag->'key', value is null tdSql.query("select jtag->'tag1' from jsons1_4") - tdSql.checkData(0, 0, "null") + tdSql.checkData(0, 0, None) # test select json tag->'key', value is double tdSql.query("select jtag->'tag1' from jsons1_5") tdSql.checkData(0, 0, "1.232000000")