diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index d61262c9dc..51a5b84a1c 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -128,7 +128,7 @@ typedef struct STscObj { int8_t connType; int32_t acctId; uint32_t connId; - uint64_t id; // ref ID returned by taosAddRef + TAOS *id; // ref ID returned by taosAddRef TdThreadMutex mutex; // used to protect the operation on db int32_t numOfReqs; // number of sqlObj bound to this connection SAppInstInfo* pAppInfo; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 24246e5c45..78454268b7 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -38,7 +38,7 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; static void registerRequest(SRequestObj *pRequest) { - STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id); + STscObj *pTscObj = acquireTscObj(*(int64_t*)pRequest->pTscObj->id); assert(pTscObj != NULL); @@ -54,7 +54,7 @@ static void registerRequest(SRequestObj *pRequest) { int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1); tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64, - pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); + pRequest->self, *(int64_t*)pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); } } @@ -70,8 +70,8 @@ static void deregisterRequest(SRequestObj *pRequest) { int64_t duration = taosGetTimestampUs() - pRequest->metric.start; tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 " ms, current:%d, app current:%d", - pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); - releaseTscObj(pTscObj->id); + pRequest->self, *(int64_t*)pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); + releaseTscObj(*(int64_t*)pTscObj->id); } // todo close the transporter properly @@ -80,7 +80,7 @@ void closeTransporter(STscObj *pTscObj) { return; } - tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id); + tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, *(int64_t*)pTscObj->id); rpcClose(pTscObj->pAppInfo->pTransporter); } @@ -128,7 +128,7 @@ void closeAllRequests(SHashObj *pRequests) { void destroyTscObj(void *pObj) { STscObj *pTscObj = pObj; - SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; + SClientHbKey connKey = {.tscRid = *(int64_t*)pTscObj->id, .connType = pTscObj->connType}; hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); closeAllRequests(pTscObj->pRequests); @@ -137,7 +137,7 @@ void destroyTscObj(void *pObj) { // TODO //closeTransporter(pTscObj); } - tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); + tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, *(int64_t*)pTscObj->id, pTscObj->pAppInfo->numOfConns); taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFreeClear(pTscObj); } @@ -166,10 +166,11 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c } taosThreadMutexInit(&pObj->mutex, NULL); - pObj->id = taosAddRef(clientConnRefPool, pObj); + pObj->id = taosMemoryMalloc(sizeof(int64_t)); + *(int64_t*)pObj->id = taosAddRef(clientConnRefPool, pObj); pObj->schemalessType = 1; - tscDebug("connObj created, 0x%" PRIx64, pObj->id); + tscDebug("connObj created, 0x%" PRIx64, *(int64_t*)pObj->id); return pObj; } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index d04cc90ee4..9668d2b72c 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -263,6 +263,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { // drop table if exists not_exists_table if (NULL == pQuery->pCmdMsg) { + pRequest->body.queryFp(pRequest->body.param, pRequest, 0); return TSDB_CODE_SUCCESS; } @@ -609,6 +610,16 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { SRequestObj* pRequest = (SRequestObj*)param; pRequest->code = code; + if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || + TDMT_VND_CREATE_TABLE == pRequest->type) { + pRequest->body.resInfo.numOfRows = pResult->numOfRows; + + if (pRequest->body.queryJob != 0) { + schedulerFreeJob(pRequest->body.queryJob, 0); + pRequest->body.queryJob = 0; + } + } + tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); @@ -712,7 +723,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { code = asyncExecDdlQuery(pRequest, pQuery); break; case QUERY_EXEC_MODE_SCHEDULE: { - SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); + SArray* pNodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); pRequest->type = pQuery->msgType; @@ -725,12 +736,10 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE}; SAppInstInfo* pAppInfo = getAppInfo(pRequest); - if (TSDB_CODE_SUCCESS == code) { - code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList); - if (code) { - tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), - pRequest->requestId); - } + code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList); + if (code) { + tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), + pRequest->requestId); } if (TSDB_CODE_SUCCESS == code) { @@ -927,7 +936,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t taos_close_internal(pTscObj); pTscObj = NULL; } else { - tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, + tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, *(int64_t*)pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); destroyRequest(pRequest); } @@ -1090,10 +1099,10 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY); if (pObj) { - return (TAOS*)pObj->id; + return pObj->id; } - return (TAOS*)0; + return NULL; } TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 9a5d5bbeca..51e05537d4 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -99,10 +99,10 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha STscObj* pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY); if (pObj) { - return (TAOS*)pObj->id; + return pObj->id; } - return (TAOS*)0; + return NULL; } void taos_close_internal(void *taos) { @@ -111,19 +111,24 @@ void taos_close_internal(void *taos) { } STscObj *pTscObj = (STscObj *)taos; - tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs); + tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", *(int64_t*)pTscObj->id, pTscObj->numOfReqs); - taosRemoveRef(clientConnRefPool, pTscObj->id); + taosRemoveRef(clientConnRefPool, *(int64_t*)pTscObj->id); } void taos_close(TAOS *taos) { - STscObj* pObj = acquireTscObj((int64_t)taos); + if (taos == NULL) { + return; + } + + STscObj* pObj = acquireTscObj(*(int64_t*)taos); if (NULL == pObj) { return; } taos_close_internal(pObj); - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); + taosMemoryFree(taos); } @@ -206,7 +211,12 @@ static void syncQueryFn(void *param, void *res, int32_t code) { } TAOS_RES *taos_query(TAOS *taos, const char *sql) { - STscObj* pTscObj = acquireTscObj((int64_t)taos); + if (NULL == taos) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; + } + + STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); if (pTscObj == NULL || sql == NULL) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; @@ -219,13 +229,13 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { taos_query_a(taos, sql, syncQueryFn, param); tsem_wait(¶m->sem); - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); return param->pRequest; #else size_t sqlLen = strlen(sql); if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; return NULL; @@ -233,7 +243,7 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen); - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); return pRes; #endif @@ -453,15 +463,15 @@ int taos_result_precision(TAOS_RES *res) { } int taos_select_db(TAOS *taos, const char *db) { - STscObj* pObj = acquireTscObj((int64_t)taos); + STscObj* pObj = acquireTscObj(*(int64_t*)taos); if (pObj == NULL) { - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); terrno = TSDB_CODE_TSC_DISCONNECTED; return TSDB_CODE_TSC_DISCONNECTED; } if (db == NULL || strlen(db) == 0) { - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); terrno = TSDB_CODE_TSC_INVALID_INPUT; return terrno; } @@ -473,7 +483,7 @@ int taos_select_db(TAOS *taos, const char *db) { int32_t code = taos_errno(pRequest); taos_free_result(pRequest); - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); return code; } @@ -626,7 +636,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { int taos_validate_sql(TAOS *taos, const char *sql) { return true; } void taos_reset_current_db(TAOS *taos) { - STscObj* pTscObj = acquireTscObj((int64_t)taos); + STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); if (pTscObj == NULL) { terrno = TSDB_CODE_TSC_DISCONNECTED; return; @@ -634,17 +644,17 @@ void taos_reset_current_db(TAOS *taos) { resetConnectDB(pTscObj); - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); } const char *taos_get_server_info(TAOS *taos) { - STscObj* pTscObj = acquireTscObj((int64_t)taos); + STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); if (pTscObj == NULL) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); return pTscObj->ver; } @@ -671,6 +681,8 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) { } void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { + tscDebug("enter meta callback, code %s", tstrerror(code)); + SqlParseWrapper *pWrapper = (SqlParseWrapper *)param; SQuery *pQuery = pWrapper->pQuery; SRequestObj *pRequest = pWrapper->pRequest; @@ -711,11 +723,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { } void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { - STscObj* pTscObj = acquireTscObj((int64_t)taos); + STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); if (pTscObj == NULL || sql == NULL || NULL == fp) { terrno = TSDB_CODE_INVALID_PARA; if (pTscObj) { - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); } else { terrno = TSDB_CODE_TSC_DISCONNECTED; } @@ -936,7 +948,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { } TAOS_STMT *taos_stmt_init(TAOS *taos) { - STscObj* pObj = acquireTscObj((int64_t)taos); + STscObj* pObj = acquireTscObj(*(int64_t*)taos); if (NULL == pObj) { tscError("invalid parameter for %s", __FUNCTION__); terrno = TSDB_CODE_TSC_DISCONNECTED; @@ -945,7 +957,7 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) { TAOS_STMT* pStmt = stmtInit(pObj); - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); return pStmt; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index bf627ec26b..db8aebb322 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -77,7 +77,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) { tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i, - connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id); + connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, *(int64_t*)pTscObj->id); } pTscObj->connId = connectRsp.connId; @@ -90,7 +90,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->connType = connectRsp.connType; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); + hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, *(int64_t*)pTscObj->id, connectRsp.clusterId, connectRsp.connType); // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 25018e0d15..580aafa010 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -309,7 +309,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { case SCHEMA_ACTION_ADD_COLUMN: { int n = sprintf(result, "alter stable `%s` add column ", action->alterSTable.sTableName); smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes); - TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery + TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery code = taos_errno(res); const char *errStr = taos_errstr(res); if (code != TSDB_CODE_SUCCESS) { @@ -323,7 +323,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { case SCHEMA_ACTION_ADD_TAG: { int n = sprintf(result, "alter stable `%s` add tag ", action->alterSTable.sTableName); smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes); - TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery + TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery code = taos_errno(res); const char *errStr = taos_errstr(res); if (code != TSDB_CODE_SUCCESS) { @@ -337,7 +337,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: { int n = sprintf(result, "alter stable `%s` modify column ", action->alterSTable.sTableName); smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes); - TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery + TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res)); @@ -350,7 +350,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { case SCHEMA_ACTION_CHANGE_TAG_SIZE: { int n = sprintf(result, "alter stable `%s` modify tag ", action->alterSTable.sTableName); smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes); - TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery + TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res)); @@ -405,7 +405,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { pos--; ++freeBytes; outBytes = snprintf(pos, freeBytes, ")"); - TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); + TAOS_RES *res = taos_query(info->taos->id, result); code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res)); @@ -2434,7 +2434,7 @@ static void smlInsertCallback(void *param, void *res, int32_t code) { */ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) { - STscObj* pTscObj = acquireTscObj((int64_t)taos); + STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); if (NULL == pTscObj) { terrno = TSDB_CODE_TSC_DISCONNECTED; uError("SML:taos_schemaless_insert invalid taos"); @@ -2443,7 +2443,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT); if(!request){ - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); uError("SML:taos_schemaless_insert error request is null"); return NULL; } @@ -2531,6 +2531,6 @@ end: // ((STscObj *)taos)->schemalessType = 0; pTscObj->schemalessType = 1; uDebug("resultend:%s", request->msgBuf); - releaseTscObj((int64_t)taos); + releaseTscObj(*(int64_t*)taos); return (TAOS_RES*)request; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index d895b73fb0..a845ae7b39 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -204,8 +204,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -213,7 +213,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 99476c4cbd..f2671f6279 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -35,7 +35,7 @@ typedef struct SPhysiPlanContext { int32_t errCode; int16_t nextDataBlockId; SArray* pLocationHelper; - SArray* pExecNodeList; + SArray* pExecNodeList; // SArray } SPhysiPlanContext; static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) { @@ -459,7 +459,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla } vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0}; - taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); + taosArrayPush(pCxt->pExecNodeList, &node); return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode); } @@ -532,7 +532,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) { vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0}; - taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); + taosArrayPush(pCxt->pExecNodeList, &node); } else { SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0}; taosArrayPush(pCxt->pExecNodeList, &node); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 545bb6c45a..6b2570c5b7 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -210,7 +210,7 @@ typedef struct SSchJob { int32_t levelNum; int32_t taskNum; SRequestConnInfo conn; - SArray *nodeList; // qnode/vnode list, SArray + SArray *nodeList; // qnode/vnode list, SArray SArray *levels; // starting from 0. SArray SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 733f8694cc..679a244f31 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -614,7 +614,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { ++pJob->taskNum; } - SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum); + SCH_JOB_DLOG("level %d initialized, taskNum:%d", i, taskNum); } SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask)); @@ -636,8 +636,9 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { nodeNum = taosArrayGetSize(pJob->nodeList); for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { - SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); - + SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i); + SQueryNodeAddr *naddr = &nload->addr; + if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) { SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);