[td-11818] fix error in fetch data.
This commit is contained in:
parent
5245783825
commit
3e40fa3427
|
@ -150,7 +150,7 @@ struct SQueryNode;
|
||||||
* @param requestId
|
* @param requestId
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, SSchema** pSchema, uint32_t* numOfResCols, uint64_t requestId);
|
int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, uint64_t requestId);
|
||||||
|
|
||||||
// Set datasource of this subplan, multiple calls may be made to a subplan.
|
// Set datasource of this subplan, multiple calls may be made to a subplan.
|
||||||
// @subplan subplan to be schedule
|
// @subplan subplan to be schedule
|
||||||
|
|
|
@ -20,8 +20,10 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "planner.h"
|
|
||||||
#include "catalog.h"
|
#include "catalog.h"
|
||||||
|
#include "planner.h"
|
||||||
|
|
||||||
|
struct SSchJob;
|
||||||
|
|
||||||
typedef struct SSchedulerCfg {
|
typedef struct SSchedulerCfg {
|
||||||
uint32_t maxJobNum;
|
uint32_t maxJobNum;
|
||||||
|
@ -65,7 +67,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
|
||||||
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
|
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes);
|
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process the query job, generated according to the query physical plan.
|
* Process the query job, generated according to the query physical plan.
|
||||||
|
@ -73,7 +75,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
|
||||||
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
|
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob);
|
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch query result from the remote query executor
|
* Fetch query result from the remote query executor
|
||||||
|
@ -81,7 +83,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
|
||||||
* @param data
|
* @param data
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t scheduleFetchRows(void *pJob, void **data);
|
int32_t scheduleFetchRows(struct SSchJob *pJob, void **data);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -113,6 +113,7 @@ typedef struct SRequestSendRecvBody {
|
||||||
tsem_t rspSem; // not used now
|
tsem_t rspSem; // not used now
|
||||||
void* fp;
|
void* fp;
|
||||||
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
|
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
|
||||||
|
struct SSchJob *pQueryJob; // query job, created according to sql query DAG.
|
||||||
SDataBuf requestMsg;
|
SDataBuf requestMsg;
|
||||||
SReqResultInfo resInfo;
|
SReqResultInfo resInfo;
|
||||||
} SRequestSendRecvBody;
|
} SRequestSendRecvBody;
|
||||||
|
@ -129,7 +130,7 @@ typedef struct SRequestObj {
|
||||||
char *msgBuf;
|
char *msgBuf;
|
||||||
void *pInfo; // sql parse info, generated by parser module
|
void *pInfo; // sql parse info, generated by parser module
|
||||||
int32_t code;
|
int32_t code;
|
||||||
uint64_t affectedRows;
|
uint64_t affectedRows; // todo remove it
|
||||||
SQueryExecMetric metric;
|
SQueryExecMetric metric;
|
||||||
SRequestSendRecvBody body;
|
SRequestSendRecvBody body;
|
||||||
} SRequestObj;
|
} SRequestObj;
|
||||||
|
@ -161,12 +162,13 @@ int taos_options_imp(TSDB_OPTION option, const char *str);
|
||||||
void* openTransporter(const char *user, const char *auth, int32_t numOfThreads);
|
void* openTransporter(const char *user, const char *auth, int32_t numOfThreads);
|
||||||
|
|
||||||
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
|
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
|
||||||
|
|
||||||
void initMsgHandleFp();
|
void initMsgHandleFp();
|
||||||
|
|
||||||
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);
|
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);
|
||||||
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
|
|
||||||
|
|
||||||
void *doFetchRow(SRequestObj* pRequest);
|
void *doFetchRow(SRequestObj* pRequest);
|
||||||
|
|
||||||
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
|
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -1,6 +1,10 @@
|
||||||
|
|
||||||
|
#include "../../libs/scheduler/inc/schedulerInt.h"
|
||||||
#include "clientInt.h"
|
#include "clientInt.h"
|
||||||
#include "clientLog.h"
|
#include "clientLog.h"
|
||||||
|
#include "parser.h"
|
||||||
|
#include "planner.h"
|
||||||
|
#include "scheduler.h"
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
#include "tep.h"
|
#include "tep.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
@ -8,9 +12,6 @@
|
||||||
#include "tnote.h"
|
#include "tnote.h"
|
||||||
#include "tpagedfile.h"
|
#include "tpagedfile.h"
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "parser.h"
|
|
||||||
#include "planner.h"
|
|
||||||
#include "scheduler.h"
|
|
||||||
|
|
||||||
#define CHECK_CODE_GOTO(expr, lable) \
|
#define CHECK_CODE_GOTO(expr, lable) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -57,6 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
|
||||||
}
|
}
|
||||||
|
|
||||||
static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
|
static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
|
||||||
|
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema);
|
||||||
|
|
||||||
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
|
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
|
||||||
if (taos_init() != TSDB_CODE_SUCCESS) {
|
if (taos_init() != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -198,36 +200,48 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
|
||||||
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) {
|
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) {
|
||||||
pRequest->type = pQueryNode->type;
|
pRequest->type = pQueryNode->type;
|
||||||
|
|
||||||
SSchema *pSchema = NULL;
|
|
||||||
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
||||||
|
int32_t code = qCreateQueryDag(pQueryNode, pDag, pRequest->requestId);
|
||||||
int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &pResInfo->numOfCols, pRequest->requestId);
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQueryNode->type == TSDB_SQL_SELECT) {
|
if (pQueryNode->type == TSDB_SQL_SELECT) {
|
||||||
pResInfo->fields = calloc(1, sizeof(TAOS_FIELD));
|
SArray* pa = taosArrayGetP((*pDag)->pSubplans, 0);
|
||||||
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
|
|
||||||
pResInfo->fields[i].bytes = pSchema[i].bytes;
|
SSubplan* pPlan = taosArrayGetP(pa, 0);
|
||||||
pResInfo->fields[i].type = pSchema[i].type;
|
SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema);
|
||||||
tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
|
setResSchemaInfo(pResInfo, pDataBlockSchema);
|
||||||
}
|
pRequest->type = TDMT_VND_QUERY;
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
|
void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema) {
|
||||||
|
assert(pDataBlockSchema != NULL && pDataBlockSchema->numOfCols > 0);
|
||||||
|
|
||||||
|
pResInfo->numOfCols = pDataBlockSchema->numOfCols;
|
||||||
|
pResInfo->fields = calloc(pDataBlockSchema->numOfCols, sizeof(pDataBlockSchema->pSchema[0]));
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
|
||||||
|
SSchema* pSchema = &pDataBlockSchema->pSchema[i];
|
||||||
|
pResInfo->fields[i].bytes = pSchema->bytes;
|
||||||
|
pResInfo->fields[i].type = pSchema->type;
|
||||||
|
tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
|
||||||
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
|
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
|
||||||
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
||||||
|
|
||||||
int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, pJob, &res);
|
int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// handle error and retry
|
// handle error and retry
|
||||||
} else {
|
} else {
|
||||||
if (*pJob != NULL) {
|
if (pRequest->body.pQueryJob != NULL) {
|
||||||
scheduleFreeJob(*pJob);
|
scheduleFreeJob(pRequest->body.pQueryJob);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -235,7 +249,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
|
||||||
return res.code;
|
return res.code;
|
||||||
}
|
}
|
||||||
|
|
||||||
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL /*todo appInfo.xxx*/, pDag, pJob);
|
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {
|
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {
|
||||||
|
@ -312,7 +326,6 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
||||||
SRequestObj *pRequest = NULL;
|
SRequestObj *pRequest = NULL;
|
||||||
SQueryNode *pQuery = NULL;
|
SQueryNode *pQuery = NULL;
|
||||||
SQueryDag *pDag = NULL;
|
SQueryDag *pDag = NULL;
|
||||||
void *pJob = NULL;
|
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||||
|
@ -322,9 +335,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
||||||
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
|
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
|
||||||
} else {
|
} else {
|
||||||
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return);
|
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return);
|
||||||
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);
|
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag), _return);
|
||||||
pRequest->code = terrno;
|
pRequest->code = terrno;
|
||||||
return pRequest;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
@ -333,6 +345,7 @@ _return:
|
||||||
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
|
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
|
||||||
pRequest->code = terrno;
|
pRequest->code = terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pRequest;
|
return pRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -531,7 +544,10 @@ void* doFetchRow(SRequestObj* pRequest) {
|
||||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||||
|
|
||||||
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
|
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
|
||||||
if (pRequest->type == TDMT_MND_SHOW) {
|
if (pRequest->type == TDMT_VND_QUERY) {
|
||||||
|
pRequest->type = TDMT_VND_FETCH;
|
||||||
|
scheduleFetchRows(pRequest->body.pQueryJob, &pRequest->body.resInfo.pData);
|
||||||
|
} else if (pRequest->type == TDMT_MND_SHOW) {
|
||||||
pRequest->type = TDMT_MND_SHOW_RETRIEVE;
|
pRequest->type = TDMT_MND_SHOW_RETRIEVE;
|
||||||
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
|
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
|
||||||
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
|
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
|
||||||
|
|
|
@ -568,7 +568,7 @@ TEST(testCase, projection_query_tables) {
|
||||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||||
|
|
||||||
// pRes = taos_query(pConn, "create table m1 (ts timestamp, k int) tags(a int)");
|
// pRes = taos_query(pConn, "create table m1 (ts timestamp, k int) tags(a int)");
|
||||||
// taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
//
|
//
|
||||||
// pRes = taos_query(pConn, "create table tu using m1 tags(1)");
|
// pRes = taos_query(pConn, "create table tu using m1 tags(1)");
|
||||||
// taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
|
|
|
@ -89,7 +89,7 @@ static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) {
|
||||||
|
|
||||||
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
|
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
|
||||||
dataBlockSchema->numOfCols = pPlanNode->numOfExpr;
|
dataBlockSchema->numOfCols = pPlanNode->numOfExpr;
|
||||||
dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfCols);
|
dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfExpr);
|
||||||
if (NULL == dataBlockSchema->pSchema) {
|
if (NULL == dataBlockSchema->pSchema) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -306,8 +306,6 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||||
case QNODE_TABLESCAN:
|
case QNODE_TABLESCAN:
|
||||||
node = createTableScanNode(pCxt, pPlanNode);
|
node = createTableScanNode(pCxt, pPlanNode);
|
||||||
break;
|
break;
|
||||||
case QNODE_PROJECT:
|
|
||||||
// node = create
|
|
||||||
case QNODE_MODIFY:
|
case QNODE_MODIFY:
|
||||||
// Insert is not an operator in a physical plan.
|
// Insert is not an operator in a physical plan.
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -56,7 +56,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
|
||||||
tfree(pDag);
|
tfree(pDag);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pSchema, uint32_t* numOfResCols, uint64_t requestId) {
|
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) {
|
||||||
SQueryPlanNode* pLogicPlan;
|
SQueryPlanNode* pLogicPlan;
|
||||||
int32_t code = createQueryPlan(pNode, &pLogicPlan);
|
int32_t code = createQueryPlan(pNode, &pLogicPlan);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
@ -70,15 +70,6 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
|
||||||
printf("%s\n", str);
|
printf("%s\n", str);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfOutput = pLogicPlan->numOfExpr;
|
|
||||||
*pSchema = calloc(numOfOutput, sizeof(SSchema));
|
|
||||||
*numOfResCols = numOfOutput;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
|
||||||
SExprInfo* pExprInfo = taosArrayGetP(pLogicPlan->pExpr, i);
|
|
||||||
memcpy(&(*pSchema)[i], pExprInfo->pExpr->pSchema, sizeof(SSchema));
|
|
||||||
}
|
|
||||||
|
|
||||||
code = optimizeQueryPlan(pLogicPlan);
|
code = optimizeQueryPlan(pLogicPlan);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
destroyQueryPlan(pLogicPlan);
|
destroyQueryPlan(pLogicPlan);
|
||||||
|
|
|
@ -65,7 +65,7 @@ protected:
|
||||||
SSchema *schema = NULL;
|
SSchema *schema = NULL;
|
||||||
uint32_t numOfOutput = 0;
|
uint32_t numOfOutput = 0;
|
||||||
|
|
||||||
code = qCreateQueryDag(query, &dag, &schema, &numOfOutput, requestId);
|
code = qCreateQueryDag(query, &dag, requestId);
|
||||||
dag_.reset(dag);
|
dag_.reset(dag);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,6 @@ typedef struct SSchLevel {
|
||||||
SArray *subTasks; // Element is SQueryTask
|
SArray *subTasks; // Element is SQueryTask
|
||||||
} SSchLevel;
|
} SSchLevel;
|
||||||
|
|
||||||
|
|
||||||
typedef struct SSchTask {
|
typedef struct SSchTask {
|
||||||
uint64_t taskId; // task id
|
uint64_t taskId; // task id
|
||||||
SSchLevel *level; // level
|
SSchLevel *level; // level
|
||||||
|
|
|
@ -606,8 +606,8 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
|
||||||
case TDMT_VND_QUERY_RSP: {
|
case TDMT_VND_QUERY_RSP: {
|
||||||
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
|
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
|
||||||
|
|
||||||
if (rsp->code != TSDB_CODE_SUCCESS || NULL == msg) {
|
if (rspCode != TSDB_CODE_SUCCESS || rsp->code != TSDB_CODE_SUCCESS || NULL == msg) {
|
||||||
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
|
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
|
||||||
} else {
|
} else {
|
||||||
code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY);
|
code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -986,7 +986,7 @@ void schDropJobAllTasks(SSchJob *job) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) {
|
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) {
|
||||||
if (nodeList && taosArrayGetSize(nodeList) <= 0) {
|
if (nodeList && taosArrayGetSize(nodeList) <= 0) {
|
||||||
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
|
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
|
||||||
}
|
}
|
||||||
|
@ -1092,7 +1092,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) {
|
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) {
|
||||||
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
|
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
@ -1107,7 +1107,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) {
|
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) {
|
||||||
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
@ -1116,53 +1116,51 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t scheduleFetchRows(void *pJob, void **data) {
|
int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
|
||||||
if (NULL == pJob || NULL == data) {
|
if (NULL == pJob || NULL == pData) {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchJob *job = pJob;
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
if (!SCH_JOB_NEED_FETCH(&pJob->attr)) {
|
||||||
if (!SCH_JOB_NEED_FETCH(&job->attr)) {
|
|
||||||
qError("no need to fetch data");
|
qError("no need to fetch data");
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (job->status == JOB_TASK_STATUS_FAILED) {
|
if (pJob->status == JOB_TASK_STATUS_FAILED) {
|
||||||
job->res = NULL;
|
pJob->res = NULL;
|
||||||
SCH_RET(atomic_load_32(&job->errCode));
|
SCH_RET(atomic_load_32(&pJob->errCode));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (job->status == JOB_TASK_STATUS_SUCCEED) {
|
if (pJob->status == JOB_TASK_STATUS_SUCCEED) {
|
||||||
job->res = NULL;
|
pJob->res = NULL;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) {
|
if (atomic_val_compare_exchange_32(&pJob->userFetch, 0, 1) != 0) {
|
||||||
qError("prior fetching not finished");
|
qError("prior fetching not finished");
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (job->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
|
if (pJob->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
|
||||||
SCH_ERR_JRET(schFetchFromRemote(job));
|
SCH_ERR_JRET(schFetchFromRemote(pJob));
|
||||||
}
|
}
|
||||||
|
|
||||||
tsem_wait(&job->rspSem);
|
tsem_wait(&pJob->rspSem);
|
||||||
|
|
||||||
if (job->status == JOB_TASK_STATUS_FAILED) {
|
if (pJob->status == JOB_TASK_STATUS_FAILED) {
|
||||||
code = atomic_load_32(&job->errCode);
|
code = atomic_load_32(&pJob->errCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (job->res && ((SRetrieveTableRsp *)job->res)->completed) {
|
if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) {
|
||||||
job->status = JOB_TASK_STATUS_SUCCEED;
|
pJob->status = JOB_TASK_STATUS_SUCCEED;
|
||||||
}
|
}
|
||||||
|
|
||||||
*data = job->res;
|
*pData = pJob->res;
|
||||||
job->res = NULL;
|
pJob->res = NULL;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
atomic_val_compare_exchange_32(&job->userFetch, 1, 0);
|
atomic_val_compare_exchange_32(&pJob->userFetch, 1, 0);
|
||||||
|
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue