fix(stream): set the correct initial offset value.

This commit is contained in:
Haojun Liao 2023-04-18 08:56:31 +08:00
parent 930b267a75
commit 2ae05b8cf6
2 changed files with 6 additions and 3 deletions

View File

@ -130,6 +130,8 @@ int32_t tqInitialize(STQ* pTq) {
return -1;
}
// the version is kept in task's meta data
// todo check if this version is required or not
if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
return -1;
}
@ -554,7 +556,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
return 0;
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t UNUSED_PARAM(ver)) {
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t vgId = TD_VID(pTq->pVnode);
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
pTask->refCnt = 1;
@ -570,6 +572,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t UNUSED_PARAM(ver)) {
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->pMeta = pTq->pStreamMeta;
pTask->chkInfo.version = ver;
// expand executor
if (pTask->fillHistory) {
@ -755,7 +758,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
tDecoderClear(&decoder);
// 2.save task
// 2.save task, use the newest commit version as the initial start version of stream task.
code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr,

View File

@ -27,7 +27,7 @@ class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), True)
self.dbname = 'db_test'
self.setsql = TDSetSql()
self.stbname = 'stb'