Merge pull request #13088 from taosdata/feature/qnode
enh: optimize query msg
This commit is contained in:
commit
6b0c9694a3
|
@ -656,6 +656,9 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t code;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t sversion;
|
||||
int32_t tversion;
|
||||
} SQueryTableRsp;
|
||||
|
||||
int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
|
||||
|
|
|
@ -182,8 +182,6 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TASK, "vnode-drop-task", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
|
||||
|
|
|
@ -56,12 +56,6 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
|||
|
||||
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
@ -72,10 +66,6 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
|||
|
||||
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
void qWorkerDestroy(void **qWorkerMgmt);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -730,11 +730,6 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
|
|||
taosMemoryFreeClear(pMsgBody);
|
||||
}
|
||||
|
||||
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
|
||||
return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP ||
|
||||
msgType == TDMT_VND_QUERY_HEARTBEAT_RSP;
|
||||
}
|
||||
|
||||
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
|
||||
assert(pMsg->info.ahandle != NULL);
|
||||
|
|
|
@ -3507,31 +3507,6 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *
|
|||
|
||||
void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp) { taosArrayDestroy(pRsp->taskStatus); }
|
||||
|
||||
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->code) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) {
|
||||
// SEncoder encoder = {0};
|
||||
// tEncoderInit(&encoder, buf, bufLen);
|
||||
|
|
|
@ -101,8 +101,6 @@ SArray *qmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_RES_READY, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASKS_STATUS, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
|
|
|
@ -292,8 +292,6 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONNECT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_DISCONNECT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
// if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_SET_CUR, vmPutNodeMsgToWriteQueue, 0)== NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_RES_READY, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASKS_STATUS, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -60,21 +60,12 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
|||
case TDMT_VND_FETCH_RSP:
|
||||
code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg);
|
||||
break;
|
||||
case TDMT_VND_RES_READY:
|
||||
code = qWorkerProcessReadyMsg(pQnode, pQnode->pQuery, pMsg);
|
||||
break;
|
||||
case TDMT_VND_TASKS_STATUS:
|
||||
code = qWorkerProcessStatusMsg(pQnode, pQnode->pQuery, pMsg);
|
||||
break;
|
||||
case TDMT_VND_CANCEL_TASK:
|
||||
code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg);
|
||||
break;
|
||||
case TDMT_VND_DROP_TASK:
|
||||
code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg);
|
||||
break;
|
||||
case TDMT_VND_TABLE_META:
|
||||
// code = vnodeGetTableMeta(pQnode, pMsg);
|
||||
// break;
|
||||
case TDMT_VND_CONSUME:
|
||||
// code = tqProcessConsumeReq(pQnode->pTq, pMsg);
|
||||
// break;
|
||||
|
|
|
@ -209,10 +209,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
|
||||
case TDMT_VND_FETCH_RSP:
|
||||
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg);
|
||||
case TDMT_VND_RES_READY:
|
||||
return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg);
|
||||
case TDMT_VND_TASKS_STATUS:
|
||||
return qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg);
|
||||
case TDMT_VND_CANCEL_TASK:
|
||||
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
|
||||
case TDMT_VND_DROP_TASK:
|
||||
|
|
|
@ -26,7 +26,7 @@ extern "C" {
|
|||
#include "ttimer.h"
|
||||
#include "tref.h"
|
||||
#include "plannodes.h"
|
||||
|
||||
#include "executor.h"
|
||||
#include "trpc.h"
|
||||
|
||||
#define QW_DEFAULT_SCHEDULER_NUMBER 10000
|
||||
|
@ -76,6 +76,8 @@ typedef struct SQWDebug {
|
|||
bool dumpEnable;
|
||||
} SQWDebug;
|
||||
|
||||
extern SQWDebug gQWDebug;
|
||||
|
||||
typedef struct SQWMsg {
|
||||
void *node;
|
||||
int32_t code;
|
||||
|
@ -303,9 +305,27 @@ typedef struct SQWorkerMgmt {
|
|||
extern SQWorkerMgmt gQwMgmt;
|
||||
|
||||
static FORCE_INLINE SQWorker *qwAcquire(int64_t refId) { return (SQWorker *)taosAcquireRef(atomic_load_32(&gQwMgmt.qwRef), refId); }
|
||||
|
||||
static FORCE_INLINE int32_t qwRelease(int64_t refId) { return taosReleaseRef(gQwMgmt.qwRef, refId); }
|
||||
|
||||
char *qwPhaseStr(int32_t phase);
|
||||
char *qwBufStatusStr(int32_t bufStatus);
|
||||
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
|
||||
void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt);
|
||||
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status);
|
||||
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
|
||||
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
|
||||
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
|
||||
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx);
|
||||
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
|
||||
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status);
|
||||
int32_t qwDropTask(QW_FPARAMS_DEF);
|
||||
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
|
||||
int32_t qwOpenRef(void);
|
||||
void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
|
||||
|
||||
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
|
||||
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
|
@ -20,7 +20,7 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "qworkerInt.h"
|
||||
#include "qwInt.h"
|
||||
#include "dataSinkMgt.h"
|
||||
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain);
|
||||
|
@ -36,12 +36,10 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
|
|||
int32_t code);
|
||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
|
||||
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
|
||||
int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
|
||||
int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code);
|
||||
int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
|
||||
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
|
||||
void qwFreeFetchRsp(void *msg);
|
||||
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
|
||||
int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
|
||||
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
|
||||
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
|
||||
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn);
|
|
@ -0,0 +1,128 @@
|
|||
#include "qworker.h"
|
||||
#include "dataSinkMgt.h"
|
||||
#include "executor.h"
|
||||
#include "planner.h"
|
||||
#include "query.h"
|
||||
#include "qwInt.h"
|
||||
#include "qwMsg.h"
|
||||
#include "tcommon.h"
|
||||
#include "tmsg.h"
|
||||
#include "tname.h"
|
||||
|
||||
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true};
|
||||
|
||||
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
|
||||
if (!gQWDebug.statusEnable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
if (oriStatus == newStatus) {
|
||||
if (newStatus == JOB_TASK_STATUS_EXECUTING || newStatus == JOB_TASK_STATUS_FAILED) {
|
||||
*ignore = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
switch (oriStatus) {
|
||||
case JOB_TASK_STATUS_NULL:
|
||||
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED &&
|
||||
newStatus != JOB_TASK_STATUS_NOT_START) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_NOT_START:
|
||||
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_EXECUTING:
|
||||
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED && newStatus != JOB_TASK_STATUS_SUCCEED &&
|
||||
newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_CANCELLING &&
|
||||
newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
|
||||
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_SUCCEED &&
|
||||
newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_FAILED &&
|
||||
newStatus != JOB_TASK_STATUS_DROPPING) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_SUCCEED:
|
||||
if (newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING &&
|
||||
newStatus != JOB_TASK_STATUS_FAILED) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_FAILED:
|
||||
if (newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
break;
|
||||
|
||||
case JOB_TASK_STATUS_CANCELLING:
|
||||
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_CANCELLED:
|
||||
case JOB_TASK_STATUS_DROPPING:
|
||||
if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {}
|
||||
|
||||
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
|
||||
if (!gQWDebug.dumpEnable) {
|
||||
return;
|
||||
}
|
||||
|
||||
QW_LOCK(QW_READ, &mgmt->schLock);
|
||||
|
||||
/*QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));*/
|
||||
|
||||
void *key = NULL;
|
||||
size_t keyLen = 0;
|
||||
int32_t i = 0;
|
||||
SQWSchStatus *sch = NULL;
|
||||
|
||||
void *pIter = taosHashIterate(mgmt->schHash, NULL);
|
||||
while (pIter) {
|
||||
sch = (SQWSchStatus *)pIter;
|
||||
qwDbgDumpSchInfo(sch, i);
|
||||
++i;
|
||||
pIter = taosHashIterate(mgmt->schHash, pIter);
|
||||
}
|
||||
|
||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||
|
||||
/*QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));*/
|
||||
}
|
||||
|
||||
|
|
@ -1,10 +1,10 @@
|
|||
#include "qworkerMsg.h"
|
||||
#include "qwMsg.h"
|
||||
#include "dataSinkMgt.h"
|
||||
#include "executor.h"
|
||||
#include "planner.h"
|
||||
#include "query.h"
|
||||
#include "qworker.h"
|
||||
#include "qworkerInt.h"
|
||||
#include "qwInt.h"
|
||||
#include "tcommon.h"
|
||||
#include "tmsg.h"
|
||||
#include "tname.h"
|
||||
|
@ -43,28 +43,8 @@ void qwFreeFetchRsp(void *msg) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) {
|
||||
SQueryTableRsp rsp = {.code = code};
|
||||
|
||||
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
|
||||
void * msg = rpcMallocCont(contLen);
|
||||
tSerializeSQueryTableRsp(msg, contLen, &rsp);
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.msgType = TDMT_VND_QUERY_RSP,
|
||||
.pCont = msg,
|
||||
.contLen = contLen,
|
||||
.code = code,
|
||||
.info = *pConn,
|
||||
};
|
||||
|
||||
tmsgSendRsp(&rpcRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) {
|
||||
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
|
||||
int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) {
|
||||
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
|
||||
pRsp->code = code;
|
||||
if (tbInfo) {
|
||||
strcpy(pRsp->tbFName, tbInfo->tbFName);
|
||||
|
@ -73,13 +53,12 @@ int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo*
|
|||
}
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.msgType = TDMT_VND_RES_READY_RSP,
|
||||
.msgType = TDMT_VND_QUERY_RSP,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.code = code,
|
||||
.info = *pConn,
|
||||
};
|
||||
rpcRsp.info.ahandle = NULL;
|
||||
|
||||
tmsgSendRsp(&rpcRsp);
|
||||
|
||||
|
@ -177,76 +156,6 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
|
||||
int32_t numOfCols = 6;
|
||||
SVShowTablesRsp showRsp = {0};
|
||||
|
||||
// showRsp.showId = 1;
|
||||
showRsp.tableMeta.pSchemas = taosMemoryCalloc(numOfCols, sizeof(SSchema));
|
||||
if (showRsp.tableMeta.pSchemas == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
col_id_t cols = 0;
|
||||
SSchema *pSchema = showRsp.tableMeta.pSchemas;
|
||||
|
||||
const SSchema *s = tGetTbnameColumnSchema();
|
||||
*pSchema = createSchema(s->type, s->bytes, ++cols, "name");
|
||||
pSchema++;
|
||||
|
||||
int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
*pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "created");
|
||||
pSchema++;
|
||||
|
||||
type = TSDB_DATA_TYPE_SMALLINT;
|
||||
*pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "columns");
|
||||
pSchema++;
|
||||
|
||||
*pSchema = createSchema(s->type, s->bytes, ++cols, "stable");
|
||||
pSchema++;
|
||||
|
||||
type = TSDB_DATA_TYPE_BIGINT;
|
||||
*pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "uid");
|
||||
pSchema++;
|
||||
|
||||
type = TSDB_DATA_TYPE_INT;
|
||||
*pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "vgId");
|
||||
|
||||
assert(cols == numOfCols);
|
||||
showRsp.tableMeta.numOfColumns = cols;
|
||||
|
||||
int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
|
||||
void * pBuf = rpcMallocCont(bufLen);
|
||||
tSerializeSShowRsp(pBuf, bufLen, &showRsp);
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.info = pMsg->info,
|
||||
.pCont = pBuf,
|
||||
.contLen = bufLen,
|
||||
.code = code,
|
||||
};
|
||||
|
||||
tmsgSendRsp(&rpcMsg);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchReq) {
|
||||
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
|
||||
int32_t handle = htonl(pFetchReq->id);
|
||||
|
||||
pRsp->numOfRows = 0;
|
||||
SRpcMsg rpcMsg = {
|
||||
.info = pMsg->info,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.code = 0,
|
||||
};
|
||||
|
||||
tmsgSendRsp(&rpcMsg);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
|
||||
SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
|
||||
if (NULL == req) {
|
||||
|
@ -407,65 +316,6 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SResReadyReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
msg->sId = be64toh(msg->sId);
|
||||
msg->queryId = be64toh(msg->queryId);
|
||||
msg->taskId = be64toh(msg->taskId);
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
int64_t rId = 0;
|
||||
|
||||
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
|
||||
|
||||
QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->info.handle);
|
||||
|
||||
QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg));
|
||||
|
||||
QW_SCH_TASK_DLOG("processReady end, node:%p", node);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SSchTasksStatusReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
qError("invalid task status msg");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
msg->sId = htobe64(msg->sId);
|
||||
uint64_t sId = msg->sId;
|
||||
|
||||
SSchedulerStatusRsp *sStatus = NULL;
|
||||
|
||||
// QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
|
||||
|
||||
_return:
|
||||
|
||||
// QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
|
@ -613,22 +463,3 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SVShowTablesReq *pReq = pMsg->pCont;
|
||||
QW_RET(qwBuildAndSendShowRsp(pMsg, code));
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
|
||||
QW_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
|
||||
}
|
|
@ -0,0 +1,502 @@
|
|||
#include "qworker.h"
|
||||
#include "dataSinkMgt.h"
|
||||
#include "executor.h"
|
||||
#include "planner.h"
|
||||
#include "query.h"
|
||||
#include "qwInt.h"
|
||||
#include "qwMsg.h"
|
||||
#include "tcommon.h"
|
||||
#include "tmsg.h"
|
||||
#include "tname.h"
|
||||
|
||||
char *qwPhaseStr(int32_t phase) {
|
||||
switch (phase) {
|
||||
case QW_PHASE_PRE_QUERY:
|
||||
return "PRE_QUERY";
|
||||
case QW_PHASE_POST_QUERY:
|
||||
return "POST_QUERY";
|
||||
case QW_PHASE_PRE_FETCH:
|
||||
return "PRE_FETCH";
|
||||
case QW_PHASE_POST_FETCH:
|
||||
return "POST_FETCH";
|
||||
case QW_PHASE_PRE_CQUERY:
|
||||
return "PRE_CQUERY";
|
||||
case QW_PHASE_POST_CQUERY:
|
||||
return "POST_CQUERY";
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return "UNKNOWN";
|
||||
}
|
||||
|
||||
char *qwBufStatusStr(int32_t bufStatus) {
|
||||
switch (bufStatus) {
|
||||
case DS_BUF_LOW:
|
||||
return "LOW";
|
||||
case DS_BUF_FULL:
|
||||
return "FULL";
|
||||
case DS_BUF_EMPTY:
|
||||
return "EMPTY";
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return "UNKNOWN";
|
||||
}
|
||||
|
||||
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
|
||||
int32_t code = 0;
|
||||
int8_t origStatus = 0;
|
||||
bool ignore = false;
|
||||
|
||||
while (true) {
|
||||
origStatus = atomic_load_8(&task->status);
|
||||
|
||||
QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore));
|
||||
if (ignore) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (origStatus != atomic_val_compare_exchange_8(&task->status, origStatus, status)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
QW_TASK_DLOG("task status updated from %s to %s", jobTaskStatusStr(origStatus), jobTaskStatusStr(status));
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) {
|
||||
SQWSchStatus newSch = {0};
|
||||
newSch.tasksHash =
|
||||
taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
if (NULL == newSch.tasksHash) {
|
||||
QW_SCH_ELOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
QW_LOCK(QW_WRITE, &mgmt->schLock);
|
||||
int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch));
|
||||
if (0 != code) {
|
||||
if (!HASH_NODE_EXIST(code)) {
|
||||
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
|
||||
|
||||
QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno);
|
||||
taosHashCleanup(newSch.tasksHash);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
taosHashCleanup(newSch.tasksHash);
|
||||
}
|
||||
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
|
||||
while (true) {
|
||||
QW_LOCK(rwType, &mgmt->schLock);
|
||||
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
|
||||
if (NULL == (*sch)) {
|
||||
QW_UNLOCK(rwType, &mgmt->schLock);
|
||||
|
||||
if (QW_NOT_EXIST_ADD == nOpt) {
|
||||
QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType));
|
||||
|
||||
nOpt = QW_NOT_EXIST_RET_ERR;
|
||||
|
||||
continue;
|
||||
} else if (QW_NOT_EXIST_RET_ERR == nOpt) {
|
||||
QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST);
|
||||
} else {
|
||||
QW_SCH_ELOG("unknown notExistOpt:%d", nOpt);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
|
||||
return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD);
|
||||
}
|
||||
|
||||
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
|
||||
return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR);
|
||||
}
|
||||
|
||||
void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); }
|
||||
|
||||
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
QW_LOCK(rwType, &sch->tasksLock);
|
||||
*task = taosHashGet(sch->tasksHash, id, sizeof(id));
|
||||
if (NULL == (*task)) {
|
||||
QW_UNLOCK(rwType, &sch->tasksLock);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
|
||||
int32_t code = 0;
|
||||
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
SQWTaskStatus ntask = {0};
|
||||
ntask.status = status;
|
||||
ntask.refId = rId;
|
||||
|
||||
QW_LOCK(QW_WRITE, &sch->tasksLock);
|
||||
code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
|
||||
if (0 != code) {
|
||||
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
|
||||
if (HASH_NODE_EXIST(code)) {
|
||||
if (rwType && task) {
|
||||
QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
|
||||
} else {
|
||||
QW_TASK_ELOG("task status already exist, newStatus:%s", jobTaskStatusStr(status));
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
|
||||
}
|
||||
} else {
|
||||
QW_TASK_ELOG("taosHashPut to tasksHash failed, error:%x - %s", code, tstrerror(code));
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
|
||||
|
||||
QW_TASK_DLOG("task status added, newStatus:%s", jobTaskStatusStr(status));
|
||||
|
||||
if (rwType && task) {
|
||||
QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
|
||||
SQWSchStatus *tsch = NULL;
|
||||
int32_t code = 0;
|
||||
QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &tsch));
|
||||
|
||||
QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL));
|
||||
|
||||
_return:
|
||||
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status,
|
||||
SQWTaskStatus **task) {
|
||||
return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
|
||||
}
|
||||
|
||||
void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { QW_UNLOCK(rwType, &sch->tasksLock); }
|
||||
|
||||
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
*ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
|
||||
if (NULL == (*ctx)) {
|
||||
QW_TASK_DLOG_E("task ctx not exist, may be dropped");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
||||
if (NULL == (*ctx)) {
|
||||
QW_TASK_DLOG_E("task ctx not exist, may be dropped");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
SQWTaskCtx nctx = {0};
|
||||
|
||||
int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx));
|
||||
if (0 != code) {
|
||||
if (HASH_NODE_EXIST(code)) {
|
||||
if (acquire && ctx) {
|
||||
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
|
||||
} else if (ctx) {
|
||||
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
|
||||
} else {
|
||||
QW_TASK_ELOG_E("task ctx already exist");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
|
||||
}
|
||||
} else {
|
||||
QW_TASK_ELOG("taosHashPut to ctxHash failed, error:%x", code);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
if (acquire && ctx) {
|
||||
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
|
||||
} else if (ctx) {
|
||||
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAddTaskCtx(QW_FPARAMS_DEF) { QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, NULL)); }
|
||||
|
||||
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx); }
|
||||
|
||||
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); }
|
||||
|
||||
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
|
||||
// Note: free/kill may in RC
|
||||
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
|
||||
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
|
||||
qDestroyTask(otaskHandle);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||
int32_t code = 0;
|
||||
// Note: free/kill may in RC
|
||||
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
|
||||
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
|
||||
code = qAsyncKillTask(taskHandle);
|
||||
atomic_store_ptr(&ctx->taskHandle, taskHandle);
|
||||
}
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
|
||||
ctx->ctrlConnInfo.handle = NULL;
|
||||
ctx->ctrlConnInfo.refId = -1;
|
||||
|
||||
// NO need to release dataConnInfo
|
||||
|
||||
qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
|
||||
|
||||
if (ctx->sinkHandle) {
|
||||
dsDestroyDataSinker(ctx->sinkHandle);
|
||||
ctx->sinkHandle = NULL;
|
||||
}
|
||||
|
||||
if (ctx->plan) {
|
||||
nodesDestroyNode(ctx->plan);
|
||||
ctx->plan = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
SQWTaskCtx octx;
|
||||
|
||||
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
||||
if (NULL == ctx) {
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||
}
|
||||
|
||||
octx = *ctx;
|
||||
|
||||
atomic_store_ptr(&ctx->taskHandle, NULL);
|
||||
atomic_store_ptr(&ctx->sinkHandle, NULL);
|
||||
atomic_store_ptr(&ctx->plan, NULL);
|
||||
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);
|
||||
|
||||
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
|
||||
QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||
}
|
||||
|
||||
qwFreeTask(QW_FPARAMS(), &octx);
|
||||
|
||||
QW_TASK_DLOG_E("task ctx dropped");
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
|
||||
SQWSchStatus *sch = NULL;
|
||||
SQWTaskStatus *task = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) {
|
||||
QW_TASK_WLOG_E("scheduler does not exist");
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (qwAcquireTaskStatus(QW_FPARAMS(), QW_WRITE, sch, &task)) {
|
||||
qwReleaseScheduler(QW_WRITE, mgmt);
|
||||
|
||||
QW_TASK_WLOG_E("task does not exist");
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
|
||||
QW_TASK_ELOG_E("taosHashRemove task from hash failed");
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
QW_TASK_DLOG_E("task status dropped");
|
||||
|
||||
_return:
|
||||
|
||||
if (task) {
|
||||
qwReleaseTaskStatus(QW_WRITE, sch);
|
||||
}
|
||||
qwReleaseScheduler(QW_WRITE, mgmt);
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
|
||||
SQWSchStatus *sch = NULL;
|
||||
SQWTaskStatus *task = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
|
||||
QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task));
|
||||
|
||||
QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status));
|
||||
|
||||
_return:
|
||||
|
||||
if (task) {
|
||||
qwReleaseTaskStatus(QW_READ, sch);
|
||||
}
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwDropTask(QW_FPARAMS_DEF) {
|
||||
QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
|
||||
QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
|
||||
|
||||
QW_TASK_DLOG_E("task is dropped");
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
|
||||
int32_t paramIdx = 0;
|
||||
int32_t newParamIdx = 0;
|
||||
|
||||
while (true) {
|
||||
paramIdx = atomic_load_32(&gQwMgmt.paramIdx);
|
||||
if (paramIdx == tListLen(gQwMgmt.param)) {
|
||||
newParamIdx = 0;
|
||||
} else {
|
||||
newParamIdx = paramIdx + 1;
|
||||
}
|
||||
|
||||
if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef;
|
||||
gQwMgmt.param[paramIdx].refId = refId;
|
||||
|
||||
*pParam = &gQwMgmt.param[paramIdx];
|
||||
}
|
||||
|
||||
|
||||
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
|
||||
qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
|
||||
|
||||
if (dbFName[0] && tbName[0]) {
|
||||
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName);
|
||||
} else {
|
||||
ctx->tbInfo.tbFName[0] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void qwCloseRef(void) {
|
||||
taosWLockLatch(&gQwMgmt.lock);
|
||||
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
|
||||
taosCloseRef(gQwMgmt.qwRef);
|
||||
gQwMgmt.qwRef = -1;
|
||||
}
|
||||
taosWUnLockLatch(&gQwMgmt.lock);
|
||||
}
|
||||
|
||||
|
||||
void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); }
|
||||
|
||||
void qwDestroyImpl(void *pMgmt) {
|
||||
SQWorker *mgmt = (SQWorker *)pMgmt;
|
||||
|
||||
taosTmrStopA(&mgmt->hbTimer);
|
||||
taosTmrCleanUp(mgmt->timer);
|
||||
|
||||
// TODO STOP ALL QUERY
|
||||
|
||||
// TODO FREE ALL
|
||||
|
||||
taosHashCleanup(mgmt->ctxHash);
|
||||
|
||||
void *pIter = taosHashIterate(mgmt->schHash, NULL);
|
||||
while (pIter) {
|
||||
SQWSchStatus *sch = (SQWSchStatus *)pIter;
|
||||
qwDestroySchStatus(sch);
|
||||
pIter = taosHashIterate(mgmt->schHash, pIter);
|
||||
}
|
||||
taosHashCleanup(mgmt->schHash);
|
||||
|
||||
taosMemoryFree(mgmt);
|
||||
|
||||
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
||||
|
||||
qwCloseRef();
|
||||
}
|
||||
|
||||
int32_t qwOpenRef(void) {
|
||||
taosWLockLatch(&gQwMgmt.lock);
|
||||
if (gQwMgmt.qwRef < 0) {
|
||||
gQwMgmt.qwRef = taosOpenRef(100, qwDestroyImpl);
|
||||
if (gQwMgmt.qwRef < 0) {
|
||||
taosWUnLockLatch(&gQwMgmt.lock);
|
||||
qError("init qworker ref failed");
|
||||
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
taosWUnLockLatch(&gQwMgmt.lock);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
@ -3,528 +3,43 @@
|
|||
#include "executor.h"
|
||||
#include "planner.h"
|
||||
#include "query.h"
|
||||
#include "qworkerInt.h"
|
||||
#include "qworkerMsg.h"
|
||||
#include "qwInt.h"
|
||||
#include "qwMsg.h"
|
||||
#include "tcommon.h"
|
||||
#include "tmsg.h"
|
||||
#include "tname.h"
|
||||
|
||||
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true};
|
||||
SQWorkerMgmt gQwMgmt = {
|
||||
.lock = 0,
|
||||
.qwRef = -1,
|
||||
.qwNum = 0,
|
||||
};
|
||||
|
||||
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
|
||||
if (!gQWDebug.statusEnable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
||||
int32_t code = 0;
|
||||
SSchedulerHbRsp rsp = {0};
|
||||
SQWSchStatus *sch = NULL;
|
||||
|
||||
QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
|
||||
|
||||
QW_LOCK(QW_WRITE, &sch->hbConnLock);
|
||||
|
||||
if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
|
||||
tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
|
||||
sch->hbConnInfo.handle = NULL;
|
||||
sch->hbConnInfo.ahandle = NULL;
|
||||
|
||||
QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
|
||||
} else {
|
||||
QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
if (oriStatus == newStatus) {
|
||||
if (newStatus == JOB_TASK_STATUS_EXECUTING || newStatus == JOB_TASK_STATUS_FAILED) {
|
||||
*ignore = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
switch (oriStatus) {
|
||||
case JOB_TASK_STATUS_NULL:
|
||||
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED &&
|
||||
newStatus != JOB_TASK_STATUS_NOT_START) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_NOT_START:
|
||||
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_EXECUTING:
|
||||
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED && newStatus != JOB_TASK_STATUS_SUCCEED &&
|
||||
newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_CANCELLING &&
|
||||
newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
|
||||
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_SUCCEED &&
|
||||
newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_FAILED &&
|
||||
newStatus != JOB_TASK_STATUS_DROPPING) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_SUCCEED:
|
||||
if (newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING &&
|
||||
newStatus != JOB_TASK_STATUS_FAILED) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_FAILED:
|
||||
if (newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
break;
|
||||
|
||||
case JOB_TASK_STATUS_CANCELLING:
|
||||
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_CANCELLED:
|
||||
case JOB_TASK_STATUS_DROPPING:
|
||||
if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus));
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {}
|
||||
|
||||
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
|
||||
if (!gQWDebug.dumpEnable) {
|
||||
return;
|
||||
}
|
||||
|
||||
QW_LOCK(QW_READ, &mgmt->schLock);
|
||||
|
||||
/*QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));*/
|
||||
|
||||
void *key = NULL;
|
||||
size_t keyLen = 0;
|
||||
int32_t i = 0;
|
||||
SQWSchStatus *sch = NULL;
|
||||
|
||||
void *pIter = taosHashIterate(mgmt->schHash, NULL);
|
||||
while (pIter) {
|
||||
sch = (SQWSchStatus *)pIter;
|
||||
qwDbgDumpSchInfo(sch, i);
|
||||
++i;
|
||||
pIter = taosHashIterate(mgmt->schHash, pIter);
|
||||
}
|
||||
|
||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||
|
||||
/*QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));*/
|
||||
}
|
||||
|
||||
char *qwPhaseStr(int32_t phase) {
|
||||
switch (phase) {
|
||||
case QW_PHASE_PRE_QUERY:
|
||||
return "PRE_QUERY";
|
||||
case QW_PHASE_POST_QUERY:
|
||||
return "POST_QUERY";
|
||||
case QW_PHASE_PRE_FETCH:
|
||||
return "PRE_FETCH";
|
||||
case QW_PHASE_POST_FETCH:
|
||||
return "POST_FETCH";
|
||||
case QW_PHASE_PRE_CQUERY:
|
||||
return "PRE_CQUERY";
|
||||
case QW_PHASE_POST_CQUERY:
|
||||
return "POST_CQUERY";
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return "UNKNOWN";
|
||||
}
|
||||
|
||||
char *qwBufStatusStr(int32_t bufStatus) {
|
||||
switch (bufStatus) {
|
||||
case DS_BUF_LOW:
|
||||
return "LOW";
|
||||
case DS_BUF_FULL:
|
||||
return "FULL";
|
||||
case DS_BUF_EMPTY:
|
||||
return "EMPTY";
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return "UNKNOWN";
|
||||
}
|
||||
|
||||
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
|
||||
int32_t code = 0;
|
||||
int8_t origStatus = 0;
|
||||
bool ignore = false;
|
||||
|
||||
while (true) {
|
||||
origStatus = atomic_load_8(&task->status);
|
||||
|
||||
QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore));
|
||||
if (ignore) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (origStatus != atomic_val_compare_exchange_8(&task->status, origStatus, status)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
QW_TASK_DLOG("task status updated from %s to %s", jobTaskStatusStr(origStatus), jobTaskStatusStr(status));
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) {
|
||||
SQWSchStatus newSch = {0};
|
||||
newSch.tasksHash =
|
||||
taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
if (NULL == newSch.tasksHash) {
|
||||
QW_SCH_ELOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
QW_LOCK(QW_WRITE, &mgmt->schLock);
|
||||
int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch));
|
||||
if (0 != code) {
|
||||
if (!HASH_NODE_EXIST(code)) {
|
||||
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
|
||||
|
||||
QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno);
|
||||
taosHashCleanup(newSch.tasksHash);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
taosHashCleanup(newSch.tasksHash);
|
||||
}
|
||||
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
|
||||
while (true) {
|
||||
QW_LOCK(rwType, &mgmt->schLock);
|
||||
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
|
||||
if (NULL == (*sch)) {
|
||||
QW_UNLOCK(rwType, &mgmt->schLock);
|
||||
|
||||
if (QW_NOT_EXIST_ADD == nOpt) {
|
||||
QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType));
|
||||
|
||||
nOpt = QW_NOT_EXIST_RET_ERR;
|
||||
|
||||
continue;
|
||||
} else if (QW_NOT_EXIST_RET_ERR == nOpt) {
|
||||
QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST);
|
||||
} else {
|
||||
QW_SCH_ELOG("unknown notExistOpt:%d", nOpt);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
|
||||
return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD);
|
||||
}
|
||||
|
||||
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
|
||||
return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR);
|
||||
}
|
||||
|
||||
void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); }
|
||||
|
||||
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
QW_LOCK(rwType, &sch->tasksLock);
|
||||
*task = taosHashGet(sch->tasksHash, id, sizeof(id));
|
||||
if (NULL == (*task)) {
|
||||
QW_UNLOCK(rwType, &sch->tasksLock);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
|
||||
int32_t code = 0;
|
||||
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
SQWTaskStatus ntask = {0};
|
||||
ntask.status = status;
|
||||
ntask.refId = rId;
|
||||
|
||||
QW_LOCK(QW_WRITE, &sch->tasksLock);
|
||||
code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
|
||||
if (0 != code) {
|
||||
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
|
||||
if (HASH_NODE_EXIST(code)) {
|
||||
if (rwType && task) {
|
||||
QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
|
||||
} else {
|
||||
QW_TASK_ELOG("task status already exist, newStatus:%s", jobTaskStatusStr(status));
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
|
||||
}
|
||||
} else {
|
||||
QW_TASK_ELOG("taosHashPut to tasksHash failed, error:%x - %s", code, tstrerror(code));
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
|
||||
|
||||
QW_TASK_DLOG("task status added, newStatus:%s", jobTaskStatusStr(status));
|
||||
|
||||
if (rwType && task) {
|
||||
QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
|
||||
SQWSchStatus *tsch = NULL;
|
||||
int32_t code = 0;
|
||||
QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &tsch));
|
||||
|
||||
QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL));
|
||||
|
||||
_return:
|
||||
QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
|
||||
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status,
|
||||
SQWTaskStatus **task) {
|
||||
return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
|
||||
}
|
||||
|
||||
void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { QW_UNLOCK(rwType, &sch->tasksLock); }
|
||||
|
||||
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
*ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
|
||||
if (NULL == (*ctx)) {
|
||||
QW_TASK_DLOG_E("task ctx not exist, may be dropped");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
||||
if (NULL == (*ctx)) {
|
||||
QW_TASK_DLOG_E("task ctx not exist, may be dropped");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
SQWTaskCtx nctx = {0};
|
||||
|
||||
int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx));
|
||||
if (0 != code) {
|
||||
if (HASH_NODE_EXIST(code)) {
|
||||
if (acquire && ctx) {
|
||||
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
|
||||
} else if (ctx) {
|
||||
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
|
||||
} else {
|
||||
QW_TASK_ELOG_E("task ctx already exist");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
|
||||
}
|
||||
} else {
|
||||
QW_TASK_ELOG("taosHashPut to ctxHash failed, error:%x", code);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
if (acquire && ctx) {
|
||||
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
|
||||
} else if (ctx) {
|
||||
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwAddTaskCtx(QW_FPARAMS_DEF) { QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, NULL)); }
|
||||
|
||||
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx); }
|
||||
|
||||
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); }
|
||||
|
||||
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
|
||||
// Note: free/kill may in RC
|
||||
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
|
||||
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
|
||||
qDestroyTask(otaskHandle);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||
int32_t code = 0;
|
||||
// Note: free/kill may in RC
|
||||
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
|
||||
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
|
||||
code = qAsyncKillTask(taskHandle);
|
||||
atomic_store_ptr(&ctx->taskHandle, taskHandle);
|
||||
}
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
|
||||
ctx->ctrlConnInfo.handle = NULL;
|
||||
ctx->ctrlConnInfo.refId = -1;
|
||||
|
||||
// NO need to release dataConnInfo
|
||||
|
||||
qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
|
||||
|
||||
if (ctx->sinkHandle) {
|
||||
dsDestroyDataSinker(ctx->sinkHandle);
|
||||
ctx->sinkHandle = NULL;
|
||||
}
|
||||
|
||||
if (ctx->plan) {
|
||||
nodesDestroyNode(ctx->plan);
|
||||
ctx->plan = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
SQWTaskCtx octx;
|
||||
|
||||
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
||||
if (NULL == ctx) {
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||
}
|
||||
|
||||
octx = *ctx;
|
||||
|
||||
atomic_store_ptr(&ctx->taskHandle, NULL);
|
||||
atomic_store_ptr(&ctx->sinkHandle, NULL);
|
||||
atomic_store_ptr(&ctx->plan, NULL);
|
||||
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);
|
||||
|
||||
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
|
||||
QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||
}
|
||||
|
||||
qwFreeTask(QW_FPARAMS(), &octx);
|
||||
|
||||
QW_TASK_DLOG_E("task ctx dropped");
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
|
||||
SQWSchStatus *sch = NULL;
|
||||
SQWTaskStatus *task = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) {
|
||||
QW_TASK_WLOG_E("scheduler does not exist");
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (qwAcquireTaskStatus(QW_FPARAMS(), QW_WRITE, sch, &task)) {
|
||||
qwReleaseScheduler(QW_WRITE, mgmt);
|
||||
|
||||
QW_TASK_WLOG_E("task does not exist");
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
|
||||
QW_TASK_ELOG_E("taosHashRemove task from hash failed");
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
QW_TASK_DLOG_E("task status dropped");
|
||||
|
||||
_return:
|
||||
|
||||
if (task) {
|
||||
qwReleaseTaskStatus(QW_WRITE, sch);
|
||||
}
|
||||
qwReleaseScheduler(QW_WRITE, mgmt);
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
|
||||
SQWSchStatus *sch = NULL;
|
||||
SQWTaskStatus *task = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
|
||||
QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task));
|
||||
|
||||
QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status));
|
||||
|
||||
_return:
|
||||
|
||||
if (task) {
|
||||
qwReleaseTaskStatus(QW_READ, sch);
|
||||
}
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwDropTask(QW_FPARAMS_DEF) {
|
||||
QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
|
||||
QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
|
||||
|
||||
QW_TASK_DLOG_E("task is dropped");
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||
|
@ -722,23 +237,9 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
|
|||
}
|
||||
|
||||
|
||||
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
|
||||
qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
|
||||
|
||||
if (dbFName[0] && tbName[0]) {
|
||||
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName);
|
||||
} else {
|
||||
ctx->tbInfo.tbFName[0] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
|
||||
int32_t code = 0;
|
||||
SQWTaskCtx *ctx = NULL;
|
||||
SRpcHandleInfo *dropConnection = NULL;
|
||||
SRpcHandleInfo *cancelConnection = NULL;
|
||||
|
||||
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
|
||||
|
@ -771,12 +272,10 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
|||
}
|
||||
|
||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
||||
dropConnection = &ctx->ctrlConnInfo;
|
||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||
dropConnection = NULL;
|
||||
|
||||
qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
|
||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
|
||||
//QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||
break;
|
||||
|
@ -809,12 +308,10 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
|||
}
|
||||
|
||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
||||
dropConnection = &ctx->ctrlConnInfo;
|
||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||
dropConnection = NULL;
|
||||
|
||||
qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
|
||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
|
||||
//QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||
}
|
||||
|
@ -839,11 +336,6 @@ _return:
|
|||
qwReleaseTaskCtx(mgmt, ctx);
|
||||
}
|
||||
|
||||
if (dropConnection) {
|
||||
qwBuildAndSendDropRsp(dropConnection, code);
|
||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
|
||||
}
|
||||
|
||||
if (cancelConnection) {
|
||||
qwBuildAndSendCancelRsp(cancelConnection, code);
|
||||
QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
|
||||
|
@ -862,7 +354,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
|
|||
int32_t code = 0;
|
||||
SQWTaskCtx *ctx = NULL;
|
||||
SRpcHandleInfo connInfo = {0};
|
||||
SRpcHandleInfo *readyConnection = NULL;
|
||||
SRpcHandleInfo *rspConnection = NULL;
|
||||
|
||||
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
|
||||
|
||||
|
@ -883,7 +375,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
|
|||
}
|
||||
#else
|
||||
connInfo = ctx->ctrlConnInfo;
|
||||
readyConnection = &connInfo;
|
||||
rspConnection = &connInfo;
|
||||
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
|
||||
#endif
|
||||
|
@ -895,8 +387,8 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
|
|||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
|
||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
|
||||
//QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
|
||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||
|
@ -916,9 +408,9 @@ _return:
|
|||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
|
||||
}
|
||||
|
||||
if (readyConnection) {
|
||||
qwBuildAndSendReadyRsp(readyConnection, code, ctx ? &ctx->tbInfo : NULL);
|
||||
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
|
||||
if (rspConnection) {
|
||||
qwBuildAndSendQueryRsp(rspConnection, code, ctx ? &ctx->tbInfo : NULL);
|
||||
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code));
|
||||
}
|
||||
|
||||
if (ctx) {
|
||||
|
@ -1009,69 +501,6 @@ _return:
|
|||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||
int32_t code = 0;
|
||||
SQWTaskCtx *ctx = NULL;
|
||||
int8_t phase = 0;
|
||||
bool needRsp = true;
|
||||
|
||||
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||
|
||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||
|
||||
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
||||
QW_TASK_WLOG_E("task is dropping or already dropped");
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||
}
|
||||
|
||||
if (ctx->phase == QW_PHASE_PRE_QUERY) {
|
||||
ctx->ctrlConnInfo = qwMsg->connInfo;
|
||||
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
|
||||
needRsp = false;
|
||||
QW_TASK_DLOG_E("ready msg will not rsp now");
|
||||
goto _return;
|
||||
}
|
||||
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
|
||||
|
||||
if (atomic_load_8((int8_t *)&ctx->queryEnd) || atomic_load_8((int8_t *)&ctx->queryFetched)) {
|
||||
QW_TASK_ELOG("got ready msg at wrong status, queryEnd:%d, queryFetched:%d", atomic_load_8((int8_t *)&ctx->queryEnd),
|
||||
atomic_load_8((int8_t *)&ctx->queryFetched));
|
||||
QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
|
||||
}
|
||||
|
||||
if (ctx->phase == QW_PHASE_POST_QUERY) {
|
||||
code = ctx->rspCode;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
QW_TASK_ELOG("invalid phase when got ready msg, phase:%s", qwPhaseStr(ctx->phase));
|
||||
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
|
||||
|
||||
_return:
|
||||
|
||||
if (code && ctx) {
|
||||
QW_UPDATE_RSP_CODE(ctx, code);
|
||||
}
|
||||
|
||||
if (code) {
|
||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
|
||||
}
|
||||
|
||||
if (ctx) {
|
||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||
qwReleaseTaskCtx(mgmt, ctx);
|
||||
}
|
||||
|
||||
if (needRsp) {
|
||||
qwBuildAndSendReadyRsp(&qwMsg->connInfo, code, NULL);
|
||||
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||
}
|
||||
|
||||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||
SQWTaskCtx *ctx = NULL;
|
||||
int32_t code = 0;
|
||||
|
@ -1245,11 +674,6 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
|
||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
|
||||
} else if (ctx->phase > 0) {
|
||||
if (0 == qwMsg->code) {
|
||||
qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
|
||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||
}
|
||||
|
||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||
rsped = true;
|
||||
} else {
|
||||
|
@ -1280,37 +704,6 @@ _return:
|
|||
qwReleaseTaskCtx(mgmt, ctx);
|
||||
}
|
||||
|
||||
if ((TSDB_CODE_SUCCESS != code) && (0 == qwMsg->code)) {
|
||||
qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
|
||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
|
||||
}
|
||||
|
||||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
||||
int32_t code = 0;
|
||||
SSchedulerHbRsp rsp = {0};
|
||||
SQWSchStatus *sch = NULL;
|
||||
|
||||
QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
|
||||
|
||||
QW_LOCK(QW_WRITE, &sch->hbConnLock);
|
||||
|
||||
if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
|
||||
tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
|
||||
sch->hbConnInfo.handle = NULL;
|
||||
sch->hbConnInfo.ahandle = NULL;
|
||||
|
||||
QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
|
||||
} else {
|
||||
QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
|
||||
}
|
||||
|
||||
QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
|
||||
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
|
||||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
|
@ -1441,81 +834,6 @@ _return:
|
|||
qwRelease(refId);
|
||||
}
|
||||
|
||||
void qwCloseRef(void) {
|
||||
taosWLockLatch(&gQwMgmt.lock);
|
||||
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
|
||||
taosCloseRef(gQwMgmt.qwRef);
|
||||
gQwMgmt.qwRef = -1;
|
||||
}
|
||||
taosWUnLockLatch(&gQwMgmt.lock);
|
||||
}
|
||||
|
||||
void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); }
|
||||
|
||||
void qwDestroyImpl(void *pMgmt) {
|
||||
SQWorker *mgmt = (SQWorker *)pMgmt;
|
||||
|
||||
taosTmrStopA(&mgmt->hbTimer);
|
||||
taosTmrCleanUp(mgmt->timer);
|
||||
|
||||
// TODO STOP ALL QUERY
|
||||
|
||||
// TODO FREE ALL
|
||||
|
||||
taosHashCleanup(mgmt->ctxHash);
|
||||
|
||||
void *pIter = taosHashIterate(mgmt->schHash, NULL);
|
||||
while (pIter) {
|
||||
SQWSchStatus *sch = (SQWSchStatus *)pIter;
|
||||
qwDestroySchStatus(sch);
|
||||
pIter = taosHashIterate(mgmt->schHash, pIter);
|
||||
}
|
||||
taosHashCleanup(mgmt->schHash);
|
||||
|
||||
taosMemoryFree(mgmt);
|
||||
|
||||
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
||||
|
||||
qwCloseRef();
|
||||
}
|
||||
|
||||
int32_t qwOpenRef(void) {
|
||||
taosWLockLatch(&gQwMgmt.lock);
|
||||
if (gQwMgmt.qwRef < 0) {
|
||||
gQwMgmt.qwRef = taosOpenRef(100, qwDestroyImpl);
|
||||
if (gQwMgmt.qwRef < 0) {
|
||||
taosWUnLockLatch(&gQwMgmt.lock);
|
||||
qError("init qworker ref failed");
|
||||
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
taosWUnLockLatch(&gQwMgmt.lock);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
|
||||
int32_t paramIdx = 0;
|
||||
int32_t newParamIdx = 0;
|
||||
|
||||
while (true) {
|
||||
paramIdx = atomic_load_32(&gQwMgmt.paramIdx);
|
||||
if (paramIdx == tListLen(gQwMgmt.param)) {
|
||||
newParamIdx = 0;
|
||||
} else {
|
||||
newParamIdx = paramIdx + 1;
|
||||
}
|
||||
|
||||
if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef;
|
||||
gQwMgmt.param[paramIdx].refId = refId;
|
||||
|
||||
*pParam = &gQwMgmt.param[paramIdx];
|
||||
}
|
||||
|
||||
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
|
||||
if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
|
||||
|
@ -1632,146 +950,4 @@ void qWorkerDestroy(void **qWorkerMgmt) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
|
||||
/*
|
||||
SQWSchStatus *sch = NULL;
|
||||
int32_t taskNum = 0;
|
||||
|
||||
QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
|
||||
|
||||
sch->lastAccessTs = taosGetTimestampSec();
|
||||
|
||||
QW_LOCK(QW_READ, &sch->tasksLock);
|
||||
|
||||
taskNum = taosHashGetSize(sch->tasksHash);
|
||||
|
||||
int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
|
||||
*rsp = taosMemoryCalloc(1, size);
|
||||
if (NULL == *rsp) {
|
||||
QW_SCH_ELOG("calloc %d failed", size);
|
||||
QW_UNLOCK(QW_READ, &sch->tasksLock);
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
void *key = NULL;
|
||||
size_t keyLen = 0;
|
||||
int32_t i = 0;
|
||||
|
||||
void *pIter = taosHashIterate(sch->tasksHash, NULL);
|
||||
while (pIter) {
|
||||
SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
|
||||
taosHashGetKey(pIter, &key, &keyLen);
|
||||
|
||||
QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
|
||||
(*rsp)->status[i].status = taskStatus->status;
|
||||
|
||||
++i;
|
||||
pIter = taosHashIterate(sch->tasksHash, pIter);
|
||||
}
|
||||
|
||||
QW_UNLOCK(QW_READ, &sch->tasksLock);
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
|
||||
(*rsp)->num = taskNum;
|
||||
*/
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
|
||||
SQWSchStatus *sch = NULL;
|
||||
|
||||
/*
|
||||
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
|
||||
|
||||
sch->lastAccessTs = taosGetTimestampSec();
|
||||
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
*/
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) {
|
||||
SQWSchStatus *sch = NULL;
|
||||
SQWTaskStatus *task = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
/*
|
||||
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
|
||||
*taskStatus = JOB_TASK_STATUS_NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)) {
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
|
||||
*taskStatus = JOB_TASK_STATUS_NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*taskStatus = task->status;
|
||||
|
||||
qwReleaseTask(QW_READ, sch);
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
*/
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwCancelTask(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
|
||||
SQWSchStatus *sch = NULL;
|
||||
SQWTaskStatus *task = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
/*
|
||||
QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));
|
||||
|
||||
QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));
|
||||
|
||||
|
||||
QW_LOCK(QW_WRITE, &task->lock);
|
||||
|
||||
task->cancel = true;
|
||||
|
||||
int8_t oriStatus = task->status;
|
||||
int8_t newStatus = 0;
|
||||
|
||||
if (task->status == JOB_TASK_STATUS_CANCELLED || task->status == JOB_TASK_STATUS_NOT_START || task->status ==
|
||||
JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING) { QW_UNLOCK(QW_WRITE, &task->lock);
|
||||
|
||||
qwReleaseTask(QW_READ, sch);
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status ==
|
||||
JOB_TASK_STATUS_PARTIAL_SUCCEED) { QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); } else {
|
||||
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLING));
|
||||
}
|
||||
|
||||
QW_UNLOCK(QW_WRITE, &task->lock);
|
||||
|
||||
qwReleaseTask(QW_READ, sch);
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
|
||||
if (oriStatus == JOB_TASK_STATUS_EXECUTING) {
|
||||
//TODO call executer to cancel subquery async
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
if (task) {
|
||||
QW_UNLOCK(QW_WRITE, &task->lock);
|
||||
|
||||
qwReleaseTask(QW_READ, sch);
|
||||
}
|
||||
|
||||
if (sch) {
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
}
|
||||
*/
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
|
|
@ -127,15 +127,6 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
|
|||
queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
|
||||
}
|
||||
|
||||
void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) {
|
||||
readyMsg->sId = htobe64(1);
|
||||
readyMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
|
||||
readyMsg->taskId = htobe64(1);
|
||||
readyRpc->msgType = TDMT_VND_RES_READY;
|
||||
readyRpc->pCont = readyMsg;
|
||||
readyRpc->contLen = sizeof(SResReadyReq);
|
||||
}
|
||||
|
||||
void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
|
||||
fetchMsg->sId = htobe64(1);
|
||||
fetchMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
|
||||
|
@ -154,13 +145,6 @@ void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
|
|||
dropRpc->contLen = sizeof(STaskDropReq);
|
||||
}
|
||||
|
||||
void qwtBuildStatusReqMsg(SSchTasksStatusReq *statusMsg, SRpcMsg *statusRpc) {
|
||||
statusMsg->sId = htobe64(1);
|
||||
statusRpc->pCont = statusMsg;
|
||||
statusRpc->contLen = sizeof(SSchTasksStatusReq);
|
||||
statusRpc->msgType = TDMT_VND_TASKS_STATUS;
|
||||
}
|
||||
|
||||
int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
|
||||
*subplan = (SSubplan *)0x1;
|
||||
return 0;
|
||||
|
@ -222,10 +206,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
|||
case TDMT_VND_QUERY_RSP: {
|
||||
SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;
|
||||
|
||||
if (0 == pRsp->code) {
|
||||
qwtBuildReadyReqMsg(&qwtreadyMsg, &qwtreadyRpc);
|
||||
qwtPutReqToFetchQueue((void *)0x1, &qwtreadyRpc);
|
||||
} else {
|
||||
if (pRsp->code) {
|
||||
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
|
||||
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
|
||||
}
|
||||
|
@ -233,19 +214,6 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
|||
rpcFreeCont(rsp);
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_RES_READY_RSP: {
|
||||
SResReadyRsp *rsp = (SResReadyRsp *)pRsp->pCont;
|
||||
|
||||
if (0 == pRsp->code) {
|
||||
qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
|
||||
qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
|
||||
} else {
|
||||
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
|
||||
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
|
||||
}
|
||||
rpcFreeCont(rsp);
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_FETCH_RSP: {
|
||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
|
||||
|
||||
|
@ -679,28 +647,6 @@ void *queryThread(void *param) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void *readyThread(void *param) {
|
||||
SRpcMsg readyRpc = {0};
|
||||
int32_t code = 0;
|
||||
uint32_t n = 0;
|
||||
void *mockPointer = (void *)0x1;
|
||||
void *mgmt = param;
|
||||
SResReadyReq readyMsg = {0};
|
||||
|
||||
while (!qwtTestStop) {
|
||||
qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
|
||||
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
|
||||
if (qwtTestEnableSleep) {
|
||||
taosUsleep(taosRand()%5);
|
||||
}
|
||||
if (++n % qwtTestPrintNum == 0) {
|
||||
printf("ready:%d\n", n);
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *fetchThread(void *param) {
|
||||
SRpcMsg fetchRpc = {0};
|
||||
int32_t code = 0;
|
||||
|
@ -745,29 +691,6 @@ void *dropThread(void *param) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void *statusThread(void *param) {
|
||||
SRpcMsg statusRpc = {0};
|
||||
int32_t code = 0;
|
||||
uint32_t n = 0;
|
||||
void *mockPointer = (void *)0x1;
|
||||
void *mgmt = param;
|
||||
SSchTasksStatusReq statusMsg = {0};
|
||||
|
||||
while (!qwtTestStop) {
|
||||
qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
|
||||
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
|
||||
if (qwtTestEnableSleep) {
|
||||
taosUsleep(taosRand()%5);
|
||||
}
|
||||
if (++n % qwtTestPrintNum == 0) {
|
||||
printf("status:%d\n", n);
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
void *qwtclientThread(void *param) {
|
||||
int32_t code = 0;
|
||||
uint32_t n = 0;
|
||||
|
@ -894,12 +817,6 @@ void *fetchQueueThread(void *param) {
|
|||
case TDMT_VND_FETCH:
|
||||
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc);
|
||||
break;
|
||||
case TDMT_VND_RES_READY:
|
||||
qWorkerProcessReadyMsg(mockPointer, mgmt, fetchRpc);
|
||||
break;
|
||||
case TDMT_VND_TASKS_STATUS:
|
||||
qWorkerProcessStatusMsg(mockPointer, mgmt, fetchRpc);
|
||||
break;
|
||||
case TDMT_VND_CANCEL_TASK:
|
||||
qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc);
|
||||
break;
|
||||
|
@ -934,15 +851,12 @@ TEST(seqTest, normalCase) {
|
|||
int32_t code = 0;
|
||||
void *mockPointer = (void *)0x1;
|
||||
SRpcMsg queryRpc = {0};
|
||||
SRpcMsg readyRpc = {0};
|
||||
SRpcMsg fetchRpc = {0};
|
||||
SRpcMsg dropRpc = {0};
|
||||
SRpcMsg statusRpc = {0};
|
||||
|
||||
qwtInitLogFile();
|
||||
|
||||
qwtBuildQueryReqMsg(&queryRpc);
|
||||
qwtBuildReadyReqMsg(&qwtreadyMsg, &readyRpc);
|
||||
qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc);
|
||||
qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
|
||||
|
||||
|
@ -976,10 +890,6 @@ TEST(seqTest, normalCase) {
|
|||
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
|
||||
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
qWorkerDestroy(&mgmt);
|
||||
}
|
||||
|
||||
|
@ -989,13 +899,11 @@ TEST(seqTest, cancelFirst) {
|
|||
void *mockPointer = (void *)0x1;
|
||||
SRpcMsg queryRpc = {0};
|
||||
SRpcMsg dropRpc = {0};
|
||||
SRpcMsg statusRpc = {0};
|
||||
|
||||
qwtInitLogFile();
|
||||
|
||||
qwtBuildQueryReqMsg(&queryRpc);
|
||||
qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
|
||||
qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
|
||||
|
||||
stubSetStringToPlan();
|
||||
stubSetRpcSendResponse();
|
||||
|
@ -1006,24 +914,12 @@ TEST(seqTest, cancelFirst) {
|
|||
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
|
||||
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
|
||||
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
|
||||
ASSERT_TRUE(0 != code);
|
||||
|
||||
qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
|
||||
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
qWorkerDestroy(&mgmt);
|
||||
}
|
||||
|
||||
|
@ -1087,9 +983,6 @@ TEST(seqTest, randCase) {
|
|||
}
|
||||
} else if (r >= maxr * 4/5 && r < maxr-1) {
|
||||
printf("Status,%d\n", t++);
|
||||
qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
|
||||
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
|
||||
ASSERT_EQ(code, 0);
|
||||
if (qwtTestEnableSleep) {
|
||||
taosUsleep(1);
|
||||
}
|
||||
|
@ -1137,7 +1030,6 @@ TEST(seqTest, multithreadRand) {
|
|||
//taosThreadCreate(&(t2), &thattr, readyThread, NULL);
|
||||
taosThreadCreate(&(t3), &thattr, fetchThread, NULL);
|
||||
taosThreadCreate(&(t4), &thattr, dropThread, NULL);
|
||||
taosThreadCreate(&(t5), &thattr, statusThread, NULL);
|
||||
taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt);
|
||||
|
||||
while (true) {
|
||||
|
|
|
@ -297,7 +297,7 @@ void schFreeRpcCtx(SRpcCtx *pCtx);
|
|||
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
|
||||
bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
|
||||
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
|
||||
int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp);
|
||||
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp);
|
||||
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
|
||||
void schProcessOnDataFetched(SSchJob *job);
|
||||
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
|
||||
|
|
|
@ -1067,7 +1067,7 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp) {
|
||||
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
|
||||
if (rsp->tbFName[0]) {
|
||||
if (NULL == pJob->queryRes) {
|
||||
pJob->queryRes = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
|
||||
|
|
|
@ -31,7 +31,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
|
|||
case TDMT_VND_EXPLAIN_RSP:
|
||||
return TSDB_CODE_SUCCESS;
|
||||
case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp
|
||||
if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) {
|
||||
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
|
||||
SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
|
||||
TMSG_INFO(msgType));
|
||||
}
|
||||
|
@ -41,22 +41,6 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
|
|||
TMSG_INFO(msgType));
|
||||
}
|
||||
|
||||
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
case TDMT_VND_RES_READY_RSP:
|
||||
reqMsgType = TDMT_VND_QUERY;
|
||||
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
|
||||
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s",
|
||||
(lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
||||
}
|
||||
|
||||
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
|
||||
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
|
||||
TMSG_INFO(msgType));
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
||||
}
|
||||
|
||||
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
case TDMT_VND_FETCH_RSP:
|
||||
|
@ -231,24 +215,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
|||
break;
|
||||
}
|
||||
case TDMT_VND_QUERY_RSP: {
|
||||
SQueryTableRsp rsp = {0};
|
||||
if (msg) {
|
||||
SCH_ERR_JRET(tDeserializeSQueryTableRsp(msg, msgSize, &rsp));
|
||||
SCH_ERR_JRET(rsp.code);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(rspCode);
|
||||
|
||||
if (NULL == msg) {
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
// SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));
|
||||
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_RES_READY_RSP: {
|
||||
SResReadyRsp *rsp = (SResReadyRsp *)msg;
|
||||
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
|
||||
|
||||
SCH_ERR_JRET(rspCode);
|
||||
if (NULL == msg) {
|
||||
|
@ -429,10 +396,6 @@ int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code)
|
|||
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
|
||||
}
|
||||
|
||||
int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
|
||||
}
|
||||
|
||||
int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_EXPLAIN_RSP, code);
|
||||
}
|
||||
|
@ -518,9 +481,6 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
|||
case TDMT_VND_QUERY:
|
||||
*fp = schHandleQueryCallback;
|
||||
break;
|
||||
case TDMT_VND_RES_READY:
|
||||
*fp = schHandleReadyCallback;
|
||||
break;
|
||||
case TDMT_VND_EXPLAIN:
|
||||
*fp = schHandleExplainCallback;
|
||||
break;
|
||||
|
@ -933,7 +893,6 @@ _return:
|
|||
|
||||
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
||||
int32_t code = 0;
|
||||
SMsgSendInfo *pReadyMsgSendInfo = NULL;
|
||||
SMsgSendInfo *pExplainMsgSendInfo = NULL;
|
||||
|
||||
pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
||||
|
@ -942,18 +901,10 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, TDMT_VND_RES_READY, &pReadyMsgSendInfo));
|
||||
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));
|
||||
|
||||
int32_t msgType = TDMT_VND_RES_READY_RSP;
|
||||
SRpcCtxVal ctxVal = {.val = pReadyMsgSendInfo, .clone = schCloneSMsgSendInfo};
|
||||
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
|
||||
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
msgType = TDMT_VND_EXPLAIN_RSP;
|
||||
ctxVal.val = pExplainMsgSendInfo;
|
||||
int32_t msgType = TDMT_VND_EXPLAIN_RSP;
|
||||
SRpcCtxVal ctxVal = {.val = pExplainMsgSendInfo, .clone = schCloneSMsgSendInfo};
|
||||
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
|
||||
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
|
@ -968,11 +919,6 @@ _return:
|
|||
|
||||
taosHashCleanup(pCtx->args);
|
||||
|
||||
if (pReadyMsgSendInfo) {
|
||||
taosMemoryFreeClear(pReadyMsgSendInfo->param);
|
||||
taosMemoryFreeClear(pReadyMsgSendInfo);
|
||||
}
|
||||
|
||||
if (pExplainMsgSendInfo) {
|
||||
taosMemoryFreeClear(pExplainMsgSendInfo->param);
|
||||
taosMemoryFreeClear(pExplainMsgSendInfo);
|
||||
|
@ -1128,24 +1074,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
persistHandle = true;
|
||||
break;
|
||||
}
|
||||
|
||||
case TDMT_VND_RES_READY: {
|
||||
msgSize = sizeof(SResReadyReq);
|
||||
msg = taosMemoryCalloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
SCH_TASK_ELOG("calloc %d failed", msgSize);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SResReadyReq *pMsg = msg;
|
||||
|
||||
pMsg->header.vgId = htonl(addr->nodeId);
|
||||
|
||||
pMsg->sId = htobe64(schMgmt.sId);
|
||||
pMsg->queryId = htobe64(pJob->queryId);
|
||||
pMsg->taskId = htobe64(pTask->taskId);
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_FETCH: {
|
||||
msgSize = sizeof(SResFetchReq);
|
||||
msg = taosMemoryCalloc(1, msgSize);
|
||||
|
|
|
@ -542,27 +542,6 @@ void* schtRunJobThread(void *aa) {
|
|||
pIter = taosHashIterate(execTasks, pIter);
|
||||
}
|
||||
|
||||
|
||||
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
|
||||
param->refId = queryJobRefId;
|
||||
param->queryId = pJob->queryId;
|
||||
|
||||
pIter = taosHashIterate(execTasks, NULL);
|
||||
while (pIter) {
|
||||
SSchTask *task = (SSchTask *)pIter;
|
||||
|
||||
param->taskId = task->taskId;
|
||||
SResReadyRsp rsp = {0};
|
||||
dataBuf.pData = &rsp;
|
||||
dataBuf.len = sizeof(rsp);
|
||||
|
||||
code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0);
|
||||
assert(code == 0 || code);
|
||||
|
||||
pIter = taosHashIterate(execTasks, pIter);
|
||||
}
|
||||
|
||||
|
||||
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
|
||||
param->refId = queryJobRefId;
|
||||
param->queryId = pJob->queryId;
|
||||
|
@ -583,25 +562,6 @@ void* schtRunJobThread(void *aa) {
|
|||
}
|
||||
|
||||
|
||||
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
|
||||
param->refId = queryJobRefId;
|
||||
param->queryId = pJob->queryId;
|
||||
|
||||
pIter = taosHashIterate(execTasks, NULL);
|
||||
while (pIter) {
|
||||
SSchTask *task = (SSchTask *)pIter;
|
||||
|
||||
param->taskId = task->taskId - 1;
|
||||
SResReadyRsp rsp = {0};
|
||||
dataBuf.pData = &rsp;
|
||||
dataBuf.len = sizeof(rsp);
|
||||
|
||||
code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0);
|
||||
assert(code == 0 || code);
|
||||
|
||||
pIter = taosHashIterate(execTasks, pIter);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
if (queryDone) {
|
||||
break;
|
||||
|
@ -701,17 +661,6 @@ TEST(queryTest, normalCase) {
|
|||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
||||
SResReadyRsp rsp = {0};
|
||||
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||
printf("code:%d", code);
|
||||
ASSERT_EQ(code, 0);
|
||||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
@ -723,17 +672,6 @@ TEST(queryTest, normalCase) {
|
|||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
||||
SResReadyRsp rsp = {0};
|
||||
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
if (queryDone) {
|
||||
break;
|
||||
|
@ -804,19 +742,8 @@ TEST(queryTest, readyFirstCase) {
|
|||
|
||||
|
||||
SSchJob *pJob = schAcquireJob(job);
|
||||
|
||||
void *pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
||||
SResReadyRsp rsp = {0};
|
||||
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||
printf("code:%d", code);
|
||||
ASSERT_EQ(code, 0);
|
||||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||
void *pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
||||
|
@ -827,17 +754,6 @@ TEST(queryTest, readyFirstCase) {
|
|||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
||||
SResReadyRsp rsp = {0};
|
||||
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
@ -942,10 +858,6 @@ TEST(queryTest, flowCtrlCase) {
|
|||
SQueryTableRsp rsp = {0};
|
||||
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||
|
||||
ASSERT_EQ(code, 0);
|
||||
} else if (task->lastMsgType == TDMT_VND_RES_READY) {
|
||||
SResReadyRsp rsp = {0};
|
||||
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||
ASSERT_EQ(code, 0);
|
||||
} else {
|
||||
qDone = true;
|
||||
|
|
Loading…
Reference in New Issue