refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-11-15 16:03:23 +08:00
parent 3907268f40
commit b712f5e7dc
3 changed files with 23 additions and 88 deletions

View File

@ -150,19 +150,6 @@ typedef struct {
int32_t colNum;
} SMetaStbStats;
// void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
// int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
// int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
// int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
// bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid);
// bool tqCurrentBlockConsumed(const STqReader* pReader);
// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
// bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
// bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
// int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t
// *uid); SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); int32_t setForSnapShot(SSnapContext
// *ctx, int64_t uid); int32_t destroySnapContext(SSnapContext *ctx);
// clang-format off
/*-------------------------------------------------new api format---------------------------------------------------*/
typedef struct TsdReader {
@ -197,27 +184,6 @@ typedef struct SStoreCacheReader {
// clang-format on
/*------------------------------------------------------------------------------------------------------------------*/
/*
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid);
bool tqCurrentBlockConsumed(const STqReader* pReader);
int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char* idstr);
STqReader *tqReaderOpen(void *pVnode);
void tqReaderClose(STqReader *);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
SWalReader* tqGetWalReader(STqReader* pReader);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
*/
// todo rename
typedef struct SStoreTqReader {
struct STqReader* (*tqReaderOpen)();
@ -281,28 +247,18 @@ typedef struct SStoreMeta {
void* (*storeGetIndexInfo)();
void* (*getInvertIndex)(void* pVnode);
int32_t (*getChildTableList)(
void* pVnode, int64_t suid,
SArray* list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList
// support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
int32_t (*getChildTableList)( void* pVnode, int64_t suid, SArray* list);
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList);
void* storeGetVersionRange;
void* storeGetLastTimestamp;
int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema
int32_t (*getNumOfChildTables)( void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols);
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables);
// db name, vgId, numOfTables, numOfSTables
int32_t (*getNumOfChildTables)(
void* pVnode, int64_t uid, int64_t* numOfTables,
int32_t* numOfCols); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
int64_t (*getNumOfRowsInMem)(void* pVnode);
/**
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
*/
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock);
int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);

View File

@ -1136,16 +1136,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// let's decide which step should be executed now
if (pTask->execInfo.step1Start == 0) {
ASSERT(pTask->status.pauseAllowed == false);
int64_t ts = taosGetTimestampMs();
pTask->execInfo.step1Start = ts;
tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts);
// NOTE: in case of stream task, scan-history data in wal is not allowed to pause
if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask);
}
} else {
if (pTask->execInfo.step2Start == 0) {
tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",

View File

@ -269,6 +269,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
taosMsleep(100);
} else {
// no active event trans exists, handle this event directly
pTrans = streamTaskFindTransform(pSM->current.state, event);
if (pTrans == NULL) {
stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event));
@ -451,60 +452,43 @@ int32_t initStateTransferTable() {
return TSDB_CODE_SUCCESS;
}
//clang-format off
void doInitStateTransferTable(void) {
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
// initialization event handle
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT,
streamTaskInitStatus, onNormalTaskReady, false, false);
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST,
streamTaskInitStatus, onScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST,
streamTaskInitStatus, onScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
// scan-history related event
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL,
NULL, true);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL,
NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// halt stream task, from other task status
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal,
&info, true);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
streamTaskKeepCurrentVerInWal, NULL, true);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// checkpoint related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL,
streamTaskDoCheckpoint, NULL, true);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans =
createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
// pause & resume related event handle
@ -571,4 +555,5 @@ void doInitStateTransferTable(void) {
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
}
}
//clang-format on