enh: added memory allocators for parser and planner
This commit is contained in:
parent
300f0c2c0f
commit
8a010a58f3
|
@ -275,6 +275,12 @@ typedef struct SNodeList {
|
||||||
SListCell* pTail;
|
SListCell* pTail;
|
||||||
} SNodeList;
|
} SNodeList;
|
||||||
|
|
||||||
|
typedef struct SNodeAllocator SNodeAllocator;
|
||||||
|
|
||||||
|
int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator);
|
||||||
|
void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator);
|
||||||
|
void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator);
|
||||||
|
|
||||||
SNode* nodesMakeNode(ENodeType type);
|
SNode* nodesMakeNode(ENodeType type);
|
||||||
void nodesDestroyNode(SNode* pNode);
|
void nodesDestroyNode(SNode* pNode);
|
||||||
|
|
||||||
|
|
|
@ -22,8 +22,98 @@
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
|
|
||||||
static SNode* makeNode(ENodeType type, size_t size) {
|
typedef struct SNodeMemChunk {
|
||||||
SNode* p = taosMemoryCalloc(1, size);
|
int32_t availableSize;
|
||||||
|
int32_t usedSize;
|
||||||
|
char* pBuf;
|
||||||
|
struct SNodeMemChunk* pNext;
|
||||||
|
} SNodeMemChunk;
|
||||||
|
|
||||||
|
typedef struct SNodeAllocator {
|
||||||
|
int32_t chunkSize;
|
||||||
|
int32_t chunkNum;
|
||||||
|
SNodeMemChunk* pCurrChunk;
|
||||||
|
SNodeMemChunk* pChunks;
|
||||||
|
} SNodeAllocator;
|
||||||
|
|
||||||
|
static threadlocal SNodeAllocator* pNodeAllocator;
|
||||||
|
|
||||||
|
static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) {
|
||||||
|
SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize);
|
||||||
|
if (NULL == pNewChunk) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pNewChunk->pBuf = (char*)(pNewChunk + 1);
|
||||||
|
pNewChunk->availableSize = pAllocator->chunkSize;
|
||||||
|
pNewChunk->usedSize = 0;
|
||||||
|
pNewChunk->pNext = NULL;
|
||||||
|
if (NULL != pAllocator->pCurrChunk) {
|
||||||
|
pAllocator->pCurrChunk->pNext = pNewChunk;
|
||||||
|
}
|
||||||
|
pAllocator->pCurrChunk = pNewChunk;
|
||||||
|
if (NULL == pAllocator->pChunks) {
|
||||||
|
pAllocator->pChunks = pNewChunk;
|
||||||
|
}
|
||||||
|
++(pAllocator->chunkNum);
|
||||||
|
return pNewChunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void* nodesCalloc(int32_t num, int32_t size) {
|
||||||
|
if (NULL == pNodeAllocator) {
|
||||||
|
return taosMemoryCalloc(num, size);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pNodeAllocator->pCurrChunk->usedSize + size > pNodeAllocator->pCurrChunk->availableSize) {
|
||||||
|
if (NULL == callocNodeChunk(pNodeAllocator)) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
void* p = pNodeAllocator->pCurrChunk->pBuf + pNodeAllocator->pCurrChunk->usedSize;
|
||||||
|
pNodeAllocator->pCurrChunk->usedSize += size;
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void nodesFree(void* p) {
|
||||||
|
if (NULL == pNodeAllocator) {
|
||||||
|
taosMemoryFree(p);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator) {
|
||||||
|
*pAllocator = taosMemoryCalloc(1, sizeof(SNodeAllocator));
|
||||||
|
if (NULL == *pAllocator) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
(*pAllocator)->chunkSize = chunkSize;
|
||||||
|
if (NULL == callocNodeChunk(*pAllocator)) {
|
||||||
|
taosMemoryFreeClear(*pAllocator);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator) {
|
||||||
|
if (NULL == pAllocator) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
nodesDebug("alloc chunkNum: %d, chunkTotakSize: %d", pAllocator->chunkNum,
|
||||||
|
pAllocator->chunkNum * pAllocator->chunkSize);
|
||||||
|
|
||||||
|
SNodeMemChunk* pChunk = pAllocator->pChunks;
|
||||||
|
while (NULL != pChunk) {
|
||||||
|
SNodeMemChunk* pTemp = pChunk->pNext;
|
||||||
|
taosMemoryFree(pChunk);
|
||||||
|
pChunk = pTemp;
|
||||||
|
}
|
||||||
|
taosMemoryFree(pAllocator);
|
||||||
|
}
|
||||||
|
|
||||||
|
void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator) { pNodeAllocator = pAllocator; }
|
||||||
|
|
||||||
|
static SNode* makeNode(ENodeType type, int32_t size) {
|
||||||
|
SNode* p = nodesCalloc(1, size);
|
||||||
if (NULL == p) {
|
if (NULL == p) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -824,6 +914,7 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
nodesDestroyNode(pLogicNode->pWStartTs);
|
nodesDestroyNode(pLogicNode->pWStartTs);
|
||||||
nodesDestroyNode(pLogicNode->pValues);
|
nodesDestroyNode(pLogicNode->pValues);
|
||||||
nodesDestroyList(pLogicNode->pFillExprs);
|
nodesDestroyList(pLogicNode->pFillExprs);
|
||||||
|
nodesDestroyList(pLogicNode->pNotFillExprs);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_LOGIC_PLAN_SORT: {
|
case QUERY_NODE_LOGIC_PLAN_SORT: {
|
||||||
|
@ -1021,12 +1112,12 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(pNode);
|
nodesFree(pNode);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNodeList* nodesMakeList() {
|
SNodeList* nodesMakeList() {
|
||||||
SNodeList* p = taosMemoryCalloc(1, sizeof(SNodeList));
|
SNodeList* p = nodesCalloc(1, sizeof(SNodeList));
|
||||||
if (NULL == p) {
|
if (NULL == p) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1037,7 +1128,7 @@ int32_t nodesListAppend(SNodeList* pList, SNode* pNode) {
|
||||||
if (NULL == pList || NULL == pNode) {
|
if (NULL == pList || NULL == pNode) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
SListCell* p = taosMemoryCalloc(1, sizeof(SListCell));
|
SListCell* p = nodesCalloc(1, sizeof(SListCell));
|
||||||
if (NULL == p) {
|
if (NULL == p) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1104,7 +1195,7 @@ int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc) {
|
||||||
}
|
}
|
||||||
pTarget->pTail = pSrc->pTail;
|
pTarget->pTail = pSrc->pTail;
|
||||||
pTarget->length += pSrc->length;
|
pTarget->length += pSrc->length;
|
||||||
taosMemoryFreeClear(pSrc);
|
nodesFree(pSrc);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1124,7 +1215,7 @@ int32_t nodesListPushFront(SNodeList* pList, SNode* pNode) {
|
||||||
if (NULL == pList || NULL == pNode) {
|
if (NULL == pList || NULL == pNode) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
SListCell* p = taosMemoryCalloc(1, sizeof(SListCell));
|
SListCell* p = nodesCalloc(1, sizeof(SListCell));
|
||||||
if (NULL == p) {
|
if (NULL == p) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1152,7 +1243,7 @@ SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
|
||||||
}
|
}
|
||||||
SListCell* pNext = pCell->pNext;
|
SListCell* pNext = pCell->pNext;
|
||||||
nodesDestroyNode(pCell->pNode);
|
nodesDestroyNode(pCell->pNode);
|
||||||
taosMemoryFreeClear(pCell);
|
nodesFree(pCell);
|
||||||
--(pList->length);
|
--(pList->length);
|
||||||
return pNext;
|
return pNext;
|
||||||
}
|
}
|
||||||
|
@ -1172,7 +1263,7 @@ void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc) {
|
||||||
pPos->pPrev = pSrc->pTail;
|
pPos->pPrev = pSrc->pTail;
|
||||||
|
|
||||||
pTarget->length += pSrc->length;
|
pTarget->length += pSrc->length;
|
||||||
taosMemoryFreeClear(pSrc);
|
nodesFree(pSrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* nodesListGetNode(SNodeList* pList, int32_t index) {
|
SNode* nodesListGetNode(SNodeList* pList, int32_t index) {
|
||||||
|
@ -1204,7 +1295,7 @@ void nodesDestroyList(SNodeList* pList) {
|
||||||
while (NULL != pNext) {
|
while (NULL != pNext) {
|
||||||
pNext = nodesListErase(pList, pNext);
|
pNext = nodesListErase(pList, pNext);
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(pList);
|
nodesFree(pList);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nodesClearList(SNodeList* pList) {
|
void nodesClearList(SNodeList* pList) {
|
||||||
|
@ -1216,9 +1307,9 @@ void nodesClearList(SNodeList* pList) {
|
||||||
while (NULL != pNext) {
|
while (NULL != pNext) {
|
||||||
SListCell* tmp = pNext;
|
SListCell* tmp = pNext;
|
||||||
pNext = pNext->pNext;
|
pNext = pNext->pNext;
|
||||||
taosMemoryFreeClear(tmp);
|
nodesFree(tmp);
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(pList);
|
nodesFree(pList);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* nodesGetValueFromNode(SValueNode* pNode) {
|
void* nodesGetValueFromNode(SValueNode* pNode) {
|
||||||
|
|
|
@ -247,7 +247,8 @@ SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode) {
|
||||||
pExpr->userAlias[len] = '\0';
|
pExpr->userAlias[len] = '\0';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(pNode);
|
pRawExpr->pNode = NULL;
|
||||||
|
nodesDestroyNode(pNode);
|
||||||
return pRealizedExpr;
|
return pRealizedExpr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -119,12 +119,18 @@ class ParserTestBaseImpl {
|
||||||
TEST_INTERFACE_ASYNC_API
|
TEST_INTERFACE_ASYNC_API
|
||||||
};
|
};
|
||||||
|
|
||||||
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
|
static void destoryParseContext(SParseContext* pCxt) {
|
||||||
|
taosArrayDestroy(pCxt->pTableMetaPos);
|
||||||
|
taosArrayDestroy(pCxt->pTableVgroupPos);
|
||||||
|
delete pCxt;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void destoryParseMetaCacheWarpper(SParseMetaCache* pMetaCache, bool request) {
|
||||||
destoryParseMetaCache(pMetaCache, request);
|
destoryParseMetaCache(pMetaCache, request);
|
||||||
delete pMetaCache;
|
delete pMetaCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void _destroyQuery(SQuery** pQuery) {
|
static void destroyQuery(SQuery** pQuery) {
|
||||||
if (nullptr == pQuery) {
|
if (nullptr == pQuery) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -303,10 +309,10 @@ class ParserTestBaseImpl {
|
||||||
setParseContext(sql, &cxt);
|
setParseContext(sql, &cxt);
|
||||||
|
|
||||||
if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) {
|
if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) {
|
||||||
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
|
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
|
||||||
doParseInsertSql(&cxt, query.get(), nullptr);
|
doParseInsertSql(&cxt, query.get(), nullptr);
|
||||||
} else {
|
} else {
|
||||||
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
|
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
|
||||||
doParse(&cxt, query.get());
|
doParse(&cxt, query.get());
|
||||||
SQuery* pQuery = *(query.get());
|
SQuery* pQuery = *(query.get());
|
||||||
|
|
||||||
|
@ -335,7 +341,7 @@ class ParserTestBaseImpl {
|
||||||
SParseContext cxt = {0};
|
SParseContext cxt = {0};
|
||||||
setParseContext(sql, &cxt);
|
setParseContext(sql, &cxt);
|
||||||
|
|
||||||
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
|
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
|
||||||
doParseSql(&cxt, query.get());
|
doParseSql(&cxt, query.get());
|
||||||
SQuery* pQuery = *(query.get());
|
SQuery* pQuery = *(query.get());
|
||||||
|
|
||||||
|
@ -354,26 +360,26 @@ class ParserTestBaseImpl {
|
||||||
void runAsyncInternalFuncs(const string& sql, int32_t expect, ParserStage checkStage) {
|
void runAsyncInternalFuncs(const string& sql, int32_t expect, ParserStage checkStage) {
|
||||||
reset(expect, checkStage, TEST_INTERFACE_ASYNC_INTERNAL);
|
reset(expect, checkStage, TEST_INTERFACE_ASYNC_INTERNAL);
|
||||||
try {
|
try {
|
||||||
SParseContext cxt = {0};
|
unique_ptr<SParseContext, function<void(SParseContext*)> > cxt(new SParseContext(), destoryParseContext);
|
||||||
setParseContext(sql, &cxt, true);
|
setParseContext(sql, cxt.get(), true);
|
||||||
|
|
||||||
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
|
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
|
||||||
bool request = true;
|
bool request = true;
|
||||||
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
|
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
|
||||||
new SParseMetaCache(), bind(_destoryParseMetaCache, _1, cref(request)));
|
new SParseMetaCache(), bind(destoryParseMetaCacheWarpper, _1, cref(request)));
|
||||||
bool isInsertValues = qIsInsertValuesSql(cxt.pSql, cxt.sqlLen);
|
bool isInsertValues = qIsInsertValuesSql(cxt->pSql, cxt->sqlLen);
|
||||||
if (isInsertValues) {
|
if (isInsertValues) {
|
||||||
doParseInsertSyntax(&cxt, query.get(), metaCache.get());
|
doParseInsertSyntax(cxt.get(), query.get(), metaCache.get());
|
||||||
} else {
|
} else {
|
||||||
doParse(&cxt, query.get());
|
doParse(cxt.get(), query.get());
|
||||||
doCollectMetaKey(&cxt, *(query.get()), metaCache.get());
|
doCollectMetaKey(cxt.get(), *(query.get()), metaCache.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
SQuery* pQuery = *(query.get());
|
SQuery* pQuery = *(query.get());
|
||||||
|
|
||||||
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
|
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
|
||||||
MockCatalogService::destoryCatalogReq);
|
MockCatalogService::destoryCatalogReq);
|
||||||
doBuildCatalogReq(&cxt, metaCache.get(), catalogReq.get());
|
doBuildCatalogReq(cxt.get(), metaCache.get(), catalogReq.get());
|
||||||
|
|
||||||
string err;
|
string err;
|
||||||
thread t1([&]() {
|
thread t1([&]() {
|
||||||
|
@ -386,13 +392,13 @@ class ParserTestBaseImpl {
|
||||||
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues);
|
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues);
|
||||||
|
|
||||||
if (isInsertValues) {
|
if (isInsertValues) {
|
||||||
doParseInsertSql(&cxt, query.get(), metaCache.get());
|
doParseInsertSql(cxt.get(), query.get(), metaCache.get());
|
||||||
} else {
|
} else {
|
||||||
doAuthenticate(&cxt, pQuery, metaCache.get());
|
doAuthenticate(cxt.get(), pQuery, metaCache.get());
|
||||||
|
|
||||||
doTranslate(&cxt, pQuery, metaCache.get());
|
doTranslate(cxt.get(), pQuery, metaCache.get());
|
||||||
|
|
||||||
doCalculateConstant(&cxt, pQuery);
|
doCalculateConstant(cxt.get(), pQuery);
|
||||||
}
|
}
|
||||||
} catch (const TerminateFlag& e) {
|
} catch (const TerminateFlag& e) {
|
||||||
// success and terminate
|
// success and terminate
|
||||||
|
@ -423,13 +429,13 @@ class ParserTestBaseImpl {
|
||||||
void runAsyncApis(const string& sql, int32_t expect, ParserStage checkStage) {
|
void runAsyncApis(const string& sql, int32_t expect, ParserStage checkStage) {
|
||||||
reset(expect, checkStage, TEST_INTERFACE_ASYNC_API);
|
reset(expect, checkStage, TEST_INTERFACE_ASYNC_API);
|
||||||
try {
|
try {
|
||||||
SParseContext cxt = {0};
|
unique_ptr<SParseContext, function<void(SParseContext*)> > cxt(new SParseContext(), destoryParseContext);
|
||||||
setParseContext(sql, &cxt);
|
setParseContext(sql, cxt.get());
|
||||||
|
|
||||||
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
|
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
|
||||||
MockCatalogService::destoryCatalogReq);
|
MockCatalogService::destoryCatalogReq);
|
||||||
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
|
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
|
||||||
doParseSqlSyntax(&cxt, query.get(), catalogReq.get());
|
doParseSqlSyntax(cxt.get(), query.get(), catalogReq.get());
|
||||||
SQuery* pQuery = *(query.get());
|
SQuery* pQuery = *(query.get());
|
||||||
|
|
||||||
string err;
|
string err;
|
||||||
|
@ -438,7 +444,7 @@ class ParserTestBaseImpl {
|
||||||
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
|
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
|
||||||
doGetAllMeta(catalogReq.get(), metaData.get());
|
doGetAllMeta(catalogReq.get(), metaData.get());
|
||||||
|
|
||||||
doAnalyseSqlSemantic(&cxt, catalogReq.get(), metaData.get(), pQuery);
|
doAnalyseSqlSemantic(cxt.get(), catalogReq.get(), metaData.get(), pQuery);
|
||||||
} catch (const TerminateFlag& e) {
|
} catch (const TerminateFlag& e) {
|
||||||
// success and terminate
|
// success and terminate
|
||||||
} catch (const runtime_error& e) {
|
} catch (const runtime_error& e) {
|
||||||
|
|
|
@ -997,6 +997,7 @@ static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSub
|
||||||
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort);
|
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
nodesDestroyNode((SNode*)pScan);
|
||||||
code = nodesListMakeStrictAppend(&pSubplan->pChildren,
|
code = nodesListMakeStrictAppend(&pSubplan->pChildren,
|
||||||
(SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT));
|
(SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT));
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "mockCatalog.h"
|
#include "mockCatalog.h"
|
||||||
#include "parser.h"
|
#include "parser.h"
|
||||||
#include "planTestUtil.h"
|
#include "planTestUtil.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
|
||||||
class PlannerEnv : public testing::Environment {
|
class PlannerEnv : public testing::Environment {
|
||||||
public:
|
public:
|
||||||
|
@ -30,6 +31,7 @@ class PlannerEnv : public testing::Environment {
|
||||||
initMetaDataEnv();
|
initMetaDataEnv();
|
||||||
generateMetaData();
|
generateMetaData();
|
||||||
initLog(TD_TMP_DIR_PATH "td");
|
initLog(TD_TMP_DIR_PATH "td");
|
||||||
|
initCfg();
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual void TearDown() {
|
virtual void TearDown() {
|
||||||
|
@ -67,6 +69,8 @@ class PlannerEnv : public testing::Environment {
|
||||||
std::cout << "failed to init log file" << std::endl;
|
std::cout << "failed to init log file" << std::endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void initCfg() { tsQueryPlannerTrace = true; }
|
||||||
};
|
};
|
||||||
|
|
||||||
static void parseArg(int argc, char* argv[]) {
|
static void parseArg(int argc, char* argv[]) {
|
||||||
|
@ -79,6 +83,7 @@ static void parseArg(int argc, char* argv[]) {
|
||||||
{"limitSql", required_argument, NULL, 'i'},
|
{"limitSql", required_argument, NULL, 'i'},
|
||||||
{"log", required_argument, NULL, 'l'},
|
{"log", required_argument, NULL, 'l'},
|
||||||
{"queryPolicy", required_argument, NULL, 'q'},
|
{"queryPolicy", required_argument, NULL, 'q'},
|
||||||
|
{"useNodeAllocator", required_argument, NULL, 'a'},
|
||||||
{0, 0, 0, 0}
|
{0, 0, 0, 0}
|
||||||
};
|
};
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
@ -99,6 +104,9 @@ static void parseArg(int argc, char* argv[]) {
|
||||||
case 'q':
|
case 'q':
|
||||||
setQueryPolicy(optarg);
|
setQueryPolicy(optarg);
|
||||||
break;
|
break;
|
||||||
|
case 'a':
|
||||||
|
setUseNodeAllocator(optarg);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@ using namespace testing;
|
||||||
|
|
||||||
enum DumpModule {
|
enum DumpModule {
|
||||||
DUMP_MODULE_NOTHING = 1,
|
DUMP_MODULE_NOTHING = 1,
|
||||||
|
DUMP_MODULE_SQL,
|
||||||
DUMP_MODULE_PARSER,
|
DUMP_MODULE_PARSER,
|
||||||
DUMP_MODULE_LOGIC,
|
DUMP_MODULE_LOGIC,
|
||||||
DUMP_MODULE_OPTIMIZED,
|
DUMP_MODULE_OPTIMIZED,
|
||||||
|
@ -56,10 +57,13 @@ int32_t g_skipSql = 0;
|
||||||
int32_t g_limitSql = 0;
|
int32_t g_limitSql = 0;
|
||||||
int32_t g_logLevel = 131;
|
int32_t g_logLevel = 131;
|
||||||
int32_t g_queryPolicy = QUERY_POLICY_VNODE;
|
int32_t g_queryPolicy = QUERY_POLICY_VNODE;
|
||||||
|
bool g_useNodeAllocator = false;
|
||||||
|
|
||||||
void setDumpModule(const char* pModule) {
|
void setDumpModule(const char* pModule) {
|
||||||
if (NULL == pModule) {
|
if (NULL == pModule) {
|
||||||
g_dumpModule = DUMP_MODULE_ALL;
|
g_dumpModule = DUMP_MODULE_ALL;
|
||||||
|
} else if (0 == strncasecmp(pModule, "sql", strlen(pModule))) {
|
||||||
|
g_dumpModule = DUMP_MODULE_SQL;
|
||||||
} else if (0 == strncasecmp(pModule, "parser", strlen(pModule))) {
|
} else if (0 == strncasecmp(pModule, "parser", strlen(pModule))) {
|
||||||
g_dumpModule = DUMP_MODULE_PARSER;
|
g_dumpModule = DUMP_MODULE_PARSER;
|
||||||
} else if (0 == strncasecmp(pModule, "logic", strlen(pModule))) {
|
} else if (0 == strncasecmp(pModule, "logic", strlen(pModule))) {
|
||||||
|
@ -79,10 +83,11 @@ void setDumpModule(const char* pModule) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void setSkipSqlNum(const char* pNum) { g_skipSql = stoi(pNum); }
|
void setSkipSqlNum(const char* pArg) { g_skipSql = stoi(pArg); }
|
||||||
void setLimitSqlNum(const char* pNum) { g_limitSql = stoi(pNum); }
|
void setLimitSqlNum(const char* pArg) { g_limitSql = stoi(pArg); }
|
||||||
void setLogLevel(const char* pLogLevel) { g_logLevel = stoi(pLogLevel); }
|
void setLogLevel(const char* pArg) { g_logLevel = stoi(pArg); }
|
||||||
void setQueryPolicy(const char* pQueryPolicy) { g_queryPolicy = stoi(pQueryPolicy); }
|
void setQueryPolicy(const char* pArg) { g_queryPolicy = stoi(pArg); }
|
||||||
|
void setUseNodeAllocator(const char* pArg) { g_useNodeAllocator = stoi(pArg); }
|
||||||
|
|
||||||
int32_t getLogLevel() { return g_logLevel; }
|
int32_t getLogLevel() { return g_logLevel; }
|
||||||
|
|
||||||
|
@ -124,6 +129,12 @@ class PlannerTestBaseImpl {
|
||||||
}
|
}
|
||||||
|
|
||||||
void runImpl(const string& sql, int32_t queryPolicy) {
|
void runImpl(const string& sql, int32_t queryPolicy) {
|
||||||
|
SNodeAllocator* pAllocator = NULL;
|
||||||
|
if (g_useNodeAllocator) {
|
||||||
|
nodesCreateNodeAllocator(32 * 1024, &pAllocator);
|
||||||
|
nodesResetThreadLevelAllocator(pAllocator);
|
||||||
|
}
|
||||||
|
|
||||||
reset();
|
reset();
|
||||||
tsQueryPolicy = queryPolicy;
|
tsQueryPolicy = queryPolicy;
|
||||||
try {
|
try {
|
||||||
|
@ -155,8 +166,13 @@ class PlannerTestBaseImpl {
|
||||||
dump(g_dumpModule);
|
dump(g_dumpModule);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
dump(DUMP_MODULE_ALL);
|
dump(DUMP_MODULE_ALL);
|
||||||
|
nodesDestroyNodeAllocator(pAllocator);
|
||||||
|
nodesResetThreadLevelAllocator(NULL);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nodesDestroyNodeAllocator(pAllocator);
|
||||||
|
nodesResetThreadLevelAllocator(NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void prepare(const string& sql) {
|
void prepare(const string& sql) {
|
||||||
|
@ -216,6 +232,8 @@ class PlannerTestBaseImpl {
|
||||||
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
|
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
|
||||||
unique_ptr<SQueryPlan, void (*)(SQueryPlan*)> plan(pPlan, (void (*)(SQueryPlan*))nodesDestroyNode);
|
unique_ptr<SQueryPlan, void (*)(SQueryPlan*)> plan(pPlan, (void (*)(SQueryPlan*))nodesDestroyNode);
|
||||||
|
|
||||||
|
checkPlanMsg((SNode*)pPlan);
|
||||||
|
|
||||||
dump(g_dumpModule);
|
dump(g_dumpModule);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
dump(DUMP_MODULE_ALL);
|
dump(DUMP_MODULE_ALL);
|
||||||
|
@ -252,7 +270,6 @@ class PlannerTestBaseImpl {
|
||||||
string splitLogicPlan_;
|
string splitLogicPlan_;
|
||||||
string scaledLogicPlan_;
|
string scaledLogicPlan_;
|
||||||
string physiPlan_;
|
string physiPlan_;
|
||||||
string physiPlanMsg_;
|
|
||||||
vector<string> physiSubplans_;
|
vector<string> physiSubplans_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -276,17 +293,16 @@ class PlannerTestBaseImpl {
|
||||||
res_.splitLogicPlan_.clear();
|
res_.splitLogicPlan_.clear();
|
||||||
res_.scaledLogicPlan_.clear();
|
res_.scaledLogicPlan_.clear();
|
||||||
res_.physiPlan_.clear();
|
res_.physiPlan_.clear();
|
||||||
res_.physiPlanMsg_.clear();
|
|
||||||
res_.physiSubplans_.clear();
|
res_.physiSubplans_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
void dump(DumpModule module) {
|
void dump(DumpModule module) {
|
||||||
cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl;
|
|
||||||
|
|
||||||
if (DUMP_MODULE_NOTHING == module) {
|
if (DUMP_MODULE_NOTHING == module) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl;
|
||||||
|
|
||||||
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) {
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) {
|
||||||
if (res_.prepareAst_.empty()) {
|
if (res_.prepareAst_.empty()) {
|
||||||
cout << "+++++++++++++++++++++syntax tree : " << endl;
|
cout << "+++++++++++++++++++++syntax tree : " << endl;
|
||||||
|
@ -411,8 +427,6 @@ class PlannerTestBaseImpl {
|
||||||
SNode* pSubplan;
|
SNode* pSubplan;
|
||||||
FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); }
|
FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); }
|
||||||
}
|
}
|
||||||
res_.physiPlanMsg_ = toMsg((SNode*)(*pPlan));
|
|
||||||
cout << "json len: " << res_.physiPlan_.length() << ", msg len: " << res_.physiPlanMsg_.length() << endl;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
|
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
|
||||||
|
@ -451,27 +465,16 @@ class PlannerTestBaseImpl {
|
||||||
string toString(const SNode* pRoot) {
|
string toString(const SNode* pRoot) {
|
||||||
char* pStr = NULL;
|
char* pStr = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
|
||||||
auto start = chrono::steady_clock::now();
|
|
||||||
DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len)
|
DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len)
|
||||||
if (QUERY_NODE_PHYSICAL_PLAN == nodeType(pRoot)) {
|
|
||||||
cout << "nodesNodeToString: "
|
|
||||||
<< chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - start).count() << "us" << endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
string str(pStr);
|
string str(pStr);
|
||||||
taosMemoryFreeClear(pStr);
|
taosMemoryFreeClear(pStr);
|
||||||
return str;
|
return str;
|
||||||
}
|
}
|
||||||
|
|
||||||
string toMsg(const SNode* pRoot) {
|
void checkPlanMsg(const SNode* pRoot) {
|
||||||
char* pStr = NULL;
|
char* pStr = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
|
||||||
auto start = chrono::steady_clock::now();
|
|
||||||
DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len)
|
DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len)
|
||||||
cout << "nodesNodeToMsg: "
|
|
||||||
<< chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - start).count() << "us" << endl;
|
|
||||||
|
|
||||||
string copyStr(pStr, len);
|
string copyStr(pStr, len);
|
||||||
SNode* pNode = NULL;
|
SNode* pNode = NULL;
|
||||||
|
@ -491,9 +494,7 @@ class PlannerTestBaseImpl {
|
||||||
nodesDestroyNode(pNode);
|
nodesDestroyNode(pNode);
|
||||||
taosMemoryFreeClear(pNewStr);
|
taosMemoryFreeClear(pNewStr);
|
||||||
|
|
||||||
string str(pStr, len);
|
|
||||||
taosMemoryFreeClear(pStr);
|
taosMemoryFreeClear(pStr);
|
||||||
return str;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
caseEnv caseEnv_;
|
caseEnv caseEnv_;
|
||||||
|
|
|
@ -41,11 +41,12 @@ class PlannerTestBase : public testing::Test {
|
||||||
std::unique_ptr<PlannerTestBaseImpl> impl_;
|
std::unique_ptr<PlannerTestBaseImpl> impl_;
|
||||||
};
|
};
|
||||||
|
|
||||||
extern void setDumpModule(const char* pModule);
|
extern void setDumpModule(const char* pArg);
|
||||||
extern void setSkipSqlNum(const char* pNum);
|
extern void setSkipSqlNum(const char* pArg);
|
||||||
extern void setLimitSqlNum(const char* pNum);
|
extern void setLimitSqlNum(const char* pArg);
|
||||||
extern void setLogLevel(const char* pLogLevel);
|
extern void setLogLevel(const char* pArg);
|
||||||
extern void setQueryPolicy(const char* pQueryPolicy);
|
extern void setQueryPolicy(const char* pArg);
|
||||||
|
extern void setUseNodeAllocator(const char* pArg);
|
||||||
extern int32_t getLogLevel();
|
extern int32_t getLogLevel();
|
||||||
|
|
||||||
#endif // PLAN_TEST_UTIL_H
|
#endif // PLAN_TEST_UTIL_H
|
||||||
|
|
|
@ -847,7 +847,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
|
||||||
memcpy(res->datum.p, output.columnData->pData, len);
|
memcpy(res->datum.p, output.columnData->pData, len);
|
||||||
} else if (IS_VAR_DATA_TYPE(type)) {
|
} else if (IS_VAR_DATA_TYPE(type)) {
|
||||||
//res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
|
//res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
|
||||||
res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData), 1);
|
res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData) + 1, 1);
|
||||||
res->node.resType.bytes = varDataTLen(output.columnData->pData);
|
res->node.resType.bytes = varDataTLen(output.columnData->pData);
|
||||||
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
|
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue