From a696ed583a8f4ea7a5dd4a270e5efba2dbcee900 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 18 May 2022 21:23:06 +0800 Subject: [PATCH] test(stream): _wstartts should be reverse quoted --- example/src/tmq.c | 4 +- source/dnode/mnode/impl/src/mndSubscribe.c | 12 +- source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/tq/tq.c | 3 + source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/executorMain.c | 85 ++++++------ source/libs/executor/src/executorimpl.c | 151 +++++++++++---------- source/libs/qworker/src/qworker.c | 104 +++++++------- tests/script/tsim/tstream/basic1.sim | 16 +-- 9 files changed, 195 insertions(+), 183 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index b79d21d051..00eb8462f4 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -106,8 +106,8 @@ int32_t create_topic() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ + /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ + pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c853794e86..c82472eec0 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -206,7 +206,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { int32_t totalVgNum = pOutput->pSub->vgNum; - mInfo("mq rebalance subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum); + mInfo("mq rebalance: subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum); // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); @@ -231,6 +231,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); + mInfo("mq rebalance: remove vg %d from consumer %ld", pVgEp->vgId, consumerId); } taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); // put into removed @@ -250,6 +251,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); + mInfo("mq rebalance: remove vg %d from unassigned", pVgEp->vgId); } } @@ -263,6 +265,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR minVgCnt = totalVgNum / afterRebConsumerNum; imbConsumerNum = totalVgNum % afterRebConsumerNum; } + mInfo("mq rebalance: %d consumer after rebalance, at least %d vg each, %d consumer has more vg", afterRebConsumerNum, + minVgCnt, imbConsumerNum); // 4. first scan: remove consumer more than wanted, put to remove hash int32_t imbCnt = 0; @@ -290,6 +294,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); + mInfo("mq rebalance: remove vg %d from consumer %ld (first scan)", pVgEp->vgId, pConsumerEp->consumerId); } imbCnt++; } @@ -303,6 +308,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); + mInfo("mq rebalance: remove vg %d from consumer %ld (first scan)", pVgEp->vgId, pConsumerEp->consumerId); } } } @@ -319,6 +325,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR newConsumerEp.vgs = taosArrayInit(0, sizeof(void *)); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); + mInfo("mq rebalance: add new consumer %ld", consumerId); } } @@ -343,6 +350,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); + mInfo("mq rebalance: add vg %d to consumer %ld (second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } @@ -360,6 +368,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); + mInfo("mq rebalance: add vg %d to consumer %ld (second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } else { // if all consumer is removed, put all vg into unassigned @@ -372,6 +381,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ASSERT(pRebOutput->newConsumerId == -1); taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); taosArrayPush(pOutput->rebVgs, pRebOutput); + mInfo("mq rebalance: unassign vg %d (second scan)", pRebOutput->pVgEp->vgId); } } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 38dedee5a2..d0420e1b84 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -179,6 +179,7 @@ struct STQ { SHashObj* pStreamTasks; SVnode* pVnode; SWal* pWal; + // TDB* pTdb; }; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 80aa350e4d..c69f7d71d5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -32,6 +32,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { pTq->path = strdup(path); pTq->pVnode = pVnode; pTq->pWal = pWal; + /*if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {*/ + /*ASSERT(0);*/ + /*}*/ #if 0 pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 66073b70eb..35b81b135f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -106,7 +106,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { pMsg->contentLen = pMsg->contentLen; #endif - qDebugL("stream task string %s", (const char*)msg); + /*qDebugL("stream task string %s", (const char*)msg);*/ struct SSubplan* plan = NULL; int32_t code = qStringToSubplan(msg, &plan); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 354f4d8752..e2af6cb73b 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -15,10 +15,10 @@ #include #include "dataSinkMgt.h" -#include "texception.h" #include "os.h" #include "tarray.h" #include "tcache.h" +#include "texception.h" #include "tglobal.h" #include "tmsg.h" #include "tudf.h" @@ -32,15 +32,15 @@ typedef struct STaskMgmt { TdThreadMutex lock; - SCacheObj *qinfoPool; // query handle pool - int32_t vgId; - bool closed; + SCacheObj *qinfoPool; // query handle pool + int32_t vgId; + bool closed; } STaskMgmt; -int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, - qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) { +int32_t qCreateExecTask(SReadHandle *readHandle, int32_t vgId, uint64_t taskId, SSubplan *pSubplan, + qTaskInfo_t *pTaskInfo, DataSinkHandle *handle, EOPTR_EXEC_MODEL model) { assert(readHandle != NULL && pSubplan != NULL); - SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; + SExecTaskInfo **pTask = (SExecTaskInfo **)pTaskInfo; int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model); if (code != TSDB_CODE_SUCCESS) { @@ -56,46 +56,46 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, code = dsCreateDataSinker(pSubplan->pDataSink, handle); } - _error: +_error: // if failed to add ref for all tables in this query, abort current query return code; } #ifdef TEST_IMPL // wait moment -int waitMoment(SQInfo* pQInfo){ - if(pQInfo->sql) { - int ms = 0; - char* pcnt = strstr(pQInfo->sql, " count(*)"); - if(pcnt) return 0; - - char* pos = strstr(pQInfo->sql, " t_"); - if(pos){ +int waitMoment(SQInfo *pQInfo) { + if (pQInfo->sql) { + int ms = 0; + char *pcnt = strstr(pQInfo->sql, " count(*)"); + if (pcnt) return 0; + + char *pos = strstr(pQInfo->sql, " t_"); + if (pos) { pos += 3; ms = atoi(pos); - while(*pos >= '0' && *pos <= '9'){ - pos ++; + while (*pos >= '0' && *pos <= '9') { + pos++; } char unit_char = *pos; - if(unit_char == 'h'){ - ms *= 3600*1000; - } else if(unit_char == 'm'){ - ms *= 60*1000; - } else if(unit_char == 's'){ + if (unit_char == 'h') { + ms *= 3600 * 1000; + } else if (unit_char == 'm') { + ms *= 60 * 1000; + } else if (unit_char == 's') { ms *= 1000; } } - if(ms == 0) return 0; + if (ms == 0) return 0; printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql); - - if(ms < 1000) { + + if (ms < 1000) { taosMsleep(ms); } else { int used_ms = 0; - while(used_ms < ms) { + while (used_ms < ms) { taosMsleep(1000); used_ms += 1000; - if(isTaskKilled(pQInfo)){ + if (isTaskKilled(pQInfo)) { printf("test check query is canceled, sleep break.%s\n", pQInfo->sql); break; } @@ -106,15 +106,14 @@ int waitMoment(SQInfo* pQInfo){ } #endif -int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; +int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) { + SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo; int64_t threadId = taosGetSelfPthreadId(); *pRes = NULL; int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { - qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, - (void*)curOwner); + qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void *)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; return pTaskInfo->code; } @@ -133,12 +132,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { if (ret != TSDB_CODE_SUCCESS) { publishQueryAbortEvent(pTaskInfo, ret); pTaskInfo->code = ret; - qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), - tstrerror(pTaskInfo->code)); + qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); return pTaskInfo->code; } - qDebug("%s execTask is launched", GET_TASKID(pTaskInfo)); + /*qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));*/ publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -154,12 +152,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { *useconds = pTaskInfo->cost.elapsedTime; } - int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0; + int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0; pTaskInfo->totalRows += current; cleanUpUdfs(); - qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", - GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0); + /*qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",*/ + /*GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);*/ atomic_store_64(&pTaskInfo->owner, 0); return pTaskInfo->code; @@ -208,18 +206,17 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { } void qDestroyTask(qTaskInfo_t qTaskHandle) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle; - qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows); + SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qTaskHandle; + qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows); - queryCostStatis(pTaskInfo); // print the query cost summary + queryCostStatis(pTaskInfo); // print the query cost summary doDestroyTask(pTaskInfo); } int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) { SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo; - int32_t capacity = 0; + int32_t capacity = 0; - return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum); + return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum); } - diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d0a1840d72..b8f06ebc67 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -106,7 +106,7 @@ static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo); static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols); -static void releaseQueryBuf(size_t numOfTables); +static void releaseQueryBuf(size_t numOfTables); static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr); @@ -154,8 +154,9 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, void operatorDummyCloseFn(void* param, int32_t numOfCols) {} -static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, - int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs); +static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, + SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t* rowCellOffset, + SqlFunctionCtx* pCtx, int32_t numOfExprs); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); @@ -342,14 +343,12 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, return pResultRow; } -void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes, - uint64_t groupId, int32_t numOfOutput) { +void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes, uint64_t groupId, int32_t numOfOutput) { SAggSupporter* pSup = &pInfo->aggSup; SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); SResultRowPosition* p1 = - (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, - GET_RES_WINDOW_KEY_LEN(bytes)); - SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1); + (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1); SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultCell(pResult, i, pInfo->binfo.rowCellInfoOffset); @@ -599,8 +598,9 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); } -void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, - int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { +void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, + SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, + int32_t numOfTotal, int32_t numOfOutput, int32_t order) { for (int32_t k = 0; k < numOfOutput; ++k) { // keep it temporarily bool hasAgg = pCtx[k].input.colDataAggIsSet; @@ -683,8 +683,8 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC } } -void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, - bool createDummyCol) { +void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, + int32_t scanFlag, bool createDummyCol) { if (pBlock->pBlockAgg != NULL) { doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); } else { @@ -735,7 +735,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc } static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, - int32_t scanFlag, bool createDummyCol) { + int32_t scanFlag, bool createDummyCol) { int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { @@ -743,7 +743,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pCtx[i].input.numOfRows = pBlock->info.rows; pCtx[i].pSrcBlock = pBlock; - pCtx[i].scanFlag = scanFlag; + pCtx[i].scanFlag = scanFlag; SInputColumnInfoData* pInput = &pCtx[i].input; pInput->uid = pBlock->info.uid; @@ -1003,14 +1003,14 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) { return false; } -// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) { -// // return QUERY_IS_ASC_QUERY(pQueryAttr); -// } -// -// // denote the order type -// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) { -// // return pCtx->param[0].i == pQueryAttr->order.order; -// } + // if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) { + // // return QUERY_IS_ASC_QUERY(pQueryAttr); + // } + // + // // denote the order type + // if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) { + // // return pCtx->param[0].i == pQueryAttr->order.order; + // } // in the reverse table scan, only the following functions need to be executed // if (IS_REVERSE_SCAN(pRuntimeEnv) || @@ -1128,16 +1128,16 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu } else if (fmIsAggFunc(pCtx[i].functionId)) { p = &pCtx[i]; } -// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { -// tagLen += pCtx[i].resDataInfo.bytes; -// pTagCtx[num++] = &pCtx[i]; -// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) { -// // tag function may be the group by tag column -// // ts may be the required primary timestamp column -// continue; -// } else { -// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ -// } + // if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { + // tagLen += pCtx[i].resDataInfo.bytes; + // pTagCtx[num++] = &pCtx[i]; + // } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) { + // // tag function may be the group by tag column + // // ts may be the required primary timestamp column + // continue; + // } else { + // // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ + // } } if (p != NULL) { @@ -2015,7 +2015,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO } static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo) { +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo) { if (pFilterNode == NULL) { return; } @@ -2133,8 +2133,9 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p * @param pQInfo * @param result */ -int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, - int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) { +int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, + SGroupResInfo* pGroupResInfo, int32_t* rowCellOffset, SqlFunctionCtx* pCtx, + int32_t numOfExprs) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t start = pGroupResInfo->index; @@ -2172,11 +2173,11 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn } else { // expand the result into multiple rows. E.g., _wstartts, top(k, 20) // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); - for(int32_t k = 0; k < pRow->numOfRows; ++k) { - colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); - } + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); + for (int32_t k = 0; k < pRow->numOfRows; ++k) { + colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); + } } } @@ -2192,11 +2193,12 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn return 0; } -void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) { +void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, + SDiskbasedBuf* pBuf) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); - SExprInfo* pExprInfo = pOperator->pExpr; - int32_t numOfExprs = pOperator->numOfExprs; + SExprInfo* pExprInfo = pOperator->pExpr; + int32_t numOfExprs = pOperator->numOfExprs; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t* rowCellOffset = pbInfo->rowCellInfoOffset; @@ -3215,7 +3217,7 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { return TSDB_CODE_SUCCESS; } -SOperatorInfo* createExchangeOperatorInfo(void *pTransporter, const SNodeList* pSources, SSDataBlock* pBlock, +SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -3328,7 +3330,7 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3 static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr, int32_t rowIndex) { for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index -// pCtx[j].startRow = rowIndex; + // pCtx[j].startRow = rowIndex; } for (int32_t j = 0; j < numOfExpr; ++j) { @@ -3379,7 +3381,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { -// pCtx[i].size = 1; + // pCtx[i].size = 1; } for (int32_t i = 0; i < pBlock->info.rows; ++i) { @@ -3605,7 +3607,7 @@ _error: return NULL; } -int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) { +int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { // todo add more information about exchange operation if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { *order = TSDB_ORDER_ASC; @@ -3635,7 +3637,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SAggOperatorInfo* pAggInfo = pOperator->info; SOptrBasicInfo* pInfo = &pAggInfo->binfo; - SOperatorInfo* downstream = pOperator->pDownstream[0]; + SOperatorInfo* downstream = pOperator->pDownstream[0]; int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; @@ -3975,7 +3977,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); - code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo); + code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, + pProjectInfo->pPseudoColInfo); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } @@ -4225,7 +4228,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pGroupList); ++i) { SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i); for (int32_t j = 0; j < taosArrayGetSize(pa); ++j) { - STableKeyInfo* pk = taosArrayGet(pa, j); + STableKeyInfo* pk = taosArrayGet(pa, j); STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++]; pTQueryInfo->lastKey = pk->lastKey; } @@ -4361,9 +4364,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p goto _error; } - pInfo->limit = *pLimit; - pInfo->slimit = *pSlimit; - pInfo->curOffset = pLimit->offset; + pInfo->limit = *pLimit; + pInfo->slimit = *pSlimit; + pInfo->curOffset = pLimit->offset; pInfo->curSOffset = pSlimit->offset; pInfo->binfo.pRes = pResBlock; @@ -4503,10 +4506,10 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDa } pCol->slotId = slotId; - pCol->colId = colId; - pCol->bytes = pType->bytes; - pCol->type = pType->type; - pCol->scale = pType->scale; + pCol->colId = colId; + pCol->bytes = pType->bytes; + pCol->type = pType->type; + pCol->scale = pType->scale; pCol->precision = pType->precision; pCol->dataBlockId = blockId; @@ -4581,10 +4584,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* if (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0) { pFuncNode->pParameterList = nodesMakeList(); ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0); - SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); - if (NULL == res) { // todo handle error + SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + if (NULL == res) { // todo handle error } else { - res->node.resType = (SDataType) {.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; + res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; nodesListAppend(pFuncNode->pParameterList, res); } } @@ -4677,7 +4680,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { - SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. + SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; int32_t numOfCols = 0; @@ -4686,27 +4689,27 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pHandle->vnode) { pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); } else { - doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, - queryId, taskId); + doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); } if (pDataReader == NULL && terrno != 0) { - qDebug("pDataReader is NULL"); + /*qDebug("pDataReader is NULL");*/ // return NULL; } else { - qDebug("pDataReader is not NULL"); + /*qDebug("pDataReader is not NULL");*/ } SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); SArray* tableIdList = extractTableIdList(pTableGroupInfo); SSDataBlock* pResBlock = createResDataBlock(pDescNode); SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, tableIdList, pTaskInfo, - pScanPhyNode->node.pConditions, pOperatorDumy); + SOperatorInfo* pOperator = + createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, + tableIdList, pTaskInfo, pScanPhyNode->node.pConditions, pOperatorDumy); taosArrayDestroy(tableIdList); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { @@ -4855,7 +4858,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; - SColumn col = extractColumnFromColumnNode(pColNode); + SColumn col = extractColumnFromColumnNode(pColNode); pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode; @@ -4923,11 +4926,11 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi SColumn extractColumnFromColumnNode(SColumnNode* pColNode) { SColumn c = {0}; - c.slotId = pColNode->slotId; - c.colId = pColNode->colId; - c.type = pColNode->node.resType.type; - c.bytes = pColNode->node.resType.bytes; - c.scale = pColNode->node.resType.scale; + c.slotId = pColNode->slotId; + c.colId = pColNode->colId; + c.type = pColNode->node.resType.type; + c.bytes = pColNode->node.resType.bytes; + c.scale = pColNode->node.resType.scale; c.precision = pColNode->node.resType.precision; return c; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f48095ecb8..db63c71d11 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -9,11 +9,11 @@ #include "tmsg.h" #include "tname.h" -SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true}; +SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true}; SQWorkerMgmt gQwMgmt = { - .lock = 0, - .qwRef = -1, - .qwNum = 0, + .lock = 0, + .qwRef = -1, + .qwNum = 0, }; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { @@ -110,9 +110,9 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) { QW_LOCK(QW_READ, &mgmt->schLock); - QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash)); + /*QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));*/ - void * key = NULL; + void *key = NULL; size_t keyLen = 0; int32_t i = 0; SQWSchStatus *sch = NULL; @@ -127,7 +127,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) { QW_UNLOCK(QW_READ, &mgmt->schLock); - QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash)); + /*QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));*/ } char *qwPhaseStr(int32_t phase) { @@ -462,7 +462,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { } int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { - SQWSchStatus * sch = NULL; + SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -499,7 +499,7 @@ _return: } int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) { - SQWSchStatus * sch = NULL; + SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -550,11 +550,11 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { int32_t code = 0; bool qcontinue = true; - SSDataBlock * pRes = NULL; + SSDataBlock *pRes = NULL; uint64_t useconds = 0; int32_t i = 0; int32_t execNum = 0; - qTaskInfo_t * taskHandle = &ctx->taskHandle; + qTaskInfo_t *taskHandle = &ctx->taskHandle; DataSinkHandle sinkHandle = ctx->sinkHandle; while (true) { @@ -632,7 +632,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) return TSDB_CODE_QRY_OUT_OF_MEMORY; } - void * key = NULL; + void *key = NULL; size_t keyLen = 0; int32_t i = 0; STaskStatus status = {0}; @@ -719,8 +719,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void } int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { - int32_t code = 0; - SQWTaskCtx * ctx = NULL; + int32_t code = 0; + SQWTaskCtx *ctx = NULL; SRpcHandleInfo *dropConnection = NULL; SRpcHandleInfo *cancelConnection = NULL; @@ -925,13 +925,13 @@ _return: } int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) { - int32_t code = 0; - bool queryRsped = false; - SSubplan* plan = NULL; - SQWPhaseInput input = {0}; - qTaskInfo_t pTaskInfo = NULL; - DataSinkHandle sinkHandle = NULL; - SQWTaskCtx * ctx = NULL; + int32_t code = 0; + bool queryRsped = false; + SSubplan *plan = NULL; + SQWPhaseInput input = {0}; + qTaskInfo_t pTaskInfo = NULL; + DataSinkHandle sinkHandle = NULL; + SQWTaskCtx *ctx = NULL; QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); @@ -944,7 +944,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex ctx->ctrlConnInfo = qwMsg->connInfo; - QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); + /*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/ code = qStringToSubplan(qwMsg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { @@ -1055,10 +1055,10 @@ _return: } int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { - SQWTaskCtx * ctx = NULL; + SQWTaskCtx *ctx = NULL; int32_t code = 0; SQWPhaseInput input = {0}; - void * rsp = NULL; + void *rsp = NULL; int32_t dataLen = 0; bool queryEnd = false; @@ -1138,8 +1138,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; int32_t dataLen = 0; bool locked = false; - SQWTaskCtx * ctx = NULL; - void * rsp = NULL; + SQWTaskCtx *ctx = NULL; + void *rsp = NULL; SQWPhaseInput input = {0}; QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL)); @@ -1274,7 +1274,7 @@ _return: int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; - SQWSchStatus * sch = NULL; + SQWSchStatus *sch = NULL; QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); @@ -1300,7 +1300,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; - SQWSchStatus * sch = NULL; + SQWSchStatus *sch = NULL; if (qwMsg->code) { QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req)); @@ -1338,28 +1338,28 @@ _return: qwMsg->connInfo.handle = NULL; } - QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); + /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));*/ QW_RET(TSDB_CODE_SUCCESS); } void qwProcessHbTimerEvent(void *param, void *tmrId) { - SQWHbParam* hbParam = (SQWHbParam*)param; + SQWHbParam *hbParam = (SQWHbParam *)param; if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) { return; } - int64_t refId = hbParam->refId; + int64_t refId = hbParam->refId; SQWorker *mgmt = qwAcquire(refId); if (NULL == mgmt) { QW_DLOG("qwAcquire %" PRIx64 "failed", refId); taosMemoryFree(param); return; } - + SQWSchStatus *sch = NULL; int32_t taskNum = 0; - SQWHbInfo * rspList = NULL; + SQWHbInfo *rspList = NULL; int32_t code = 0; qwDbgDumpMgmtInfo(mgmt); @@ -1383,7 +1383,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { return; } - void * key = NULL; + void *key = NULL; size_t keyLen = 0; int32_t i = 0; @@ -1413,29 +1413,27 @@ _return: for (int32_t j = 0; j < i; ++j) { qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); - QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), - (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); + /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/ + /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/ tFreeSSchedulerHbRsp(&rspList[j].rsp); } taosMemoryFreeClear(rspList); taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); - qwRelease(refId); + qwRelease(refId); } void qwCloseRef(void) { taosWLockLatch(&gQwMgmt.lock); if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { taosCloseRef(gQwMgmt.qwRef); - gQwMgmt.qwRef= -1; + gQwMgmt.qwRef = -1; } taosWUnLockLatch(&gQwMgmt.lock); } -void qwDestroySchStatus(SQWSchStatus *pStatus) { - taosHashCleanup(pStatus->tasksHash); -} +void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); } void qwDestroyImpl(void *pMgmt) { SQWorker *mgmt = (SQWorker *)pMgmt; @@ -1454,12 +1452,12 @@ void qwDestroyImpl(void *pMgmt) { SQWSchStatus *sch = (SQWSchStatus *)pIter; qwDestroySchStatus(sch); pIter = taosHashIterate(mgmt->schHash, pIter); - } + } taosHashCleanup(mgmt->schHash); taosMemoryFree(mgmt); - atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); qwCloseRef(); } @@ -1467,7 +1465,7 @@ void qwDestroyImpl(void *pMgmt) { int32_t qwOpenRef(void) { taosWLockLatch(&gQwMgmt.lock); if (gQwMgmt.qwRef < 0) { - gQwMgmt.qwRef= taosOpenRef(100, qwDestroyImpl); + gQwMgmt.qwRef = taosOpenRef(100, qwDestroyImpl); if (gQwMgmt.qwRef < 0) { taosWUnLockLatch(&gQwMgmt.lock); qError("init qworker ref failed"); @@ -1475,14 +1473,14 @@ int32_t qwOpenRef(void) { } } taosWUnLockLatch(&gQwMgmt.lock); - + return TSDB_CODE_SUCCESS; } void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { int32_t paramIdx = 0; int32_t newParamIdx = 0; - + while (true) { paramIdx = atomic_load_32(&gQwMgmt.paramIdx); if (paramIdx == tListLen(gQwMgmt.param)) { @@ -1490,7 +1488,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { } else { newParamIdx = paramIdx + 1; } - + if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) { break; } @@ -1577,12 +1575,12 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW SQWHbParam *param = NULL; qwSetHbParam(mgmt->refId, ¶m); - mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void*)param, mgmt->timer); + mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void *)param, mgmt->timer); if (NULL == mgmt->hbTimer) { qError("start hb timer failed"); QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + *qWorkerMgmt = mgmt; qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt); @@ -1599,9 +1597,9 @@ _return: taosTmrCleanUp(mgmt->timer); taosMemoryFreeClear(mgmt); - atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); } - + QW_RET(code); } @@ -1678,7 +1676,7 @@ int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64 } int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) { - SQWSchStatus * sch = NULL; + SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -1705,7 +1703,7 @@ int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId } int32_t qwCancelTask(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { - SQWSchStatus * sch = NULL; + SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; diff --git a/tests/script/tsim/tstream/basic1.sim b/tests/script/tsim/tstream/basic1.sim index cb084ad537..9965772c75 100644 --- a/tests/script/tsim/tstream/basic1.sim +++ b/tests/script/tsim/tstream/basic1.sim @@ -24,7 +24,7 @@ sql insert into t1 values(1648791233002,3,2,3,2.1); sql insert into t1 values(1648791243003,4,2,3,3.1); sql insert into t1 values(1648791213004,4,2,3,4.1); sleep 1000 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; if $rows != 4 then print ======$rows @@ -137,7 +137,7 @@ endi sql insert into t1 values(1648791223001,12,14,13,11.1); sleep 500 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; if $rows != 4 then print ======$rows @@ -250,7 +250,7 @@ endi sql insert into t1 values(1648791223002,12,14,13,11.1); sleep 100 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 1 if $data11 != 2 then @@ -280,7 +280,7 @@ endi sql insert into t1 values(1648791223003,12,14,13,11.1); sleep 100 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 1 if $data11 != 3 then @@ -312,7 +312,7 @@ sql insert into t1 values(1648791223001,1,1,1,1.1); sql insert into t1 values(1648791223002,2,2,2,2.1); sql insert into t1 values(1648791223003,3,3,3,3.1); sleep 100 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 1 if $data11 != 3 then @@ -344,7 +344,7 @@ sql insert into t1 values(1648791233003,3,2,3,2.1); sql insert into t1 values(1648791233002,5,6,7,8.1); sql insert into t1 values(1648791233002,3,2,3,2.1); sleep 100 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 2 if $data21 != 2 then @@ -374,7 +374,7 @@ endi sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1); sleep 100 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 0 if $data01 != 4 then @@ -404,7 +404,7 @@ endi sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1); sleep 100 -sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; +sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 1 if $data11 != 4 then