feature/scheduler
This commit is contained in:
parent
732a0b2c6b
commit
a510b9ea7f
|
@ -1363,6 +1363,8 @@ typedef struct SVCreateTbReq {
|
||||||
} SVCreateTbReq, SVUpdateTbReq;
|
} SVCreateTbReq, SVUpdateTbReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
int32_t code;
|
||||||
|
SName tableName;
|
||||||
} SVCreateTbRsp, SVUpdateTbRsp;
|
} SVCreateTbRsp, SVUpdateTbRsp;
|
||||||
|
|
||||||
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
|
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
|
||||||
|
|
|
@ -36,9 +36,8 @@ extern "C" {
|
||||||
|
|
||||||
#define CHECK_CODE_GOTO(expr, label) \
|
#define CHECK_CODE_GOTO(expr, label) \
|
||||||
do { \
|
do { \
|
||||||
int32_t code = expr; \
|
code = expr; \
|
||||||
if (TSDB_CODE_SUCCESS != code) { \
|
if (TSDB_CODE_SUCCESS != code) { \
|
||||||
terrno = code; \
|
|
||||||
goto label; \
|
goto label; \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
|
@ -245,9 +245,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
|
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
SQuery* pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||||
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
|
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
|
||||||
|
|
||||||
|
@ -256,13 +256,12 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
|
||||||
} else {
|
} else {
|
||||||
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList), _return);
|
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList), _return);
|
||||||
CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
|
CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
|
||||||
pRequest->code = terrno;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
taosArrayDestroy(pNodeList);
|
taosArrayDestroy(pNodeList);
|
||||||
qDestroyQuery(pQuery);
|
qDestroyQuery(pQuery);
|
||||||
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
|
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
|
||||||
pRequest->code = terrno;
|
pRequest->code = terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -487,7 +486,6 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||||
assert(pRequest->self == pSendInfo->requestObjRefId);
|
assert(pRequest->self == pSendInfo->requestObjRefId);
|
||||||
|
|
||||||
pRequest->metric.rsp = taosGetTimestampMs();
|
pRequest->metric.rsp = taosGetTimestampMs();
|
||||||
pRequest->code = pMsg->code;
|
|
||||||
|
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
if (pEpSet) {
|
if (pEpSet) {
|
||||||
|
|
|
@ -88,12 +88,14 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
SSchema *pTagSchema;
|
SSchema *pTagSchema;
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
int msgLen = 0;
|
int msgLen = 0;
|
||||||
int32_t code = TSDB_CODE_VND_APP_ERROR;
|
int32_t code = 0;
|
||||||
char tableFName[TSDB_TABLE_FNAME_LEN];
|
char tableFName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
int32_t rspLen = 0;
|
||||||
|
void *pRsp = NULL;
|
||||||
|
|
||||||
STableInfoReq infoReq = {0};
|
STableInfoReq infoReq = {0};
|
||||||
if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
|
if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -161,22 +163,22 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
memcpy(POINTER_SHIFT(metaRsp.pSchemas, sizeof(SSchema) * pSW->nCols), pTagSchema, sizeof(SSchema) * nTagCols);
|
memcpy(POINTER_SHIFT(metaRsp.pSchemas, sizeof(SSchema) * pSW->nCols), pTagSchema, sizeof(SSchema) * nTagCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
|
|
||||||
|
_exit:
|
||||||
|
|
||||||
|
rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
|
||||||
if (rspLen < 0) {
|
if (rspLen < 0) {
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pRsp = rpcMallocCont(rspLen);
|
pRsp = rpcMallocCont(rspLen);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
|
tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
|
||||||
|
|
||||||
code = 0;
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
|
|
||||||
tFreeSTableMetaRsp(&metaRsp);
|
tFreeSTableMetaRsp(&metaRsp);
|
||||||
if (pSW != NULL) {
|
if (pSW != NULL) {
|
||||||
|
|
Loading…
Reference in New Issue