Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

This commit is contained in:
Minghao Li 2022-08-19 10:12:48 +08:00
commit c7a09b7826
23 changed files with 352 additions and 264 deletions

View File

@ -1,38 +1,8 @@
IF (EXISTS /var/lib/taos/dnode/dnodeCfg.json)
INSTALL(CODE "MESSAGE(\"The default data directory /var/lib/taos contains old data of tdengine 2.x, please clear it before installing!\")")
ELSEIF (EXISTS C:/TDengine/data/dnode/dnodeCfg.json)
INSTALL(CODE "MESSAGE(\"The default data directory C:/TDengine/data contains old data of tdengine 2.x, please clear it before installing!\")")
ELSEIF (TD_LINUX)
IF (TD_LINUX)
SET(TD_MAKE_INSTALL_SH "${TD_SOURCE_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND bash ${TD_MAKE_INSTALL_SH} ${TD_SOURCE_DIR} ${PROJECT_BINARY_DIR} Linux ${TD_VER_NUMBER})")
ELSEIF (TD_WINDOWS)
SET(CMAKE_INSTALL_PREFIX C:/TDengine)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/go DESTINATION connector)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/nodejs DESTINATION connector)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/python DESTINATION connector)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/C\# DESTINATION connector)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/examples DESTINATION .)
INSTALL(CODE "IF (NOT EXISTS ${CMAKE_INSTALL_PREFIX}/cfg/taos.cfg)
execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${TD_SOURCE_DIR}/packaging/cfg/taos.cfg ${CMAKE_INSTALL_PREFIX}/cfg/taos.cfg)
ENDIF ()")
INSTALL(FILES ${TD_SOURCE_DIR}/include/client/taos.h DESTINATION include)
INSTALL(FILES ${TD_SOURCE_DIR}/include/util/taoserror.h DESTINATION include)
INSTALL(FILES ${TD_SOURCE_DIR}/include/libs/function/taosudf.h DESTINATION include)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.lib DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos_static.lib DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taos.exe DESTINATION .)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taosd.exe DESTINATION .)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/udfd.exe DESTINATION .)
IF (BUILD_TOOLS)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taosBenchmark.exe DESTINATION .)
ENDIF ()
IF (TD_MVN_INSTALLED)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.38-dist.jar DESTINATION connector/jdbc)
ENDIF ()
SET(TD_MAKE_INSTALL_SH "${TD_SOURCE_DIR}/packaging/tools/make_install.bat")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} :needAdmin ${TD_SOURCE_DIR} ${PROJECT_BINARY_DIR} Windows ${TD_VER_NUMBER})")

View File

@ -135,24 +135,6 @@ execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download")
execute_process(COMMAND "${CMAKE_COMMAND}" --build .
WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download")
# clear submodule
execute_process(COMMAND git submodule deinit -f tools/taos-tools
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git rm --cached tools/taos-tools
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git submodule deinit -f tools/taosadapter
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git rm --cached tools/taosadapter
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git submodule deinit -f tools/taosws-rs
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git rm --cached tools/taosws-rs
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git submodule deinit -f examples/rust
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git rm --cached examples/rust
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
# ================================================================================================
# Build

View File

@ -26,5 +26,3 @@ TDengine 集群中的时序数据的副本数是与数据库关联的,一个
TDengine 集群的节点数必须大于等于副本数,否则创建表时将报错。
当 TDengine 集群中的节点部署在不同的物理机上并设置多个副本数时就实现了系统的高可靠性无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。
另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosX)

View File

@ -1,7 +1,47 @@
@echo off
goto %1
:needAdmin
if exist C:\\TDengine\\data\\dnode\\dnodeCfg.json (
echo The default data directory C:/TDengine/data contains old data of tdengine 2.x, please clear it before installing!
)
set source_dir=%2
set source_dir=%source_dir:/=\\%
set binary_dir=%3
set binary_dir=%binary_dir:/=\\%
set osType=%4
set verNumber=%5
set tagert_dir=C:\\TDengine
if not exist %tagert_dir% (
mkdir %tagert_dir%
)
if not exist %tagert_dir%\\cfg (
mkdir %tagert_dir%\\cfg
)
if not exist %tagert_dir%\\include (
mkdir %tagert_dir%\\include
)
if not exist %tagert_dir%\\driver (
mkdir %tagert_dir%\\driver
)
if not exist C:\\TDengine\\cfg\\taos.cfg (
copy %source_dir%\\packaging\\cfg\\taos.cfg %tagert_dir%\\cfg\\taos.cfg > nul
)
copy %source_dir%\\include\\client\\taos.h %tagert_dir%\\include > nul
copy %source_dir%\\include\\util\\taoserror.h %tagert_dir%\\include > nul
copy %source_dir%\\include\\libs\\function\\taosudf.h %tagert_dir%\\include > nul
copy %binary_dir%\\build\\lib\\taos.lib %tagert_dir%\\driver > nul
copy %binary_dir%\\build\\lib\\taos_static.lib %tagert_dir%\\driver > nul
copy %binary_dir%\\build\\lib\\taos.dll %tagert_dir%\\driver > nul
copy %binary_dir%\\build\\bin\\taos.exe %tagert_dir% > nul
copy %binary_dir%\\build\\bin\\taosd.exe %tagert_dir% > nul
copy %binary_dir%\\build\\bin\\udfd.exe %tagert_dir% > nul
if exist %binary_dir%\\build\\bin\\taosBenchmark.exe (
copy %binary_dir%\\build\\bin\\taosBenchmark.exe %tagert_dir% > nul
)
mshta vbscript:createobject("shell.application").shellexecute("%~s0",":hasAdmin","","runas",1)(window.close)&& echo To start/stop TDengine with administrator privileges: sc start/stop taosd &goto :eof
:hasAdmin
cp -f C:\\TDengine\\driver\\taos.dll C:\\Windows\\System32
copy /y C:\\TDengine\\driver\\taos.dll C:\\Windows\\System32 > nul
sc query "taosd" >nul || sc create "taosd" binPath= "C:\\TDengine\\taosd.exe --win_service" start= DEMAND

View File

@ -664,7 +664,9 @@ function install_TDengine() {
## ==============================Main program starts from here============================
echo source directory: $1
echo binary directory: $2
if [ "$osType" != "Darwin" ]; then
if [ -x ${data_dir}/dnode/dnodeCfg.json ]; then
echo -e "\033[44;31;5mThe default data directory ${data_dir} contains old data of tdengine 2.x, please clear it before installing!\033[0m"
elif [ "$osType" != "Darwin" ]; then
if [ -x ${bin_dir}/${clientName} ]; then
update_TDengine
else

View File

@ -75,7 +75,7 @@ int32_t tsMonitorMaxLogs = 100;
bool tsMonitorComp = false;
// telem
bool tsEnableTelem = false;
bool tsEnableTelem = true;
int32_t tsTelemInterval = 86400;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
uint16_t tsTelemPort = 80;
@ -166,7 +166,7 @@ int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60;
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
int32_t taosSetTfsCfg(SConfig *pCfg) {
SConfigItem *pItem = cfgGetItem(pCfg, "dataDir");
memset(tsDataDir, 0, PATH_MAX);
@ -180,7 +180,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) {
uError("failed to create dataDir:%s", tsDataDir);
return -1;
}
return 0;
return 0;
}
#else
int32_t taosSetTfsCfg(SConfig *pCfg);

View File

@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
ASSERT(smaObj.uid != 0);
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb",pCreate->name);
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name);
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
smaObj.stbUid = pStb->uid;
@ -530,7 +530,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.sourceDbUid = pDb->uid;
streamObj.targetDbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;
streamObj.sql = strdup(pCreate->sql);
streamObj.smaId = smaObj.uid;
streamObj.watermark = pCreate->watermark;
streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
@ -585,6 +585,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
return -1;
}
if (pAst != NULL) nodesDestroyNode(pAst);
nodesDestroyNode((SNode *)pPlan);
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
@ -609,6 +610,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
code = 0;
_OVER:
tFreeStreamObj(&streamObj);
mndDestroySmaObj(&smaObj);
mndTransDrop(pTrans);
return code;

View File

@ -509,6 +509,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
pVgroup->replica = 1;
if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) return -1;
taosArrayDestroy(pArray);
mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
return 0;
@ -1862,4 +1863,4 @@ _OVER:
#endif
}
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }

View File

@ -260,7 +260,7 @@ void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap);
// tsdbCache
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(SLRUCache *pCache);
void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
@ -298,6 +298,7 @@ struct STsdb {
SMemTable *imem;
STsdbFS fs;
SLRUCache *lruCache;
TdThreadMutex lruMutex;
};
struct TSDBKEY {

View File

@ -341,7 +341,7 @@ FAIL:
return -1;
}
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
if (pReader->tbIdHash) {

View File

@ -33,16 +33,21 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
taosLRUCacheSetStrictCapacity(pCache, true);
taosThreadMutexInit(&pTsdb->lruMutex, NULL);
_err:
pTsdb->lruCache = pCache;
return code;
}
void tsdbCloseCache(SLRUCache *pCache) {
void tsdbCloseCache(STsdb *pTsdb) {
SLRUCache *pCache = pTsdb->lruCache;
if (pCache) {
taosLRUCacheEraseUnrefEntries(pCache);
taosLRUCacheCleanup(pCache);
taosThreadMutexDestroy(&pTsdb->lruMutex);
}
}
@ -1100,29 +1105,40 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
// getTableCacheKeyS(uid, "lr", key, &keyLen);
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
} else {
STSRow *pRow = NULL;
bool dup = false; // which is always false for now
code = mergeLastRow(uid, pTsdb, &dup, &pRow);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
if (!dup && pRow) {
taosMemoryFree(pRow);
}
*handle = NULL;
return 0;
}
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
if (!h) {
taosThreadMutexLock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
STSRow *pRow = NULL;
bool dup = false; // which is always false for now
code = mergeLastRow(uid, pTsdb, &dup, &pRow);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
if (!dup && pRow) {
taosMemoryFree(pRow);
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
*handle = NULL;
return 0;
}
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, keyLen);
} else {
taosThreadMutexUnlock(&pTsdb->lruMutex);
}
}
*handle = h;

View File

@ -86,7 +86,7 @@ int tsdbClose(STsdb **pTsdb) {
if (*pTsdb) {
taosThreadRwlockDestroy(&(*pTsdb)->rwLock);
tsdbFSClose(*pTsdb);
tsdbCloseCache((*pTsdb)->lruCache);
tsdbCloseCache(*pTsdb);
taosMemoryFreeClear(*pTsdb);
}
return 0;

View File

@ -3217,8 +3217,8 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
}
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
SExecTaskInfo* pTaskInfo) {
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
SSDataBlock* pResBlock = pInfo->pFinalRes;
@ -3242,8 +3242,8 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOp
pInfo->existNewGroupBlock = NULL;
}
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
SExecTaskInfo* pTaskInfo) {
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows);
@ -3259,8 +3259,8 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
SFillOperatorInfo* pInfo = pOperator->info;
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pResBlock = pInfo->pFinalRes;
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pResBlock = pInfo->pFinalRes;
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
@ -3270,13 +3270,13 @@ static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlo
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId);
colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info);
for(int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr];
ASSERT(pCol->notFillCol);
SExprInfo* pExpr = pCol->pExpr;
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId;
int32_t dstSlotId = pExpr->base.resSchema.slotId;
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId;
int32_t dstSlotId = pExpr->base.resSchema.slotId;
SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId);
SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId);
@ -3664,7 +3664,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}
void freeSourceDataInfo(void *p) {
void freeSourceDataInfo(void* p) {
SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
taosMemoryFreeClear(pInfo->pRsp);
}
@ -3694,8 +3694,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
pInfo->pFillInfo =
taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id);
pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
pInfo->primaryTsCol, order, id);
pInfo->win = win;
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
@ -3721,10 +3721,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr);
SInterval* pInterval =
SInterval* pInterval =
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
int32_t type = convertFillType(pPhyFillNode->mode);
@ -3741,9 +3741,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
int32_t code =
initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, (SNodeListNode*)pPhyFillNode->pValues,
pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order);
int32_t code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr,
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
pTaskInfo->id.str, pInterval, type, order);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}

View File

@ -139,7 +139,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
}
assert(w.ekey > pBlockInfo->window.ekey);
if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) {
if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
return true;
}
}
@ -147,7 +147,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
assert(w.skey <= pBlockInfo->window.ekey);
if (w.skey > pBlockInfo->window.skey) {
if (TMAX(w.skey, pBlockInfo->window.skey) <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
return true;
}
@ -158,7 +158,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
}
assert(w.skey < pBlockInfo->window.skey);
if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) {
if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
return true;
}
}
@ -1649,6 +1649,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
}
taosArrayDestroy(tableIdList);
memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond));
} else {
taosArrayDestroy(pColIds);
}
// create the pseduo columns info
@ -2038,10 +2040,34 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
metaReaderClear(&smr);
if (numOfRows >= pOperator->resultInfo.capacity) {
break;
p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
doFilterResult(pInfo);
blockDataCleanup(p);
numOfRows = 0;
if (pInfo->pRes->info.rows > 0) {
break;
}
}
}
if (numOfRows > 0) {
p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
doFilterResult(pInfo);
blockDataCleanup(p);
numOfRows = 0;
}
blockDataDestroy(p);
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
if (ret != 0) {
metaCloseTbCursor(pInfo->pCur);
@ -2049,14 +2075,6 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator);
}
p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
doFilterResult(pInfo);
blockDataDestroy(p);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
@ -2213,10 +2231,34 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
colDataAppend(pColInfoData, numOfRows, n, false);
if (++numOfRows >= pOperator->resultInfo.capacity) {
break;
p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
doFilterResult(pInfo);
blockDataCleanup(p);
numOfRows = 0;
if (pInfo->pRes->info.rows > 0) {
break;
}
}
}
if (numOfRows > 0) {
p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
doFilterResult(pInfo);
blockDataCleanup(p);
numOfRows = 0;
}
blockDataDestroy(p);
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
if (ret != 0) {
metaCloseTbCursor(pInfo->pCur);
@ -2224,14 +2266,6 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator);
}
p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
doFilterResult(pInfo);
blockDataDestroy(p);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

View File

@ -1706,14 +1706,19 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
blockDataDestroy(pInfo->pPullDataRes);
taosArrayDestroy(pInfo->pRecycledPages);
blockDataDestroy(pInfo->pUpdateRes);
taosArrayDestroy(pInfo->pDelWins);
blockDataDestroy(pInfo->pDelRes);
if (pInfo->pChildren) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput);
taosMemoryFree(pChildOp->pDownstream);
cleanupExprSupp(&pChildOp->exprSupp);
taosMemoryFreeClear(pChildOp);
}
taosArrayDestroy(pInfo->pChildren);
}
nodesDestroyNode((SNode*)pInfo->pPhyNode);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
@ -2644,7 +2649,6 @@ void destroyTimeSliceOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(param);
}
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));

View File

@ -3970,16 +3970,16 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0);
if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->start.key == INT64_MIN) {
pInfo->max =
(pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max;
pInfo->max = (pInfo->max < ptsList[start]) ? ptsList[start] : pInfo->max;
} else {
pInfo->max = pCtx->start.key + 1;
}
if (pCtx->end.key != INT64_MIN) {
pInfo->min = pCtx->end.key;
if (pCtx->end.key == INT64_MIN) {
pInfo->min = (pInfo->min > ptsList[start + pInput->numOfRows - 1]) ?
ptsList[start + pInput->numOfRows - 1] : pInfo->min;
} else {
pInfo->min = ptsList[start];
pInfo->min = pCtx->end.key;
}
} else {
if (pCtx->start.key == INT64_MIN) {
@ -3988,10 +3988,11 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
pInfo->min = pCtx->start.key;
}
if (pCtx->end.key != INT64_MIN) {
pInfo->max = pCtx->end.key + 1;
if (pCtx->end.key == INT64_MIN) {
pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ?
ptsList[start + pInput->numOfRows - 1] : pInfo->max;
} else {
pInfo->max = ptsList[start + pInput->numOfRows - 1];
pInfo->max = pCtx->end.key + 1;
}
}
}

View File

@ -152,11 +152,12 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
void tFreeSStreamTask(SStreamTask* pTask) {
qDebug("free stream task %d", pTask->taskId);
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
taosArrayDestroy(pTask->childEpInfo);
taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema);

View File

@ -98,6 +98,17 @@ typedef void* queue[2];
#define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
#define TRANS_MAGIC_NUM 0x5f375a86
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
#define TRANS_MAGIC_NUM 0x5f375a86
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
typedef SRpcMsg STransMsg;
typedef SRpcCtx STransCtx;
@ -151,6 +162,7 @@ typedef struct {
char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset
char user[TSDB_UNI_LEN];
uint32_t magicNum;
STraceId traceId;
uint64_t ahandle; // ahandle assigned by client
uint32_t code; // del later
@ -203,6 +215,7 @@ typedef struct SConnBuffer {
int cap;
int left;
int total;
int invalid;
} SConnBuffer;
typedef void (*AsyncCB)(uv_async_t* handle);

View File

@ -14,15 +14,22 @@
*/
#define _DEFAULT_SOURCE
#ifdef USE_UV
#include <uv.h>
#endif
// clang-format off
#include <uv.h>
#include "zlib.h"
#include "thttp.h"
#include "taoserror.h"
#include "tlog.h"
typedef struct SHttpClient {
uv_connect_t conn;
uv_tcp_t tcp;
uv_write_t req;
uv_buf_t* buf;
char* addr;
uint16_t port;
} SHttpClient;
static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen,
EHttpCompFlag flag) {
if (flag == HTTP_FLAT) {
@ -45,7 +52,7 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH
}
}
int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
int32_t code = -1;
int32_t destLen = srcLen;
void* pDest = taosMemoryMalloc(destLen);
@ -114,84 +121,53 @@ _OVER:
return code;
}
#ifdef USE_UV
static void clientConnCb(uv_connect_t* req, int32_t status) {
if (status < 0) {
static void destroyHttpClient(SHttpClient* cli) {
taosMemoryFree(cli->buf);
taosMemoryFree(cli->addr);
taosMemoryFree(cli);
}
static void clientCloseCb(uv_handle_t* handle) {
SHttpClient* cli = handle->data;
destroyHttpClient(cli);
}
static void clientSentCb(uv_write_t* req, int32_t status) {
SHttpClient* cli = req->data;
if (status != 0) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("connection error %s", uv_strerror(status));
uv_close((uv_handle_t*)req->handle, NULL);
uError("http-report failed to send data %s", uv_strerror(status));
} else {
uInfo("http-report succ to send data");
}
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
static void clientConnCb(uv_connect_t* req, int32_t status) {
SHttpClient* cli = req->data;
if (status != 0) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
return;
}
uv_buf_t* wb = req->data;
assert(wb != NULL);
uv_write_t write_req;
uv_write(&write_req, req->handle, wb, 2, NULL);
uv_close((uv_handle_t*)req->handle, NULL);
uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->buf, 2, clientSentCb);
}
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
uint32_t ipv4 = taosGetIpv4FromFqdn(server);
if (ipv4 == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to get http server:%s ip since %s", server, terrstr());
return -1;
}
char ipv4Buf[128] = {0};
tinet_ntoa(ipv4Buf, ipv4);
struct sockaddr_in dest = {0};
uv_ip4_addr(ipv4Buf, port, &dest);
uv_tcp_t socket_tcp = {0};
uv_loop_t* loop = uv_default_loop();
uv_tcp_init(loop, &socket_tcp);
uv_connect_t* connect = (uv_connect_t*)taosMemoryMalloc(sizeof(uv_connect_t));
if (flag == HTTP_GZIP) {
int32_t dstLen = taosCompressHttpRport(pCont, contLen);
if (dstLen > 0) {
contLen = dstLen;
} else {
flag = HTTP_FLAT;
}
}
char header[1024] = {0};
int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
uv_buf_t wb[2];
wb[0] = uv_buf_init((char*)header, headLen);
wb[1] = uv_buf_init((char*)pCont, contLen);
connect->data = wb;
terrno = 0;
uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb);
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
taosMemoryFree(connect);
return terrno;
}
#else
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
int32_t code = -1;
TdSocketPtr pSocket = NULL;
static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
uint32_t ip = taosGetIpv4FromFqdn(server);
if (ip == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to get http server:%s ip since %s", server, terrstr());
goto SEND_OVER;
uError("http-report failed to get http server:%s ip since %s", server, terrstr());
return -1;
}
pSocket = taosOpenTcpClientSocket(ip, port, 0);
if (pSocket == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create http socket to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
char buf[128] = {0};
tinet_ntoa(buf, ip);
uv_ip4_addr(buf, port, dest);
return 0;
}
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
struct sockaddr_in dest = {0};
if (taosBuildDstAddr(server, port, &dest) < 0) {
return -1;
}
if (flag == HTTP_GZIP) {
int32_t dstLen = taosCompressHttpRport(pCont, contLen);
if (dstLen > 0) {
@ -200,37 +176,38 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
flag = HTTP_FLAT;
}
}
terrno = 0;
char header[1024] = {0};
char header[2048] = {0};
int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
if (taosWriteMsg(pSocket, header, headLen) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to send http header to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
wb[0] = uv_buf_init((char*)header, headLen); // stack var
wb[1] = uv_buf_init((char*)pCont, contLen); // heap var
SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient));
cli->conn.data = cli;
cli->tcp.data = cli;
cli->req.data = cli;
cli->buf = wb;
cli->addr = tstrdup(server);
cli->port = port;
uv_loop_t* loop = uv_default_loop();
uv_tcp_init(loop, &cli->tcp);
// set up timeout to avoid stuck;
int32_t fd = taosCreateSocketWithTimeout(5);
uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb);
if (ret != 0) {
uError("http-report failed to connect to server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
destroyHttpClient(cli);
}
if (taosWriteMsg(pSocket, (void*)pCont, contLen) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to send http content to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
}
// read something to avoid nginx error 499
if (taosWriteMsg(pSocket, header, 10) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to receive response from %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
}
code = 0;
SEND_OVER:
if (pSocket != NULL) {
taosCloseSocket(&pSocket);
}
return code;
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
return terrno;
}
// clang-format on
#endif

View File

@ -318,10 +318,17 @@ void cliHandleResp(SCliConn* conn) {
}
STransMsgHead* pHead = NULL;
transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) {
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
return;
}
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
if (cliRecvReleaseReq(conn, pHead)) {
return;
}
STransMsg transMsg = {0};
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
transMsg.pCont = transContFromHead((char*)pHead);
@ -333,10 +340,6 @@ void cliHandleResp(SCliConn* conn) {
SCliMsg* pMsg = NULL;
STransConnCtx* pCtx = NULL;
if (cliRecvReleaseReq(conn, pHead)) {
return;
}
if (CONN_NO_PERSIST_BY_APP(conn)) {
pMsg = transQueuePop(&conn->cliMsgs);
@ -598,7 +601,12 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
pBuf->len += nread;
while (transReadComplete(pBuf)) {
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
cliHandleResp(conn);
if (pBuf->invalid) {
cliHandleExcept(conn);
break;
} else {
cliHandleResp(conn);
}
}
return;
}
@ -759,6 +767,7 @@ void cliSend(SCliConn* pConn) {
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
pHead->traceId = pMsg->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);

View File

@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) {
buf->left = -1;
buf->len = 0;
buf->total = 0;
buf->invalid = 0;
return 0;
}
int transDestroyBuffer(SConnBuffer* p) {
@ -108,19 +109,25 @@ int transClearBuffer(SConnBuffer* buf) {
p->left = -1;
p->len = 0;
p->total = 0;
p->invalid = 0;
return 0;
}
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
static const int HEADSIZE = sizeof(STransMsgHead);
SConnBuffer* p = connBuf;
if (p->left != 0) {
return -1;
}
int total = connBuf->total;
*buf = taosMemoryCalloc(1, total);
memcpy(*buf, p->buf, total);
transResetBuffer(connBuf);
if (total >= HEADSIZE && !p->invalid) {
*buf = taosMemoryCalloc(1, total);
memcpy(*buf, p->buf, total);
transResetBuffer(connBuf);
} else {
total = -1;
}
return total;
}
@ -173,6 +180,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
memcpy((char*)&head, connBuf->buf, sizeof(head));
int32_t msgLen = (int32_t)htonl(head.msgLen);
p->total = msgLen;
p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum));
}
if (p->total >= p->len) {
p->left = p->total - p->len;
@ -180,7 +188,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
p->left = 0;
}
}
return p->left == 0 ? true : false;
return (p->left == 0 || p->invalid) ? true : false;
}
int transSetConnOption(uv_tcp_t* stream) {

View File

@ -183,17 +183,25 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
tDebug("%p timeout since no activity", conn);
}
static void uvHandleReq(SSvrConn* pConn) {
STransMsgHead* msg = NULL;
int msgLen = 0;
static bool uvHandleReq(SSvrConn* pConn) {
STrans* pTransInst = pConn->pTransInst;
msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
STransMsgHead* msg = NULL;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
if (msgLen <= 0) {
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
return false;
}
STransMsgHead* pHead = (STransMsgHead*)msg;
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
memcpy(pConn->user, pHead->user, strlen(pHead->user));
if (uvRecvReleaseReq(pConn, pHead)) {
return true;
}
// TODO(dengyihao): time-consuming task throwed into BG Thread
// uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
// wreq->data = pConn;
@ -201,10 +209,6 @@ static void uvHandleReq(SSvrConn* pConn) {
// transRefSrvHandle(pConn);
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
if (uvRecvReleaseReq(pConn, pHead)) {
return;
}
STransMsg transMsg;
memset(&transMsg, 0, sizeof(transMsg));
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
@ -220,7 +224,6 @@ static void uvHandleReq(SSvrConn* pConn) {
tDebug("conn %p acquired by server app", pConn);
}
}
STrans* pTransInst = pConn->pTransInst;
STraceId* trace = &pHead->traceId;
if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn);
@ -258,21 +261,33 @@ static void uvHandleReq(SSvrConn* pConn) {
transReleaseExHandle(transGetRefMgt(), pConn->refId);
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
return true;
}
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt
SSvrConn* conn = cli->data;
SSvrConn* conn = cli->data;
STrans* pTransInst = conn->pTransInst;
SConnBuffer* pBuf = &conn->readBuf;
STrans* pTransInst = conn->pTransInst;
if (nread > 0) {
pBuf->len += nread;
tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread);
while (transReadComplete(pBuf)) {
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
uvHandleReq(conn);
if (pBuf->len <= TRANS_PACKET_LIMIT) {
while (transReadComplete(pBuf)) {
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
if (pBuf->invalid) {
tTrace("%s conn %p alread read invalid packet", transLabel(pTransInst), conn);
destroyConn(conn, true);
return;
} else {
if (false == uvHandleReq(conn)) break;
}
}
return;
} else {
destroyConn(conn, true);
return;
}
return;
}
if (nread == 0) {
return;
@ -364,6 +379,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->ahandle = (uint64_t)pMsg->info.ahandle;
pHead->traceId = pMsg->info.traceId;
pHead->hasEpSet = pMsg->info.hasEpSet;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
if (pConn->status == ConnNormal) {
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
@ -859,6 +875,7 @@ static int reallocConnRef(SSvrConn* conn) {
}
static void uvDestroyConn(uv_handle_t* handle) {
SSvrConn* conn = handle->data;
if (conn == NULL) {
return;
}
@ -874,9 +891,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i);
destroySmsg(msg);
}
transReqQueueClear(&conn->wreqQueue);
transQueueDestroy(&conn->srvMsgs);
transReqQueueClear(&conn->wreqQueue);
QUEUE_REMOVE(&conn->queue);
taosMemoryFree(conn->pTcp);

View File

@ -1,7 +1,7 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
print =============== create database
@ -137,4 +137,17 @@ if $data13 != 789 then
return -1
endi
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check
$null=
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content > 0 then
return -1
endi
if $system_content == $null then
return -1
endi