diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 9e8a139b31..d445fc26e8 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -55,7 +55,7 @@ extern int32_t tsNumOfMnodeQueryThreads; extern int32_t tsNumOfMnodeFetchThreads; extern int32_t tsNumOfMnodeReadThreads; extern int32_t tsNumOfVnodeQueryThreads; -extern int32_t tsNumOfVnodeStreamThreads; +extern float tsRatioOfVnodeStreamThreads; extern int32_t tsNumOfVnodeFetchThreads; extern int32_t tsNumOfVnodeRsmaThreads; extern int32_t tsNumOfQnodeQueryThreads; diff --git a/include/util/tworker.h b/include/util/tworker.h index 8766f87a08..0636f16dbb 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -17,6 +17,7 @@ #define _TD_UTIL_WORKER_H_ #include "tqueue.h" +#include "tarray.h" #ifdef __cplusplus extern "C" { @@ -26,10 +27,10 @@ typedef struct SQWorkerPool SQWorkerPool; typedef struct SWWorkerPool SWWorkerPool; typedef struct SQWorker { - int32_t id; // worker id - int64_t pid; // thread pid - TdThread thread; // thread id - SQWorkerPool *pool; + int32_t id; // worker id + int64_t pid; // thread pid + TdThread thread; // thread id + void *pool; } SQWorker; typedef struct SQWorkerPool { @@ -42,6 +43,14 @@ typedef struct SQWorkerPool { TdThreadMutex mutex; } SQWorkerPool; +typedef struct SAutoQWorkerPool { + float ratio; + STaosQset *qset; + const char *name; + SArray *workers; + TdThreadMutex mutex; +} SAutoQWorkerPool; + typedef struct SWWorker { int32_t id; // worker id int64_t pid; // thread pid @@ -65,6 +74,11 @@ void tQWorkerCleanup(SQWorkerPool *pool); STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp); void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue); +int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool); +void tAutoQWorkerCleanup(SAutoQWorkerPool *pool); +STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp); +void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue); + int32_t tWWorkerInit(SWWorkerPool *pool); void tWWorkerCleanup(SWWorkerPool *pool); STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d3fd625a91..98b9b566ec 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -47,7 +47,7 @@ int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 4; -int32_t tsNumOfVnodeStreamThreads = 2; +float tsRatioOfVnodeStreamThreads = 1.0; int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfQnodeQueryThreads = 4; @@ -392,9 +392,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1; - tsNumOfVnodeStreamThreads = tsNumOfCores / 4; - tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); - if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; + if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, 0) != 0) return -1; tsNumOfVnodeFetchThreads = tsNumOfCores / 4; tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); @@ -513,11 +511,9 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } - pItem = cfgGetItem(tsCfg, "numOfVnodeStreamThreads"); + pItem = cfgGetItem(tsCfg, "ratioOfVnodeStreamThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsNumOfVnodeStreamThreads = numOfCores / 4; - tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); - pItem->i32 = tsNumOfVnodeStreamThreads; + pItem->fval = tsRatioOfVnodeStreamThreads; pItem->stype = stype; } @@ -710,7 +706,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; - tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32; + tsRatioOfVnodeStreamThreads = cfgGetItem(pCfg, "ratioOfVnodeStreamThreads")->fval; tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index b5c554e0ca..6e724f4d43 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -26,20 +26,20 @@ extern "C" { #endif typedef struct SVnodeMgmt { - SDnodeData *pData; - SMsgCb msgCb; - const char *path; - const char *name; - SQWorkerPool queryPool; - SQWorkerPool streamPool; - SWWorkerPool fetchPool; - SSingleWorker mgmtWorker; - SHashObj *hash; - TdThreadRwlock lock; - SVnodesStat state; - STfs *pTfs; - TdThread thread; - bool stop; + SDnodeData *pData; + SMsgCb msgCb; + const char *path; + const char *name; + SQWorkerPool queryPool; + SAutoQWorkerPool streamPool; + SWWorkerPool fetchPool; + SSingleWorker mgmtWorker; + SHashObj *hash; + TdThreadRwlock lock; + SVnodesStat state; + STfs *pTfs; + TdThread thread; + bool stop; } SVnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 5afb9a0512..202dc50ac6 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -318,7 +318,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { (void)tMultiWorkerInit(&pVnode->pApplyW, &acfg); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); - pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); + pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue); if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncCtrlW.queue == NULL || @@ -344,7 +344,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); + tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); pVnode->pQueryQ = NULL; pVnode->pStreamQ = NULL; @@ -359,11 +359,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pQPool->max = tsNumOfVnodeQueryThreads; if (tQWorkerInit(pQPool) != 0) return -1; - SQWorkerPool *pStreamPool = &pMgmt->streamPool; + SAutoQWorkerPool *pStreamPool = &pMgmt->streamPool; pStreamPool->name = "vnode-stream"; - pStreamPool->min = tsNumOfVnodeStreamThreads; - pStreamPool->max = tsNumOfVnodeStreamThreads; - if (tQWorkerInit(pStreamPool) != 0) return -1; + pStreamPool->ratio = tsRatioOfVnodeStreamThreads; + if (tAutoQWorkerInit(pStreamPool) != 0) return -1; SWWorkerPool *pFPool = &pMgmt->fetchPool; pFPool->name = "vnode-fetch"; @@ -385,7 +384,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { void vmStopWorker(SVnodeMgmt *pMgmt) { tQWorkerCleanup(&pMgmt->queryPool); - tQWorkerCleanup(&pMgmt->streamPool); + tAutoQWorkerCleanup(&pMgmt->streamPool); tWWorkerCleanup(&pMgmt->fetchPool); dDebug("vnode workers are closed"); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 756f97d1d0..0ff41d429e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -111,12 +111,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType)); terrno = TSDB_CODE_INVALID_MSG_LEN; goto _OVER; - } /* else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) && - (!IsReq(pRpc)) && (pRpc->pCont == NULL)) { - dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code)); - terrno = pRpc->code; - goto _OVER; - }*/ + } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) && + (!IsReq(pRpc)) && (pRpc->pCont == NULL)) { + dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code)); + terrno = pRpc->code; + goto _OVER; + } if (pHandle->defaultNtype == NODE_END) { dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType)); @@ -248,9 +248,9 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe static bool rpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND || - code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || - code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || - code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) { + code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_NOT_LEADER || + code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING || + code == TSDB_CODE_APP_IS_STOPPING) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { return false; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 4e93a1d96e..2f824b48b4 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -193,6 +193,7 @@ typedef struct { int64_t lastAccessTime; int32_t accessTimes; int32_t numOfVnodes; + int32_t numOfOtherNodes; int32_t numOfSupportVnodes; float numOfCores; int64_t memTotal; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 58ae85a628..d7b16c2c8e 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -397,8 +397,6 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool needCheck = !online || dnodeChanged || reboot; - pDnode->accessTimes++; - pDnode->lastAccessTime = curMs; const STraceId *trace = &pReq->info.traceId; mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq); @@ -534,6 +532,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pReq->info.rsp = pHead; } + pDnode->accessTimes++; + pDnode->lastAccessTime = curMs; code = 0; _OVER: diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 31ab1f3259..2550c68cfb 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -425,6 +425,7 @@ void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgOb static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { SDnodeObj *pDnode = pObj; pDnode->numOfVnodes = 0; + pDnode->numOfOtherNodes = 0; return true; } @@ -447,7 +448,7 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2 pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed); if (isMnode) { - pDnode->numOfVnodes++; + pDnode->numOfOtherNodes++; } if (online && pDnode->numOfSupportVnodes > 0) { @@ -468,14 +469,25 @@ SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId) { sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL); sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, NULL); + + mDebug("build %d dnodes array", (int32_t)taosArrayGetSize(pArray)); + for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) { + SDnodeObj *pDnode = taosArrayGet(pArray, i); + mDebug("dnode:%d, vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes); + } return pArray; } static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) { return *dnode1Id >= *dnode2Id ? 1 : 0; } +static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) { + float totalDnodes = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes; + return totalDnodes / pDnode->numOfSupportVnodes; +} + static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) { - float d1Score = (float)pDnode1->numOfVnodes / pDnode1->numOfSupportVnodes; - float d2Score = (float)pDnode2->numOfVnodes / pDnode2->numOfSupportVnodes; + float d1Score = mndGetDnodeScore(pDnode1, 0, 0.9); + float d2Score = mndGetDnodeScore(pDnode2, 0, 0.9); return d1Score >= d2Score ? 1 : 0; } @@ -494,7 +506,12 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup int32_t allocedVnodes = 0; void *pIter = NULL; + mDebug("start to sort %d dnodes", (int32_t)taosArrayGetSize(pArray)); taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); + for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) { + SDnodeObj *pDnode = taosArrayGet(pArray, i); + mDebug("dnode:%d, score:%f", pDnode->id, mndGetDnodeScore(pDnode, 0, 0.9)); + } int32_t size = taosArrayGetSize(pArray); if (size < pVgroup->replica) { @@ -875,7 +892,7 @@ static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgro taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { SDnodeObj *pDnode = taosArrayGet(pArray, i); - mInfo("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes); + mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes); } SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica]; @@ -935,7 +952,7 @@ static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj * taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { SDnodeObj *pDnode = taosArrayGet(pArray, i); - mInfo("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes); + mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes); } int32_t code = -1; @@ -1970,16 +1987,16 @@ static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) { taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { SDnodeObj *pDnode = taosArrayGet(pArray, i); - mInfo("dnode:%d, equivalent vnodes:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes, - pDnode->numOfSupportVnodes, (float)pDnode->numOfVnodes / pDnode->numOfSupportVnodes); + mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes, + pDnode->numOfSupportVnodes, pDnode->numOfOtherNodes, mndGetDnodeScore(pDnode, 0, 1)); } SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1); SDnodeObj *pDst = taosArrayGet(pArray, 0); - float srcScore = (float)(pSrc->numOfVnodes - 1) / pSrc->numOfSupportVnodes; - float dstScore = (float)(pDst->numOfVnodes + 1) / pDst->numOfSupportVnodes; - mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore, + float srcScore = mndGetDnodeScore(pSrc, -1, 1); + float dstScore = mndGetDnodeScore(pDst, 1, 1); + mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, dstScore, pDst->id, dstScore); if (srcScore > dstScore - 0.000001) { diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 0b309bc8f5..acfd5c07af 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -841,36 +841,42 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS for (int32_t i = 0; i < numOfCols; ++i) { numOfRows = (input[i].numOfRows > numOfRows) ? input[i].numOfRows : numOfRows; } - output->info.rows = numOfRows; - output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - for (int32_t i = 0; i < numOfCols; ++i) { - if ((input+i)->numOfRows < numOfRows) { - SColumnInfoData* pColInfoData = (input+i)->columnData; - int32_t startRow = (input+i)->numOfRows; - int32_t expandRows = numOfRows - startRow; - colInfoDataEnsureCapacity(pColInfoData, numOfRows, false); + + // create the basic block info structure + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pInfo = input[i].columnData; + SColumnInfoData d = {0}; + d.info = pInfo->info; + + blockDataAppendColInfo(output, &d); + } + + blockDataEnsureCapacity(output, numOfRows); + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pDest = taosArrayGet(output->pDataBlock, i); + + SColumnInfoData* pColInfoData = input[i].columnData; + colDataAssign(pDest, pColInfoData, input[i].numOfRows, &output->info); + + if (input[i].numOfRows < numOfRows) { + int32_t startRow = input[i].numOfRows; + int expandRows = numOfRows - startRow; bool isNull = colDataIsNull_s(pColInfoData, (input+i)->numOfRows - 1); if (isNull) { - colDataAppendNNULL(pColInfoData, startRow, expandRows); + colDataAppendNNULL(pDest, startRow, expandRows); } else { char* src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1); - int32_t bytes = pColInfoData->info.bytes; - char* data = taosMemoryMalloc(bytes); - memcpy(data, src, bytes); for (int j = 0; j < expandRows; ++j) { - colDataAppend(pColInfoData, startRow+j, data, false); + colDataAppend(pDest, startRow+j, src, false); } //colDataAppendNItems(pColInfoData, startRow, data, expandRows); - taosMemoryFree(data); } } - - taosArrayPush(output->pDataBlock, (input + i)->columnData); - - if (IS_VAR_DATA_TYPE((input + i)->columnData->info.type)) { - output->info.hasVarCol = true; - } } + + output->info.rows = numOfRows; + return 0; } @@ -1824,8 +1830,8 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t convertDataBlockToScalarParm(&resultBlock, output); taosArrayDestroy(resultBlock.pDataBlock); } - - taosArrayDestroy(inputBlock.pDataBlock); + + blockDataFreeRes(&inputBlock); return err; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3048d53738..9f2bf0b2b4 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5703,7 +5703,8 @@ static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, SCrea return code; } -static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) { +static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || crossTableWithUdaf(pSelect)) { @@ -5713,6 +5714,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "SUBTABLE expression must be of VARCHAR type"); } + if (NULL == pSelect->pWindow && STREAM_TRIGGER_AT_ONCE != pStmt->pOptions->triggerType) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "The trigger mode of non window query can only be AT_ONCE"); + } return TSDB_CODE_SUCCESS; } @@ -5726,7 +5731,7 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt code = translateQuery(pCxt, pStmt->pQuery); } if (TSDB_CODE_SUCCESS == code) { - code = checkStreamQuery(pCxt, (SSelectStmt*)pStmt->pQuery); + code = checkStreamQuery(pCxt, pStmt); } if (TSDB_CODE_SUCCESS == code) { getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 4e0b0630bc..f2b1db19e8 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -487,8 +487,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat taosMemoryFree(pReqs); } return code; - } else { - ASSERT(0); } return 0; } @@ -514,7 +512,6 @@ int32_t streamDispatch(SStreamTask* pTask) { int32_t code = 0; if (streamDispatchAllBlocks(pTask, pBlock) < 0) { - ASSERT(0); code = -1; streamQueueProcessFail(pTask->outputQueue); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 6889a870d1..52777fd834 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -143,6 +143,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ASSERT(left >= 0); if (left == 0) { taosArrayDestroy(pTask->checkReqIds); + pTask->checkReqIds = NULL; streamTaskLaunchRecover(pTask, version); } } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f7252ed8a0..e9aba0bc39 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -175,6 +175,8 @@ void tFreeSStreamTask(SStreamTask* pTask) { } if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); + taosArrayDestroy(pTask->checkReqIds); + pTask->checkReqIds = NULL; } if (pTask->pState) streamStateClose(pTask->pState); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 863cee9b08..a9a84c1860 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -36,7 +36,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) { worker->pool = pool; } - uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); + uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); return 0; } @@ -51,8 +51,10 @@ void tQWorkerCleanup(SQWorkerPool *pool) { for (int32_t i = 0; i < pool->max; ++i) { SQWorker *worker = pool->workers + i; if (taosCheckPthreadValid(worker->thread)) { + uInfo("worker:%s:%d is stopping", pool->name, worker->id); taosThreadJoin(worker->thread, NULL); taosThreadClear(&worker->thread); + uInfo("worker:%s:%d is stopped", pool->name, worker->id); } } @@ -60,7 +62,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) { taosCloseQset(pool->qset); taosThreadMutexDestroy(&pool->mutex); - uDebug("worker:%s is closed", pool->name); + uInfo("worker:%s is closed", pool->name); } static void *tQWorkerThreadFp(SQWorker *worker) { @@ -119,7 +121,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { taosThreadAttrDestroy(&thAttr); pool->num++; - uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num); + uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num); } while (pool->num < pool->min); } @@ -130,7 +132,134 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { } void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { - uDebug("worker:%s, queue:%p is freed", pool->name, queue); + uInfo("worker:%s, queue:%p is freed", pool->name, queue); + taosCloseQueue(queue); +} + +int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) { + pool->qset = taosOpenQset(); + pool->workers = taosArrayInit(2, sizeof(SQWorker *)); + if (pool->workers == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + (void)taosThreadMutexInit(&pool->mutex, NULL); + + uInfo("worker:%s is initialized as auto", pool->name); + return 0; +} + +void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) { + int32_t size = taosArrayGetSize(pool->workers); + for (int32_t i = 0; i < size; ++i) { + SQWorker *worker = taosArrayGetP(pool->workers, i); + if (taosCheckPthreadValid(worker->thread)) { + taosQsetThreadResume(pool->qset); + } + } + + for (int32_t i = 0; i < size; ++i) { + SQWorker *worker = taosArrayGetP(pool->workers, i); + if (taosCheckPthreadValid(worker->thread)) { + uInfo("worker:%s:%d is stopping", pool->name, worker->id); + taosThreadJoin(worker->thread, NULL); + taosThreadClear(&worker->thread); + uInfo("worker:%s:%d is stopped", pool->name, worker->id); + } + taosMemoryFree(worker); + } + + taosArrayDestroy(pool->workers); + taosCloseQset(pool->qset); + taosThreadMutexDestroy(&pool->mutex); + + uInfo("worker:%s is closed", pool->name); +} + +static void *tAutoQWorkerThreadFp(SQWorker *worker) { + SAutoQWorkerPool *pool = worker->pool; + SQueueInfo qinfo = {0}; + void *msg = NULL; + int32_t code = 0; + + taosBlockSIGPIPE(); + setThreadName(pool->name); + worker->pid = taosGetSelfPthreadId(); + uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); + + while (1) { + if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { + uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset, + worker->pid); + break; + } + + if (qinfo.fp != NULL) { + qinfo.workerId = worker->id; + qinfo.threadNum = taosArrayGetSize(pool->workers); + (*((FItem)qinfo.fp))(&qinfo, msg); + } + + taosUpdateItemSize(qinfo.queue, 1); + } + + return NULL; +} + +STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) { + STaosQueue *queue = taosOpenQueue(); + if (queue == NULL) return NULL; + + taosThreadMutexLock(&pool->mutex); + taosSetQueueFp(queue, fp, NULL); + taosAddIntoQset(pool->qset, queue, ahandle); + + int32_t queueNum = taosGetQueueNumber(pool->qset); + int32_t curWorkerNum = taosArrayGetSize(pool->workers); + int32_t dstWorkerNum = ceil(queueNum * pool->ratio); + if (dstWorkerNum < 1) dstWorkerNum = 1; + + // spawn a thread to process queue + while (curWorkerNum < dstWorkerNum) { + SQWorker *worker = taosMemoryCalloc(1, sizeof(SQWorker)); + if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) { + uError("worker:%s:%d failed to create", pool->name, curWorkerNum); + taosMemoryFree(worker); + taosCloseQueue(queue); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + worker->id = curWorkerNum; + worker->pool = pool; + + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) { + uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum); + (void)taosArrayPop(pool->workers); + taosMemoryFree(worker); + taosCloseQueue(queue); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + taosThreadAttrDestroy(&thAttr); + uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, (int32_t)taosArrayGetSize(pool->workers)); + + curWorkerNum++; + } + + taosThreadMutexUnlock(&pool->mutex); + uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); + + return queue; +} + +void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) { + uInfo("worker:%s, queue:%p is freed", pool->name, queue); taosCloseQueue(queue); } @@ -152,7 +281,7 @@ int32_t tWWorkerInit(SWWorkerPool *pool) { worker->pool = pool; } - uDebug("worker:%s is initialized, max:%d", pool->name, pool->max); + uInfo("worker:%s is initialized, max:%d", pool->name, pool->max); return 0; } @@ -169,17 +298,19 @@ void tWWorkerCleanup(SWWorkerPool *pool) { for (int32_t i = 0; i < pool->max; ++i) { SWWorker *worker = pool->workers + i; if (taosCheckPthreadValid(worker->thread)) { + uInfo("worker:%s:%d is stopping", pool->name, worker->id); taosThreadJoin(worker->thread, NULL); taosThreadClear(&worker->thread); taosFreeQall(worker->qall); taosCloseQset(worker->qset); + uInfo("worker:%s:%d is stopped", pool->name, worker->id); } } taosMemoryFreeClear(pool->workers); taosThreadMutexDestroy(&pool->mutex); - uDebug("worker:%s is closed", pool->name); + uInfo("worker:%s is closed", pool->name); } static void *tWWorkerThreadFp(SWWorker *worker) { @@ -235,7 +366,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) goto _OVER; - uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); + uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); pool->nextId = (pool->nextId + 1) % pool->max; taosThreadAttrDestroy(&thAttr); @@ -259,13 +390,14 @@ _OVER: } else { while (worker->pid <= 0) taosMsleep(10); queue->threadId = worker->pid; - uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, queue->threadId); + uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, + queue->threadId); return queue; } } void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { - uDebug("worker:%s, queue:%p is freed", pool->name, queue); + uInfo("worker:%s, queue:%p is freed", pool->name, queue); taosCloseQueue(queue); } diff --git a/tests/script/tsim/dnode/balance1.sim b/tests/script/tsim/dnode/balance1.sim index 2b0154c8e5..3bb9c4f3eb 100644 --- a/tests/script/tsim/dnode/balance1.sim +++ b/tests/script/tsim/dnode/balance1.sim @@ -74,10 +74,10 @@ sql insert into d2.t2 values(now+5s, 21) sql select * from information_schema.ins_dnodes print dnode1 openVnodes $data(1)[2] print dnode2 openVnodes $data(2)[2] -if $data(1)[2] != 0 then +if $data(1)[2] != 1 then return -1 endi -if $data(2)[2] != 2 then +if $data(2)[2] != 1 then return -1 endi diff --git a/tests/script/tsim/dnode/balance2.sim b/tests/script/tsim/dnode/balance2.sim index 3f5a42d4d3..9eeb7e1251 100644 --- a/tests/script/tsim/dnode/balance2.sim +++ b/tests/script/tsim/dnode/balance2.sim @@ -161,13 +161,13 @@ print dnode1 openVnodes $data(1)[2] print dnode3 openVnodes $data(3)[2] print dnode4 openVnodes $data(4)[2] print dnode5 openVnodes $data(5)[2] -if $data(1)[2] != 2 then +if $data(1)[2] != 3 then return -1 endi if $data(3)[2] != 3 then return -1 endi -if $data(4)[2] != 4 then +if $data(4)[2] != 3 then return -1 endi if $data(5)[2] != 3 then diff --git a/tests/script/tsim/dnode/balance3.sim b/tests/script/tsim/dnode/balance3.sim index ce328f10bd..2fb284b466 100644 --- a/tests/script/tsim/dnode/balance3.sim +++ b/tests/script/tsim/dnode/balance3.sim @@ -127,10 +127,10 @@ print dnode1 openVnodes $data(1)[2] print dnode2 openVnodes $data(2)[2] print dnode3 openVnodes $data(3)[2] print dnode4 openVnodes $data(4)[2] -if $data(1)[2] != 0 then +if $data(1)[2] != 1 then return -1 endi -if $data(2)[2] != 2 then +if $data(2)[2] != 1 then return -1 endi if $data(3)[2] != 2 then @@ -228,10 +228,10 @@ print dnode1 openVnodes $data(1)[2] print dnode3 openVnodes $data(3)[2] print dnode4 openVnodes $data(4)[2] print dnode5 openVnodes $data(5)[2] -if $data(1)[2] != 1 then +if $data(1)[2] != 2 then return -1 endi -if $data(3)[2] != 3 then +if $data(3)[2] != 2 then return -1 endi if $data(4)[2] != 3 then diff --git a/tests/script/tsim/dnode/balancex.sim b/tests/script/tsim/dnode/balancex.sim index 6b16e8b569..0cfc64a954 100644 --- a/tests/script/tsim/dnode/balancex.sim +++ b/tests/script/tsim/dnode/balancex.sim @@ -142,10 +142,10 @@ print dnode1 openVnodes $data(1)[2] print dnode2 openVnodes $data(2)[2] print dnode2 openVnodes $data(3)[2] print dnode2 openVnodes $data(4)[2] -if $data(1)[2] != 0 then +if $data(1)[2] != 1 then return -1 endi -if $data(2)[2] != 2 then +if $data(2)[2] != 1 then return -1 endi if $data(3)[2] != 2 then diff --git a/tests/script/tsim/dnode/vnode_clean.sim b/tests/script/tsim/dnode/vnode_clean.sim index 112e5f28a4..ba1d083c68 100644 --- a/tests/script/tsim/dnode/vnode_clean.sim +++ b/tests/script/tsim/dnode/vnode_clean.sim @@ -71,10 +71,10 @@ sql insert into d2.t2 values(now+5s, 21) sql select * from information_schema.ins_dnodes print dnode1 openVnodes $data(1)[2] print dnode2 openVnodes $data(2)[2] -if $data(1)[2] != 0 then +if $data(1)[2] != 1 then return -1 endi -if $data(2)[2] != 2 then +if $data(2)[2] != 1 then return -1 endi @@ -181,10 +181,10 @@ sql select * from information_schema.ins_dnodes print dnode1 openVnodes $data(1)[2] print dnode2 openVnodes $data(3)[2] print dnode2 openVnodes $data(4)[2] -if $data(1)[2] != 0 then +if $data(1)[2] != 1 then return -1 endi -if $data(3)[2] != 2 then +if $data(3)[2] != 1 then return -1 endi if $data(4)[2] != 1 then @@ -204,10 +204,10 @@ sql select * from information_schema.ins_dnodes print dnode1 openVnodes $data(1)[2] print dnode2 openVnodes $data(3)[2] print dnode2 openVnodes $data(4)[2] -if $data(1)[2] != 0 then +if $data(1)[2] != 1 then return -1 endi -if $data(3)[2] != 2 then +if $data(3)[2] != 1 then return -1 endi if $data(4)[2] != 2 then @@ -220,13 +220,13 @@ sql select * from information_schema.ins_dnodes print dnode1 openVnodes $data(1)[2] print dnode2 openVnodes $data(3)[2] print dnode2 openVnodes $data(4)[2] -if $data(1)[2] != 1 then +if $data(1)[2] != 2 then return -1 endi if $data(3)[2] != null then return -1 endi -if $data(4)[2] != 3 then +if $data(4)[2] != 2 then return -1 endi diff --git a/tests/system-test/compatibilityAllTest.sh b/tests/system-test/compatibilityAllTest.sh index 08f2f5581b..8b599afd86 100755 --- a/tests/system-test/compatibilityAllTest.sh +++ b/tests/system-test/compatibilityAllTest.sh @@ -42,5 +42,5 @@ python3 ./test.py -f 7-tmq/tmqCheckData.py python3 ./test.py -f 7-tmq/tmqCheckData1.py python3 ./test.py -f 7-tmq/tmqConsumerGroup.py python3 ./test.py -f 7-tmq/tmqShow.py -python3 ./test.py -f 7-tmq/tmqAlterSchema. -python3 ./test.py -f 99-TDcase/TD-20582.py \ No newline at end of file +python3 ./test.py -f 7-tmq/tmqAlterSchema.py +python3 ./test.py -f 99-TDcase/TD-20582.py