enh: close tsdb reader
This commit is contained in:
parent
b020383046
commit
e30c00875c
|
@ -216,6 +216,7 @@ int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
|
|||
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
|
||||
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
|
||||
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
|
||||
void qStreamCloseTsdbReader(void* task);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -725,6 +725,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||
|
||||
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
|
||||
|
||||
taosWLockLatch(&pTq->pushLock);
|
||||
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
|
||||
if (code != 0) {
|
||||
|
@ -791,6 +793,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
|||
SMqRebVgReq req = {0};
|
||||
tDecodeSMqRebVgReq(msg, &req);
|
||||
// todo lock
|
||||
|
||||
tqDebug("vgId:%d, tq process sub req %s", pTq->pVnode->config.vgId, req.subKey);
|
||||
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
if (pHandle == NULL) {
|
||||
if (req.oldConsumerId != -1) {
|
||||
|
@ -881,6 +886,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
|||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||
taosMemoryFree(req.qmsg);
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
qStreamCloseTsdbReader(pHandle->execHandle.task);
|
||||
}
|
||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||
}
|
||||
// close handle
|
||||
|
|
|
@ -1207,3 +1207,4 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
|||
rpcFreeCont(pMsg->pCont);
|
||||
destroySendMsgInfo(pSendInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -604,9 +604,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
|
|||
}
|
||||
}
|
||||
|
||||
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
|
||||
return (0 != pTaskInfo->code) ? true : false;
|
||||
}
|
||||
bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code) ? true : false; }
|
||||
|
||||
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; }
|
||||
|
||||
|
@ -1349,7 +1347,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock **ppBlock) {
|
||||
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
|
||||
if (!tsCountAlwaysReturnValue) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1357,7 +1355,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
|
||||
(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
|
||||
((STableScanInfo *)downstream->info)->hasGroupByTag == true)) {
|
||||
((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1411,7 +1409,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppBlock) {
|
||||
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
|
||||
if (!blockAllocated) {
|
||||
return;
|
||||
}
|
||||
|
@ -1481,7 +1479,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
|
||||
|
||||
}
|
||||
|
||||
// the downstream operator may return with error code, so let's check the code before generating results.
|
||||
|
@ -1759,7 +1756,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
|
|||
|
||||
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
|
||||
pTaskInfo);
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
|
||||
optrDefaultBufFn, NULL);
|
||||
|
||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||
STableScanInfo* pTableScanInfo = downstream->info;
|
||||
|
@ -2608,3 +2606,22 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
|
|||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void qStreamCloseTsdbReader(void* task) {
|
||||
if (task == NULL) return;
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
|
||||
SOperatorInfo* pOp = pTaskInfo->pRoot;
|
||||
pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0};
|
||||
while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
|
||||
SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
|
||||
if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
SStreamScanInfo* pInfo = pDownstreamOp->info;
|
||||
if (pInfo->pTableScanOp) {
|
||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||
pTSInfo->base.dataReader = NULL;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue