Merge pull request #13952 from taosdata/enh/taos.ref
fix: fix insert affected rows issue
This commit is contained in:
commit
6f610bd363
|
@ -128,7 +128,7 @@ typedef struct STscObj {
|
||||||
int8_t connType;
|
int8_t connType;
|
||||||
int32_t acctId;
|
int32_t acctId;
|
||||||
uint32_t connId;
|
uint32_t connId;
|
||||||
uint64_t id; // ref ID returned by taosAddRef
|
TAOS *id; // ref ID returned by taosAddRef
|
||||||
TdThreadMutex mutex; // used to protect the operation on db
|
TdThreadMutex mutex; // used to protect the operation on db
|
||||||
int32_t numOfReqs; // number of sqlObj bound to this connection
|
int32_t numOfReqs; // number of sqlObj bound to this connection
|
||||||
SAppInstInfo* pAppInfo;
|
SAppInstInfo* pAppInfo;
|
||||||
|
|
|
@ -38,7 +38,7 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
|
||||||
volatile int32_t tscInitRes = 0;
|
volatile int32_t tscInitRes = 0;
|
||||||
|
|
||||||
static void registerRequest(SRequestObj *pRequest) {
|
static void registerRequest(SRequestObj *pRequest) {
|
||||||
STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
|
STscObj *pTscObj = acquireTscObj(*(int64_t*)pRequest->pTscObj->id);
|
||||||
|
|
||||||
assert(pTscObj != NULL);
|
assert(pTscObj != NULL);
|
||||||
|
|
||||||
|
@ -54,7 +54,7 @@ static void registerRequest(SRequestObj *pRequest) {
|
||||||
int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
|
int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
|
||||||
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
|
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
|
||||||
", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
|
", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
|
||||||
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
pRequest->self, *(int64_t*)pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,8 +70,8 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
||||||
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
|
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
|
||||||
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
|
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
|
||||||
" ms, current:%d, app current:%d",
|
" ms, current:%d, app current:%d",
|
||||||
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
|
pRequest->self, *(int64_t*)pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
|
||||||
releaseTscObj(pTscObj->id);
|
releaseTscObj(*(int64_t*)pTscObj->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo close the transporter properly
|
// todo close the transporter properly
|
||||||
|
@ -80,7 +80,7 @@ void closeTransporter(STscObj *pTscObj) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id);
|
tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, *(int64_t*)pTscObj->id);
|
||||||
rpcClose(pTscObj->pAppInfo->pTransporter);
|
rpcClose(pTscObj->pAppInfo->pTransporter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,7 +128,7 @@ void closeAllRequests(SHashObj *pRequests) {
|
||||||
void destroyTscObj(void *pObj) {
|
void destroyTscObj(void *pObj) {
|
||||||
STscObj *pTscObj = pObj;
|
STscObj *pTscObj = pObj;
|
||||||
|
|
||||||
SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
|
SClientHbKey connKey = {.tscRid = *(int64_t*)pTscObj->id, .connType = pTscObj->connType};
|
||||||
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
|
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
|
||||||
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||||
closeAllRequests(pTscObj->pRequests);
|
closeAllRequests(pTscObj->pRequests);
|
||||||
|
@ -137,7 +137,7 @@ void destroyTscObj(void *pObj) {
|
||||||
// TODO
|
// TODO
|
||||||
//closeTransporter(pTscObj);
|
//closeTransporter(pTscObj);
|
||||||
}
|
}
|
||||||
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
|
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, *(int64_t*)pTscObj->id, pTscObj->pAppInfo->numOfConns);
|
||||||
taosThreadMutexDestroy(&pTscObj->mutex);
|
taosThreadMutexDestroy(&pTscObj->mutex);
|
||||||
taosMemoryFreeClear(pTscObj);
|
taosMemoryFreeClear(pTscObj);
|
||||||
}
|
}
|
||||||
|
@ -166,10 +166,11 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexInit(&pObj->mutex, NULL);
|
taosThreadMutexInit(&pObj->mutex, NULL);
|
||||||
pObj->id = taosAddRef(clientConnRefPool, pObj);
|
pObj->id = taosMemoryMalloc(sizeof(int64_t));
|
||||||
|
*(int64_t*)pObj->id = taosAddRef(clientConnRefPool, pObj);
|
||||||
pObj->schemalessType = 1;
|
pObj->schemalessType = 1;
|
||||||
|
|
||||||
tscDebug("connObj created, 0x%" PRIx64, pObj->id);
|
tscDebug("connObj created, 0x%" PRIx64, *(int64_t*)pObj->id);
|
||||||
return pObj;
|
return pObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -263,6 +263,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
// drop table if exists not_exists_table
|
// drop table if exists not_exists_table
|
||||||
if (NULL == pQuery->pCmdMsg) {
|
if (NULL == pQuery->pCmdMsg) {
|
||||||
|
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -609,6 +610,16 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
|
||||||
SRequestObj* pRequest = (SRequestObj*)param;
|
SRequestObj* pRequest = (SRequestObj*)param;
|
||||||
pRequest->code = code;
|
pRequest->code = code;
|
||||||
|
|
||||||
|
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
|
||||||
|
TDMT_VND_CREATE_TABLE == pRequest->type) {
|
||||||
|
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
|
||||||
|
|
||||||
|
if (pRequest->body.queryJob != 0) {
|
||||||
|
schedulerFreeJob(pRequest->body.queryJob, 0);
|
||||||
|
pRequest->body.queryJob = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64,
|
||||||
pRequest->self, code, tstrerror(code), pRequest->requestId);
|
pRequest->self, code, tstrerror(code), pRequest->requestId);
|
||||||
|
|
||||||
|
@ -712,7 +723,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
code = asyncExecDdlQuery(pRequest, pQuery);
|
code = asyncExecDdlQuery(pRequest, pQuery);
|
||||||
break;
|
break;
|
||||||
case QUERY_EXEC_MODE_SCHEDULE: {
|
case QUERY_EXEC_MODE_SCHEDULE: {
|
||||||
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
SArray* pNodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||||
|
|
||||||
pRequest->type = pQuery->msgType;
|
pRequest->type = pQuery->msgType;
|
||||||
|
|
||||||
|
@ -725,12 +736,10 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
||||||
|
|
||||||
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
|
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
|
||||||
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
|
if (code) {
|
||||||
if (code) {
|
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||||
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
pRequest->requestId);
|
||||||
pRequest->requestId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -927,7 +936,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
|
||||||
taos_close_internal(pTscObj);
|
taos_close_internal(pTscObj);
|
||||||
pTscObj = NULL;
|
pTscObj = NULL;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
|
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, *(int64_t*)pTscObj->id,
|
||||||
pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
|
pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
|
||||||
destroyRequest(pRequest);
|
destroyRequest(pRequest);
|
||||||
}
|
}
|
||||||
|
@ -1090,10 +1099,10 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
|
||||||
|
|
||||||
STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
|
STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
|
||||||
if (pObj) {
|
if (pObj) {
|
||||||
return (TAOS*)pObj->id;
|
return pObj->id;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (TAOS*)0;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
|
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
|
||||||
|
|
|
@ -99,10 +99,10 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
|
||||||
|
|
||||||
STscObj* pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
|
STscObj* pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
|
||||||
if (pObj) {
|
if (pObj) {
|
||||||
return (TAOS*)pObj->id;
|
return pObj->id;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (TAOS*)0;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taos_close_internal(void *taos) {
|
void taos_close_internal(void *taos) {
|
||||||
|
@ -111,19 +111,24 @@ void taos_close_internal(void *taos) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STscObj *pTscObj = (STscObj *)taos;
|
STscObj *pTscObj = (STscObj *)taos;
|
||||||
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
|
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", *(int64_t*)pTscObj->id, pTscObj->numOfReqs);
|
||||||
|
|
||||||
taosRemoveRef(clientConnRefPool, pTscObj->id);
|
taosRemoveRef(clientConnRefPool, *(int64_t*)pTscObj->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taos_close(TAOS *taos) {
|
void taos_close(TAOS *taos) {
|
||||||
STscObj* pObj = acquireTscObj((int64_t)taos);
|
if (taos == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
STscObj* pObj = acquireTscObj(*(int64_t*)taos);
|
||||||
if (NULL == pObj) {
|
if (NULL == pObj) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_close_internal(pObj);
|
taos_close_internal(pObj);
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
|
taosMemoryFree(taos);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -206,7 +211,12 @@ static void syncQueryFn(void *param, void *res, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES *taos_query(TAOS *taos, const char *sql) {
|
TAOS_RES *taos_query(TAOS *taos, const char *sql) {
|
||||||
STscObj* pTscObj = acquireTscObj((int64_t)taos);
|
if (NULL == taos) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
|
||||||
if (pTscObj == NULL || sql == NULL) {
|
if (pTscObj == NULL || sql == NULL) {
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -219,13 +229,13 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
|
||||||
taos_query_a(taos, sql, syncQueryFn, param);
|
taos_query_a(taos, sql, syncQueryFn, param);
|
||||||
tsem_wait(¶m->sem);
|
tsem_wait(¶m->sem);
|
||||||
|
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
|
|
||||||
return param->pRequest;
|
return param->pRequest;
|
||||||
#else
|
#else
|
||||||
size_t sqlLen = strlen(sql);
|
size_t sqlLen = strlen(sql);
|
||||||
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
|
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
||||||
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -233,7 +243,7 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
|
||||||
|
|
||||||
TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen);
|
TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen);
|
||||||
|
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
|
|
||||||
return pRes;
|
return pRes;
|
||||||
#endif
|
#endif
|
||||||
|
@ -453,15 +463,15 @@ int taos_result_precision(TAOS_RES *res) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_select_db(TAOS *taos, const char *db) {
|
int taos_select_db(TAOS *taos, const char *db) {
|
||||||
STscObj* pObj = acquireTscObj((int64_t)taos);
|
STscObj* pObj = acquireTscObj(*(int64_t*)taos);
|
||||||
if (pObj == NULL) {
|
if (pObj == NULL) {
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
return TSDB_CODE_TSC_DISCONNECTED;
|
return TSDB_CODE_TSC_DISCONNECTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (db == NULL || strlen(db) == 0) {
|
if (db == NULL || strlen(db) == 0) {
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -473,7 +483,7 @@ int taos_select_db(TAOS *taos, const char *db) {
|
||||||
int32_t code = taos_errno(pRequest);
|
int32_t code = taos_errno(pRequest);
|
||||||
|
|
||||||
taos_free_result(pRequest);
|
taos_free_result(pRequest);
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -626,7 +636,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
|
||||||
int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
|
int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
|
||||||
|
|
||||||
void taos_reset_current_db(TAOS *taos) {
|
void taos_reset_current_db(TAOS *taos) {
|
||||||
STscObj* pTscObj = acquireTscObj((int64_t)taos);
|
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
|
||||||
if (pTscObj == NULL) {
|
if (pTscObj == NULL) {
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
return;
|
return;
|
||||||
|
@ -634,17 +644,17 @@ void taos_reset_current_db(TAOS *taos) {
|
||||||
|
|
||||||
resetConnectDB(pTscObj);
|
resetConnectDB(pTscObj);
|
||||||
|
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
}
|
}
|
||||||
|
|
||||||
const char *taos_get_server_info(TAOS *taos) {
|
const char *taos_get_server_info(TAOS *taos) {
|
||||||
STscObj* pTscObj = acquireTscObj((int64_t)taos);
|
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
|
||||||
if (pTscObj == NULL) {
|
if (pTscObj == NULL) {
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
|
|
||||||
return pTscObj->ver;
|
return pTscObj->ver;
|
||||||
}
|
}
|
||||||
|
@ -671,6 +681,8 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||||
|
tscDebug("enter meta callback, code %s", tstrerror(code));
|
||||||
|
|
||||||
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
|
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
|
||||||
SQuery *pQuery = pWrapper->pQuery;
|
SQuery *pQuery = pWrapper->pQuery;
|
||||||
SRequestObj *pRequest = pWrapper->pRequest;
|
SRequestObj *pRequest = pWrapper->pRequest;
|
||||||
|
@ -711,11 +723,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
|
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
|
||||||
STscObj* pTscObj = acquireTscObj((int64_t)taos);
|
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
|
||||||
if (pTscObj == NULL || sql == NULL || NULL == fp) {
|
if (pTscObj == NULL || sql == NULL || NULL == fp) {
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
if (pTscObj) {
|
if (pTscObj) {
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
}
|
}
|
||||||
|
@ -936,7 +948,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_STMT *taos_stmt_init(TAOS *taos) {
|
TAOS_STMT *taos_stmt_init(TAOS *taos) {
|
||||||
STscObj* pObj = acquireTscObj((int64_t)taos);
|
STscObj* pObj = acquireTscObj(*(int64_t*)taos);
|
||||||
if (NULL == pObj) {
|
if (NULL == pObj) {
|
||||||
tscError("invalid parameter for %s", __FUNCTION__);
|
tscError("invalid parameter for %s", __FUNCTION__);
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
@ -945,7 +957,7 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) {
|
||||||
|
|
||||||
TAOS_STMT* pStmt = stmtInit(pObj);
|
TAOS_STMT* pStmt = stmtInit(pObj);
|
||||||
|
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
|
|
||||||
return pStmt;
|
return pStmt;
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,7 +77,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
|
for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
|
||||||
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
|
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
|
||||||
connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
|
connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, *(int64_t*)pTscObj->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
pTscObj->connId = connectRsp.connId;
|
pTscObj->connId = connectRsp.connId;
|
||||||
|
@ -90,7 +90,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
|
|
||||||
pTscObj->connType = connectRsp.connType;
|
pTscObj->connType = connectRsp.connType;
|
||||||
|
|
||||||
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);
|
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, *(int64_t*)pTscObj->id, connectRsp.clusterId, connectRsp.connType);
|
||||||
|
|
||||||
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
||||||
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
|
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
|
||||||
|
|
|
@ -309,7 +309,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
|
||||||
case SCHEMA_ACTION_ADD_COLUMN: {
|
case SCHEMA_ACTION_ADD_COLUMN: {
|
||||||
int n = sprintf(result, "alter stable `%s` add column ", action->alterSTable.sTableName);
|
int n = sprintf(result, "alter stable `%s` add column ", action->alterSTable.sTableName);
|
||||||
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
||||||
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
|
TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery
|
||||||
code = taos_errno(res);
|
code = taos_errno(res);
|
||||||
const char *errStr = taos_errstr(res);
|
const char *errStr = taos_errstr(res);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -323,7 +323,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
|
||||||
case SCHEMA_ACTION_ADD_TAG: {
|
case SCHEMA_ACTION_ADD_TAG: {
|
||||||
int n = sprintf(result, "alter stable `%s` add tag ", action->alterSTable.sTableName);
|
int n = sprintf(result, "alter stable `%s` add tag ", action->alterSTable.sTableName);
|
||||||
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
||||||
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
|
TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery
|
||||||
code = taos_errno(res);
|
code = taos_errno(res);
|
||||||
const char *errStr = taos_errstr(res);
|
const char *errStr = taos_errstr(res);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -337,7 +337,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
|
||||||
case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
|
case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
|
||||||
int n = sprintf(result, "alter stable `%s` modify column ", action->alterSTable.sTableName);
|
int n = sprintf(result, "alter stable `%s` modify column ", action->alterSTable.sTableName);
|
||||||
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
||||||
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
|
TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery
|
||||||
code = taos_errno(res);
|
code = taos_errno(res);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
||||||
|
@ -350,7 +350,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
|
||||||
case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
|
case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
|
||||||
int n = sprintf(result, "alter stable `%s` modify tag ", action->alterSTable.sTableName);
|
int n = sprintf(result, "alter stable `%s` modify tag ", action->alterSTable.sTableName);
|
||||||
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
||||||
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
|
TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery
|
||||||
code = taos_errno(res);
|
code = taos_errno(res);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
||||||
|
@ -405,7 +405,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
|
||||||
pos--;
|
pos--;
|
||||||
++freeBytes;
|
++freeBytes;
|
||||||
outBytes = snprintf(pos, freeBytes, ")");
|
outBytes = snprintf(pos, freeBytes, ")");
|
||||||
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result);
|
TAOS_RES *res = taos_query(info->taos->id, result);
|
||||||
code = taos_errno(res);
|
code = taos_errno(res);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
||||||
|
@ -2434,7 +2434,7 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
|
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
|
||||||
STscObj* pTscObj = acquireTscObj((int64_t)taos);
|
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
|
||||||
if (NULL == pTscObj) {
|
if (NULL == pTscObj) {
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
uError("SML:taos_schemaless_insert invalid taos");
|
uError("SML:taos_schemaless_insert invalid taos");
|
||||||
|
@ -2443,7 +2443,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
|
||||||
|
|
||||||
SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
|
SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
|
||||||
if(!request){
|
if(!request){
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
uError("SML:taos_schemaless_insert error request is null");
|
uError("SML:taos_schemaless_insert error request is null");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -2531,6 +2531,6 @@ end:
|
||||||
// ((STscObj *)taos)->schemalessType = 0;
|
// ((STscObj *)taos)->schemalessType = 0;
|
||||||
pTscObj->schemalessType = 1;
|
pTscObj->schemalessType = 1;
|
||||||
uDebug("resultend:%s", request->msgBuf);
|
uDebug("resultend:%s", request->msgBuf);
|
||||||
releaseTscObj((int64_t)taos);
|
releaseTscObj(*(int64_t*)taos);
|
||||||
return (TAOS_RES*)request;
|
return (TAOS_RES*)request;
|
||||||
}
|
}
|
||||||
|
|
|
@ -204,8 +204,8 @@ SArray *mmGetMsgHandles() {
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
@ -213,7 +213,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -35,7 +35,7 @@ typedef struct SPhysiPlanContext {
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
int16_t nextDataBlockId;
|
int16_t nextDataBlockId;
|
||||||
SArray* pLocationHelper;
|
SArray* pLocationHelper;
|
||||||
SArray* pExecNodeList;
|
SArray* pExecNodeList; // SArray<SQueryNodeLoad>
|
||||||
} SPhysiPlanContext;
|
} SPhysiPlanContext;
|
||||||
|
|
||||||
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
|
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
|
||||||
|
@ -459,7 +459,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla
|
||||||
}
|
}
|
||||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||||
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode);
|
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -532,7 +532,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
||||||
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) {
|
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) {
|
||||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||||
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||||
} else {
|
} else {
|
||||||
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
||||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||||
|
|
|
@ -210,7 +210,7 @@ typedef struct SSchJob {
|
||||||
int32_t levelNum;
|
int32_t levelNum;
|
||||||
int32_t taskNum;
|
int32_t taskNum;
|
||||||
SRequestConnInfo conn;
|
SRequestConnInfo conn;
|
||||||
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeAddr>
|
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad>
|
||||||
SArray *levels; // starting from 0. SArray<SSchLevel>
|
SArray *levels; // starting from 0. SArray<SSchLevel>
|
||||||
SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
|
SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
|
||||||
|
|
||||||
|
|
|
@ -614,7 +614,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
||||||
++pJob->taskNum;
|
++pJob->taskNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
|
SCH_JOB_DLOG("level %d initialized, taskNum:%d", i, taskNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
|
SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
|
||||||
|
@ -636,8 +636,9 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
|
||||||
nodeNum = taosArrayGetSize(pJob->nodeList);
|
nodeNum = taosArrayGetSize(pJob->nodeList);
|
||||||
|
|
||||||
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
|
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
|
||||||
SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);
|
SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
|
||||||
|
SQueryNodeAddr *naddr = &nload->addr;
|
||||||
|
|
||||||
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
|
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
|
||||||
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
|
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
|
Loading…
Reference in New Issue