fix ret check caused crash
This commit is contained in:
parent
f83521b2eb
commit
aa4b4a609f
|
@ -636,7 +636,7 @@ bool nodesExprsHasColumn(SNodeList* pList);
|
|||
void* nodesGetValueFromNode(SValueNode* pNode);
|
||||
int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value);
|
||||
char* nodesGetStrValueFromNode(SValueNode* pNode);
|
||||
void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal);
|
||||
int32_t nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal);
|
||||
int32_t nodesMakeValueNodeFromString(char* literal, SValueNode** ppValNode);
|
||||
int32_t nodesMakeValueNodeFromBool(bool b, SValueNode** ppValNode);
|
||||
int32_t nodesMakeValueNodeFromInt32(int32_t value, SNode** ppNode);
|
||||
|
|
|
@ -1781,7 +1781,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
pExp->base.resSchema =
|
||||
createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
|
||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
|
||||
nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
|
||||
code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
|
||||
} else if (type == QUERY_NODE_FUNCTION) {
|
||||
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
|
||||
SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
|
||||
|
@ -1811,12 +1811,10 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) { // todo handle error
|
||||
} else {
|
||||
res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||
code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||
code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -1826,7 +1824,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
|
||||
pExp->base.numOfParams = numOfParam;
|
||||
|
||||
for (int32_t j = 0; j < numOfParam; ++j) {
|
||||
for (int32_t j = 0; j < numOfParam && TSDB_CODE_SUCCESS == code; ++j) {
|
||||
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
|
||||
QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
|
||||
if (p1->type == QUERY_NODE_COLUMN) {
|
||||
|
@ -1839,7 +1837,8 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
} else if (p1->type == QUERY_NODE_VALUE) {
|
||||
SValueNode* pvn = (SValueNode*)p1;
|
||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
|
||||
nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
|
||||
code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
}
|
||||
} else if (type == QUERY_NODE_OPERATOR) {
|
||||
|
@ -1871,13 +1870,10 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pExp->base.numOfParams = 1;
|
||||
SDataType* pType = &pCond->node.resType;
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
|
||||
pExp->pExpr->_optrRoot.pRootNode = pNode;
|
||||
}
|
||||
pExp->base.numOfParams = 1;
|
||||
SDataType* pType = &pCond->node.resType;
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
|
||||
pExp->pExpr->_optrRoot.pRootNode = pNode;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
|
|
@ -141,7 +141,7 @@ static int32_t callocNodeChunk(SNodeAllocator* pAllocator, SNodeMemChunk** pOutC
|
|||
static int32_t nodesCallocImpl(int32_t size, void** pOut) {
|
||||
if (NULL == g_pNodeAllocator) {
|
||||
*pOut = taosMemoryCalloc(1, size);
|
||||
if (!pOut) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (!*pOut) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -2638,11 +2638,12 @@ int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots) {
|
|||
return num;
|
||||
}
|
||||
|
||||
void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
|
||||
int32_t nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
|
||||
int32_t code = 0;
|
||||
if (pNode->isNull) {
|
||||
pVal->nType = TSDB_DATA_TYPE_NULL;
|
||||
pVal->nLen = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
|
||||
return;
|
||||
return code;
|
||||
}
|
||||
pVal->nType = pNode->node.resType.type;
|
||||
pVal->nLen = pNode->node.resType.bytes;
|
||||
|
@ -2676,13 +2677,21 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
|
|||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
case TSDB_DATA_TYPE_GEOMETRY:
|
||||
pVal->pz = taosMemoryMalloc(pVal->nLen + 1);
|
||||
memcpy(pVal->pz, pNode->datum.p, pVal->nLen);
|
||||
pVal->pz[pVal->nLen] = 0;
|
||||
if (pVal->pz) {
|
||||
memcpy(pVal->pz, pNode->datum.p, pVal->nLen);
|
||||
pVal->pz[pVal->nLen] = 0;
|
||||
} else {
|
||||
code = terrno;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
pVal->nLen = getJsonValueLen(pNode->datum.p);
|
||||
pVal->pz = taosMemoryMalloc(pVal->nLen);
|
||||
memcpy(pVal->pz, pNode->datum.p, pVal->nLen);
|
||||
if (pVal->pz) {
|
||||
memcpy(pVal->pz, pNode->datum.p, pVal->nLen);
|
||||
} else {
|
||||
code = terrno;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
|
@ -2690,6 +2699,7 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
|
|||
default:
|
||||
break;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc) {
|
||||
|
|
|
@ -792,6 +792,16 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ
|
|||
|
||||
int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
|
||||
int32_t code;
|
||||
|
||||
(void)taosThreadMutexInit(&pool->poolLock, NULL);
|
||||
(void)taosThreadMutexInit(&pool->backupLock, NULL);
|
||||
(void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);
|
||||
(void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL);
|
||||
|
||||
(void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL);
|
||||
(void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL);
|
||||
(void)taosThreadCondInit(&pool->backupCond, NULL);
|
||||
|
||||
code = taosOpenQset(&pool->qset);
|
||||
if (code) return terrno = code;
|
||||
pool->workers = tdListNew(sizeof(SQueryAutoQWorker));
|
||||
|
@ -802,14 +812,6 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
|
|||
if (!pool->exitedWorkers) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
pool->maxInUse = pool->max * 2 + 2;
|
||||
|
||||
(void)taosThreadMutexInit(&pool->poolLock, NULL);
|
||||
(void)taosThreadMutexInit(&pool->backupLock, NULL);
|
||||
(void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);
|
||||
(void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL);
|
||||
|
||||
(void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL);
|
||||
(void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL);
|
||||
(void)taosThreadCondInit(&pool->backupCond, NULL);
|
||||
|
||||
if (!pool->pCb) {
|
||||
pool->pCb = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPoolCB));
|
||||
|
@ -824,13 +826,17 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
|
|||
void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
|
||||
(void)taosThreadMutexLock(&pPool->poolLock);
|
||||
pPool->exit = true;
|
||||
int32_t size = listNEles(pPool->workers);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
taosQsetThreadResume(pPool->qset);
|
||||
int32_t size = 0;
|
||||
if (pPool->workers) {
|
||||
size = listNEles(pPool->workers);
|
||||
}
|
||||
size = listNEles(pPool->backupWorkers);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
taosQsetThreadResume(pPool->qset);
|
||||
if (pPool->backupWorkers) {
|
||||
size += listNEles(pPool->backupWorkers);
|
||||
}
|
||||
if (pPool->qset) {
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
taosQsetThreadResume(pPool->qset);
|
||||
}
|
||||
}
|
||||
(void)taosThreadMutexUnlock(&pPool->poolLock);
|
||||
|
||||
|
@ -848,7 +854,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
|
|||
|
||||
int32_t idx = 0;
|
||||
SQueryAutoQWorker *worker = NULL;
|
||||
while (true) {
|
||||
while (pPool->workers) {
|
||||
(void)taosThreadMutexLock(&pPool->poolLock);
|
||||
if (listNEles(pPool->workers) == 0) {
|
||||
(void)taosThreadMutexUnlock(&pPool->poolLock);
|
||||
|
@ -864,7 +870,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
|
|||
taosMemoryFree(pNode);
|
||||
}
|
||||
|
||||
while (listNEles(pPool->backupWorkers) > 0) {
|
||||
while (pPool->backupWorkers && listNEles(pPool->backupWorkers) > 0) {
|
||||
SListNode *pNode = tdListPopHead(pPool->backupWorkers);
|
||||
worker = (SQueryAutoQWorker *)pNode->data;
|
||||
if (worker && taosCheckPthreadValid(worker->thread)) {
|
||||
|
@ -874,7 +880,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
|
|||
taosMemoryFree(pNode);
|
||||
}
|
||||
|
||||
while (listNEles(pPool->exitedWorkers) > 0) {
|
||||
while (pPool->exitedWorkers && listNEles(pPool->exitedWorkers) > 0) {
|
||||
SListNode *pNode = tdListPopHead(pPool->exitedWorkers);
|
||||
worker = (SQueryAutoQWorker *)pNode->data;
|
||||
if (worker && taosCheckPthreadValid(worker->thread)) {
|
||||
|
@ -935,7 +941,6 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand
|
|||
|
||||
if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
|
||||
taosCloseQueue(queue);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
queue = NULL;
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue