diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index bb06b65898..198163582b 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -636,7 +636,7 @@ bool nodesExprsHasColumn(SNodeList* pList); void* nodesGetValueFromNode(SValueNode* pNode); int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value); char* nodesGetStrValueFromNode(SValueNode* pNode); -void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal); +int32_t nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal); int32_t nodesMakeValueNodeFromString(char* literal, SValueNode** ppValNode); int32_t nodesMakeValueNodeFromBool(bool b, SValueNode** ppValNode); int32_t nodesMakeValueNodeFromInt32(int32_t value, SNode** ppNode); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 127e0f18f1..616c2593cb 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1781,7 +1781,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName); pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE; - nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param); + code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param); } else if (type == QUERY_NODE_FUNCTION) { pExp->pExpr->nodeType = QUERY_NODE_FUNCTION; SFunctionNode* pFuncNode = (SFunctionNode*)pNode; @@ -1811,12 +1811,10 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { if (TSDB_CODE_SUCCESS == code) { code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res); } - if (TSDB_CODE_SUCCESS != code) { // todo handle error - } else { - res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; - code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res); - QUERY_CHECK_CODE(code, lino, _end); - } + QUERY_CHECK_CODE(code, lino, _end); + res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; + code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res); + QUERY_CHECK_CODE(code, lino, _end); } #endif @@ -1826,7 +1824,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); pExp->base.numOfParams = numOfParam; - for (int32_t j = 0; j < numOfParam; ++j) { + for (int32_t j = 0; j < numOfParam && TSDB_CODE_SUCCESS == code; ++j) { SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); QUERY_CHECK_NULL(p1, code, lino, _end, terrno); if (p1->type == QUERY_NODE_COLUMN) { @@ -1839,7 +1837,8 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { } else if (p1->type == QUERY_NODE_VALUE) { SValueNode* pvn = (SValueNode*)p1; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; - nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param); + code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param); + QUERY_CHECK_CODE(code, lino, _end); } } } else if (type == QUERY_NODE_OPERATOR) { @@ -1871,13 +1870,10 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { SLogicConditionNode* pCond = (SLogicConditionNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); - - if (TSDB_CODE_SUCCESS == code) { - pExp->base.numOfParams = 1; - SDataType* pType = &pCond->node.resType; - pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName); - pExp->pExpr->_optrRoot.pRootNode = pNode; - } + pExp->base.numOfParams = 1; + SDataType* pType = &pCond->node.resType; + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName); + pExp->pExpr->_optrRoot.pRootNode = pNode; } else { ASSERT(0); } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 6e69c56687..6b06530b3e 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -141,7 +141,7 @@ static int32_t callocNodeChunk(SNodeAllocator* pAllocator, SNodeMemChunk** pOutC static int32_t nodesCallocImpl(int32_t size, void** pOut) { if (NULL == g_pNodeAllocator) { *pOut = taosMemoryCalloc(1, size); - if (!pOut) return TSDB_CODE_OUT_OF_MEMORY; + if (!*pOut) return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_SUCCESS; } @@ -2638,11 +2638,12 @@ int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots) { return num; } -void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) { +int32_t nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) { + int32_t code = 0; if (pNode->isNull) { pVal->nType = TSDB_DATA_TYPE_NULL; pVal->nLen = tDataTypes[TSDB_DATA_TYPE_NULL].bytes; - return; + return code; } pVal->nType = pNode->node.resType.type; pVal->nLen = pNode->node.resType.bytes; @@ -2676,13 +2677,21 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) { case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_GEOMETRY: pVal->pz = taosMemoryMalloc(pVal->nLen + 1); - memcpy(pVal->pz, pNode->datum.p, pVal->nLen); - pVal->pz[pVal->nLen] = 0; + if (pVal->pz) { + memcpy(pVal->pz, pNode->datum.p, pVal->nLen); + pVal->pz[pVal->nLen] = 0; + } else { + code = terrno; + } break; case TSDB_DATA_TYPE_JSON: pVal->nLen = getJsonValueLen(pNode->datum.p); pVal->pz = taosMemoryMalloc(pVal->nLen); - memcpy(pVal->pz, pNode->datum.p, pVal->nLen); + if (pVal->pz) { + memcpy(pVal->pz, pNode->datum.p, pVal->nLen); + } else { + code = terrno; + } break; case TSDB_DATA_TYPE_DECIMAL: case TSDB_DATA_TYPE_BLOB: @@ -2690,6 +2699,7 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) { default: break; } + return code; } int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc) { diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index edf5b0b970..b2064d6787 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -792,6 +792,16 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) { int32_t code; + + (void)taosThreadMutexInit(&pool->poolLock, NULL); + (void)taosThreadMutexInit(&pool->backupLock, NULL); + (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL); + (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL); + + (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL); + (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL); + (void)taosThreadCondInit(&pool->backupCond, NULL); + code = taosOpenQset(&pool->qset); if (code) return terrno = code; pool->workers = tdListNew(sizeof(SQueryAutoQWorker)); @@ -802,14 +812,6 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) { if (!pool->exitedWorkers) return TSDB_CODE_OUT_OF_MEMORY; pool->maxInUse = pool->max * 2 + 2; - (void)taosThreadMutexInit(&pool->poolLock, NULL); - (void)taosThreadMutexInit(&pool->backupLock, NULL); - (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL); - (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL); - - (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL); - (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL); - (void)taosThreadCondInit(&pool->backupCond, NULL); if (!pool->pCb) { pool->pCb = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPoolCB)); @@ -824,13 +826,17 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) { void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { (void)taosThreadMutexLock(&pPool->poolLock); pPool->exit = true; - int32_t size = listNEles(pPool->workers); - for (int32_t i = 0; i < size; ++i) { - taosQsetThreadResume(pPool->qset); + int32_t size = 0; + if (pPool->workers) { + size = listNEles(pPool->workers); } - size = listNEles(pPool->backupWorkers); - for (int32_t i = 0; i < size; ++i) { - taosQsetThreadResume(pPool->qset); + if (pPool->backupWorkers) { + size += listNEles(pPool->backupWorkers); + } + if (pPool->qset) { + for (int32_t i = 0; i < size; ++i) { + taosQsetThreadResume(pPool->qset); + } } (void)taosThreadMutexUnlock(&pPool->poolLock); @@ -848,7 +854,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { int32_t idx = 0; SQueryAutoQWorker *worker = NULL; - while (true) { + while (pPool->workers) { (void)taosThreadMutexLock(&pPool->poolLock); if (listNEles(pPool->workers) == 0) { (void)taosThreadMutexUnlock(&pPool->poolLock); @@ -864,7 +870,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { taosMemoryFree(pNode); } - while (listNEles(pPool->backupWorkers) > 0) { + while (pPool->backupWorkers && listNEles(pPool->backupWorkers) > 0) { SListNode *pNode = tdListPopHead(pPool->backupWorkers); worker = (SQueryAutoQWorker *)pNode->data; if (worker && taosCheckPthreadValid(worker->thread)) { @@ -874,7 +880,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { taosMemoryFree(pNode); } - while (listNEles(pPool->exitedWorkers) > 0) { + while (pPool->exitedWorkers && listNEles(pPool->exitedWorkers) > 0) { SListNode *pNode = tdListPopHead(pPool->exitedWorkers); worker = (SQueryAutoQWorker *)pNode->data; if (worker && taosCheckPthreadValid(worker->thread)) { @@ -935,7 +941,6 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) { taosCloseQueue(queue); - terrno = TSDB_CODE_OUT_OF_MEMORY; queue = NULL; break; }