Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last
This commit is contained in:
commit
972ad9e70b
|
@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
ASSERT(smaObj.uid != 0);
|
ASSERT(smaObj.uid != 0);
|
||||||
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
|
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
|
||||||
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb",pCreate->name);
|
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name);
|
||||||
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
|
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
|
smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
smaObj.stbUid = pStb->uid;
|
smaObj.stbUid = pStb->uid;
|
||||||
|
@ -530,7 +530,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
streamObj.sourceDbUid = pDb->uid;
|
streamObj.sourceDbUid = pDb->uid;
|
||||||
streamObj.targetDbUid = pDb->uid;
|
streamObj.targetDbUid = pDb->uid;
|
||||||
streamObj.version = 1;
|
streamObj.version = 1;
|
||||||
streamObj.sql = pCreate->sql;
|
streamObj.sql = strdup(pCreate->sql);
|
||||||
streamObj.smaId = smaObj.uid;
|
streamObj.smaId = smaObj.uid;
|
||||||
streamObj.watermark = pCreate->watermark;
|
streamObj.watermark = pCreate->watermark;
|
||||||
streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
|
streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
|
||||||
|
@ -585,6 +585,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pAst != NULL) nodesDestroyNode(pAst);
|
if (pAst != NULL) nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyNode((SNode *)pPlan);
|
||||||
|
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
||||||
|
@ -609,6 +610,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
tFreeStreamObj(&streamObj);
|
||||||
mndDestroySmaObj(&smaObj);
|
mndDestroySmaObj(&smaObj);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -509,6 +509,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
pVgroup->replica = 1;
|
pVgroup->replica = 1;
|
||||||
|
|
||||||
if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) return -1;
|
if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) return -1;
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
|
|
||||||
mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
|
mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -341,7 +341,7 @@ FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
|
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
|
||||||
|
|
||||||
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||||
if (pReader->tbIdHash) {
|
if (pReader->tbIdHash) {
|
||||||
|
|
|
@ -3223,8 +3223,8 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
||||||
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
|
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||||
|
|
||||||
|
@ -3248,8 +3248,8 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOp
|
||||||
pInfo->existNewGroupBlock = NULL;
|
pInfo->existNewGroupBlock = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
|
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||||
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
|
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
|
||||||
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows);
|
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows);
|
||||||
|
@ -3265,8 +3265,8 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera
|
||||||
|
|
||||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
|
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
|
||||||
SFillOperatorInfo* pInfo = pOperator->info;
|
SFillOperatorInfo* pInfo = pOperator->info;
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
||||||
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
|
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
|
||||||
|
@ -3276,13 +3276,13 @@ static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlo
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId);
|
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId);
|
||||||
colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info);
|
colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info);
|
||||||
|
|
||||||
for(int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
|
for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
|
||||||
SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr];
|
SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr];
|
||||||
ASSERT(pCol->notFillCol);
|
ASSERT(pCol->notFillCol);
|
||||||
|
|
||||||
SExprInfo* pExpr = pCol->pExpr;
|
SExprInfo* pExpr = pCol->pExpr;
|
||||||
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId;
|
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId;
|
||||||
int32_t dstSlotId = pExpr->base.resSchema.slotId;
|
int32_t dstSlotId = pExpr->base.resSchema.slotId;
|
||||||
|
|
||||||
SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId);
|
SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId);
|
||||||
SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId);
|
SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId);
|
||||||
|
@ -3670,7 +3670,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
taosRemoveRef(exchangeObjRefPool, pExInfo->self);
|
taosRemoveRef(exchangeObjRefPool, pExInfo->self);
|
||||||
}
|
}
|
||||||
|
|
||||||
void freeSourceDataInfo(void *p) {
|
void freeSourceDataInfo(void* p) {
|
||||||
SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
|
SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
|
||||||
taosMemoryFreeClear(pInfo->pRsp);
|
taosMemoryFreeClear(pInfo->pRsp);
|
||||||
}
|
}
|
||||||
|
@ -3700,8 +3700,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
||||||
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
|
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
|
||||||
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
|
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
|
||||||
|
|
||||||
pInfo->pFillInfo =
|
pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
|
||||||
taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id);
|
pInfo->primaryTsCol, order, id);
|
||||||
|
|
||||||
pInfo->win = win;
|
pInfo->win = win;
|
||||||
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||||
|
@ -3727,10 +3727,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
|
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
|
||||||
pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr);
|
pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr);
|
||||||
|
|
||||||
SInterval* pInterval =
|
SInterval* pInterval =
|
||||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
||||||
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
||||||
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
|
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
|
||||||
|
|
||||||
int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||||
int32_t type = convertFillType(pPhyFillNode->mode);
|
int32_t type = convertFillType(pPhyFillNode->mode);
|
||||||
|
@ -3747,9 +3747,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
|
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
|
||||||
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
int32_t code =
|
int32_t code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr,
|
||||||
initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, (SNodeListNode*)pPhyFillNode->pValues,
|
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
|
||||||
pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order);
|
pTaskInfo->id.str, pInterval, type, order);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,7 +139,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(w.ekey > pBlockInfo->window.ekey);
|
assert(w.ekey > pBlockInfo->window.ekey);
|
||||||
if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) {
|
if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -147,7 +147,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
||||||
w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
|
w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
|
||||||
assert(w.skey <= pBlockInfo->window.ekey);
|
assert(w.skey <= pBlockInfo->window.ekey);
|
||||||
|
|
||||||
if (w.skey > pBlockInfo->window.skey) {
|
if (TMAX(w.skey, pBlockInfo->window.skey) <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,7 +158,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(w.skey < pBlockInfo->window.skey);
|
assert(w.skey < pBlockInfo->window.skey);
|
||||||
if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) {
|
if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1649,6 +1649,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
}
|
}
|
||||||
taosArrayDestroy(tableIdList);
|
taosArrayDestroy(tableIdList);
|
||||||
memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond));
|
memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond));
|
||||||
|
} else {
|
||||||
|
taosArrayDestroy(pColIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
// create the pseduo columns info
|
// create the pseduo columns info
|
||||||
|
@ -2038,10 +2040,34 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
metaReaderClear(&smr);
|
metaReaderClear(&smr);
|
||||||
|
|
||||||
if (numOfRows >= pOperator->resultInfo.capacity) {
|
if (numOfRows >= pOperator->resultInfo.capacity) {
|
||||||
break;
|
p->info.rows = numOfRows;
|
||||||
|
pInfo->pRes->info.rows = numOfRows;
|
||||||
|
|
||||||
|
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
||||||
|
doFilterResult(pInfo);
|
||||||
|
|
||||||
|
blockDataCleanup(p);
|
||||||
|
numOfRows = 0;
|
||||||
|
|
||||||
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (numOfRows > 0) {
|
||||||
|
p->info.rows = numOfRows;
|
||||||
|
pInfo->pRes->info.rows = numOfRows;
|
||||||
|
|
||||||
|
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
||||||
|
doFilterResult(pInfo);
|
||||||
|
|
||||||
|
blockDataCleanup(p);
|
||||||
|
numOfRows = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataDestroy(p);
|
||||||
|
|
||||||
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
|
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
metaCloseTbCursor(pInfo->pCur);
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
|
@ -2049,14 +2075,6 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
p->info.rows = numOfRows;
|
|
||||||
pInfo->pRes->info.rows = numOfRows;
|
|
||||||
|
|
||||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
|
||||||
doFilterResult(pInfo);
|
|
||||||
|
|
||||||
blockDataDestroy(p);
|
|
||||||
|
|
||||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
@ -2213,10 +2231,34 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
|
||||||
colDataAppend(pColInfoData, numOfRows, n, false);
|
colDataAppend(pColInfoData, numOfRows, n, false);
|
||||||
|
|
||||||
if (++numOfRows >= pOperator->resultInfo.capacity) {
|
if (++numOfRows >= pOperator->resultInfo.capacity) {
|
||||||
break;
|
p->info.rows = numOfRows;
|
||||||
|
pInfo->pRes->info.rows = numOfRows;
|
||||||
|
|
||||||
|
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
||||||
|
doFilterResult(pInfo);
|
||||||
|
|
||||||
|
blockDataCleanup(p);
|
||||||
|
numOfRows = 0;
|
||||||
|
|
||||||
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (numOfRows > 0) {
|
||||||
|
p->info.rows = numOfRows;
|
||||||
|
pInfo->pRes->info.rows = numOfRows;
|
||||||
|
|
||||||
|
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
||||||
|
doFilterResult(pInfo);
|
||||||
|
|
||||||
|
blockDataCleanup(p);
|
||||||
|
numOfRows = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataDestroy(p);
|
||||||
|
|
||||||
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
|
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
metaCloseTbCursor(pInfo->pCur);
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
|
@ -2224,14 +2266,6 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
p->info.rows = numOfRows;
|
|
||||||
pInfo->pRes->info.rows = numOfRows;
|
|
||||||
|
|
||||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
|
||||||
doFilterResult(pInfo);
|
|
||||||
|
|
||||||
blockDataDestroy(p);
|
|
||||||
|
|
||||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1706,14 +1706,19 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
blockDataDestroy(pInfo->pPullDataRes);
|
blockDataDestroy(pInfo->pPullDataRes);
|
||||||
taosArrayDestroy(pInfo->pRecycledPages);
|
taosArrayDestroy(pInfo->pRecycledPages);
|
||||||
blockDataDestroy(pInfo->pUpdateRes);
|
blockDataDestroy(pInfo->pUpdateRes);
|
||||||
|
taosArrayDestroy(pInfo->pDelWins);
|
||||||
|
blockDataDestroy(pInfo->pDelRes);
|
||||||
|
|
||||||
if (pInfo->pChildren) {
|
if (pInfo->pChildren) {
|
||||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
for (int32_t i = 0; i < size; i++) {
|
for (int32_t i = 0; i < size; i++) {
|
||||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
|
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
|
||||||
destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput);
|
destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput);
|
||||||
|
taosMemoryFree(pChildOp->pDownstream);
|
||||||
|
cleanupExprSupp(&pChildOp->exprSupp);
|
||||||
taosMemoryFreeClear(pChildOp);
|
taosMemoryFreeClear(pChildOp);
|
||||||
}
|
}
|
||||||
|
taosArrayDestroy(pInfo->pChildren);
|
||||||
}
|
}
|
||||||
nodesDestroyNode((SNode*)pInfo->pPhyNode);
|
nodesDestroyNode((SNode*)pInfo->pPhyNode);
|
||||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
|
@ -2644,7 +2649,6 @@ void destroyTimeSliceOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
|
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
|
||||||
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
|
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
|
|
@ -152,11 +152,12 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tFreeSStreamTask(SStreamTask* pTask) {
|
void tFreeSStreamTask(SStreamTask* pTask) {
|
||||||
|
qDebug("free stream task %d", pTask->taskId);
|
||||||
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
||||||
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
||||||
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
|
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
|
||||||
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
|
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
|
||||||
taosArrayDestroy(pTask->childEpInfo);
|
taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
|
||||||
if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
||||||
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);
|
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);
|
||||||
taosMemoryFree(pTask->tbSink.pTSchema);
|
taosMemoryFree(pTask->tbSink.pTSchema);
|
||||||
|
|
|
@ -104,6 +104,12 @@ typedef void* queue[2];
|
||||||
|
|
||||||
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
||||||
|
|
||||||
|
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
|
||||||
|
|
||||||
|
#define TRANS_MAGIC_NUM 0x5f375a86
|
||||||
|
|
||||||
|
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
||||||
|
|
||||||
typedef SRpcMsg STransMsg;
|
typedef SRpcMsg STransMsg;
|
||||||
typedef SRpcCtx STransCtx;
|
typedef SRpcCtx STransCtx;
|
||||||
typedef SRpcCtxVal STransCtxVal;
|
typedef SRpcCtxVal STransCtxVal;
|
||||||
|
|
|
@ -318,10 +318,17 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STransMsgHead* pHead = NULL;
|
STransMsgHead* pHead = NULL;
|
||||||
transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
|
if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) {
|
||||||
|
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
pHead->msgLen = htonl(pHead->msgLen);
|
pHead->msgLen = htonl(pHead->msgLen);
|
||||||
|
|
||||||
|
if (cliRecvReleaseReq(conn, pHead)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
STransMsg transMsg = {0};
|
STransMsg transMsg = {0};
|
||||||
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
||||||
transMsg.pCont = transContFromHead((char*)pHead);
|
transMsg.pCont = transContFromHead((char*)pHead);
|
||||||
|
@ -333,10 +340,6 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
SCliMsg* pMsg = NULL;
|
SCliMsg* pMsg = NULL;
|
||||||
STransConnCtx* pCtx = NULL;
|
STransConnCtx* pCtx = NULL;
|
||||||
if (cliRecvReleaseReq(conn, pHead)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
||||||
pMsg = transQueuePop(&conn->cliMsgs);
|
pMsg = transQueuePop(&conn->cliMsgs);
|
||||||
|
|
||||||
|
@ -598,7 +601,12 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
pBuf->len += nread;
|
pBuf->len += nread;
|
||||||
while (transReadComplete(pBuf)) {
|
while (transReadComplete(pBuf)) {
|
||||||
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
||||||
cliHandleResp(conn);
|
if (pBuf->invalid) {
|
||||||
|
cliHandleExcept(conn);
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
cliHandleResp(conn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -188,7 +188,6 @@ bool transReadComplete(SConnBuffer* connBuf) {
|
||||||
p->left = 0;
|
p->left = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return (p->left == 0 || p->invalid) ? true : false;
|
return (p->left == 0 || p->invalid) ? true : false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -198,15 +198,16 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
pHead->msgLen = htonl(pHead->msgLen);
|
pHead->msgLen = htonl(pHead->msgLen);
|
||||||
memcpy(pConn->user, pHead->user, strlen(pHead->user));
|
memcpy(pConn->user, pHead->user, strlen(pHead->user));
|
||||||
|
|
||||||
|
if (uvRecvReleaseReq(pConn, pHead)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(dengyihao): time-consuming task throwed into BG Thread
|
// TODO(dengyihao): time-consuming task throwed into BG Thread
|
||||||
// uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
|
// uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
|
||||||
// wreq->data = pConn;
|
// wreq->data = pConn;
|
||||||
// uv_read_stop((uv_stream_t*)pConn->pTcp);
|
// uv_read_stop((uv_stream_t*)pConn->pTcp);
|
||||||
// transRefSrvHandle(pConn);
|
// transRefSrvHandle(pConn);
|
||||||
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
|
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
|
||||||
if (uvRecvReleaseReq(pConn, pHead)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
STransMsg transMsg;
|
STransMsg transMsg;
|
||||||
memset(&transMsg, 0, sizeof(transMsg));
|
memset(&transMsg, 0, sizeof(transMsg));
|
||||||
|
@ -274,14 +275,16 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||||
if (pBuf->len <= TRANS_PACKET_LIMIT) {
|
if (pBuf->len <= TRANS_PACKET_LIMIT) {
|
||||||
while (transReadComplete(pBuf)) {
|
while (transReadComplete(pBuf)) {
|
||||||
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
|
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
|
||||||
if (uvHandleReq(conn) == false) {
|
if (pBuf->invalid) {
|
||||||
|
tTrace("%s conn %p alread read invalid packet", transLabel(pTransInst), conn);
|
||||||
destroyConn(conn, true);
|
destroyConn(conn, true);
|
||||||
return;
|
return;
|
||||||
|
} else {
|
||||||
|
if (false == uvHandleReq(conn)) break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
tError("%s conn %p read unexpected packet, exceed limit", transLabel(pTransInst), conn);
|
|
||||||
destroyConn(conn, true);
|
destroyConn(conn, true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -872,6 +875,7 @@ static int reallocConnRef(SSvrConn* conn) {
|
||||||
}
|
}
|
||||||
static void uvDestroyConn(uv_handle_t* handle) {
|
static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
SSvrConn* conn = handle->data;
|
SSvrConn* conn = handle->data;
|
||||||
|
|
||||||
if (conn == NULL) {
|
if (conn == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -887,9 +891,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i);
|
SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i);
|
||||||
destroySmsg(msg);
|
destroySmsg(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
transReqQueueClear(&conn->wreqQueue);
|
|
||||||
transQueueDestroy(&conn->srvMsgs);
|
transQueueDestroy(&conn->srvMsgs);
|
||||||
|
transReqQueueClear(&conn->wreqQueue);
|
||||||
|
|
||||||
QUEUE_REMOVE(&conn->queue);
|
QUEUE_REMOVE(&conn->queue);
|
||||||
taosMemoryFree(conn->pTcp);
|
taosMemoryFree(conn->pTcp);
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
system sh/stop_dnodes.sh
|
system sh/stop_dnodes.sh
|
||||||
system sh/deploy.sh -n dnode1 -i 1
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/cfg.sh -n dnode1 -c debugflag -v 131
|
||||||
sleep 50
|
system sh/exec.sh -n dnode1 -s start -v
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
print =============== create database
|
print =============== create database
|
||||||
|
@ -137,4 +137,17 @@ if $data13 != 789 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
_OVER:
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
print =============== check
|
||||||
|
$null=
|
||||||
|
|
||||||
|
system_content sh/checkValgrind.sh -n dnode1
|
||||||
|
print cmd return result ----> [ $system_content ]
|
||||||
|
if $system_content > 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $system_content == $null then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
Loading…
Reference in New Issue