Merge branch '3.0' into feat/TD-27463-3.0

This commit is contained in:
kailixu 2024-02-03 12:11:23 +08:00
commit 1a6b491405
7 changed files with 132 additions and 40 deletions

View File

@ -62,6 +62,7 @@ int metaOpenIdx(SMeta *pMeta) {
return 0;
}
#ifdef BUILD_NO_CALL
void metaCloseIdx(SMeta *pMeta) { /* TODO */
#if 0
if (pMeta->pIdx) {
@ -114,3 +115,4 @@ int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) {
// TODO
return 0;
}
#endif

View File

@ -273,7 +273,9 @@ static void metaCleanup(SMeta **ppMeta) {
if (pMeta) {
if (pMeta->pEnv) metaAbort(pMeta);
if (pMeta->pCache) metaCacheClose(pMeta);
#ifdef BUILD_NO_CALL
if (pMeta->pIdx) metaCloseIdx(pMeta);
#endif
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);

View File

@ -1455,28 +1455,25 @@ static int32_t sortForJoinOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pL
SLogicNode* pLeft = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0);
SLogicNode* pRight = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1);
SScanLogicNode* pScan = NULL;
SLogicNode* pChild = NULL;
SNode** pChildPos = NULL;
EOrder targetOrder = 0;
SSHashObj* pTables = NULL;
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pLeft) && ((SScanLogicNode*)pLeft)->node.outputTsOrder != SCAN_ORDER_BOTH) {
pScan = (SScanLogicNode*)pLeft;
pChild = pRight;
pChildPos = &pJoin->node.pChildren->pTail->pNode;
targetOrder = pScan->node.outputTsOrder;
} else if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pRight) && ((SScanLogicNode*)pRight)->node.outputTsOrder != SCAN_ORDER_BOTH) {
pScan = (SScanLogicNode*)pRight;
}
if (NULL != pScan) {
switch (pScan->node.outputTsOrder) {
case SCAN_ORDER_ASC:
pScan->scanSeq[0] = 0;
pScan->scanSeq[1] = 1;
pScan->node.outputTsOrder = ORDER_DESC;
goto _return;
case SCAN_ORDER_DESC:
pScan->scanSeq[0] = 1;
pScan->scanSeq[1] = 0;
pScan->node.outputTsOrder = ORDER_ASC;
goto _return;
default:
break;
}
pChild = pLeft;
pChildPos = &pJoin->node.pChildren->pHead->pNode;
targetOrder = pScan->node.outputTsOrder;
} else {
pChild = pRight;
pChildPos = &pJoin->node.pChildren->pTail->pNode;
targetOrder = pLeft->outputTsOrder;
}
if (QUERY_NODE_OPERATOR != nodeType(pJoin->pPrimKeyEqCond)) {
@ -1490,16 +1487,15 @@ static int32_t sortForJoinOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pL
}
SNode* pOrderByNode = NULL;
SSHashObj* pLeftTables = NULL;
collectTableAliasFromNodes(nodesListGetNode(pJoin->node.pChildren, 0), &pLeftTables);
if (NULL != tSimpleHashGet(pLeftTables, ((SColumnNode*)pOp->pLeft)->tableAlias, strlen(((SColumnNode*)pOp->pLeft)->tableAlias))) {
collectTableAliasFromNodes((SNode*)pChild, &pTables);
if (NULL != tSimpleHashGet(pTables, ((SColumnNode*)pOp->pLeft)->tableAlias, strlen(((SColumnNode*)pOp->pLeft)->tableAlias))) {
pOrderByNode = pOp->pLeft;
} else if (NULL != tSimpleHashGet(pLeftTables, ((SColumnNode*)pOp->pRight)->tableAlias, strlen(((SColumnNode*)pOp->pRight)->tableAlias))) {
} else if (NULL != tSimpleHashGet(pTables, ((SColumnNode*)pOp->pRight)->tableAlias, strlen(((SColumnNode*)pOp->pRight)->tableAlias))) {
pOrderByNode = pOp->pRight;
}
tSimpleHashCleanup(pLeftTables);
tSimpleHashCleanup(pTables);
if (NULL == pOrderByNode) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
@ -1510,7 +1506,13 @@ static int32_t sortForJoinOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pL
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->node.outputTsOrder = (ORDER_ASC == pLeft->outputTsOrder) ? ORDER_DESC : ORDER_ASC;
pSort->node.outputTsOrder = targetOrder;
pSort->node.pTargets = nodesCloneList(pChild->pTargets);
if (NULL == pSort->node.pTargets) {
nodesDestroyNode((SNode *)pSort);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->groupSort = false;
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pOrder) {
@ -1519,7 +1521,7 @@ static int32_t sortForJoinOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pL
}
nodesListMakeAppend(&pSort->pSortKeys, (SNode*)pOrder);
pOrder->order = (ORDER_ASC == pLeft->outputTsOrder) ? ORDER_DESC : ORDER_ASC;
pOrder->order = targetOrder;
pOrder->pExpr = nodesCloneNode(pOrderByNode);
pOrder->nullOrder = (ORDER_ASC == pOrder->order) ? NULL_ORDER_FIRST : NULL_ORDER_LAST;
if (!pOrder->pExpr) {
@ -1527,9 +1529,9 @@ static int32_t sortForJoinOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pL
return TSDB_CODE_OUT_OF_MEMORY;
}
pLeft->pParent = (SLogicNode*)pSort;
nodesListMakeAppend(&pSort->node.pChildren, (SNode*)pLeft);
pJoin->node.pChildren->pHead->pNode = (SNode*)pSort;
pChild->pParent = (SLogicNode*)pSort;
nodesListMakeAppend(&pSort->node.pChildren, (SNode*)pChild);
*pChildPos = (SNode*)pSort;
pSort->node.pParent = (SLogicNode*)pJoin;;
_return:

View File

@ -2515,7 +2515,7 @@ int transReleaseCliHandle(void* handle) {
SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
if (pThrd == NULL) {
return -1;
return TSDB_CODE_RPC_BROKEN_LINK;
}
STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527};
@ -2535,7 +2535,7 @@ int transReleaseCliHandle(void* handle) {
if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
destroyCmsg(cmsg);
return -1;
return TSDB_CODE_RPC_BROKEN_LINK;
}
return 0;
}
@ -2544,7 +2544,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
return -1;
return TSDB_CODE_RPC_BROKEN_LINK;
}
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
@ -2577,7 +2577,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
destroyCmsg(cliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return -1;
return TSDB_CODE_RPC_BROKEN_LINK;
}
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0;
@ -2589,7 +2589,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
taosMemoryFree(pTransRsp);
return -1;
return TSDB_CODE_RPC_BROKEN_LINK;
}
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
@ -2627,6 +2627,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
if (ret != 0) {
destroyCmsg(cliMsg);
ret = TSDB_CODE_RPC_BROKEN_LINK;
goto _RETURN;
}
tsem_wait(sem);
@ -2661,7 +2662,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
taosMemoryFree(pTransMsg);
return -1;
return TSDB_CODE_RPC_BROKEN_LINK;
}
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
@ -2698,6 +2699,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr
int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
if (ret != 0) {
destroyCmsg(cliMsg);
ret = TSDB_CODE_RPC_BROKEN_LINK;
goto _RETURN;
}
@ -2726,7 +2728,7 @@ _RETURN:
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) {
return -1;
return TSDB_CODE_RPC_BROKEN_LINK;
}
SCvtAddr cvtAddr = {0};
@ -2750,7 +2752,6 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
destroyCmsg(cliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return -1;
}
}
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);

View File

@ -89,7 +89,7 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
if (qinfo.timestamp != 0) {
int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
if (cost > QUEUE_THRESHOLD) {
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost);
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
}
}
@ -210,7 +210,7 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
if (qinfo.timestamp != 0) {
int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
if (cost > QUEUE_THRESHOLD) {
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost);
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
}
}
@ -357,7 +357,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
if (qinfo.timestamp != 0) {
int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
if (cost > QUEUE_THRESHOLD) {
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost);
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
}
}

View File

@ -128,7 +128,7 @@ class TDTestCase(TBase):
self.checkInsertCorrect()
# check stream correct and drop stream
self.checkStreamCorrect()
#self.checkStreamCorrect()
# drop stream
self.dropStream(self.sname)

View File

@ -47,5 +47,90 @@ sql select a.*,b.* from (select * from tba1 order by ts desc) a, (select * from
if $rows != 4 then
return -1
endi
sql select a.*,b.* from (select * from tba1 order by ts desc) a, (select * from tba1 order by ts) b where a.ts=b.ts order by a.ts;
if $rows != 4 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
sql select a.*,b.* from (select * from tba1 order by ts desc) a, (select * from tba1 order by ts) b where a.ts=b.ts order by a.ts desc;
if $rows != 4 then
return -1
endi
if $data00 != @23-11-17 16:29:04.000@ then
return -1
endi
sql select a.*,b.* from (select * from tba1 order by ts) a, (select * from tba1 order by ts) b where a.ts=b.ts order by a.ts;
if $rows != 4 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
sql select a.*,b.* from (select * from tba1 order by ts) a, (select * from tba1 order by ts) b where a.ts=b.ts order by a.ts desc;
if $rows != 4 then
return -1
endi
if $data00 != @23-11-17 16:29:04.000@ then
return -1
endi
sql select a.*,b.* from (select * from tba1 order by ts limit 2) a, (select * from tba1 order by ts desc limit 2) b where a.ts=b.ts order by a.ts desc;
if $rows != 0 then
return -1
endi
sql select a.*,b.* from (select * from tba1 order by ts limit 3) a, (select * from tba1 order by ts desc limit 3) b where a.ts=b.ts order by a.ts desc;
if $rows != 2 then
return -1
endi
if $data00 != @23-11-17 16:29:03.000@ then
return -1
endi
sql select a.*,b.* from (select * from tba1 order by ts limit 3) a, (select * from tba1 order by ts desc limit 3) b where a.ts=b.ts order by a.ts;
if $rows != 2 then
return -1
endi
if $data00 != @23-11-17 16:29:02.000@ then
return -1
endi
sql select a.*,b.* from tba1 a, (select * from tba1 order by ts desc limit 3) b where a.ts=b.ts order by a.ts;
if $rows != 3 then
return -1
endi
if $data00 != @23-11-17 16:29:02.000@ then
return -1
endi
sql select a.*,b.* from tba1 a, (select * from tba1 order by ts limit 3) b where a.ts=b.ts order by a.ts desc limit 2;
if $rows != 2 then
return -1
endi
if $data00 != @23-11-17 16:29:03.000@ then
return -1
endi
sql select a.*,b.* from (select * from tba1 order by ts limit 3) a, tba1 b where a.ts=b.ts order by a.ts desc;
if $rows != 3 then
return -1
endi
if $data00 != @23-11-17 16:29:03.000@ then
return -1
endi
sql select a.*,b.* from (select * from tba1 order by ts desc limit 3) a, tba1 b where a.ts=b.ts order by a.ts desc;
if $rows != 3 then
return -1
endi
if $data00 != @23-11-17 16:29:04.000@ then
return -1
endi
sql select a.*,b.* from (select * from tba1 order by ts desc limit 3) a, tba1 b where a.ts=b.ts order by a.ts;
if $rows != 3 then
return -1
endi
if $data00 != @23-11-17 16:29:02.000@ then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT