fix compile

This commit is contained in:
Liu Jicong 2022-08-23 20:33:26 +08:00
parent de041f45b5
commit c296dd0a63
5 changed files with 20 additions and 22 deletions

View File

@ -23,7 +23,6 @@ extern "C" {
#include "query.h" #include "query.h"
#include "tcommon.h" #include "tcommon.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "tstream.h"
typedef void* qTaskInfo_t; typedef void* qTaskInfo_t;
typedef void* DataSinkHandle; typedef void* DataSinkHandle;
@ -42,7 +41,7 @@ typedef struct {
bool initTableReader; bool initTableReader;
bool initTqReader; bool initTqReader;
int32_t numOfVgroups; int32_t numOfVgroups;
SStreamState* pState; void* pStateBackend;
} SReadHandle; } SReadHandle;
// in queue mode, data streams are seperated by msg // in queue mode, data streams are seperated by msg

View File

@ -8,8 +8,7 @@ add_library(executor STATIC ${EXECUTOR_SRC})
# ) # )
target_link_libraries(executor target_link_libraries(executor
PUBLIC stream PRIVATE os util common function parser planner qcom vnode scalar nodes index stream
PRIVATE os util common function parser planner qcom vnode scalar nodes index
) )
target_include_directories( target_include_directories(

View File

@ -4616,8 +4616,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete; goto _complete;
} }
if (pHandle && pHandle->pState) { if (pHandle && pHandle->pStateBackend) {
(*pTaskInfo)->streamInfo.pState = pHandle->pState; (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
} }
(*pTaskInfo)->sql = sql; (*pTaskInfo)->sql = sql;

View File

@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
return 0; return 0;
} }
// TODO: handle version
int32_t streamExecForAll(SStreamTask* pTask) { int32_t streamExecForAll(SStreamTask* pTask) {
while (1) { while (1) {
int32_t batchCnt = 1; int32_t batchCnt = 1;

View File

@ -23,16 +23,17 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
char streamPath[200]; int32_t len = strlen(path) + 20;
char* streamPath = taosMemoryCalloc(1, len);
sprintf(streamPath, "%s/%s", path, "stream"); sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = strdup(streamPath); pMeta->path = strdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
goto _err; goto _err;
} }
char checkpointPath[200]; sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
sprintf(checkpointPath, "%s/%s", streamPath, "checkpoints"); mkdir(streamPath, 0755);
mkdir(checkpointPath, 0755); taosMemoryFree(streamPath);
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) {
goto _err; goto _err;