Merge pull request #17328 from taosdata/feature/stream
refactor(stream): set state buffer size
This commit is contained in:
commit
7f8d2ff56d
|
@ -36,7 +36,7 @@ typedef struct {
|
|||
int32_t number;
|
||||
} SStreamState;
|
||||
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath);
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
|
||||
void streamStateClose(SStreamState* pState);
|
||||
int32_t streamStateBegin(SStreamState* pState);
|
||||
int32_t streamStateCommit(SStreamState* pState);
|
||||
|
|
|
@ -116,11 +116,13 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
|
|||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
taosMemoryFree(pConsumerNew);
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
FAIL:
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
taosMemoryFree(pConsumerNew);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -154,6 +154,8 @@ TOPIC_ENCODE_OVER:
|
|||
|
||||
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
void *buf = NULL;
|
||||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
|
||||
|
||||
|
@ -215,17 +217,15 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
|
||||
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
||||
if (len) {
|
||||
void *buf = taosMemoryMalloc(len);
|
||||
buf = taosMemoryMalloc(len);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto TOPIC_DECODE_OVER;
|
||||
}
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
|
||||
if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
|
||||
taosMemoryFree(buf);
|
||||
goto TOPIC_DECODE_OVER;
|
||||
}
|
||||
taosMemoryFree(buf);
|
||||
} else {
|
||||
pTopic->schema.nCols = 0;
|
||||
pTopic->schema.version = 0;
|
||||
|
@ -251,6 +251,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
TOPIC_DECODE_OVER:
|
||||
taosMemoryFreeClear(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
|
||||
taosMemoryFreeClear(pRow);
|
||||
|
|
|
@ -321,7 +321,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
|||
}
|
||||
taosMemoryFree(s);
|
||||
}
|
||||
pStreamState = streamStateOpen(taskInfDir, NULL, true);
|
||||
pStreamState = streamStateOpen(taskInfDir, NULL, true, -1, -1);
|
||||
if (!pStreamState) {
|
||||
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
|
||||
return TSDB_CODE_FAILED;
|
||||
|
|
|
@ -890,7 +890,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
|||
|
||||
// expand executor
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false);
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
||||
if (pTask->pState == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -904,7 +904,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
|||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false);
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
||||
if (pTask->pState == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -177,6 +177,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
|
|||
tDecoderClear(&decoder);
|
||||
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tdbFree(pKey);
|
||||
tdbTbcClose(pCur);
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -49,7 +49,9 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
|
|||
return 0;
|
||||
}
|
||||
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
|
||||
szPage = szPage < 0 ? 4096 : szPage;
|
||||
pages = pages < 0 ? 256 : pages;
|
||||
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
if (pState == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -63,7 +65,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
|
|||
memset(statePath, 0, 300);
|
||||
strncpy(statePath, path, 300);
|
||||
}
|
||||
if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) {
|
||||
if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue