Merge pull request #16926 from taosdata/feature/stream
enh: stream backend for sma
This commit is contained in:
commit
1ac9ff08bb
|
@ -34,7 +34,7 @@ typedef struct {
|
|||
TXN txn;
|
||||
} SStreamState;
|
||||
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask);
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath);
|
||||
void streamStateClose(SStreamState* pState);
|
||||
int32_t streamStateBegin(SStreamState* pState);
|
||||
int32_t streamStateCommit(SStreamState* pState);
|
||||
|
|
|
@ -760,7 +760,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
|||
|
||||
// expand executor
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false);
|
||||
if (pTask->pState == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -774,7 +774,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
|||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false);
|
||||
if (pTask->pState == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -18,14 +18,19 @@
|
|||
#include "tcommon.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
|
||||
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
if (pState == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
char statePath[300];
|
||||
sprintf(statePath, "%s/%d", path, pTask->taskId);
|
||||
if (!specPath) {
|
||||
sprintf(statePath, "%s/%d", path, pTask->taskId);
|
||||
} else {
|
||||
memcpy(statePath, path, 300);
|
||||
}
|
||||
if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
|
|
@ -268,7 +268,7 @@ int walRollFileInfo(SWal* pWal) {
|
|||
char* walMetaSerialize(SWal* pWal) {
|
||||
char buf[30];
|
||||
ASSERT(pWal->fileInfoSet);
|
||||
int sz = pWal->fileInfoSet->size;
|
||||
int sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
cJSON* pMeta = cJSON_CreateObject();
|
||||
cJSON* pFiles = cJSON_CreateArray();
|
||||
|
@ -384,8 +384,10 @@ static int walFindCurMetaVer(SWal* pWal) {
|
|||
int code = regexec(&walMetaRegexPattern, name, 0, NULL, 0);
|
||||
if (code == 0) {
|
||||
sscanf(name, "meta-ver%d", &metaVer);
|
||||
wDebug("vgId:%d, wal find current meta: %s is the meta file, ver %d", pWal->cfg.vgId, name, metaVer);
|
||||
break;
|
||||
}
|
||||
wDebug("vgId:%d, wal find current meta: %s is not meta file", pWal->cfg.vgId, name);
|
||||
}
|
||||
taosCloseDir(&pDir);
|
||||
regfree(&walMetaRegexPattern);
|
||||
|
|
Loading…
Reference in New Issue