feature/qnode

This commit is contained in:
dapan1121 2022-01-12 18:40:22 +08:00
parent 0a281a4f0b
commit 5cf4edacbb
9 changed files with 389 additions and 145 deletions

View File

@ -993,6 +993,13 @@ typedef struct {
uint64_t taskId;
} SSinkDataReq;
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} SQueryContinueReq;
typedef struct {
SMsgHead header;

View File

@ -170,6 +170,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp)
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SCHEDULE_DATA_SINK, "vnode-schedule-data-sink", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)

View File

@ -22,9 +22,18 @@ extern "C" {
#include "trpc.h"
enum {
NODE_TYPE_VNODE = 1,
NODE_TYPE_QNODE,
NODE_TYPE_SNODE,
};
typedef struct SQWorkerCfg {
uint32_t maxSchedulerNum;
uint32_t maxResCacheNum;
uint32_t maxTaskNum;
uint32_t maxSchTaskNum;
} SQWorkerCfg;
@ -39,11 +48,17 @@ typedef struct {
uint64_t numOfErrors;
} SQWorkerStat;
typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *);
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt);
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);

View File

@ -22,6 +22,7 @@ extern "C" {
#include "vnodeInt.h"
#include "qworker.h"
typedef struct SQWorkerMgmt SQHandle;

View File

@ -19,11 +19,22 @@
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); }
int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, &pVnode->pQuery, pVnode, vnodePutReqToVQueryQ); }
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vTrace("query message is processed");
return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
vTrace("query message is processing");
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessQueryContinueMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
case TDMT_VND_SCHEDULE_DATA_SINK:
return qWorkerProcessDataSinkMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {

View File

@ -178,8 +178,10 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = 0;
*handle = pTaskInfo->dsHandle;
if (handle) {
*handle = pTaskInfo->dsHandle;
}
while(1) {
st = taosGetTimestampUs();
SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);

View File

@ -23,7 +23,7 @@ extern "C" {
#include "tlockfree.h"
#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000
#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000
#define QWORKER_DEFAULT_TASK_NUMBER 10000
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
enum {
@ -57,7 +57,6 @@ enum {
QW_ADD_ACQUIRE,
};
typedef struct SQWTaskStatus {
SRWLatch lock;
int32_t code;
@ -67,12 +66,14 @@ typedef struct SQWTaskStatus {
bool drop;
} SQWTaskStatus;
typedef struct SQWorkerTaskHandlesCache {
typedef struct SQWTaskCtx {
SRWLatch lock;
int8_t sinkScheduled;
int8_t queryScheduled;
bool needRsp;
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
} SQWorkerTaskHandlesCache;
} SQWTaskCtx;
typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second
@ -82,11 +83,15 @@ typedef struct SQWSchStatus {
// Qnode/Vnode level task management
typedef struct SQWorkerMgmt {
SQWorkerCfg cfg;
SRWLatch schLock;
SRWLatch resLock;
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache
SQWorkerCfg cfg;
int8_t nodeType;
int32_t nodeId;
SRWLatch schLock;
SRWLatch ctxLock;
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx
void *nodeObj;
putReqToQueryQFp putToQueueFp;
} SQWorkerMgmt;
#define QW_GOT_RES_DATA(data) (true)
@ -95,40 +100,63 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY_RESP(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qid, tid) do { *(uint64_t *)(id) = (qid); *(uint64_t *)((char *)(id) + sizeof(qid)) = (tid); } while (0)
#define QW_GET_QTID(id, qid, tid) do { (qid) = *(uint64_t *)(id); (tid) = *(uint64_t *)((char *)(id) + sizeof(qid)); } while (0)
#define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0)
#define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0)
#define QW_IDS() sId, qId, tId
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define QW_LOCK(type, _lock) do { \
if (QW_READ == (type)) { \
if ((*(_lock)) < 0) assert(0); \
taosRLockLatch(_lock); \
qDebug("QW RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
qDebug("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
qDebug("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
if ((*(_lock)) < 0) assert(0); \
assert(atomic_load_32((_lock)) >= 0); \
qDebug("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
qDebug("QW WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
qDebug("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define QW_UNLOCK(type, _lock) do { \
if (QW_READ == (type)) { \
if ((*(_lock)) <= 0) assert(0); \
assert(atomic_load_32((_lock)) > 0); \
qDebug("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
qDebug("QW RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
qDebug("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
if ((*(_lock)) <= 0) assert(0); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
qDebug("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
qDebug("QW WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
qDebug("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt);
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch);
static int32_t qwAddAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch);
#ifdef __cplusplus

View File

@ -8,7 +8,7 @@
#include "tname.h"
#include "dataSinkMgt.h"
int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
int32_t qwValidateStatus(SQWorkerMgmt *mgmt, int8_t oriStatus, int8_t newStatus, uint64_t sId, uint64_t qId, uint64_t tId) {
int32_t code = 0;
if (oriStatus == newStatus) {
@ -62,7 +62,7 @@ int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
break;
default:
qError("invalid task status:%d", oriStatus);
QW_TASK_ELOG("invalid task status:%d", oriStatus);
return TSDB_CODE_QRY_APP_ERROR;
}
@ -70,22 +70,27 @@ int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
_return:
qError("invalid task status, from %d to %d", oriStatus, newStatus);
QW_ERR_RET(code);
QW_TASK_ELOG("invalid task status update from %d to %d", oriStatus, newStatus);
QW_RET(code);
}
int32_t qwUpdateTaskInfo(SQWTaskStatus *task, int8_t type, void *data) {
int32_t qwUpdateTaskInfo(SQWorkerMgmt *mgmt, SQWTaskStatus *task, int8_t type, void *data, uint64_t sId, uint64_t qId, uint64_t tId) {
int32_t code = 0;
int8_t origStatus = 0;
switch (type) {
case QW_TASK_INFO_STATUS: {
int8_t newStatus = *(int8_t *)data;
QW_ERR_RET(qwValidateStatus(task->status, newStatus));
QW_ERR_RET(qwValidateStatus(mgmt, task->status, newStatus, QW_IDS()));
origStatus = task->status;
task->status = newStatus;
QW_TASK_DLOG("task status updated from %d to %d", origStatus, newStatus);
break;
}
default:
qError("uknown task info type:%d", type);
QW_TASK_ELOG("unknown task info, type:%d", type);
return TSDB_CODE_QRY_APP_ERROR;
}
@ -96,18 +101,18 @@ int32_t qwAddTaskHandlesToCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId,
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWorkerTaskHandlesCache resCache = {0};
SQWTaskCtx resCache = {0};
resCache.taskHandle = taskHandle;
resCache.sinkHandle = sinkHandle;
QW_LOCK(QW_WRITE, &mgmt->resLock);
if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerTaskHandlesCache))) {
QW_UNLOCK(QW_WRITE, &mgmt->resLock);
QW_LOCK(QW_WRITE, &mgmt->ctxLock);
if (0 != taosHashPut(mgmt->ctxHash, id, sizeof(id), &resCache, sizeof(SQWTaskCtx))) {
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", qId, tId);
return TSDB_CODE_QRY_APP_ERROR;
}
QW_UNLOCK(QW_WRITE, &mgmt->resLock);
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
return TSDB_CODE_SUCCESS;
}
@ -116,7 +121,7 @@ static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId,
SQWSchStatus newSch = {0};
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == newSch.tasksHash) {
qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
QW_SCH_DLOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
@ -126,14 +131,18 @@ static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId,
if (0 != code) {
if (!HASH_NODE_EXIST(code)) {
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
qError("taosHashPut sId[%"PRIx64"] to scheduleHash failed", sId);
QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno);
taosHashCleanup(newSch.tasksHash);
return TSDB_CODE_QRY_APP_ERROR;
}
}
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD)) {
if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, sId, sch)) {
if (code) {
taosHashCleanup(newSch.tasksHash);
}
return TSDB_CODE_SUCCESS;
}
}
@ -141,7 +150,7 @@ static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId,
return TSDB_CODE_SUCCESS;
}
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) {
static int32_t qwAcquireSchedulerImpl(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) {
QW_LOCK(rwType, &mgmt->schLock);
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
if (NULL == (*sch)) {
@ -159,6 +168,14 @@ static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t s
return TSDB_CODE_SUCCESS;
}
static int32_t qwAddAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD);
}
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_RET_ERR);
}
static FORCE_INLINE void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->schLock);
}
@ -234,7 +251,7 @@ int32_t qwAddTaskToSch(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t
static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWSchStatus **sch, SQWTaskStatus **task) {
SQWSchStatus *tsch = NULL;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &tsch, QW_NOT_EXIST_ADD));
QW_ERR_RET(qwAddAcquireScheduler(QW_READ, mgmt, sId, &tsch));
int32_t code = qwAddTaskToSch(QW_READ, tsch, qId, tId, status, eOpt, task);
if (code) {
@ -250,14 +267,14 @@ static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
QW_RET(code);
}
static FORCE_INLINE int32_t qwAcquireTaskHandles(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerTaskHandlesCache **handles) {
static FORCE_INLINE int32_t qwAcquireTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWTaskCtx **handles) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
QW_LOCK(rwType, &mgmt->resLock);
*handles = taosHashGet(mgmt->resHash, id, sizeof(id));
QW_LOCK(rwType, &mgmt->ctxLock);
*handles = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*handles)) {
QW_UNLOCK(rwType, &mgmt->resLock);
QW_UNLOCK(rwType, &mgmt->ctxLock);
return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST;
}
@ -265,7 +282,7 @@ static FORCE_INLINE int32_t qwAcquireTaskHandles(int32_t rwType, SQWorkerMgmt *m
}
static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->resLock);
QW_UNLOCK(rwType, &mgmt->ctxLock);
}
@ -273,7 +290,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs
SQWSchStatus *sch = NULL;
int32_t taskNum = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
sch->lastAccessTs = taosGetTimestampSec();
@ -319,7 +336,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId) {
SQWSchStatus *sch = NULL;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
sch->lastAccessTs = taosGetTimestampSec();
@ -333,12 +350,12 @@ int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, qId, tId, &task));
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS());
QW_UNLOCK(QW_WRITE, &task->lock);
_return:
@ -355,7 +372,7 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint
SQWTaskStatus *task = NULL;
int32_t code = 0;
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) {
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
*taskStatus = JOB_TASK_STATUS_NULL;
return TSDB_CODE_SUCCESS;
}
@ -376,17 +393,17 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint
}
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) {
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD));
QW_ERR_RET(qwAddAcquireScheduler(QW_READ, mgmt, sId, &sch));
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
if (qwAcquireTask(QW_READ, sch, qId, tId, &task)) {
qwReleaseScheduler(QW_READ, mgmt);
code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task);
code = qwAddTask(mgmt, sId, qId, tId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code);
@ -409,10 +426,10 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_
return TSDB_CODE_SUCCESS;
} else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
newStatus = JOB_TASK_STATUS_CANCELLED;
QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus));
QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS()));
} else {
newStatus = JOB_TASK_STATUS_CANCELLING;
QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus));
QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS()));
}
QW_UNLOCK(QW_WRITE, &task->lock);
@ -441,50 +458,60 @@ _return:
QW_RET(code);
}
int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) {
int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
QW_LOCK(QW_WRITE, &mgmt->resLock);
if (mgmt->resHash) {
taosHashRemove(mgmt->resHash, id, sizeof(id));
}
QW_UNLOCK(QW_WRITE, &mgmt->resLock);
if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) {
qWarn("scheduler %"PRIx64" doesn't exist", sId);
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_LOCK(QW_WRITE, &mgmt->ctxLock);
if (mgmt->ctxHash) {
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
QW_TASK_WLOG("taosHashRemove from ctx hash failed, id:%s", id);
}
}
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
if (qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch)) {
QW_TASK_WLOG("scheduler does not exist, sch:%p", sch);
return TSDB_CODE_SUCCESS;
}
if (qwAcquireTask(QW_WRITE, sch, queryId, taskId, &task)) {
if (qwAcquireTask(QW_WRITE, sch, qId, tId, &task)) {
qwReleaseScheduler(QW_WRITE, mgmt);
qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", sId, queryId, taskId);
QW_TASK_WLOG("task does not exist, task:%p", task);
return TSDB_CODE_SUCCESS;
}
taosHashRemove(sch->tasksHash, id, sizeof(id));
QW_TASK_DLOG("drop task, status:%d, code:%x, ready:%d, cancel:%d, drop:%d", task->status, task->code, task->ready, task->cancel, task->drop);
if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
QW_TASK_ELOG("taosHashRemove task from hash failed, task:%p", task);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
_return:
qwReleaseTask(QW_WRITE, sch);
qwReleaseScheduler(QW_WRITE, mgmt);
return TSDB_CODE_SUCCESS;
QW_RET(code);
}
int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) {
int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD));
QW_ERR_RET(qwAddAcquireScheduler(QW_READ, mgmt, sId, &sch));
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
if (qwAcquireTask(QW_READ, sch, qId, tId, &task)) {
qwReleaseScheduler(QW_READ, mgmt);
code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task);
code = qwAddTask(mgmt, sId, qId, tId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code);
@ -500,7 +527,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uin
if (task->status == JOB_TASK_STATUS_EXECUTING) {
newStatus = JOB_TASK_STATUS_DROPPING;
QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus));
QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS()));
} else if (task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING || task->status == JOB_TASK_STATUS_NOT_START) {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
@ -512,7 +539,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uin
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(qwDropTask(mgmt, sId, queryId, taskId));
QW_ERR_RET(qwDropTask(mgmt, sId, qId, tId));
return TSDB_CODE_SUCCESS;
}
@ -743,7 +770,7 @@ int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
@ -785,7 +812,7 @@ int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId,
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
@ -816,7 +843,7 @@ _return:
QW_RET(code);
}
int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, bool *needStop) {
int32_t qwCheckTaskCancelDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool *needStop) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
@ -824,11 +851,11 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI
*needStop = false;
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) {
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
return TSDB_CODE_SUCCESS;
}
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
if (qwAcquireTask(QW_READ, sch, qId, tId, &task)) {
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
@ -836,9 +863,10 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI
QW_LOCK(QW_READ, &task->lock);
if ((!task->cancel) && (!task->drop)) {
qError("no cancel or drop, but task:%"PRIx64" exists", taskId);
QW_TASK_ELOG("no cancel or drop but task exists, status:%d", task->status);
QW_UNLOCK(QW_READ, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
@ -851,17 +879,21 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI
if (task->cancel) {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
code = qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS());
QW_UNLOCK(QW_WRITE, &task->lock);
QW_ERR_JRET(code);
}
if (task->drop) {
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return qwDropTask(mgmt, sId, queryId, taskId);
QW_RET(qwDropTask(mgmt, sId, qId, tId));
}
_return:
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
@ -875,7 +907,7 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
int32_t code = 0;
int8_t newStatus = JOB_TASK_STATUS_CANCELLED;
code = qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD);
code = qwAddAcquireScheduler(QW_READ, mgmt, sId, &sch);
if (code) {
qError("sId:%"PRIx64" not in cache", sId);
QW_ERR_RET(code);
@ -895,7 +927,7 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
if (task->cancel) {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus);
qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS());
QW_UNLOCK(QW_WRITE, &task->lock);
}
@ -910,7 +942,7 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
if (!(task->cancel || task->drop)) {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS());
task->code = errCode;
QW_UNLOCK(QW_WRITE, &task->lock);
}
@ -921,6 +953,86 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
return TSDB_CODE_SUCCESS;
}
int32_t qwScheduleDataSink(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->sinkScheduled)) {
qDebug("data sink already scheduled");
return TSDB_CODE_SUCCESS;
}
SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq));
if (NULL == req) {
qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = queryId;
req->taskId = taskId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
.pCont = req,
.contLen = sizeof(SSinkDataReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
qError("put data sink schedule msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
qDebug("put data sink schedule msg to query queue");
return TSDB_CODE_SUCCESS;
}
int32_t qwScheduleQuery(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->queryScheduled)) {
qDebug("query already scheduled");
return TSDB_CODE_SUCCESS;
}
QW_ERR_RET(qwUpdateTaskStatus(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_EXECUTING));
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) {
qError("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = queryId;
req->taskId = taskId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
qError("put query continue msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
qDebug("put query continue msg to query queue");
return TSDB_CODE_SUCCESS;
}
int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL;
@ -932,11 +1044,15 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64
int32_t dataLength = 0;
SRetrieveTableRsp *rsp = NULL;
bool queryEnd = false;
SQWorkerTaskHandlesCache *handles = NULL;
SQWTaskCtx *handles = NULL;
QW_ERR_JRET(qwAcquireTaskHandles(QW_READ, mgmt, queryId, taskId, &handles));
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, mgmt, queryId, taskId, &handles));
if (atomic_load_8(&handles->needRsp)) {
qError("last fetch not responsed");
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_READ, &task->lock);
@ -974,15 +1090,15 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64
if (DS_BUF_EMPTY == output.bufStatus && output.queryEnd) {
rsp->completed = 1;
QW_ERR_JRET(qwUpdateTaskStatus(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED));
}
// Note: schedule data sink firstly and will schedule query after it's done
if (output.needSchedule) {
//TODO
}
if ((!output.queryEnd) && DS_BUF_LOW == output.bufStatus) {
//TODO
//UPDATE STATUS TO EXECUTING
QW_ERR_JRET(qwScheduleDataSink(handles, mgmt, sId, queryId, taskId, pMsg));
} else if ((!output.queryEnd) && (DS_BUF_LOW == output.bufStatus || DS_BUF_EMPTY == output.bufStatus)) {
QW_ERR_JRET(qwScheduleQuery(handles, mgmt, sId, queryId, taskId, pMsg));
}
} else {
if (dataLength < 0) {
@ -991,12 +1107,11 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64
}
if (queryEnd) {
QW_ERR_JRET(qwQueryPostProcess(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED, code));
QW_ERR_JRET(qwUpdateTaskStatus(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED));
} else {
if (task->status != JOB_TASK_STATUS_EXECUTING) {
qError("invalid status %d for fetch without res", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
assert(0 == handles->needRsp);
qDebug("no res data in sink, need response later");
QW_LOCK(QW_WRITE, &handles->lock);
handles->needRsp = true;
@ -1028,7 +1143,12 @@ _return:
QW_RET(code);
}
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp) {
if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp) {
qError("invalid param to init qworker");
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
if (NULL == mgmt) {
qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
@ -1037,29 +1157,46 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
if (cfg) {
mgmt->cfg = *cfg;
if (0 == mgmt->cfg.maxSchedulerNum) {
mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
}
if (0 == mgmt->cfg.maxTaskNum) {
mgmt->cfg.maxTaskNum = QWORKER_DEFAULT_TASK_NUMBER;
}
if (0 == mgmt->cfg.maxSchTaskNum) {
mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
}
} else {
mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
mgmt->cfg.maxResCacheNum = QWORKER_DEFAULT_RES_CACHE_NUMBER;
mgmt->cfg.maxTaskNum = QWORKER_DEFAULT_TASK_NUMBER;
mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
}
mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
if (NULL == mgmt->schHash) {
tfree(mgmt);
QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", mgmt->cfg.maxSchedulerNum);
qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
mgmt->resHash = taosHashInit(mgmt->cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == mgmt->resHash) {
mgmt->ctxHash = taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == mgmt->ctxHash) {
taosHashCleanup(mgmt->schHash);
mgmt->schHash = NULL;
tfree(mgmt);
QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", mgmt->cfg.maxResCacheNum);
qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
mgmt->nodeType = nodeType;
mgmt->nodeId = nodeId;
mgmt->nodeObj = nodeObj;
mgmt->putToQueueFp = fp;
*qWorkerMgmt = mgmt;
qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);
return TSDB_CODE_SUCCESS;
}
@ -1069,25 +1206,31 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
int32_t code = 0;
SSubQueryMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
qError("invalid query msg");
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
msg->contentLen = ntohl(msg->contentLen);
bool queryRsped = false;
bool needStop = false;
struct SSubplan *plan = NULL;
SSubQueryMsg *msg = pMsg->pCont;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->contentLen = ntohl(msg->contentLen);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop));
if (needStop) {
qWarn("task need stop");
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED);
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
}
code = qStringToSubplan(msg->msg, &plan);
@ -1144,30 +1287,59 @@ int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *p
int32_t code = 0;
int8_t status = 0;
bool queryDone = false;
uint64_t sId, qId, tId;
SQueryContinueReq *req = (SQueryContinueReq *)pMsg->pCont;
bool needStop = false;
SQWTaskCtx *handles = NULL;
//TODO call executer to continue execute subquery
code = 0;
void *data = NULL;
queryDone = false;
//TODO call executer to continue execute subquery
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
qTaskInfo_t taskHandle = handles->taskHandle;
DataSinkHandle sinkHandle = handles->sinkHandle;
bool needRsp = handles->needRsp;
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop));
if (needStop) {
qWarn("task need stop");
if (needRsp) {
qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED);
}
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
}
DataSinkHandle newHandle = NULL;
code = qExecTask(taskHandle, &newHandle);
if (code) {
qError("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
}
if (sinkHandle != newHandle) {
qError("data sink mis-match");
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
_return:
if (needRsp) {
code = qwBuildAndSendQueryRsp(pMsg, code);
}
if (TSDB_CODE_SUCCESS != code) {
status = JOB_TASK_STATUS_FAILED;
} else if (queryDone) {
status = JOB_TASK_STATUS_SUCCEED;
} else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
code = qwQueryPostProcess(qWorkerMgmt, sId, qId, tId, status, code);
code = qwQueryPostProcess(qWorkerMgmt, req->sId, req->queryId, req->taskId, status, code);
QW_RET(code);
}
int32_t qWorkerProcessSinkDataMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
@ -1176,8 +1348,9 @@ int32_t qWorkerProcessSinkDataMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid sink data msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
}
//dsScheduleProcess();
//TODO
return TSDB_CODE_SUCCESS;

View File

@ -42,6 +42,11 @@ int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
return 0;
}
int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) {
return 0;
}
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) {
SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont;
@ -258,7 +263,7 @@ TEST(seqTest, normalCase) {
stubSetStringToPlan();
stubSetRpcSendResponse();
code = qWorkerInit(NULL, &mgmt);
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
@ -328,7 +333,7 @@ TEST(seqTest, cancelFirst) {
stubSetStringToPlan();
stubSetRpcSendResponse();
code = qWorkerInit(NULL, &mgmt);
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
@ -402,7 +407,7 @@ TEST(seqTest, randCase) {
srand(time(NULL));
code = qWorkerInit(NULL, &mgmt);
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
int32_t t = 0;
@ -446,7 +451,7 @@ TEST(seqTest, multithreadRand) {
srand(time(NULL));
code = qWorkerInit(NULL, &mgmt);
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
pthread_attr_t thattr;