diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ad250d7b78..8398ad5884 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1018,7 +1018,7 @@ typedef struct { } SUpdateTagValRsp; typedef struct SSubQueryMsg { - uint64_t schedulerId; + uint64_t sId; uint64_t queryId; uint64_t taskId; uint32_t contentLen; @@ -1026,7 +1026,7 @@ typedef struct SSubQueryMsg { } SSubQueryMsg; typedef struct SResReadyMsg { - uint64_t schedulerId; + uint64_t sId; uint64_t queryId; uint64_t taskId; } SResReadyMsg; @@ -1036,13 +1036,13 @@ typedef struct SResReadyRsp { } SResReadyRsp; typedef struct SResFetchMsg { - uint64_t schedulerId; + uint64_t sId; uint64_t queryId; uint64_t taskId; } SResFetchMsg; typedef struct SSchTasksStatusMsg { - uint64_t schedulerId; + uint64_t sId; } SSchTasksStatusMsg; typedef struct STaskStatus { @@ -1057,7 +1057,7 @@ typedef struct SSchedulerStatusRsp { } SSchedulerStatusRsp; typedef struct STaskCancelMsg { - uint64_t schedulerId; + uint64_t sId; uint64_t queryId; uint64_t taskId; } STaskCancelMsg; @@ -1067,7 +1067,7 @@ typedef struct STaskCancelRsp { } STaskCancelRsp; typedef struct STaskDropMsg { - uint64_t schedulerId; + uint64_t sId; uint64_t queryId; uint64_t taskId; } STaskDropMsg; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 917ee1dee1..db1ea435f1 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -227,7 +227,7 @@ void taos_init_imp(void) { rpcInit(); - SCatalogCfg cfg = {.enableVgroupCache = true, .maxDBCacheNum = 100, .maxTblCacheNum = 100}; + SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; catalogInit(&cfg); tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp); diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index c5a57b02a6..78214ce14d 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -73,7 +73,7 @@ struct SVnode { SVnodeSync* pSync; SVnodeFS* pFs; tsem_t canCommit; - void* pQuery; + SQHandle* pQuery; }; int vnodeScheduleTask(SVnodeTask* task); diff --git a/source/dnode/vnode/impl/inc/vnodeQuery.h b/source/dnode/vnode/impl/inc/vnodeQuery.h index 59bab42f62..d43f5b1cf1 100644 --- a/source/dnode/vnode/impl/inc/vnodeQuery.h +++ b/source/dnode/vnode/impl/inc/vnodeQuery.h @@ -22,6 +22,9 @@ extern "C" { #include "vnodeInt.h" #include "qworker.h" +typedef struct SQWorkerMgmt SQHandle; + + int vnodeQueryOpen(SVnode *pVnode); #ifdef __cplusplus diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index 31481bf7c4..cc743d658e 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -22,13 +22,27 @@ int vnodeQueryOpen(SVnode *pVnode) { int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vInfo("query message is processed"); - qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg); - return 0; + return qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg); } int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vInfo("fetch message is processed"); - qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); + switch (pMsg->msgType) { + case TDMT_VND_FETCH: + return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_RES_READY: + return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_TASKS_STATUS: + return qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_CANCEL_TASK: + return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_DROP_TASK: + return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg); + default: + vError("unknown msg type:%d in fetch queue", pMsg->msgType); + return TSDB_CODE_VND_APP_ERROR; + break; + } return 0; } diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 7f2ddb5723..820bcdfa3f 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -46,7 +46,6 @@ typedef struct STableMetaCache { } STableMetaCache; typedef struct SCatalog { - SVgroupListCache vgroupCache; SDBVgroupCache dbCache; STableMetaCache tableCache; } SCatalog; @@ -67,6 +66,7 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) +#define CTG_CACHE_ENABLED() (ctgMgmt.cfg.maxDBCacheNum > 0 || ctgMgmt.cfg.maxTblCacheNum > 0) #define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index b573953e9e..a65f471cfd 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -146,8 +146,44 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { } } +int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, STableMetaOutput* output) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == output) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } -int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { + char tbFullName[TSDB_TABLE_FNAME_LEN]; + + snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName); + + SBuildTableMetaInput bInput = {.vgId = 0, .tableFullName = tbFullName}; + char *msg = NULL; + SEpSet *pVnodeEpSet = NULL; + int32_t msgLen = 0; + + CTG_ERR_RET(queryBuildMsg[TDMT_MND_STB_META](&bInput, &msg, 0, &msgLen)); + + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_STB_META, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + + if (TSDB_CODE_SUCCESS != rpcRsp.code) { + ctgError("error rsp for table meta, code:%x", rpcRsp.code); + CTG_ERR_RET(rpcRsp.code); + } + + CTG_ERR_RET(queryProcessMsgRsp[TDMT_MND_STB_META](output, rpcRsp.pCont, rpcRsp.contLen)); + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -307,7 +343,9 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + } + if (NULL == pCatalog->tableCache.stableCache) { pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.stableCache) { ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); @@ -318,55 +356,51 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out if (output->metaNum == 2) { if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { ctgError("push ctable[%s] to table cache failed", output->ctbFname); - goto error_exit; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } if (TSDB_SUPER_TABLE != output->tbMeta->tableType) { ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE); - goto error_exit; + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } } int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags); if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { ctgError("push table[%s] to table cache failed", output->tbFname); - goto error_exit; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } if (TSDB_SUPER_TABLE == output->tbMeta->tableType) { if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) { ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid); - goto error_exit; + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } return TSDB_CODE_SUCCESS; - -error_exit: - if (pCatalog->vgroupCache.cache) { - taosHashCleanup(pCatalog->vgroupCache.cache); - pCatalog->vgroupCache.cache = NULL; - } - - pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; - - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } int32_t catalogInit(SCatalogCfg *cfg) { - ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == ctgMgmt.pCluster) { - CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER); + if (ctgMgmt.pCluster) { + ctgError("catalog already init"); + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } if (cfg) { memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg)); } else { - ctgMgmt.cfg.enableVgroupCache = true; ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER; } + if (CTG_CACHE_ENABLED()) { + ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == ctgMgmt.pCluster) { + CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER); + } + } + return TSDB_CODE_SUCCESS; } @@ -381,21 +415,19 @@ int32_t catalogGetHandle(const char* clusterId , struct SCatalog** catalogHandle } size_t clen = strlen(clusterId); - SCatalog *clusterCtg = (SCatalog *)taosHashGet(ctgMgmt.pCluster, clusterId, clen); + SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, clusterId, clen); - if (clusterCtg) { - *catalogHandle = clusterCtg; + if (ctg && (*ctg)) { + *catalogHandle = *ctg; return TSDB_CODE_SUCCESS; } - clusterCtg = calloc(1, sizeof(*clusterCtg)); + SCatalog *clusterCtg = calloc(1, sizeof(SCatalog)); if (NULL == clusterCtg) { - ctgError("calloc %d failed", (int32_t)sizeof(*clusterCtg)); + ctgError("calloc %d failed", (int32_t)sizeof(SCatalog)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - clusterCtg->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; - if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) { ctgError("put cluster %s cache to hash failed", clusterId); tfree(clusterCtg); @@ -443,7 +475,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName } if (NULL == pCatalog->dbCache.cache) { - pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_CACHE_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->dbCache.cache) { ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); @@ -515,7 +547,9 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe STableMetaOutput output = {0}; - CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output)); + //CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output)); + + CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &output)); CTG_ERR_RET(ctgUpdateTableMetaCache(pCatalog, &output)); diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 7ff9596045..d1604d99d6 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -313,7 +313,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p return code; } - code = tNameGetTableName(&name, pCreateTableInfo->tagdata.name); + const char* pSTableName = tNameGetTableName(&name); SArray* pValList = pCreateTableInfo->pTagVals; if (code != TSDB_CODE_SUCCESS) { @@ -326,7 +326,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p char dbName[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(&name, dbName); - catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbName, pCreateTableInfo->tagdata.name, &pSuperTableMeta); + catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbName, pSTableName, &pSuperTableMeta); // too long tag values will return invalid sql, not be truncated automatically SSchema *pTagSchema = getTableTagSchema(pSuperTableMeta); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 34db262d5d..8f753283c1 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -266,9 +266,11 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { void initQueryModuleMsgHandle() { queryBuildMsg[TDMT_VND_TABLE_META] = queryBuildTableMetaReqMsg; + queryBuildMsg[TDMT_MND_STB_META] = queryBuildTableMetaReqMsg; queryBuildMsg[TDMT_MND_USE_DB] = queryBuildUseDbMsg; queryProcessMsgRsp[TDMT_VND_TABLE_META] = queryProcessTableMetaRsp; + queryProcessMsgRsp[TDMT_MND_STB_META] = queryProcessTableMetaRsp; queryProcessMsgRsp[TDMT_MND_USE_DB] = queryProcessUseDBRsp; } diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 6f454e2f81..07ca91729d 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -42,25 +42,41 @@ enum { QW_WRITE, }; -typedef struct SQWorkerTaskStatus { +enum { + QW_EXIST_ACQUIRE = 1, + QW_EXIST_RET_ERR, +}; + +enum { + QW_NOT_EXIST_RET_ERR = 1, + QW_NOT_EXIST_ADD, +}; + +enum { + QW_ADD_RET_ERR = 1, + QW_ADD_ACQUIRE, +}; + + +typedef struct SQWTaskStatus { SRWLatch lock; int32_t code; int8_t status; int8_t ready; bool cancel; bool drop; -} SQWorkerTaskStatus; +} SQWTaskStatus; typedef struct SQWorkerResCache { SRWLatch lock; void *data; } SQWorkerResCache; -typedef struct SQWorkerSchStatus { +typedef struct SQWSchStatus { int32_t lastAccessTs; // timestamp in second SRWLatch tasksLock; SHashObj *tasksHash; // key:queryId+taskId, value: SQWorkerTaskStatus -} SQWorkerSchStatus; +} SQWSchStatus; // Qnode/Vnode level task management typedef struct SQWorkerMgmt { @@ -71,7 +87,7 @@ typedef struct SQWorkerMgmt { SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache } SQWorkerMgmt; -#define QW_GOT_RES_DATA(data) (false) +#define QW_GOT_RES_DATA(data) (true) #define QW_LOW_RES_DATA(data) (false) #define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code)) @@ -86,8 +102,31 @@ typedef struct SQWorkerMgmt { #define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) -#define QW_LOCK(type, _lock) (QW_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) -#define QW_UNLOCK(type, _lock) (QW_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) +#define QW_LOCK(type, _lock) do { \ + if (QW_READ == (type)) { \ + if ((*(_lock)) < 0) assert(0); \ + taosRLockLatch(_lock); \ + qDebug("RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } else { \ + if ((*(_lock)) < 0) assert(0); \ + taosWLockLatch(_lock); \ + qDebug("WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } \ +} while (0) + +#define QW_UNLOCK(type, _lock) do { \ + if (QW_READ == (type)) { \ + if ((*(_lock)) <= 0) assert(0); \ + taosRUnLockLatch(_lock); \ + qDebug("RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } else { \ + if ((*(_lock)) <= 0) assert(0); \ + taosWUnLockLatch(_lock); \ + qDebug("WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } \ +} while (0) + +static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt); #ifdef __cplusplus diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 149f46273c..4296e82a56 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -4,38 +4,42 @@ #include "qworkerInt.h" #include "planner.h" -int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) { +int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { int32_t code = 0; if (oriStatus == newStatus) { - if (newStatus == JOB_TASK_STATUS_CANCELLING) { - return TSDB_CODE_SUCCESS; - } - QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } switch (oriStatus) { case JOB_TASK_STATUS_NULL: - if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED ) { + if (newStatus != JOB_TASK_STATUS_EXECUTING + && newStatus != JOB_TASK_STATUS_FAILED + && newStatus != JOB_TASK_STATUS_NOT_START) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } break; case JOB_TASK_STATUS_NOT_START: - if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED) { + if (newStatus != JOB_TASK_STATUS_CANCELLED) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } break; case JOB_TASK_STATUS_EXECUTING: - if (newStatus != JOB_TASK_STATUS_SUCCEED && newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_CANCELLING) { + if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED + && newStatus != JOB_TASK_STATUS_FAILED + && newStatus != JOB_TASK_STATUS_CANCELLING + && newStatus != JOB_TASK_STATUS_CANCELLED + && newStatus != JOB_TASK_STATUS_DROPPING) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } break; case JOB_TASK_STATUS_PARTIAL_SUCCEED: - if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_CANCELLING) { + if (newStatus != JOB_TASK_STATUS_EXECUTING + && newStatus != JOB_TASK_STATUS_SUCCEED + && newStatus != JOB_TASK_STATUS_CANCELLED) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -49,6 +53,10 @@ int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) { break; case JOB_TASK_STATUS_CANCELLED: + case JOB_TASK_STATUS_DROPPING: + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + break; + default: qError("invalid task status:%d", oriStatus); return TSDB_CODE_QRY_APP_ERROR; @@ -58,17 +66,17 @@ int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) { _return: - qError("invalid task status:%d", oriStatus); + qError("invalid task status, from %d to %d", oriStatus, newStatus); QW_ERR_RET(code); } -int32_t qwUpdateTaskInfo(SQWorkerTaskStatus *task, int8_t type, void *data) { +int32_t qwUpdateTaskInfo(SQWTaskStatus *task, int8_t type, void *data) { int32_t code = 0; switch (type) { case QW_TASK_INFO_STATUS: { int8_t newStatus = *(int8_t *)data; - QW_ERR_RET(qwCheckStatusSwitch(task->status, newStatus)); + QW_ERR_RET(qwValidateStatus(task->status, newStatus)); task->status = newStatus; break; } @@ -80,9 +88,9 @@ int32_t qwUpdateTaskInfo(SQWorkerTaskStatus *task, int8_t type, void *data) { return TSDB_CODE_SUCCESS; } -int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void *data) { - char id[sizeof(queryId) + sizeof(taskId)] = {0}; - QW_SET_QTID(id, queryId, taskId); +int32_t qwAddTaskResCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, void *data) { + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); SQWorkerResCache resCache = {0}; resCache.data = data; @@ -90,7 +98,7 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v QW_LOCK(QW_WRITE, &mgmt->resLock); if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) { QW_UNLOCK(QW_WRITE, &mgmt->resLock); - qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", queryId, taskId); + qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", qId, tId); return TSDB_CODE_QRY_APP_ERROR; } @@ -99,37 +107,8 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v return TSDB_CODE_SUCCESS; } - -int32_t qwGetTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void **data) { - char id[sizeof(queryId) + sizeof(taskId)] = {0}; - QW_SET_QTID(id, queryId, taskId); - - SQWorkerResCache *resCache = taosHashGet(mgmt->resHash, id, sizeof(id)); - if (NULL == resCache) { - qError("no task res for queryId[%"PRIx64"] taskId[%"PRIx64"]", queryId, taskId); - return TSDB_CODE_QRY_APP_ERROR; - } - - *data = resCache->data; - - return TSDB_CODE_SUCCESS; -} - - -static FORCE_INLINE int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t schedulerId, SQWorkerSchStatus **sch) { - QW_LOCK(rwType, &mgmt->schLock); - *sch = taosHashGet(mgmt->schHash, &schedulerId, sizeof(schedulerId)); - if (NULL == (*sch)) { - QW_LOCK(rwType, &mgmt->schLock); - return TSDB_CODE_QRY_SCH_NOT_EXIST; - } - - return TSDB_CODE_SUCCESS; -} - - -static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t schedulerId, SQWorkerSchStatus **sch) { - SQWorkerSchStatus newSch = {0}; +static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { + SQWSchStatus newSch = {0}; newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == newSch.tasksHash) { qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum); @@ -138,19 +117,18 @@ static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorker while (true) { QW_LOCK(QW_WRITE, &mgmt->schLock); - int32_t code = taosHashPut(mgmt->schHash, &schedulerId, sizeof(schedulerId), &newSch, sizeof(newSch)); + int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch)); if (0 != code) { if (!HASH_NODE_EXIST(code)) { QW_UNLOCK(QW_WRITE, &mgmt->schLock); - qError("taosHashPut schedulerId[%"PRIx64"] to scheduleHash failed", schedulerId); + qError("taosHashPut sId[%"PRIx64"] to scheduleHash failed", sId); taosHashCleanup(newSch.tasksHash); return TSDB_CODE_QRY_APP_ERROR; } } QW_UNLOCK(QW_WRITE, &mgmt->schLock); - if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, schedulerId, sch)) { - taosHashCleanup(newSch.tasksHash); + if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD)) { return TSDB_CODE_SUCCESS; } } @@ -159,63 +137,122 @@ static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorker } +static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) { + QW_LOCK(rwType, &mgmt->schLock); + *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); + if (NULL == (*sch)) { + QW_UNLOCK(rwType, &mgmt->schLock); + + if (QW_NOT_EXIST_ADD == nOpt) { + return qwAddScheduler(rwType, mgmt, sId, sch); + } else if (QW_NOT_EXIST_RET_ERR == nOpt) { + return TSDB_CODE_QRY_SCH_NOT_EXIST; + } else { + assert(0); + } + } + + return TSDB_CODE_SUCCESS; +} + + + static FORCE_INLINE void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } -static FORCE_INLINE int32_t qwAcquireTask(int32_t rwType, SQWorkerSchStatus *sch, uint64_t queryId, uint64_t taskId, SQWorkerTaskStatus **task) { - char id[sizeof(queryId) + sizeof(taskId)] = {0}; - QW_SET_QTID(id, queryId, taskId); +static int32_t qwAcquireTaskImpl(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) { + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); QW_LOCK(rwType, &sch->tasksLock); *task = taosHashGet(sch->tasksHash, id, sizeof(id)); if (NULL == (*task)) { QW_UNLOCK(rwType, &sch->tasksLock); + return TSDB_CODE_QRY_TASK_NOT_EXIST; } return TSDB_CODE_SUCCESS; } -static FORCE_INLINE int32_t qwInsertAndAcquireTask(int32_t rwType, SQWorkerSchStatus *sch, uint64_t queryId, uint64_t taskId, int8_t status, bool *inserted, SQWorkerTaskStatus **task) { - char id[sizeof(queryId) + sizeof(taskId)] = {0}; - QW_SET_QTID(id, queryId, taskId); +static int32_t qwAcquireTask(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) { + return qwAcquireTaskImpl(rwType, sch, qId, tId, task); +} + + +static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWSchStatus *sch) { + QW_UNLOCK(rwType, &sch->tasksLock); +} + + +int32_t qwAddTaskToSch(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int8_t status, int32_t eOpt, SQWTaskStatus **task) { + int32_t code = 0; + + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); + + SQWTaskStatus ntask = {0}; + ntask.status = status; while (true) { - *inserted = false; - QW_LOCK(QW_WRITE, &sch->tasksLock); - int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &status, sizeof(status)); + int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask)); if (0 != code) { QW_UNLOCK(QW_WRITE, &sch->tasksLock); if (HASH_NODE_EXIST(code)) { - if (qwAcquireTask(rwType, sch, queryId, taskId, task)) { - continue; + if (QW_EXIST_ACQUIRE == eOpt && rwType && task) { + if (qwAcquireTask(rwType, sch, qId, tId, task)) { + continue; + } + } else if (QW_EXIST_RET_ERR == eOpt) { + return TSDB_CODE_QRY_TASK_ALREADY_EXIST; + } else { + assert(0); } break; } else { - qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", queryId, taskId); + qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", qId, tId); return TSDB_CODE_QRY_APP_ERROR; } } + QW_UNLOCK(QW_WRITE, &sch->tasksLock); - *inserted = true; - - if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, queryId, taskId, task)) { - return TSDB_CODE_SUCCESS; + if (rwType && task) { + if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, qId, tId, task)) { + return TSDB_CODE_SUCCESS; + } + } else { + break; } - } + } return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWorkerSchStatus *sch) { - QW_UNLOCK(rwType, &sch->tasksLock); +static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWSchStatus **sch, SQWTaskStatus **task) { + SQWSchStatus *tsch = NULL; + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &tsch, QW_NOT_EXIST_ADD)); + + int32_t code = qwAddTaskToSch(QW_READ, tsch, qId, tId, status, eOpt, task); + if (code) { + qwReleaseScheduler(QW_WRITE, mgmt); + } + + if (NULL == task) { + qwReleaseScheduler(QW_READ, mgmt); + } else if (sch) { + *sch = tsch; + } + + QW_RET(code); } + + static FORCE_INLINE int32_t qwAcquireTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerResCache **res) { char id[sizeof(queryId) + sizeof(taskId)] = {0}; QW_SET_QTID(id, queryId, taskId); @@ -235,27 +272,24 @@ static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgm } -int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SSchedulerStatusRsp **rsp) { - SQWorkerSchStatus *schStatus = NULL; +int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) { + SQWSchStatus *sch = NULL; int32_t taskNum = 0; - if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &schStatus)) { - qWarn("no scheduler for schedulerId[%"PRIx64"]", schedulerId); - } else { - schStatus->lastAccessTs = taosGetTimestampSec(); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); + + sch->lastAccessTs = taosGetTimestampSec(); - QW_LOCK(QW_READ, &schStatus->tasksLock); - taskNum = taosHashGetSize(schStatus->tasksHash); - } + QW_LOCK(QW_READ, &sch->tasksLock); + + taskNum = taosHashGetSize(sch->tasksHash); int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum; *rsp = calloc(1, size); if (NULL == *rsp) { qError("calloc %d failed", size); - if (schStatus) { - QW_UNLOCK(QW_READ, &schStatus->tasksLock); - qwReleaseScheduler(QW_READ, mgmt); - } + QW_UNLOCK(QW_READ, &sch->tasksLock); + qwReleaseScheduler(QW_READ, mgmt); return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -264,23 +298,19 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SScheduler size_t keyLen = 0; int32_t i = 0; - if (schStatus) { - void *pIter = taosHashIterate(schStatus->tasksHash, NULL); - while (pIter) { - SQWorkerTaskStatus *taskStatus = (SQWorkerTaskStatus *)pIter; - taosHashGetKey(pIter, &key, &keyLen); + void *pIter = taosHashIterate(sch->tasksHash, NULL); + while (pIter) { + SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter; + taosHashGetKey(pIter, &key, &keyLen); - QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId); - (*rsp)->status[i].status = taskStatus->status; - - pIter = taosHashIterate(schStatus->tasksHash, pIter); - } - } + QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId); + (*rsp)->status[i].status = taskStatus->status; + + pIter = taosHashIterate(sch->tasksHash, pIter); + } - if (schStatus) { - QW_UNLOCK(QW_READ, &schStatus->tasksLock); - qwReleaseScheduler(QW_READ, mgmt); - } + QW_UNLOCK(QW_READ, &sch->tasksLock); + qwReleaseScheduler(QW_READ, mgmt); (*rsp)->num = taskNum; @@ -289,115 +319,81 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SScheduler -int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t schedulerId) { - SQWorkerSchStatus *schStatus = NULL; +int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId) { + SQWSchStatus *sch = NULL; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &schStatus)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); - schStatus->lastAccessTs = taosGetTimestampSec(); + sch->lastAccessTs = taosGetTimestampSec(); qwReleaseScheduler(QW_READ, mgmt); return TSDB_CODE_SUCCESS; } - -int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t *taskStatus) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; - - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); - QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); - *taskStatus = task->status; - -_return: - if (task) { - qwReleaseTask(QW_READ, sch); - } - - if (sch) { - qwReleaseScheduler(QW_READ, mgmt); - } - - QW_RET(code); -} - - -int32_t qwSwitchTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t taskStatus) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; - int32_t code = 0; - bool inserted = false; - - if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { - if (qwCheckStatusSwitch(JOB_TASK_STATUS_NULL, taskStatus)) { - qError("switch status error, not start to %d", taskStatus); - QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR); - } - - QW_ERR_RET(qwInsertAndAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); - } - - if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { - if (qwCheckStatusSwitch(JOB_TASK_STATUS_NOT_START, taskStatus)) { - qwReleaseScheduler(QW_READ, mgmt); - qError("switch status error, not start to %d", taskStatus); - QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR); - } - - QW_ERR_JRET(qwInsertAndAcquireTask(QW_READ, sch, queryId, taskId, taskStatus, &inserted, &task)); - - if (inserted) { - qwReleaseTask(QW_READ, sch); - qwReleaseScheduler(QW_READ, mgmt); - return TSDB_CODE_SUCCESS; - } - - QW_LOCK(QW_WRITE, &task->lock); - code = qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &taskStatus); - QW_UNLOCK(QW_WRITE, &task->lock); - - qwReleaseTask(QW_READ, sch); - qwReleaseScheduler(QW_READ, mgmt); - - QW_RET(code); - } + QW_ERR_JRET(qwAcquireTask(QW_READ, sch, qId, tId, &task)); QW_LOCK(QW_WRITE, &task->lock); - code = qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &taskStatus); + qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); QW_UNLOCK(QW_WRITE, &task->lock); - + _return: qwReleaseTask(QW_READ, sch); - qwReleaseScheduler(QW_READ, mgmt); - + qwReleaseScheduler(QW_READ, mgmt); + QW_RET(code); } -int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, int8_t *taskStatus) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; - - if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { - QW_ERR_RET(qwSwitchTaskStatus(mgmt, schedulerId, queryId, taskId, JOB_TASK_STATUS_NOT_START)); - - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + + if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) { + *taskStatus = JOB_TASK_STATUS_NULL; + return TSDB_CODE_SUCCESS; } if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { - code = qwSwitchTaskStatus(mgmt, schedulerId, queryId, taskId, JOB_TASK_STATUS_NOT_START); + qwReleaseScheduler(QW_READ, mgmt); + + *taskStatus = JOB_TASK_STATUS_NULL; + return TSDB_CODE_SUCCESS; + } + + *taskStatus = task->status; + + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + QW_RET(code); +} + + +int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; + int32_t code = 0; + + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD)); + + if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { + qwReleaseScheduler(QW_READ, mgmt); + + code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task); if (code) { qwReleaseScheduler(QW_READ, mgmt); QW_ERR_RET(code); } - - QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); } QW_LOCK(QW_WRITE, &task->lock); @@ -423,6 +419,7 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, } QW_UNLOCK(QW_WRITE, &task->lock); + qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); @@ -449,9 +446,9 @@ _return: -int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; char id[sizeof(queryId) + sizeof(taskId)] = {0}; QW_SET_QTID(id, queryId, taskId); @@ -462,15 +459,15 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, u } QW_UNLOCK(QW_WRITE, &mgmt->resLock); - if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, schedulerId, &sch)) { - qWarn("scheduler %"PRIx64" doesn't exist", schedulerId); + if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) { + qWarn("scheduler %"PRIx64" doesn't exist", sId); return TSDB_CODE_SUCCESS; } if (qwAcquireTask(QW_WRITE, sch, queryId, taskId, &task)) { qwReleaseScheduler(QW_WRITE, mgmt); - qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", schedulerId, queryId, taskId); + qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", sId, queryId, taskId); return TSDB_CODE_SUCCESS; } @@ -483,21 +480,21 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, u } -int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; - if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { - qWarn("scheduler %"PRIx64" doesn't exist", schedulerId); - return TSDB_CODE_SUCCESS; - } + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD)); if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { qwReleaseScheduler(QW_READ, mgmt); - qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", schedulerId, queryId, taskId); - return TSDB_CODE_SUCCESS; + code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task); + if (code) { + qwReleaseScheduler(QW_READ, mgmt); + QW_ERR_RET(code); + } } QW_LOCK(QW_WRITE, &task->lock); @@ -508,7 +505,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t quer int8_t newStatus = 0; if (task->status == JOB_TASK_STATUS_EXECUTING) { - newStatus = JOB_TASK_STATUS_CANCELLING; + newStatus = JOB_TASK_STATUS_DROPPING; QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus)); } else if (task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING || task->status == JOB_TASK_STATUS_NOT_START) { QW_UNLOCK(QW_WRITE, &task->lock); @@ -521,7 +518,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t quer qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); - QW_ERR_RET(qwDropTask(mgmt, schedulerId, queryId, taskId)); + QW_ERR_RET(qwDropTask(mgmt, sId, queryId, taskId)); return TSDB_CODE_SUCCESS; } @@ -604,6 +601,7 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { } SRpcMsg rpcRsp = { + .msgType = pMsg->msgType + 1, .handle = pMsg->handle, .ahandle = pMsg->ahandle, .pCont = pRsp, @@ -673,12 +671,12 @@ int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) { -int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); @@ -706,10 +704,8 @@ _return: if (task) { QW_UNLOCK(QW_WRITE, &task->lock); - } - - if (sch) { qwReleaseTask(QW_READ, sch); + } qwReleaseScheduler(QW_READ, mgmt); @@ -717,12 +713,12 @@ _return: QW_RET(code); } -int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); @@ -745,9 +741,6 @@ _return: if (task) { QW_UNLOCK(QW_WRITE, &task->lock); - } - - if (sch) { qwReleaseTask(QW_READ, sch); } @@ -756,15 +749,15 @@ _return: QW_RET(code); } -int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, bool *needStop) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, bool *needStop) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; int8_t status = JOB_TASK_STATUS_CANCELLED; *needStop = false; - if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { + if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) { return TSDB_CODE_SUCCESS; } @@ -776,11 +769,13 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_ QW_LOCK(QW_READ, &task->lock); if ((!task->cancel) && (!task->drop)) { + qError("no cancel or drop, but task:%"PRIx64" exists", taskId); + QW_UNLOCK(QW_READ, &task->lock); qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); - return TSDB_CODE_SUCCESS; + QW_RET(TSDB_CODE_QRY_APP_ERROR); } QW_UNLOCK(QW_READ, &task->lock); @@ -791,30 +786,40 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_ QW_LOCK(QW_WRITE, &task->lock); qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); QW_UNLOCK(QW_WRITE, &task->lock); - } else if (task->drop) { + } + + if (task->drop) { qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); - qwDropTask(mgmt, schedulerId, queryId, taskId); + return qwDropTask(mgmt, sId, queryId, taskId); } + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + return TSDB_CODE_SUCCESS; } -int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; int32_t needRsp = true; void *data = NULL; - QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_LOCK(QW_READ, &task->lock); - if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED && task->status != JOB_TASK_STATUS_SUCCEED) { + if (task->cancel || task->drop) { + qError("task is already cancelled or dropped"); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED) { qError("invalid status %d for fetch", task->status); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -840,10 +845,10 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu _return: if (task) { QW_UNLOCK(QW_READ, &task->lock); + qwReleaseTask(QW_READ, sch); } if (sch) { - qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); } @@ -854,37 +859,46 @@ _return: QW_RET(code); } -int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t status, int32_t errCode) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status, int32_t errCode) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; int8_t newStatus = JOB_TASK_STATUS_CANCELLED; - code = qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch); + code = qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD); if (code) { - qError("schedulerId:%"PRIx64" not in cache", schedulerId); + qError("sId:%"PRIx64" not in cache", sId); QW_ERR_RET(code); } - code = qwAcquireTask(QW_READ, sch, queryId, taskId, &task); + code = qwAcquireTask(QW_READ, sch, qId, tId, &task); if (code) { qwReleaseScheduler(QW_READ, mgmt); - qError("schedulerId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", schedulerId, queryId, taskId); - QW_ERR_RET(code); + + if (JOB_TASK_STATUS_PARTIAL_SUCCEED == status || JOB_TASK_STATUS_SUCCEED == status) { + qError("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId); + QW_ERR_RET(code); + } + + QW_ERR_RET(qwAddTask(mgmt, sId, qId, tId, status, QW_EXIST_ACQUIRE, &sch, &task)); } if (task->cancel) { QW_LOCK(QW_WRITE, &task->lock); qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus); QW_UNLOCK(QW_WRITE, &task->lock); - } else if (task->drop) { + } + + if (task->drop) { qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); - qwDropTask(mgmt, schedulerId, queryId, taskId); + qwDropTask(mgmt, sId, qId, tId); return TSDB_CODE_SUCCESS; - } else { + } + + if (!(task->cancel || task->drop)) { QW_LOCK(QW_WRITE, &task->lock); qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); task->code = errCode; @@ -938,24 +952,24 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + int32_t code = 0; SSubQueryMsg *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { qError("invalid query msg"); - QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->schedulerId = htobe64(msg->schedulerId); + msg->sId = htobe64(msg->sId); msg->queryId = htobe64(msg->queryId); msg->taskId = htobe64(msg->taskId); msg->contentLen = ntohl(msg->contentLen); bool queryDone = false; - bool queryRsp = false; + bool queryRsped = false; bool needStop = false; SSubplan *plan = NULL; - int32_t code = 0; - QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &needStop)); + QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop)); if (needStop) { qWarn("task need stop"); QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); @@ -963,7 +977,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { code = qStringToSubplan(msg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { - qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->schedulerId, msg->queryId, msg->taskId, code); + qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->sId, msg->queryId, msg->taskId, code); QW_ERR_JRET(code); } @@ -974,12 +988,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (code) { QW_ERR_JRET(code); } else { - QW_ERR_JRET(qwSwitchTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING)); + QW_ERR_JRET(qwAddTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING, QW_EXIST_RET_ERR, NULL, NULL)); } QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); - queryRsp = true; + queryRsped = true; //TODO call executer to execute subquery code = 0; @@ -990,29 +1004,29 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (code) { QW_ERR_JRET(code); } else { - QW_ERR_JRET(qwAddTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, data)); + QW_ERR_JRET(qwAddTaskResCache(qWorkerMgmt, msg->queryId, msg->taskId, data)); - QW_ERR_JRET(qwSwitchTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); - } + QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); + } _return: - if (queryRsp) { - code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg, code); + if (queryRsped) { + code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg, code); } else { code = qwBuildAndSendQueryRsp(pMsg, code); } int8_t status = 0; - if (TSDB_CODE_SUCCESS != code || queryDone) { - if (code) { - status = JOB_TASK_STATUS_FAILED; //TODO set CANCELLED from code - } else { - status = JOB_TASK_STATUS_SUCCEED; - } - - qwQueryPostProcess(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, status, code); + if (TSDB_CODE_SUCCESS != code) { + status = JOB_TASK_STATUS_FAILED; + } else if (queryDone) { + status = JOB_TASK_STATUS_SUCCEED; + } else { + status = JOB_TASK_STATUS_PARTIAL_SUCCEED; } + + qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, code); QW_RET(code); } @@ -1023,12 +1037,16 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ } SResReadyMsg *msg = pMsg->pCont; - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task status msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg)); + msg->sId = htobe64(msg->sId); + msg->queryId = htobe64(msg->queryId); + msg->taskId = htobe64(msg->taskId); + + QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg)); return TSDB_CODE_SUCCESS; } @@ -1040,14 +1058,16 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; SSchTasksStatusMsg *msg = pMsg->pCont; - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task status msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + msg->sId = htobe64(msg->sId); + SSchedulerStatusRsp *sStatus = NULL; - QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus)); + QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus)); _return: @@ -1062,11 +1082,15 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } SResFetchMsg *msg = pMsg->pCont; - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->schedulerId)); + msg->sId = htobe64(msg->sId); + msg->queryId = htobe64(msg->queryId); + msg->taskId = htobe64(msg->taskId); + + QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->sId)); void *data = NULL; SQWorkerResCache *res = NULL; @@ -1074,7 +1098,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(qwAcquireTaskResCache(QW_READ, qWorkerMgmt, msg->queryId, msg->taskId, &res)); - QW_ERR_JRET(qwHandleFetch(res, qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg)); + QW_ERR_JRET(qwHandleFetch(res, qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg)); _return: @@ -1090,12 +1114,16 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; STaskCancelMsg *msg = pMsg->pCont; - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task cancel msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); + msg->sId = htobe64(msg->sId); + msg->queryId = htobe64(msg->queryId); + msg->taskId = htobe64(msg->taskId); + + QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); _return: @@ -1111,12 +1139,16 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; STaskDropMsg *msg = pMsg->pCont; - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task drop msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); + msg->sId = htobe64(msg->sId); + msg->queryId = htobe64(msg->queryId); + msg->taskId = htobe64(msg->taskId); + + QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); _return: @@ -1125,6 +1157,31 @@ _return: return TSDB_CODE_SUCCESS; } +int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + int32_t code = 0; + int8_t status = 0; + bool queryDone = false; + uint64_t sId, qId, tId; + + //TODO call executer to continue execute subquery + code = 0; + void *data = NULL; + queryDone = false; + //TODO call executer to continue execute subquery + + if (TSDB_CODE_SUCCESS != code) { + status = JOB_TASK_STATUS_FAILED; + } else if (queryDone) { + status = JOB_TASK_STATUS_SUCCEED; + } else { + status = JOB_TASK_STATUS_PARTIAL_SUCCEED; + } + + code = qwQueryPostProcess(qWorkerMgmt, sId, qId, tId, status, code); + + QW_RET(code); +} + void qWorkerDestroy(void **qWorkerMgmt) { if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 4b54b77544..7bc1c4ff40 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -36,10 +36,25 @@ namespace { +bool testStop = false; + int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { return 0; } +void qwtRpcSendResponse(const SRpcMsg *pRsp) { + if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) { + SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont; + printf("task num:%d\n", rsp->num); + for (int32_t i = 0; i < rsp->num; ++i) { + STaskStatus *task = &rsp->status[i]; + printf("qId:%"PRIx64",tId:%"PRIx64",status:%d\n", task->queryId, task->taskId, task->status); + } + } + return; +} + + void stubSetStringToPlan() { static Stub stub; @@ -54,11 +69,148 @@ void stubSetStringToPlan() { } } +void stubSetRpcSendResponse() { + static Stub stub; + stub.set(rpcSendResponse, qwtRpcSendResponse); + { + AddrAny any("libplanner.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendResponse$", result); + for (const auto& f : result) { + stub.set(f.second, qwtRpcSendResponse); + } + } +} + +void *queryThread(void *param) { + SRpcMsg queryRpc = {0}; + int32_t code = 0; + uint32_t n = 0; + void *mockPointer = (void *)0x1; + void *mgmt = param; + SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); + queryMsg->queryId = htobe64(1); + queryMsg->sId = htobe64(1); + queryMsg->taskId = htobe64(1); + queryMsg->contentLen = htonl(100); + queryRpc.pCont = queryMsg; + queryRpc.contLen = sizeof(SSubQueryMsg) + 100; + + while (!testStop) { + qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + usleep(rand()%5); + if (++n % 50000 == 0) { + printf("query:%d\n", n); + } + } + + return NULL; +} + +void *readyThread(void *param) { + SRpcMsg readyRpc = {0}; + int32_t code = 0; + uint32_t n = 0; + void *mockPointer = (void *)0x1; + void *mgmt = param; + SResReadyMsg readyMsg = {0}; + readyMsg.sId = htobe64(1); + readyMsg.queryId = htobe64(1); + readyMsg.taskId = htobe64(1); + readyRpc.pCont = &readyMsg; + readyRpc.contLen = sizeof(SResReadyMsg); + + while (!testStop) { + code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); + usleep(rand()%5); + if (++n % 50000 == 0) { + printf("ready:%d\n", n); + } + } + + return NULL; +} + +void *fetchThread(void *param) { + SRpcMsg fetchRpc = {0}; + int32_t code = 0; + uint32_t n = 0; + void *mockPointer = (void *)0x1; + void *mgmt = param; + SResFetchMsg fetchMsg = {0}; + fetchMsg.sId = htobe64(1); + fetchMsg.queryId = htobe64(1); + fetchMsg.taskId = htobe64(1); + fetchRpc.pCont = &fetchMsg; + fetchRpc.contLen = sizeof(SResFetchMsg); + + while (!testStop) { + code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); + usleep(rand()%5); + if (++n % 50000 == 0) { + printf("fetch:%d\n", n); + } + } + + return NULL; +} + +void *dropThread(void *param) { + SRpcMsg dropRpc = {0}; + int32_t code = 0; + uint32_t n = 0; + void *mockPointer = (void *)0x1; + void *mgmt = param; + STaskDropMsg dropMsg = {0}; + dropMsg.sId = htobe64(1); + dropMsg.queryId = htobe64(1); + dropMsg.taskId = htobe64(1); + dropRpc.pCont = &dropMsg; + dropRpc.contLen = sizeof(STaskDropMsg); + + while (!testStop) { + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + usleep(rand()%5); + if (++n % 50000 == 0) { + printf("drop:%d\n", n); + } + } + + return NULL; +} + +void *statusThread(void *param) { + SRpcMsg statusRpc = {0}; + int32_t code = 0; + uint32_t n = 0; + void *mockPointer = (void *)0x1; + void *mgmt = param; + SSchTasksStatusMsg statusMsg = {0}; + statusMsg.sId = htobe64(1); + statusRpc.pCont = &statusMsg; + statusRpc.contLen = sizeof(SSchTasksStatusMsg); + statusRpc.msgType = TDMT_VND_TASKS_STATUS; + + while (!testStop) { + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + usleep(rand()%5); + if (++n % 50000 == 0) { + printf("status:%d\n", n); + } + } + + return NULL; +} + + + + } -TEST(testCase, normalCase) { +TEST(seqTest, normalCase) { void *mgmt = NULL; int32_t code = 0; void *mockPointer = (void *)0x1; @@ -66,48 +218,254 @@ TEST(testCase, normalCase) { SRpcMsg readyRpc = {0}; SRpcMsg fetchRpc = {0}; SRpcMsg dropRpc = {0}; + SRpcMsg statusRpc = {0}; + SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); queryMsg->queryId = htobe64(1); - queryMsg->schedulerId = htobe64(1); + queryMsg->sId = htobe64(1); queryMsg->taskId = htobe64(1); queryMsg->contentLen = htonl(100); queryRpc.pCont = queryMsg; + queryRpc.contLen = sizeof(SSubQueryMsg) + 100; SResReadyMsg readyMsg = {0}; - readyMsg.schedulerId = htobe64(1); + readyMsg.sId = htobe64(1); readyMsg.queryId = htobe64(1); readyMsg.taskId = htobe64(1); readyRpc.pCont = &readyMsg; + readyRpc.contLen = sizeof(SResReadyMsg); SResFetchMsg fetchMsg = {0}; - fetchMsg.schedulerId = htobe64(1); + fetchMsg.sId = htobe64(1); fetchMsg.queryId = htobe64(1); fetchMsg.taskId = htobe64(1); fetchRpc.pCont = &fetchMsg; + fetchRpc.contLen = sizeof(SResFetchMsg); STaskDropMsg dropMsg = {0}; - dropMsg.schedulerId = htobe64(1); + dropMsg.sId = htobe64(1); dropMsg.queryId = htobe64(1); dropMsg.taskId = htobe64(1); dropRpc.pCont = &dropMsg; + dropRpc.contLen = sizeof(STaskDropMsg); + SSchTasksStatusMsg statusMsg = {0}; + statusMsg.sId = htobe64(1); + statusRpc.pCont = &statusMsg; + statusRpc.contLen = sizeof(SSchTasksStatusMsg); + statusRpc.msgType = TDMT_VND_TASKS_STATUS; + stubSetStringToPlan(); + stubSetRpcSendResponse(); code = qWorkerInit(NULL, &mgmt); ASSERT_EQ(code, 0); + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); ASSERT_EQ(code, 0); + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); ASSERT_EQ(code, 0); + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); ASSERT_EQ(code, 0); + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); ASSERT_EQ(code, 0); + + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + + qWorkerDestroy(&mgmt); +} + +TEST(seqTest, cancelFirst) { + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + SRpcMsg queryRpc = {0}; + SRpcMsg dropRpc = {0}; + SRpcMsg statusRpc = {0}; + SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); + queryMsg->queryId = htobe64(1); + queryMsg->sId = htobe64(1); + queryMsg->taskId = htobe64(1); + queryMsg->contentLen = htonl(100); + queryRpc.pCont = queryMsg; + queryRpc.contLen = sizeof(SSubQueryMsg) + 100; + + STaskDropMsg dropMsg = {0}; + dropMsg.sId = htobe64(1); + dropMsg.queryId = htobe64(1); + dropMsg.taskId = htobe64(1); + dropRpc.pCont = &dropMsg; + dropRpc.contLen = sizeof(STaskDropMsg); + + SSchTasksStatusMsg statusMsg = {0}; + statusMsg.sId = htobe64(1); + statusRpc.pCont = &statusMsg; + statusRpc.contLen = sizeof(SSchTasksStatusMsg); + statusRpc.msgType = TDMT_VND_TASKS_STATUS; + + stubSetStringToPlan(); + stubSetRpcSendResponse(); + + code = qWorkerInit(NULL, &mgmt); + ASSERT_EQ(code, 0); + + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + ASSERT_EQ(code, 0); + + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + + code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + ASSERT_EQ(code, 0); + + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + + qWorkerDestroy(&mgmt); +} + +TEST(seqTest, randCase) { + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + SRpcMsg queryRpc = {0}; + SRpcMsg readyRpc = {0}; + SRpcMsg fetchRpc = {0}; + SRpcMsg dropRpc = {0}; + SRpcMsg statusRpc = {0}; + + SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); + queryMsg->queryId = htobe64(1); + queryMsg->sId = htobe64(1); + queryMsg->taskId = htobe64(1); + queryMsg->contentLen = htonl(100); + queryRpc.pCont = queryMsg; + queryRpc.contLen = sizeof(SSubQueryMsg) + 100; + + SResReadyMsg readyMsg = {0}; + readyMsg.sId = htobe64(1); + readyMsg.queryId = htobe64(1); + readyMsg.taskId = htobe64(1); + readyRpc.pCont = &readyMsg; + readyRpc.contLen = sizeof(SResReadyMsg); + + SResFetchMsg fetchMsg = {0}; + fetchMsg.sId = htobe64(1); + fetchMsg.queryId = htobe64(1); + fetchMsg.taskId = htobe64(1); + fetchRpc.pCont = &fetchMsg; + fetchRpc.contLen = sizeof(SResFetchMsg); + + STaskDropMsg dropMsg = {0}; + dropMsg.sId = htobe64(1); + dropMsg.queryId = htobe64(1); + dropMsg.taskId = htobe64(1); + dropRpc.pCont = &dropMsg; + dropRpc.contLen = sizeof(STaskDropMsg); + + SSchTasksStatusMsg statusMsg = {0}; + statusMsg.sId = htobe64(1); + statusRpc.pCont = &statusMsg; + statusRpc.contLen = sizeof(SSchTasksStatusMsg); + statusRpc.msgType = TDMT_VND_TASKS_STATUS; + + stubSetStringToPlan(); + stubSetRpcSendResponse(); + + srand(time(NULL)); + + code = qWorkerInit(NULL, &mgmt); + ASSERT_EQ(code, 0); + + int32_t t = 0; + int32_t maxr = 10001; + while (true) { + int32_t r = rand() % maxr; + + if (r >= 0 && r < maxr/5) { + printf("Query,%d\n", t++); + code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + } else if (r >= maxr/5 && r < maxr * 2/5) { + printf("Ready,%d\n", t++); + code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); + } else if (r >= maxr * 2/5 && r < maxr* 3/5) { + printf("Fetch,%d\n", t++); + code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); + } else if (r >= maxr * 3/5 && r < maxr * 4/5) { + printf("Drop,%d\n", t++); + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + } else if (r >= maxr * 4/5 && r < maxr-1) { + printf("Status,%d\n", t++); + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + } else { + printf("QUIT RAND NOW"); + break; + } + } + + qWorkerDestroy(&mgmt); +} + +TEST(seqTest, multithreadRand) { + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + + stubSetStringToPlan(); + stubSetRpcSendResponse(); + + srand(time(NULL)); + + code = qWorkerInit(NULL, &mgmt); + ASSERT_EQ(code, 0); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t t1,t2,t3,t4,t5; + pthread_create(&(t1), &thattr, queryThread, mgmt); + pthread_create(&(t2), &thattr, readyThread, NULL); + pthread_create(&(t3), &thattr, fetchThread, NULL); + pthread_create(&(t4), &thattr, dropThread, NULL); + pthread_create(&(t5), &thattr, statusThread, NULL); + + int32_t t = 0; + int32_t maxr = 10001; + sleep(300); + testStop = true; + sleep(1); + + qWorkerDestroy(&mgmt); } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 2381a1dd49..c327e4cfea 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -38,11 +38,16 @@ enum { typedef struct SSchedulerMgmt { uint64_t taskId; - uint64_t schedulerId; + uint64_t sId; SSchedulerCfg cfg; SHashObj *jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; +typedef struct SSchCallbackParam { + uint64_t queryId; + uint64_t taskId; +} SSchCallbackParam; + typedef struct SSchLevel { int32_t level; int8_t status; @@ -120,6 +125,7 @@ typedef struct SSchJob { extern int32_t schLaunchTask(SSchJob *job, SSchTask *task); +extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType); #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 978e10cf94..4cde24e38c 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -21,36 +21,6 @@ SSchedulerMgmt schMgmt = {0}; -int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) { -/* - SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT); - if (pRequest == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - SRequestMsgBody body = {0}; - buildConnectMsg(pRequest, &body); - - int64_t transporterId = 0; - asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); - - tsem_wait(&pRequest->body.rspSem); - destroyConnectMsg(&body); - - if (pRequest->code != TSDB_CODE_SUCCESS) { - const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno); - printf("failed to connect to server, reason: %s\n\n", errorMsg); - - destroyRequest(pRequest); - taos_close(pTscObj); - pTscObj = NULL; - } else { - tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter); - destroyRequest(pRequest); - } -*/ -} - int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { for (int32_t i = 0; i < job->levelNum; ++i) { SSchLevel *level = taosArrayGet(job->levels, i); @@ -312,100 +282,6 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { return TSDB_CODE_SUCCESS; } - -int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { - int32_t msgSize = 0; - void *msg = NULL; - - switch (msgType) { - case TDMT_VND_SUBMIT: { - if (NULL == task->msg || task->msgLen <= 0) { - qError("submit msg is NULL"); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); - } - - msgSize = task->msgLen; - msg = task->msg; - break; - } - case TDMT_VND_QUERY: { - if (NULL == task->msg) { - qError("query msg is NULL"); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); - } - - msgSize = sizeof(SSubQueryMsg) + task->msgLen; - msg = calloc(1, msgSize); - if (NULL == msg) { - qError("calloc %d failed", msgSize); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - SSubQueryMsg *pMsg = msg; - - pMsg->schedulerId = htobe64(schMgmt.schedulerId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); - pMsg->contentLen = htonl(task->msgLen); - memcpy(pMsg->msg, task->msg, task->msgLen); - break; - } - case TDMT_VND_RES_READY: { - msgSize = sizeof(SResReadyMsg); - msg = calloc(1, msgSize); - if (NULL == msg) { - qError("calloc %d failed", msgSize); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - SResReadyMsg *pMsg = msg; - pMsg->schedulerId = htobe64(schMgmt.schedulerId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); - break; - } - case TDMT_VND_FETCH: { - if (NULL == task) { - SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); - } - msgSize = sizeof(SResFetchMsg); - msg = calloc(1, msgSize); - if (NULL == msg) { - qError("calloc %d failed", msgSize); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - SResFetchMsg *pMsg = msg; - pMsg->schedulerId = htobe64(schMgmt.schedulerId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); - break; - } - case TDMT_VND_DROP_TASK:{ - msgSize = sizeof(STaskDropMsg); - msg = calloc(1, msgSize); - if (NULL == msg) { - qError("calloc %d failed", msgSize); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - STaskDropMsg *pMsg = msg; - pMsg->schedulerId = htobe64(schMgmt.schedulerId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); - break; - } - default: - qError("unknown msg type:%d", msgType); - break; - } - - //TODO SEND MSG - //taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code); - - return TSDB_CODE_SUCCESS; -} - int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO if needRetry, set task retry info @@ -424,7 +300,7 @@ int32_t schFetchFromRemote(SSchJob *job) { return TSDB_CODE_SUCCESS; } - SCH_ERR_JRET(schAsyncSendMsg(job, job->fetchTask, TDMT_VND_FETCH)); + SCH_ERR_JRET(schBuildAndSendMsg(job, job->fetchTask, TDMT_VND_FETCH)); return TSDB_CODE_SUCCESS; @@ -577,11 +453,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { return TSDB_CODE_SUCCESS; } -int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { +int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; switch (msgType) { - case TDMT_VND_SUBMIT: { + case TDMT_VND_SUBMIT_RSP: { SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; if (rsp->code != TSDB_CODE_SUCCESS) { SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); @@ -595,20 +471,20 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg } break; } - case TDMT_VND_QUERY: { + case TDMT_VND_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)msg; if (rsp->code != TSDB_CODE_SUCCESS) { SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); } else { - code = schAsyncSendMsg(job, task, TDMT_VND_RES_READY); + code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY); if (code) { goto _task_error; } } break; } - case TDMT_VND_RES_READY: { + case TDMT_VND_RES_READY_RSP: { SResReadyRsp *rsp = (SResReadyRsp *)msg; if (rsp->code != TSDB_CODE_SUCCESS) { @@ -621,7 +497,7 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg } break; } - case TDMT_VND_FETCH: { + case TDMT_VND_FETCH_RSP: { SCH_ERR_JRET(rspCode); SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; @@ -631,6 +507,9 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg SCH_ERR_JRET(schProcessOnDataFetched(job)); break; } + case TDMT_VND_DROP_TASK: { + + } default: qError("unknown msg type:%d received", msgType); return TSDB_CODE_QRY_INVALID_INPUT; @@ -648,6 +527,211 @@ _return: } +int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { + int32_t code = 0; + SSchCallbackParam *pParam = (SSchCallbackParam *)param; + + SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); + if (NULL == job || NULL == (*job)) { + qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); + if (NULL == task || NULL == (*task)) { + qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode); + +_return: + tfree(param); + + SCH_RET(code); +} + +int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { + return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code); +} +int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { + return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); +} +int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { + return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code); +} +int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { + return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code); +} +int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { + SSchCallbackParam *pParam = (SSchCallbackParam *)param; + qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code); +} + +int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { + switch (msgType) { + case TDMT_VND_SUBMIT: + *fp = schHandleSubmitCallback; + break; + case TDMT_VND_QUERY: + *fp = schHandleQueryCallback; + break; + case TDMT_VND_RES_READY: + *fp = schHandleReadyCallback; + break; + case TDMT_VND_FETCH: + *fp = schHandleFetchCallback; + break; + case TDMT_VND_DROP_TASK: + *fp = schHandleDropCallback; + break; + default: + qError("unknown msg type:%d", msgType); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { + int32_t code = 0; + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam)); + if (NULL == param) { + qError("calloc %d failed", (int32_t)sizeof(SSchCallbackParam)); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + __async_send_cb_fn_t fp = NULL; + SCH_ERR_JRET(schGetCallbackFp(msgType, &fp)); + + param->queryId = qId; + param->taskId = tId; + + pMsgSendInfo->param = param; + pMsgSendInfo->msgInfo.pData = msg; + pMsgSendInfo->msgInfo.len = msgSize; + pMsgSendInfo->msgType = msgType; + + pMsgSendInfo->fp = fp; + + int64_t transporterId = 0; + SCH_ERR_JRET(asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo)); + + return TSDB_CODE_SUCCESS; + +_return: + tfree(param); + tfree(pMsgSendInfo); + + SCH_RET(code); +} + + +int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { + uint32_t msgSize = 0; + void *msg = NULL; + int32_t code = 0; + + switch (msgType) { + case TDMT_VND_SUBMIT: { + if (NULL == task->msg || task->msgLen <= 0) { + qError("submit msg is NULL"); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + msgSize = task->msgLen; + msg = task->msg; + break; + } + case TDMT_VND_QUERY: { + if (NULL == task->msg) { + qError("query msg is NULL"); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + msgSize = sizeof(SSubQueryMsg) + task->msgLen; + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SSubQueryMsg *pMsg = msg; + + pMsg->sId = htobe64(schMgmt.sId); + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); + pMsg->contentLen = htonl(task->msgLen); + memcpy(pMsg->msg, task->msg, task->msgLen); + break; + } + case TDMT_VND_RES_READY: { + msgSize = sizeof(SResReadyMsg); + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SResReadyMsg *pMsg = msg; + pMsg->sId = htobe64(schMgmt.sId); + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); + break; + } + case TDMT_VND_FETCH: { + if (NULL == task) { + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + msgSize = sizeof(SResFetchMsg); + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SResFetchMsg *pMsg = msg; + pMsg->sId = htobe64(schMgmt.sId); + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); + break; + } + case TDMT_VND_DROP_TASK:{ + msgSize = sizeof(STaskDropMsg); + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + STaskDropMsg *pMsg = msg; + pMsg->sId = htobe64(schMgmt.sId); + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); + break; + } + default: + qError("unknown msg type:%d", msgType); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + break; + } + + SCH_ERR_JRET(schAsyncSendMsg(job->transport, &task->plan->execEpSet, job->queryId, task->taskId, msgType, msg, msgSize)); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(msg); + SCH_RET(code); +} int32_t schLaunchTask(SSchJob *job, SSchTask *task) { @@ -664,7 +748,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TDMT_VND_SUBMIT : TDMT_VND_QUERY; - SCH_ERR_RET(schAsyncSendMsg(job, task, msgType)); + SCH_ERR_RET(schBuildAndSendMsg(job, task, msgType)); SCH_ERR_RET(schPushTaskToExecList(job, task)); @@ -673,6 +757,8 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } + + int32_t schLaunchJob(SSchJob *job) { SSchLevel *level = taosArrayGet(job->levels, job->levelIdx); for (int32_t i = 0; i < level->taskNum; ++i) { @@ -690,7 +776,7 @@ void schDropJobAllTasks(SSchJob *job) { while (pIter) { SSchTask *task = *(SSchTask **)pIter; - schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK); + schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK); pIter = taosHashIterate(job->succTasks, pIter); } @@ -699,7 +785,7 @@ void schDropJobAllTasks(SSchJob *job) { while (pIter) { SSchTask *task = *(SSchTask **)pIter; - schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK); + schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK); pIter = taosHashIterate(job->succTasks, pIter); } @@ -717,7 +803,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum); } - schMgmt.schedulerId = 1; //TODO GENERATE A UUID + schMgmt.sId = 1; //TODO GENERATE A UUID return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 4732429d0b..6163bc0c1a 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -36,7 +36,7 @@ namespace { -extern "C" int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); +extern "C" int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); void schtBuildQueryDag(SQueryDag *dag) { uint64_t qId = 0x0000000000000001; @@ -182,7 +182,7 @@ void *schtSendRsp(void *param) { SShellSubmitRspMsg rsp = {0}; rsp.affectedRows = 10; - schHandleRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); + schProcessRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); pIter = taosHashIterate(job->execTasks, pIter); } @@ -227,7 +227,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + code = schProcessRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -238,7 +238,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; - code = schHandleRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); + code = schProcessRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -249,7 +249,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + code = schProcessRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -260,7 +260,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; - code = schHandleRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); + code = schProcessRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -269,7 +269,7 @@ TEST(queryTest, normalCase) { SRetrieveTableRsp rsp = {0}; rsp.completed = 1; rsp.numOfRows = 10; - code = schHandleRspMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0); + code = schProcessRspMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0);