commit
fcc6569a7b
|
@ -477,10 +477,12 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
|
|||
blockDataDestroy(pInfo->pDelRes);
|
||||
blockDataDestroy(pInfo->pMidRetriveRes);
|
||||
blockDataDestroy(pInfo->pMidPulloverRes);
|
||||
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
|
||||
if (pInfo->stateStore.streamFileStateDestroy != NULL) {
|
||||
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
|
||||
}
|
||||
taosArrayDestroy(pInfo->pMidPullDatas);
|
||||
|
||||
if (pInfo->pState->dump == 1) {
|
||||
if (pInfo->pState !=NULL && pInfo->pState->dump == 1) {
|
||||
taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner);
|
||||
taosMemoryFreeClear(pInfo->pState->pTdbState);
|
||||
}
|
||||
|
@ -1953,12 +1955,14 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
pInfo->numOfDatapack = 0;
|
||||
pInfo->pUpdated = NULL;
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols);
|
||||
pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(
|
||||
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
||||
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH);
|
||||
QUERY_CHECK_NULL(pInfo->pState->pFileState, code, lino, _error, terrno);
|
||||
|
||||
pInfo->dataVersion = 0;
|
||||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
pInfo->recvGetAll = false;
|
||||
pInfo->recvPullover = false;
|
||||
pInfo->recvRetrive = false;
|
||||
|
@ -2032,7 +2036,9 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
|
|||
tSimpleHashCleanup(pSup->pResultRows);
|
||||
destroyDiskbasedBuf(pSup->pResultBuf);
|
||||
blockDataDestroy(pSup->pScanBlock);
|
||||
pSup->stateStore.streamFileStateDestroy(pSup->pState->pFileState);
|
||||
if (pSup->stateStore.streamFileStateDestroy != NULL) {
|
||||
pSup->stateStore.streamFileStateDestroy(pSup->pState->pFileState);
|
||||
}
|
||||
taosMemoryFreeClear(pSup->pState);
|
||||
taosMemoryFreeClear(pSup->pDummyCtx);
|
||||
}
|
||||
|
@ -2141,7 +2147,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
||||
SStorageAPI* pApi, int32_t tsIndex) {
|
||||
pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput);
|
||||
|
||||
int32_t lino = 0;
|
||||
int32_t code = createSpecialDataBlock(STREAM_CLEAR, &pSup->pScanBlock);
|
||||
if (code) {
|
||||
return code;
|
||||
|
@ -2156,6 +2162,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
}
|
||||
|
||||
pSup->stateStore = *pStore;
|
||||
pSup->pSessionAPI = pApi;
|
||||
|
||||
initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput);
|
||||
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
|
@ -2168,6 +2175,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
pSup->pState->pFileState = pSup->stateStore.streamFileStateInit(
|
||||
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState,
|
||||
pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT);
|
||||
QUERY_CHECK_NULL(pSup->pState->pFileState, code, lino, _end, terrno);
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pSup->pResultRows = tSimpleHashInit(32, hashFn);
|
||||
|
@ -2179,8 +2187,11 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
pExpSup->pCtx[i].saveHandle.pState = pSup->pState;
|
||||
}
|
||||
|
||||
pSup->pSessionAPI = pApi;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) {
|
||||
|
@ -5308,9 +5319,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
pInfo->pUpdatedMap = NULL;
|
||||
int32_t funResSize = getMaxFunResSize(pSup, numOfCols);
|
||||
|
||||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
|
||||
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
||||
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH);
|
||||
QUERY_CHECK_NULL(pInfo->pState->pFileState, code, lino, _error, terrno);
|
||||
|
||||
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
|
@ -5319,7 +5332,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
||||
|
||||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
pInfo->recvGetAll = false;
|
||||
|
||||
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
|
||||
|
|
|
@ -131,21 +131,27 @@ static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
|
|||
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
|
||||
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId,
|
||||
int8_t type) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if (memSize <= 0) {
|
||||
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
|
||||
}
|
||||
if (rowSize == 0) {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
|
||||
if (!pFileState) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_NULL(pFileState, code, lino, _error, terrno);
|
||||
|
||||
rowSize += selectRowSize;
|
||||
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
|
||||
pFileState->usedBuffs = tdListNew(POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _error, terrno);
|
||||
|
||||
pFileState->freeBuffs = tdListNew(POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _error, terrno);
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
|
||||
if (type == STREAM_STATE_BUFF_HASH) {
|
||||
|
@ -171,10 +177,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
|||
pFileState->cfName = taosStrdup("sess");
|
||||
pFileState->stateFunctionGetFn = getSessionRowBuff;
|
||||
}
|
||||
|
||||
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _error, terrno);
|
||||
|
||||
pFileState->keyLen = keySize;
|
||||
pFileState->rowSize = rowSize;
|
||||
|
@ -188,6 +191,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
|||
pFileState->flushMark = INT64_MIN;
|
||||
pFileState->maxTs = INT64_MIN;
|
||||
pFileState->id = taosStrdup(taskId);
|
||||
QUERY_CHECK_NULL(pFileState->id, code, lino, _error, terrno);
|
||||
|
||||
// todo(liuyao) optimize
|
||||
if (type == STREAM_STATE_BUFF_HASH) {
|
||||
|
@ -198,8 +202,8 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
|||
|
||||
void* valBuf = NULL;
|
||||
int32_t len = 0;
|
||||
int32_t code = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
|
||||
if (tmpRes == TSDB_CODE_SUCCESS) {
|
||||
ASSERT(len == sizeof(TSKEY));
|
||||
streamFileStateDecode(&pFileState->flushMark, valBuf, len);
|
||||
qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark);
|
||||
|
@ -208,6 +212,9 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
|||
return pFileState;
|
||||
|
||||
_error:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
streamFileStateDestroy(pFileState);
|
||||
return NULL;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue