fix(stream): fix a typo.

This commit is contained in:
Haojun Liao 2024-06-19 10:29:01 +08:00
parent bca3cf4e9e
commit 2534ce5071
6 changed files with 6 additions and 8 deletions

View File

@ -410,7 +410,7 @@ typedef struct SStateStore {
void (*streamFileStateClear)(struct SStreamFileState* pFileState);
bool (*needClearDiskBuff)(struct SStreamFileState* pFileState);
SStreamState* (*streamStateOpen)(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath, int32_t szPage, int32_t pages);
SStreamState* (*streamStateOpen)(const char* path, void* pTask, int64_t streamId, int32_t taskId);
void (*streamStateClose)(SStreamState* pState, bool remove);
int32_t (*streamStateBegin)(SStreamState* pState);
int32_t (*streamStateCommit)(SStreamState* pState);

View File

@ -29,8 +29,7 @@ extern "C" {
#include "storageapi.h"
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath,
int32_t szPage, int32_t pages);
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId);
void streamStateClose(SStreamState* pState, bool remove);
int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);

View File

@ -299,7 +299,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask);
pStreamTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo();
pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId, true, -1, -1);
pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId);
if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
return TSDB_CODE_FAILED;

View File

@ -49,7 +49,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
// sink task does not need the pState
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
pTask->pState = streamStateOpen(pMeta->path, pTask, false, streamId, taskId, -1, -1);
pTask->pState = streamStateOpen(pMeta->path, pTask, streamId, taskId);
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
return -1;

View File

@ -98,8 +98,7 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
return winKeyCmprImpl(&pWin1->key, &pWin2->key);
}
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath,
int32_t szPage, int32_t pages) {
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) {
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
stDebug("open stream state %p, %s", pState, path);
if (pState == NULL) {

View File

@ -46,7 +46,7 @@ SStreamState *stateCreate(const char *path) {
SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL);
pTask->pMeta = pMeta;
SStreamState *p = streamStateOpen((char *)path, pTask, 0, 0, true, 32, 32 * 1024);
SStreamState *p = streamStateOpen((char *)path, pTask, 0, 0);
ASSERT(p != NULL);
return p;
}