diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index fd82cbb8c5..054b2894c5 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -408,7 +408,7 @@ int tscProcessSql(SSqlObj *pSql); int tscRenewMeterMeta(SSqlObj *pSql, char *tableId); void tscQueueAsyncRes(SSqlObj *pSql); -void tscQueueAsyncError(void(*fp), void *param); +void tscQueueAsyncError(void(*fp), void *param, int32_t code); int tscProcessLocalCmd(SSqlObj *pSql); int tscCfgDynamicOptions(char *msg); @@ -449,7 +449,7 @@ void tscFreeSqlObj(SSqlObj *pObj); void tscCloseTscObj(STscObj *pObj); -void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen); +void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen); void tscProcessMultiVnodesInsert(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index f05d269925..10878ee37f 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); -void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen) { +void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -51,14 +51,14 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { tscError("failed to malloc payload"); - tscQueueAsyncError(fp, param); + tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY); return; } pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1); if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); - tscQueueAsyncError(fp, param); + tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY); free(pCmd->payload); return; } @@ -73,7 +73,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code != TSDB_CODE_SUCCESS) { - pSql->res.code = (uint8_t)code; + pSql->res.code = code; tscQueueAsyncRes(pSql); return; } @@ -86,15 +86,16 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { tscError("bug!!! pObj:%p", pObj); - globalCode = TSDB_CODE_DISCONNECTED; - tscQueueAsyncError(fp, param); + terrno = TSDB_CODE_DISCONNECTED; + tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED); return; } int32_t sqlLen = strlen(sqlstr); if (sqlLen > tsMaxSQLStringLen) { tscError("sql string too long"); - tscQueueAsyncError(fp, param); + terrno = TSDB_CODE_INVALID_SQL; + tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_SQL); return; } @@ -103,7 +104,8 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); if (pSql == NULL) { tscError("failed to malloc sqlObj"); - tscQueueAsyncError(fp, param); + terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; + tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY); return; } @@ -168,7 +170,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo pRes->code = numOfRows; } - tscQueueAsyncError(pSql->fetchFp, param); + tscQueueAsyncError(pSql->fetchFp, param, pRes->code); return; } @@ -198,8 +200,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi SSqlObj *pSql = (SSqlObj *)taosa; if (pSql == NULL || pSql->signature != pSql) { tscError("sql object is NULL"); - globalCode = TSDB_CODE_DISCONNECTED; - tscQueueAsyncError(fp, param); +// globalCode = TSDB_CODE_DISCONNECTED; + tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED); return; } @@ -208,7 +210,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi if (pRes->qhandle == 0) { tscError("qhandle is NULL"); - tscQueueAsyncError(fp, param); + tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE); return; } @@ -230,8 +232,8 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), SSqlObj *pSql = (SSqlObj *)taosa; if (pSql == NULL || pSql->signature != pSql) { tscError("sql object is NULL"); - globalCode = TSDB_CODE_DISCONNECTED; - tscQueueAsyncError(fp, param); +// globalCode = TSDB_CODE_DISCONNECTED; + tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED); return; } @@ -240,7 +242,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), if (pRes->qhandle == 0) { tscError("qhandle is NULL"); - tscQueueAsyncError(fp, param); + tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE); return; } @@ -329,7 +331,7 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { // pCmd may be released, so cache pCmd->command int cmd = pCmd->command; - int code = pRes->code ? -pRes->code : pRes->numOfRows; + int code = pRes->code;// ? -pRes->code : pRes->numOfRows; // in case of async insert, restore the user specified callback function bool shouldFree = tscShouldFreeAsyncSqlObj(pSql); @@ -347,18 +349,20 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { } } -void tscProcessAsyncError(SSchedMsg *pMsg) { +static void tscProcessAsyncError(SSchedMsg *pMsg) { void (*fp)() = pMsg->ahandle; - - (*fp)(pMsg->thandle, NULL, -1); + (*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg); } -void tscQueueAsyncError(void(*fp), void *param) { +void tscQueueAsyncError(void(*fp), void *param, int32_t code) { + int32_t* c = malloc(sizeof(int32_t)); + *c = code; + SSchedMsg schedMsg; schedMsg.fp = tscProcessAsyncError; schedMsg.ahandle = fp; schedMsg.thandle = param; - schedMsg.msg = NULL; + schedMsg.msg = c; taosScheduleTask(tscQhandle, &schedMsg); } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 591b41d948..f651c35324 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -329,7 +329,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL; rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows; - tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), taosres); + tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql); /* * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 0fe281859b..896f7e294f 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -267,25 +267,25 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { return pRes->code; } -static void syncQueryCallback(void *param, TAOS_RES *tres, int code) { +static void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { assert(param != NULL); - STscObj *pObj = (STscObj *)param; + SSqlObj *pSql = ((STscObj *)param)->pSql; - assert(pObj->pSql != NULL); - sem_post(&pObj->pSql->rspSem); + pSql->res.code = code; + sem_post(&pSql->rspSem); } int taos_query(TAOS *taos, const char *sqlstr) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { - globalCode = TSDB_CODE_DISCONNECTED; + terrno = TSDB_CODE_DISCONNECTED; return TSDB_CODE_DISCONNECTED; } SSqlObj* pSql = pObj->pSql; - int32_t sqlLen = strlen(sqlstr); - doAsyncQuery(pObj, pObj->pSql, syncQueryCallback, taos, sqlstr, sqlLen); + size_t sqlLen = strlen(sqlstr); + doAsyncQuery(pObj, pObj->pSql, waitForQueryRsp, taos, sqlstr, sqlLen); // wait for the callback function to post the semaphore sem_wait(&pSql->rspSem); @@ -643,7 +643,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { return pRes->tsrow; } -static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) { +static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) { SSqlObj* pSql = (SSqlObj*) tres; if (numOfRows < 0) { // set the error code @@ -671,7 +671,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { // current data are exhausted, fetch more data if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true && (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) { - taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj); + taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj); sem_wait(&pSql->rspSem); } @@ -848,22 +848,17 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); } +// todo should not be used in async query int taos_errno(TAOS *taos) { STscObj *pObj = (STscObj *)taos; - int code; - if (pObj == NULL || pObj->signature != pObj) return globalCode; + if (pObj == NULL || pObj->signature != pObj) { + return terrno; + } - if ((int8_t)(pObj->pSql->res.code) == -1) - code = TSDB_CODE_OTHERS; - else - code = pObj->pSql->res.code; - - return code; + return pObj->pSql->res.code; } -//static bool validErrorCode(int32_t code) { return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE; } - /* * In case of invalid sql error, additional information is attached to explain * why the sql is invalid @@ -883,6 +878,7 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) { return z != NULL; } +// todo should not be used in async model char *taos_errstr(TAOS *taos) { STscObj *pObj = (STscObj *)taos; diff --git a/src/os/linux/src/tlinux.c b/src/os/linux/src/tlinux.c index bce4a8f13d..780e2903a0 100644 --- a/src/os/linux/src/tlinux.c +++ b/src/os/linux/src/tlinux.c @@ -236,7 +236,7 @@ void *taosProcessAlarmSignal(void *tharg) { void (*callback)(int) = tharg; timer_t timerId; - struct sigevent sevent; + struct sigevent sevent = {0}; #ifdef _ALPINE sevent.sigev_notify = SIGEV_THREAD;