Merge pull request #17650 from taosdata/feature/stream
refactor(stream): recover and fill history
This commit is contained in:
commit
b56e7a0f8c
|
@ -161,6 +161,7 @@ typedef enum EStreamType {
|
|||
STREAM_RETRIEVE,
|
||||
STREAM_PULL_DATA,
|
||||
STREAM_PULL_OVER,
|
||||
STREAM_FILL_OVER,
|
||||
} EStreamType;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -198,6 +198,9 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
|
||||
|
|
|
@ -169,6 +169,7 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
|
|||
|
||||
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);
|
||||
|
||||
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key);
|
||||
/**
|
||||
* return the scan info, in the form of tuple of two items, including table uid and current timestamp
|
||||
* @param tinfo
|
||||
|
@ -200,9 +201,11 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
|
|||
|
||||
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem);
|
||||
|
||||
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer);
|
||||
|
||||
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key);
|
||||
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo);
|
||||
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver);
|
||||
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
|
||||
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
|
||||
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -47,7 +47,9 @@ enum {
|
|||
TASK_STATUS__FAIL,
|
||||
TASK_STATUS__STOP,
|
||||
TASK_STATUS__RECOVER_DOWNSTREAM,
|
||||
TASK_STATUS__RECOVER_SELF,
|
||||
TASK_STATUS__RECOVER_PREPARE,
|
||||
TASK_STATUS__RECOVER1,
|
||||
TASK_STATUS__RECOVER2,
|
||||
};
|
||||
|
||||
enum {
|
||||
|
@ -329,6 +331,9 @@ typedef struct SStreamTask {
|
|||
// state backend
|
||||
SStreamState* pState;
|
||||
|
||||
// do not serialize
|
||||
int32_t recoverWaitingChild;
|
||||
|
||||
} SStreamTask;
|
||||
|
||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
||||
|
@ -435,6 +440,20 @@ typedef struct {
|
|||
int32_t rspToTaskId;
|
||||
} SStreamRetrieveRsp;
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
} SStreamRecoverStep1Req, SStreamRecoverStep2Req;
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int32_t childId;
|
||||
} SStreamRecoverFinishReq;
|
||||
|
||||
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
|
||||
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
|
||||
|
||||
#if 0
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
|
@ -521,8 +540,29 @@ int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
|
|||
int32_t streamTryExec(SStreamTask* pTask);
|
||||
int32_t streamSchedExec(SStreamTask* pTask);
|
||||
|
||||
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask);
|
||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
||||
|
||||
// recover and fill history
|
||||
// common
|
||||
int32_t streamSetParamForRecover(SStreamTask* pTask);
|
||||
int32_t streamRestoreParam(SStreamTask* pTask);
|
||||
int32_t streamSetStatusNormal(SStreamTask* pTask);
|
||||
// source level
|
||||
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver);
|
||||
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq);
|
||||
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
|
||||
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
|
||||
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
|
||||
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask);
|
||||
// agg level
|
||||
int32_t streamAggRecoverPrepare(SStreamTask* pTask);
|
||||
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
|
||||
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
|
||||
|
||||
// expand and deploy
|
||||
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
|
||||
|
||||
// meta
|
||||
typedef struct SStreamMeta {
|
||||
char* path;
|
||||
TDB* db;
|
||||
|
@ -533,12 +573,13 @@ typedef struct SStreamMeta {
|
|||
void* ahandle;
|
||||
TXN txn;
|
||||
FTaskExpand* expandFunc;
|
||||
int32_t vgId;
|
||||
} SStreamMeta;
|
||||
|
||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc);
|
||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
|
||||
void streamMetaClose(SStreamMeta* streamMeta);
|
||||
|
||||
// int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
|
||||
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
|
||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
|
|
|
@ -33,16 +33,16 @@ extern "C" {
|
|||
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
|
||||
// clang-format on
|
||||
|
||||
#define WAL_PROTO_VER 0
|
||||
#define WAL_NOSUFFIX_LEN 20
|
||||
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
|
||||
#define WAL_LOG_SUFFIX "log"
|
||||
#define WAL_INDEX_SUFFIX "idx"
|
||||
#define WAL_REFRESH_MS 1000
|
||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
||||
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
||||
#define WAL_PROTO_VER 0
|
||||
#define WAL_NOSUFFIX_LEN 20
|
||||
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
|
||||
#define WAL_LOG_SUFFIX "log"
|
||||
#define WAL_INDEX_SUFFIX "idx"
|
||||
#define WAL_REFRESH_MS 1000
|
||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
||||
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
||||
#define WAL_RECOV_SIZE_LIMIT (100 * WAL_SCAN_BUF_SIZE)
|
||||
|
||||
typedef enum {
|
||||
|
@ -204,7 +204,6 @@ SWalRef *walRefCommittedVer(SWal *);
|
|||
SWalRef *walOpenRef(SWal *);
|
||||
void walCloseRef(SWal *pWal, int64_t refId);
|
||||
int32_t walRefVer(SWalRef *, int64_t ver);
|
||||
int32_t walPreRefVer(SWalRef *pRef, int64_t ver);
|
||||
void walUnrefVer(SWalRef *);
|
||||
|
||||
// helper function for raft
|
||||
|
|
|
@ -1732,6 +1732,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
|
||||
// in no topic status, delayed task also need to be processed
|
||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
|
||||
tscDebug("consumer:%" PRId64 ", poll return since consumer status is init", tmq->consumerId);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -4853,6 +4853,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
|
|||
if (tEncodeCStr(&encoder, pReq->sourceDB) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->targetStbFullName) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->fillHistory) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, astLen) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1;
|
||||
|
@ -4889,6 +4890,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
|
|||
if (tDecodeCStrTo(&decoder, pReq->sourceDB) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->targetStbFullName) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->fillHistory) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->triggerType) < 0) return -1;
|
||||
|
|
|
@ -181,15 +181,15 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey)
|
|||
int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
|
||||
|
||||
// tqSink
|
||||
void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||
// void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||
|
||||
// tqOffset
|
||||
char* tqOffsetBuildFName(const char* path, int32_t ver);
|
||||
char* tqOffsetBuildFName(const char* path, int32_t fVer);
|
||||
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
||||
|
||||
// tqStream
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask);
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -98,7 +98,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
|||
ASSERT(0);
|
||||
}
|
||||
|
||||
pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask);
|
||||
pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
|
||||
if (pTq->pStreamMeta == NULL) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -872,7 +872,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||
if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
||||
}
|
||||
|
@ -891,6 +891,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
|||
|
||||
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
||||
|
||||
pTask->startVer = ver;
|
||||
|
||||
// expand executor
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
||||
|
@ -906,6 +908,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
|||
};
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
|
||||
if (pTask->fillHistory) {
|
||||
pTask->taskStatus = TASK_STATUS__RECOVER_PREPARE;
|
||||
}
|
||||
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
||||
if (pTask->pState == NULL) {
|
||||
|
@ -945,8 +951,163 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
//
|
||||
return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
|
||||
int32_t code;
|
||||
#if 0
|
||||
code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
|
||||
if (code < 0) return code;
|
||||
#endif
|
||||
|
||||
// 1.deserialize msg and build task
|
||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||
code = tDecodeSStreamTask(&decoder, pTask);
|
||||
if (code < 0) {
|
||||
tDecoderClear(&decoder);
|
||||
taosMemoryFree(pTask);
|
||||
return -1;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
// 2.save task
|
||||
code = streamMetaAddTask(pTq->pStreamMeta, version, pTask);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// 3.go through recover steps to fill history
|
||||
if (pTask->fillHistory) {
|
||||
streamSetParamForRecover(pTask);
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||
streamSourceRecoverPrepareStep1(pTask, version);
|
||||
|
||||
SStreamRecoverStep1Req req;
|
||||
streamBuildSourceRecover1Req(pTask, &req);
|
||||
|
||||
void* serialziedReq = (void*)&req;
|
||||
int32_t len = sizeof(SStreamRecoverStep1Req);
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.contLen = len,
|
||||
.pCont = serialziedReq,
|
||||
.msgType = TDMT_VND_STREAM_RECOVER_STEP1,
|
||||
};
|
||||
|
||||
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &rpcMsg);
|
||||
|
||||
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||
streamAggRecoverPrepare(pTask);
|
||||
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRecover1Req(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
int32_t code;
|
||||
SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg;
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, pReq->taskId);
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// check param
|
||||
int64_t fillVer1 = pTask->startVer;
|
||||
if (fillVer1 <= 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// do recovery step 1
|
||||
streamSourceRecoverScanStep1(pTask);
|
||||
|
||||
// build msg to launch next step
|
||||
SStreamRecoverStep2Req req;
|
||||
code = streamBuildSourceRecover2Req(pTask, &req);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// serialize msg
|
||||
int32_t len = sizeof(SStreamRecoverStep2Req);
|
||||
void* serializedReq = (void*)&req;
|
||||
|
||||
// dispatch msg
|
||||
SRpcMsg rpcMsg = {
|
||||
.code = 0,
|
||||
.contLen = len,
|
||||
.msgType = TDMT_VND_STREAM_RECOVER_STEP2,
|
||||
.pCont = (void*)serializedReq,
|
||||
};
|
||||
|
||||
tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
int32_t code;
|
||||
SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, pReq->taskId);
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// do recovery step 2
|
||||
code = streamSourceRecoverScanStep2(pTask, version);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// restore param
|
||||
code = streamRestoreParam(pTask);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// set status normal
|
||||
code = streamSetStatusNormal(pTask);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// dispatch recover finish req to all related downstream task
|
||||
code = streamDispatchRecoverFinishReq(pTask);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
int32_t code;
|
||||
|
||||
// deserialize
|
||||
int32_t len;
|
||||
SStreamRecoverFinishReq req;
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msg, sizeof(SStreamRecoverFinishReq));
|
||||
tDecodeSStreamRecoverFinishReq(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
// find task
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, req.taskId);
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
// do process request
|
||||
if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||
|
@ -1081,6 +1242,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
|||
if (pIter == NULL) break;
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
|
||||
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__RECOVER1) continue;
|
||||
|
||||
qDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
|
||||
|
||||
|
|
|
@ -85,11 +85,11 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
tqDebug("tmq task start to execute");
|
||||
tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId);
|
||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
tqDebug("tmq task executed, get %p", pDataBlock);
|
||||
tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock);
|
||||
|
||||
if (pDataBlock == NULL) {
|
||||
break;
|
||||
|
|
|
@ -22,10 +22,10 @@ struct STqOffsetStore {
|
|||
SHashObj* pHash; // SHashObj<subscribeKey, offset>
|
||||
};
|
||||
|
||||
char* tqOffsetBuildFName(const char* path, int32_t ver) {
|
||||
char* tqOffsetBuildFName(const char* path, int32_t fVer) {
|
||||
int32_t len = strlen(path);
|
||||
char* fname = taosMemoryCalloc(1, len + 40);
|
||||
snprintf(fname, len + 40, "%s/offset-ver%d", path, ver);
|
||||
snprintf(fname, len + 40, "%s/offset-ver%d", path, fVer);
|
||||
return fname;
|
||||
}
|
||||
|
||||
|
|
|
@ -530,6 +530,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
|||
taosArrayDestroy(tagArray);
|
||||
}
|
||||
|
||||
#if 0
|
||||
void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||
const SArray* pRes = (const SArray*)data;
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
|
@ -585,3 +586,4 @@ void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data
|
|||
tqDebug("failed to put into write-queue since %s", terrstr());
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -12,7 +12,6 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
// clang-format off
|
||||
#ifndef TDENGINE_EXECUTORIMPL_H
|
||||
#define TDENGINE_EXECUTORIMPL_H
|
||||
|
||||
|
@ -47,7 +46,7 @@ extern "C" {
|
|||
|
||||
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
|
||||
|
||||
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
|
||||
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
|
||||
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
|
||||
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
|
||||
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
|
||||
|
@ -135,18 +134,19 @@ typedef struct STaskIdInfo {
|
|||
|
||||
enum {
|
||||
STREAM_RECOVER_STEP__NONE = 0,
|
||||
STREAM_RECOVER_STEP__PREPARE,
|
||||
STREAM_RECOVER_STEP__PREPARE1,
|
||||
STREAM_RECOVER_STEP__PREPARE2,
|
||||
STREAM_RECOVER_STEP__SCAN,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
// TODO remove prepareStatus
|
||||
STqOffsetVal prepareStatus; // for tmq
|
||||
STqOffsetVal lastStatus; // for tmq
|
||||
SMqMetaRsp metaRsp; // for tmq fetching meta
|
||||
int8_t returned;
|
||||
int64_t snapshotVer;
|
||||
const SSubmitReq* pReq;
|
||||
STqOffsetVal prepareStatus; // for tmq
|
||||
STqOffsetVal lastStatus; // for tmq
|
||||
SMqMetaRsp metaRsp; // for tmq fetching meta
|
||||
int8_t returned;
|
||||
int64_t snapshotVer;
|
||||
const SSubmitReq* pReq;
|
||||
|
||||
SSchemaWrapper* schema;
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
|
@ -159,7 +159,10 @@ typedef struct {
|
|||
int64_t recoverEndVer;
|
||||
int64_t fillHistoryVer1;
|
||||
int64_t fillHistoryVer2;
|
||||
SStreamState* pState;
|
||||
|
||||
int8_t triggerSaved;
|
||||
int64_t deleteMarkSaved;
|
||||
SStreamState* pState;
|
||||
} SStreamTaskInfo;
|
||||
|
||||
typedef struct {
|
||||
|
@ -187,7 +190,7 @@ typedef struct SExecTaskInfo {
|
|||
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
||||
SSubplan* pSubplan;
|
||||
struct SOperatorInfo* pRoot;
|
||||
SLocalFetch localFetch;
|
||||
SLocalFetch localFetch;
|
||||
} SExecTaskInfo;
|
||||
|
||||
enum {
|
||||
|
@ -235,7 +238,7 @@ typedef struct SOperatorInfo {
|
|||
|
||||
typedef enum {
|
||||
EX_SOURCE_DATA_NOT_READY = 0x1,
|
||||
EX_SOURCE_DATA_READY = 0x2,
|
||||
EX_SOURCE_DATA_READY = 0x2,
|
||||
EX_SOURCE_DATA_EXHAUSTED = 0x3,
|
||||
} EX_SOURCE_STATUS;
|
||||
|
||||
|
@ -373,9 +376,9 @@ typedef struct STableMergeScanInfo {
|
|||
int32_t dataBlockLoadFlag;
|
||||
// if the upstream is an interval operator, the interval info is also kept here to get the time
|
||||
// window to check if current data block needs to be loaded.
|
||||
SInterval interval;
|
||||
SSampleExecInfo sample; // sample execution info
|
||||
SSortExecInfo sortExecInfo;
|
||||
SInterval interval;
|
||||
SSampleExecInfo sample; // sample execution info
|
||||
SSortExecInfo sortExecInfo;
|
||||
} STableMergeScanInfo;
|
||||
|
||||
typedef struct STagScanInfo {
|
||||
|
@ -388,17 +391,17 @@ typedef struct STagScanInfo {
|
|||
} STagScanInfo;
|
||||
|
||||
typedef struct SLastrowScanInfo {
|
||||
SSDataBlock* pRes;
|
||||
SReadHandle readHandle;
|
||||
void* pLastrowReader;
|
||||
SColMatchInfo matchInfo;
|
||||
int32_t* pSlotIds;
|
||||
SExprSupp pseudoExprSup;
|
||||
int32_t retrieveType;
|
||||
int32_t currentGroupIndex;
|
||||
SSDataBlock* pBufferredRes;
|
||||
SArray* pUidList;
|
||||
int32_t indexOfBufferedRes;
|
||||
SSDataBlock* pRes;
|
||||
SReadHandle readHandle;
|
||||
void* pLastrowReader;
|
||||
SColMatchInfo matchInfo;
|
||||
int32_t* pSlotIds;
|
||||
SExprSupp pseudoExprSup;
|
||||
int32_t retrieveType;
|
||||
int32_t currentGroupIndex;
|
||||
SSDataBlock* pBufferredRes;
|
||||
SArray* pUidList;
|
||||
int32_t indexOfBufferedRes;
|
||||
} SLastrowScanInfo;
|
||||
|
||||
typedef enum EStreamScanMode {
|
||||
|
@ -426,8 +429,8 @@ typedef struct SStreamAggSupporter {
|
|||
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
||||
SSDataBlock* pScanBlock;
|
||||
SStreamState* pState;
|
||||
int64_t gap; // stream session window gap
|
||||
SqlFunctionCtx* pDummyCtx; // for combine
|
||||
int64_t gap; // stream session window gap
|
||||
SqlFunctionCtx* pDummyCtx; // for combine
|
||||
SSHashObj* pResultRows;
|
||||
int32_t stateKeySize;
|
||||
int16_t stateKeyType;
|
||||
|
@ -465,28 +468,28 @@ typedef struct STimeWindowAggSupp {
|
|||
} STimeWindowAggSupp;
|
||||
|
||||
typedef struct SStreamScanInfo {
|
||||
uint64_t tableUid; // queried super table uid
|
||||
SExprInfo* pPseudoExpr;
|
||||
int32_t numOfPseudoExpr;
|
||||
SExprSupp tbnameCalSup;
|
||||
SExprSupp tagCalSup;
|
||||
int32_t primaryTsIndex; // primary time stamp slot id
|
||||
SReadHandle readHandle;
|
||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
|
||||
SColMatchInfo matchInfo;
|
||||
SNode* pCondition;
|
||||
|
||||
SArray* pBlockLists; // multiple SSDatablock.
|
||||
SSDataBlock* pRes; // result SSDataBlock
|
||||
SSDataBlock* pUpdateRes; // update SSDataBlock
|
||||
int32_t updateResIndex;
|
||||
int32_t blockType; // current block type
|
||||
int32_t validBlockIndex; // Is current data has returned?
|
||||
uint64_t numOfExec; // execution times
|
||||
STqReader* tqReader;
|
||||
uint64_t tableUid; // queried super table uid
|
||||
SExprInfo* pPseudoExpr;
|
||||
int32_t numOfPseudoExpr;
|
||||
SExprSupp tbnameCalSup;
|
||||
SExprSupp tagCalSup;
|
||||
int32_t primaryTsIndex; // primary time stamp slot id
|
||||
SReadHandle readHandle;
|
||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
|
||||
SColMatchInfo matchInfo;
|
||||
SNode* pCondition;
|
||||
|
||||
uint64_t groupId;
|
||||
SUpdateInfo* pUpdateInfo;
|
||||
SArray* pBlockLists; // multiple SSDatablock.
|
||||
SSDataBlock* pRes; // result SSDataBlock
|
||||
SSDataBlock* pUpdateRes; // update SSDataBlock
|
||||
int32_t updateResIndex;
|
||||
int32_t blockType; // current block type
|
||||
int32_t validBlockIndex; // Is current data has returned?
|
||||
uint64_t numOfExec; // execution times
|
||||
STqReader* tqReader;
|
||||
|
||||
uint64_t groupId;
|
||||
SUpdateInfo* pUpdateInfo;
|
||||
|
||||
EStreamScanMode scanMode;
|
||||
SOperatorInfo* pStreamScanOp;
|
||||
|
@ -524,9 +527,9 @@ typedef struct {
|
|||
} SStreamRawScanInfo;
|
||||
|
||||
typedef struct SSysTableIndex {
|
||||
int8_t init;
|
||||
SArray *uids;
|
||||
int32_t lastIdx;
|
||||
int8_t init;
|
||||
SArray* uids;
|
||||
int32_t lastIdx;
|
||||
} SSysTableIndex;
|
||||
|
||||
typedef struct SSysTableScanInfo {
|
||||
|
@ -541,7 +544,7 @@ typedef struct SSysTableScanInfo {
|
|||
bool showRewrite;
|
||||
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
|
||||
SMTbCursor* pCur; // cursor for iterate the local table meta store.
|
||||
SSysTableIndex* pIdx; // idx for local table meta
|
||||
SSysTableIndex* pIdx; // idx for local table meta
|
||||
SColMatchInfo matchInfo;
|
||||
SName name;
|
||||
SSDataBlock* pRes;
|
||||
|
@ -585,7 +588,7 @@ typedef struct SIntervalAggOperatorInfo {
|
|||
typedef struct SMergeAlignedIntervalAggOperatorInfo {
|
||||
SIntervalAggOperatorInfo* intervalAggOperatorInfo;
|
||||
|
||||
// bool hasGroupId;
|
||||
// bool hasGroupId;
|
||||
uint64_t groupId; // current groupId
|
||||
int64_t curTs; // current ts
|
||||
SSDataBlock* prefetchedBlock;
|
||||
|
@ -595,21 +598,21 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
|
|||
|
||||
typedef struct SStreamIntervalOperatorInfo {
|
||||
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
||||
SOptrBasicInfo binfo; // basic info
|
||||
SAggSupporter aggSup; // aggregate supporter
|
||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||
SGroupResInfo groupResInfo; // multiple results build supporter
|
||||
SInterval interval; // interval info
|
||||
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
|
||||
SOptrBasicInfo binfo; // basic info
|
||||
SAggSupporter aggSup; // aggregate supporter
|
||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||
SGroupResInfo groupResInfo; // multiple results build supporter
|
||||
SInterval interval; // interval info
|
||||
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
|
||||
STimeWindowAggSupp twAggSup;
|
||||
bool invertible;
|
||||
bool ignoreExpiredData;
|
||||
SArray* pDelWins; // SWinRes
|
||||
SArray* pDelWins; // SWinRes
|
||||
int32_t delIndex;
|
||||
SSDataBlock* pDelRes;
|
||||
SPhysiNode* pPhyNode; // create new child
|
||||
SPhysiNode* pPhyNode; // create new child
|
||||
SHashObj* pPullDataMap;
|
||||
SArray* pPullWins; // SPullWindowInfo
|
||||
SArray* pPullWins; // SPullWindowInfo
|
||||
int32_t pullIndex;
|
||||
SSDataBlock* pPullDataRes;
|
||||
bool isFinal;
|
||||
|
@ -677,9 +680,9 @@ typedef struct SGroupbyOperatorInfo {
|
|||
SArray* pGroupCols; // group by columns, SArray<SColumn>
|
||||
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
|
||||
SNode* pCondition;
|
||||
bool isInit; // denote if current val is initialized or not
|
||||
char* keyBuf; // group by keys for hash
|
||||
int32_t groupKeyLen; // total group by column width
|
||||
bool isInit; // denote if current val is initialized or not
|
||||
char* keyBuf; // group by keys for hash
|
||||
int32_t groupKeyLen; // total group by column width
|
||||
SGroupResInfo groupResInfo;
|
||||
SExprSupp scalarSup;
|
||||
} SGroupbyOperatorInfo;
|
||||
|
@ -730,9 +733,9 @@ typedef struct SSessionAggOperatorInfo {
|
|||
} SSessionAggOperatorInfo;
|
||||
|
||||
typedef struct SResultWindowInfo {
|
||||
void* pOutputBuf;
|
||||
SSessionKey sessionWin;
|
||||
bool isOutput;
|
||||
void* pOutputBuf;
|
||||
SSessionKey sessionWin;
|
||||
bool isOutput;
|
||||
} SResultWindowInfo;
|
||||
|
||||
typedef struct SStateWindowInfo {
|
||||
|
@ -743,20 +746,20 @@ typedef struct SStateWindowInfo {
|
|||
typedef struct SStreamSessionAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SStreamAggSupporter streamAggSup;
|
||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||
SGroupResInfo groupResInfo;
|
||||
int32_t primaryTsIndex; // primary timestamp slot id
|
||||
int32_t endTsIndex; // window end timestamp slot id
|
||||
int32_t order; // current SSDataBlock scan order
|
||||
STimeWindowAggSupp twAggSup;
|
||||
SSDataBlock* pWinBlock; // window result
|
||||
SSDataBlock* pDelRes; // delete result
|
||||
SSDataBlock* pUpdateRes; // update window
|
||||
SSDataBlock* pWinBlock; // window result
|
||||
SSDataBlock* pDelRes; // delete result
|
||||
SSDataBlock* pUpdateRes; // update window
|
||||
bool returnUpdate;
|
||||
SSHashObj* pStDeleted;
|
||||
void* pDelIterator;
|
||||
SArray* pChildren; // cache for children's result; final stream operator
|
||||
SPhysiNode* pPhyNode; // create new child
|
||||
SArray* pChildren; // cache for children's result; final stream operator
|
||||
SPhysiNode* pPhyNode; // create new child
|
||||
bool isFinal;
|
||||
bool ignoreExpiredData;
|
||||
SHashObj* pGroupIdTbNameMap;
|
||||
|
@ -765,7 +768,7 @@ typedef struct SStreamSessionAggOperatorInfo {
|
|||
typedef struct SStreamStateAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SStreamAggSupporter streamAggSup;
|
||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||
SGroupResInfo groupResInfo;
|
||||
int32_t primaryTsIndex; // primary timestamp slot id
|
||||
STimeWindowAggSupp twAggSup;
|
||||
|
@ -773,7 +776,7 @@ typedef struct SStreamStateAggOperatorInfo {
|
|||
SSDataBlock* pDelRes;
|
||||
SSHashObj* pSeDeleted;
|
||||
void* pDelIterator;
|
||||
SArray* pChildren; // cache for children's result;
|
||||
SArray* pChildren; // cache for children's result;
|
||||
bool ignoreExpiredData;
|
||||
SHashObj* pGroupIdTbNameMap;
|
||||
} SStreamStateAggOperatorInfo;
|
||||
|
@ -793,18 +796,18 @@ typedef struct SStreamPartitionOperatorInfo {
|
|||
|
||||
typedef struct SStreamFillOperatorInfo {
|
||||
SStreamFillSupporter* pFillSup;
|
||||
SSDataBlock* pRes;
|
||||
SSDataBlock* pSrcBlock;
|
||||
int32_t srcRowIndex;
|
||||
SSDataBlock* pPrevSrcBlock;
|
||||
SSDataBlock* pSrcDelBlock;
|
||||
int32_t srcDelRowIndex;
|
||||
SSDataBlock* pDelRes;
|
||||
SNode* pCondition;
|
||||
SColMatchInfo matchInfo;
|
||||
int32_t primaryTsCol;
|
||||
int32_t primarySrcSlotId;
|
||||
SStreamFillInfo* pFillInfo;
|
||||
SSDataBlock* pRes;
|
||||
SSDataBlock* pSrcBlock;
|
||||
int32_t srcRowIndex;
|
||||
SSDataBlock* pPrevSrcBlock;
|
||||
SSDataBlock* pSrcDelBlock;
|
||||
int32_t srcDelRowIndex;
|
||||
SSDataBlock* pDelRes;
|
||||
SNode* pCondition;
|
||||
SColMatchInfo matchInfo;
|
||||
int32_t primaryTsCol;
|
||||
int32_t primarySrcSlotId;
|
||||
SStreamFillInfo* pFillInfo;
|
||||
} SStreamFillOperatorInfo;
|
||||
|
||||
typedef struct STimeSliceOperatorInfo {
|
||||
|
@ -837,7 +840,7 @@ typedef struct SStateWindowOperatorInfo {
|
|||
SStateKeys stateKey;
|
||||
int32_t tsSlotId; // primary timestamp column slot id
|
||||
STimeWindowAggSupp twAggSup;
|
||||
const SNode* pCondition;
|
||||
const SNode* pCondition;
|
||||
} SStateWindowOperatorInfo;
|
||||
|
||||
typedef struct SSortOperatorInfo {
|
||||
|
@ -894,8 +897,8 @@ void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
|
|||
|
||||
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||
SDiskbasedBuf* pBuf);
|
||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||
SDiskbasedBuf* pBuf);
|
||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||
SDiskbasedBuf* pBuf);
|
||||
|
||||
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
|
||||
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
|
||||
|
@ -964,7 +967,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode *pAggNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
|
||||
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
|
@ -991,8 +994,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
@ -1044,20 +1047,21 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
|
|||
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
||||
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
|
||||
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
|
||||
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
|
||||
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
|
||||
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
|
||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
|
||||
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
|
||||
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
|
||||
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
|
||||
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName);
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
|
||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
|
||||
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
|
||||
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
|
||||
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
|
||||
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
|
||||
uint64_t* pGp, void* pTbName);
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
|
||||
|
||||
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||
SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
|
||||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
|
||||
|
@ -1078,8 +1082,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
|
|||
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
|
||||
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
|
||||
SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
|
||||
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
|
||||
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
|
||||
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
|
||||
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
|
||||
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
|
||||
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
|
||||
|
|
|
@ -643,12 +643,114 @@ int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
|
|||
}
|
||||
#endif
|
||||
|
||||
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer) {
|
||||
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
||||
pTaskInfo->streamInfo.recoverStartVer = startVer;
|
||||
pTaskInfo->streamInfo.recoverEndVer = endVer;
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE;
|
||||
pTaskInfo->streamInfo.recoverStartVer = 0;
|
||||
pTaskInfo->streamInfo.recoverEndVer = ver;
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
||||
pTaskInfo->streamInfo.recoverStartVer = pTaskInfo->streamInfo.recoverEndVer;
|
||||
pTaskInfo->streamInfo.recoverEndVer = ver;
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||
|
||||
while (1) {
|
||||
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger;
|
||||
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark;
|
||||
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||
pInfo->twAggSup.deleteMark = INT64_MAX;
|
||||
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger;
|
||||
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark;
|
||||
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||
pInfo->twAggSup.deleteMark = INT64_MAX;
|
||||
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger;
|
||||
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark;
|
||||
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||
pInfo->twAggSup.deleteMark = INT64_MAX;
|
||||
}
|
||||
|
||||
// iterate operator tree
|
||||
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
|
||||
if (pOperator->numOfDownstream > 1) {
|
||||
qError("unexpected stream, multiple downstream");
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
} else {
|
||||
pOperator = pOperator->pDownstream[0];
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||
|
||||
while (1) {
|
||||
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
pInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved;
|
||||
pInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
|
||||
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
pInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved;
|
||||
pInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
|
||||
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
pInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved;
|
||||
pInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
|
||||
}
|
||||
|
||||
// iterate operator tree
|
||||
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
|
||||
if (pOperator->numOfDownstream > 1) {
|
||||
qError("unexpected stream, multiple downstream");
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
} else {
|
||||
pOperator = pOperator->pDownstream[0];
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -3426,7 +3426,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
|||
|
||||
bool groupbyTbname(SNodeList* pGroupList) {
|
||||
bool bytbname = false;
|
||||
if (LIST_LENGTH(pGroupList) > 0) {
|
||||
if (LIST_LENGTH(pGroupList) == 1) {
|
||||
SNode* p = nodesListGetNode(pGroupList, 0);
|
||||
if (p->type == QUERY_NODE_FUNCTION) {
|
||||
// partition by tbname/group by tbname
|
||||
|
@ -3947,8 +3947,11 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
|||
goto _complete;
|
||||
}
|
||||
|
||||
if (pHandle && pHandle->pStateBackend) {
|
||||
(*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
|
||||
if (pHandle) {
|
||||
/*(*pTaskInfo)->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;*/
|
||||
if (pHandle->pStateBackend) {
|
||||
(*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
|
||||
}
|
||||
}
|
||||
|
||||
(*pTaskInfo)->sql = sql;
|
||||
|
|
|
@ -988,8 +988,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
|
|||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL,
|
||||
destroyBlockDistScanOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
|
@ -1771,11 +1771,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
#endif
|
||||
|
||||
#if 1
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
|
||||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
|
||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||
memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
|
||||
pTSInfo->cond.startVersion = -1;
|
||||
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
|
||||
pTSInfo->cond.startVersion = -1;
|
||||
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
|
||||
} else {
|
||||
pTSInfo->cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
|
||||
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
|
||||
}
|
||||
pTSInfo->scanTimes = 0;
|
||||
pTSInfo->currentGroupId = -1;
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
|
||||
|
@ -2229,7 +2235,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pInfo->pGroupTags = pTableScanNode->pGroupTags;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
int32_t code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
int32_t code =
|
||||
extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
|
||||
int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
|
||||
SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
|
||||
|
@ -2287,7 +2294,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
||||
pTSInfo->dataReader = NULL;
|
||||
if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2353,8 +2361,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
@ -3922,8 +3929,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
|||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
@ -4035,7 +4041,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
int32_t num = 0;
|
||||
int32_t numOfExprs = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
|
||||
int32_t code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
int32_t code =
|
||||
extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
|
||||
code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4058,8 +4065,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
@ -4555,7 +4561,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
|
||||
&pInfo->matchInfo);
|
||||
|
||||
code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -1777,11 +1777,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
|
|||
}
|
||||
|
||||
SInterval interval = {.interval = pPhyNode->interval,
|
||||
.sliding = pPhyNode->sliding,
|
||||
.intervalUnit = pPhyNode->intervalUnit,
|
||||
.slidingUnit = pPhyNode->slidingUnit,
|
||||
.offset = pPhyNode->offset,
|
||||
.precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision};
|
||||
.sliding = pPhyNode->sliding,
|
||||
.intervalUnit = pPhyNode->intervalUnit,
|
||||
.slidingUnit = pPhyNode->slidingUnit,
|
||||
.offset = pPhyNode->offset,
|
||||
.precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision};
|
||||
|
||||
STimeWindowAggSupp as = {
|
||||
.waterMark = pPhyNode->window.watermark,
|
||||
|
@ -1832,7 +1832,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, destroyIntervalOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, destroyIntervalOperatorInfo, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2638,7 +2639,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL);
|
||||
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
|
@ -2708,8 +2710,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL,
|
||||
destroyStateWindowOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2782,8 +2784,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
|
||||
destroySWindowOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, NULL);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4257,9 +4259,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
|
||||
NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL,
|
||||
destroyStreamSessionAggOperatorInfo, NULL);
|
||||
if (downstream) {
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
|
||||
pInfo->primaryTsIndex);
|
||||
|
@ -4404,9 +4405,8 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
|||
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
|
||||
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
||||
pOperator->name = "StreamSessionSemiAggOperator";
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
|
||||
destroyStreamSessionAggOperatorInfo, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
|
||||
destroyStreamSessionAggOperatorInfo, NULL);
|
||||
}
|
||||
|
||||
pInfo->pGroupIdTbNameMap =
|
||||
|
@ -4739,6 +4739,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
.maxTs = INT64_MIN,
|
||||
.minTs = INT64_MAX,
|
||||
};
|
||||
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
@ -4774,8 +4775,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL,
|
||||
destroyStreamStateOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL, destroyStreamStateOperatorInfo, NULL);
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
|
||||
pInfo->primaryTsIndex);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
@ -5051,8 +5052,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = miaInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL,
|
||||
destroyMAIOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL, destroyMAIOperatorInfo, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -5363,8 +5364,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pMergeIntervalInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
|
||||
destroyMergeIntervalOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, destroyMergeIntervalOperatorInfo, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -5528,6 +5529,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
};
|
||||
|
||||
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pInfo->interval = interval;
|
||||
pInfo->twAggSup = twAggSupp;
|
||||
|
@ -5595,9 +5597,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||
NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL,
|
||||
destroyStreamFinalIntervalOperatorInfo, NULL);
|
||||
|
||||
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
|
|
@ -32,7 +32,7 @@ typedef struct {
|
|||
|
||||
static SStreamGlobalEnv streamEnv;
|
||||
|
||||
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
|
||||
// int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
|
||||
|
||||
int32_t streamDispatch(SStreamTask* pTask);
|
||||
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
|
||||
|
|
|
@ -210,6 +210,46 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
|
||||
SEpSet* pEpSet) {
|
||||
void* buf = NULL;
|
||||
int32_t code = -1;
|
||||
SRpcMsg msg = {0};
|
||||
|
||||
int32_t tlen;
|
||||
tEncodeSize(tEncodeSStreamRecoverFinishReq, pReq, tlen, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||
if (buf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(vgId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, tlen);
|
||||
if ((code = tEncodeSStreamRecoverFinishReq(&encoder, pReq)) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
msg.contLen = tlen + sizeof(SMsgHead);
|
||||
msg.pCont = buf;
|
||||
msg.msgType = TDMT_VND_STREAM_RECOVER_FINISH;
|
||||
|
||||
tmsgSendReq(pEpSet, &msg);
|
||||
|
||||
code = 0;
|
||||
return 0;
|
||||
FAIL:
|
||||
if (buf) rpcFreeCont(buf);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
|
||||
void* buf = NULL;
|
||||
int32_t code = -1;
|
||||
|
@ -244,9 +284,10 @@ int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq,
|
|||
tmsgSendReq(pEpSet, &msg);
|
||||
|
||||
code = 0;
|
||||
FAIL:
|
||||
if (code < 0 && buf) rpcFreeCont(buf);
|
||||
return 0;
|
||||
FAIL:
|
||||
if (buf) rpcFreeCont(buf);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
|
||||
|
@ -439,3 +480,4 @@ FREE:
|
|||
taosFreeQitem(pBlock);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -85,6 +85,54 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
||||
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
|
||||
|
||||
void* exec = pTask->exec.executor;
|
||||
|
||||
while (1) {
|
||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
if (pRes == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t batchCnt = 0;
|
||||
while (1) {
|
||||
SSDataBlock* output = NULL;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
if (output == NULL) break;
|
||||
|
||||
SSDataBlock block = {0};
|
||||
assignOneDataBlock(&block, output);
|
||||
block.info.childId = pTask->selfChildId;
|
||||
taosArrayPush(pRes, &block);
|
||||
|
||||
if (++batchCnt >= batchSz) break;
|
||||
}
|
||||
if (taosArrayGetSize(pRes) == 0) {
|
||||
taosArrayDestroy(pRes);
|
||||
break;
|
||||
}
|
||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
if (qRes == NULL) {
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||
qRes->blocks = pRes;
|
||||
streamTaskOutput(pTask, qRes);
|
||||
// TODO stream sched dispatch
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) {
|
||||
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
|
||||
|
||||
|
@ -144,6 +192,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
|
|||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t streamExecForAll(SStreamTask* pTask) {
|
||||
while (1) {
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#include "streamInc.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
|
||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
|
||||
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
||||
if (pMeta == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -86,7 +86,8 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
|||
taosMemoryFree(pMeta);
|
||||
}
|
||||
|
||||
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen) {
|
||||
#if 0
|
||||
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) {
|
||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
|
@ -99,7 +100,7 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
|
|||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
||||
ASSERT(0);
|
||||
goto FAIL;
|
||||
}
|
||||
|
@ -114,26 +115,20 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
if (pTask->fillHistory) {
|
||||
// pipeline exec
|
||||
// if finished, dispatch a stream-prepare-finished msg to downstream task
|
||||
// set status normal
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
FAIL:
|
||||
if (pTask) tFreeSStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
#if 1
|
||||
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
|
||||
void* buf = NULL;
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
||||
return -1;
|
||||
}
|
||||
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
||||
|
||||
int32_t len;
|
||||
int32_t code;
|
||||
|
@ -141,20 +136,24 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
buf = taosMemoryCalloc(1, sizeof(len));
|
||||
buf = taosMemoryCalloc(1, len);
|
||||
if (buf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SEncoder encoder;
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, len);
|
||||
tEncodeSStreamTask(&encoder, pTask);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, &pMeta->txn) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosMemoryFree(buf);
|
||||
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
@ -269,7 +268,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
|
|||
tDecodeSStreamTask(&decoder, pTask);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, -1) < 0) {
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tdbTbcClose(pCur);
|
||||
|
|
|
@ -15,6 +15,111 @@
|
|||
|
||||
#include "streamInc.h"
|
||||
|
||||
// common
|
||||
int32_t streamSetParamForRecover(SStreamTask* pTask) {
|
||||
void* exec = pTask->exec.executor;
|
||||
return qStreamSetParamForRecover(exec);
|
||||
}
|
||||
int32_t streamRestoreParam(SStreamTask* pTask) {
|
||||
void* exec = pTask->exec.executor;
|
||||
return qStreamRestoreParam(exec);
|
||||
}
|
||||
int32_t streamSetStatusNormal(SStreamTask* pTask) {
|
||||
pTask->taskStatus = TASK_STATUS__NORMAL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// source
|
||||
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver) {
|
||||
void* exec = pTask->exec.executor;
|
||||
return qStreamSourceRecoverStep1(exec, ver);
|
||||
}
|
||||
|
||||
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) {
|
||||
pReq->streamId = pTask->streamId;
|
||||
pReq->taskId = pTask->taskId;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) {
|
||||
//
|
||||
return streamScanExec(pTask, 100);
|
||||
// TODO next: dispatch msg to launch scan step2
|
||||
}
|
||||
|
||||
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) {
|
||||
pReq->streamId = pTask->streamId;
|
||||
pReq->taskId = pTask->taskId;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
|
||||
void* exec = pTask->exec.executor;
|
||||
if (qStreamSourceRecoverStep2(exec, ver) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
return streamScanExec(pTask, 100);
|
||||
}
|
||||
|
||||
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
|
||||
SStreamRecoverFinishReq req = {
|
||||
.streamId = pTask->streamId,
|
||||
.taskId = pTask->taskId,
|
||||
.childId = pTask->selfChildId,
|
||||
};
|
||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// agg
|
||||
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
|
||||
void* exec = pTask->exec.executor;
|
||||
if (qStreamSetParamForRecover(exec) < 0) {
|
||||
return -1;
|
||||
}
|
||||
pTask->recoverWaitingChild = taosArrayGetSize(pTask->childEpInfo);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
|
||||
void* exec = pTask->exec.executor;
|
||||
if (qStreamRestoreParam(exec) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (qStreamRecoverFinish(exec) < 0) {
|
||||
return -1;
|
||||
}
|
||||
streamSetStatusNormal(pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
|
||||
int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingChild, 1);
|
||||
ASSERT(left >= 0);
|
||||
if (left == 0) {
|
||||
streamAggChildrenRecoverFinish(pTask);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
}
|
||||
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) {
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
#if 0
|
||||
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
@ -132,6 +237,7 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq) {
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
||||
|
@ -174,6 +280,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
#if 0
|
||||
|
@ -269,6 +376,7 @@ int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t streamFetchRecoverStatus(SStreamTask* pTask, const SVgroupInfo* pVgInfo) {
|
||||
int32_t taskId = pVgInfo->taskId;
|
||||
int32_t nodeId = pVgInfo->vgId;
|
||||
|
@ -339,7 +447,9 @@ int32_t streamFetchDownstreamStatus(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
|
||||
// if failed, set timer and retry
|
||||
// if successful
|
||||
|
@ -430,3 +540,4 @@ int32_t streamRecoverTask(SStreamTask* pTask) {
|
|||
//
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -65,11 +65,6 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t walPreRefVer(SWalRef *pRef, int64_t ver) {
|
||||
pRef->refVer = ver;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void walUnrefVer(SWalRef *pRef) {
|
||||
pRef->refId = -1;
|
||||
pRef->refFile = -1;
|
||||
|
|
Loading…
Reference in New Issue