feature/scheduler

This commit is contained in:
dapan1121 2022-03-16 16:43:17 +08:00
parent cb0d52d13c
commit 24d12c6087
5 changed files with 205 additions and 412 deletions

View File

@ -58,10 +58,10 @@ enum {
}; };
typedef struct SCtgDebug { typedef struct SCtgDebug {
bool lockDebug; bool lockEnable;
bool cacheDebug; bool cacheEnable;
bool apiDebug; bool apiEnable;
bool metaDebug; bool metaEnable;
uint32_t showCachePeriodSec; uint32_t showCachePeriodSec;
} SCtgDebug; } SCtgDebug;
@ -242,9 +242,9 @@ typedef struct SCtgAction {
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__) #define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__) #define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__)
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) #define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0)
#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { qDebug(__VA_ARGS__); } } while (0) #define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheEnable) { qDebug(__VA_ARGS__); } } while (0)
#define CTG_API_DEBUG(...) do { if (gCTGDebug.apiDebug) { qDebug(__VA_ARGS__); } } while (0) #define CTG_API_DEBUG(...) do { if (gCTGDebug.apiEnable) { qDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000

View File

@ -55,25 +55,25 @@ SCtgAction gCtgAction[CTG_ACT_MAX] = {{
int32_t ctgDbgEnableDebug(char *option) { int32_t ctgDbgEnableDebug(char *option) {
if (0 == strcasecmp(option, "lock")) { if (0 == strcasecmp(option, "lock")) {
gCTGDebug.lockDebug = true; gCTGDebug.lockEnable = true;
qDebug("lock debug enabled"); qDebug("lock debug enabled");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (0 == strcasecmp(option, "cache")) { if (0 == strcasecmp(option, "cache")) {
gCTGDebug.cacheDebug = true; gCTGDebug.cacheEnable = true;
qDebug("cache debug enabled"); qDebug("cache debug enabled");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (0 == strcasecmp(option, "api")) { if (0 == strcasecmp(option, "api")) {
gCTGDebug.apiDebug = true; gCTGDebug.apiEnable = true;
qDebug("api debug enabled"); qDebug("api debug enabled");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (0 == strcasecmp(option, "meta")) { if (0 == strcasecmp(option, "meta")) {
gCTGDebug.metaDebug = true; gCTGDebug.metaEnable = true;
qDebug("api debug enabled"); qDebug("api debug enabled");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -155,7 +155,7 @@ int32_t ctgDbgGetClusterCacheNum(SCatalog* pCtg, int32_t type) {
} }
void ctgDbgShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) { void ctgDbgShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) {
if (!gCTGDebug.metaDebug) { if (!gCTGDebug.metaEnable) {
return; return;
} }
@ -177,7 +177,7 @@ void ctgDbgShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) {
} }
void ctgDbgShowDBCache(SCatalog* pCtg, SHashObj *dbHash) { void ctgDbgShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
if (NULL == dbHash || !gCTGDebug.cacheDebug) { if (NULL == dbHash || !gCTGDebug.cacheEnable) {
return; return;
} }
@ -217,7 +217,7 @@ void ctgDbgShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
void ctgDbgShowClusterCache(SCatalog* pCtg) { void ctgDbgShowClusterCache(SCatalog* pCtg) {
if (!gCTGDebug.cacheDebug || NULL == pCtg) { if (!gCTGDebug.cacheEnable || NULL == pCtg) {
return; return;
} }

View File

@ -59,23 +59,15 @@ enum {
QW_WRITE, QW_WRITE,
}; };
enum {
QW_EXIST_ACQUIRE = 1,
QW_EXIST_RET_ERR,
};
enum { enum {
QW_NOT_EXIST_RET_ERR = 1, QW_NOT_EXIST_RET_ERR = 1,
QW_NOT_EXIST_ADD, QW_NOT_EXIST_ADD,
}; };
enum {
QW_ADD_RET_ERR = 1,
QW_ADD_ACQUIRE,
};
typedef struct SQWDebug { typedef struct SQWDebug {
int32_t lockDebug; bool lockEnable;
bool statusEnable;
} SQWDebug; } SQWDebug;
typedef struct SQWMsg { typedef struct SQWMsg {
@ -91,14 +83,10 @@ typedef struct SQWHbInfo {
} SQWHbInfo; } SQWHbInfo;
typedef struct SQWPhaseInput { typedef struct SQWPhaseInput {
int8_t taskStatus;
int8_t taskType;
int32_t code; int32_t code;
} SQWPhaseInput; } SQWPhaseInput;
typedef struct SQWPhaseOutput { typedef struct SQWPhaseOutput {
int32_t rspCode;
bool needStop;
} SQWPhaseOutput; } SQWPhaseOutput;
@ -118,7 +106,6 @@ typedef struct SQWTaskCtx {
void *cancelConnection; void *cancelConnection;
bool emptyRes; bool emptyRes;
bool multiExec;
int8_t queryContinue; int8_t queryContinue;
int8_t queryInQueue; int8_t queryInQueue;
int32_t rspCode; int32_t rspCode;
@ -198,7 +185,7 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) #define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000

View File

@ -9,33 +9,21 @@
#include "tname.h" #include "tname.h"
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
SQWDebug gQWDebug = {0}; SQWDebug gQWDebug = {.statusEnable = true};
char *qwPhaseStr(int32_t phase) { int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
switch (phase) { if (!gQWDebug.statusEnable) {
case QW_PHASE_PRE_QUERY: return TSDB_CODE_SUCCESS;
return "PRE_QUERY";
case QW_PHASE_POST_QUERY:
return "POST_QUERY";
case QW_PHASE_PRE_FETCH:
return "PRE_FETCH";
case QW_PHASE_POST_FETCH:
return "POST_FETCH";
case QW_PHASE_PRE_CQUERY:
return "PRE_CQUERY";
case QW_PHASE_POST_CQUERY:
return "POST_CQUERY";
default:
break;
} }
return "UNKNOWN";
}
int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) {
int32_t code = 0; int32_t code = 0;
if (oriStatus == newStatus) { if (oriStatus == newStatus) {
if (newStatus == JOB_TASK_STATUS_EXECUTING || newStatus == JOB_TASK_STATUS_FAILED) {
*ignore = true;
return TSDB_CODE_SUCCESS;
}
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
@ -105,14 +93,55 @@ _return:
QW_RET(code); QW_RET(code);
} }
char *qwPhaseStr(int32_t phase) {
switch (phase) {
case QW_PHASE_PRE_QUERY:
return "PRE_QUERY";
case QW_PHASE_POST_QUERY:
return "POST_QUERY";
case QW_PHASE_PRE_FETCH:
return "PRE_FETCH";
case QW_PHASE_POST_FETCH:
return "POST_FETCH";
case QW_PHASE_PRE_CQUERY:
return "PRE_CQUERY";
case QW_PHASE_POST_CQUERY:
return "POST_CQUERY";
default:
break;
}
return "UNKNOWN";
}
char *qwBufStatusStr(int32_t bufStatus) {
switch (bufStatus) {
case DS_BUF_LOW:
return "LOW";
case DS_BUF_FULL:
return "FULL";
case DS_BUF_EMPTY:
return "EMPTY";
default:
break;
}
return "UNKNOWN";
}
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) { int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
int32_t code = 0; int32_t code = 0;
int8_t origStatus = 0; int8_t origStatus = 0;
bool ignore = false;
while (true) { while (true) {
origStatus = atomic_load_8(&task->status); origStatus = atomic_load_8(&task->status);
QW_ERR_RET(qwValidateStatus(QW_FPARAMS(), origStatus, status)); QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore));
if (ignore) {
break;
}
if (origStatus != atomic_val_compare_exchange_8(&task->status, origStatus, status)) { if (origStatus != atomic_val_compare_exchange_8(&task->status, origStatus, status)) {
continue; continue;
@ -337,18 +366,12 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx); return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx);
} }
int32_t qwAddGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), false, ctx);
}
void qwReleaseTaskCtx(SQWorkerMgmt *mgmt, void *ctx) { void qwReleaseTaskCtx(SQWorkerMgmt *mgmt, void *ctx) {
//QW_UNLOCK(rwType, &mgmt->ctxLock);
taosHashRelease(mgmt->ctxHash, ctx); taosHashRelease(mgmt->ctxHash, ctx);
} }
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) { void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
// RC WARNING // Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
qDestroyTask(otaskHandle); qDestroyTask(otaskHandle);
@ -357,7 +380,7 @@ void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t code = 0; int32_t code = 0;
// RC WARNING // Note: free/kill may in RC
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
code = qAsyncKillTask(taskHandle); code = qAsyncKillTask(taskHandle);
@ -378,8 +401,7 @@ void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
} }
// Note: NEED CTX HASH LOCKED BEFORE ENTRANCE int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
int32_t qwDropTaskCtx(QW_FPARAMS_DEF, int32_t rwType) {
char id[sizeof(qId) + sizeof(tId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId); QW_SET_QTID(id, qId, tId);
SQWTaskCtx octx; SQWTaskCtx octx;
@ -396,29 +418,18 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF, int32_t rwType) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);
if (rwType) {
QW_UNLOCK(rwType, &ctx->lock);
}
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) { if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
QW_TASK_ELOG_E("taosHashRemove from ctx hash failed"); QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
} }
if (octx.taskHandle) { qwFreeTask(QW_FPARAMS(), &octx);
qDestroyTask(octx.taskHandle);
}
if (octx.sinkHandle) {
dsDestroyDataSinker(octx.sinkHandle);
}
QW_TASK_DLOG_E("task ctx dropped"); QW_TASK_DLOG_E("task ctx dropped");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
@ -472,6 +483,13 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwDropTask(QW_FPARAMS_DEF) {
QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
return TSDB_CODE_SUCCESS;
}
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
int32_t code = 0; int32_t code = 0;
bool qcontinue = true; bool qcontinue = true;
@ -487,14 +505,15 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
code = qExecTask(*taskHandle, &pRes, &useconds); code = qExecTask(*taskHandle, &pRes, &useconds);
if (code) { if (code) {
QW_TASK_ELOG("qExecTask failed, code:%s", tstrerror(code)); QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code); QW_ERR_RET(code);
} }
++execNum; ++execNum;
if (NULL == pRes) { if (NULL == pRes) {
QW_TASK_DLOG("task query done, useconds:%"PRIu64, useconds); QW_TASK_DLOG("qExecTask end with empty res, useconds:%"PRIu64, useconds);
dsEndPut(sinkHandle, useconds); dsEndPut(sinkHandle, useconds);
if (TASK_TYPE_TEMP == ctx->taskType) { if (TASK_TYPE_TEMP == ctx->taskType) {
@ -513,8 +532,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
SInputData inputData = {.pData = pRes}; SInputData inputData = {.pData = pRes};
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
if (code) { if (code) {
QW_TASK_ELOG("dsPutDataBlock failed, code:%s", tstrerror(code)); QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code); QW_ERR_RET(code);
} }
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue); QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
@ -532,8 +551,6 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
} }
} }
_return:
QW_RET(code); QW_RET(code);
} }
@ -589,10 +606,9 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
int32_t code = 0; int32_t code = 0;
if (ctx->emptyRes) { if (ctx->emptyRes) {
QW_TASK_DLOG("query empty result, query end, phase:%d", ctx->phase); QW_TASK_DLOG_E("query end with empty result");
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp; *rspMsg = rsp;
@ -613,14 +629,13 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
if (queryEnd) { if (queryEnd) {
code = dsGetDataBlock(ctx->sinkHandle, pOutput); code = dsGetDataBlock(ctx->sinkHandle, pOutput);
if (code) { if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x", code); QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code); QW_ERR_RET(code);
} }
QW_TASK_DLOG("no data in sink and query end, phase:%d", ctx->phase); QW_TASK_DLOG_E("no data in sink and query end");
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp; *rspMsg = rsp;
*dataLen = 0; *dataLen = 0;
@ -630,25 +645,21 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
pOutput->bufStatus = DS_BUF_EMPTY; pOutput->bufStatus = DS_BUF_EMPTY;
QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// Got data from sink // Got data from sink
QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
*dataLen = len; *dataLen = len;
QW_TASK_DLOG("task got data in sink, dataLength:%d", len);
QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp; *rspMsg = rsp;
pOutput->pData = rsp->data; pOutput->pData = rsp->data;
code = dsGetDataBlock(ctx->sinkHandle, pOutput); code = dsGetDataBlock(ctx->sinkHandle, pOutput);
if (code) { if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x", code); QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code); QW_ERR_RET(code);
} }
@ -662,9 +673,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0; int32_t code = 0;
int8_t status = 0;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
bool locked = false;
void *dropConnection = NULL; void *dropConnection = NULL;
void *cancelConnection = NULL; void *cancelConnection = NULL;
@ -677,193 +686,92 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
} }
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
if (QW_PHASE_PRE_FETCH != phase) {
atomic_store_8(&ctx->phase, phase);
}
switch (phase) { switch (phase) {
case QW_PHASE_PRE_QUERY: { case QW_PHASE_PRE_QUERY: {
atomic_store_8(&ctx->phase, phase); if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
QW_TASK_ELOG("task already cancelled/dropped at wrong phase %s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
break; break;
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
QW_SET_RSP_CODE(ctx, output->rspCode);
dropConnection = ctx->dropConnection; dropConnection = ctx->dropConnection;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
// Note: ctx freed, no need to unlock it
locked = false;
break;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_SET_RSP_CODE(ctx, output->rspCode);
cancelConnection = ctx->cancelConnection;
break; break;
} }
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode));
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
if (!output->needStop) {
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
}
break; break;
} }
case QW_PHASE_PRE_FETCH: { case QW_PHASE_PRE_FETCH: {
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("drop event at wrong phase %s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_ELOG("cancel event at wrong phase %s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode));
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_TASK_WLOG("last fetch not finished, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("last fetch not finished, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_DUPLICATTED_OPERATION;
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
} }
if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) { if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
QW_TASK_ELOG("query rsp are not ready, phase:%s", qwPhaseStr(phase)); QW_TASK_ELOG("query rsp are not ready, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_MSG_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
} }
break; break;
} }
case QW_PHASE_PRE_CQUERY: { case QW_PHASE_PRE_CQUERY: {
atomic_store_8(&ctx->phase, phase);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
output->needStop = true;
QW_SET_RSP_CODE(ctx, output->rspCode);
dropConnection = ctx->dropConnection; dropConnection = ctx->dropConnection;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
// Note: ctx freed, no need to unlock it
locked = false;
QW_ERR_JRET(output->rspCode);
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
qwFreeTask(QW_FPARAMS(), ctx);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_SET_RSP_CODE(ctx, output->rspCode);
cancelConnection = ctx->cancelConnection;
QW_ERR_JRET(output->rspCode);
} }
break;
}
default:
QW_TASK_ELOG("invalid phase %s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
if (ctx->rspCode) { if (ctx->rspCode) {
QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode)); QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode));
output->needStop = true; QW_ERR_JRET(ctx->rspCode);
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
break;
}
} }
_return: _return:
if (ctx) { if (ctx) {
if (output->rspCode) { QW_UPDATE_RSP_CODE(ctx, code);
QW_UPDATE_RSP_CODE(ctx, output->rspCode);
}
if (locked) {
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
}
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
} }
if (code) {
output->needStop = true;
if (TSDB_CODE_SUCCESS == output->rspCode) {
output->rspCode = code;
}
}
if (dropConnection) { if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, output->rspCode); qwBuildAndSendDropRsp(dropConnection, code);
QW_TASK_DLOG("drop msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code));
} }
if (cancelConnection) { if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, output->rspCode); qwBuildAndSendCancelRsp(cancelConnection, code);
QW_TASK_DLOG("cancel msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code));
} }
QW_TASK_DLOG("end to handle event at phase %s", qwPhaseStr(phase)); QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
QW_RET(code); QW_RET(code);
} }
@ -871,40 +779,22 @@ _return:
int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0; int32_t code = 0;
int8_t status = 0;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
bool locked = false;
void *readyConnection = NULL; void *readyConnection = NULL;
void *dropConnection = NULL; void *dropConnection = NULL;
void *cancelConnection = NULL; void *cancelConnection = NULL;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
output->needStop = false;
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%s", qwPhaseStr(phase));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (input->code) {
output->rspCode = input->code;
}
if (QW_PHASE_POST_QUERY == phase) { if (QW_PHASE_POST_QUERY == phase) {
if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) { if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) {
ctx->emptyRes = true; ctx->emptyRes = true;
@ -917,84 +807,57 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); if (QW_PHASE_POST_FETCH == phase) {
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
output->needStop = true;
QW_SET_RSP_CODE(ctx, output->rspCode);
dropConnection = ctx->dropConnection; dropConnection = ctx->dropConnection;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
// Note: ctx freed, no need to unlock it
locked = false;
QW_ERR_JRET(output->rspCode);
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
qwFreeTask(QW_FPARAMS(), ctx);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_SET_RSP_CODE(ctx, output->rspCode);
cancelConnection = ctx->cancelConnection;
QW_ERR_JRET(output->rspCode);
} }
if (ctx->rspCode) { if (ctx->rspCode) {
QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode)); QW_TASK_ELOG("task already failed, phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode));
output->needStop = true; QW_ERR_JRET(ctx->rspCode);
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
} }
if (QW_PHASE_POST_QUERY == phase && (!output->needStop)) { QW_ERR_JRET(input->code);
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->taskStatus));
}
_return: _return:
if (ctx) { if (ctx) {
if (output->rspCode) { QW_UPDATE_RSP_CODE(ctx, code);
QW_UPDATE_RSP_CODE(ctx, output->rspCode);
}
if (QW_PHASE_POST_FETCH != phase) { if (QW_PHASE_POST_FETCH != phase) {
atomic_store_8(&ctx->phase, phase); atomic_store_8(&ctx->phase, phase);
} }
if (locked) {
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
}
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
} }
if (code) {
output->needStop = true;
if (TSDB_CODE_SUCCESS == output->rspCode) {
output->rspCode = code;
}
}
if (readyConnection) { if (readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, output->rspCode); qwBuildAndSendReadyRsp(readyConnection, code);
QW_TASK_DLOG("ready msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code));
} }
if (dropConnection) { if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, output->rspCode); qwBuildAndSendDropRsp(dropConnection, code);
QW_TASK_DLOG("drop msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code));
} }
if (cancelConnection) { if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, output->rspCode); qwBuildAndSendCancelRsp(cancelConnection, code);
QW_TASK_DLOG("cancel msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code));
} }
QW_TASK_DLOG("end to handle event at phase %s", qwPhaseStr(phase)); if (code) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
}
QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
QW_RET(code); QW_RET(code);
} }
@ -1005,22 +868,12 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
bool queryRsped = false; bool queryRsped = false;
bool needStop = false; bool needStop = false;
struct SSubplan *plan = NULL; struct SSubplan *plan = NULL;
int32_t rspCode = 0;
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
SQWPhaseOutput output = {0};
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL; DataSinkHandle sinkHandle = NULL;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
needStop = output.needStop;
code = output.rspCode;
if (needStop) {
QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_QUERY);
QW_ERR_JRET(code);
}
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
@ -1028,13 +881,13 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
code = qStringToSubplan(qwMsg->msg, &plan); code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task string to subplan failed, code:%s", tstrerror(code)); QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
code = qCreateExecTask(qwMsg->node, 0, tId, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle);
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%s", tstrerror(code)); QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
@ -1043,10 +896,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
//TODO OPTIMIZE EMTYP RESULT QUERY RSP TO AVOID FURTHER FETCH
QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code)); QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code));
QW_TASK_DLOG("query msg rsped, code:%d", code); QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code));
queryRsped = true; queryRsped = true;
@ -1059,72 +910,71 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
_return: _return:
if (code) { input.code = code;
rspCode = code; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
if (TSDB_CODE_SUCCESS == code) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
} }
if (!queryRsped) { if (!queryRsped) {
qwBuildAndSendQueryRsp(qwMsg->connection, rspCode); qwBuildAndSendQueryRsp(qwMsg->connection, code);
QW_TASK_DLOG("query msg rsped, code:%x", rspCode); QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code));
} }
input.code = rspCode;
input.taskStatus = rspCode ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_PARTIAL_SUCCEED;
QW_ERR_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output));
QW_RET(rspCode);
} }
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0; int32_t code = 0;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
int8_t phase = 0; int8_t phase = 0;
bool needRsp = false; bool needRsp = true;
int32_t rspCode = 0;
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG_E("task is dropping or already dropped");
QW_TASK_WLOG("task already cancelled/dropped, phase:%d", phase); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
} }
phase = QW_GET_PHASE(ctx); if (ctx->phase == QW_PHASE_PRE_QUERY) {
if (phase == QW_PHASE_PRE_QUERY) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
ctx->readyConnection = qwMsg->connection; ctx->readyConnection = qwMsg->connection;
QW_TASK_DLOG("ready msg not rsped, phase:%d", phase); needRsp = false;
} else if (phase == QW_PHASE_POST_QUERY) { QW_TASK_DLOG_E("ready msg will not rsp now");
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); goto _return;
needRsp = true;
rspCode = ctx->rspCode;
} else {
QW_TASK_ELOG("invalid phase when got ready msg, phase:%d", phase);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
needRsp = true;
rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
} }
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
if (ctx->phase == QW_PHASE_POST_QUERY) {
code = ctx->rspCode;
goto _return;
}
QW_TASK_ELOG("invalid phase when got ready msg, phase:%s", qwPhaseStr(ctx->phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
_return: _return:
if (code && ctx) { if (code && ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
} }
if (code) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
}
if (ctx) { if (ctx) {
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
} }
if (needRsp) { if (needRsp) {
qwBuildAndSendReadyRsp(qwMsg->connection, rspCode); qwBuildAndSendReadyRsp(qwMsg->connection, code);
QW_TASK_DLOG("ready msg rsped, code:%x", rspCode); QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code));
} }
QW_RET(code); QW_RET(code);
@ -1134,25 +984,13 @@ _return:
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
int32_t code = 0; int32_t code = 0;
bool queryRsped = false;
bool needStop = false;
struct SSubplan *plan = NULL;
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
SQWPhaseOutput output = {0};
void *rsp = NULL; void *rsp = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
bool queryEnd = false; bool queryEnd = false;
do { do {
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
needStop = output.needStop;
code = output.rspCode;
if (needStop) {
QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_CQUERY);
QW_ERR_JRET(code);
}
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
@ -1166,7 +1004,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus); QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
// RC WARNING // RC WARNING
atomic_store_8(&ctx->queryContinue, 1); atomic_store_8(&ctx->queryContinue, 1);
@ -1185,11 +1023,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
} }
if (queryEnd) { _return:
needStop = true;
}
_return:
if (NULL == ctx) { if (NULL == ctx) {
break; break;
@ -1200,51 +1034,33 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwFreeFetchRsp(rsp); qwFreeFetchRsp(rsp);
rsp = NULL; rsp = NULL;
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code); qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code);
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, 0); QW_TASK_DLOG("fetch msg rsped, code:%x - %s", code, tstrerror(code));
} }
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if (needStop || code || 0 == atomic_load_8(&ctx->queryContinue)) { if (queryEnd || code || 0 == atomic_load_8(&ctx->queryContinue)) {
// Note: if necessary, fetch need to put cquery to queue again
atomic_store_8(&ctx->phase, 0); atomic_store_8(&ctx->phase, 0);
QW_UNLOCK(QW_WRITE,&ctx->lock); QW_UNLOCK(QW_WRITE,&ctx->lock);
break; break;
} }
QW_UNLOCK(QW_WRITE,&ctx->lock); QW_UNLOCK(QW_WRITE,&ctx->lock);
} while (true); } while (true);
input.code = code; input.code = code;
qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output); QW_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL));
QW_RET(code);
} }
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0; int32_t code = 0;
int32_t needRsp = true;
void *data = NULL;
int32_t sinkStatus = 0;
int32_t dataLen = 0; int32_t dataLen = 0;
bool queryEnd = false;
bool needStop = false;
bool locked = false; bool locked = false;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
int8_t status = 0;
void *rsp = NULL; void *rsp = NULL;
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
SQWPhaseOutput output = {0};
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, &output)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
needStop = output.needStop;
code = output.rspCode;
if (needStop) {
QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_FETCH);
QW_ERR_JRET(code);
}
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
@ -1259,7 +1075,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus); QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
locked = true; locked = true;
@ -1268,16 +1084,11 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_IS_QUERY_RUNNING(ctx)) { if (QW_IS_QUERY_RUNNING(ctx)) {
atomic_store_8(&ctx->queryContinue, 1); atomic_store_8(&ctx->queryContinue, 1);
} else if (0 == atomic_load_8(&ctx->queryInQueue)) { } else if (0 == atomic_load_8(&ctx->queryInQueue)) {
if (!ctx->multiExec) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
ctx->multiExec = true;
}
atomic_store_8(&ctx->queryInQueue, 1); atomic_store_8(&ctx->queryInQueue, 1);
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection)); QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection));
QW_TASK_DLOG("schedule query in queue, phase:%d", ctx->phase);
} }
} }
@ -1288,20 +1099,15 @@ _return:
} }
input.code = code; input.code = code;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);
qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, &output);
if (output.rspCode) {
code = output.rspCode;
}
if (code) { if (code) {
qwFreeFetchRsp(rsp); qwFreeFetchRsp(rsp);
rsp = NULL; rsp = NULL;
dataLen = 0; dataLen = 0;
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); }
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen);
} else if (rsp) { if (code || rsp) {
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen); QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen);
} }
@ -1316,6 +1122,8 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
bool locked = false; bool locked = false;
// TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
@ -1323,22 +1131,18 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
locked = true; locked = true;
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropping, phase:%d", ctx->phase); QW_TASK_WLOG_E("task already dropping");
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
} }
if (QW_IS_QUERY_RUNNING(ctx)) { if (QW_IS_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING)); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING));
} else if (ctx->phase > 0) { } else if (ctx->phase > 0) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
QW_SET_RSP_CODE(ctx, TSDB_CODE_QRY_TASK_DROPPED);
locked = false;
needRsp = true; needRsp = true;
} else {
// task not started
} }
if (!needRsp) { if (!needRsp) {
@ -1351,6 +1155,8 @@ _return:
if (code) { if (code) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
} }
if (locked) { if (locked) {

View File

@ -263,7 +263,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) {
QW_ERR_RET(code); QW_ERR_RET(code);
} }
QW_SCH_TASK_DLOG("put task continue exec msg to query queue, vgId:%d", mgmt->nodeId); QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }