Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact
This commit is contained in:
commit
99702ff4ee
|
@ -157,6 +157,7 @@ int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
|||
bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t histogramFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t histogramFunctionPartial(SqlFunctionCtx* pCtx);
|
||||
int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx);
|
||||
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
|
|
@ -1427,9 +1427,12 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
|||
}
|
||||
|
||||
static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
// first(col_list) will be rewritten as first(col)
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
for (int32_t i = 0; i < numOfParams; ++i) {
|
||||
SNode* pParamNode = nodesListGetNode(pFunc->pParameterList, i);
|
||||
if (QUERY_NODE_VALUE == nodeType(pParamNode)) {
|
||||
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
}
|
||||
|
||||
pFunc->node.resType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
|
||||
|
@ -2323,7 +2326,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.translateFunc = translateHistogramPartial,
|
||||
.getEnvFunc = getHistogramFuncEnv,
|
||||
.initFunc = histogramFunctionSetup,
|
||||
.processFunc = histogramFunction,
|
||||
.processFunc = histogramFunctionPartial,
|
||||
.finalizeFunc = histogramPartialFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = histogramCombine,
|
||||
|
|
|
@ -4096,7 +4096,7 @@ bool histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultIn
|
|||
return true;
|
||||
}
|
||||
|
||||
int32_t histogramFunction(SqlFunctionCtx* pCtx) {
|
||||
static int32_t histogramFunctionImpl(SqlFunctionCtx* pCtx, bool isPartial) {
|
||||
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
@ -4128,10 +4128,22 @@ int32_t histogramFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, pInfo->numOfBins);
|
||||
if (!isPartial) {
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, pInfo->numOfBins);
|
||||
} else {
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t histogramFunction(SqlFunctionCtx* pCtx) {
|
||||
return histogramFunctionImpl(pCtx, false);
|
||||
}
|
||||
|
||||
int32_t histogramFunctionPartial(SqlFunctionCtx* pCtx) {
|
||||
return histogramFunctionImpl(pCtx, true);
|
||||
}
|
||||
|
||||
static void histogramTransferInfo(SHistoFuncInfo* pInput, SHistoFuncInfo* pOutput) {
|
||||
pOutput->normalized = pInput->normalized;
|
||||
pOutput->numOfBins = pInput->numOfBins;
|
||||
|
@ -4151,10 +4163,12 @@ int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx) {
|
|||
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
char* data = colDataGetData(pCol, start);
|
||||
SHistoFuncInfo* pInputInfo = (SHistoFuncInfo*)varDataVal(data);
|
||||
|
||||
histogramTransferInfo(pInputInfo, pInfo);
|
||||
for(int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
||||
char* data = colDataGetData(pCol, i);
|
||||
SHistoFuncInfo* pInputInfo = (SHistoFuncInfo*)varDataVal(data);
|
||||
histogramTransferInfo(pInputInfo, pInfo);
|
||||
}
|
||||
|
||||
SET_VAL(GET_RES_INFO(pCtx), pInfo->numOfBins, pInfo->numOfBins);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -4197,6 +4211,7 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
int32_t resultBytes = getHistogramInfoSize();
|
||||
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||
|
@ -4210,7 +4225,7 @@ int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||
|
||||
taosMemoryFree(res);
|
||||
return 1;
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t histogramCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
|
|
|
@ -1364,9 +1364,9 @@ static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) {
|
|||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static bool partTagsOptHasCol(SNodeList* pPartKeys) {
|
||||
static bool planOptNodeListHasCol(SNodeList* pKeys) {
|
||||
bool hasCol = false;
|
||||
nodesWalkExprs(pPartKeys, partTagsOptHasColImpl, &hasCol);
|
||||
nodesWalkExprs(pKeys, partTagsOptHasColImpl, &hasCol);
|
||||
return hasCol;
|
||||
}
|
||||
|
||||
|
@ -1409,7 +1409,7 @@ static bool partTagsOptMayBeOptimized(SLogicNode* pNode) {
|
|||
return false;
|
||||
}
|
||||
|
||||
return !partTagsOptHasCol(partTagsGetPartKeys(pNode)) && partTagsOptAreSupportedFuncs(partTagsGetFuncs(pNode));
|
||||
return !planOptNodeListHasCol(partTagsGetPartKeys(pNode)) && partTagsOptAreSupportedFuncs(partTagsGetFuncs(pNode));
|
||||
}
|
||||
|
||||
static int32_t partTagsOptRebuildTbanme(SNodeList* pPartKeys) {
|
||||
|
@ -2096,6 +2096,37 @@ static int32_t mergeProjectsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
|
|||
return mergeProjectsOptimizeImpl(pCxt, pLogicSubplan, pProjectNode);
|
||||
}
|
||||
|
||||
static bool tagScanMayBeOptimized(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || (SCAN_TYPE_TAG == ((SScanLogicNode*)pNode)->scanType)) {
|
||||
return false;
|
||||
}
|
||||
SScanLogicNode *pScan = (SScanLogicNode*)pNode;
|
||||
if (NULL != pScan->pScanCols) {
|
||||
return false;
|
||||
}
|
||||
if (NULL == pNode->pParent || QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent) || 1 != LIST_LENGTH(pNode->pParent->pChildren)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SAggLogicNode* pAgg = (SAggLogicNode*)(pNode->pParent);
|
||||
if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs || planOptNodeListHasCol(pAgg->pGroupKeys)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||
SScanLogicNode* pScanNode = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tagScanMayBeOptimized);
|
||||
if (NULL == pScanNode) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pScanNode->scanType = SCAN_TYPE_TAG;
|
||||
pCxt->optimized = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// clang-format off
|
||||
static const SOptimizeRule optimizeRuleSet[] = {
|
||||
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
|
||||
|
@ -2108,7 +2139,8 @@ static const SOptimizeRule optimizeRuleSet[] = {
|
|||
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
|
||||
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
|
||||
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
|
||||
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize}
|
||||
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
|
||||
{.pName = "TagScan", .optimizeFunc = tagScanOptimize}
|
||||
};
|
||||
// clang-format on
|
||||
|
||||
|
|
|
@ -87,4 +87,11 @@ TEST_F(PlanOptimizeTest, eliminateProjection) {
|
|||
TEST_F(PlanOptimizeTest, pushDownProjectCond) {
|
||||
useDb("root", "test");
|
||||
run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) where 1-c1>5 order by 1 nulls first");
|
||||
}
|
||||
|
||||
TEST_F(PlanOptimizeTest, tagScan) {
|
||||
useDb("root", "test");
|
||||
run("select tag1 from st1 group by tag1");
|
||||
run("select distinct tag1 from st1");
|
||||
run("select tag1*tag1 from st1 group by tag1*tag1");
|
||||
}
|
|
@ -317,6 +317,11 @@ typedef struct STransReq {
|
|||
void* data;
|
||||
} STransReq;
|
||||
|
||||
void transReqQueueInit(queue* q);
|
||||
void* transReqQueuePushReq(queue* q);
|
||||
void* transReqQueueRemove(void* arg);
|
||||
void transReqQueueClear(queue* q);
|
||||
|
||||
// queue sending msgs
|
||||
typedef struct {
|
||||
SArray* q;
|
||||
|
|
|
@ -19,7 +19,7 @@ typedef struct SCliConn {
|
|||
T_REF_DECLARE()
|
||||
uv_connect_t connReq;
|
||||
uv_stream_t* stream;
|
||||
uv_write_t writeReq;
|
||||
queue wreqQueue;
|
||||
|
||||
void* hostThrd;
|
||||
|
||||
|
@ -586,9 +586,10 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
|
|||
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
|
||||
conn->stream->data = conn;
|
||||
|
||||
conn->writeReq.data = conn;
|
||||
conn->connReq.data = conn;
|
||||
|
||||
transReqQueueInit(&conn->wreqQueue);
|
||||
|
||||
transQueueInit(&conn->cliMsgs, NULL);
|
||||
QUEUE_INIT(&conn->conn);
|
||||
conn->hostThrd = pThrd;
|
||||
|
@ -627,6 +628,8 @@ static void cliDestroy(uv_handle_t* handle) {
|
|||
transCtxCleanup(&conn->ctx);
|
||||
transQueueDestroy(&conn->cliMsgs);
|
||||
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||
transReqQueueClear(&conn->wreqQueue);
|
||||
|
||||
transDestroyBuffer(&conn->readBuf);
|
||||
taosMemoryFree(conn);
|
||||
}
|
||||
|
@ -649,11 +652,8 @@ static bool cliHandleNoResp(SCliConn* conn) {
|
|||
return res;
|
||||
}
|
||||
static void cliSendCb(uv_write_t* req, int status) {
|
||||
SCliConn* pConn = req && req->handle ? req->handle->data : NULL;
|
||||
taosMemoryFree(req);
|
||||
if (pConn == NULL) {
|
||||
return;
|
||||
}
|
||||
SCliConn* pConn = transReqQueueRemove(req);
|
||||
if (pConn == NULL) return;
|
||||
|
||||
if (status == 0) {
|
||||
tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
|
@ -711,7 +711,7 @@ void cliSend(SCliConn* pConn) {
|
|||
CONN_SET_PERSIST_BY_APP(pConn);
|
||||
}
|
||||
|
||||
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
|
||||
uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue);
|
||||
uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
||||
return;
|
||||
_RETURN:
|
||||
|
|
|
@ -293,6 +293,48 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
void transReqQueueInit(queue* q) {
|
||||
// init req queue
|
||||
QUEUE_INIT(q);
|
||||
}
|
||||
void* transReqQueuePushReq(queue* q) {
|
||||
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
|
||||
STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq));
|
||||
wreq->data = req;
|
||||
req->data = wreq;
|
||||
QUEUE_PUSH(q, &wreq->q);
|
||||
return req;
|
||||
}
|
||||
void* transReqQueueRemove(void* arg) {
|
||||
void* ret = NULL;
|
||||
uv_write_t* req = arg;
|
||||
STransReq* wreq = req && req->data ? req->data : NULL;
|
||||
|
||||
assert(wreq->data == req);
|
||||
if (wreq == NULL || wreq->data == NULL) {
|
||||
taosMemoryFree(wreq->data);
|
||||
taosMemoryFree(wreq);
|
||||
return req;
|
||||
}
|
||||
|
||||
QUEUE_REMOVE(&wreq->q);
|
||||
|
||||
ret = req && req->handle ? req->handle->data : NULL;
|
||||
taosMemoryFree(wreq->data);
|
||||
taosMemoryFree(wreq);
|
||||
|
||||
return ret;
|
||||
}
|
||||
void transReqQueueClear(queue* q) {
|
||||
while (!QUEUE_IS_EMPTY(q)) {
|
||||
queue* h = QUEUE_HEAD(q);
|
||||
QUEUE_REMOVE(h);
|
||||
STransReq* req = QUEUE_DATA(h, STransReq, q);
|
||||
taosMemoryFree(req->data);
|
||||
taosMemoryFree(req);
|
||||
}
|
||||
}
|
||||
|
||||
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
|
||||
queue->q = taosArrayInit(2, sizeof(void*));
|
||||
queue->freeFunc = (void (*)(const void*))freeFunc;
|
||||
|
|
|
@ -331,14 +331,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
|
|||
}
|
||||
|
||||
void uvOnSendCb(uv_write_t* req, int status) {
|
||||
STransReq* wreq = req && req->data ? req->data : NULL;
|
||||
SSvrConn* conn = req && req->handle ? req->handle->data : NULL;
|
||||
if (wreq != NULL && conn != NULL) {
|
||||
QUEUE_REMOVE(&wreq->q);
|
||||
taosMemoryFree(wreq->data);
|
||||
taosMemoryFree(wreq);
|
||||
}
|
||||
|
||||
SSvrConn* conn = transReqQueueRemove(req);
|
||||
if (conn == NULL) return;
|
||||
|
||||
if (status == 0) {
|
||||
|
@ -442,12 +435,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
|
|||
|
||||
transRefSrvHandle(pConn);
|
||||
|
||||
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
|
||||
STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq));
|
||||
wreq->data = req;
|
||||
req->data = wreq;
|
||||
QUEUE_PUSH(&pConn->wreqQueue, &wreq->q);
|
||||
|
||||
uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue);
|
||||
uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
|
||||
}
|
||||
static void uvStartSendResp(SSvrMsg* smsg) {
|
||||
|
@ -757,7 +745,7 @@ static SSvrConn* createConn(void* hThrd) {
|
|||
|
||||
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
|
||||
|
||||
QUEUE_INIT(&pConn->wreqQueue);
|
||||
transReqQueueInit(&pConn->wreqQueue);
|
||||
QUEUE_INIT(&pConn->queue);
|
||||
|
||||
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
|
||||
|
@ -834,13 +822,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
|||
destroySmsg(msg);
|
||||
}
|
||||
|
||||
while (!QUEUE_IS_EMPTY(&conn->wreqQueue)) {
|
||||
queue* h = QUEUE_HEAD(&conn->wreqQueue);
|
||||
QUEUE_REMOVE(h);
|
||||
STransReq* req = QUEUE_DATA(h, STransReq, q);
|
||||
taosMemoryFree(req->data);
|
||||
taosMemoryFree(req);
|
||||
}
|
||||
transReqQueueClear(&conn->wreqQueue);
|
||||
transQueueDestroy(&conn->srvMsgs);
|
||||
|
||||
QUEUE_REMOVE(&conn->queue);
|
||||
|
|
|
@ -98,7 +98,7 @@ print ----> start to check if there are ERRORS in vagrind log file for each dnod
|
|||
system_content sh/checkValgrind.sh -n dnode1
|
||||
|
||||
print cmd return result ----> [ $system_content ]
|
||||
if $system_content <= 2 then
|
||||
if $system_content <= 0 then
|
||||
return 0
|
||||
endi
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ print ----> start to check if there are ERRORS in vagrind log file for each dnod
|
|||
system_content sh/checkValgrind.sh -n dnode1
|
||||
|
||||
print cmd return result ----> [ $system_content ]
|
||||
if $system_content <= 2 then
|
||||
if $system_content <= 0 then
|
||||
return 0
|
||||
endi
|
||||
|
||||
|
|
Loading…
Reference in New Issue