Merge branch 'enh/triggerCheckPoint2' of https://github.com/taosdata/TDengine into enh/triggerCheckPoint2
This commit is contained in:
commit
8883d66227
|
@ -319,11 +319,13 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
|
||||||
|
|
||||||
if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
|
if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
|
||||||
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
|
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -838,6 +838,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
|
||||||
if (hasPrevWindow(pFillSup)) {
|
if (hasPrevWindow(pFillSup)) {
|
||||||
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
|
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
|
||||||
pFillInfo->pos = FILL_POS_END;
|
pFillInfo->pos = FILL_POS_END;
|
||||||
|
resetFillWindow(&pFillSup->next);
|
||||||
pFillSup->next.key = pFillSup->cur.key;
|
pFillSup->next.key = pFillSup->cur.key;
|
||||||
pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
|
pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
|
||||||
pFillInfo->preRowKey = INT64_MIN;
|
pFillInfo->preRowKey = INT64_MIN;
|
||||||
|
|
|
@ -1535,6 +1535,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
|
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
||||||
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
|
|
||||||
if (pInfo->pChildren != NULL) {
|
if (pInfo->pChildren != NULL) {
|
||||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
|
@ -2861,6 +2862,7 @@ void destroyStreamStateOperatorInfo(void* param) {
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
||||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
if (pInfo->pChildren != NULL) {
|
if (pInfo->pChildren != NULL) {
|
||||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
for (int32_t i = 0; i < size; i++) {
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
@ -2874,6 +2876,7 @@ void destroyStreamStateOperatorInfo(void* param) {
|
||||||
taosArrayDestroy(pInfo->historyWins);
|
taosArrayDestroy(pInfo->historyWins);
|
||||||
tSimpleHashCleanup(pInfo->pSeUpdated);
|
tSimpleHashCleanup(pInfo->pSeUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pSeDeleted);
|
tSimpleHashCleanup(pInfo->pSeDeleted);
|
||||||
|
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
||||||
blockDataDestroy(pInfo->pCheckpointRes);
|
blockDataDestroy(pInfo->pCheckpointRes);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
|
|
|
@ -308,7 +308,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING);
|
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(status == TASK_STATUS__SCAN_HISTORY);
|
ASSERT(status == TASK_STATUS__NORMAL);
|
||||||
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
||||||
qDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
|
qDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue