Merge pull request #1506 from taosdata/feature/liaohj_v3
Feature/liaohj v3
This commit is contained in:
commit
8a2d7d99b6
|
@ -357,7 +357,6 @@ typedef struct SSqlObj {
|
||||||
char freed : 4;
|
char freed : 4;
|
||||||
char listed : 4;
|
char listed : 4;
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
tsem_t emptyRspSem;
|
|
||||||
SSqlCmd cmd;
|
SSqlCmd cmd;
|
||||||
SSqlRes res;
|
SSqlRes res;
|
||||||
uint8_t numOfSubs;
|
uint8_t numOfSubs;
|
||||||
|
@ -409,7 +408,7 @@ int tscProcessSql(SSqlObj *pSql);
|
||||||
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId);
|
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId);
|
||||||
void tscQueueAsyncRes(SSqlObj *pSql);
|
void tscQueueAsyncRes(SSqlObj *pSql);
|
||||||
|
|
||||||
void tscQueueAsyncError(void(*fp), void *param);
|
void tscQueueAsyncError(void(*fp), void *param, int32_t code);
|
||||||
|
|
||||||
int tscProcessLocalCmd(SSqlObj *pSql);
|
int tscProcessLocalCmd(SSqlObj *pSql);
|
||||||
int tscCfgDynamicOptions(char *msg);
|
int tscCfgDynamicOptions(char *msg);
|
||||||
|
@ -450,7 +449,7 @@ void tscFreeSqlObj(SSqlObj *pObj);
|
||||||
|
|
||||||
void tscCloseTscObj(STscObj *pObj);
|
void tscCloseTscObj(STscObj *pObj);
|
||||||
|
|
||||||
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen);
|
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen);
|
||||||
|
|
||||||
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
|
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
|
||||||
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
|
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
|
||||||
|
|
|
@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
|
||||||
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
|
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
|
||||||
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
|
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
|
||||||
|
|
||||||
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen) {
|
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
|
|
||||||
|
@ -51,17 +51,15 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
|
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
|
||||||
tscError("failed to malloc payload");
|
tscError("failed to malloc payload");
|
||||||
tfree(pSql);
|
tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
|
||||||
tscQueueAsyncError(fp, param);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSql->sqlstr = malloc(sqlLen + 1);
|
pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
|
||||||
if (pSql->sqlstr == NULL) {
|
if (pSql->sqlstr == NULL) {
|
||||||
tscError("%p failed to malloc sql string buffer", pSql);
|
tscError("%p failed to malloc sql string buffer", pSql);
|
||||||
tscQueueAsyncError(fp, param);
|
tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
|
||||||
free(pCmd->payload);
|
free(pCmd->payload);
|
||||||
free(pSql);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,7 +73,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
|
||||||
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
|
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pSql->res.code = (uint8_t)code;
|
pSql->res.code = code;
|
||||||
tscQueueAsyncRes(pSql);
|
tscQueueAsyncRes(pSql);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -88,15 +86,16 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
|
||||||
STscObj *pObj = (STscObj *)taos;
|
STscObj *pObj = (STscObj *)taos;
|
||||||
if (pObj == NULL || pObj->signature != pObj) {
|
if (pObj == NULL || pObj->signature != pObj) {
|
||||||
tscError("bug!!! pObj:%p", pObj);
|
tscError("bug!!! pObj:%p", pObj);
|
||||||
globalCode = TSDB_CODE_DISCONNECTED;
|
terrno = TSDB_CODE_DISCONNECTED;
|
||||||
tscQueueAsyncError(fp, param);
|
tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sqlLen = strlen(sqlstr);
|
int32_t sqlLen = strlen(sqlstr);
|
||||||
if (sqlLen > tsMaxSQLStringLen) {
|
if (sqlLen > tsMaxSQLStringLen) {
|
||||||
tscError("sql string too long");
|
tscError("sql string too long");
|
||||||
tscQueueAsyncError(fp, param);
|
terrno = TSDB_CODE_INVALID_SQL;
|
||||||
|
tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_SQL);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,7 +104,8 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
|
||||||
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
|
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
|
||||||
if (pSql == NULL) {
|
if (pSql == NULL) {
|
||||||
tscError("failed to malloc sqlObj");
|
tscError("failed to malloc sqlObj");
|
||||||
tscQueueAsyncError(fp, param);
|
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
|
tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,7 +170,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
|
||||||
pRes->code = numOfRows;
|
pRes->code = numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscQueueAsyncError(pSql->fetchFp, param);
|
tscQueueAsyncError(pSql->fetchFp, param, pRes->code);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,8 +200,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
|
||||||
SSqlObj *pSql = (SSqlObj *)taosa;
|
SSqlObj *pSql = (SSqlObj *)taosa;
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
if (pSql == NULL || pSql->signature != pSql) {
|
||||||
tscError("sql object is NULL");
|
tscError("sql object is NULL");
|
||||||
globalCode = TSDB_CODE_DISCONNECTED;
|
// globalCode = TSDB_CODE_DISCONNECTED;
|
||||||
tscQueueAsyncError(fp, param);
|
tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,7 +210,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
|
||||||
|
|
||||||
if (pRes->qhandle == 0) {
|
if (pRes->qhandle == 0) {
|
||||||
tscError("qhandle is NULL");
|
tscError("qhandle is NULL");
|
||||||
tscQueueAsyncError(fp, param);
|
tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,8 +232,8 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
|
||||||
SSqlObj *pSql = (SSqlObj *)taosa;
|
SSqlObj *pSql = (SSqlObj *)taosa;
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
if (pSql == NULL || pSql->signature != pSql) {
|
||||||
tscError("sql object is NULL");
|
tscError("sql object is NULL");
|
||||||
globalCode = TSDB_CODE_DISCONNECTED;
|
// globalCode = TSDB_CODE_DISCONNECTED;
|
||||||
tscQueueAsyncError(fp, param);
|
tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,7 +242,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
|
||||||
|
|
||||||
if (pRes->qhandle == 0) {
|
if (pRes->qhandle == 0) {
|
||||||
tscError("qhandle is NULL");
|
tscError("qhandle is NULL");
|
||||||
tscQueueAsyncError(fp, param);
|
tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -331,7 +331,7 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
|
||||||
|
|
||||||
// pCmd may be released, so cache pCmd->command
|
// pCmd may be released, so cache pCmd->command
|
||||||
int cmd = pCmd->command;
|
int cmd = pCmd->command;
|
||||||
int code = pRes->code ? -pRes->code : pRes->numOfRows;
|
int code = pRes->code;// ? -pRes->code : pRes->numOfRows;
|
||||||
|
|
||||||
// in case of async insert, restore the user specified callback function
|
// in case of async insert, restore the user specified callback function
|
||||||
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
|
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
|
||||||
|
@ -349,18 +349,20 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscProcessAsyncError(SSchedMsg *pMsg) {
|
static void tscProcessAsyncError(SSchedMsg *pMsg) {
|
||||||
void (*fp)() = pMsg->ahandle;
|
void (*fp)() = pMsg->ahandle;
|
||||||
|
(*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg);
|
||||||
(*fp)(pMsg->thandle, NULL, -1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscQueueAsyncError(void(*fp), void *param) {
|
void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
|
||||||
|
int32_t* c = malloc(sizeof(int32_t));
|
||||||
|
*c = code;
|
||||||
|
|
||||||
SSchedMsg schedMsg;
|
SSchedMsg schedMsg;
|
||||||
schedMsg.fp = tscProcessAsyncError;
|
schedMsg.fp = tscProcessAsyncError;
|
||||||
schedMsg.ahandle = fp;
|
schedMsg.ahandle = fp;
|
||||||
schedMsg.thandle = param;
|
schedMsg.thandle = param;
|
||||||
schedMsg.msg = NULL;
|
schedMsg.msg = c;
|
||||||
taosScheduleTask(tscQhandle, &schedMsg);
|
taosScheduleTask(tscQhandle, &schedMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -412,7 +414,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
pRes->code = code;
|
pRes->code = code;
|
||||||
tscTrace("%p failed to renew tableMeta", pSql);
|
tscTrace("%p failed to renew tableMeta", pSql);
|
||||||
tsem_post(&pSql->rspSem);
|
// tsem_post(&pSql->rspSem);
|
||||||
} else {
|
} else {
|
||||||
tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d",
|
tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d",
|
||||||
pSql, pSql->cmd.command, pSql->res.code, pSql->retry);
|
pSql, pSql->cmd.command, pSql->res.code, pSql->retry);
|
||||||
|
@ -424,7 +426,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
code = tscSendMsgToServer(pSql);
|
code = tscSendMsgToServer(pSql);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
pRes->code = code;
|
pRes->code = code;
|
||||||
tsem_post(&pSql->rspSem);
|
// tsem_post(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -488,7 +488,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tsem_init(&pSql->rspSem, 0, 0);
|
tsem_init(&pSql->rspSem, 0, 0);
|
||||||
tsem_init(&pSql->emptyRspSem, 0, 1);
|
|
||||||
pSql->signature = pSql;
|
pSql->signature = pSql;
|
||||||
pSql->pTscObj = pObj;
|
pSql->pTscObj = pObj;
|
||||||
|
|
||||||
|
|
|
@ -215,7 +215,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
if (pQueryInfo->numOfTables == 0) {
|
if (pQueryInfo->numOfTables == 0) {
|
||||||
pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
|
pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
|
||||||
} else {
|
} else {
|
||||||
pTableMetaInfo = &pQueryInfo->pTableMetaInfo[0];
|
pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
pCmd->command = pInfo->type;
|
pCmd->command = pInfo->type;
|
||||||
|
|
|
@ -285,14 +285,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
||||||
pRes->rspType = rpcMsg->msgType;
|
pRes->rspType = rpcMsg->msgType;
|
||||||
pRes->rspLen = rpcMsg->contLen;
|
pRes->rspLen = rpcMsg->contLen;
|
||||||
|
|
||||||
|
if (pRes->rspLen > 0) {
|
||||||
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
|
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
} else {
|
} else {
|
||||||
pRes->pRsp = tmp;
|
pRes->pRsp = tmp;
|
||||||
if (pRes->rspLen) {
|
|
||||||
memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
|
memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
pRes->pRsp = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ignore the error information returned from mnode when set ignore flag in sql
|
// ignore the error information returned from mnode when set ignore flag in sql
|
||||||
|
@ -327,7 +329,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
||||||
void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL;
|
void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL;
|
||||||
rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows;
|
rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows;
|
||||||
|
|
||||||
tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), taosres);
|
tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
|
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
|
||||||
|
|
|
@ -155,6 +155,10 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) {
|
||||||
STscObj *pObj = (STscObj *)param;
|
STscObj *pObj = (STscObj *)param;
|
||||||
assert(pObj != NULL && pObj->pSql != NULL);
|
assert(pObj != NULL && pObj->pSql != NULL);
|
||||||
|
|
||||||
|
if (code < 0) {
|
||||||
|
pObj->pSql->res.code = code;
|
||||||
|
}
|
||||||
|
|
||||||
sem_post(&pObj->pSql->rspSem);
|
sem_post(&pObj->pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,6 +181,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
|
||||||
sem_wait(&pSql->rspSem);
|
sem_wait(&pSql->rspSem);
|
||||||
|
|
||||||
if (pSql->res.code != TSDB_CODE_SUCCESS) {
|
if (pSql->res.code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = pSql->res.code;
|
||||||
taos_close(pObj);
|
taos_close(pObj);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -186,8 +191,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
|
||||||
// version compare only requires the first 3 segments of the version string
|
// version compare only requires the first 3 segments of the version string
|
||||||
int code = taosCheckVersion(version, taos_get_server_info(pObj), 3);
|
int code = taosCheckVersion(version, taos_get_server_info(pObj), 3);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
pSql->res.code = code;
|
terrno = code;
|
||||||
|
|
||||||
taos_close(pObj);
|
taos_close(pObj);
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
|
@ -267,31 +271,29 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
|
||||||
return pRes->code;
|
return pRes->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncQueryCallback(void *param, TAOS_RES *tres, int code) {
|
static void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
|
||||||
STscObj *pObj = (STscObj *)param;
|
assert(param != NULL);
|
||||||
assert(pObj != NULL && pObj->pSql != NULL);
|
SSqlObj *pSql = ((STscObj *)param)->pSql;
|
||||||
|
|
||||||
sem_post(&pObj->pSql->rspSem);
|
// valid error code is less than 0
|
||||||
|
if (code < 0) {
|
||||||
|
pSql->res.code = code;
|
||||||
|
}
|
||||||
|
|
||||||
|
sem_post(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_query(TAOS *taos, const char *sqlstr) {
|
int taos_query(TAOS *taos, const char *sqlstr) {
|
||||||
STscObj *pObj = (STscObj *)taos;
|
STscObj *pObj = (STscObj *)taos;
|
||||||
if (pObj == NULL || pObj->signature != pObj) {
|
if (pObj == NULL || pObj->signature != pObj) {
|
||||||
globalCode = TSDB_CODE_DISCONNECTED;
|
terrno = TSDB_CODE_DISCONNECTED;
|
||||||
return TSDB_CODE_DISCONNECTED;
|
return TSDB_CODE_DISCONNECTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
|
SSqlObj* pSql = pObj->pSql;
|
||||||
if (pSql == NULL) {
|
|
||||||
tscError("failed to malloc sqlObj");
|
|
||||||
return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
pObj->pSql = pSql;
|
size_t sqlLen = strlen(sqlstr);
|
||||||
tsem_init(&pSql->rspSem, 0, 0);
|
doAsyncQuery(pObj, pObj->pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
|
||||||
|
|
||||||
int32_t sqlLen = strlen(sqlstr);
|
|
||||||
doAsyncQuery(pObj, pObj->pSql, syncQueryCallback, taos, sqlstr, sqlLen);
|
|
||||||
|
|
||||||
// wait for the callback function to post the semaphore
|
// wait for the callback function to post the semaphore
|
||||||
sem_wait(&pSql->rspSem);
|
sem_wait(&pSql->rspSem);
|
||||||
|
@ -649,12 +651,12 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
|
||||||
return pRes->tsrow;
|
return pRes->tsrow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) {
|
static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
|
||||||
SSqlObj* pSql = (SSqlObj*) tres;
|
SSqlObj* pSql = (SSqlObj*) tres;
|
||||||
|
|
||||||
if (numOfRows < 0) { // set the error code
|
if (numOfRows < 0) { // set the error code
|
||||||
pSql->res.code = -numOfRows;
|
pSql->res.code = -numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
sem_post(&pSql->rspSem);
|
sem_post(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -677,7 +679,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
// current data are exhausted, fetch more data
|
// current data are exhausted, fetch more data
|
||||||
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true &&
|
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true &&
|
||||||
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) {
|
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) {
|
||||||
taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj);
|
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
|
||||||
|
|
||||||
sem_wait(&pSql->rspSem);
|
sem_wait(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
@ -754,20 +756,18 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
|
||||||
if (pRes == NULL || pRes->qhandle == 0) {
|
if (pRes == NULL || pRes->qhandle == 0) {
|
||||||
/* Query rsp is not received from vnode, so the qhandle is NULL */
|
/* Query rsp is not received from vnode, so the qhandle is NULL */
|
||||||
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
|
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
|
||||||
if (pSql->fp != NULL) {
|
|
||||||
STscObj* pObj = pSql->pTscObj;
|
|
||||||
|
|
||||||
if (pSql == pObj->pSql) {
|
if (tscShouldFreeAsyncSqlObj(pSql)) {
|
||||||
pObj->pSql = NULL;
|
|
||||||
tscFreeSqlObj(pSql);
|
tscFreeSqlObj(pSql);
|
||||||
}
|
|
||||||
|
|
||||||
tscTrace("%p Async SqlObj is freed by app", pSql);
|
tscTrace("%p Async SqlObj is freed by app", pSql);
|
||||||
} else if (keepCmd) {
|
} else {
|
||||||
|
if (keepCmd) {
|
||||||
tscFreeSqlResult(pSql);
|
tscFreeSqlResult(pSql);
|
||||||
} else {
|
} else {
|
||||||
tscFreeSqlObjPartial(pSql);
|
tscFreeSqlObjPartial(pSql);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -793,7 +793,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
|
||||||
* be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport
|
* be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport
|
||||||
*/
|
*/
|
||||||
if (pRes->code != TSDB_CODE_QUERY_CANCELLED &&
|
if (pRes->code != TSDB_CODE_QUERY_CANCELLED &&
|
||||||
((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL) ||
|
((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) ||
|
||||||
(pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows == 0 && pCmd->command == TSDB_SQL_SELECT &&
|
(pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows == 0 && pCmd->command == TSDB_SQL_SELECT &&
|
||||||
pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL))) {
|
pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL))) {
|
||||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||||
|
@ -836,38 +836,36 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// if no free resource msg is sent to vnode, we free this object immediately.
|
// if no free resource msg is sent to vnode, we free this object immediately.
|
||||||
|
bool free = tscShouldFreeAsyncSqlObj(pSql);
|
||||||
if (pSql->fp) {
|
if (free) {
|
||||||
assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL));
|
assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL));
|
||||||
|
|
||||||
tscFreeSqlObj(pSql);
|
tscFreeSqlObj(pSql);
|
||||||
tscTrace("%p Async sql result is freed by app", pSql);
|
tscTrace("%p Async sql result is freed by app", pSql);
|
||||||
} else if (keepCmd) {
|
} else {
|
||||||
|
if (keepCmd) {
|
||||||
tscFreeSqlResult(pSql);
|
tscFreeSqlResult(pSql);
|
||||||
tscTrace("%p sql result is freed while sql command is kept", pSql);
|
tscTrace("%p sql result is freed while sql command is kept", pSql);
|
||||||
} else {
|
} else {
|
||||||
tscFreeSqlObjPartial(pSql);
|
tscFreeSqlObjPartial(pSql);
|
||||||
tscTrace("%p sql result is freed", pSql);
|
tscTrace("%p sql result is freed by app", pSql);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); }
|
void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); }
|
||||||
|
|
||||||
|
// todo should not be used in async query
|
||||||
int taos_errno(TAOS *taos) {
|
int taos_errno(TAOS *taos) {
|
||||||
STscObj *pObj = (STscObj *)taos;
|
STscObj *pObj = (STscObj *)taos;
|
||||||
int code;
|
|
||||||
|
|
||||||
if (pObj == NULL || pObj->signature != pObj) return globalCode;
|
if (pObj == NULL || pObj->signature != pObj) {
|
||||||
|
return terrno;
|
||||||
if ((int8_t)(pObj->pSql->res.code) == -1)
|
|
||||||
code = TSDB_CODE_OTHERS;
|
|
||||||
else
|
|
||||||
code = pObj->pSql->res.code;
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//static bool validErrorCode(int32_t code) { return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE; }
|
return pObj->pSql->res.code;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* In case of invalid sql error, additional information is attached to explain
|
* In case of invalid sql error, additional information is attached to explain
|
||||||
|
@ -888,13 +886,15 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
|
||||||
return z != NULL;
|
return z != NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo should not be used in async model
|
||||||
char *taos_errstr(TAOS *taos) {
|
char *taos_errstr(TAOS *taos) {
|
||||||
STscObj *pObj = (STscObj *)taos;
|
STscObj *pObj = (STscObj *)taos;
|
||||||
|
|
||||||
if (pObj == NULL || pObj->signature != pObj)
|
if (pObj == NULL || pObj->signature != pObj)
|
||||||
return (char*)tstrerror(globalCode);
|
return (char*)tstrerror(terrno);
|
||||||
|
|
||||||
SSqlObj* pSql = pObj->pSql;
|
SSqlObj* pSql = pObj->pSql;
|
||||||
|
|
||||||
if (hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd)) {
|
if (hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd)) {
|
||||||
return pSql->cmd.payload;
|
return pSql->cmd.payload;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -124,7 +124,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
|
||||||
pSql->sqlstr = sqlstr;
|
pSql->sqlstr = sqlstr;
|
||||||
|
|
||||||
tsem_init(&pSql->rspSem, 0, 0);
|
tsem_init(&pSql->rspSem, 0, 0);
|
||||||
tsem_init(&pSql->emptyRspSem, 0, 1);
|
|
||||||
|
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
pRes->numOfRows = 1;
|
pRes->numOfRows = 1;
|
||||||
|
|
|
@ -392,10 +392,10 @@ void freeSubqueryObj(SSqlObj* pSql) {
|
||||||
static void doQuitSubquery(SSqlObj* pParentSql) {
|
static void doQuitSubquery(SSqlObj* pParentSql) {
|
||||||
freeSubqueryObj(pParentSql);
|
freeSubqueryObj(pParentSql);
|
||||||
|
|
||||||
tsem_wait(&pParentSql->emptyRspSem);
|
// tsem_wait(&pParentSql->emptyRspSem);
|
||||||
tsem_wait(&pParentSql->emptyRspSem);
|
// tsem_wait(&pParentSql->emptyRspSem);
|
||||||
|
|
||||||
tsem_post(&pParentSql->rspSem);
|
// tsem_post(&pParentSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) {
|
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) {
|
||||||
|
@ -567,7 +567,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
||||||
freeSubqueryObj(pParentSql);
|
freeSubqueryObj(pParentSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsem_post(&pParentSql->rspSem);
|
// tsem_post(&pParentSql->rspSem);
|
||||||
} else {
|
} else {
|
||||||
tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal);
|
tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal);
|
||||||
}
|
}
|
||||||
|
@ -662,7 +662,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for all subquery completed
|
// wait for all subquery completed
|
||||||
tsem_wait(&pSql->rspSem);
|
// tsem_wait(&pSql->rspSem);
|
||||||
|
|
||||||
// update the records for each subquery
|
// update the records for each subquery
|
||||||
for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
|
for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
|
||||||
|
@ -797,10 +797,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
|
||||||
tscProcessSql(pSql);
|
tscProcessSql(pSql);
|
||||||
} else { // first retrieve from vnode during the secondary stage sub-query
|
} else { // first retrieve from vnode during the secondary stage sub-query
|
||||||
if (pParentSql->fp == NULL) {
|
if (pParentSql->fp == NULL) {
|
||||||
tsem_wait(&pParentSql->emptyRspSem);
|
// tsem_post(&pParentSql->rspSem);
|
||||||
tsem_wait(&pParentSql->emptyRspSem);
|
|
||||||
|
|
||||||
tsem_post(&pParentSql->rspSem);
|
|
||||||
} else {
|
} else {
|
||||||
// set the command flag must be after the semaphore been correctly set.
|
// set the command flag must be after the semaphore been correctly set.
|
||||||
// pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
|
// pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
|
||||||
|
@ -954,10 +951,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tsem_post(&pSql->emptyRspSem);
|
// tsem_wait(&pSql->rspSem);
|
||||||
tsem_wait(&pSql->rspSem);
|
|
||||||
|
|
||||||
tsem_post(&pSql->emptyRspSem);
|
|
||||||
|
|
||||||
if (pSql->numOfSubs <= 0) {
|
if (pSql->numOfSubs <= 0) {
|
||||||
pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||||
|
|
|
@ -40,15 +40,10 @@ void * tscQhandle;
|
||||||
void * tscCheckDiskUsageTmr;
|
void * tscCheckDiskUsageTmr;
|
||||||
int tsInsertHeadSize;
|
int tsInsertHeadSize;
|
||||||
|
|
||||||
extern int tscEmbedded;
|
|
||||||
int tscNumOfThreads;
|
int tscNumOfThreads;
|
||||||
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
|
|
||||||
static pthread_mutex_t tscMutex;
|
|
||||||
|
|
||||||
extern int tsTscEnableRecordSql;
|
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
|
||||||
extern int tsNumOfLogLines;
|
|
||||||
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
|
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
|
||||||
void deltaToUtcInitOnce();
|
|
||||||
|
|
||||||
void tscCheckDiskUsage(void *para, void *unused) {
|
void tscCheckDiskUsage(void *para, void *unused) {
|
||||||
taosGetDisk();
|
taosGetDisk();
|
||||||
|
@ -60,7 +55,6 @@ int32_t tscInitRpc(const char *user, const char *secret) {
|
||||||
char secretEncrypt[32] = {0};
|
char secretEncrypt[32] = {0};
|
||||||
taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);
|
taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);
|
||||||
|
|
||||||
pthread_mutex_lock(&tscMutex);
|
|
||||||
if (pVnodeConn == NULL) {
|
if (pVnodeConn == NULL) {
|
||||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
rpcInit.localIp = tsLocalIp;
|
rpcInit.localIp = tsLocalIp;
|
||||||
|
@ -78,7 +72,6 @@ int32_t tscInitRpc(const char *user, const char *secret) {
|
||||||
pVnodeConn = rpcOpen(&rpcInit);
|
pVnodeConn = rpcOpen(&rpcInit);
|
||||||
if (pVnodeConn == NULL) {
|
if (pVnodeConn == NULL) {
|
||||||
tscError("failed to init connection to vnode");
|
tscError("failed to init connection to vnode");
|
||||||
pthread_mutex_unlock(&tscMutex);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -100,12 +93,10 @@ int32_t tscInitRpc(const char *user, const char *secret) {
|
||||||
pTscMgmtConn = rpcOpen(&rpcInit);
|
pTscMgmtConn = rpcOpen(&rpcInit);
|
||||||
if (pTscMgmtConn == NULL) {
|
if (pTscMgmtConn == NULL) {
|
||||||
tscError("failed to init connection to mgmt");
|
tscError("failed to init connection to mgmt");
|
||||||
pthread_mutex_unlock(&tscMutex);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&tscMutex);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,7 +104,7 @@ void taos_init_imp() {
|
||||||
char temp[128];
|
char temp[128];
|
||||||
struct stat dirstat;
|
struct stat dirstat;
|
||||||
|
|
||||||
pthread_mutex_init(&tscMutex, NULL);
|
errno = TSDB_CODE_SUCCESS;
|
||||||
srand(taosGetTimestampSec());
|
srand(taosGetTimestampSec());
|
||||||
deltaToUtcInitOnce();
|
deltaToUtcInitOnce();
|
||||||
|
|
||||||
|
|
|
@ -469,7 +469,7 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
|
||||||
pSql->freed = 0;
|
pSql->freed = 0;
|
||||||
tscFreeSqlCmdData(pCmd);
|
tscFreeSqlCmdData(pCmd);
|
||||||
|
|
||||||
tscTrace("%p free sqlObj partial completed", pSql);
|
tscTrace("%p partially free sqlObj completed", pSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscFreeSqlObj(SSqlObj* pSql) {
|
void tscFreeSqlObj(SSqlObj* pSql) {
|
||||||
|
@ -487,8 +487,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
|
||||||
tfree(pCmd->payload);
|
tfree(pCmd->payload);
|
||||||
|
|
||||||
pCmd->allocSize = 0;
|
pCmd->allocSize = 0;
|
||||||
|
|
||||||
tsem_destroy(&pSql->rspSem);
|
|
||||||
free(pSql);
|
free(pSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -820,7 +818,9 @@ void tscCloseTscObj(STscObj* pObj) {
|
||||||
taosTmrStopA(&(pObj->pTimer));
|
taosTmrStopA(&(pObj->pTimer));
|
||||||
tscFreeSqlObj(pSql);
|
tscFreeSqlObj(pSql);
|
||||||
|
|
||||||
|
sem_destroy(&pSql->rspSem);
|
||||||
pthread_mutex_destroy(&pObj->mutex);
|
pthread_mutex_destroy(&pObj->mutex);
|
||||||
|
|
||||||
tscTrace("%p DB connection is closed", pObj);
|
tscTrace("%p DB connection is closed", pObj);
|
||||||
tfree(pObj);
|
tfree(pObj);
|
||||||
}
|
}
|
||||||
|
@ -842,10 +842,9 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
|
||||||
if (pCmd->payload == NULL) {
|
if (pCmd->payload == NULL) {
|
||||||
assert(pCmd->allocSize == 0);
|
assert(pCmd->allocSize == 0);
|
||||||
|
|
||||||
pCmd->payload = (char*)malloc(size);
|
pCmd->payload = (char*)calloc(1, size);
|
||||||
if (pCmd->payload == NULL) return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
if (pCmd->payload == NULL) return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
pCmd->allocSize = size;
|
pCmd->allocSize = size;
|
||||||
memset(pCmd->payload, 0, pCmd->allocSize);
|
|
||||||
} else {
|
} else {
|
||||||
if (pCmd->allocSize < size) {
|
if (pCmd->allocSize < size) {
|
||||||
char* b = realloc(pCmd->payload, size);
|
char* b = realloc(pCmd->payload, size);
|
||||||
|
@ -853,6 +852,8 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
|
||||||
pCmd->payload = b;
|
pCmd->payload = b;
|
||||||
pCmd->allocSize = size;
|
pCmd->allocSize = size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
memset(pCmd->payload, 0, pCmd->payloadLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
//memset(pCmd->payload, 0, pCmd->allocSize);
|
//memset(pCmd->payload, 0, pCmd->allocSize);
|
||||||
|
@ -1742,7 +1743,7 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STscObj* pTscObj = pSql->pTscObj;
|
STscObj* pTscObj = pSql->pTscObj;
|
||||||
if (pSql->pStream != NULL || pTscObj->pHb == pSql) {
|
if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1929,7 +1930,6 @@ STableMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableMetaInfo->pTableMeta = pTableMeta;
|
pTableMetaInfo->pTableMeta = pTableMeta;
|
||||||
// pTableMetaInfo->pMetricMeta = pMetricMeta;
|
|
||||||
pTableMetaInfo->numOfTags = numOfTags;
|
pTableMetaInfo->numOfTags = numOfTags;
|
||||||
|
|
||||||
if (tags != NULL) {
|
if (tags != NULL) {
|
||||||
|
@ -1963,7 +1963,7 @@ void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
|
void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
|
||||||
tscTrace("%p deref the metric/meter meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);
|
tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);
|
||||||
|
|
||||||
int32_t index = pQueryInfo->numOfTables;
|
int32_t index = pQueryInfo->numOfTables;
|
||||||
while (index >= 0) {
|
while (index >= 0) {
|
||||||
|
|
|
@ -236,7 +236,7 @@ void *taosProcessAlarmSignal(void *tharg) {
|
||||||
void (*callback)(int) = tharg;
|
void (*callback)(int) = tharg;
|
||||||
|
|
||||||
timer_t timerId;
|
timer_t timerId;
|
||||||
struct sigevent sevent;
|
struct sigevent sevent = {0};
|
||||||
|
|
||||||
#ifdef _ALPINE
|
#ifdef _ALPINE
|
||||||
sevent.sigev_notify = SIGEV_THREAD;
|
sevent.sigev_notify = SIGEV_THREAD;
|
||||||
|
|
|
@ -5148,7 +5148,7 @@ static void singleTableQueryImpl(SQInfo* pQInfo) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
// group by normal column, sliding window query, interval query are handled by interval query processor
|
// group by normal column, sliding window query, interval query are handled by interval query processor
|
||||||
if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
|
if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
|
||||||
tableIntervalProcessor(pQInfo);
|
tableIntervalProcessor(pQInfo);
|
||||||
} else {
|
} else {
|
||||||
if (isFixedOutputQuery(pQuery)) {
|
if (isFixedOutputQuery(pQuery)) {
|
||||||
|
|
|
@ -147,6 +147,11 @@ void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direct
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
|
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
|
||||||
|
if (pIter->numOfFGroups == 0) {
|
||||||
|
assert(pIter->pFileGroup == NULL);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
|
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
|
||||||
void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags);
|
void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags);
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
|
|
|
@ -745,13 +745,24 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
|
||||||
int32_t index = -1;
|
int32_t index = -1;
|
||||||
|
|
||||||
int32_t tid = pCheckInfo->tableId.tid;
|
int32_t tid = pCheckInfo->tableId.tid;
|
||||||
SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA];
|
|
||||||
|
|
||||||
while (1) {
|
while (pCheckInfo->pFileGroup != NULL) {
|
||||||
if ((fid = getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup)) < 0) {
|
if ((fid = getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup)) < 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA];
|
||||||
|
|
||||||
|
// no data block in current file, try next
|
||||||
|
if (pCheckInfo->compIndex[tid].numOfSuperBlocks == 0) {
|
||||||
|
dTrace("QInfo:%p no data block in file, fid:%d, tid:%d, try next", pQueryHandle->qinfo,
|
||||||
|
pCheckInfo->pFileGroup->fileId, tid);
|
||||||
|
|
||||||
|
pCheckInfo->pFileGroup = tsdbGetFileGroupNext(&pCheckInfo->fileIter);
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
index = binarySearchForBlockImpl(pCheckInfo->pCompInfo->blocks, pCheckInfo->compIndex[tid].numOfSuperBlocks, pQueryHandle->order, key);
|
index = binarySearchForBlockImpl(pCheckInfo->pCompInfo->blocks, pCheckInfo->compIndex[tid].numOfSuperBlocks, pQueryHandle->order, key);
|
||||||
|
|
||||||
if (type == QUERY_RANGE_GREATER_EQUAL) {
|
if (type == QUERY_RANGE_GREATER_EQUAL) {
|
||||||
|
@ -792,6 +803,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
|
||||||
pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096);
|
pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096);
|
||||||
tdInitDataCols(pCheckInfo->pDataCols, pCheckInfo->pTableObj->schema);
|
tdInitDataCols(pCheckInfo->pDataCols, pCheckInfo->pTableObj->schema);
|
||||||
|
|
||||||
|
SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA];
|
||||||
if (pFile->fd == FD_INITIALIZER) {
|
if (pFile->fd == FD_INITIALIZER) {
|
||||||
pFile->fd = open(pFile->fname, O_RDONLY);
|
pFile->fd = open(pFile->fname, O_RDONLY);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue