Merge pull request #16958 from taosdata/enh/3.0_planner_optimize

enh: added memory allocators for parser and planner
This commit is contained in:
Shengliang Guan 2022-09-21 19:59:28 +08:00 committed by GitHub
commit 9595412159
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 358 additions and 76 deletions

View File

@ -95,6 +95,8 @@ extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in
extern int32_t tsQueryPolicy; extern int32_t tsQueryPolicy;
extern int32_t tsQuerySmaOptimize; extern int32_t tsQuerySmaOptimize;
extern bool tsQueryPlannerTrace; extern bool tsQueryPlannerTrace;
extern int32_t tsQueryNodeChunkSize;
extern bool tsQueryUseNodeAllocator;
// client // client
extern int32_t tsMinSlidingTime; extern int32_t tsMinSlidingTime;

View File

@ -275,6 +275,17 @@ typedef struct SNodeList {
SListCell* pTail; SListCell* pTail;
} SNodeList; } SNodeList;
typedef struct SNodeAllocator SNodeAllocator;
int32_t nodesInitAllocatorSet();
void nodesDestroyAllocatorSet();
int32_t nodesCreateAllocator(int64_t queryId, int32_t chunkSize, int64_t* pAllocatorId);
int32_t nodesAcquireAllocator(int64_t allocatorId);
int32_t nodesReleaseAllocator(int64_t allocatorId);
int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId);
int64_t nodesReleaseAllocatorWeakRef(int64_t allocatorId);
void nodesDestroyAllocator(int64_t allocatorId);
SNode* nodesMakeNode(ENodeType type); SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode); void nodesDestroyNode(SNode* pNode);

View File

@ -56,6 +56,7 @@ typedef struct SParseContext {
bool nodeOffline; bool nodeOffline;
SArray* pTableMetaPos; // sql table pos => catalog data pos SArray* pTableMetaPos; // sql table pos => catalog data pos
SArray* pTableVgroupPos; // sql table pos => catalog data pos SArray* pTableVgroupPos; // sql table pos => catalog data pos
int64_t allocatorId;
} SParseContext; } SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);

View File

@ -39,6 +39,7 @@ typedef struct SPlanContext {
int32_t msgLen; int32_t msgLen;
const char* pUser; const char* pUser;
bool sysInfo; bool sysInfo;
int64_t allocatorId;
} SPlanContext; } SPlanContext;
// Create the physical plan for the query, according to the AST. // Create the physical plan for the query, according to the AST.

View File

@ -67,6 +67,7 @@ typedef struct SSchedulerReq {
SRequestConnInfo *pConn; SRequestConnInfo *pConn;
SArray *pNodeList; SArray *pNodeList;
SQueryPlan *pDag; SQueryPlan *pDag;
int64_t allocatorRefId;
const char *sql; const char *sql;
int64_t startTs; int64_t startTs;
schedulerExecFp execFp; schedulerExecFp execFp;

View File

@ -250,6 +250,7 @@ typedef struct SRequestObj {
bool inRetry; bool inRetry;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry; uint32_t retry;
int64_t allocatorRefId;
} SRequestObj; } SRequestObj;
typedef struct SSyncQueryParam { typedef struct SSyncQueryParam {

View File

@ -288,6 +288,7 @@ void *createRequest(uint64_t connId, int32_t type) {
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
pRequest->type = type; pRequest->type = type;
pRequest->allocatorRefId = -1;
pRequest->pDb = getDbOfConnection(pTscObj); pRequest->pDb = getDbOfConnection(pTscObj);
pRequest->pTscObj = pTscObj; pRequest->pTscObj = pTscObj;
@ -349,6 +350,7 @@ void doDestroyRequest(void *p) {
taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList); taosArrayDestroy(pRequest->dbList);
taosArrayDestroy(pRequest->targetTableList); taosArrayDestroy(pRequest->targetTableList);
nodesDestroyAllocator(pRequest->allocatorRefId);
destroyQueryExecRes(&pRequest->body.resInfo.execRes); destroyQueryExecRes(&pRequest->body.resInfo.execRes);
@ -411,6 +413,7 @@ void taos_init_imp(void) {
initTaskQueue(); initTaskQueue();
fmFuncMgtInit(); fmFuncMgtInit();
nodesInitAllocatorSet();
clientConnRefPool = taosOpenRef(200, destroyTscObj); clientConnRefPool = taosOpenRef(200, destroyTscObj);
clientReqRefPool = taosOpenRef(40960, doDestroyRequest); clientReqRefPool = taosOpenRef(40960, doDestroyRequest);

View File

@ -195,6 +195,19 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
(*pRequest)->allocatorRefId = -1;
if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
if (TSDB_CODE_SUCCESS !=
nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
(*pRequest)->requestId, pTscObj->id, sql);
destroyRequest(*pRequest);
*pRequest = NULL;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1023,7 +1036,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.pMsg = pRequest->msgBuf, .pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pUser = pRequest->pTscObj->user, .pUser = pRequest->pTscObj->user,
.sysInfo = pRequest->pTscObj->sysInfo}; .sysInfo = pRequest->pTscObj->sysInfo,
.allocatorId = pRequest->allocatorRefId};
SAppInstInfo* pAppInfo = getAppInfo(pRequest); SAppInstInfo* pAppInfo = getAppInfo(pRequest);
SQueryPlan* pDag = NULL; SQueryPlan* pDag = NULL;
@ -1048,6 +1062,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.pConn = &conn, .pConn = &conn,
.pNodeList = pNodeList, .pNodeList = pNodeList,
.pDag = pDag, .pDag = pDag,
.allocatorRefId = pRequest->allocatorRefId,
.sql = pRequest->sqlstr, .sql = pRequest->sqlstr,
.startTs = pRequest->metric.start, .startTs = pRequest->metric.start,
.execFp = schedulerExecCb, .execFp = schedulerExecCb,

View File

@ -65,6 +65,7 @@ void taos_cleanup(void) {
fmFuncMgtDestroy(); fmFuncMgtDestroy();
qCleanupKeywordsTable(); qCleanupKeywordsTable();
nodesDestroyAllocatorSet();
id = clientConnRefPool; id = clientConnRefPool;
clientConnRefPool = -1; clientConnRefPool = -1;
@ -775,7 +776,8 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
.enableSysInfo = pTscObj->sysInfo, .enableSysInfo = pTscObj->sysInfo,
.async = true, .async = true,
.svrVer = pTscObj->sVer, .svrVer = pTscObj->sVer,
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)}; .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
.allocatorId = pRequest->allocatorRefId};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -92,6 +92,8 @@ bool tsSmlDataFormat =
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
int32_t tsQuerySmaOptimize = 0; int32_t tsQuerySmaOptimize = 0;
bool tsQueryPlannerTrace = false; bool tsQueryPlannerTrace = false;
int32_t tsQueryNodeChunkSize = 32 * 1024;
bool tsQueryUseNodeAllocator = true;
/* /*
* denote if the server needs to compress response message at the application layer to client, including query rsp, * denote if the server needs to compress response message at the application layer to client, including query rsp,
@ -285,6 +287,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1; if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1; if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1;
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1;
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, true) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
@ -645,6 +649,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32; tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32; tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval;
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
return 0; return 0;
} }
@ -979,6 +985,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32; qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32;
} else if (strcasecmp("queryPlannerTrace", name) == 0) { } else if (strcasecmp("queryPlannerTrace", name) == 0) {
tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval;
} else if (strcasecmp("queryNodeChunkSize", name) == 0) {
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
} else if (strcasecmp("queryUseNodeAllocator", name) == 0) {
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
} }
break; break;
} }

View File

@ -21,9 +21,209 @@
#include "taoserror.h" #include "taoserror.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "thash.h" #include "thash.h"
#include "tref.h"
static SNode* makeNode(ENodeType type, size_t size) { typedef struct SNodeMemChunk {
SNode* p = taosMemoryCalloc(1, size); int32_t availableSize;
int32_t usedSize;
char* pBuf;
struct SNodeMemChunk* pNext;
} SNodeMemChunk;
typedef struct SNodeAllocator {
int64_t self;
int64_t queryId;
int32_t chunkSize;
int32_t chunkNum;
SNodeMemChunk* pCurrChunk;
SNodeMemChunk* pChunks;
TdThreadMutex mutex;
} SNodeAllocator;
static threadlocal SNodeAllocator* g_pNodeAllocator;
static int32_t g_allocatorReqRefPool = -1;
static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) {
SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize);
if (NULL == pNewChunk) {
return NULL;
}
pNewChunk->pBuf = (char*)(pNewChunk + 1);
pNewChunk->availableSize = pAllocator->chunkSize;
pNewChunk->usedSize = 0;
pNewChunk->pNext = NULL;
if (NULL != pAllocator->pCurrChunk) {
pAllocator->pCurrChunk->pNext = pNewChunk;
}
pAllocator->pCurrChunk = pNewChunk;
if (NULL == pAllocator->pChunks) {
pAllocator->pChunks = pNewChunk;
}
++(pAllocator->chunkNum);
return pNewChunk;
}
static void* nodesCallocImpl(int32_t size) {
if (NULL == g_pNodeAllocator) {
return taosMemoryCalloc(1, size);
}
if (g_pNodeAllocator->pCurrChunk->usedSize + size > g_pNodeAllocator->pCurrChunk->availableSize) {
if (NULL == callocNodeChunk(g_pNodeAllocator)) {
return NULL;
}
}
void* p = g_pNodeAllocator->pCurrChunk->pBuf + g_pNodeAllocator->pCurrChunk->usedSize;
g_pNodeAllocator->pCurrChunk->usedSize += size;
return p;
}
static void* nodesCalloc(int32_t num, int32_t size) {
void* p = nodesCallocImpl(num * size + 1);
if (NULL == p) {
return NULL;
}
*(char*)p = (NULL != g_pNodeAllocator) ? 1 : 0;
return (char*)p + 1;
}
static void nodesFree(void* p) {
char* ptr = (char*)p - 1;
if (0 == *ptr) {
taosMemoryFree(ptr);
}
return;
}
static int32_t createNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator) {
*pAllocator = taosMemoryCalloc(1, sizeof(SNodeAllocator));
if (NULL == *pAllocator) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pAllocator)->chunkSize = chunkSize;
if (NULL == callocNodeChunk(*pAllocator)) {
taosMemoryFreeClear(*pAllocator);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosThreadMutexInit(&(*pAllocator)->mutex, NULL);
return TSDB_CODE_SUCCESS;
}
static void destroyNodeAllocator(void* p) {
if (NULL == p) {
return;
}
SNodeAllocator* pAllocator = p;
nodesDebug("query id %" PRIx64 " allocator id %" PRIx64 " alloc chunkNum: %d, chunkTotakSize: %d",
pAllocator->queryId, pAllocator->self, pAllocator->chunkNum, pAllocator->chunkNum * pAllocator->chunkSize);
SNodeMemChunk* pChunk = pAllocator->pChunks;
while (NULL != pChunk) {
SNodeMemChunk* pTemp = pChunk->pNext;
taosMemoryFree(pChunk);
pChunk = pTemp;
}
taosThreadMutexDestroy(&pAllocator->mutex);
taosMemoryFree(pAllocator);
}
int32_t nodesInitAllocatorSet() {
if (g_allocatorReqRefPool >= 0) {
nodesWarn("nodes already initialized");
return TSDB_CODE_SUCCESS;
}
g_allocatorReqRefPool = taosOpenRef(1024, destroyNodeAllocator);
if (g_allocatorReqRefPool < 0) {
nodesError("init nodes failed");
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
void nodesDestroyAllocatorSet() {
if (g_allocatorReqRefPool >= 0) {
SNodeAllocator* pAllocator = taosIterateRef(g_allocatorReqRefPool, 0);
int64_t refId = 0;
while (NULL != pAllocator) {
refId = pAllocator->self;
taosRemoveRef(g_allocatorReqRefPool, refId);
pAllocator = taosIterateRef(g_allocatorReqRefPool, refId);
}
taosCloseRef(g_allocatorReqRefPool);
}
}
int32_t nodesCreateAllocator(int64_t queryId, int32_t chunkSize, int64_t* pAllocatorId) {
SNodeAllocator* pAllocator = NULL;
int32_t code = createNodeAllocator(chunkSize, &pAllocator);
if (TSDB_CODE_SUCCESS == code) {
pAllocator->self = taosAddRef(g_allocatorReqRefPool, pAllocator);
if (pAllocator->self <= 0) {
return terrno;
}
pAllocator->queryId = queryId;
*pAllocatorId = pAllocator->self;
}
return code;
}
int32_t nodesAcquireAllocator(int64_t allocatorId) {
if (allocatorId <= 0) {
return TSDB_CODE_SUCCESS;
}
SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId);
if (NULL == pAllocator) {
return terrno;
}
taosThreadMutexLock(&pAllocator->mutex);
g_pNodeAllocator = pAllocator;
return TSDB_CODE_SUCCESS;
}
int32_t nodesReleaseAllocator(int64_t allocatorId) {
if (allocatorId <= 0) {
return TSDB_CODE_SUCCESS;
}
if (NULL == g_pNodeAllocator) {
nodesError("allocator id %" PRIx64
" release failed: The nodesReleaseAllocator function needs to be called after the nodesAcquireAllocator "
"function is called!",
allocatorId);
return TSDB_CODE_FAILED;
}
SNodeAllocator* pAllocator = g_pNodeAllocator;
g_pNodeAllocator = NULL;
taosThreadMutexUnlock(&pAllocator->mutex);
return taosReleaseRef(g_allocatorReqRefPool, allocatorId);
}
int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId) {
if (allocatorId <= 0) {
return 0;
}
SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId);
return pAllocator->self;
}
int64_t nodesReleaseAllocatorWeakRef(int64_t allocatorId) { return taosReleaseRef(g_allocatorReqRefPool, allocatorId); }
void nodesDestroyAllocator(int64_t allocatorId) {
if (allocatorId <= 0) {
return;
}
taosRemoveRef(g_allocatorReqRefPool, allocatorId);
}
static SNode* makeNode(ENodeType type, int32_t size) {
SNode* p = nodesCalloc(1, size);
if (NULL == p) { if (NULL == p) {
return NULL; return NULL;
} }
@ -824,6 +1024,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pLogicNode->pWStartTs); nodesDestroyNode(pLogicNode->pWStartTs);
nodesDestroyNode(pLogicNode->pValues); nodesDestroyNode(pLogicNode->pValues);
nodesDestroyList(pLogicNode->pFillExprs); nodesDestroyList(pLogicNode->pFillExprs);
nodesDestroyList(pLogicNode->pNotFillExprs);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_SORT: { case QUERY_NODE_LOGIC_PLAN_SORT: {
@ -1021,12 +1222,12 @@ void nodesDestroyNode(SNode* pNode) {
default: default:
break; break;
} }
taosMemoryFreeClear(pNode); nodesFree(pNode);
return; return;
} }
SNodeList* nodesMakeList() { SNodeList* nodesMakeList() {
SNodeList* p = taosMemoryCalloc(1, sizeof(SNodeList)); SNodeList* p = nodesCalloc(1, sizeof(SNodeList));
if (NULL == p) { if (NULL == p) {
return NULL; return NULL;
} }
@ -1037,7 +1238,7 @@ int32_t nodesListAppend(SNodeList* pList, SNode* pNode) {
if (NULL == pList || NULL == pNode) { if (NULL == pList || NULL == pNode) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SListCell* p = taosMemoryCalloc(1, sizeof(SListCell)); SListCell* p = nodesCalloc(1, sizeof(SListCell));
if (NULL == p) { if (NULL == p) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -1104,7 +1305,7 @@ int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc) {
} }
pTarget->pTail = pSrc->pTail; pTarget->pTail = pSrc->pTail;
pTarget->length += pSrc->length; pTarget->length += pSrc->length;
taosMemoryFreeClear(pSrc); nodesFree(pSrc);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1124,7 +1325,7 @@ int32_t nodesListPushFront(SNodeList* pList, SNode* pNode) {
if (NULL == pList || NULL == pNode) { if (NULL == pList || NULL == pNode) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SListCell* p = taosMemoryCalloc(1, sizeof(SListCell)); SListCell* p = nodesCalloc(1, sizeof(SListCell));
if (NULL == p) { if (NULL == p) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -1152,7 +1353,7 @@ SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
} }
SListCell* pNext = pCell->pNext; SListCell* pNext = pCell->pNext;
nodesDestroyNode(pCell->pNode); nodesDestroyNode(pCell->pNode);
taosMemoryFreeClear(pCell); nodesFree(pCell);
--(pList->length); --(pList->length);
return pNext; return pNext;
} }
@ -1172,7 +1373,7 @@ void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc) {
pPos->pPrev = pSrc->pTail; pPos->pPrev = pSrc->pTail;
pTarget->length += pSrc->length; pTarget->length += pSrc->length;
taosMemoryFreeClear(pSrc); nodesFree(pSrc);
} }
SNode* nodesListGetNode(SNodeList* pList, int32_t index) { SNode* nodesListGetNode(SNodeList* pList, int32_t index) {
@ -1204,7 +1405,7 @@ void nodesDestroyList(SNodeList* pList) {
while (NULL != pNext) { while (NULL != pNext) {
pNext = nodesListErase(pList, pNext); pNext = nodesListErase(pList, pNext);
} }
taosMemoryFreeClear(pList); nodesFree(pList);
} }
void nodesClearList(SNodeList* pList) { void nodesClearList(SNodeList* pList) {
@ -1216,9 +1417,9 @@ void nodesClearList(SNodeList* pList) {
while (NULL != pNext) { while (NULL != pNext) {
SListCell* tmp = pNext; SListCell* tmp = pNext;
pNext = pNext->pNext; pNext = pNext->pNext;
taosMemoryFreeClear(tmp); nodesFree(tmp);
} }
taosMemoryFreeClear(pList); nodesFree(pList);
} }
void* nodesGetValueFromNode(SValueNode* pNode) { void* nodesGetValueFromNode(SValueNode* pNode) {

View File

@ -247,7 +247,8 @@ SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode) {
pExpr->userAlias[len] = '\0'; pExpr->userAlias[len] = '\0';
} }
} }
taosMemoryFreeClear(pNode); pRawExpr->pNode = NULL;
nodesDestroyNode(pNode);
return pRealizedExpr; return pRealizedExpr;
} }

View File

@ -177,15 +177,18 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) { int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
SParseMetaCache metaCache = {0}; SParseMetaCache metaCache = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (TSDB_CODE_SUCCESS == code) {
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) { if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSyntax(pCxt, pQuery, &metaCache); code = parseInsertSyntax(pCxt, pQuery, &metaCache);
} else { } else {
code = parseSqlSyntax(pCxt, pQuery, &metaCache); code = parseSqlSyntax(pCxt, pQuery, &metaCache);
} }
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq(pCxt, &metaCache, pCatalogReq); code = buildCatalogReq(pCxt, &metaCache, pCatalogReq);
} }
nodesReleaseAllocator(pCxt->allocatorId);
destoryParseMetaCache(&metaCache, true); destoryParseMetaCache(&metaCache, true);
terrno = code; terrno = code;
return code; return code;
@ -194,7 +197,10 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery) { const struct SMetaData* pMetaData, SQuery* pQuery) {
SParseMetaCache metaCache = {0}; SParseMetaCache metaCache = {0};
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot); int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (TSDB_CODE_SUCCESS == code) {
code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (NULL == pQuery->pRoot) { if (NULL == pQuery->pRoot) {
code = parseInsertSql(pCxt, &pQuery, &metaCache); code = parseInsertSql(pCxt, &pQuery, &metaCache);
@ -202,6 +208,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
code = analyseSemantic(pCxt, pQuery, &metaCache); code = analyseSemantic(pCxt, pQuery, &metaCache);
} }
} }
nodesReleaseAllocator(pCxt->allocatorId);
destoryParseMetaCache(&metaCache, false); destoryParseMetaCache(&metaCache, false);
terrno = code; terrno = code;
return code; return code;

View File

@ -119,12 +119,18 @@ class ParserTestBaseImpl {
TEST_INTERFACE_ASYNC_API TEST_INTERFACE_ASYNC_API
}; };
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { static void destoryParseContext(SParseContext* pCxt) {
taosArrayDestroy(pCxt->pTableMetaPos);
taosArrayDestroy(pCxt->pTableVgroupPos);
delete pCxt;
}
static void destoryParseMetaCacheWarpper(SParseMetaCache* pMetaCache, bool request) {
destoryParseMetaCache(pMetaCache, request); destoryParseMetaCache(pMetaCache, request);
delete pMetaCache; delete pMetaCache;
} }
static void _destroyQuery(SQuery** pQuery) { static void destroyQuery(SQuery** pQuery) {
if (nullptr == pQuery) { if (nullptr == pQuery) {
return; return;
} }
@ -303,10 +309,10 @@ class ParserTestBaseImpl {
setParseContext(sql, &cxt); setParseContext(sql, &cxt);
if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) { if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParseInsertSql(&cxt, query.get(), nullptr); doParseInsertSql(&cxt, query.get(), nullptr);
} else { } else {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParse(&cxt, query.get()); doParse(&cxt, query.get());
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
@ -335,7 +341,7 @@ class ParserTestBaseImpl {
SParseContext cxt = {0}; SParseContext cxt = {0};
setParseContext(sql, &cxt); setParseContext(sql, &cxt);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParseSql(&cxt, query.get()); doParseSql(&cxt, query.get());
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
@ -354,26 +360,26 @@ class ParserTestBaseImpl {
void runAsyncInternalFuncs(const string& sql, int32_t expect, ParserStage checkStage) { void runAsyncInternalFuncs(const string& sql, int32_t expect, ParserStage checkStage) {
reset(expect, checkStage, TEST_INTERFACE_ASYNC_INTERNAL); reset(expect, checkStage, TEST_INTERFACE_ASYNC_INTERNAL);
try { try {
SParseContext cxt = {0}; unique_ptr<SParseContext, function<void(SParseContext*)> > cxt(new SParseContext(), destoryParseContext);
setParseContext(sql, &cxt, true); setParseContext(sql, cxt.get(), true);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
bool request = true; bool request = true;
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache( unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
new SParseMetaCache(), bind(_destoryParseMetaCache, _1, cref(request))); new SParseMetaCache(), bind(destoryParseMetaCacheWarpper, _1, cref(request)));
bool isInsertValues = qIsInsertValuesSql(cxt.pSql, cxt.sqlLen); bool isInsertValues = qIsInsertValuesSql(cxt->pSql, cxt->sqlLen);
if (isInsertValues) { if (isInsertValues) {
doParseInsertSyntax(&cxt, query.get(), metaCache.get()); doParseInsertSyntax(cxt.get(), query.get(), metaCache.get());
} else { } else {
doParse(&cxt, query.get()); doParse(cxt.get(), query.get());
doCollectMetaKey(&cxt, *(query.get()), metaCache.get()); doCollectMetaKey(cxt.get(), *(query.get()), metaCache.get());
} }
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(), unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq); MockCatalogService::destoryCatalogReq);
doBuildCatalogReq(&cxt, metaCache.get(), catalogReq.get()); doBuildCatalogReq(cxt.get(), metaCache.get(), catalogReq.get());
string err; string err;
thread t1([&]() { thread t1([&]() {
@ -386,13 +392,13 @@ class ParserTestBaseImpl {
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues); doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues);
if (isInsertValues) { if (isInsertValues) {
doParseInsertSql(&cxt, query.get(), metaCache.get()); doParseInsertSql(cxt.get(), query.get(), metaCache.get());
} else { } else {
doAuthenticate(&cxt, pQuery, metaCache.get()); doAuthenticate(cxt.get(), pQuery, metaCache.get());
doTranslate(&cxt, pQuery, metaCache.get()); doTranslate(cxt.get(), pQuery, metaCache.get());
doCalculateConstant(&cxt, pQuery); doCalculateConstant(cxt.get(), pQuery);
} }
} catch (const TerminateFlag& e) { } catch (const TerminateFlag& e) {
// success and terminate // success and terminate
@ -423,13 +429,13 @@ class ParserTestBaseImpl {
void runAsyncApis(const string& sql, int32_t expect, ParserStage checkStage) { void runAsyncApis(const string& sql, int32_t expect, ParserStage checkStage) {
reset(expect, checkStage, TEST_INTERFACE_ASYNC_API); reset(expect, checkStage, TEST_INTERFACE_ASYNC_API);
try { try {
SParseContext cxt = {0}; unique_ptr<SParseContext, function<void(SParseContext*)> > cxt(new SParseContext(), destoryParseContext);
setParseContext(sql, &cxt); setParseContext(sql, cxt.get());
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(), unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq); MockCatalogService::destoryCatalogReq);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParseSqlSyntax(&cxt, query.get(), catalogReq.get()); doParseSqlSyntax(cxt.get(), query.get(), catalogReq.get());
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
string err; string err;
@ -438,7 +444,7 @@ class ParserTestBaseImpl {
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData); unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
doGetAllMeta(catalogReq.get(), metaData.get()); doGetAllMeta(catalogReq.get(), metaData.get());
doAnalyseSqlSemantic(&cxt, catalogReq.get(), metaData.get(), pQuery); doAnalyseSqlSemantic(cxt.get(), catalogReq.get(), metaData.get(), pQuery);
} catch (const TerminateFlag& e) { } catch (const TerminateFlag& e) {
// success and terminate // success and terminate
} catch (const runtime_error& e) { } catch (const runtime_error& e) {

View File

@ -1007,6 +1007,7 @@ static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSub
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort); code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode((SNode*)pScan);
code = nodesListMakeStrictAppend(&pSubplan->pChildren, code = nodesListMakeStrictAppend(&pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT)); (SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT));
} }

View File

@ -33,7 +33,10 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
SLogicSubplan* pLogicSubplan = NULL; SLogicSubplan* pLogicSubplan = NULL;
SQueryLogicPlan* pLogicPlan = NULL; SQueryLogicPlan* pLogicPlan = NULL;
int32_t code = createLogicPlan(pCxt, &pLogicSubplan); int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (TSDB_CODE_SUCCESS == code) {
code = createLogicPlan(pCxt, &pLogicSubplan);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = optimizeLogicPlan(pCxt, pLogicSubplan); code = optimizeLogicPlan(pCxt, pLogicSubplan);
} }
@ -49,6 +52,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
dumpQueryPlan(*pPlan); dumpQueryPlan(*pPlan);
} }
nodesReleaseAllocator(pCxt->allocatorId);
nodesDestroyNode((SNode*)pLogicSubplan); nodesDestroyNode((SNode*)pLogicSubplan);
nodesDestroyNode((SNode*)pLogicPlan); nodesDestroyNode((SNode*)pLogicPlan);

View File

@ -22,6 +22,7 @@
#include "mockCatalog.h" #include "mockCatalog.h"
#include "parser.h" #include "parser.h"
#include "planTestUtil.h" #include "planTestUtil.h"
#include "tglobal.h"
class PlannerEnv : public testing::Environment { class PlannerEnv : public testing::Environment {
public: public:
@ -30,6 +31,8 @@ class PlannerEnv : public testing::Environment {
initMetaDataEnv(); initMetaDataEnv();
generateMetaData(); generateMetaData();
initLog(TD_TMP_DIR_PATH "td"); initLog(TD_TMP_DIR_PATH "td");
initCfg();
nodesInitAllocatorSet();
} }
virtual void TearDown() { virtual void TearDown() {
@ -37,6 +40,7 @@ class PlannerEnv : public testing::Environment {
qCleanupKeywordsTable(); qCleanupKeywordsTable();
fmFuncMgtDestroy(); fmFuncMgtDestroy();
taosCloseLog(); taosCloseLog();
nodesDestroyAllocatorSet();
} }
PlannerEnv() {} PlannerEnv() {}
@ -67,6 +71,8 @@ class PlannerEnv : public testing::Environment {
std::cout << "failed to init log file" << std::endl; std::cout << "failed to init log file" << std::endl;
} }
} }
void initCfg() { tsQueryPlannerTrace = true; }
}; };
static void parseArg(int argc, char* argv[]) { static void parseArg(int argc, char* argv[]) {
@ -79,6 +85,7 @@ static void parseArg(int argc, char* argv[]) {
{"limitSql", required_argument, NULL, 'i'}, {"limitSql", required_argument, NULL, 'i'},
{"log", required_argument, NULL, 'l'}, {"log", required_argument, NULL, 'l'},
{"queryPolicy", required_argument, NULL, 'q'}, {"queryPolicy", required_argument, NULL, 'q'},
{"useNodeAllocator", required_argument, NULL, 'a'},
{0, 0, 0, 0} {0, 0, 0, 0}
}; };
// clang-format on // clang-format on
@ -99,6 +106,9 @@ static void parseArg(int argc, char* argv[]) {
case 'q': case 'q':
setQueryPolicy(optarg); setQueryPolicy(optarg);
break; break;
case 'a':
setUseNodeAllocator(optarg);
break;
default: default:
break; break;
} }

View File

@ -41,6 +41,7 @@ using namespace testing;
enum DumpModule { enum DumpModule {
DUMP_MODULE_NOTHING = 1, DUMP_MODULE_NOTHING = 1,
DUMP_MODULE_SQL,
DUMP_MODULE_PARSER, DUMP_MODULE_PARSER,
DUMP_MODULE_LOGIC, DUMP_MODULE_LOGIC,
DUMP_MODULE_OPTIMIZED, DUMP_MODULE_OPTIMIZED,
@ -56,10 +57,13 @@ int32_t g_skipSql = 0;
int32_t g_limitSql = 0; int32_t g_limitSql = 0;
int32_t g_logLevel = 131; int32_t g_logLevel = 131;
int32_t g_queryPolicy = QUERY_POLICY_VNODE; int32_t g_queryPolicy = QUERY_POLICY_VNODE;
bool g_useNodeAllocator = false;
void setDumpModule(const char* pModule) { void setDumpModule(const char* pModule) {
if (NULL == pModule) { if (NULL == pModule) {
g_dumpModule = DUMP_MODULE_ALL; g_dumpModule = DUMP_MODULE_ALL;
} else if (0 == strncasecmp(pModule, "sql", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_SQL;
} else if (0 == strncasecmp(pModule, "parser", strlen(pModule))) { } else if (0 == strncasecmp(pModule, "parser", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_PARSER; g_dumpModule = DUMP_MODULE_PARSER;
} else if (0 == strncasecmp(pModule, "logic", strlen(pModule))) { } else if (0 == strncasecmp(pModule, "logic", strlen(pModule))) {
@ -79,10 +83,11 @@ void setDumpModule(const char* pModule) {
} }
} }
void setSkipSqlNum(const char* pNum) { g_skipSql = stoi(pNum); } void setSkipSqlNum(const char* pArg) { g_skipSql = stoi(pArg); }
void setLimitSqlNum(const char* pNum) { g_limitSql = stoi(pNum); } void setLimitSqlNum(const char* pArg) { g_limitSql = stoi(pArg); }
void setLogLevel(const char* pLogLevel) { g_logLevel = stoi(pLogLevel); } void setLogLevel(const char* pArg) { g_logLevel = stoi(pArg); }
void setQueryPolicy(const char* pQueryPolicy) { g_queryPolicy = stoi(pQueryPolicy); } void setQueryPolicy(const char* pArg) { g_queryPolicy = stoi(pArg); }
void setUseNodeAllocator(const char* pArg) { g_useNodeAllocator = stoi(pArg); }
int32_t getLogLevel() { return g_logLevel; } int32_t getLogLevel() { return g_logLevel; }
@ -124,6 +129,12 @@ class PlannerTestBaseImpl {
} }
void runImpl(const string& sql, int32_t queryPolicy) { void runImpl(const string& sql, int32_t queryPolicy) {
int64_t allocatorId = 0;
if (g_useNodeAllocator) {
nodesCreateAllocator(sqlNo_, 32 * 1024, &allocatorId);
nodesAcquireAllocator(allocatorId);
}
reset(); reset();
tsQueryPolicy = queryPolicy; tsQueryPolicy = queryPolicy;
try { try {
@ -155,8 +166,13 @@ class PlannerTestBaseImpl {
dump(g_dumpModule); dump(g_dumpModule);
} catch (...) { } catch (...) {
dump(DUMP_MODULE_ALL); dump(DUMP_MODULE_ALL);
nodesReleaseAllocator(allocatorId);
nodesDestroyAllocator(allocatorId);
throw; throw;
} }
nodesReleaseAllocator(allocatorId);
nodesDestroyAllocator(allocatorId);
} }
void prepare(const string& sql) { void prepare(const string& sql) {
@ -216,6 +232,8 @@ class PlannerTestBaseImpl {
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan); doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
unique_ptr<SQueryPlan, void (*)(SQueryPlan*)> plan(pPlan, (void (*)(SQueryPlan*))nodesDestroyNode); unique_ptr<SQueryPlan, void (*)(SQueryPlan*)> plan(pPlan, (void (*)(SQueryPlan*))nodesDestroyNode);
checkPlanMsg((SNode*)pPlan);
dump(g_dumpModule); dump(g_dumpModule);
} catch (...) { } catch (...) {
dump(DUMP_MODULE_ALL); dump(DUMP_MODULE_ALL);
@ -252,7 +270,6 @@ class PlannerTestBaseImpl {
string splitLogicPlan_; string splitLogicPlan_;
string scaledLogicPlan_; string scaledLogicPlan_;
string physiPlan_; string physiPlan_;
string physiPlanMsg_;
vector<string> physiSubplans_; vector<string> physiSubplans_;
}; };
@ -276,17 +293,16 @@ class PlannerTestBaseImpl {
res_.splitLogicPlan_.clear(); res_.splitLogicPlan_.clear();
res_.scaledLogicPlan_.clear(); res_.scaledLogicPlan_.clear();
res_.physiPlan_.clear(); res_.physiPlan_.clear();
res_.physiPlanMsg_.clear();
res_.physiSubplans_.clear(); res_.physiSubplans_.clear();
} }
void dump(DumpModule module) { void dump(DumpModule module) {
cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl;
if (DUMP_MODULE_NOTHING == module) { if (DUMP_MODULE_NOTHING == module) {
return; return;
} }
cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl;
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) { if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) {
if (res_.prepareAst_.empty()) { if (res_.prepareAst_.empty()) {
cout << "+++++++++++++++++++++syntax tree : " << endl; cout << "+++++++++++++++++++++syntax tree : " << endl;
@ -411,8 +427,6 @@ class PlannerTestBaseImpl {
SNode* pSubplan; SNode* pSubplan;
FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); } FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); }
} }
res_.physiPlanMsg_ = toMsg((SNode*)(*pPlan));
cout << "json len: " << res_.physiPlan_.length() << ", msg len: " << res_.physiPlanMsg_.length() << endl;
} }
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) { void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
@ -451,27 +465,16 @@ class PlannerTestBaseImpl {
string toString(const SNode* pRoot) { string toString(const SNode* pRoot) {
char* pStr = NULL; char* pStr = NULL;
int32_t len = 0; int32_t len = 0;
auto start = chrono::steady_clock::now();
DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len) DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len)
if (QUERY_NODE_PHYSICAL_PLAN == nodeType(pRoot)) {
cout << "nodesNodeToString: "
<< chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - start).count() << "us" << endl;
}
string str(pStr); string str(pStr);
taosMemoryFreeClear(pStr); taosMemoryFreeClear(pStr);
return str; return str;
} }
string toMsg(const SNode* pRoot) { void checkPlanMsg(const SNode* pRoot) {
char* pStr = NULL; char* pStr = NULL;
int32_t len = 0; int32_t len = 0;
auto start = chrono::steady_clock::now();
DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len) DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len)
cout << "nodesNodeToMsg: "
<< chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - start).count() << "us" << endl;
string copyStr(pStr, len); string copyStr(pStr, len);
SNode* pNode = NULL; SNode* pNode = NULL;
@ -491,9 +494,7 @@ class PlannerTestBaseImpl {
nodesDestroyNode(pNode); nodesDestroyNode(pNode);
taosMemoryFreeClear(pNewStr); taosMemoryFreeClear(pNewStr);
string str(pStr, len);
taosMemoryFreeClear(pStr); taosMemoryFreeClear(pStr);
return str;
} }
caseEnv caseEnv_; caseEnv caseEnv_;

View File

@ -41,11 +41,12 @@ class PlannerTestBase : public testing::Test {
std::unique_ptr<PlannerTestBaseImpl> impl_; std::unique_ptr<PlannerTestBaseImpl> impl_;
}; };
extern void setDumpModule(const char* pModule); extern void setDumpModule(const char* pArg);
extern void setSkipSqlNum(const char* pNum); extern void setSkipSqlNum(const char* pArg);
extern void setLimitSqlNum(const char* pNum); extern void setLimitSqlNum(const char* pArg);
extern void setLogLevel(const char* pLogLevel); extern void setLogLevel(const char* pArg);
extern void setQueryPolicy(const char* pQueryPolicy); extern void setQueryPolicy(const char* pArg);
extern void setUseNodeAllocator(const char* pArg);
extern int32_t getLogLevel(); extern int32_t getLogLevel();
#endif // PLAN_TEST_UTIL_H #endif // PLAN_TEST_UTIL_H

View File

@ -847,7 +847,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
memcpy(res->datum.p, output.columnData->pData, len); memcpy(res->datum.p, output.columnData->pData, len);
} else if (IS_VAR_DATA_TYPE(type)) { } else if (IS_VAR_DATA_TYPE(type)) {
//res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1); //res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData), 1); res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData) + 1, 1);
res->node.resType.bytes = varDataTLen(output.columnData->pData); res->node.resType.bytes = varDataTLen(output.columnData->pData);
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData)); memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
} else { } else {

View File

@ -255,6 +255,7 @@ typedef struct SSchJob {
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad> SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad>
SArray *levels; // starting from 0. SArray<SSchLevel> SArray *levels; // starting from 0. SArray<SSchLevel>
SQueryPlan *pDag; SQueryPlan *pDag;
int64_t allocatorRefId;
SArray *dataSrcTasks; // SArray<SQueryTask*> SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx; int32_t levelIdx;

View File

@ -673,6 +673,7 @@ void schFreeJobImpl(void *job) {
destroyQueryExecRes(&pJob->execRes); destroyQueryExecRes(&pJob->execRes);
qDestroyQueryPlan(pJob->pDag); qDestroyQueryPlan(pJob->pDag);
nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);
taosMemoryFreeClear(pJob->userRes.execRes); taosMemoryFreeClear(pJob->userRes.execRes);
taosMemoryFreeClear(pJob->fetchRes); taosMemoryFreeClear(pJob->fetchRes);
@ -724,6 +725,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->sql = strdup(pReq->sql); pJob->sql = strdup(pReq->sql);
} }
pJob->pDag = pReq->pDag; pJob->pDag = pReq->pDag;
pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
pJob->chkKillFp = pReq->chkKillFp; pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam; pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp; pJob->userRes.execFp = pReq->execFp;