enhance stream backend for sma
This commit is contained in:
parent
bd3db36a40
commit
86c0fb56e0
|
@ -34,7 +34,7 @@ typedef struct {
|
|||
TXN txn;
|
||||
} SStreamState;
|
||||
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask);
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath);
|
||||
void streamStateClose(SStreamState* pState);
|
||||
int32_t streamStateBegin(SStreamState* pState);
|
||||
int32_t streamStateCommit(SStreamState* pState);
|
||||
|
|
|
@ -760,7 +760,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
|||
|
||||
// expand executor
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false);
|
||||
if (pTask->pState == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -774,7 +774,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
|||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false);
|
||||
if (pTask->pState == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -18,14 +18,19 @@
|
|||
#include "tcommon.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
|
||||
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
if (pState == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
char statePath[300];
|
||||
sprintf(statePath, "%s/%d", path, pTask->taskId);
|
||||
if (!specPath) {
|
||||
sprintf(statePath, "%s/%d", path, pTask->taskId);
|
||||
} else {
|
||||
memcpy(statePath, path, 300);
|
||||
}
|
||||
if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
|
|
@ -268,7 +268,7 @@ int walRollFileInfo(SWal* pWal) {
|
|||
char* walMetaSerialize(SWal* pWal) {
|
||||
char buf[30];
|
||||
ASSERT(pWal->fileInfoSet);
|
||||
int sz = pWal->fileInfoSet->size;
|
||||
int sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
cJSON* pMeta = cJSON_CreateObject();
|
||||
cJSON* pFiles = cJSON_CreateArray();
|
||||
|
|
Loading…
Reference in New Issue