diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index d5f1da957d..045f2bad70 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -150,19 +150,6 @@ typedef struct { int32_t colNum; } SMetaStbStats; -// void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); -// int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); -// int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); -// int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); -// bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid); -// bool tqCurrentBlockConsumed(const STqReader* pReader); -// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); -// bool tqNextBlockInWal(STqReader* pReader, const char* idstr); -// bool tqNextBlockImpl(STqReader *pReader, const char* idstr); -// int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t -// *uid); SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); int32_t setForSnapShot(SSnapContext -// *ctx, int64_t uid); int32_t destroySnapContext(SSnapContext *ctx); - // clang-format off /*-------------------------------------------------new api format---------------------------------------------------*/ typedef struct TsdReader { @@ -197,27 +184,6 @@ typedef struct SStoreCacheReader { // clang-format on /*------------------------------------------------------------------------------------------------------------------*/ -/* -void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); -int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); -int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); -int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); -bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid); -bool tqCurrentBlockConsumed(const STqReader* pReader); - -int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); -bool tqNextBlockInWal(STqReader* pReader, const char* idstr); -bool tqNextBlockImpl(STqReader *pReader, const char* idstr); - - int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char* idstr); -STqReader *tqReaderOpen(void *pVnode); -void tqReaderClose(STqReader *); - -int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); -bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); -SWalReader* tqGetWalReader(STqReader* pReader); -int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); -*/ // todo rename typedef struct SStoreTqReader { struct STqReader* (*tqReaderOpen)(); @@ -281,28 +247,18 @@ typedef struct SStoreMeta { void* (*storeGetIndexInfo)(); void* (*getInvertIndex)(void* pVnode); - int32_t (*getChildTableList)( - void* pVnode, int64_t suid, - SArray* list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] - int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList + // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] + int32_t (*getChildTableList)( void* pVnode, int64_t suid, SArray* list); + int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); void* storeGetVersionRange; void* storeGetLastTimestamp; int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema + int32_t (*getNumOfChildTables)( void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols); + void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables); - // db name, vgId, numOfTables, numOfSTables - int32_t (*getNumOfChildTables)( - void* pVnode, int64_t uid, int64_t* numOfTables, - int32_t* numOfCols); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); - void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, - int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & - // metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); int64_t (*getNumOfRowsInMem)(void* pVnode); - /** -int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list); -int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); -int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); - */ + SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock); int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first); void (*pauseCtbCursor)(SMCtbCursor* pCtbCur); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6dbeaef6cb..2de6205804 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1136,16 +1136,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // let's decide which step should be executed now if (pTask->execInfo.step1Start == 0) { - ASSERT(pTask->status.pauseAllowed == false); int64_t ts = taosGetTimestampMs(); pTask->execInfo.step1Start = ts; tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts); - - // NOTE: in case of stream task, scan-history data in wal is not allowed to pause - if (pTask->info.fillHistory == 1) { - streamTaskEnablePause(pTask); - } } else { if (pTask->execInfo.step2Start == 0) { tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs", diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 04b449aaaf..1c951e1452 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -269,6 +269,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt)); taosMsleep(100); } else { + // no active event trans exists, handle this event directly pTrans = streamTaskFindTransform(pSM->current.state, event); if (pTrans == NULL) { stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event)); @@ -451,60 +452,43 @@ int32_t initStateTransferTable() { return TSDB_CODE_SUCCESS; } +//clang-format off void doInitStateTransferTable(void) { streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans)); // initialization event handle - STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, - streamTaskInitStatus, onNormalTaskReady, false, false); + STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, - streamTaskInitStatus, onScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, - streamTaskInitStatus, onScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); // scan-history related event - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, - NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, - NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); // halt stream task, from other task status - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, - streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, - streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, - streamTaskKeepCurrentVerInWal, &info, true); - taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, - &info, true); + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, - streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); // checkpoint related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, - streamTaskDoCheckpoint, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - - trans = - createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); // pause & resume related event handle @@ -571,4 +555,5 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); -} \ No newline at end of file +} +//clang-format on \ No newline at end of file