Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/stream_compression

This commit is contained in:
Hongze Cheng 2022-09-22 09:43:20 +08:00
commit 46c7c0c8ce
28 changed files with 472 additions and 169 deletions

View File

@ -1,11 +1,11 @@
#include <stdio.h>
#include <math.h> #include <math.h>
#include <stdarg.h> #include <stdarg.h>
#include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "lua.h" #include "taos.h"
#include "lauxlib.h" #include "lauxlib.h"
#include "lua.h"
#include "lualib.h" #include "lualib.h"
#include <taos.h>
struct cb_param{ struct cb_param{
lua_State* state; lua_State* state;
@ -60,7 +60,6 @@ static int l_connect(lua_State *L){
lua_settop(L,0); lua_settop(L,0);
taos_init();
lua_newtable(L); lua_newtable(L);
int table_index = lua_gettop(L); int table_index = lua_gettop(L);
@ -137,19 +136,15 @@ static int l_query(lua_State *L){
lua_pushstring(L,fields[i].name); lua_pushstring(L,fields[i].name);
int32_t* length = taos_fetch_lengths(result); int32_t* length = taos_fetch_lengths(result);
switch (fields[i].type) { switch (fields[i].type) {
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
lua_pushinteger(L,*((char *)row[i])); lua_pushinteger(L,*((char *)row[i]));
break; break;
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
lua_pushinteger(L,*((short *)row[i])); lua_pushinteger(L,*((short *)row[i]));
break; break;
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
lua_pushinteger(L,*((int *)row[i])); lua_pushinteger(L,*((int *)row[i]));
break; break;
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
lua_pushinteger(L,*((int64_t *)row[i])); lua_pushinteger(L,*((int64_t *)row[i]));
break; break;
@ -159,7 +154,6 @@ static int l_query(lua_State *L){
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
lua_pushnumber(L,*((double *)row[i])); lua_pushnumber(L,*((double *)row[i]));
break; break;
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
//printf("type:%d, max len:%d, current len:%d\n",fields[i].type, fields[i].bytes, length[i]); //printf("type:%d, max len:%d, current len:%d\n",fields[i].type, fields[i].bytes, length[i]);
@ -241,67 +235,6 @@ static int l_async_query(lua_State *L){
return 1; return 1;
} }
void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
struct cb_param* p = (struct cb_param*) param;
TAOS_FIELD *fields = taos_fetch_fields(result);
int numFields = taos_num_fields(result);
// printf("\nnumfields:%d\n", numFields);
//printf("\n\r-----------------------------------------------------------------------------------\n");
lua_State *L = p->state;
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
lua_newtable(L);
for (int i = 0; i < numFields; ++i) {
if (row[i] == NULL) {
continue;
}
lua_pushstring(L,fields[i].name);
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
lua_pushinteger(L,*((char *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
lua_pushinteger(L,*((short *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
lua_pushinteger(L,*((int *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
lua_pushnumber(L,*((float *)row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
lua_pushnumber(L,*((double *)row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
lua_pushstring(L,(char *)row[i]);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_BOOL:
lua_pushinteger(L,*((char *)row[i]));
break;
default:
lua_pushnil(L);
break;
}
lua_settable(L, -3);
}
lua_call(L, 1, 0);
// printf("-----------------------------------------------------------------------------------\n\r");
}
static int l_close(lua_State *L){ static int l_close(lua_State *L){
TAOS *taos= (TAOS*)lua_topointer(L,1); TAOS *taos= (TAOS*)lua_topointer(L,1);

View File

@ -173,16 +173,16 @@ function async_query_callback(res)
end end
end end
driver.query_a(conn,"insert into therm1 values ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback) driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback)
res = driver.query(conn, "create stream stream_avg_degree into avg_degree as select avg(degree) from thermometer interval(5s) sliding(1s)") res = driver.query(conn, "create stream stream_avg_degree into avg_degree as select avg(degree) from thermometer interval(5s) sliding(1s)")
print("From now on we start continous insertion in an definite (infinite if you want) loop.") print("From now on we start continous insert in an definite loop, pls wait for about 10 seconds and check stream table for result.")
local loop_index = 0 local loop_index = 0
while loop_index < 30 do while loop_index < 10 do
local t = os.time()*1000 local t = os.time()*1000
local v = loop_index local v = math.random(20)
res = driver.query(conn,string.format("insert into therm1 values (%d, %d)",t,v)) res = driver.query(conn,string.format("INSERT INTO therm1 VALUES (%d, %d)",t,v))
if res.code ~=0 then if res.code ~=0 then
print("continous insertion--- failed:" .. res.error) print("continous insertion--- failed:" .. res.error)
@ -190,17 +190,8 @@ while loop_index < 30 do
else else
--print("insert successfully, affected:"..res.affected) --print("insert successfully, affected:"..res.affected)
end end
local res1 = driver.query(conn, string.format("select last(*) from avg_degree"))
if res1.code ~=0 then
print("select failed: "..res1.error)
return
else
-- print(dump(res1))
if(#res1.item > 0) then print("avg_degree: " .. res1.item[1]["last(avg(degree))"]) end
end
os.execute("sleep " .. 1) os.execute("sleep " .. 1)
loop_index = loop_index + 1 loop_index = loop_index + 1
end end
driver.query(conn,"DROP STREAM IF EXISTS avg_therm_s")
driver.close(conn) driver.close(conn)

View File

@ -95,6 +95,8 @@ extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in
extern int32_t tsQueryPolicy; extern int32_t tsQueryPolicy;
extern int32_t tsQuerySmaOptimize; extern int32_t tsQuerySmaOptimize;
extern bool tsQueryPlannerTrace; extern bool tsQueryPlannerTrace;
extern int32_t tsQueryNodeChunkSize;
extern bool tsQueryUseNodeAllocator;
// client // client
extern int32_t tsMinSlidingTime; extern int32_t tsMinSlidingTime;

View File

@ -275,6 +275,17 @@ typedef struct SNodeList {
SListCell* pTail; SListCell* pTail;
} SNodeList; } SNodeList;
typedef struct SNodeAllocator SNodeAllocator;
int32_t nodesInitAllocatorSet();
void nodesDestroyAllocatorSet();
int32_t nodesCreateAllocator(int64_t queryId, int32_t chunkSize, int64_t* pAllocatorId);
int32_t nodesAcquireAllocator(int64_t allocatorId);
int32_t nodesReleaseAllocator(int64_t allocatorId);
int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId);
int64_t nodesReleaseAllocatorWeakRef(int64_t allocatorId);
void nodesDestroyAllocator(int64_t allocatorId);
SNode* nodesMakeNode(ENodeType type); SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode); void nodesDestroyNode(SNode* pNode);

View File

@ -56,6 +56,7 @@ typedef struct SParseContext {
bool nodeOffline; bool nodeOffline;
SArray* pTableMetaPos; // sql table pos => catalog data pos SArray* pTableMetaPos; // sql table pos => catalog data pos
SArray* pTableVgroupPos; // sql table pos => catalog data pos SArray* pTableVgroupPos; // sql table pos => catalog data pos
int64_t allocatorId;
} SParseContext; } SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);

View File

@ -39,6 +39,7 @@ typedef struct SPlanContext {
int32_t msgLen; int32_t msgLen;
const char* pUser; const char* pUser;
bool sysInfo; bool sysInfo;
int64_t allocatorId;
} SPlanContext; } SPlanContext;
// Create the physical plan for the query, according to the AST. // Create the physical plan for the query, according to the AST.

View File

@ -67,6 +67,7 @@ typedef struct SSchedulerReq {
SRequestConnInfo *pConn; SRequestConnInfo *pConn;
SArray *pNodeList; SArray *pNodeList;
SQueryPlan *pDag; SQueryPlan *pDag;
int64_t allocatorRefId;
const char *sql; const char *sql;
int64_t startTs; int64_t startTs;
schedulerExecFp execFp; schedulerExecFp execFp;

View File

@ -215,6 +215,7 @@ elif [[ ${packgeName} =~ "tar" ]];then
exit -1 exit -1
else else
echoColor G "The number and names of files are the same as previous installation packages" echoColor G "The number and names of files are the same as previous installation packages"
rm -rf ${installPath}/diffFile.log
fi fi
echoColor YD "===== install Package of tar =====" echoColor YD "===== install Package of tar ====="
cd ${installPath}/${tdPath} cd ${installPath}/${tdPath}
@ -251,6 +252,9 @@ if [[ ${packgeName} =~ "server" ]] ;then
systemctl restart taosd systemctl restart taosd
fi fi
rm -rf ${installPath}/${packgeName}
rm -rf ${installPath}/${tdPath}/
# if ([[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "tar" ]]) || [[ ${packgeName} =~ "client" ]] ;then # if ([[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "tar" ]]) || [[ ${packgeName} =~ "client" ]] ;then
# echoColor G "===== install taos-tools when package is lite or client =====" # echoColor G "===== install taos-tools when package is lite or client ====="
# cd ${installPath} # cd ${installPath}

View File

@ -250,6 +250,7 @@ typedef struct SRequestObj {
bool inRetry; bool inRetry;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry; uint32_t retry;
int64_t allocatorRefId;
} SRequestObj; } SRequestObj;
typedef struct SSyncQueryParam { typedef struct SSyncQueryParam {

View File

@ -288,6 +288,7 @@ void *createRequest(uint64_t connId, int32_t type) {
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
pRequest->type = type; pRequest->type = type;
pRequest->allocatorRefId = -1;
pRequest->pDb = getDbOfConnection(pTscObj); pRequest->pDb = getDbOfConnection(pTscObj);
pRequest->pTscObj = pTscObj; pRequest->pTscObj = pTscObj;
@ -349,6 +350,7 @@ void doDestroyRequest(void *p) {
taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList); taosArrayDestroy(pRequest->dbList);
taosArrayDestroy(pRequest->targetTableList); taosArrayDestroy(pRequest->targetTableList);
nodesDestroyAllocator(pRequest->allocatorRefId);
destroyQueryExecRes(&pRequest->body.resInfo.execRes); destroyQueryExecRes(&pRequest->body.resInfo.execRes);
@ -411,6 +413,7 @@ void taos_init_imp(void) {
initTaskQueue(); initTaskQueue();
fmFuncMgtInit(); fmFuncMgtInit();
nodesInitAllocatorSet();
clientConnRefPool = taosOpenRef(200, destroyTscObj); clientConnRefPool = taosOpenRef(200, destroyTscObj);
clientReqRefPool = taosOpenRef(40960, doDestroyRequest); clientReqRefPool = taosOpenRef(40960, doDestroyRequest);

View File

@ -195,6 +195,19 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
(*pRequest)->allocatorRefId = -1;
if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
if (TSDB_CODE_SUCCESS !=
nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
(*pRequest)->requestId, pTscObj->id, sql);
destroyRequest(*pRequest);
*pRequest = NULL;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1023,7 +1036,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.pMsg = pRequest->msgBuf, .pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pUser = pRequest->pTscObj->user, .pUser = pRequest->pTscObj->user,
.sysInfo = pRequest->pTscObj->sysInfo}; .sysInfo = pRequest->pTscObj->sysInfo,
.allocatorId = pRequest->allocatorRefId};
SAppInstInfo* pAppInfo = getAppInfo(pRequest); SAppInstInfo* pAppInfo = getAppInfo(pRequest);
SQueryPlan* pDag = NULL; SQueryPlan* pDag = NULL;
@ -1048,6 +1062,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.pConn = &conn, .pConn = &conn,
.pNodeList = pNodeList, .pNodeList = pNodeList,
.pDag = pDag, .pDag = pDag,
.allocatorRefId = pRequest->allocatorRefId,
.sql = pRequest->sqlstr, .sql = pRequest->sqlstr,
.startTs = pRequest->metric.start, .startTs = pRequest->metric.start,
.execFp = schedulerExecCb, .execFp = schedulerExecCb,

View File

@ -65,6 +65,7 @@ void taos_cleanup(void) {
fmFuncMgtDestroy(); fmFuncMgtDestroy();
qCleanupKeywordsTable(); qCleanupKeywordsTable();
nodesDestroyAllocatorSet();
id = clientConnRefPool; id = clientConnRefPool;
clientConnRefPool = -1; clientConnRefPool = -1;
@ -775,7 +776,8 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
.enableSysInfo = pTscObj->sysInfo, .enableSysInfo = pTscObj->sysInfo,
.async = true, .async = true,
.svrVer = pTscObj->sVer, .svrVer = pTscObj->sVer,
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)}; .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
.allocatorId = pRequest->allocatorRefId};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -92,6 +92,8 @@ bool tsSmlDataFormat =
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
int32_t tsQuerySmaOptimize = 0; int32_t tsQuerySmaOptimize = 0;
bool tsQueryPlannerTrace = false; bool tsQueryPlannerTrace = false;
int32_t tsQueryNodeChunkSize = 32 * 1024;
bool tsQueryUseNodeAllocator = true;
/* /*
* denote if the server needs to compress response message at the application layer to client, including query rsp, * denote if the server needs to compress response message at the application layer to client, including query rsp,
@ -285,6 +287,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1; if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1; if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1;
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1;
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, true) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
@ -645,6 +649,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32; tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32; tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval;
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
return 0; return 0;
} }
@ -979,6 +985,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32; qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32;
} else if (strcasecmp("queryPlannerTrace", name) == 0) { } else if (strcasecmp("queryPlannerTrace", name) == 0) {
tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval;
} else if (strcasecmp("queryNodeChunkSize", name) == 0) {
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
} else if (strcasecmp("queryUseNodeAllocator", name) == 0) {
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
} }
break; break;
} }

View File

@ -21,9 +21,209 @@
#include "taoserror.h" #include "taoserror.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "thash.h" #include "thash.h"
#include "tref.h"
static SNode* makeNode(ENodeType type, size_t size) { typedef struct SNodeMemChunk {
SNode* p = taosMemoryCalloc(1, size); int32_t availableSize;
int32_t usedSize;
char* pBuf;
struct SNodeMemChunk* pNext;
} SNodeMemChunk;
typedef struct SNodeAllocator {
int64_t self;
int64_t queryId;
int32_t chunkSize;
int32_t chunkNum;
SNodeMemChunk* pCurrChunk;
SNodeMemChunk* pChunks;
TdThreadMutex mutex;
} SNodeAllocator;
static threadlocal SNodeAllocator* g_pNodeAllocator;
static int32_t g_allocatorReqRefPool = -1;
static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) {
SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize);
if (NULL == pNewChunk) {
return NULL;
}
pNewChunk->pBuf = (char*)(pNewChunk + 1);
pNewChunk->availableSize = pAllocator->chunkSize;
pNewChunk->usedSize = 0;
pNewChunk->pNext = NULL;
if (NULL != pAllocator->pCurrChunk) {
pAllocator->pCurrChunk->pNext = pNewChunk;
}
pAllocator->pCurrChunk = pNewChunk;
if (NULL == pAllocator->pChunks) {
pAllocator->pChunks = pNewChunk;
}
++(pAllocator->chunkNum);
return pNewChunk;
}
static void* nodesCallocImpl(int32_t size) {
if (NULL == g_pNodeAllocator) {
return taosMemoryCalloc(1, size);
}
if (g_pNodeAllocator->pCurrChunk->usedSize + size > g_pNodeAllocator->pCurrChunk->availableSize) {
if (NULL == callocNodeChunk(g_pNodeAllocator)) {
return NULL;
}
}
void* p = g_pNodeAllocator->pCurrChunk->pBuf + g_pNodeAllocator->pCurrChunk->usedSize;
g_pNodeAllocator->pCurrChunk->usedSize += size;
return p;
}
static void* nodesCalloc(int32_t num, int32_t size) {
void* p = nodesCallocImpl(num * size + 1);
if (NULL == p) {
return NULL;
}
*(char*)p = (NULL != g_pNodeAllocator) ? 1 : 0;
return (char*)p + 1;
}
static void nodesFree(void* p) {
char* ptr = (char*)p - 1;
if (0 == *ptr) {
taosMemoryFree(ptr);
}
return;
}
static int32_t createNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator) {
*pAllocator = taosMemoryCalloc(1, sizeof(SNodeAllocator));
if (NULL == *pAllocator) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pAllocator)->chunkSize = chunkSize;
if (NULL == callocNodeChunk(*pAllocator)) {
taosMemoryFreeClear(*pAllocator);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosThreadMutexInit(&(*pAllocator)->mutex, NULL);
return TSDB_CODE_SUCCESS;
}
static void destroyNodeAllocator(void* p) {
if (NULL == p) {
return;
}
SNodeAllocator* pAllocator = p;
nodesDebug("query id %" PRIx64 " allocator id %" PRIx64 " alloc chunkNum: %d, chunkTotakSize: %d",
pAllocator->queryId, pAllocator->self, pAllocator->chunkNum, pAllocator->chunkNum * pAllocator->chunkSize);
SNodeMemChunk* pChunk = pAllocator->pChunks;
while (NULL != pChunk) {
SNodeMemChunk* pTemp = pChunk->pNext;
taosMemoryFree(pChunk);
pChunk = pTemp;
}
taosThreadMutexDestroy(&pAllocator->mutex);
taosMemoryFree(pAllocator);
}
int32_t nodesInitAllocatorSet() {
if (g_allocatorReqRefPool >= 0) {
nodesWarn("nodes already initialized");
return TSDB_CODE_SUCCESS;
}
g_allocatorReqRefPool = taosOpenRef(1024, destroyNodeAllocator);
if (g_allocatorReqRefPool < 0) {
nodesError("init nodes failed");
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
void nodesDestroyAllocatorSet() {
if (g_allocatorReqRefPool >= 0) {
SNodeAllocator* pAllocator = taosIterateRef(g_allocatorReqRefPool, 0);
int64_t refId = 0;
while (NULL != pAllocator) {
refId = pAllocator->self;
taosRemoveRef(g_allocatorReqRefPool, refId);
pAllocator = taosIterateRef(g_allocatorReqRefPool, refId);
}
taosCloseRef(g_allocatorReqRefPool);
}
}
int32_t nodesCreateAllocator(int64_t queryId, int32_t chunkSize, int64_t* pAllocatorId) {
SNodeAllocator* pAllocator = NULL;
int32_t code = createNodeAllocator(chunkSize, &pAllocator);
if (TSDB_CODE_SUCCESS == code) {
pAllocator->self = taosAddRef(g_allocatorReqRefPool, pAllocator);
if (pAllocator->self <= 0) {
return terrno;
}
pAllocator->queryId = queryId;
*pAllocatorId = pAllocator->self;
}
return code;
}
int32_t nodesAcquireAllocator(int64_t allocatorId) {
if (allocatorId <= 0) {
return TSDB_CODE_SUCCESS;
}
SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId);
if (NULL == pAllocator) {
return terrno;
}
taosThreadMutexLock(&pAllocator->mutex);
g_pNodeAllocator = pAllocator;
return TSDB_CODE_SUCCESS;
}
int32_t nodesReleaseAllocator(int64_t allocatorId) {
if (allocatorId <= 0) {
return TSDB_CODE_SUCCESS;
}
if (NULL == g_pNodeAllocator) {
nodesError("allocator id %" PRIx64
" release failed: The nodesReleaseAllocator function needs to be called after the nodesAcquireAllocator "
"function is called!",
allocatorId);
return TSDB_CODE_FAILED;
}
SNodeAllocator* pAllocator = g_pNodeAllocator;
g_pNodeAllocator = NULL;
taosThreadMutexUnlock(&pAllocator->mutex);
return taosReleaseRef(g_allocatorReqRefPool, allocatorId);
}
int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId) {
if (allocatorId <= 0) {
return 0;
}
SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId);
return pAllocator->self;
}
int64_t nodesReleaseAllocatorWeakRef(int64_t allocatorId) { return taosReleaseRef(g_allocatorReqRefPool, allocatorId); }
void nodesDestroyAllocator(int64_t allocatorId) {
if (allocatorId <= 0) {
return;
}
taosRemoveRef(g_allocatorReqRefPool, allocatorId);
}
static SNode* makeNode(ENodeType type, int32_t size) {
SNode* p = nodesCalloc(1, size);
if (NULL == p) { if (NULL == p) {
return NULL; return NULL;
} }
@ -824,6 +1024,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pLogicNode->pWStartTs); nodesDestroyNode(pLogicNode->pWStartTs);
nodesDestroyNode(pLogicNode->pValues); nodesDestroyNode(pLogicNode->pValues);
nodesDestroyList(pLogicNode->pFillExprs); nodesDestroyList(pLogicNode->pFillExprs);
nodesDestroyList(pLogicNode->pNotFillExprs);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_SORT: { case QUERY_NODE_LOGIC_PLAN_SORT: {
@ -1021,12 +1222,12 @@ void nodesDestroyNode(SNode* pNode) {
default: default:
break; break;
} }
taosMemoryFreeClear(pNode); nodesFree(pNode);
return; return;
} }
SNodeList* nodesMakeList() { SNodeList* nodesMakeList() {
SNodeList* p = taosMemoryCalloc(1, sizeof(SNodeList)); SNodeList* p = nodesCalloc(1, sizeof(SNodeList));
if (NULL == p) { if (NULL == p) {
return NULL; return NULL;
} }
@ -1037,7 +1238,7 @@ int32_t nodesListAppend(SNodeList* pList, SNode* pNode) {
if (NULL == pList || NULL == pNode) { if (NULL == pList || NULL == pNode) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SListCell* p = taosMemoryCalloc(1, sizeof(SListCell)); SListCell* p = nodesCalloc(1, sizeof(SListCell));
if (NULL == p) { if (NULL == p) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -1104,7 +1305,7 @@ int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc) {
} }
pTarget->pTail = pSrc->pTail; pTarget->pTail = pSrc->pTail;
pTarget->length += pSrc->length; pTarget->length += pSrc->length;
taosMemoryFreeClear(pSrc); nodesFree(pSrc);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1124,7 +1325,7 @@ int32_t nodesListPushFront(SNodeList* pList, SNode* pNode) {
if (NULL == pList || NULL == pNode) { if (NULL == pList || NULL == pNode) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SListCell* p = taosMemoryCalloc(1, sizeof(SListCell)); SListCell* p = nodesCalloc(1, sizeof(SListCell));
if (NULL == p) { if (NULL == p) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -1152,7 +1353,7 @@ SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
} }
SListCell* pNext = pCell->pNext; SListCell* pNext = pCell->pNext;
nodesDestroyNode(pCell->pNode); nodesDestroyNode(pCell->pNode);
taosMemoryFreeClear(pCell); nodesFree(pCell);
--(pList->length); --(pList->length);
return pNext; return pNext;
} }
@ -1172,7 +1373,7 @@ void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc) {
pPos->pPrev = pSrc->pTail; pPos->pPrev = pSrc->pTail;
pTarget->length += pSrc->length; pTarget->length += pSrc->length;
taosMemoryFreeClear(pSrc); nodesFree(pSrc);
} }
SNode* nodesListGetNode(SNodeList* pList, int32_t index) { SNode* nodesListGetNode(SNodeList* pList, int32_t index) {
@ -1204,7 +1405,7 @@ void nodesDestroyList(SNodeList* pList) {
while (NULL != pNext) { while (NULL != pNext) {
pNext = nodesListErase(pList, pNext); pNext = nodesListErase(pList, pNext);
} }
taosMemoryFreeClear(pList); nodesFree(pList);
} }
void nodesClearList(SNodeList* pList) { void nodesClearList(SNodeList* pList) {
@ -1216,9 +1417,9 @@ void nodesClearList(SNodeList* pList) {
while (NULL != pNext) { while (NULL != pNext) {
SListCell* tmp = pNext; SListCell* tmp = pNext;
pNext = pNext->pNext; pNext = pNext->pNext;
taosMemoryFreeClear(tmp); nodesFree(tmp);
} }
taosMemoryFreeClear(pList); nodesFree(pList);
} }
void* nodesGetValueFromNode(SValueNode* pNode) { void* nodesGetValueFromNode(SValueNode* pNode) {

View File

@ -247,7 +247,8 @@ SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode) {
pExpr->userAlias[len] = '\0'; pExpr->userAlias[len] = '\0';
} }
} }
taosMemoryFreeClear(pNode); pRawExpr->pNode = NULL;
nodesDestroyNode(pNode);
return pRealizedExpr; return pRealizedExpr;
} }

View File

@ -177,15 +177,18 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) { int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
SParseMetaCache metaCache = {0}; SParseMetaCache metaCache = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) { if (TSDB_CODE_SUCCESS == code) {
code = parseInsertSyntax(pCxt, pQuery, &metaCache); if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
} else { code = parseInsertSyntax(pCxt, pQuery, &metaCache);
code = parseSqlSyntax(pCxt, pQuery, &metaCache); } else {
code = parseSqlSyntax(pCxt, pQuery, &metaCache);
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq(pCxt, &metaCache, pCatalogReq); code = buildCatalogReq(pCxt, &metaCache, pCatalogReq);
} }
nodesReleaseAllocator(pCxt->allocatorId);
destoryParseMetaCache(&metaCache, true); destoryParseMetaCache(&metaCache, true);
terrno = code; terrno = code;
return code; return code;
@ -194,7 +197,10 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery) { const struct SMetaData* pMetaData, SQuery* pQuery) {
SParseMetaCache metaCache = {0}; SParseMetaCache metaCache = {0};
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot); int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (TSDB_CODE_SUCCESS == code) {
code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (NULL == pQuery->pRoot) { if (NULL == pQuery->pRoot) {
code = parseInsertSql(pCxt, &pQuery, &metaCache); code = parseInsertSql(pCxt, &pQuery, &metaCache);
@ -202,6 +208,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
code = analyseSemantic(pCxt, pQuery, &metaCache); code = analyseSemantic(pCxt, pQuery, &metaCache);
} }
} }
nodesReleaseAllocator(pCxt->allocatorId);
destoryParseMetaCache(&metaCache, false); destoryParseMetaCache(&metaCache, false);
terrno = code; terrno = code;
return code; return code;

View File

@ -119,12 +119,18 @@ class ParserTestBaseImpl {
TEST_INTERFACE_ASYNC_API TEST_INTERFACE_ASYNC_API
}; };
static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { static void destoryParseContext(SParseContext* pCxt) {
taosArrayDestroy(pCxt->pTableMetaPos);
taosArrayDestroy(pCxt->pTableVgroupPos);
delete pCxt;
}
static void destoryParseMetaCacheWarpper(SParseMetaCache* pMetaCache, bool request) {
destoryParseMetaCache(pMetaCache, request); destoryParseMetaCache(pMetaCache, request);
delete pMetaCache; delete pMetaCache;
} }
static void _destroyQuery(SQuery** pQuery) { static void destroyQuery(SQuery** pQuery) {
if (nullptr == pQuery) { if (nullptr == pQuery) {
return; return;
} }
@ -303,10 +309,10 @@ class ParserTestBaseImpl {
setParseContext(sql, &cxt); setParseContext(sql, &cxt);
if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) { if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParseInsertSql(&cxt, query.get(), nullptr); doParseInsertSql(&cxt, query.get(), nullptr);
} else { } else {
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParse(&cxt, query.get()); doParse(&cxt, query.get());
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
@ -335,7 +341,7 @@ class ParserTestBaseImpl {
SParseContext cxt = {0}; SParseContext cxt = {0};
setParseContext(sql, &cxt); setParseContext(sql, &cxt);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParseSql(&cxt, query.get()); doParseSql(&cxt, query.get());
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
@ -354,26 +360,26 @@ class ParserTestBaseImpl {
void runAsyncInternalFuncs(const string& sql, int32_t expect, ParserStage checkStage) { void runAsyncInternalFuncs(const string& sql, int32_t expect, ParserStage checkStage) {
reset(expect, checkStage, TEST_INTERFACE_ASYNC_INTERNAL); reset(expect, checkStage, TEST_INTERFACE_ASYNC_INTERNAL);
try { try {
SParseContext cxt = {0}; unique_ptr<SParseContext, function<void(SParseContext*)> > cxt(new SParseContext(), destoryParseContext);
setParseContext(sql, &cxt, true); setParseContext(sql, cxt.get(), true);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
bool request = true; bool request = true;
unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache( unique_ptr<SParseMetaCache, function<void(SParseMetaCache*)> > metaCache(
new SParseMetaCache(), bind(_destoryParseMetaCache, _1, cref(request))); new SParseMetaCache(), bind(destoryParseMetaCacheWarpper, _1, cref(request)));
bool isInsertValues = qIsInsertValuesSql(cxt.pSql, cxt.sqlLen); bool isInsertValues = qIsInsertValuesSql(cxt->pSql, cxt->sqlLen);
if (isInsertValues) { if (isInsertValues) {
doParseInsertSyntax(&cxt, query.get(), metaCache.get()); doParseInsertSyntax(cxt.get(), query.get(), metaCache.get());
} else { } else {
doParse(&cxt, query.get()); doParse(cxt.get(), query.get());
doCollectMetaKey(&cxt, *(query.get()), metaCache.get()); doCollectMetaKey(cxt.get(), *(query.get()), metaCache.get());
} }
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(), unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq); MockCatalogService::destoryCatalogReq);
doBuildCatalogReq(&cxt, metaCache.get(), catalogReq.get()); doBuildCatalogReq(cxt.get(), metaCache.get(), catalogReq.get());
string err; string err;
thread t1([&]() { thread t1([&]() {
@ -386,13 +392,13 @@ class ParserTestBaseImpl {
doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues); doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues);
if (isInsertValues) { if (isInsertValues) {
doParseInsertSql(&cxt, query.get(), metaCache.get()); doParseInsertSql(cxt.get(), query.get(), metaCache.get());
} else { } else {
doAuthenticate(&cxt, pQuery, metaCache.get()); doAuthenticate(cxt.get(), pQuery, metaCache.get());
doTranslate(&cxt, pQuery, metaCache.get()); doTranslate(cxt.get(), pQuery, metaCache.get());
doCalculateConstant(&cxt, pQuery); doCalculateConstant(cxt.get(), pQuery);
} }
} catch (const TerminateFlag& e) { } catch (const TerminateFlag& e) {
// success and terminate // success and terminate
@ -423,13 +429,13 @@ class ParserTestBaseImpl {
void runAsyncApis(const string& sql, int32_t expect, ParserStage checkStage) { void runAsyncApis(const string& sql, int32_t expect, ParserStage checkStage) {
reset(expect, checkStage, TEST_INTERFACE_ASYNC_API); reset(expect, checkStage, TEST_INTERFACE_ASYNC_API);
try { try {
SParseContext cxt = {0}; unique_ptr<SParseContext, function<void(SParseContext*)> > cxt(new SParseContext(), destoryParseContext);
setParseContext(sql, &cxt); setParseContext(sql, cxt.get());
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(), unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq); MockCatalogService::destoryCatalogReq);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery);
doParseSqlSyntax(&cxt, query.get(), catalogReq.get()); doParseSqlSyntax(cxt.get(), query.get(), catalogReq.get());
SQuery* pQuery = *(query.get()); SQuery* pQuery = *(query.get());
string err; string err;
@ -438,7 +444,7 @@ class ParserTestBaseImpl {
unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData); unique_ptr<SMetaData, void (*)(SMetaData*)> metaData(new SMetaData(), MockCatalogService::destoryMetaData);
doGetAllMeta(catalogReq.get(), metaData.get()); doGetAllMeta(catalogReq.get(), metaData.get());
doAnalyseSqlSemantic(&cxt, catalogReq.get(), metaData.get(), pQuery); doAnalyseSqlSemantic(cxt.get(), catalogReq.get(), metaData.get(), pQuery);
} catch (const TerminateFlag& e) { } catch (const TerminateFlag& e) {
// success and terminate // success and terminate
} catch (const runtime_error& e) { } catch (const runtime_error& e) {

View File

@ -1007,6 +1007,7 @@ static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSub
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort); code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode((SNode*)pScan);
code = nodesListMakeStrictAppend(&pSubplan->pChildren, code = nodesListMakeStrictAppend(&pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT)); (SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT));
} }

View File

@ -33,7 +33,10 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
SLogicSubplan* pLogicSubplan = NULL; SLogicSubplan* pLogicSubplan = NULL;
SQueryLogicPlan* pLogicPlan = NULL; SQueryLogicPlan* pLogicPlan = NULL;
int32_t code = createLogicPlan(pCxt, &pLogicSubplan); int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (TSDB_CODE_SUCCESS == code) {
code = createLogicPlan(pCxt, &pLogicSubplan);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = optimizeLogicPlan(pCxt, pLogicSubplan); code = optimizeLogicPlan(pCxt, pLogicSubplan);
} }
@ -49,6 +52,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
dumpQueryPlan(*pPlan); dumpQueryPlan(*pPlan);
} }
nodesReleaseAllocator(pCxt->allocatorId);
nodesDestroyNode((SNode*)pLogicSubplan); nodesDestroyNode((SNode*)pLogicSubplan);
nodesDestroyNode((SNode*)pLogicPlan); nodesDestroyNode((SNode*)pLogicPlan);

View File

@ -22,6 +22,7 @@
#include "mockCatalog.h" #include "mockCatalog.h"
#include "parser.h" #include "parser.h"
#include "planTestUtil.h" #include "planTestUtil.h"
#include "tglobal.h"
class PlannerEnv : public testing::Environment { class PlannerEnv : public testing::Environment {
public: public:
@ -30,6 +31,8 @@ class PlannerEnv : public testing::Environment {
initMetaDataEnv(); initMetaDataEnv();
generateMetaData(); generateMetaData();
initLog(TD_TMP_DIR_PATH "td"); initLog(TD_TMP_DIR_PATH "td");
initCfg();
nodesInitAllocatorSet();
} }
virtual void TearDown() { virtual void TearDown() {
@ -37,6 +40,7 @@ class PlannerEnv : public testing::Environment {
qCleanupKeywordsTable(); qCleanupKeywordsTable();
fmFuncMgtDestroy(); fmFuncMgtDestroy();
taosCloseLog(); taosCloseLog();
nodesDestroyAllocatorSet();
} }
PlannerEnv() {} PlannerEnv() {}
@ -67,6 +71,8 @@ class PlannerEnv : public testing::Environment {
std::cout << "failed to init log file" << std::endl; std::cout << "failed to init log file" << std::endl;
} }
} }
void initCfg() { tsQueryPlannerTrace = true; }
}; };
static void parseArg(int argc, char* argv[]) { static void parseArg(int argc, char* argv[]) {
@ -79,6 +85,7 @@ static void parseArg(int argc, char* argv[]) {
{"limitSql", required_argument, NULL, 'i'}, {"limitSql", required_argument, NULL, 'i'},
{"log", required_argument, NULL, 'l'}, {"log", required_argument, NULL, 'l'},
{"queryPolicy", required_argument, NULL, 'q'}, {"queryPolicy", required_argument, NULL, 'q'},
{"useNodeAllocator", required_argument, NULL, 'a'},
{0, 0, 0, 0} {0, 0, 0, 0}
}; };
// clang-format on // clang-format on
@ -99,6 +106,9 @@ static void parseArg(int argc, char* argv[]) {
case 'q': case 'q':
setQueryPolicy(optarg); setQueryPolicy(optarg);
break; break;
case 'a':
setUseNodeAllocator(optarg);
break;
default: default:
break; break;
} }

View File

@ -41,6 +41,7 @@ using namespace testing;
enum DumpModule { enum DumpModule {
DUMP_MODULE_NOTHING = 1, DUMP_MODULE_NOTHING = 1,
DUMP_MODULE_SQL,
DUMP_MODULE_PARSER, DUMP_MODULE_PARSER,
DUMP_MODULE_LOGIC, DUMP_MODULE_LOGIC,
DUMP_MODULE_OPTIMIZED, DUMP_MODULE_OPTIMIZED,
@ -56,10 +57,13 @@ int32_t g_skipSql = 0;
int32_t g_limitSql = 0; int32_t g_limitSql = 0;
int32_t g_logLevel = 131; int32_t g_logLevel = 131;
int32_t g_queryPolicy = QUERY_POLICY_VNODE; int32_t g_queryPolicy = QUERY_POLICY_VNODE;
bool g_useNodeAllocator = false;
void setDumpModule(const char* pModule) { void setDumpModule(const char* pModule) {
if (NULL == pModule) { if (NULL == pModule) {
g_dumpModule = DUMP_MODULE_ALL; g_dumpModule = DUMP_MODULE_ALL;
} else if (0 == strncasecmp(pModule, "sql", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_SQL;
} else if (0 == strncasecmp(pModule, "parser", strlen(pModule))) { } else if (0 == strncasecmp(pModule, "parser", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_PARSER; g_dumpModule = DUMP_MODULE_PARSER;
} else if (0 == strncasecmp(pModule, "logic", strlen(pModule))) { } else if (0 == strncasecmp(pModule, "logic", strlen(pModule))) {
@ -79,10 +83,11 @@ void setDumpModule(const char* pModule) {
} }
} }
void setSkipSqlNum(const char* pNum) { g_skipSql = stoi(pNum); } void setSkipSqlNum(const char* pArg) { g_skipSql = stoi(pArg); }
void setLimitSqlNum(const char* pNum) { g_limitSql = stoi(pNum); } void setLimitSqlNum(const char* pArg) { g_limitSql = stoi(pArg); }
void setLogLevel(const char* pLogLevel) { g_logLevel = stoi(pLogLevel); } void setLogLevel(const char* pArg) { g_logLevel = stoi(pArg); }
void setQueryPolicy(const char* pQueryPolicy) { g_queryPolicy = stoi(pQueryPolicy); } void setQueryPolicy(const char* pArg) { g_queryPolicy = stoi(pArg); }
void setUseNodeAllocator(const char* pArg) { g_useNodeAllocator = stoi(pArg); }
int32_t getLogLevel() { return g_logLevel; } int32_t getLogLevel() { return g_logLevel; }
@ -124,6 +129,12 @@ class PlannerTestBaseImpl {
} }
void runImpl(const string& sql, int32_t queryPolicy) { void runImpl(const string& sql, int32_t queryPolicy) {
int64_t allocatorId = 0;
if (g_useNodeAllocator) {
nodesCreateAllocator(sqlNo_, 32 * 1024, &allocatorId);
nodesAcquireAllocator(allocatorId);
}
reset(); reset();
tsQueryPolicy = queryPolicy; tsQueryPolicy = queryPolicy;
try { try {
@ -155,8 +166,13 @@ class PlannerTestBaseImpl {
dump(g_dumpModule); dump(g_dumpModule);
} catch (...) { } catch (...) {
dump(DUMP_MODULE_ALL); dump(DUMP_MODULE_ALL);
nodesReleaseAllocator(allocatorId);
nodesDestroyAllocator(allocatorId);
throw; throw;
} }
nodesReleaseAllocator(allocatorId);
nodesDestroyAllocator(allocatorId);
} }
void prepare(const string& sql) { void prepare(const string& sql) {
@ -216,6 +232,8 @@ class PlannerTestBaseImpl {
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan); doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
unique_ptr<SQueryPlan, void (*)(SQueryPlan*)> plan(pPlan, (void (*)(SQueryPlan*))nodesDestroyNode); unique_ptr<SQueryPlan, void (*)(SQueryPlan*)> plan(pPlan, (void (*)(SQueryPlan*))nodesDestroyNode);
checkPlanMsg((SNode*)pPlan);
dump(g_dumpModule); dump(g_dumpModule);
} catch (...) { } catch (...) {
dump(DUMP_MODULE_ALL); dump(DUMP_MODULE_ALL);
@ -252,7 +270,6 @@ class PlannerTestBaseImpl {
string splitLogicPlan_; string splitLogicPlan_;
string scaledLogicPlan_; string scaledLogicPlan_;
string physiPlan_; string physiPlan_;
string physiPlanMsg_;
vector<string> physiSubplans_; vector<string> physiSubplans_;
}; };
@ -276,17 +293,16 @@ class PlannerTestBaseImpl {
res_.splitLogicPlan_.clear(); res_.splitLogicPlan_.clear();
res_.scaledLogicPlan_.clear(); res_.scaledLogicPlan_.clear();
res_.physiPlan_.clear(); res_.physiPlan_.clear();
res_.physiPlanMsg_.clear();
res_.physiSubplans_.clear(); res_.physiSubplans_.clear();
} }
void dump(DumpModule module) { void dump(DumpModule module) {
cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl;
if (DUMP_MODULE_NOTHING == module) { if (DUMP_MODULE_NOTHING == module) {
return; return;
} }
cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl;
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) { if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) {
if (res_.prepareAst_.empty()) { if (res_.prepareAst_.empty()) {
cout << "+++++++++++++++++++++syntax tree : " << endl; cout << "+++++++++++++++++++++syntax tree : " << endl;
@ -411,8 +427,6 @@ class PlannerTestBaseImpl {
SNode* pSubplan; SNode* pSubplan;
FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); } FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); }
} }
res_.physiPlanMsg_ = toMsg((SNode*)(*pPlan));
cout << "json len: " << res_.physiPlan_.length() << ", msg len: " << res_.physiPlanMsg_.length() << endl;
} }
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) { void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
@ -451,27 +465,16 @@ class PlannerTestBaseImpl {
string toString(const SNode* pRoot) { string toString(const SNode* pRoot) {
char* pStr = NULL; char* pStr = NULL;
int32_t len = 0; int32_t len = 0;
auto start = chrono::steady_clock::now();
DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len) DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len)
if (QUERY_NODE_PHYSICAL_PLAN == nodeType(pRoot)) {
cout << "nodesNodeToString: "
<< chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - start).count() << "us" << endl;
}
string str(pStr); string str(pStr);
taosMemoryFreeClear(pStr); taosMemoryFreeClear(pStr);
return str; return str;
} }
string toMsg(const SNode* pRoot) { void checkPlanMsg(const SNode* pRoot) {
char* pStr = NULL; char* pStr = NULL;
int32_t len = 0; int32_t len = 0;
auto start = chrono::steady_clock::now();
DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len) DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len)
cout << "nodesNodeToMsg: "
<< chrono::duration_cast<chrono::microseconds>(chrono::steady_clock::now() - start).count() << "us" << endl;
string copyStr(pStr, len); string copyStr(pStr, len);
SNode* pNode = NULL; SNode* pNode = NULL;
@ -491,9 +494,7 @@ class PlannerTestBaseImpl {
nodesDestroyNode(pNode); nodesDestroyNode(pNode);
taosMemoryFreeClear(pNewStr); taosMemoryFreeClear(pNewStr);
string str(pStr, len);
taosMemoryFreeClear(pStr); taosMemoryFreeClear(pStr);
return str;
} }
caseEnv caseEnv_; caseEnv caseEnv_;

View File

@ -41,11 +41,12 @@ class PlannerTestBase : public testing::Test {
std::unique_ptr<PlannerTestBaseImpl> impl_; std::unique_ptr<PlannerTestBaseImpl> impl_;
}; };
extern void setDumpModule(const char* pModule); extern void setDumpModule(const char* pArg);
extern void setSkipSqlNum(const char* pNum); extern void setSkipSqlNum(const char* pArg);
extern void setLimitSqlNum(const char* pNum); extern void setLimitSqlNum(const char* pArg);
extern void setLogLevel(const char* pLogLevel); extern void setLogLevel(const char* pArg);
extern void setQueryPolicy(const char* pQueryPolicy); extern void setQueryPolicy(const char* pArg);
extern void setUseNodeAllocator(const char* pArg);
extern int32_t getLogLevel(); extern int32_t getLogLevel();
#endif // PLAN_TEST_UTIL_H #endif // PLAN_TEST_UTIL_H

View File

@ -847,7 +847,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
memcpy(res->datum.p, output.columnData->pData, len); memcpy(res->datum.p, output.columnData->pData, len);
} else if (IS_VAR_DATA_TYPE(type)) { } else if (IS_VAR_DATA_TYPE(type)) {
//res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1); //res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData), 1); res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData) + 1, 1);
res->node.resType.bytes = varDataTLen(output.columnData->pData); res->node.resType.bytes = varDataTLen(output.columnData->pData);
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData)); memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
} else { } else {

View File

@ -254,7 +254,8 @@ typedef struct SSchJob {
SRequestConnInfo conn; SRequestConnInfo conn;
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad> SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad>
SArray *levels; // starting from 0. SArray<SSchLevel> SArray *levels; // starting from 0. SArray<SSchLevel>
SQueryPlan *pDag; SQueryPlan *pDag;
int64_t allocatorRefId;
SArray *dataSrcTasks; // SArray<SQueryTask*> SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx; int32_t levelIdx;

View File

@ -673,6 +673,7 @@ void schFreeJobImpl(void *job) {
destroyQueryExecRes(&pJob->execRes); destroyQueryExecRes(&pJob->execRes);
qDestroyQueryPlan(pJob->pDag); qDestroyQueryPlan(pJob->pDag);
nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);
taosMemoryFreeClear(pJob->userRes.execRes); taosMemoryFreeClear(pJob->userRes.execRes);
taosMemoryFreeClear(pJob->fetchRes); taosMemoryFreeClear(pJob->fetchRes);
@ -724,6 +725,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->sql = strdup(pReq->sql); pJob->sql = strdup(pReq->sql);
} }
pJob->pDag = pReq->pDag; pJob->pDag = pReq->pDag;
pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
pJob->chkKillFp = pReq->chkKillFp; pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam; pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp; pJob->userRes.execFp = pReq->execFp;

View File

@ -116,7 +116,6 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
} }
#endif #endif
} }
// TODO truncate file
if (found == NULL) { if (found == NULL) {
// file corrupted, no complete log // file corrupted, no complete log
@ -125,8 +124,20 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
// truncate file
SWalCkHead* lastEntry = (SWalCkHead*)found; SWalCkHead* lastEntry = (SWalCkHead*)found;
int64_t retVer = lastEntry->head.version; int64_t retVer = lastEntry->head.version;
int64_t lastEntryBeginOffset = offset + (int64_t)((char*)found - (char*)buf);
int64_t lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen;
if (lastEntryEndOffset != fileSize) {
wWarn("vgId:%d repair meta truncate file %s to %ld, orig size %ld", pWal->cfg.vgId, fnameStr, lastEntryEndOffset,
fileSize);
taosFtruncateFile(pFile, lastEntryEndOffset);
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = lastEntryEndOffset;
pWal->totSize -= (fileSize - lastEntryEndOffset);
}
taosCloseFile(&pFile); taosCloseFile(&pFile);
taosMemoryFree(buf); taosMemoryFree(buf);
@ -226,16 +237,92 @@ int walCheckAndRepairMeta(SWal* pWal) {
} }
} }
// TODO: set fileSize and lastVer if necessary
return 0; return 0;
} }
int walCheckAndRepairIdx(SWal* pWal) { int walCheckAndRepairIdx(SWal* pWal) {
// TODO: iterate all log files int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
// if idx not found, scan log and write idx for (int32_t i = 0; i < sz; i++) {
// if found, check complete by first and last entry of each idx file SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
// if idx incomplete, binary search last valid entry, and then build other part
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
int64_t fsize;
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE);
if (pIdxFile == NULL) {
ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fnameStr, terrstr());
return -1;
}
taosFStatFile(pIdxFile, &fsize, NULL);
if (fsize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) {
taosCloseFile(&pIdxFile);
continue;
}
int32_t left = fsize % sizeof(SWalIdxEntry);
int64_t offset = taosLSeekFile(pIdxFile, -left, SEEK_END);
if (left != 0) {
taosFtruncateFile(pIdxFile, offset);
wWarn("vgId:%d wal truncate file %s to offset %ld since size invalid, file size %ld", pWal->cfg.vgId, fnameStr,
offset, fsize);
}
offset -= sizeof(SWalIdxEntry);
SWalIdxEntry idxEntry = {.ver = pFileInfo->firstVer};
while (1) {
if (offset < 0) {
taosLSeekFile(pIdxFile, 0, SEEK_SET);
taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry));
break;
}
taosLSeekFile(pIdxFile, offset, SEEK_SET);
int64_t contLen = taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry));
if (contLen < 0 || contLen != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if ((idxEntry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry) != offset) {
taosFtruncateFile(pIdxFile, offset);
wWarn("vgId:%d wal truncate file %s to offset %ld since entry invalid, entry ver %ld, entry offset %ld",
pWal->cfg.vgId, fnameStr, offset, idxEntry.ver, idxEntry.offset);
offset -= sizeof(SWalIdxEntry);
} else {
break;
}
}
if (idxEntry.ver < pFileInfo->lastVer) {
char fLogNameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pFileInfo->firstVer, fLogNameStr);
TdFilePtr pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ);
if (pLogFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fLogNameStr, terrstr());
return -1;
}
while (idxEntry.ver < pFileInfo->lastVer) {
taosLSeekFile(pLogFile, idxEntry.offset, SEEK_SET);
SWalCkHead ckHead;
taosReadFile(pLogFile, &ckHead, sizeof(SWalCkHead));
if (idxEntry.ver != ckHead.head.version) {
// todo truncate this idx also
taosCloseFile(&pLogFile);
wError("vgId:%d, invalid repair case, log seek to %ld to find ver %ld, actual ver %ld", pWal->cfg.vgId,
idxEntry.offset, idxEntry.ver, ckHead.head.version);
return -1;
}
idxEntry.ver = ckHead.head.version + 1;
idxEntry.offset = idxEntry.offset + sizeof(SWalCkHead) + ckHead.head.bodyLen;
wWarn("vgId:%d wal idx append new entry %ld %ld", pWal->cfg.vgId, idxEntry.ver, idxEntry.offset);
taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry));
}
taosCloseFile(&pLogFile);
}
taosCloseFile(&pIdxFile);
}
return 0; return 0;
} }

View File

@ -149,15 +149,21 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
walLoadMeta(pWal); walLoadMeta(pWal);
if (walCheckAndRepairMeta(pWal) < 0) { if (walCheckAndRepairMeta(pWal) < 0) {
wError("vgId:%d cannot open wal since repair meta file failed", pWal->cfg.vgId);
taosHashCleanup(pWal->pRefHash); taosHashCleanup(pWal->pRefHash);
taosRemoveRef(tsWal.refSetId, pWal->refId); taosRemoveRef(tsWal.refSetId, pWal->refId);
taosThreadMutexDestroy(&pWal->mutex); taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet); taosArrayDestroy(pWal->fileInfoSet);
taosMemoryFree(pWal);
return NULL; return NULL;
} }
if (walCheckAndRepairIdx(pWal) < 0) { if (walCheckAndRepairIdx(pWal) < 0) {
wError("vgId:%d cannot open wal since repair idx file failed", pWal->cfg.vgId);
taosHashCleanup(pWal->pRefHash);
taosRemoveRef(tsWal.refSetId, pWal->refId);
taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
return NULL;
} }
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,