refactor(stream): recover and fill history

This commit is contained in:
Liu Jicong 2022-10-26 00:31:00 +08:00
parent 11f153cf1a
commit 75e5e490fd
23 changed files with 603 additions and 140 deletions

View File

@ -161,6 +161,7 @@ typedef enum EStreamType {
STREAM_RETRIEVE,
STREAM_PULL_DATA,
STREAM_PULL_OVER,
STREAM_FILL_OVER,
} EStreamType;
typedef struct {

View File

@ -198,6 +198,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)

View File

@ -53,7 +53,8 @@ typedef struct {
void* sContext; // SSnapContext*
void* pStateBackend;
void* pStateBackend;
int64_t fillHistoryVer1;
} SReadHandle;
// in queue mode, data streams are seperated by msg
@ -176,6 +177,7 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key);
/**
* return the scan info, in the form of tuple of two items, including table uid and current timestamp
* @param tinfo
@ -207,9 +209,11 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem);
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer);
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key);
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo);
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
#ifdef __cplusplus
}

View File

@ -47,7 +47,9 @@ enum {
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
TASK_STATUS__RECOVER_DOWNSTREAM,
TASK_STATUS__RECOVER_SELF,
TASK_STATUS__RECOVER_PREPARE,
TASK_STATUS__RECOVER1,
TASK_STATUS__RECOVER2,
};
enum {
@ -329,6 +331,9 @@ typedef struct SStreamTask {
// state backend
SStreamState* pState;
// do not serialize
int32_t recoverWaitingChild;
} SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -435,6 +440,11 @@ typedef struct {
int32_t rspToTaskId;
} SStreamRetrieveRsp;
typedef struct {
int64_t streamId;
int32_t taskId;
} SStreamRecoverStep1Req, SStreamRecoverStep2Req;
#if 0
typedef struct {
int64_t streamId;
@ -521,8 +531,29 @@ int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
// recover and fill history
// common
int32_t streamSetParamForRecover(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
// source level
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq);
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask);
// agg level
int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
// expand and deploy
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
// meta
typedef struct SStreamMeta {
char* path;
TDB* db;
@ -533,12 +564,13 @@ typedef struct SStreamMeta {
void* ahandle;
TXN txn;
FTaskExpand* expandFunc;
int32_t vgId;
} SStreamMeta;
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc);
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
// int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);

View File

@ -33,16 +33,16 @@ extern "C" {
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
// clang-format on
#define WAL_PROTO_VER 0
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
#define WAL_PROTO_VER 0
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
#define WAL_RECOV_SIZE_LIMIT (100 * WAL_SCAN_BUF_SIZE)
typedef enum {
@ -204,7 +204,6 @@ SWalRef *walRefCommittedVer(SWal *);
SWalRef *walOpenRef(SWal *);
void walCloseRef(SWal *pWal, int64_t refId);
int32_t walRefVer(SWalRef *, int64_t ver);
int32_t walPreRefVer(SWalRef *pRef, int64_t ver);
void walUnrefVer(SWalRef *);
// helper function for raft

View File

@ -1732,6 +1732,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
// in no topic status, delayed task also need to be processed
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
tscDebug("consumer:%" PRId64 ", poll return since consumer status is init", tmq->consumerId);
return NULL;
}

View File

@ -181,15 +181,15 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey)
int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
// tqSink
void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
// void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
// tqOffset
char* tqOffsetBuildFName(const char* path, int32_t ver);
char* tqOffsetBuildFName(const char* path, int32_t fVer);
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
#ifdef __cplusplus
}

View File

@ -98,7 +98,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
ASSERT(0);
}
pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask);
pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
if (pTq->pStreamMeta == NULL) {
ASSERT(0);
}
@ -872,7 +872,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
return 0;
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
if (pTask->taskLevel == TASK_LEVEL__AGG) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
}
@ -891,6 +891,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->startVer = ver;
// expand executor
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
@ -903,9 +905,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
.vnode = pTq->pVnode,
.initTqReader = 1,
.pStateBackend = pTask->pState,
.fillHistoryVer1 = pTask->fillHistory ? ver : -1,
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor);
if (pTask->fillHistory) {
pTask->taskStatus = TASK_STATUS__RECOVER_PREPARE;
}
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
@ -945,8 +952,148 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
}
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
int32_t code;
#if 0
code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
if (code < 0) return code;
#endif
// 1.deserialize msg and build task
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
code = tDecodeSStreamTask(&decoder, pTask);
if (code < 0) {
tDecoderClear(&decoder);
taosMemoryFree(pTask);
return -1;
}
tDecoderClear(&decoder);
// 2.save task
code = streamMetaAddTask(pTq->pStreamMeta, version, pTask);
if (code < 0) {
return -1;
}
// 3.go through recover steps to fill history
if (pTask->fillHistory) {
streamSetParamForRecover(pTask);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
streamSourceRecoverPrepareStep1(pTask, version);
SStreamRecoverStep1Req req;
streamBuildSourceRecover1Req(pTask, &req);
void* serialziedReq = (void*)&req;
int32_t len = sizeof(SStreamRecoverStep1Req);
SRpcMsg rpcMsg = {
.contLen = len,
.pCont = serialziedReq,
.msgType = TDMT_VND_STREAM_RECOVER_STEP1,
};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &rpcMsg);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
streamAggRecoverPrepare(pTask);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
// do nothing
}
}
return 0;
}
int32_t tqProcessTaskRecover1Req(STQ* pTq, char* msg, int32_t msgLen) {
int32_t code;
SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, pReq->taskId);
if (pTask == NULL) {
return -1;
}
// check param
int64_t fillVer1 = pTask->startVer;
if (fillVer1 <= 0) {
ASSERT(0);
return -1;
}
// do recovery step 1
streamSourceRecoverScanStep1(pTask);
// build msg to launch next step
SStreamRecoverStep2Req req;
code = streamBuildSourceRecover2Req(pTask, &req);
if (code < 0) {
return -1;
}
// serialize msg
int32_t len = sizeof(SStreamRecoverStep2Req);
void* serializedReq = (void*)&req;
// dispatch msg
SRpcMsg rpcMsg = {
.code = 0,
.contLen = len,
.msgType = TDMT_VND_STREAM_RECOVER_STEP2,
.pCont = (void*)serializedReq,
};
tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
return 0;
}
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
int32_t code;
SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, pReq->taskId);
if (pTask == NULL) {
return -1;
}
// do recovery step 2
code = streamSourceRecoverScanStep2(pTask, version);
if (code < 0) {
return -1;
}
// restore param
code = streamRestoreParam(pTask);
if (code < 0) {
return -1;
}
// set status normal
code = streamSetStatusNormal(pTask);
if (code < 0) {
return -1;
}
// dispatch recover finish req to all related downstream task
code = streamDispatchRecoverFinishReq(pTask);
if (code < 0) {
return -1;
}
return 0;
}
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t code;
// deserialize
// find task
// do process request
//
return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
return 0;
}
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
@ -1081,6 +1228,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__RECOVER1) continue;
qDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);

View File

@ -85,11 +85,11 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
tqDebug("tmq task start to execute");
tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId);
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
tqDebug("tmq task executed, get %p", pDataBlock);
tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock);
if (pDataBlock == NULL) {
break;

View File

@ -22,10 +22,10 @@ struct STqOffsetStore {
SHashObj* pHash; // SHashObj<subscribeKey, offset>
};
char* tqOffsetBuildFName(const char* path, int32_t ver) {
char* tqOffsetBuildFName(const char* path, int32_t fVer) {
int32_t len = strlen(path);
char* fname = taosMemoryCalloc(1, len + 40);
snprintf(fname, len + 40, "%s/offset-ver%d", path, ver);
snprintf(fname, len + 40, "%s/offset-ver%d", path, fVer);
return fname;
}

View File

@ -525,7 +525,7 @@ int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
if (pReader->tbIdHash) {
taosHashClear(pReader->tbIdHash);
} else {
pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
}
if (pReader->tbIdHash == NULL) {
@ -543,7 +543,7 @@ int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) {
if (pReader->tbIdHash == NULL) {
pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (pReader->tbIdHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;

View File

@ -530,6 +530,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
taosArrayDestroy(tagArray);
}
#if 0
void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pRes = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
@ -585,3 +586,4 @@ void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data
tqDebug("failed to put into write-queue since %s", terrstr());
}
}
#endif

View File

@ -12,7 +12,6 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// clang-format off
#ifndef TDENGINE_EXECUTORIMPL_H
#define TDENGINE_EXECUTORIMPL_H
@ -140,18 +139,19 @@ typedef struct STaskIdInfo {
enum {
STREAM_RECOVER_STEP__NONE = 0,
STREAM_RECOVER_STEP__PREPARE,
STREAM_RECOVER_STEP__PREPARE1,
STREAM_RECOVER_STEP__PREPARE2,
STREAM_RECOVER_STEP__SCAN,
};
typedef struct {
// TODO remove prepareStatus
STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int8_t returned;
int64_t snapshotVer;
const SSubmitReq* pReq;
STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int8_t returned;
int64_t snapshotVer;
const SSubmitReq* pReq;
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN];
@ -164,7 +164,10 @@ typedef struct {
int64_t recoverEndVer;
int64_t fillHistoryVer1;
int64_t fillHistoryVer2;
SStreamState* pState;
int8_t triggerSaved;
int64_t deleteMarkSaved;
SStreamState* pState;
} SStreamTaskInfo;
typedef struct {
@ -192,7 +195,7 @@ typedef struct SExecTaskInfo {
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SSubplan* pSubplan;
struct SOperatorInfo* pRoot;
SLocalFetch localFetch;
SLocalFetch localFetch;
} SExecTaskInfo;
enum {
@ -444,8 +447,8 @@ typedef struct SStreamAggSupporter {
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SSDataBlock* pScanBlock;
SStreamState* pState;
int64_t gap; // stream session window gap
SqlFunctionCtx* pDummyCtx; // for combine
int64_t gap; // stream session window gap
SqlFunctionCtx* pDummyCtx; // for combine
SSHashObj* pResultRows;
int32_t stateKeySize;
int16_t stateKeyType;
@ -542,9 +545,9 @@ typedef struct {
} SStreamRawScanInfo;
typedef struct SSysTableIndex {
int8_t init;
SArray *uids;
int32_t lastIdx;
int8_t init;
SArray* uids;
int32_t lastIdx;
} SSysTableIndex;
typedef struct SSysTableScanInfo {
@ -559,7 +562,7 @@ typedef struct SSysTableScanInfo {
bool showRewrite;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
SMTbCursor* pCur; // cursor for iterate the local table meta store.
SSysTableIndex* pIdx; // idx for local table meta
SSysTableIndex* pIdx; // idx for local table meta
SArray* scanCols; // SArray<int16_t> scan column id list
SName name;
SSDataBlock* pRes;
@ -603,7 +606,7 @@ typedef struct SIntervalAggOperatorInfo {
typedef struct SMergeAlignedIntervalAggOperatorInfo {
SIntervalAggOperatorInfo* intervalAggOperatorInfo;
// bool hasGroupId;
// bool hasGroupId;
uint64_t groupId; // current groupId
int64_t curTs; // current ts
SSDataBlock* prefetchedBlock;
@ -613,21 +616,21 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
typedef struct SStreamIntervalOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindowAggSupp twAggSup;
bool invertible;
bool ignoreExpiredData;
SArray* pDelWins; // SWinRes
SArray* pDelWins; // SWinRes
int32_t delIndex;
SSDataBlock* pDelRes;
SPhysiNode* pPhyNode; // create new child
SPhysiNode* pPhyNode; // create new child
SHashObj* pPullDataMap;
SArray* pPullWins; // SPullWindowInfo
SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex;
SSDataBlock* pPullDataRes;
bool isFinal;
@ -695,9 +698,9 @@ typedef struct SGroupbyOperatorInfo {
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition;
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SExprSupp scalarSup;
} SGroupbyOperatorInfo;
@ -748,9 +751,9 @@ typedef struct SSessionAggOperatorInfo {
} SSessionAggOperatorInfo;
typedef struct SResultWindowInfo {
void* pOutputBuf;
SSessionKey sessionWin;
bool isOutput;
void* pOutputBuf;
SSessionKey sessionWin;
bool isOutput;
} SResultWindowInfo;
typedef struct SStateWindowInfo {
@ -761,20 +764,20 @@ typedef struct SStateWindowInfo {
typedef struct SStreamSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
int32_t endTsIndex; // window end timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result
SSDataBlock* pDelRes; // delete result
SSDataBlock* pUpdateRes; // update window
SSDataBlock* pWinBlock; // window result
SSDataBlock* pDelRes; // delete result
SSDataBlock* pUpdateRes; // update window
bool returnUpdate;
SSHashObj* pStDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
SHashObj* pGroupIdTbNameMap;
@ -783,7 +786,7 @@ typedef struct SStreamSessionAggOperatorInfo {
typedef struct SStreamStateAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
@ -791,7 +794,7 @@ typedef struct SStreamStateAggOperatorInfo {
SSDataBlock* pDelRes;
SSHashObj* pSeDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
SHashObj* pGroupIdTbNameMap;
} SStreamStateAggOperatorInfo;
@ -811,18 +814,18 @@ typedef struct SStreamPartitionOperatorInfo {
typedef struct SStreamFillOperatorInfo {
SStreamFillSupporter* pFillSup;
SSDataBlock* pRes;
SSDataBlock* pSrcBlock;
int32_t srcRowIndex;
SSDataBlock* pPrevSrcBlock;
SSDataBlock* pSrcDelBlock;
int32_t srcDelRowIndex;
SSDataBlock* pDelRes;
SNode* pCondition;
SArray* pColMatchColInfo;
int32_t primaryTsCol;
int32_t primarySrcSlotId;
SStreamFillInfo* pFillInfo;
SSDataBlock* pRes;
SSDataBlock* pSrcBlock;
int32_t srcRowIndex;
SSDataBlock* pPrevSrcBlock;
SSDataBlock* pSrcDelBlock;
int32_t srcDelRowIndex;
SSDataBlock* pDelRes;
SNode* pCondition;
SArray* pColMatchColInfo;
int32_t primaryTsCol;
int32_t primarySrcSlotId;
SStreamFillInfo* pFillInfo;
} SStreamFillOperatorInfo;
typedef struct STimeSliceOperatorInfo {
@ -855,7 +858,7 @@ typedef struct SStateWindowOperatorInfo {
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
const SNode* pCondition;
const SNode* pCondition;
} SStateWindowOperatorInfo;
typedef struct SSortOperatorInfo {
@ -913,8 +916,8 @@ void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
@ -983,7 +986,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode *pAggNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);
@ -1010,8 +1013,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo);
@ -1063,20 +1066,21 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName);
void printDataBlock(SSDataBlock* pBlock, const char* flag);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName);
void printDataBlock(SSDataBlock* pBlock, const char* flag);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
@ -1097,8 +1101,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);

View File

@ -233,7 +233,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
if (pListInfo->map == NULL) {
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
}
// traverse to the stream scanner node to add this table id
@ -660,12 +660,114 @@ int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
}
#endif
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer) {
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.recoverStartVer = startVer;
pTaskInfo->streamInfo.recoverEndVer = endVer;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE;
pTaskInfo->streamInfo.recoverStartVer = 0;
pTaskInfo->streamInfo.recoverEndVer = ver;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1;
return 0;
}
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.recoverStartVer = pTask->streamInfo.recoverEndVer;
pTaskInfo->streamInfo.recoverEndVer = ver;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2;
return 0;
}
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
return 0;
}
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
while (1) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger;
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger;
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger;
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
}
// iterate operator tree
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
if (pOperator->numOfDownstream > 1) {
qError("unexpected stream, multiple downstream");
ASSERT(0);
return -1;
}
return 0;
} else {
pOperator = pOperator->pDownstream[0];
}
}
return 0;
}
int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
while (1) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved;
pInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved;
pInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved;
pInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
}
// iterate operator tree
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
if (pOperator->numOfDownstream > 1) {
qError("unexpected stream, multiple downstream");
ASSERT(0);
return -1;
}
return 0;
} else {
pOperator = pOperator->pDownstream[0];
}
}
return 0;
}

View File

@ -3479,7 +3479,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
bool groupbyTbname(SNodeList* pGroupList) {
bool bytbname = false;
if (LIST_LENGTH(pGroupList) > 0) {
if (LIST_LENGTH(pGroupList) == 1) {
SNode* p = nodesListGetNode(pGroupList, 0);
if (p->type == QUERY_NODE_FUNCTION) {
// partition by tbname/group by tbname
@ -3495,7 +3495,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
return TDB_CODE_SUCCESS;
}
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pTableListInfo->map == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -4002,8 +4002,11 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete;
}
if (pHandle && pHandle->pStateBackend) {
(*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
if (pHandle) {
(*pTaskInfo)->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;
if (pHandle->pStateBackend) {
(*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
}
}
(*pTaskInfo)->sql = sql;

View File

@ -1770,11 +1770,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
#endif
#if 1
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
pTSInfo->cond.startVersion = -1;
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
pTSInfo->cond.startVersion = -1;
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
} else {
pTSInfo->cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
}
pTSInfo->scanTimes = 0;
pTSInfo->currentGroupId = -1;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
@ -2286,7 +2292,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pTSInfo->dataReader = NULL;
if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) {
ASSERT(0);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
}
@ -2322,6 +2329,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
} else {
taosArrayDestroy(pColIds);
}
pTaskInfo->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;
// create the pseduo columns info
if (pTableScanNode->scan.pScanPseudoCols != NULL) {

View File

@ -1776,11 +1776,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
}
SInterval interval = {.interval = pPhyNode->interval,
.sliding = pPhyNode->sliding,
.intervalUnit = pPhyNode->intervalUnit,
.slidingUnit = pPhyNode->slidingUnit,
.offset = pPhyNode->offset,
.precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision};
.sliding = pPhyNode->sliding,
.intervalUnit = pPhyNode->intervalUnit,
.slidingUnit = pPhyNode->slidingUnit,
.offset = pPhyNode->offset,
.precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision};
STimeWindowAggSupp as = {
.waterMark = pPhyNode->window.watermark,
@ -4235,6 +4235,13 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
.minTs = INT64_MAX,
};
if (pTaskInfo->streamInfo.fillHistoryVer1 != -1) {
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger;
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
}
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
@ -4741,6 +4748,14 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
};
if (pTaskInfo->streamInfo.fillHistoryVer1 != -1) {
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger;
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
}
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
SExprSupp* pSup = &pOperator->exprSupp;
@ -5530,6 +5545,14 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
};
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
if (pTaskInfo->streamInfo.fillHistoryVer1 != -1) {
pTaskInfo->streamInfo.triggerSaved = twAggSupp.calTrigger;
pTaskInfo->streamInfo.deleteMarkSaved = twAggSupp.deleteMark;
twAggSupp.calTrigger = STREAM_TRIGGER_AT_ONCE;
twAggSupp.deleteMark = INT64_MAX;
}
pOperator->pTaskInfo = pTaskInfo;
pInfo->interval = interval;
pInfo->twAggSup = twAggSupp;

View File

@ -32,7 +32,7 @@ typedef struct {
static SStreamGlobalEnv streamEnv;
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
// int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
int32_t streamDispatch(SStreamTask* pTask);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);

View File

@ -439,3 +439,8 @@ FREE:
taosFreeQitem(pBlock);
return code;
}
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
//
return 0;
}

View File

@ -85,6 +85,54 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
return 0;
}
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
void* exec = pTask->exec.executor;
while (1) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t batchCnt = 0;
while (1) {
SSDataBlock* output = NULL;
uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(0);
}
if (output == NULL) break;
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block);
if (++batchCnt >= batchSz) break;
}
if (taosArrayGetSize(pRes) == 0) {
taosArrayDestroy(pRes);
break;
}
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
streamTaskOutput(pTask, qRes);
// TODO stream sched dispatch
}
return 0;
}
#if 0
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) {
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
@ -144,6 +192,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
return 0;
}
#endif
int32_t streamExecForAll(SStreamTask* pTask) {
while (1) {

View File

@ -17,7 +17,7 @@
#include "streamInc.h"
#include "ttimer.h"
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -86,7 +86,8 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosMemoryFree(pMeta);
}
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen) {
#if 0
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
@ -99,7 +100,7 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
}
tDecoderClear(&decoder);
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
ASSERT(0);
goto FAIL;
}
@ -114,26 +115,20 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
goto FAIL;
}
if (pTask->fillHistory) {
// pipeline exec
// if finished, dispatch a stream-prepare-finished msg to downstream task
// set status normal
}
return 0;
FAIL:
if (pTask) tFreeSStreamTask(pTask);
return -1;
}
#endif
#if 0
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
#if 1
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
void* buf = NULL;
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
return -1;
}
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
int32_t len;
int32_t code;
@ -149,12 +144,15 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
SEncoder encoder;
tEncoderInit(&encoder, buf, len);
tEncodeSStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, &pMeta->txn) < 0) {
ASSERT(0);
return -1;
}
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
return 0;
}
#endif
@ -269,7 +267,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
tDecodeSStreamTask(&decoder, pTask);
tDecoderClear(&decoder);
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, -1) < 0) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);

View File

@ -15,6 +15,90 @@
#include "streamInc.h"
// common
int32_t streamSetParamForRecover(SStreamTask* pTask) {
void* exec = pTask->exec.executor;
return qStreamSetParamForRecover(exec);
}
int32_t streamRestoreParam(SStreamTask* pTask) {
void* exec = pTask->exec.executor;
return qStreamRestoreParam(exec);
}
int32_t streamSetStatusNormal(SStreamTask* pTask) {
pTask->taskStatus = TASK_STATUS__NORMAL;
return 0;
}
// source
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver) {
void* exec = pTask->exec.executor;
return qStreamSourceRecoverStep1(exec, ver);
}
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) {
pReq->streamId = pTask->streamId;
pReq->taskId = pTask->taskId;
return 0;
}
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) {
//
return streamScanExec(pTask, 100);
// TODO next: dispatch msg to launch scan step2
}
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) {
pReq->streamId = pTask->streamId;
pReq->taskId = pTask->taskId;
return 0;
}
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
void* exec = pTask->exec.executor;
if (qStreamSourceRecoverStep2(exec, ver) < 0) {
ASSERT(0);
}
return streamScanExec(pTask, 100);
}
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
/*SStreamFillFinish*/
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
}
return 0;
}
// agg
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
void* exec = pTask->exec.executor;
if (qStreamSetParamForRecover(exec) < 0) {
return -1;
}
pTask->recoverWaitingChild = taosArrayGetSize(pTask->childEpInfo);
return 0;
}
int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
void* exec = pTask->exec.executor;
if (qStreamRestoreParam(exec) < 0) {
return -1;
}
if (qStreamRecoverFinish(exec) < 0) {
return -1;
}
return 0;
}
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingChild, 1);
ASSERT(left >= 0);
if (left == 0) {
streamAggChildrenRecoverFinish(pTask);
}
return 0;
}
#if 0
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
@ -340,6 +424,7 @@ int32_t streamFetchDownstreamStatus(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0;
}
#if 0
int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
// if failed, set timer and retry
// if successful
@ -430,3 +515,4 @@ int32_t streamRecoverTask(SStreamTask* pTask) {
//
return 0;
}
#endif

View File

@ -65,11 +65,6 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
return 0;
}
int32_t walPreRefVer(SWalRef *pRef, int64_t ver) {
pRef->refVer = ver;
return 0;
}
void walUnrefVer(SWalRef *pRef) {
pRef->refId = -1;
pRef->refFile = -1;