fix(query): prepare test env

This commit is contained in:
Haojun Liao 2022-06-02 17:52:51 +08:00
parent 343f912ddd
commit ef6b0f942f
4 changed files with 41 additions and 96 deletions

View File

@ -328,12 +328,9 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
// --- mq // --- mq
void hbMgrInitMqHbRspHandle(); void hbMgrInitMqHbRspHandle();
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res);
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res); SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery); void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList); int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);

View File

@ -443,10 +443,6 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code; return pRequest->code;
} }
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList) {
return getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
}
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet *epset) { int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet *epset) {
int32_t code = 0; int32_t code = 0;
SArray* pArray = NULL; SArray* pArray = NULL;
@ -556,8 +552,9 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} }
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) { SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
if (TSDB_CODE_SUCCESS == code) { int32_t code = 0;
switch (pQuery->execMode) { switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL: case QUERY_EXEC_MODE_LOCAL:
code = execLocalCmd(pRequest, pQuery); code = execLocalCmd(pRequest, pQuery);
@ -580,7 +577,6 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
default: default:
break; break;
} }
}
if (!keepQuery) { if (!keepQuery) {
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
@ -1069,15 +1065,12 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
return pResultInfo->row; return pResultInfo->row;
} }
void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
SRequestObj* pRequest = (SRequestObj*) param; SSyncQueryParam* pParam = param;
tsem_post(&pParam->sem);
// return to client
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} }
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
// return doAsyncFetchRows(pRequest, setupOneRowPtr, convertUcs4);
assert(pRequest != NULL); assert(pRequest != NULL);
SReqResultInfo* pResultInfo = &pRequest->body.resInfo; SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
@ -1089,34 +1082,9 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
} }
SSyncQueryParam* pParam = pRequest->body.param; SSyncQueryParam* pParam = pRequest->body.param;
// always converted in async query: convertUcs4
taos_fetch_rows_a(pRequest, syncFetchFn, pParam); taos_fetch_rows_a(pRequest, syncFetchFn, pParam);
tsem_wait(&pParam->sem);
} }
/*
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
return NULL;
}
pRequest->code =
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4, true);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
return NULL;
}
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
if (pResultInfo->numOfRows == 0) {
return NULL;
}
*/
if (setupOneRowPtr) { if (setupOneRowPtr) {
doSetOneRowPtr(pResultInfo); doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1; pResultInfo->current += 1;

View File

@ -180,7 +180,6 @@ static void syncQueryFn(void* param, void* res, int32_t code) {
pParam->pRequest = res; pParam->pRequest = res;
pParam->pRequest->code = code; pParam->pRequest->code = code;
printf("ready to go in query rsp---------------\n");
tsem_post(&pParam->sem); tsem_post(&pParam->sem);
} }
@ -205,8 +204,6 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
tsem_init(&param->sem, 0, 0); tsem_init(&param->sem, 0, 0);
taos_query_a(pTscObj, sql, syncQueryFn, param); taos_query_a(pTscObj, sql, syncQueryFn, param);
printf("start to waiting\n");
tsem_wait(&param->sem); tsem_wait(&param->sem);
return param->pRequest; return param->pRequest;
@ -755,7 +752,7 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
} }
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResultInfo->pData, true); pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResultInfo->pData, true, false);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;

View File

@ -703,7 +703,6 @@ TEST(testCase, projection_query_tables) {
// taos_free_result(pRes); // taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif
TEST(testCase, projection_query_stables) { TEST(testCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -733,7 +732,6 @@ TEST(testCase, projection_query_stables) {
taos_close(pConn); taos_close(pConn);
} }
#if 0
TEST(testCase, agg_query_tables) { TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
@ -774,41 +772,26 @@ TEST(testCase, agg_query_tables) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif
/*
--- copy the following script in the shell to setup the environment.
create database test;
use test;
create table m1(ts timestamp, k int) tags(a int);
create table tm0 using m1 tags(1);
create table tm1 using m1 tags(2);
insert into tm0 values('2021-1-1 1:1:1.120', 1) ('2021-1-1 1:1:2.9', 2) tm1 values('2021-1-1 1:1:1.120', 11) ('2021-1-1 1:1:2.99', 22);
*/
TEST(testCase, async_api_test) { TEST(testCase, async_api_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
taos_query_a(pConn, "create table abc1.txx(ts timestamp, k int)", queryCallback1, pConn); taos_query_a(pConn, "select ts from test.m1", queryCallback, pConn);
getchar(); getchar();
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// int32_t n = 0;
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t* length = taos_fetch_lengths(pRes);
// for(int32_t i = 0; i < numOfFields; ++i) {
// printf("(%d):%d " , i, length[i]);
// }
// printf("\n");
//
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// memset(str, 0, sizeof(str));
// }
//
// taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif
#pragma GCC diagnostic pop #pragma GCC diagnostic pop