Merge pull request #28187 from taosdata/fix/3_liaohj
fix(stream): use meta id instead of ptr.
This commit is contained in:
commit
84da5eab2e
|
@ -70,6 +70,8 @@ typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;
|
|||
#define SSTREAM_TASK_NEED_CONVERT_VER 2
|
||||
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
|
||||
|
||||
extern int32_t streamMetaId;
|
||||
|
||||
enum {
|
||||
STREAM_STATUS__NORMAL = 0,
|
||||
STREAM_STATUS__STOP,
|
||||
|
@ -135,11 +137,6 @@ enum {
|
|||
STREAM_QUEUE__PROCESSING,
|
||||
};
|
||||
|
||||
enum {
|
||||
STREAM_META_WILL_STOP = 1,
|
||||
STREAM_META_OK_TO_STOP = 2,
|
||||
};
|
||||
|
||||
typedef enum EStreamTaskEvent {
|
||||
TASK_EVENT_INIT = 0x1,
|
||||
TASK_EVENT_INIT_SCANHIST = 0x2,
|
||||
|
@ -282,7 +279,6 @@ typedef enum {
|
|||
} EConsenChkptStatus;
|
||||
|
||||
typedef struct SConsenChkptInfo {
|
||||
// bool alreadySendChkptId;
|
||||
EConsenChkptStatus status;
|
||||
int64_t statusTs;
|
||||
int32_t consenChkptTransId;
|
||||
|
|
|
@ -16,8 +16,13 @@
|
|||
#include "tq.h"
|
||||
#include "vnd.h"
|
||||
|
||||
#define MAX_REPEAT_SCAN_THRESHOLD 3
|
||||
#define SCAN_WAL_IDLE_DURATION 100
|
||||
#define MAX_REPEAT_SCAN_THRESHOLD 3
|
||||
#define SCAN_WAL_IDLE_DURATION 100
|
||||
|
||||
typedef struct SBuildScanWalMsgParam {
|
||||
int64_t metaId;
|
||||
int32_t numOfTasks;
|
||||
} SBuildScanWalMsgParam;
|
||||
|
||||
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
||||
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
|
||||
|
@ -31,13 +36,12 @@ int32_t tqScanWal(STQ* pTq) {
|
|||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int64_t st = taosGetTimestampMs();
|
||||
int32_t numOfTasks = 0;
|
||||
bool shouldIdle = true;
|
||||
|
||||
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
|
||||
|
||||
// check all tasks
|
||||
int32_t numOfTasks = 0;
|
||||
bool shouldIdle = true;
|
||||
|
||||
int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
|
||||
|
@ -68,54 +72,61 @@ int32_t tqScanWal(STQ* pTq) {
|
|||
return code;
|
||||
}
|
||||
|
||||
typedef struct SBuildScanWalMsgParam {
|
||||
STQ* pTq;
|
||||
int32_t numOfTasks;
|
||||
} SBuildScanWalMsgParam;
|
||||
|
||||
static void doStartScanWal(void* param, void* tmrId) {
|
||||
int32_t vgId = 0;
|
||||
STQ* pTq = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
|
||||
|
||||
STQ* pTq = pParam->pTq;
|
||||
int32_t vgId = pTq->pStreamMeta->vgId;
|
||||
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, pParam->metaId);
|
||||
if (pMeta == NULL) {
|
||||
tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
|
||||
taosMemoryFree(pParam);
|
||||
return;
|
||||
}
|
||||
|
||||
vgId = pMeta->vgId;
|
||||
pTq = pMeta->ahandle;
|
||||
|
||||
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
|
||||
pTq->pVnode->restored);
|
||||
|
||||
int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||
taosMemoryFree(pParam);
|
||||
|
||||
code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
|
||||
}
|
||||
|
||||
code = taosReleaseRef(streamMetaId, pParam->metaId);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
|
||||
tstrerror(code));
|
||||
}
|
||||
|
||||
taosMemoryFree(pParam);
|
||||
}
|
||||
|
||||
int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
int32_t code = 0;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
int32_t code = 0;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
tmr_h pTimer = NULL;
|
||||
SBuildScanWalMsgParam* pParam = NULL;
|
||||
|
||||
SBuildScanWalMsgParam* pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
|
||||
pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
|
||||
if (pParam == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pParam->pTq = pTq;
|
||||
pParam->metaId = pMeta->rid;
|
||||
pParam->numOfTasks = numOfTasks;
|
||||
|
||||
tmr_h pTimer = NULL;
|
||||
code = streamTimerGetInstance(&pTimer);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pMeta->scanInfo.scanTimer == NULL) {
|
||||
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer);
|
||||
taosMemoryFree(pParam);
|
||||
} else {
|
||||
bool ret = taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer);
|
||||
if (!ret) {
|
||||
// tqError("vgId:%d failed to start scan wal in:%dms", vgId, idleDuration);
|
||||
}
|
||||
streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut");
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -124,8 +135,8 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
|
|||
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
|
||||
bool alreadyRestored = pTq->pVnode->restored;
|
||||
bool alreadyRestored = pTq->pVnode->restored;
|
||||
int32_t numOfTasks = 0;
|
||||
|
||||
// do not launch the stream tasks, if it is a follower or not restored vnode.
|
||||
if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
|
||||
|
@ -134,7 +145,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
|||
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
if (numOfTasks == 0) {
|
||||
tqDebug("vgId:%d no stream tasks existed to run", vgId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
@ -378,13 +389,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
numOfTasks = taosArrayGetSize(pTaskList);
|
||||
|
||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||
STaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||
STaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||
if (pTaskId == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SStreamTask* pTask = NULL;
|
||||
int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
||||
int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
||||
if (pTask == NULL || code != 0) {
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -798,7 +798,6 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
|
|||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
int64_t threadId = taosGetSelfPthreadId();
|
||||
int32_t lino = 0;
|
||||
int64_t curOwner = 0;
|
||||
|
||||
*pRes = NULL;
|
||||
|
@ -846,7 +845,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
|||
int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
|
||||
blockDataCheck(*pRes, false);
|
||||
|
|
|
@ -1301,10 +1301,17 @@ FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOpera
|
|||
freeOperatorParam(pOperator->pDownstreamGetParams[idx], OP_GET_PARAM);
|
||||
pOperator->pDownstreamGetParams[idx] = NULL;
|
||||
}
|
||||
|
||||
if (code) {
|
||||
qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock);
|
||||
if (code) {
|
||||
qError("failed to get next data block from upstream at %s, %d code:%s", __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -86,11 +86,13 @@ static void destroyGroupOperatorInfo(void* param) {
|
|||
taosArrayDestroy(pInfo->pGroupCols);
|
||||
taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
|
||||
cleanupExprSupp(&pInfo->scalarSup);
|
||||
if (pInfo->pOperator) {
|
||||
|
||||
if (pInfo->pOperator != NULL) {
|
||||
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
||||
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
|
||||
pInfo->pOperator = NULL;
|
||||
}
|
||||
|
||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
taosMemoryFreeClear(param);
|
||||
|
|
|
@ -67,6 +67,9 @@ int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
|||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
||||
blockDataCheck(*ppBlock, false);
|
||||
if (code) {
|
||||
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -1380,8 +1380,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
lino = __LINE__;
|
||||
goto _end;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if (pInfo->currentTable >= numOfTables) {
|
||||
|
@ -1393,11 +1392,11 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
|
||||
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
|
||||
if (!tmp) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
(*ppRes) = NULL;
|
||||
return terrno;
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
}
|
||||
|
||||
tInfo = *tmp;
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
|
||||
|
@ -1412,11 +1411,12 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
}
|
||||
} else { // scan table group by group sequentially
|
||||
code = groupSeqTableScan(pOperator, ppRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -5834,9 +5834,10 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
|
|||
SOperatorInfo* pOperator) {
|
||||
STableMergeScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
|
||||
while (1) {
|
||||
while (1) {
|
||||
pTupleHandle = NULL;
|
||||
|
|
|
@ -204,15 +204,18 @@ int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle)
|
|||
* @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
|
||||
* @param [in, out] pBlock the output block, the group id will be saved in it
|
||||
* @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
|
||||
* @retval NULL if no more tuples
|
||||
*/
|
||||
static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||
int32_t code = 0;
|
||||
static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock,
|
||||
STupleHandle** pTupleHandle) {
|
||||
QRY_PARAM_CHECK(pTupleHandle);
|
||||
|
||||
int32_t code = 0;
|
||||
STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
|
||||
if (!retTuple) {
|
||||
code = tsortNextTuple(pHandle, &retTuple);
|
||||
if (code) {
|
||||
return NULL;
|
||||
qError("failed to get next tuple, code:%s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -225,7 +228,8 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
|
|||
newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
|
||||
&pInfo->pGroupIdCalc->lastKeysLen, retTuple);
|
||||
}
|
||||
bool emptyBlock = pBlock->info.rows == 0;
|
||||
|
||||
bool emptyBlock = (pBlock->info.rows == 0);
|
||||
if (newGroup) {
|
||||
if (!emptyBlock) {
|
||||
// new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
|
||||
|
@ -247,17 +251,20 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
|
|||
}
|
||||
}
|
||||
|
||||
return retTuple;
|
||||
*pTupleHandle = retTuple;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
||||
SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
blockDataCleanup(pDataBlock);
|
||||
int32_t lino = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
SSDataBlock* p = NULL;
|
||||
int32_t lino = 0;
|
||||
int32_t code = 0;
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
SSDataBlock* p = NULL;
|
||||
|
||||
code = tsortGetSortedDataBlock(pHandle, &p);
|
||||
if (p == NULL || (code != 0)) {
|
||||
return code;
|
||||
|
@ -266,16 +273,15 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
|
|||
code = blockDataEnsureCapacity(p, capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
STupleHandle* pTupleHandle;
|
||||
while (1) {
|
||||
if (pInfo->pGroupIdCalc) {
|
||||
pTupleHandle = nextTupleWithGroupId(pHandle, pInfo, p);
|
||||
code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle);
|
||||
} else {
|
||||
code = tsortNextTuple(pHandle, &pTupleHandle);
|
||||
}
|
||||
|
||||
if (pTupleHandle == NULL || code != 0) {
|
||||
lino = __LINE__;
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -320,7 +326,7 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
|
|||
return code;
|
||||
|
||||
_error:
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
|
||||
blockDataDestroy(p);
|
||||
return code;
|
||||
|
@ -330,6 +336,9 @@ int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
|
|||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
|
||||
blockDataCheck(*ppBlock, false);
|
||||
if (code) {
|
||||
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -1229,11 +1229,13 @@ static void destroyStateWindowOperatorInfo(void* param) {
|
|||
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
|
||||
cleanupBasicInfo(&pInfo->binfo);
|
||||
taosMemoryFreeClear(pInfo->stateKey.pData);
|
||||
if (pInfo->pOperator) {
|
||||
|
||||
if (pInfo->pOperator != NULL) {
|
||||
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
||||
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
|
||||
pInfo->pOperator = NULL;
|
||||
}
|
||||
|
||||
cleanupExprSupp(&pInfo->scalarSup);
|
||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
|
@ -1251,13 +1253,17 @@ void destroyIntervalOperatorInfo(void* param) {
|
|||
if (param == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
|
||||
|
||||
cleanupBasicInfo(&pInfo->binfo);
|
||||
if (pInfo->pOperator) {
|
||||
|
||||
if (pInfo->pOperator != NULL) {
|
||||
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
||||
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
|
||||
pInfo->pOperator = NULL;
|
||||
}
|
||||
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
cleanupExprSupp(&pInfo->scalarSupp);
|
||||
|
||||
|
@ -1265,6 +1271,7 @@ void destroyIntervalOperatorInfo(void* param) {
|
|||
|
||||
taosArrayDestroy(pInfo->pInterpCols);
|
||||
pInfo->pInterpCols = NULL;
|
||||
|
||||
taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
|
||||
pInfo->pPrevValues = NULL;
|
||||
|
||||
|
@ -1358,6 +1365,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
|
|||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
code = terrno;
|
||||
lino = __LINE__;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -1465,8 +1473,10 @@ _error:
|
|||
if (pInfo != NULL) {
|
||||
destroyIntervalOperatorInfo(pInfo);
|
||||
}
|
||||
|
||||
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
|
||||
pTaskInfo->code = code;
|
||||
qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1754,11 +1764,13 @@ void destroySWindowOperatorInfo(void* param) {
|
|||
|
||||
cleanupBasicInfo(&pInfo->binfo);
|
||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||
if (pInfo->pOperator) {
|
||||
|
||||
if (pInfo->pOperator != NULL) {
|
||||
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
||||
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
|
||||
pInfo->pOperator = NULL;
|
||||
}
|
||||
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
cleanupExprSupp(&pInfo->scalarSupp);
|
||||
|
||||
|
|
|
@ -771,7 +771,7 @@ static int32_t getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam*
|
|||
|
||||
code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pHandle->pDataBlock->info.rows >= capacity) {
|
||||
|
@ -2869,6 +2869,7 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle
|
|||
pHandle->tupleHandle.pBlock = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
pHandle->tupleHandle.pBlock = pBlock;
|
||||
pHandle->tupleHandle.rowIndex = 0;
|
||||
}
|
||||
|
@ -2884,8 +2885,7 @@ int32_t tsortOpen(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
if (pHandle == NULL || pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
return code;
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
pHandle->opened = true;
|
||||
|
|
|
@ -164,7 +164,6 @@ extern void* streamTimer;
|
|||
extern int32_t streamBackendId;
|
||||
extern int32_t streamBackendCfWrapperId;
|
||||
extern int32_t taskDbWrapperId;
|
||||
extern int32_t streamMetaId;
|
||||
|
||||
int32_t streamTimerInit();
|
||||
void streamTimerCleanUp();
|
||||
|
|
Loading…
Reference in New Issue