fix(stream): set the correct initial checkpoint version to restore the operators state and add check for the initial destination tables.
This commit is contained in:
parent
a3b02a80c0
commit
1350af5267
|
@ -179,6 +179,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
|||
int32_t tqStreamTasksScanWal(STQ* pTq);
|
||||
|
||||
// tq util
|
||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
||||
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId);
|
||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
|
||||
int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset);
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
// 2: wait to be inited or cleaup
|
||||
#define WAL_READ_TASKS_ID (-1)
|
||||
|
||||
static int32_t tqInitialize(STQ* pTq);
|
||||
|
||||
int32_t tqInit() {
|
||||
int8_t old;
|
||||
while (1) {
|
||||
|
@ -109,25 +111,30 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
|||
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
|
||||
|
||||
tqInitialize(pVnode->pTq);
|
||||
return pTq;
|
||||
}
|
||||
|
||||
int32_t tqInitialize(STQ* pTq) {
|
||||
if (tqMetaOpen(pTq) < 0) {
|
||||
return NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTq->pOffsetStore = tqOffsetOpen(pTq);
|
||||
if (pTq->pOffsetStore == NULL) {
|
||||
return NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
|
||||
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
|
||||
if (pTq->pStreamMeta == NULL) {
|
||||
return NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pVnode->pWal)) < 0) {
|
||||
return NULL;
|
||||
if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return pTq;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tqClose(STQ* pTq) {
|
||||
|
@ -547,13 +554,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||
// todo extract method
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "0x%"PRIx64"-%d", pTask->id.streamId, pTask->id.taskId);
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t UNUSED_PARAM(ver)) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
pTask->id.idStr = taosStrdup(buf);
|
||||
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
|
||||
pTask->refCnt = 1;
|
||||
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
pTask->inputQueue = streamQueueOpen();
|
||||
|
@ -567,7 +570,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
||||
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
||||
pTask->pMeta = pTq->pStreamMeta;
|
||||
pTask->chkInfo.version = ver;
|
||||
|
||||
// expand executor
|
||||
if (pTask->fillHistory) {
|
||||
|
@ -633,8 +635,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
}
|
||||
|
||||
streamSetupTrigger(pTask);
|
||||
tqInfo("vgId:%d expand stream task, s-task:%s, ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
|
||||
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
|
||||
pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);
|
||||
|
||||
// next valid version will add one
|
||||
pTask->chkInfo.version += 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -52,9 +52,6 @@ int tqStreamTasksScanWal(STQ* pTq) {
|
|||
|
||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el);
|
||||
|
||||
// restore wal scan flag
|
||||
// atomic_store_8(&pTq->pStreamMeta->walScan, 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -99,8 +96,8 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE ||
|
||||
pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
||||
int8_t status = pTask->status.taskStatus;
|
||||
if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) {
|
||||
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
|
||||
pTask->status.taskStatus);
|
||||
continue;
|
||||
|
|
|
@ -19,6 +19,12 @@
|
|||
|
||||
static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp);
|
||||
|
||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "0x%" PRIx64 "-%d", streamId, taskId);
|
||||
return taosStrdup(buf);
|
||||
}
|
||||
|
||||
// stream_task:stream_id:task_id
|
||||
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) {
|
||||
int32_t n = 12;
|
||||
|
|
|
@ -552,7 +552,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
|||
walApplyVer(pVnode->pWal, commitIdx);
|
||||
|
||||
pVnode->restored = true;
|
||||
vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
|
||||
vInfo("vgId:%d, sync restore finished, start to restore stream tasks by replay wal", pVnode->config.vgId);
|
||||
|
||||
// start to restore all stream tasks
|
||||
tqStartStreamTasks(pVnode->pTq);
|
||||
|
|
|
@ -108,6 +108,7 @@ uint64_t tableListGetSize(const STableListInfo* pTableList);
|
|||
uint64_t tableListGetSuid(const STableListInfo* pTableList);
|
||||
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
|
||||
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex);
|
||||
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, int32_t* type);
|
||||
|
||||
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||
void initResultRowInfo(SResultRowInfo* pResultRowInfo);
|
||||
|
|
|
@ -427,7 +427,6 @@ typedef struct STimeWindowAggSupp {
|
|||
} STimeWindowAggSupp;
|
||||
|
||||
typedef struct SStreamScanInfo {
|
||||
uint64_t tableUid; // queried super table uid
|
||||
SExprInfo* pPseudoExpr;
|
||||
int32_t numOfPseudoExpr;
|
||||
SExprSupp tbnameCalSup;
|
||||
|
|
|
@ -36,6 +36,7 @@ struct STableListInfo {
|
|||
SArray* pTableList;
|
||||
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
||||
uint64_t suid;
|
||||
int32_t tableType; // queried table type
|
||||
};
|
||||
|
||||
typedef struct tagFilterAssist {
|
||||
|
@ -1026,14 +1027,17 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
|||
size_t numOfTables = 0;
|
||||
|
||||
pListInfo->suid = pScanNode->suid;
|
||||
pListInfo->tableType = pScanNode->tableType;
|
||||
|
||||
SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
|
||||
|
||||
SIdxFltStatus status = SFLT_NOT_INDEX;
|
||||
if (pScanNode->tableType != TSDB_SUPER_TABLE) {
|
||||
pListInfo->suid = pScanNode->uid;
|
||||
|
||||
if (metaIsTableExist(metaHandle, pScanNode->uid)) {
|
||||
taosArrayPush(pUidList, &pScanNode->uid);
|
||||
}
|
||||
|
||||
code = doFilterByTagCond(pListInfo, pUidList, pTagCond, metaHandle, status);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
|
@ -1819,6 +1823,11 @@ int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t st
|
|||
return -1;
|
||||
}
|
||||
|
||||
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, int32_t* type) {
|
||||
*psuid = pTableList->suid;
|
||||
*type = pTableList->tableType;
|
||||
}
|
||||
|
||||
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
||||
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
|
||||
ASSERT(pTableList->map != NULL && slot != NULL);
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "executor.h"
|
||||
#include <vnode.h>
|
||||
#include "executorimpl.h"
|
||||
#include "planner.h"
|
||||
#include "tdatablock.h"
|
||||
|
@ -327,6 +328,13 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
|||
return qa;
|
||||
}
|
||||
|
||||
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
|
||||
|
||||
uint64_t suid = 0;
|
||||
int32_t type = 0;
|
||||
tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &type);
|
||||
int32_t numOfExisted = tableListGetSize(pTableScanInfo->base.pTableListInfo);
|
||||
|
||||
// let's discard the tables those are not created according to the queried super table.
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
|
||||
|
@ -341,9 +349,21 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
|||
|
||||
tDecoderClear(&mr.coder);
|
||||
|
||||
// TODO handle ntb case
|
||||
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
|
||||
if (mr.me.type == TSDB_SUPER_TABLE) {
|
||||
continue;
|
||||
} else {
|
||||
if (type == TSDB_SUPER_TABLE) {
|
||||
// this new created child table does not belong to the scanned super table.
|
||||
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) {
|
||||
continue;
|
||||
}
|
||||
} else { // ordinary table
|
||||
// In case that the scanned target table is an ordinary table. When replay the WAL during restore the vnode, we
|
||||
// should check all newly created ordinary table to make sure that this table isn't the destination table.
|
||||
if (mr.me.uid != suid) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pScanInfo->pTagCond != NULL) {
|
||||
|
@ -382,7 +402,7 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
|
|||
SStreamScanInfo* pScanInfo = pInfo->info;
|
||||
|
||||
if (isAdd) { // add new table id
|
||||
SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
|
||||
SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, id);
|
||||
int32_t numOfQualifiedTables = taosArrayGetSize(qa);
|
||||
qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
|
||||
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
|
||||
|
|
|
@ -2441,7 +2441,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
}
|
||||
|
||||
pInfo->readHandle = *pHandle;
|
||||
pInfo->tableUid = pScanPhyNode->uid;
|
||||
pTaskInfo->streamInfo.snapshotVer = pHandle->version;
|
||||
pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
|
||||
blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
|
||||
|
|
|
@ -217,8 +217,8 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
|
|||
|
||||
int32_t queueNum = taosGetQueueNumber(pool->qset);
|
||||
int32_t curWorkerNum = taosArrayGetSize(pool->workers);
|
||||
int32_t dstWorkerNum = ceil(queueNum * pool->ratio);
|
||||
if (dstWorkerNum < 2) dstWorkerNum = 2;
|
||||
int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);
|
||||
if (dstWorkerNum < 1) dstWorkerNum = 1;
|
||||
|
||||
// spawn a thread to process queue
|
||||
while (curWorkerNum < dstWorkerNum) {
|
||||
|
|
Loading…
Reference in New Issue