diff --git a/source/libs/qcom/inc/queryInt.h b/source/libs/qcom/inc/queryInt.h index f120bf26ce..8f52f21d23 100644 --- a/source/libs/qcom/inc/queryInt.h +++ b/source/libs/qcom/inc/queryInt.h @@ -20,7 +20,6 @@ extern "C" { #endif - #ifdef __cplusplus } #endif diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index e54937114c..621a38ea4a 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -13,18 +13,19 @@ * along with this program. If not, see . */ -#include "tmsg.h" -#include "queryInt.h" #include "query.h" -#include "trpc.h" +#include "queryInt.h" #include "systable.h" +#include "tmsg.h" +#include "trpc.h" #pragma GCC diagnostic push #ifdef COMPILER_SUPPORTS_CXX13 #pragma GCC diagnostic ignored "-Wformat-truncation" #endif -int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t)) = {0}; +int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, + void *(*mallocFp)(int32_t)) = {0}; int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize) = {0}; int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { @@ -46,7 +47,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { if (usedbRsp->vgNum <= 0) { return TSDB_CODE_SUCCESS; } - + pOut->dbVgroup->vgHash = taosHashInit(usedbRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (NULL == pOut->dbVgroup->vgHash) { @@ -57,7 +58,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i); pOut->dbVgroup->numOfTable += pVgInfo->numOfTable; qDebug("the %dth vgroup, id %d, epNum %d, current %s port %d", i, pVgInfo->vgId, pVgInfo->epSet.numOfEps, - pVgInfo->epSet.eps[pVgInfo->epSet.inUse].fqdn, pVgInfo->epSet.eps[pVgInfo->epSet.inUse].port); + pVgInfo->epSet.eps[pVgInfo->epSet.inUse].fqdn, pVgInfo->epSet.eps[pVgInfo->epSet.inUse].port); if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -66,7 +67,8 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { return TSDB_CODE_SUCCESS; } -int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { +int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, + void *(*mallcFp)(int32_t)) { SBuildTableInput *pInput = input; if (NULL == input || NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -89,7 +91,7 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3 return TSDB_CODE_SUCCESS; } -int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { +int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) { SBuildUseDBInput *pInput = input; if (NULL == pInput || NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -112,7 +114,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms return TSDB_CODE_SUCCESS; } -int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { +int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -130,7 +132,7 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } -int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { +int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -148,7 +150,7 @@ int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } -int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { +int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -165,8 +167,7 @@ int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } - -int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { +int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -184,7 +185,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } -int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { +int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -202,7 +203,8 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } -int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { +int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, + void *(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -225,7 +227,7 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3 return TSDB_CODE_SUCCESS; } -int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { +int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -243,7 +245,7 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32 return TSDB_CODE_SUCCESS; } -int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { +int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -261,13 +263,13 @@ int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_ return TSDB_CODE_SUCCESS; } -int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { +int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { return TSDB_CODE_TSC_INVALID_INPUT; } SBuildTableInput *pInput = input; - STableCfgReq cfgReq = {0}; + STableCfgReq cfgReq = {0}; cfgReq.header.vgId = pInput->vgId; strcpy(cfgReq.dbFName, pInput->dbFName); strcpy(cfgReq.tbName, pInput->tbName); @@ -282,7 +284,6 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } - int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { SUseDbOutput *pOut = output; SUseDbRsp usedbRsp = {0}; @@ -362,13 +363,12 @@ int32_t queryCreateCTableMetaFromMsg(STableMetaRsp *msg, SCTableMeta *pMeta) { pMeta->uid = msg->tuid; pMeta->suid = msg->suid; - qDebug("ctable %s uid %" PRIx64 " meta returned, type %d vgId:%d db %s suid %" PRIx64 , - msg->tbName, pMeta->uid, pMeta->tableType, pMeta->vgId, msg->dbFName, pMeta->suid); + qDebug("ctable %s uid %" PRIx64 " meta returned, type %d vgId:%d db %s suid %" PRIx64, msg->tbName, pMeta->uid, + pMeta->tableType, pMeta->vgId, msg->dbFName, pMeta->suid); return TSDB_CODE_SUCCESS; } - int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta **pMeta) { int32_t total = msg->numOfColumns + msg->numOfTags; int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total; @@ -425,7 +425,8 @@ int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) { goto PROCESS_META_OVER; } - if (0 != strcmp(metaRsp.dbFName, TSDB_INFORMATION_SCHEMA_DB) && !tIsValidSchema(metaRsp.pSchemas, metaRsp.numOfColumns, metaRsp.numOfTags)) { + if (0 != strcmp(metaRsp.dbFName, TSDB_INFORMATION_SCHEMA_DB) && + !tIsValidSchema(metaRsp.pSchemas, metaRsp.numOfColumns, metaRsp.numOfTags)) { code = TSDB_CODE_TSC_INVALID_VALUE; goto PROCESS_META_OVER; } @@ -461,7 +462,6 @@ PROCESS_META_OVER: return code; } - int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) { SQnodeListRsp out = {0}; int32_t code = 0; @@ -496,7 +496,7 @@ int32_t queryProcessDnodeListRsp(void *output, char *msg, int32_t msgSize) { return code; } - *(SArray**)output = out.dnodeList; + *(SArray **)output = out.dnodeList; return code; } @@ -516,12 +516,11 @@ int32_t queryProcessGetSerVerRsp(void *output, char *msg, int32_t msgSize) { return code; } - *(char**)output = strdup(out.ver); + *(char **)output = strdup(out.ver); return code; } - int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) { SDbCfgRsp out = {0}; @@ -573,7 +572,7 @@ int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) { return TSDB_CODE_INVALID_MSG; } - SFuncInfo * funcInfo = taosArrayGet(out.pFuncInfos, 0); + SFuncInfo *funcInfo = taosArrayGet(out.pFuncInfos, 0); memcpy(output, funcInfo, sizeof(*funcInfo)); taosArrayDestroy(out.pFuncInfos); @@ -599,12 +598,12 @@ int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) { return TSDB_CODE_TSC_INVALID_INPUT; } - STableIndexRsp *out = (STableIndexRsp*)output; + STableIndexRsp *out = (STableIndexRsp *)output; if (tDeserializeSTableIndexRsp(msg, msgSize, out) != 0) { qError("tDeserializeSTableIndexRsp failed, msgSize:%d", msgSize); return TSDB_CODE_INVALID_MSG; } - + return TSDB_CODE_SUCCESS; } @@ -619,39 +618,39 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) { return TSDB_CODE_INVALID_MSG; } - *(STableCfgRsp**)output = out; - + *(STableCfgRsp **)output = out; + return TSDB_CODE_SUCCESS; } void initQueryModuleMsgHandle() { - queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_DNODE_LIST)] = queryBuildDnodeListMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg; - queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryBuildGetTbCfgMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryBuildGetTbCfgMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryBuildGetSerVerMsg; + queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_DNODE_LIST)] = queryBuildDnodeListMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg; + queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryBuildGetTbCfgMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryBuildGetTbCfgMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryBuildGetSerVerMsg; - queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_DNODE_LIST)] = queryProcessDnodeListRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_DNODE_LIST)] = queryProcessDnodeListRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp; } #pragma GCC diagnostic pop diff --git a/source/libs/qcom/test/queryTest.cpp b/source/libs/qcom/test/queryTest.cpp index 9615557c88..7134f5c7b8 100644 --- a/source/libs/qcom/test/queryTest.cpp +++ b/source/libs/qcom/test/queryTest.cpp @@ -15,8 +15,9 @@ #include #include -#include "tmsg.h" + #include "query.h" +#include "tmsg.h" #include "trpc.h" #pragma GCC diagnostic push @@ -37,7 +38,7 @@ int32_t testPrint(void* p) { } int32_t testPrintError(void* p) { - SParam* param = (SParam*) p; + SParam* param = (SParam*)p; taosMemoryFreeClear(p); return -1; @@ -67,8 +68,8 @@ TEST(testCase, async_task_test) { } TEST(testCase, many_async_task_test) { - for(int32_t i = 0; i < 50; ++i) { - SParam* p = (SParam*) taosMemoryCalloc(1, sizeof(SParam)); + for (int32_t i = 0; i < 50; ++i) { + SParam* p = (SParam*)taosMemoryCalloc(1, sizeof(SParam)); p->v = i; taosAsyncExec(testPrint, p, NULL); } @@ -78,7 +79,7 @@ TEST(testCase, many_async_task_test) { TEST(testCase, error_in_async_test) { int32_t code = 0; - SParam* p = (SParam*) taosMemoryCalloc(1, sizeof(SParam)); + SParam* p = (SParam*)taosMemoryCalloc(1, sizeof(SParam)); taosAsyncExec(testPrintError, p, &code); taosMsleep(1); printf("Error code:%d after asynchronously exec function\n", code); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 3a32263599..626e78b4d3 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -20,22 +20,22 @@ extern "C" { #endif +#include "executor.h" #include "osDef.h" +#include "plannodes.h" #include "qworker.h" #include "tlockfree.h" -#include "ttimer.h" #include "tref.h" -#include "plannodes.h" -#include "executor.h" #include "trpc.h" +#include "ttimer.h" #define QW_DEFAULT_SCHEDULER_NUMBER 100 #define QW_DEFAULT_TASK_NUMBER 10000 #define QW_DEFAULT_SCH_TASK_NUMBER 10000 #define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_HEARTBEAT_MSEC 5000 -#define QW_SCH_TIMEOUT_MSEC 180000 -#define QW_MIN_RES_ROWS 4096 +#define QW_SCH_TIMEOUT_MSEC 180000 +#define QW_MIN_RES_ROWS 4096 enum { QW_PHASE_PRE_QUERY = 1, @@ -128,16 +128,16 @@ typedef struct SQWTaskCtx { bool queryContinue; bool queryInQueue; int32_t rspCode; - int64_t affectedRows; // for insert ...select stmt + int64_t affectedRows; // for insert ...select stmt SRpcHandleInfo ctrlConnInfo; SRpcHandleInfo dataConnInfo; int8_t events[QW_EVENT_MAX]; - SArray *explainRes; - void *taskHandle; - void *sinkHandle; + SArray *explainRes; + void *taskHandle; + void *sinkHandle; STbVerInfo tbInfo; } SQWTaskCtx; @@ -157,14 +157,14 @@ typedef struct SQWTimeInQ { typedef struct SQWMsgStat { SQWTimeInQ waitTime[2]; - uint64_t queryProcessed; - uint64_t cqueryProcessed; - uint64_t fetchProcessed; - uint64_t rspProcessed; - uint64_t cancelProcessed; - uint64_t dropProcessed; - uint64_t hbProcessed; - uint64_t deleteProcessed; + uint64_t queryProcessed; + uint64_t cqueryProcessed; + uint64_t fetchProcessed; + uint64_t rspProcessed; + uint64_t cancelProcessed; + uint64_t dropProcessed; + uint64_t hbProcessed; + uint64_t deleteProcessed; } SQWMsgStat; typedef struct SQWRTStat { @@ -173,8 +173,8 @@ typedef struct SQWRTStat { } SQWRTStat; typedef struct SQWStat { - SQWMsgStat msgStat; - SQWRTStat rtStat; + SQWMsgStat msgStat; + SQWRTStat rtStat; } SQWStat; // Qnode/Vnode level task management @@ -183,15 +183,15 @@ typedef struct SQWorker { SQWorkerCfg cfg; int8_t nodeType; int32_t nodeId; - void * timer; + void *timer; tmr_h hbTimer; SRWLatch schLock; // SRWLatch ctxLock; - SHashObj *schHash; // key: schedulerId, value: SQWSchStatus - SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx - SMsgCb msgCb; - SQWStat stat; - int32_t *destroyed; + SHashObj *schHash; // key: schedulerId, value: SQWSchStatus + SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx + SMsgCb msgCb; + SQWStat stat; + int32_t *destroyed; } SQWorker; typedef struct SQWorkerMgmt { @@ -208,16 +208,21 @@ typedef struct SQWorkerMgmt { #define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n) #define QW_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n) -#define QW_STAT_GET(_item) atomic_load_64(&(_item)) +#define QW_STAT_GET(_item) atomic_load_64(&(_item)) -#define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event]) -#define QW_EVENT_RECEIVED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED) -#define QW_EVENT_PROCESSED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED) +#define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event]) +#define QW_EVENT_RECEIVED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED) +#define QW_EVENT_PROCESSED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED) #define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED) #define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED) #define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase) -#define QW_SET_PHASE(ctx, _value) do { if ((_value) != QW_PHASE_PRE_FETCH && (_value) != QW_PHASE_POST_FETCH) { atomic_store_8(&(ctx)->phase, _value); } } while (0) +#define QW_SET_PHASE(ctx, _value) \ + do { \ + if ((_value) != QW_PHASE_PRE_FETCH && (_value) != QW_PHASE_POST_FETCH) { \ + atomic_store_8(&(ctx)->phase, _value); \ + } \ + } while (0) #define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code) #define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) @@ -230,7 +235,7 @@ typedef struct SQWorkerMgmt { *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); \ *(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)) = (eId); \ } while (0) - + #define QW_GET_QTID(id, qId, tId, eId) \ do { \ (qId) = *(uint64_t *)(id); \ @@ -268,10 +273,10 @@ typedef struct SQWorkerMgmt { #define QW_TLOG(_param, ...) qTrace("QW:%p " _param, mgmt, __VA_ARGS__) #define QW_DUMP(_param, ...) \ - do { \ - if (gQWDebug.dumpEnable) { \ + do { \ + if (gQWDebug.dumpEnable) { \ qDebug("QW:%p " _param, mgmt, __VA_ARGS__); \ - } \ + } \ } while (0) #define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__) @@ -287,12 +292,15 @@ typedef struct SQWorkerMgmt { #define QW_TASK_WLOG_E(param) qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId) #define QW_TASK_DLOG_E(param) qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId) -#define QW_SCH_TASK_ELOG(param, ...) \ - qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__) -#define QW_SCH_TASK_WLOG(param, ...) \ - qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__) -#define QW_SCH_TASK_DLOG(param, ...) \ - qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__) +#define QW_SCH_TASK_ELOG(param, ...) \ + qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \ + __VA_ARGS__) +#define QW_SCH_TASK_WLOG(param, ...) \ + qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \ + __VA_ARGS__) +#define QW_SCH_TASK_DLOG(param, ...) \ + qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \ + __VA_ARGS__) #define QW_LOCK_DEBUG(...) \ do { \ @@ -337,41 +345,41 @@ typedef struct SQWorkerMgmt { } \ } while (0) - extern SQWorkerMgmt gQwMgmt; -static FORCE_INLINE SQWorker *qwAcquire(int64_t refId) { return (SQWorker *)taosAcquireRef(atomic_load_32(&gQwMgmt.qwRef), refId); } +static FORCE_INLINE SQWorker *qwAcquire(int64_t refId) { + return (SQWorker *)taosAcquireRef(atomic_load_32(&gQwMgmt.qwRef), refId); +} static FORCE_INLINE int32_t qwRelease(int64_t refId) { return taosReleaseRef(gQwMgmt.qwRef, refId); } -char *qwPhaseStr(int32_t phase); -char *qwBufStatusStr(int32_t bufStatus); +char *qwPhaseStr(int32_t phase); +char *qwBufStatusStr(int32_t bufStatus); int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); -void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt); +void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt); int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status); int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); -void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx); +void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx); int32_t qwKillTaskHandle(SQWTaskCtx *ctx); int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status); int32_t qwDropTask(QW_FPARAMS_DEF); -void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); +void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); int32_t qwOpenRef(void); -void qwSetHbParam(int64_t refId, SQWHbParam **pParam); +void qwSetHbParam(int64_t refId, SQWHbParam **pParam); int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); -void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch); +void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch); int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); -void qwFreeTaskCtx(SQWTaskCtx *ctx); +void qwFreeTaskCtx(SQWTaskCtx *ctx); -void qwDbgDumpMgmtInfo(SQWorker *mgmt); +void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet); int32_t qwAddTaskCtx(QW_FPARAMS_DEF); -void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped); -void qwDbgSimulateSleep(void); -void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); - +void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped); +void qwDbgSimulateSleep(void); +void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); #ifdef __cplusplus } diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index b46c5d6baf..f226b223f7 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -20,12 +20,12 @@ extern "C" { #endif -#include "qwInt.h" #include "dataSinkMgt.h" +#include "qwInt.h" int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF); int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); -int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char* sql); +int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); @@ -35,11 +35,12 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes); int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code); -int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); +int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, + int32_t code); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx); -int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList); +int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList); int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp); diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 98d7825b2c..6997bdfd9c 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -1,15 +1,19 @@ -#include "qworker.h" #include "dataSinkMgt.h" #include "executor.h" #include "planner.h" #include "query.h" #include "qwInt.h" #include "qwMsg.h" +#include "qworker.h" #include "tcommon.h" #include "tmsg.h" #include "tname.h" -SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .redirectSimulate = false, .deadSimulate = false, .sleepSimulate = false}; +SQWDebug gQWDebug = {.statusEnable = true, + .dumpEnable = false, + .redirectSimulate = false, + .deadSimulate = false, + .sleepSimulate = false}; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { if (!gQWDebug.statusEnable) { @@ -29,15 +33,13 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, switch (oriStatus) { case JOB_TASK_STATUS_NULL: - if (newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_FAIL && - newStatus != JOB_TASK_STATUS_INIT) { + if (newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_INIT) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } break; case JOB_TASK_STATUS_INIT: - if (newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC - && newStatus != JOB_TASK_STATUS_FAIL) { + if (newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_FAIL) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -50,8 +52,8 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, break; case JOB_TASK_STATUS_PART_SUCC: - if (newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_SUCC && - newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_DROP) { + if (newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_SUCC && newStatus != JOB_TASK_STATUS_FAIL && + newStatus != JOB_TASK_STATUS_DROP) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -89,7 +91,8 @@ _return: void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) { QW_LOCK(QW_READ, &sch->tasksLock); - QW_DLOG("the %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taosHashGetSize(sch->tasksHash)); + QW_DLOG("the %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, + taosHashGetSize(sch->tasksHash)); QW_UNLOCK(QW_READ, &sch->tasksLock); } @@ -120,11 +123,10 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) { QW_DUMP("total remain ctx num %d", taosHashGetSize(mgmt->ctxHash)); } - int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) { int32_t contLen = 0; - char* rsp = NULL; - + char *rsp = NULL; + if (pEpSet) { contLen = tSerializeSEpSet(NULL, 0, pEpSet); rsp = rpcMallocCont(contLen); @@ -152,12 +154,12 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { if (*rsped) { return; } - + if (gQWDebug.redirectSimulate) { if (++ignoreTime <= 10) { return; } - + if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { SEpSet epSet = {0}; epSet.inUse = 1; @@ -169,12 +171,12 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { strcpy(epSet.eps[2].fqdn, "localhost"); epSet.eps[2].port = 7300; - ctx->phase = QW_PHASE_POST_QUERY; + ctx->phase = QW_PHASE_POST_QUERY; qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet); *rsped = true; return; } - + if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY); qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); @@ -213,18 +215,18 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) { static int32_t ignoreTime = 0; if (++ignoreTime > 10 && 0 == taosRand() % 9) { - SRpcHandleInfo *pConn = ((ctx->msgType == TDMT_SCH_FETCH || ctx->msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo : &ctx->ctrlConnInfo); + SRpcHandleInfo *pConn = + ((ctx->msgType == TDMT_SCH_FETCH || ctx->msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo + : &ctx->ctrlConnInfo); qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK); qwBuildAndSendDropMsg(QW_FPARAMS(), pConn); *rsped = true; - + return; } } - - int32_t qwDbgEnableDebug(char *option) { if (0 == strcasecmp(option, "lock")) { gQWDebug.lockEnable = true; @@ -263,8 +265,6 @@ int32_t qwDbgEnableDebug(char *option) { } qError("invalid qw debug option:%s", option); - + return TSDB_CODE_APP_ERROR; } - - diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index e4271dfcda..24ab45be2c 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -3,8 +3,8 @@ #include "executor.h" #include "planner.h" #include "query.h" -#include "qworker.h" #include "qwInt.h" +#include "qworker.h" #include "tcommon.h" #include "tmsg.h" #include "tname.h" @@ -12,7 +12,8 @@ int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) { int32_t msgSize = sizeof(SRetrieveTableRsp) + length; - SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)(rpcMalloc ? rpcReallocCont(*rsp, msgSize) : taosMemoryRealloc(*rsp, msgSize)); + SRetrieveTableRsp *pRsp = + (SRetrieveTableRsp *)(rpcMalloc ? rpcReallocCont(*rsp, msgSize) : taosMemoryRealloc(*rsp, msgSize)); if (NULL == pRsp) { qError("rpcMallocCont %d failed", msgSize); QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -21,7 +22,7 @@ int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **r if (NULL == *rsp) { memset(pRsp, 0, sizeof(SRetrieveTableRsp)); } - + *rsp = pRsp; return TSDB_CODE_SUCCESS; @@ -61,8 +62,8 @@ int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c } int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) { - STbVerInfo* tbInfo = ctx ? &ctx->tbInfo : NULL; - int64_t affectedRows = ctx ? ctx->affectedRows : 0; + STbVerInfo *tbInfo = ctx ? &ctx->tbInfo : NULL; + int64_t affectedRows = ctx ? ctx->affectedRows : 0; SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->code = htonl(code); pRsp->affectedRows = htobe64(affectedRows); @@ -85,12 +86,12 @@ int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList) { - SExplainExecInfo* pInfo = taosArrayGet(pExecList, 0); - SExplainRsp rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo}; +int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList) { + SExplainExecInfo *pInfo = taosArrayGet(pExecList, 0); + SExplainRsp rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo}; int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); - void * pRsp = rpcMallocCont(contLen); + void *pRsp = rpcMallocCont(contLen); tSerializeSExplainRsp(pRsp, contLen, &rsp); SRpcMsg rpcRsp = { @@ -108,7 +109,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList) { int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); - void * pRsp = rpcMallocCont(contLen); + void *pRsp = rpcMallocCont(contLen); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); SRpcMsg rpcRsp = { @@ -124,7 +125,8 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { +int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, + int32_t code) { if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp)); @@ -209,7 +211,6 @@ int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { return TSDB_CODE_SUCCESS; } - int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); if (NULL == req) { @@ -309,7 +310,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; SSubQueryMsg *msg = pMsg->pCont; - SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -330,7 +331,8 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { int64_t rId = msg->refId; int32_t eId = msg->execId; - SQWMsg qwMsg = {.msgType = pMsg->msgType, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; + SQWMsg qwMsg = { + .msgType = pMsg->msgType, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle); QW_ERR_RET(qwPreprocessQuery(QW_FPARAMS(), &qwMsg)); @@ -345,7 +347,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { } SSubQueryMsg *msg = pMsg->pCont; - SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; uint64_t sId = msg->sId; uint64_t qId = msg->queryId; @@ -367,7 +369,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int32_t code = 0; SSubQueryMsg *msg = pMsg->pCont; - SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1); @@ -383,13 +385,18 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int64_t rId = msg->refId; int32_t eId = msg->execId; - SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType}; + SQWMsg qwMsg = {.node = node, + .msg = msg->msg + msg->sqlLen, + .msgLen = msg->phyLen, + .connInfo = pMsg->info, + .msgType = pMsg->msgType}; qwMsg.msgInfo.explain = msg->explain; qwMsg.msgInfo.taskType = msg->taskType; qwMsg.msgInfo.needFetch = msg->needFetch; - - char * sql = strndup(msg->msg, msg->sqlLen); - QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql); + + char *sql = strndup(msg->msg, msg->sqlLen); + QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), + pMsg->info.handle, sql); QW_ERR_JRET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql)); _return: @@ -405,8 +412,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in bool queryDone = false; SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont; bool needStop = false; - SQWTaskCtx * handles = NULL; - SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + SQWTaskCtx *handles = NULL; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1); @@ -439,7 +446,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int } SResFetchReq *msg = pMsg->pCont; - SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1); @@ -472,7 +479,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int } int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { - SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (mgmt) { qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1); @@ -488,7 +495,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in return TSDB_CODE_QRY_INVALID_INPUT; } - SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; @@ -531,7 +538,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 int32_t code = 0; STaskDropReq *msg = pMsg->pCont; - SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1); @@ -575,7 +582,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ int32_t code = 0; SSchedulerHbReq req = {0}; - SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1); @@ -606,20 +613,19 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ return TSDB_CODE_SUCCESS; } - int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - int32_t code = 0; + int32_t code = 0; SVDeleteReq req = {0}; - SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1); tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req); - + uint64_t sId = req.sId; uint64_t qId = req.queryId; uint64_t tId = req.taskId; @@ -639,5 +645,3 @@ _return: QW_RET(code); } - - diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 9f1a9a3146..3038b87930 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -295,7 +295,7 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) { if (ctx->ctrlConnInfo.handle) { tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER); } - + ctx->ctrlConnInfo.handle = NULL; ctx->ctrlConnInfo.refId = -1; @@ -454,21 +454,21 @@ void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksH void qwDestroyImpl(void *pMgmt) { SQWorker *mgmt = (SQWorker *)pMgmt; - int8_t nodeType = mgmt->nodeType; - int32_t nodeId = mgmt->nodeId; - - qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); + int8_t nodeType = mgmt->nodeType; + int32_t nodeId = mgmt->nodeId; + + qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); taosTmrStop(mgmt->hbTimer); mgmt->hbTimer = NULL; taosTmrCleanUp(mgmt->timer); uint64_t qId, tId; - int32_t eId; - void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + int32_t eId; + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; - void *key = taosHashGetKey(pIter, NULL); + void *key = taosHashGetKey(pIter, NULL); QW_GET_QTID(key, qId, tId, eId); qwFreeTaskCtx(ctx); @@ -486,14 +486,14 @@ void qwDestroyImpl(void *pMgmt) { taosHashCleanup(mgmt->schHash); *mgmt->destroyed = 1; - + taosMemoryFree(mgmt); atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); qwCloseRef(); - qDebug("qworker destroyed, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); + qDebug("qworker destroyed, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); } int32_t qwOpenRef(void) { @@ -550,11 +550,10 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { return -1; } - -void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch) { +void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { int32_t num = taosArrayGetSize(pExpiredSch); for (int32_t i = 0; i < num; ++i) { - uint64_t *sId = taosArrayGet(pExpiredSch, i); + uint64_t *sId = taosArrayGet(pExpiredSch, i); SQWSchStatus *pSch = NULL; if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) { continue; @@ -569,5 +568,3 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch) { qwReleaseScheduler(QW_WRITE, mgmt); } } - - diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ae9dd82a58..9115c2d3aa 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -7,9 +7,9 @@ #include "qwInt.h" #include "qwMsg.h" #include "tcommon.h" +#include "tdatablock.h" #include "tmsg.h" #include "tname.h" -#include "tdatablock.h" SQWorkerMgmt gQwMgmt = { .lock = 0, @@ -17,8 +17,8 @@ SQWorkerMgmt gQwMgmt = { .qwNum = 0, }; -static void freeBlock(void* param) { - SSDataBlock* pBlock = *(SSDataBlock**)param; +static void freeBlock(void *param) { + SSDataBlock *pBlock = *(SSDataBlock **)param; blockDataDestroy(pBlock); } @@ -100,7 +100,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t execNum = 0; qTaskInfo_t taskHandle = ctx->taskHandle; DataSinkHandle sinkHandle = ctx->sinkHandle; - SLocalFetch localFetch = {(void*)mgmt, ctx->localExec, qWorkerProcessLocalFetch, ctx->explainRes}; + SLocalFetch localFetch = {(void *)mgmt, ctx->localExec, qWorkerProcessLocalFetch, ctx->explainRes}; SArray *pResList = taosArrayInit(4, POINTER_BYTES); while (true) { @@ -512,7 +512,7 @@ _return: QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); } } - + ctx->queryRsped = true; } @@ -541,8 +541,8 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { } int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { - int32_t code = 0; - SQWTaskCtx *ctx = NULL; + int32_t code = 0; + SQWTaskCtx *ctx = NULL; QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); @@ -1113,10 +1113,10 @@ void qWorkerDestroy(void **qWorkerMgmt) { return; } - int32_t destroyed = 0; + int32_t destroyed = 0; SQWorker *mgmt = *qWorkerMgmt; mgmt->destroyed = &destroyed; - + if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) { qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId); return; @@ -1153,15 +1153,16 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes) { - SQWorker *mgmt = (SQWorker*)pMgmt; +int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, + SQWMsg *qwMsg, SArray *explainRes) { + SQWorker *mgmt = (SQWorker *)pMgmt; int32_t code = 0; SQWTaskCtx *ctx = NULL; SSubplan *plan = (SSubplan *)qwMsg->msg; SQWPhaseInput input = {0}; qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; - SReadHandle rHandle = {0}; + SReadHandle rHandle = {0}; QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS())); QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); @@ -1178,7 +1179,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64 rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb)); rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; - + code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -1199,7 +1200,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64 _return: taosMemoryFree(rHandle.pMsgCb); - + input.code = code; input.msgType = qwMsg->msgType; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); @@ -1212,13 +1213,14 @@ _return: QW_RET(code); } -int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes) { - SQWorker *mgmt = (SQWorker*)pMgmt; - int32_t code = 0; - int32_t dataLen = 0; - SQWTaskCtx *ctx = NULL; - void *rsp = NULL; - bool queryStop = false; +int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, + void **pRsp, SArray *explainRes) { + SQWorker *mgmt = (SQWorker *)pMgmt; + int32_t code = 0; + int32_t dataLen = 0; + SQWTaskCtx *ctx = NULL; + void *rsp = NULL; + bool queryStop = false; SQWPhaseInput input = {0}; @@ -1228,15 +1230,15 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64 ctx->msgType = TDMT_SCH_MERGE_FETCH; ctx->explainRes = explainRes; - + SOutputData sOutput = {0}; - + while (true) { QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); if (NULL == rsp) { QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop)); - + continue; } else { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); @@ -1259,5 +1261,3 @@ _return: QW_RET(code); } - - diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 60d6594c1b..c855f005e7 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -32,18 +32,17 @@ #endif #include "os.h" -#include "tglobal.h" -#include "taos.h" -#include "tdef.h" -#include "tvariant.h" -#include "tdatablock.h" -#include "trpc.h" +#include "dataSinkMgt.h" +#include "executor.h" #include "planner.h" #include "qworker.h" #include "stub.h" -#include "executor.h" -#include "dataSinkMgt.h" - +#include "taos.h" +#include "tdatablock.h" +#include "tdef.h" +#include "tglobal.h" +#include "trpc.h" +#include "tvariant.h" namespace { @@ -55,56 +54,52 @@ bool qwtEnableLog = true; int32_t qwtTestMaxExecTaskUsec = 2; int32_t qwtTestReqMaxDelayUsec = 2; -int64_t qwtTestQueryId = 0; -bool qwtTestEnableSleep = true; -bool qwtTestStop = false; -bool qwtTestDeadLoop = false; -int32_t qwtTestMTRunSec = 2; -int32_t qwtTestPrintNum = 10000; +int64_t qwtTestQueryId = 0; +bool qwtTestEnableSleep = true; +bool qwtTestStop = false; +bool qwtTestDeadLoop = false; +int32_t qwtTestMTRunSec = 2; +int32_t qwtTestPrintNum = 10000; uint64_t qwtTestCaseIdx = 0; uint64_t qwtTestCaseNum = 4; -bool qwtTestCaseFinished = false; -tsem_t qwtTestQuerySem; -tsem_t qwtTestFetchSem; -int32_t qwtTestQuitThreadNum = 0; +bool qwtTestCaseFinished = false; +tsem_t qwtTestQuerySem; +tsem_t qwtTestFetchSem; +int32_t qwtTestQuitThreadNum = 0; - -int32_t qwtTestQueryQueueRIdx = 0; -int32_t qwtTestQueryQueueWIdx = 0; -int32_t qwtTestQueryQueueNum = 0; -SRWLatch qwtTestQueryQueueLock = 0; +int32_t qwtTestQueryQueueRIdx = 0; +int32_t qwtTestQueryQueueWIdx = 0; +int32_t qwtTestQueryQueueNum = 0; +SRWLatch qwtTestQueryQueueLock = 0; struct SRpcMsg *qwtTestQueryQueue[qwtTestQueryQueueSize] = {0}; -int32_t qwtTestFetchQueueRIdx = 0; -int32_t qwtTestFetchQueueWIdx = 0; -int32_t qwtTestFetchQueueNum = 0; -SRWLatch qwtTestFetchQueueLock = 0; +int32_t qwtTestFetchQueueRIdx = 0; +int32_t qwtTestFetchQueueWIdx = 0; +int32_t qwtTestFetchQueueNum = 0; +SRWLatch qwtTestFetchQueueLock = 0; struct SRpcMsg *qwtTestFetchQueue[qwtTestFetchQueueSize] = {0}; - -int32_t qwtTestSinkBlockNum = 0; -int32_t qwtTestSinkMaxBlockNum = 0; -bool qwtTestSinkQueryEnd = false; +int32_t qwtTestSinkBlockNum = 0; +int32_t qwtTestSinkMaxBlockNum = 0; +bool qwtTestSinkQueryEnd = false; SRWLatch qwtTestSinkLock = 0; -int32_t qwtTestSinkLastLen = 0; +int32_t qwtTestSinkLastLen = 0; - -SSubQueryMsg qwtqueryMsg = {0}; -SRpcMsg qwtfetchRpc = {0}; -SResFetchReq qwtfetchMsg = {0}; -SRpcMsg qwtreadyRpc = {0}; -SResReadyReq qwtreadyMsg = {0}; -SRpcMsg qwtdropRpc = {0}; -STaskDropReq qwtdropMsg = {0}; +SSubQueryMsg qwtqueryMsg = {0}; +SRpcMsg qwtfetchRpc = {0}; +SResFetchReq qwtfetchMsg = {0}; +SRpcMsg qwtreadyRpc = {0}; +SResReadyReq qwtreadyMsg = {0}; +SRpcMsg qwtdropRpc = {0}; +STaskDropReq qwtdropMsg = {0}; SSchTasksStatusReq qwtstatusMsg = {0}; - void qwtInitLogFile() { if (!qwtEnableLog) { return; } - const char *defaultLogFileNamePrefix = "taosdlog"; - const int32_t maxLogFileNum = 10; + const char *defaultLogFileNamePrefix = "taosdlog"; + const int32_t maxLogFileNum = 10; tsAsyncLog = 0; qDebugFlag = 159; @@ -113,7 +108,6 @@ void qwtInitLogFile() { if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { printf("failed to open log file in directory:%s\n", tsLogDir); } - } void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) { @@ -145,7 +139,7 @@ void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { dropRpc->contLen = sizeof(STaskDropReq); } -int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { +int32_t qwtStringToPlan(const char *str, SSubplan **subplan) { *subplan = (SSubplan *)0x1; return 0; } @@ -153,12 +147,12 @@ int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { taosWLockLatch(&qwtTestFetchQueueLock); struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg)); - memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); + memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg; if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) { qwtTestFetchQueueWIdx = 0; } - + qwtTestFetchQueueNum++; if (qwtTestFetchQueueWIdx == qwtTestFetchQueueRIdx) { @@ -166,9 +160,9 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { assert(0); } taosWUnLockLatch(&qwtTestFetchQueueLock); - + tsem_post(&qwtTestFetchSem); - + return 0; } @@ -188,19 +182,15 @@ int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) { assert(0); } taosWUnLockLatch(&qwtTestQueryQueueLock); - + tsem_post(&qwtTestQuerySem); - + return 0; } -void qwtSendReqToDnode(void* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { - -} - +void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {} void qwtRpcSendResponse(const SRpcMsg *pRsp) { - switch (pRsp->msgType) { case TDMT_SCH_QUERY_RSP: case TDMT_SCH_MERGE_QUERY_RSP: { @@ -210,14 +200,14 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); } - + rpcFreeCont(rsp); break; } case TDMT_SCH_FETCH_RSP: case TDMT_SCH_MERGE_FETCH_RSP: { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont; - + if (0 == pRsp->code && 0 == rsp->completed) { qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); @@ -228,7 +218,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); rpcFreeCont(rsp); - + break; } case TDMT_SCH_DROP_TASK_RSP: { @@ -240,26 +230,26 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { } } - return; } -int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { +int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo, + DataSinkHandle *handle) { qwtTestSinkBlockNum = 0; qwtTestSinkMaxBlockNum = taosRand() % 100 + 1; qwtTestSinkQueryEnd = false; - - *pTaskInfo = (qTaskInfo_t)((char*)qwtTestCaseIdx+1); - *handle = (DataSinkHandle)((char*)qwtTestCaseIdx+2); + + *pTaskInfo = (qTaskInfo_t)((char *)qwtTestCaseIdx + 1); + *handle = (DataSinkHandle)((char *)qwtTestCaseIdx + 2); ++qwtTestCaseIdx; - + return 0; } -int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { +int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) { int32_t endExec = 0; - + if (NULL == tinfo) { *pRes = NULL; *useconds = 0; @@ -269,9 +259,9 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { *useconds = taosRand() % 10; return 0; } - + endExec = taosRand() % 5; - + int32_t runTime = 0; if (qwtTestEnableSleep && qwtTestMaxExecTaskUsec > 0) { runTime = taosRand() % qwtTestMaxExecTaskUsec; @@ -282,28 +272,24 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { taosUsleep(runTime); } } - + if (endExec) { - *pRes = (SSDataBlock*)taosMemoryCalloc(1, sizeof(SSDataBlock)); + *pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock)); (*pRes)->info.rows = taosRand() % 1000 + 1; } else { *pRes = NULL; *useconds = taosRand() % 10; } } - + return 0; } -int32_t qwtKillTask(qTaskInfo_t qinfo) { - return 0; -} +int32_t qwtKillTask(qTaskInfo_t qinfo) { return 0; } -void qwtDestroyTask(qTaskInfo_t qHandle) { -} +void qwtDestroyTask(qTaskInfo_t qHandle) {} - -int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) { +int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData *pInput, bool *pContinue) { if (NULL == handle || NULL == pInput || NULL == pContinue) { assert(0); } @@ -320,7 +306,7 @@ int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* p *pContinue = true; } taosWUnLockLatch(&qwtTestSinkLock); - + return 0; } @@ -332,7 +318,7 @@ void qwtEndPut(DataSinkHandle handle, uint64_t useconds) { qwtTestSinkQueryEnd = true; } -void qwtGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd) { +void qwtGetDataLength(DataSinkHandle handle, int64_t *pLen, bool *pQueryEnd) { static int32_t in = 0; if (in > 0) { @@ -340,7 +326,7 @@ void qwtGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd) { } atomic_add_fetch_32(&in, 1); - + if (NULL == handle) { assert(0); } @@ -360,7 +346,7 @@ void qwtGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd) { atomic_sub_fetch_32(&in, 1); } -int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { +int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData *pOutput) { taosWLockLatch(&qwtTestSinkLock); if (qwtTestSinkLastLen > 0) { pOutput->numOfRows = taosRand() % 10 + 1; @@ -368,7 +354,7 @@ int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { pOutput->queryEnd = qwtTestSinkQueryEnd; if (qwtTestSinkBlockNum == 0) { pOutput->bufStatus = DS_BUF_EMPTY; - } else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum*0.5) { + } else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum * 0.5) { pOutput->bufStatus = DS_BUF_LOW; } else { pOutput->bufStatus = DS_BUF_FULL; @@ -382,7 +368,7 @@ int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { pOutput->queryEnd = qwtTestSinkQueryEnd; if (qwtTestSinkBlockNum == 0) { pOutput->bufStatus = DS_BUF_EMPTY; - } else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum*0.5) { + } else if (qwtTestSinkBlockNum <= qwtTestSinkMaxBlockNum * 0.5) { pOutput->bufStatus = DS_BUF_LOW; } else { pOutput->bufStatus = DS_BUF_FULL; @@ -393,31 +379,27 @@ int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { assert(0); } taosWUnLockLatch(&qwtTestSinkLock); - + return 0; } -void qwtDestroyDataSinker(DataSinkHandle handle) { - -} - - +void qwtDestroyDataSinker(DataSinkHandle handle) {} void stubSetStringToPlan() { static Stub stub; stub.set(qStringToSubplan, qwtStringToPlan); { #ifdef WINDOWS - AddrAny any; - std::map result; + AddrAny any; + std::map result; any.get_func_addr("qStringToSubplan", result); #endif #ifdef LINUX - AddrAny any("libplanner.so"); - std::map result; + AddrAny any("libplanner.so"); + std::map result; any.get_global_func_addr_dynsym("^qStringToSubplan$", result); #endif - for (const auto& f : result) { + for (const auto &f : result) { stub.set(f.second, qwtStringToPlan); } } @@ -428,38 +410,36 @@ void stubSetExecTask() { stub.set(qExecTask, qwtExecTask); { #ifdef WINDOWS - AddrAny any; - std::map result; + AddrAny any; + std::map result; any.get_func_addr("qExecTask", result); #endif #ifdef LINUX - AddrAny any("libexecutor.so"); - std::map result; + AddrAny any("libexecutor.so"); + std::map result; any.get_global_func_addr_dynsym("^qExecTask$", result); #endif - for (const auto& f : result) { + for (const auto &f : result) { stub.set(f.second, qwtExecTask); } } } - - void stubSetCreateExecTask() { static Stub stub; stub.set(qCreateExecTask, qwtCreateExecTask); { #ifdef WINDOWS - AddrAny any; - std::map result; + AddrAny any; + std::map result; any.get_func_addr("qCreateExecTask", result); #endif #ifdef LINUX - AddrAny any("libexecutor.so"); - std::map result; + AddrAny any("libexecutor.so"); + std::map result; any.get_global_func_addr_dynsym("^qCreateExecTask$", result); #endif - for (const auto& f : result) { + for (const auto &f : result) { stub.set(f.second, qwtCreateExecTask); } } @@ -470,16 +450,16 @@ void stubSetAsyncKillTask() { stub.set(qAsyncKillTask, qwtKillTask); { #ifdef WINDOWS - AddrAny any; - std::map result; + AddrAny any; + std::map result; any.get_func_addr("qAsyncKillTask", result); #endif #ifdef LINUX - AddrAny any("libexecutor.so"); - std::map result; + AddrAny any("libexecutor.so"); + std::map result; any.get_global_func_addr_dynsym("^qAsyncKillTask$", result); #endif - for (const auto& f : result) { + for (const auto &f : result) { stub.set(f.second, qwtKillTask); } } @@ -490,37 +470,36 @@ void stubSetDestroyTask() { stub.set(qDestroyTask, qwtDestroyTask); { #ifdef WINDOWS - AddrAny any; - std::map result; + AddrAny any; + std::map result; any.get_func_addr("qDestroyTask", result); #endif #ifdef LINUX - AddrAny any("libexecutor.so"); - std::map result; + AddrAny any("libexecutor.so"); + std::map result; any.get_global_func_addr_dynsym("^qDestroyTask$", result); #endif - for (const auto& f : result) { + for (const auto &f : result) { stub.set(f.second, qwtDestroyTask); } } } - void stubSetDestroyDataSinker() { static Stub stub; stub.set(dsDestroyDataSinker, qwtDestroyDataSinker); { #ifdef WINDOWS - AddrAny any; - std::map result; + AddrAny any; + std::map result; any.get_func_addr("dsDestroyDataSinker", result); #endif #ifdef LINUX - AddrAny any("libexecutor.so"); - std::map result; + AddrAny any("libexecutor.so"); + std::map result; any.get_global_func_addr_dynsym("^dsDestroyDataSinker$", result); #endif - for (const auto& f : result) { + for (const auto &f : result) { stub.set(f.second, qwtDestroyDataSinker); } } @@ -531,16 +510,16 @@ void stubSetGetDataLength() { stub.set(dsGetDataLength, qwtGetDataLength); { #ifdef WINDOWS - AddrAny any; - std::map result; + AddrAny any; + std::map result; any.get_func_addr("dsGetDataLength", result); #endif #ifdef LINUX - AddrAny any("libexecutor.so"); - std::map result; + AddrAny any("libexecutor.so"); + std::map result; any.get_global_func_addr_dynsym("^dsGetDataLength$", result); #endif - for (const auto& f : result) { + for (const auto &f : result) { stub.set(f.second, qwtGetDataLength); } } @@ -551,16 +530,16 @@ void stubSetEndPut() { stub.set(dsEndPut, qwtEndPut); { #ifdef WINDOWS - AddrAny any; - std::map result; + AddrAny any; + std::map result; any.get_func_addr("dsEndPut", result); #endif #ifdef LINUX - AddrAny any("libexecutor.so"); - std::map result; + AddrAny any("libexecutor.so"); + std::map result; any.get_global_func_addr_dynsym("^dsEndPut$", result); #endif - for (const auto& f : result) { + for (const auto &f : result) { stub.set(f.second, qwtEndPut); } } @@ -571,16 +550,16 @@ void stubSetPutDataBlock() { stub.set(dsPutDataBlock, qwtPutDataBlock); { #ifdef WINDOWS - AddrAny any; - std::map result; + AddrAny any; + std::map result; any.get_func_addr("dsPutDataBlock", result); #endif #ifdef LINUX - AddrAny any("libexecutor.so"); - std::map result; + AddrAny any("libexecutor.so"); + std::map result; any.get_global_func_addr_dynsym("^dsPutDataBlock$", result); #endif - for (const auto& f : result) { + for (const auto &f : result) { stub.set(f.second, qwtPutDataBlock); } } @@ -591,16 +570,16 @@ void stubSetRpcSendResponse() { stub.set(rpcSendResponse, qwtRpcSendResponse); { #ifdef WINDOWS - AddrAny any; - std::map result; + AddrAny any; + std::map result; any.get_func_addr("rpcSendResponse", result); #endif #ifdef LINUX - AddrAny any("libtransport.so"); - std::map result; + AddrAny any("libtransport.so"); + std::map result; any.get_global_func_addr_dynsym("^rpcSendResponse$", result); #endif - for (const auto& f : result) { + for (const auto &f : result) { stub.set(f.second, qwtRpcSendResponse); } } @@ -611,34 +590,33 @@ void stubSetGetDataBlock() { stub.set(dsGetDataBlock, qwtGetDataBlock); { #ifdef WINDOWS - AddrAny any; - std::map result; + AddrAny any; + std::map result; any.get_func_addr("dsGetDataBlock", result); #endif #ifdef LINUX - AddrAny any("libtransport.so"); - std::map result; + AddrAny any("libtransport.so"); + std::map result; any.get_global_func_addr_dynsym("^dsGetDataBlock$", result); #endif - for (const auto& f : result) { + for (const auto &f : result) { stub.set(f.second, qwtGetDataBlock); } } } - void *queryThread(void *param) { - SRpcMsg queryRpc = {0}; - int32_t code = 0; + SRpcMsg queryRpc = {0}; + int32_t code = 0; uint32_t n = 0; - void *mockPointer = (void *)0x1; - void *mgmt = param; + void *mockPointer = (void *)0x1; + void *mgmt = param; while (!qwtTestStop) { qwtBuildQueryReqMsg(&queryRpc); - qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); + qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); if (qwtTestEnableSleep) { - taosUsleep(taosRand()%5); + taosUsleep(taosRand() % 5); } if (++n % qwtTestPrintNum == 0) { printf("query:%d\n", n); @@ -649,55 +627,55 @@ void *queryThread(void *param) { } void *fetchThread(void *param) { - SRpcMsg fetchRpc = {0}; - int32_t code = 0; - uint32_t n = 0; - void *mockPointer = (void *)0x1; - void *mgmt = param; + SRpcMsg fetchRpc = {0}; + int32_t code = 0; + uint32_t n = 0; + void *mockPointer = (void *)0x1; + void *mgmt = param; SResFetchReq fetchMsg = {0}; while (!qwtTestStop) { qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); if (qwtTestEnableSleep) { - taosUsleep(taosRand()%5); + taosUsleep(taosRand() % 5); } if (++n % qwtTestPrintNum == 0) { printf("fetch:%d\n", n); - } + } } return NULL; } void *dropThread(void *param) { - SRpcMsg dropRpc = {0}; - int32_t code = 0; - uint32_t n = 0; - void *mockPointer = (void *)0x1; - void *mgmt = param; - STaskDropReq dropMsg = {0}; + SRpcMsg dropRpc = {0}; + int32_t code = 0; + uint32_t n = 0; + void *mockPointer = (void *)0x1; + void *mgmt = param; + STaskDropReq dropMsg = {0}; while (!qwtTestStop) { qwtBuildDropReqMsg(&dropMsg, &dropRpc); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); if (qwtTestEnableSleep) { - taosUsleep(taosRand()%5); + taosUsleep(taosRand() % 5); } if (++n % qwtTestPrintNum == 0) { printf("drop:%d\n", n); - } + } } return NULL; } void *qwtclientThread(void *param) { - int32_t code = 0; + int32_t code = 0; uint32_t n = 0; - void *mgmt = param; - void *mockPointer = (void *)0x1; - SRpcMsg queryRpc = {0}; + void *mgmt = param; + void *mockPointer = (void *)0x1; + SRpcMsg queryRpc = {0}; taosSsleep(1); @@ -710,8 +688,7 @@ void *qwtclientThread(void *param) { while (!qwtTestCaseFinished) { taosUsleep(1); } - - + if (++n % qwtTestPrintNum == 0) { printf("case run:%d\n", n); } @@ -723,9 +700,9 @@ void *qwtclientThread(void *param) { } void *queryQueueThread(void *param) { - void *mockPointer = (void *)0x1; + void *mockPointer = (void *)0x1; SRpcMsg *queryRpc = NULL; - void *mgmt = param; + void *mgmt = param; while (true) { tsem_wait(&qwtTestQuerySem); @@ -739,17 +716,16 @@ void *queryQueueThread(void *param) { printf("query queue is empty\n"); assert(0); } - + queryRpc = qwtTestQueryQueue[qwtTestQueryQueueRIdx++]; - + if (qwtTestQueryQueueRIdx >= qwtTestQueryQueueSize) { qwtTestQueryQueueRIdx = 0; } - + qwtTestQueryQueueNum--; taosWUnLockLatch(&qwtTestQueryQueueLock); - if (qwtTestEnableSleep && qwtTestReqMaxDelayUsec > 0) { int32_t delay = taosRand() % qwtTestReqMaxDelayUsec; @@ -757,7 +733,7 @@ void *queryQueueThread(void *param) { taosUsleep(delay); } } - + if (TDMT_SCH_QUERY == queryRpc->msgType) { qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); } else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) { @@ -780,29 +756,29 @@ void *queryQueueThread(void *param) { } void *fetchQueueThread(void *param) { - void *mockPointer = (void *)0x1; + void *mockPointer = (void *)0x1; SRpcMsg *fetchRpc = NULL; - void *mgmt = param; + void *mgmt = param; while (true) { tsem_wait(&qwtTestFetchSem); if (qwtTestStop && qwtTestFetchQueueNum <= 0 && qwtTestCaseFinished) { break; - } + } taosWLockLatch(&qwtTestFetchQueueLock); if (qwtTestFetchQueueNum <= 0 || qwtTestFetchQueueRIdx == qwtTestFetchQueueWIdx) { printf("Fetch queue is empty\n"); assert(0); } - + fetchRpc = qwtTestFetchQueue[qwtTestFetchQueueRIdx++]; - + if (qwtTestFetchQueueRIdx >= qwtTestFetchQueueSize) { qwtTestFetchQueueRIdx = 0; } - + qwtTestFetchQueueNum--; taosWUnLockLatch(&qwtTestFetchQueueLock); @@ -835,7 +811,7 @@ void *fetchQueueThread(void *param) { if (qwtTestStop && qwtTestFetchQueueNum <= 0 && qwtTestCaseFinished) { break; - } + } } atomic_add_fetch_32(&qwtTestQuitThreadNum, 1); @@ -843,15 +819,12 @@ void *fetchQueueThread(void *param) { return NULL; } - - -} - +} // namespace TEST(seqTest, normalCase) { - void *mgmt = NULL; + void *mgmt = NULL; int32_t code = 0; - void *mockPointer = (void *)0x1; + void *mockPointer = (void *)0x1; SRpcMsg queryRpc = {0}; SRpcMsg fetchRpc = {0}; SRpcMsg dropRpc = {0}; @@ -861,7 +834,7 @@ TEST(seqTest, normalCase) { qwtBuildQueryReqMsg(&queryRpc); qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc); qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); - + stubSetStringToPlan(); stubSetRpcSendResponse(); stubSetExecTask(); @@ -873,7 +846,7 @@ TEST(seqTest, normalCase) { stubSetEndPut(); stubSetPutDataBlock(); stubSetGetDataBlock(); - + SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; @@ -883,8 +856,8 @@ TEST(seqTest, normalCase) { code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); ASSERT_EQ(code, 0); - //code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); - //ASSERT_EQ(code, 0); + // code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); + // ASSERT_EQ(code, 0); code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); ASSERT_EQ(code, 0); @@ -896,14 +869,14 @@ TEST(seqTest, normalCase) { } TEST(seqTest, cancelFirst) { - void *mgmt = NULL; + void *mgmt = NULL; int32_t code = 0; - void *mockPointer = (void *)0x1; + void *mockPointer = (void *)0x1; SRpcMsg queryRpc = {0}; SRpcMsg dropRpc = {0}; qwtInitLogFile(); - + qwtBuildQueryReqMsg(&queryRpc); qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); @@ -926,21 +899,21 @@ TEST(seqTest, cancelFirst) { } TEST(seqTest, randCase) { - void *mgmt = NULL; - int32_t code = 0; - void *mockPointer = (void *)0x1; - SRpcMsg queryRpc = {0}; - SRpcMsg readyRpc = {0}; - SRpcMsg fetchRpc = {0}; - SRpcMsg dropRpc = {0}; - SRpcMsg statusRpc = {0}; - SResReadyReq readyMsg = {0}; - SResFetchReq fetchMsg = {0}; - STaskDropReq dropMsg = {0}; + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + SRpcMsg queryRpc = {0}; + SRpcMsg readyRpc = {0}; + SRpcMsg fetchRpc = {0}; + SRpcMsg dropRpc = {0}; + SRpcMsg statusRpc = {0}; + SResReadyReq readyMsg = {0}; + SResFetchReq fetchMsg = {0}; + STaskDropReq dropMsg = {0}; SSchTasksStatusReq statusMsg = {0}; qwtInitLogFile(); - + stubSetStringToPlan(); stubSetRpcSendResponse(); stubSetCreateExecTask(); @@ -957,37 +930,37 @@ TEST(seqTest, randCase) { int32_t maxr = 10001; while (true) { int32_t r = taosRand() % maxr; - - if (r >= 0 && r < maxr/5) { - printf("Query,%d\n", t++); + + if (r >= 0 && r < maxr / 5) { + printf("Query,%d\n", t++); qwtBuildQueryReqMsg(&queryRpc); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); - } else if (r >= maxr/5 && r < maxr * 2/5) { - //printf("Ready,%d\n", t++); - //qwtBuildReadyReqMsg(&readyMsg, &readyRpc); - //code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); - //if (qwtTestEnableSleep) { - // taosUsleep(1); - //} - } else if (r >= maxr * 2/5 && r < maxr* 3/5) { + } else if (r >= maxr / 5 && r < maxr * 2 / 5) { + // printf("Ready,%d\n", t++); + // qwtBuildReadyReqMsg(&readyMsg, &readyRpc); + // code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); + // if (qwtTestEnableSleep) { + // taosUsleep(1); + // } + } else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) { printf("Fetch,%d\n", t++); qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); if (qwtTestEnableSleep) { taosUsleep(1); } - } else if (r >= maxr * 3/5 && r < maxr * 4/5) { + } else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) { printf("Drop,%d\n", t++); qwtBuildDropReqMsg(&dropMsg, &dropRpc); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); if (qwtTestEnableSleep) { taosUsleep(1); } - } else if (r >= maxr * 4/5 && r < maxr-1) { + } else if (r >= maxr * 4 / 5 && r < maxr - 1) { printf("Status,%d\n", t++); if (qwtTestEnableSleep) { taosUsleep(1); - } + } } else { printf("QUIT RAND NOW"); break; @@ -998,12 +971,12 @@ TEST(seqTest, randCase) { } TEST(seqTest, multithreadRand) { - void *mgmt = NULL; + void *mgmt = NULL; int32_t code = 0; - void *mockPointer = (void *)0x1; + void *mockPointer = (void *)0x1; qwtInitLogFile(); - + stubSetStringToPlan(); stubSetRpcSendResponse(); stubSetExecTask(); @@ -1027,9 +1000,9 @@ TEST(seqTest, multithreadRand) { TdThreadAttr thattr; taosThreadAttrInit(&thattr); - TdThread t1,t2,t3,t4,t5,t6; + TdThread t1, t2, t3, t4, t5, t6; taosThreadCreate(&(t1), &thattr, queryThread, mgmt); - //taosThreadCreate(&(t2), &thattr, readyThread, NULL); + // taosThreadCreate(&(t2), &thattr, readyThread, NULL); taosThreadCreate(&(t3), &thattr, fetchThread, NULL); taosThreadCreate(&(t4), &thattr, dropThread, NULL); taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); @@ -1042,7 +1015,7 @@ TEST(seqTest, multithreadRand) { break; } } - + qwtTestStop = true; taosSsleep(3); @@ -1054,17 +1027,17 @@ TEST(seqTest, multithreadRand) { qwtTestFetchQueueRIdx = 0; qwtTestFetchQueueWIdx = 0; qwtTestFetchQueueLock = 0; - + qWorkerDestroy(&mgmt); } TEST(rcTest, shortExecshortDelay) { - void *mgmt = NULL; + void *mgmt = NULL; int32_t code = 0; - void *mockPointer = (void *)0x1; + void *mockPointer = (void *)0x1; qwtInitLogFile(); - + stubSetStringToPlan(); stubSetRpcSendResponse(); stubSetExecTask(); @@ -1096,7 +1069,7 @@ TEST(rcTest, shortExecshortDelay) { TdThreadAttr thattr; taosThreadAttrInit(&thattr); - TdThread t1,t2,t3,t4,t5; + TdThread t1, t2, t3, t4, t5; taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); @@ -1109,25 +1082,24 @@ TEST(rcTest, shortExecshortDelay) { break; } } - + qwtTestStop = true; while (true) { if (qwtTestQuitThreadNum == 3) { break; } - + taosSsleep(1); if (qwtTestCaseFinished) { - if (qwtTestQuitThreadNum < 3) { + if (qwtTestQuitThreadNum < 3) { tsem_post(&qwtTestQuerySem); tsem_post(&qwtTestFetchSem); taosUsleep(10); } } - } qwtTestQueryQueueNum = 0; @@ -1138,17 +1110,17 @@ TEST(rcTest, shortExecshortDelay) { qwtTestFetchQueueRIdx = 0; qwtTestFetchQueueWIdx = 0; qwtTestFetchQueueLock = 0; - - qWorkerDestroy(&mgmt); + + qWorkerDestroy(&mgmt); } TEST(rcTest, longExecshortDelay) { - void *mgmt = NULL; + void *mgmt = NULL; int32_t code = 0; - void *mockPointer = (void *)0x1; + void *mockPointer = (void *)0x1; qwtInitLogFile(); - + stubSetStringToPlan(); stubSetRpcSendResponse(); stubSetExecTask(); @@ -1180,7 +1152,7 @@ TEST(rcTest, longExecshortDelay) { TdThreadAttr thattr; taosThreadAttrInit(&thattr); - TdThread t1,t2,t3,t4,t5; + TdThread t1, t2, t3, t4, t5; taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); @@ -1193,26 +1165,24 @@ TEST(rcTest, longExecshortDelay) { break; } } - + qwtTestStop = true; - while (true) { if (qwtTestQuitThreadNum == 3) { break; } - + taosSsleep(1); if (qwtTestCaseFinished) { - if (qwtTestQuitThreadNum < 3) { + if (qwtTestQuitThreadNum < 3) { tsem_post(&qwtTestQuerySem); tsem_post(&qwtTestFetchSem); - + taosUsleep(10); } } - } qwtTestQueryQueueNum = 0; @@ -1223,18 +1193,17 @@ TEST(rcTest, longExecshortDelay) { qwtTestFetchQueueRIdx = 0; qwtTestFetchQueueWIdx = 0; qwtTestFetchQueueLock = 0; - + qWorkerDestroy(&mgmt); } - TEST(rcTest, shortExeclongDelay) { - void *mgmt = NULL; + void *mgmt = NULL; int32_t code = 0; - void *mockPointer = (void *)0x1; + void *mockPointer = (void *)0x1; qwtInitLogFile(); - + stubSetStringToPlan(); stubSetRpcSendResponse(); stubSetExecTask(); @@ -1266,7 +1235,7 @@ TEST(rcTest, shortExeclongDelay) { TdThreadAttr thattr; taosThreadAttrInit(&thattr); - TdThread t1,t2,t3,t4,t5; + TdThread t1, t2, t3, t4, t5; taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); @@ -1279,26 +1248,24 @@ TEST(rcTest, shortExeclongDelay) { break; } } - - qwtTestStop = true; + qwtTestStop = true; while (true) { if (qwtTestQuitThreadNum == 3) { break; } - + taosSsleep(1); if (qwtTestCaseFinished) { - if (qwtTestQuitThreadNum < 3) { + if (qwtTestQuitThreadNum < 3) { tsem_post(&qwtTestQuerySem); tsem_post(&qwtTestFetchSem); - + taosUsleep(10); } } - } qwtTestQueryQueueNum = 0; @@ -1309,18 +1276,17 @@ TEST(rcTest, shortExeclongDelay) { qwtTestFetchQueueRIdx = 0; qwtTestFetchQueueWIdx = 0; qwtTestFetchQueueLock = 0; - + qWorkerDestroy(&mgmt); } - TEST(rcTest, dropTest) { - void *mgmt = NULL; + void *mgmt = NULL; int32_t code = 0; - void *mockPointer = (void *)0x1; + void *mockPointer = (void *)0x1; qwtInitLogFile(); - + stubSetStringToPlan(); stubSetRpcSendResponse(); stubSetExecTask(); @@ -1347,7 +1313,7 @@ TEST(rcTest, dropTest) { TdThreadAttr thattr; taosThreadAttrInit(&thattr); - TdThread t1,t2,t3,t4,t5; + TdThread t1, t2, t3, t4, t5; taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); @@ -1360,15 +1326,14 @@ TEST(rcTest, dropTest) { break; } } - + qwtTestStop = true; taosSsleep(3); - + qWorkerDestroy(&mgmt); } - -int main(int argc, char** argv) { +int main(int argc, char **argv) { taosSeedRand(taosGetTimestampSec()); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/tools/scripts/codeFormat.sh b/tools/scripts/codeFormat.sh index 798b566344..16c519d1f2 100644 --- a/tools/scripts/codeFormat.sh +++ b/tools/scripts/codeFormat.sh @@ -22,8 +22,8 @@ FORMAT_DIR_LIST=( "${PRJ_ROOT_DIR}/source/libs/nodes" # "${PRJ_ROOT_DIR}/source/libs/parser" "${PRJ_ROOT_DIR}/source/libs/planner" - # "${PRJ_ROOT_DIR}/source/libs/qcom" - # "${PRJ_ROOT_DIR}/source/libs/qworker" + "${PRJ_ROOT_DIR}/source/libs/qcom" + "${PRJ_ROOT_DIR}/source/libs/qworker" "${PRJ_ROOT_DIR}/source/libs/scalar" "${PRJ_ROOT_DIR}/source/libs/stream" "${PRJ_ROOT_DIR}/source/libs/sync"