chore: merge 3.0

This commit is contained in:
kailixu 2023-11-09 15:09:30 +08:00
commit 9cf154543f
27 changed files with 405 additions and 214 deletions

View File

@ -249,6 +249,7 @@ typedef struct SQueryTableDataCond {
SColumnInfo* colList; SColumnInfo* colList;
int32_t* pSlotList; // the column output destation slot, and it may be null int32_t* pSlotList; // the column output destation slot, and it may be null
int32_t type; // data block load type: int32_t type; // data block load type:
bool skipRollup;
STimeWindow twindows; STimeWindow twindows;
int64_t startVersion; int64_t startVersion;
int64_t endVersion; int64_t endVersion;

View File

@ -49,6 +49,7 @@ typedef struct {
uint64_t checkpointId; uint64_t checkpointId;
bool initTableReader; bool initTableReader;
bool initTqReader; bool initTqReader;
bool skipRollup;
int32_t numOfVgroups; int32_t numOfVgroups;
void* sContext; // SSnapContext* void* sContext; // SSnapContext*
void* pStateBackend; void* pStateBackend;

View File

@ -168,7 +168,6 @@ typedef struct {
struct SStreamFileState *pFileState; struct SStreamFileState *pFileState;
int32_t number; int32_t number;
SSHashObj *parNameMap; SSHashObj *parNameMap;
int64_t checkPointId;
int32_t taskId; int32_t taskId;
int64_t streamId; int64_t streamId;
int64_t streamBackendRid; int64_t streamBackendRid;

View File

@ -826,6 +826,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta); int32_t streamMetaReopen(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta);
@ -840,6 +841,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pMeta);
// checkpoint // checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask); void streamTaskClearCheckInfo(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId); int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);

View File

@ -108,6 +108,7 @@ struct SRSmaStat {
int64_t refId; // shared by fetch tasks int64_t refId; // shared by fetch tasks
volatile int64_t nBufItems; // number of items in queue buffer volatile int64_t nBufItems; // number of items in queue buffer
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
volatile int32_t execStat; // 0 succeed, other failed
volatile int32_t nFetchAll; // active number of fetch all volatile int32_t nFetchAll; // active number of fetch all
volatile int8_t triggerStat; // shared by fetch tasks volatile int8_t triggerStat; // shared by fetch tasks
volatile int8_t commitStat; // 0 not in committing, 1 in committing volatile int8_t commitStat; // 0 not in committing, 1 in committing
@ -115,6 +116,7 @@ struct SRSmaStat {
SRSmaFS fs; // for recovery/snapshot r/w SRSmaFS fs; // for recovery/snapshot r/w
SHashObj *infoHash; // key: suid, value: SRSmaInfo SHashObj *infoHash; // key: suid, value: SRSmaInfo
tsem_t notEmpty; // has items in queue buffer tsem_t notEmpty; // has items in queue buffer
SArray *blocks; // SArray<SSDataBlock>
}; };
struct SSmaStat { struct SSmaStat {
@ -136,13 +138,18 @@ struct SSmaStat {
#define RSMA_FS_LOCK(r) (&(r)->lock) #define RSMA_FS_LOCK(r) (&(r)->lock)
struct SRSmaInfoItem { struct SRSmaInfoItem {
int8_t level : 4; int8_t level;
int8_t fetchLevel : 4; int8_t fetchLevel;
int8_t triggerStat; int8_t triggerStat;
uint16_t nScanned; int32_t nScanned;
int32_t maxDelay; // ms int32_t streamFlushed : 1;
int32_t maxDelay : 31; // ms
int64_t submitReqVer;
int64_t fetchResultVer;
tmr_h tmrId; tmr_h tmrId;
void *pStreamState; void *pStreamState;
void *pStreamTask; // SStreamTask
SArray *pResList;
}; };
struct SRSmaInfo { struct SRSmaInfo {
@ -160,11 +167,9 @@ struct SRSmaInfo {
STaosQall *qall; // buffer qall of SubmitReq STaosQall *qall; // buffer qall of SubmitReq
}; };
#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1) #define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1) #define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i]) #define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i])
#define RSMA_INFO_ITEM(r, i) (&(r)->items[i]) #define RSMA_INFO_ITEM(r, i) (&(r)->items[i])
enum { enum {
@ -214,11 +219,11 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
int32_t smaPreClose(SSma *pSma); int32_t smaPreClose(SSma *pSma);
// rsma // rsma
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
// int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName); void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName);

View File

@ -209,6 +209,7 @@ int32_t tsdbBegin(STsdb* pTsdb);
// int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); // int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCacheCommit(STsdb* pTsdb); int32_t tsdbCacheCommit(STsdb* pTsdb);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync);
// int32_t tsdbFinishCommit(STsdb* pTsdb); // int32_t tsdbFinishCommit(STsdb* pTsdb);
// int32_t tsdbRollbackCommit(STsdb* pTsdb); // int32_t tsdbRollbackCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
@ -279,7 +280,7 @@ int32_t smaPrepareAsyncCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo); int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo);
int32_t smaFinishCommit(SSma* pSma); int32_t smaFinishCommit(SSma* pSma);
int32_t smaPostCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma);
int32_t smaDoRetention(SSma* pSma, int64_t now); int32_t smaRetention(SSma* pSma, int64_t now);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);

View File

@ -156,10 +156,10 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
nLoops = 0; nLoops = 0;
while (1) { while (1) {
if (atomic_load_32(&pRSmaStat->nFetchAll) <= 0) { if (atomic_load_32(&pRSmaStat->nFetchAll) <= 0) {
smaDebug("vgId:%d, rsma commit:%d, fetch tasks are all finished", SMA_VID(pSma), isCommit); smaDebug("vgId:%d, rsma commit, type:%d, fetch tasks are all finished", SMA_VID(pSma), isCommit);
break; break;
} else { } else {
smaDebug("vgId:%d, rsma commit%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit); smaDebug("vgId:%d, rsma commit, type:%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit);
} }
TD_SMA_LOOPS_CHECK(nLoops, 1000); TD_SMA_LOOPS_CHECK(nLoops, 1000);
} }
@ -169,22 +169,24 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
* 1) This is high cost task and should not put in asyncPreCommit originally. * 1) This is high cost task and should not put in asyncPreCommit originally.
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently. * 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
*/ */
smaInfo("vgId:%d, rsma commit:%d, wait for all items to be consumed, TID:%p", SMA_VID(pSma), isCommit, smaInfo("vgId:%d, rsma commit, type:%d, wait for all items to be consumed, TID:%p", SMA_VID(pSma), isCommit,
(void *)taosGetSelfPthreadId()); (void *)taosGetSelfPthreadId());
nLoops = 0; nLoops = 0;
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) { while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
TD_SMA_LOOPS_CHECK(nLoops, 1000); TD_SMA_LOOPS_CHECK(nLoops, 1000);
} }
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (!isCommit) goto _exit; if (!isCommit) goto _exit;
// code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); code = atomic_load_32(&pRSmaStat->execStat);
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
// all rsma results are written completely // all rsma results are written completely
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
if ((pTsdb = VND_RSMA1(pSma->pVnode))) { if ((pTsdb = VND_RSMA1(pSma->pVnode))) {

View File

@ -179,7 +179,7 @@ static void tRSmaInfoHashFreeNode(void *data) {
if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) { if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) {
taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES); taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES);
} }
tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo, true); tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo);
} }
} }
@ -209,6 +209,12 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
pRSmaStat->pSma = (SSma *)pSma; pRSmaStat->pSma = (SSma *)pSma;
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
tsem_init(&pRSmaStat->notEmpty, 0, 0); tsem_init(&pRSmaStat->notEmpty, 0, 0);
if (!(pRSmaStat->blocks = taosArrayInit(1, sizeof(SSDataBlock)))) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
SSDataBlock datablock = {.info.type = STREAM_CHECKPOINT};
taosArrayPush(pRSmaStat->blocks, &datablock);
// init smaMgmt // init smaMgmt
smaInit(); smaInit();
@ -290,6 +296,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
// step 5: free pStat // step 5: free pStat
tsem_destroy(&(pStat->notEmpty)); tsem_destroy(&(pStat->notEmpty));
taosArrayDestroy(pStat->blocks);
taosMemoryFreeClear(pStat); taosMemoryFreeClear(pStat);
} }
} }

View File

@ -15,6 +15,7 @@
#include "sma.h" #include "sma.h"
#include "tq.h" #include "tq.h"
#include "tstream.h"
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
@ -22,6 +23,7 @@
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms #define RSMA_FETCH_ACTIVE_MAX (1000) // ms
#define RSMA_FETCH_INTERVAL (5000) // ms #define RSMA_FETCH_INTERVAL (5000) // ms
#define RSMA_SUBMIT_HEAD_LEN (13) // type(int8_t) + len(int32_t) + version(int64_t) #define RSMA_SUBMIT_HEAD_LEN (13) // type(int8_t) + len(int32_t) + version(int64_t)
#define RSMA_TASK_FLAG "rsma"
#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel) #define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel)
@ -43,8 +45,8 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static void tdFreeRSmaSubmitItems(SArray *pItems); static void tdFreeRSmaSubmitItems(SArray *pItems);
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
int64_t suid); int32_t execType, int8_t *streamFlushed);
static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaFetchTrigger(void *param, void *tmrId);
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
@ -73,33 +75,32 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l
* *
* @param pSma * @param pSma
* @param pInfo * @param pInfo
* @param isDeepFree Only stop tmrId and free pTSchema for deep free
* @return void* * @return void*
*/ */
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
if (pInfo) { if (pInfo) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i]; SRSmaInfoItem *pItem = &pInfo->items[i];
if (isDeepFree && pItem->tmrId) { if (pItem->tmrId) {
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId, smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
pInfo->suid, i + 1); pInfo->suid, i + 1);
taosTmrStopA(&pItem->tmrId); taosTmrStopA(&pItem->tmrId);
} }
if (isDeepFree && pItem->pStreamState) { if (pItem->pStreamState) {
streamStateClose(pItem->pStreamState, false); streamStateClose(pItem->pStreamState, false);
} }
if (isDeepFree && pInfo->taskInfo[i]) { if (pItem->pStreamTask) {
tFreeStreamTask(pItem->pStreamTask);
}
taosArrayDestroy(pItem->pResList);
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
} }
}
if (isDeepFree) {
taosMemoryFreeClear(pInfo->pTSchema);
}
if (isDeepFree) { taosMemoryFreeClear(pInfo->pTSchema);
if (pInfo->queue) { if (pInfo->queue) {
taosCloseQueue(pInfo->queue); taosCloseQueue(pInfo->queue);
pInfo->queue = NULL; pInfo->queue = NULL;
@ -108,7 +109,6 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
taosFreeQall(pInfo->qall); taosFreeQall(pInfo->qall);
pInfo->qall = NULL; pInfo->qall = NULL;
} }
}
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} }
@ -136,7 +136,9 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids,
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (!taosArrayGetSize(tbUids)) { int32_t nTables = taosArrayGetSize(tbUids);
if (0 == nTables) {
smaDebug("vgId:%d, no need to update tbUidList for suid:%" PRIi64 " since Empty tbUids", SMA_VID(pSma), *suid); smaDebug("vgId:%d, no need to update tbUidList for suid:%" PRIi64 " since Empty tbUids", SMA_VID(pSma), *suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -157,8 +159,9 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids,
terrstr()); terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 " uid:%" PRIi64 " level %d", smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p. suid:%" PRIi64 " uid:%" PRIi64
SMA_VID(pSma), pRSmaInfo->taskInfo[i], *suid, *(int64_t *)taosArrayGet(tbUids, 0), i); "nTables:%d level %d",
SMA_VID(pSma), pRSmaInfo->taskInfo[i], *suid, *(int64_t *)TARRAY_GET_ELEM(tbUids, 0), nTables, i);
} }
} }
@ -175,8 +178,8 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
void *pIter = taosHashIterate(pStore->uidHash, NULL); void *pIter = NULL;
while (pIter) { while ((pIter = taosHashIterate(pStore->uidHash, pIter))) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
SArray *pTbUids = *(SArray **)pIter; SArray *pTbUids = *(SArray **)pIter;
@ -184,8 +187,6 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
taosHashCancelIterate(pStore->uidHash, pIter); taosHashCancelIterate(pStore->uidHash, pIter);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
pIter = taosHashIterate(pStore->uidHash, pIter);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -233,9 +234,32 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) {
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
taosRLockLatch(&pMeta->lock);
SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask && *ppTask) {
pItem->submitReqVer = (*ppTask)->chkInfo.checkpointVer;
pItem->fetchResultVer = (*ppTask)->info.triggerParam;
}
taosRUnLockLatch(&pMeta->lock);
}
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
streamMetaUnregisterTask(pMeta, streamId, taskId);
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
taosWUnLockLatch(&pMeta->lock);
smaDebug("vgId:%d, rsma task:%" PRIi64 ",%d dropped, remain tasks:%d", pMeta->vgId, streamId, taskId, numOfTasks);
}
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
int8_t idx) { int8_t idx) {
if ((param->qmsgLen > 0) && param->qmsg[idx]) { if ((param->qmsgLen > 0) && param->qmsg[idx]) {
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
SRetention *pRetention = SMA_RETENTION(pSma); SRetention *pRetention = SMA_RETENTION(pSma);
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
@ -255,25 +279,46 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
taosMemoryFree(s); taosMemoryFree(s);
} }
SStreamTask task = {.id.taskId = 0, .id.streamId = 0}; // TODO: assign value SStreamTask *pStreamTask = taosMemoryCalloc(1, sizeof(*pStreamTask));
task.pMeta = pVnode->pTq->pStreamMeta; if (!pStreamTask) {
pStreamState = streamStateOpen(taskInfDir, &task, true, -1, -1); terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
pItem->pStreamTask = pStreamTask;
pStreamTask->id.taskId = 0;
pStreamTask->id.streamId = pRSmaInfo->suid + idx;
pStreamTask->chkInfo.startTs = taosGetTimestampMs();
pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_TASK_FLAG) + 1);
sprintf(pStreamTask->exec.qmsg, "%s", RSMA_TASK_FLAG);
pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
if (!pStreamState) { if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
pItem->pStreamState = pStreamState;
SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState}; tdRSmaTaskRemove(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId);
SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .skipRollup = 1, .pStateBackend = pStreamState};
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0); pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0);
if (!pRSmaInfo->taskInfo[idx]) { if (!pRSmaInfo->taskInfo[idx]) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot if (!(pItem->pResList = taosArrayInit(1, POINTER_BYTES))) {
pItem->pStreamState = pStreamState; return TSDB_CODE_FAILED;
}
if (pItem->fetchResultVer < pItem->submitReqVer) {
// fetch the data when reboot
pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE;
}
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval = int64_t msInterval =
convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
@ -292,10 +337,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64
", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 ", maxdelay:%" PRIi64 " watermark:%" PRIi64
", finally maxdelay:%" PRIi32, ", finally maxdelay:%" PRIi32,
TD_VID(pVnode), pItem, pRSmaInfo->suid, (int8_t)(idx + 1), param->maxdelay[idx], param->watermark[idx], TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId,
pItem->maxDelay); pItem->submitReqVer, pItem->fetchResultVer, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -363,7 +409,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
tdFreeRSmaInfo(pSma, pRSmaInfo, true); tdFreeRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -499,11 +545,10 @@ static void tdUidStoreDestory(STbUidStore *pStore) {
if (pStore->uidHash) { if (pStore->uidHash) {
if (pStore->tbUids) { if (pStore->tbUids) {
// When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys. // When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys.
void *pIter = taosHashIterate(pStore->uidHash, NULL); void *pIter = NULL;
while (pIter) { while ((pIter = taosHashIterate(pStore->uidHash, pIter))) {
SArray *arr = *(SArray **)pIter; SArray *arr = *(SArray **)pIter;
taosArrayDestroy(arr); taosArrayDestroy(arr);
pIter = taosHashIterate(pStore->uidHash, pIter);
} }
} }
taosHashCleanup(pStore->uidHash); taosHashCleanup(pStore->uidHash);
@ -563,7 +608,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
* @param now * @param now
* @return int32_t * @return int32_t
*/ */
int32_t smaDoRetention(SSma *pSma, int64_t now) { int32_t smaRetention(SSma *pSma, int64_t now) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (!VND_IS_RSMA(pSma->pVnode)) { if (!VND_IS_RSMA(pSma->pVnode)) {
return code; return code;
@ -571,8 +616,8 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) { if (pSma->pRSmaTsdb[i]) {
// code = tsdbDoRetention(pSma->pRSmaTsdb[i], now); code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1);
// if (code) goto _end; if (code) goto _end;
} }
} }
@ -580,17 +625,14 @@ _end:
return code; return code;
} }
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
int64_t suid) { int32_t execType, int8_t *streamFlushed) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SSDataBlock *output = NULL; SSDataBlock *output = NULL;
SArray *pResList = pItem->pResList;
SArray *pResList = taosArrayInit(1, POINTER_BYTES); STSchema *pTSchema = pInfo->pTSchema;
if (pResList == NULL) { int64_t suid = pInfo->suid;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
while (1) { while (1) {
uint64_t ts; uint64_t ts;
@ -605,32 +647,46 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
if (taosArrayGetSize(pResList) == 0) { if (taosArrayGetSize(pResList) == 0) {
break; break;
} }
#if 0
char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level);
blockDebugShowDataBlocks(pResList, flag);
#endif
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
output = taosArrayGetP(pResList, i); output = taosArrayGetP(pResList, i);
if (output->info.type == STREAM_CHECKPOINT) {
if (streamFlushed) *streamFlushed = 1;
continue;
}
smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma), smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma),
output->info.id.uid, output->info.id.groupId, output->info.rows); output->info.id.uid, output->info.id.groupId, output->info.rows);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq2 *pReq = NULL; SSubmitReq2 *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) { if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) {
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
// reset the output version to handle reboot
if (STREAM_GET_ALL == execType && output->info.version == 0) {
// the submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously
output->info.version = pItem->submitReqVer;
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; if (terrno == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) {
// TODO: reconfigure SSubmitReq2
} else {
if (terrno == 0) terrno = TSDB_CODE_RSMA_RESULT;
code = terrno;
}
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (STREAM_GET_ALL == execType) {
atomic_store_64(&pItem->fetchResultVer, output->info.version);
}
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64, smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version); SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version);
@ -649,7 +705,6 @@ _exit:
} else { } else {
smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level); smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level);
} }
taosArrayDestroy(pResList);
qCleanExecTaskBlockBuf(taskInfo); qCleanExecTaskBlockBuf(taskInfo);
return code; return code;
} }
@ -748,6 +803,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
ERsmaExecType type, int8_t level) { ERsmaExecType type, int8_t level) {
int32_t idx = level - 1; int32_t idx = level - 1;
void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
if (!qTaskInfo) { if (!qTaskInfo) {
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
@ -775,10 +831,14 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); if (STREAM_INPUT__MERGED_SUBMIT == inputType) {
tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); SPackedData *packData = POINTER_SHIFT(pMsg, sizeof(SPackedData) * (msgSize - 1));
atomic_store_64(&pItem->submitReqVer, packData->ver);
}
return TSDB_CODE_SUCCESS; terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL);
return terrno ? TSDB_CODE_FAILED : TDB_CODE_SUCCESS;
} }
/** /**
@ -884,7 +944,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg,
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) { if (!pEnv) {
// only applicable when rsma env exists // only applicable when rsma env exists
return TSDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
}
if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), terrstr());
goto _err;
} }
STbUidStore uidStore = {0}; STbUidStore uidStore = {0};
@ -916,7 +981,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
tdUidStoreDestory(&uidStore); tdUidStoreDestory(&uidStore);
return TSDB_CODE_FAILED; return terrno;
} }
int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) { int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) {
@ -1010,7 +1075,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
code = terrno; code = terrno;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
#if 0
// reload all ctbUids for suid // reload all ctbUids for suid
uidStore.suid = suid; uidStore.suid = suid;
if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) { if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) {
@ -1024,7 +1089,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
} }
taosArrayClear(uidStore.tbUids); taosArrayClear(uidStore.tbUids);
#endif
smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid); smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid);
} }
} }
@ -1072,39 +1137,141 @@ _err:
return code; return code;
} }
#if 0
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int32_t nTaskInfo = 0;
SSma *pSma = pRSmaStat->pSma; SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
SRSmaFS fs = {0};
if (taosHashGetSize(pInfoHash) <= 0) { if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// stream state: trigger checkpoint
do {
void *infoHash = NULL; void *infoHash = NULL;
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i]) {
code = qSetSMAInput(pRSmaInfo->taskInfo[i], pRSmaStat->blocks, 1, STREAM_INPUT__CHECKPOINT);
if (code) {
taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit);
}
pRSmaInfo->items[i].streamFlushed = 0;
++nTaskInfo;
}
}
}
} while (0);
// stream state: wait checkpoint ready in async mode
do {
int32_t nStreamFlushed = 0;
int32_t nSleep = 0;
void *infoHash = NULL;
while (true) {
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) {
int8_t streamFlushed = 0;
code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo,
STREAM_CHECKPOINT, &streamFlushed);
if (code) {
taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (streamFlushed) {
pRSmaInfo->items[i].streamFlushed = 1;
if (++nStreamFlushed >= nTaskInfo) {
smaInfo("vgId:%d, rsma commit, checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode),
nSleep * 10, nStreamFlushed, nTaskInfo);
taosHashCancelIterate(pInfoHash, infoHash);
goto _checkpoint;
}
}
}
}
}
taosUsleep(10);
++nSleep;
smaDebug("vgId:%d, rsma commit, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode),
nSleep * 10, nStreamFlushed, nTaskInfo);
}
} while (0);
_checkpoint:
// stream state: build checkpoint in backend
do {
SStreamMeta *pMeta = NULL;
int64_t checkpointId = taosGetTimestampNs();
bool checkpointBuilt = false;
void *infoHash = NULL;
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue; continue;
} }
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
if (pItem && pItem->pStreamState) { if (pItem && pItem->pStreamTask) {
if (streamStateCommit(pItem->pStreamState) < 0) { SStreamTask *pTask = pItem->pStreamTask;
code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
pTask->checkpointingId = checkpointId;
pTask->chkInfo.checkpointId = pTask->checkpointingId;
pTask->chkInfo.checkpointVer = pItem->submitReqVer;
pTask->info.triggerParam = pItem->fetchResultVer;
if (!checkpointBuilt) {
// the stream states share one checkpoint
code = streamTaskBuildCheckpoint(pTask);
if (code) {
taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode), pMeta = pTask->pMeta;
pRSmaInfo->suid, i + 1); checkpointBuilt = true;
}
}
} }
taosWLockLatch(&pMeta->lock);
if (streamMetaSaveTask(pMeta, pTask)) {
taosWUnLockLatch(&pMeta->lock);
code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosWUnLockLatch(&pMeta->lock);
smaDebug("vgId:%d, rsma commit, succeed to commit task:%p, submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
", table:%" PRIi64 ", level:%d",
TD_VID(pVnode), pTask, pItem->submitReqVer, pItem->fetchResultVer, pRSmaInfo->suid, i + 1);
}
}
}
if (pMeta) {
taosWLockLatch(&pMeta->lock);
if (streamMetaCommit(pMeta)) {
taosWUnLockLatch(&pMeta->lock);
code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
taosWUnLockLatch(&pMeta->lock);
}
if (checkpointBuilt) {
smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint:%" PRIi64, TD_VID(pVnode), checkpointId);
}
} while (0);
_exit: _exit:
if (code) { if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
@ -1113,7 +1280,7 @@ _exit:
terrno = code; terrno = code;
return code; return code;
} }
#endif
/** /**
* @brief trigger to get rsma result in async mode * @brief trigger to get rsma result in async mode
* *
@ -1187,14 +1354,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid); SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
// async process // async process
pItem->fetchLevel = pItem->level; atomic_store_8(&pItem->fetchLevel, 1);
#if 0
// debugging codes
SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid);
SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1);
make sure(qItem->level == pItem->level);
make sure(qItem->fetchLevel == pItem->fetchLevel);
#endif
if (atomic_load_8(&pRSmaInfo->assigned) == 0) { if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
tsem_post(&(pStat->notEmpty)); tsem_post(&(pStat->notEmpty));
} }
@ -1239,15 +1400,15 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
if (pItem->fetchLevel) {
pItem->fetchLevel = 0; if (1 == atomic_val_compare_exchange_8(&pItem->fetchLevel, 1, 0)) {
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1); qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
if (!taskInfo) { if (!taskInfo) {
continue; continue;
} }
if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi16 " maxDelay:%d, fetch executed", smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch executed",
SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay); SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
} else { } else {
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
@ -1267,14 +1428,15 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err; goto _err;
} }
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, NULL) < 0) {
atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno);
goto _err; goto _err;
} }
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi16 " maxDelay:%d, fetch finished", smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch finished",
SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay); SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
} else { } else {
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi16 smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32
" maxDelay:%d, fetch not executed as fetch level is %" PRIi8, " maxDelay:%d, fetch not executed as fetch level is %" PRIi8,
SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay, pItem->fetchLevel); SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay, pItem->fetchLevel);
} }
@ -1325,8 +1487,13 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
} }
_resume_delete: _resume_delete:
++nDelete; ++nDelete;
#if 0
if (!taosArrayPush(pSubmitArr, &packData)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
} }
#endif
} else { } else {
break; break;
} }
@ -1364,6 +1531,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
_rtn: _rtn:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno);
smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid, smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr()); type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr());
tdFreeRSmaSubmitItems(pSubmitArr); tdFreeRSmaSubmitItems(pSubmitArr);
@ -1439,15 +1607,11 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if (ASSERTS(oldVal >= 0, "oldVal of nFetchAll: %d < 0", oldVal)) { if (ASSERTS(oldVal >= 0, "oldVal of nFetchAll: %d < 0", oldVal)) {
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
taosHashCancelIterate(infoHash, pIter);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
int8_t curStat = atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat));
if (curStat == 1) {
smaDebug("vgId:%d, fetch all not exec as commit stat is %" PRIi8, SMA_VID(pSma), curStat);
} else {
tdRSmaFetchAllResult(pSma, pInfo); tdRSmaFetchAllResult(pSma, pInfo);
}
if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);

View File

@ -49,7 +49,7 @@ static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo
STsdbReader* pReader); STsdbReader* pReader);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
int8_t* pLevel); int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
@ -384,7 +384,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
initReaderStatus(&pReader->status); initReaderStatus(&pReader->status);
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level); pReader->pTsdb = getTsdbByRetentions(pVnode, pCond, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->info.suid = pCond->suid; pReader->info.suid = pCond->suid;
pReader->info.order = pCond->order; pReader->info.order = pCond->order;
@ -3152,9 +3152,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
} }
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr, static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idStr,
int8_t* pLevel) { int8_t* pLevel) {
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode) && !pCond->skipRollup) {
int8_t level = 0; int8_t level = 0;
int8_t precision = pVnode->config.tsdbCfg.precision; int8_t precision = pVnode->config.tsdbCfg.precision;
int64_t now = taosGetTimestamp(precision); int64_t now = taosGetTimestamp(precision);
@ -3170,7 +3170,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
} }
break; break;
} }
if ((now - pRetention->keep) <= (winSKey + offset)) { if ((now - pRetention->keep) <= (pCond->twindows.skey + offset)) {
break; break;
} }
++level; ++level;

View File

@ -388,6 +388,8 @@ _exit:
return code; return code;
} }
// Release a retention-task argument by handing the pointer to taosMemoryFree.
static void tsdbFreeRtnArg(void *arg) {
  taosMemoryFree(arg);
}
static int32_t tsdbDoRetentionSync(void *arg) { static int32_t tsdbDoRetentionSync(void *arg) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -410,6 +412,7 @@ _exit:
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
} }
tsem_post(&((SRtnArg *)arg)->tsdb->pVnode->canCommit); tsem_post(&((SRtnArg *)arg)->tsdb->pVnode->canCommit);
tsdbFreeRtnArg(arg);
return code; return code;
} }
@ -439,7 +442,7 @@ _exit:
return code; return code;
} }
static void tsdbFreeRtnArg(void *arg) { taosMemoryFree(arg); }
int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) { int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
int32_t code = 0; int32_t code = 0;

View File

@ -39,7 +39,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2
arrSize = taosArrayGetSize(pMsg->aSubmitTbData); arrSize = taosArrayGetSize(pMsg->aSubmitTbData);
// scan and convert // scan and convert
if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) { if ((terrno = tsdbScanAndConvertSubmitMsg(pTsdb, pMsg)) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) { if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(terrno)); tsdbError("vgId:%d, failed to insert data since %s", TD_VID(pTsdb->pVnode), tstrerror(terrno));
} }

View File

@ -15,8 +15,12 @@
#include "vnd.h" #include "vnd.h"
extern int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync);
int32_t vnodeDoRetention(SVnode *pVnode, int64_t now) { int32_t vnodeDoRetention(SVnode *pVnode, int64_t now) {
return tsdbRetention(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1); int32_t code = TSDB_CODE_SUCCESS;
code = tsdbRetention(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1);
if (TSDB_CODE_SUCCESS == code) code = smaRetention(pVnode->pSma, now);
return code;
} }

View File

@ -1669,7 +1669,7 @@ _exit:
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1); atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
if (code == 0) { if (code == 0) {
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1); atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT); code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
} }
// clear // clear

View File

@ -180,7 +180,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
SColumn extractColumnFromColumnNode(SColumnNode* pColNode); SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle);
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond); void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
int32_t convertFillType(int32_t mode); int32_t convertFillType(int32_t mode);

View File

@ -1713,7 +1713,7 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
return c; return c;
} }
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) { int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle) {
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
@ -1732,6 +1732,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1; pCond->startVersion = -1;
pCond->endVersion = -1; pCond->endVersion = -1;
pCond->skipRollup = readHandle->skipRollup;
int32_t j = 0; int32_t j = 0;
for (int32_t i = 0; i < pCond->numOfCols; ++i) { for (int32_t i = 0; i < pCond->numOfCols; ++i) {

View File

@ -69,18 +69,18 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
SPackedData tmp = { SPackedData tmp = {.pDataBlock = pDataBlock};
.pDataBlock = pDataBlock,
};
taosArrayPush(pInfo->pBlockLists, &tmp); taosArrayPush(pInfo->pBlockLists, &tmp);
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK; pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else if (type == STREAM_INPUT__CHECKPOINT) {
SPackedData tmp = {.pDataBlock = input};
taosArrayPush(pInfo->pBlockLists, &tmp);
pInfo->blockType = STREAM_INPUT__CHECKPOINT;
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) { } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
SPackedData tmp = { SPackedData tmp = {.pDataBlock = pDataBlock};
.pDataBlock = pDataBlock,
};
taosArrayPush(pInfo->pBlockLists, &tmp); taosArrayPush(pInfo->pBlockLists, &tmp);
} }
pInfo->blockType = STREAM_INPUT__REF_DATA_BLOCK; pInfo->blockType = STREAM_INPUT__REF_DATA_BLOCK;
@ -642,7 +642,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
blockIndex += 1; blockIndex += 1;
current += p->info.rows; current += p->info.rows;
ASSERT(p->info.rows > 0); ASSERT(p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT);
taosArrayPush(pResList, &p); taosArrayPush(pResList, &p);
if (current >= rowsThreshold) { if (current >= rowsThreshold) {

View File

@ -1035,7 +1035,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
} }
initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo); initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode); code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode, readHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -3533,7 +3533,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
goto _error; goto _error;
} }
code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode); code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode, readHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pInfo->base.matchInfo.pList); taosArrayDestroy(pInfo->base.matchInfo.pList);
goto _error; goto _error;

View File

@ -106,7 +106,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId); int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);

View File

@ -28,7 +28,6 @@ int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0; int32_t streamBackendCfWrapperId = 0;
int32_t streamMetaId = 0; int32_t streamMetaId = 0;
static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
static void metaHbToMnode(void* param, void* tmrId); static void metaHbToMnode(void* param, void* tmrId);
static void streamMetaClear(SStreamMeta* pMeta); static void streamMetaClear(SStreamMeta* pMeta);
static int32_t streamMetaBegin(SStreamMeta* pMeta); static int32_t streamMetaBegin(SStreamMeta* pMeta);
@ -191,10 +190,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t)); pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t));
pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t)); pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t));
pMeta->chkpCap = 8; pMeta->chkpCap = 2;
taosInitRWLatch(&pMeta->chkpDirLock); taosInitRWLatch(&pMeta->chkpDirLock);
pMeta->chkpId = streamGetLatestCheckpointId(pMeta); pMeta->chkpId = streamMetaGetLatestCheckpointId(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
while (pMeta->streamBackend == NULL) { while (pMeta->streamBackend == NULL) {
taosMsleep(100); taosMsleep(100);
@ -602,7 +601,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) {
return 0; return 0;
} }
int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
int64_t chkpId = 0; int64_t chkpId = 0;
TBC* pCur = NULL; TBC* pCur = NULL;

View File

@ -195,7 +195,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
} }
} }
{ {
char* buf = taosMemoryCalloc(1, 512); char* buf = taosMemoryCalloc(1, 1024);
sprintf(buf, "[current: %s,", pFile->pCurrent); sprintf(buf, "[current: %s,", pFile->pCurrent);
sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest); sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest);
sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions); sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions);

View File

@ -221,7 +221,6 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
} }
pState->pTdbState->pOwner = pTask; pState->pTdbState->pOwner = pTask;
pState->checkPointId = 0;
return pState; return pState;
@ -274,7 +273,6 @@ int32_t streamStateCommit(SStreamState* pState) {
SStreamSnapshot* pShot = getSnapshot(pState->pFileState); SStreamSnapshot* pShot = getSnapshot(pState->pFileState);
flushSnapshot(pState->pFileState, pShot, true); flushSnapshot(pState->pFileState, pShot, true);
} }
pState->checkPointId++;
return 0; return 0;
#else #else
if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) { if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
@ -288,7 +286,6 @@ int32_t streamStateCommit(SStreamState* pState) {
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1; return -1;
} }
pState->checkPointId++;
return 0; return 0;
#endif #endif
} }

View File

@ -27,6 +27,9 @@
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024) #define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
#define MIN_NUM_OF_ROW_BUFF 10240 #define MIN_NUM_OF_ROW_BUFF 10240
#define TASK_KEY "streamFileState"
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
struct SStreamFileState { struct SStreamFileState {
SList* usedBuffs; SList* usedBuffs;
SList* freeBuffs; SList* freeBuffs;
@ -113,6 +116,15 @@ void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
return pStateKey; return pStateKey;
} }
/**
 * @brief Read a key out of an encoded buffer by delegating to taosDecodeFixedI64.
 *
 * @param pKey  destination passed through to taosDecodeFixedI64
 * @param pBuff encoded input buffer
 * @param len   size of pBuff; not consulted here (callers are presumably
 *              expected to validate it beforehand - TODO confirm)
 */
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) {
  (void)len;
  // the advanced cursor returned by the decoder is not needed by any caller
  (void)taosDecodeFixedI64(pBuff, pKey);
}
/**
 * @brief Encode a key into a freshly allocated buffer of sizeof(TSKEY) bytes.
 *
 * @param pKey key to encode
 * @param pVal out: start of the allocated buffer; the caller owns it and must
 *             free it. Set to NULL when the allocation fails.
 * @param pLen out: number of bytes in *pVal (sizeof(TSKEY)), or 0 when the
 *             allocation fails.
 */
static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
  *pVal = taosMemoryCalloc(1, sizeof(TSKEY));
  if (*pVal == NULL) {
    // never hand a NULL buffer to the encoder; report an empty value instead
    *pLen = 0;
    return;
  }
  *pLen = sizeof(TSKEY);

  // encode through a scratch cursor so *pVal keeps pointing at the buffer start
  void* cursor = *pVal;
  taosEncodeFixedI64(&cursor, *pKey);
}
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId,
int8_t type) { int8_t type) {
@ -181,6 +193,15 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
recoverSesssion(pFileState, checkpointId); recoverSesssion(pFileState, checkpointId);
} }
void* valBuf = NULL;
int32_t len = 0;
int32_t code = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
if (code == TSDB_CODE_SUCCESS) {
ASSERT(len == sizeof(TSKEY));
streamFileStateDecode(&pFileState->flushMark, valBuf, len);
qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark);
}
taosMemoryFreeClear(valBuf);
return pFileState; return pFileState;
_error: _error:
@ -506,15 +527,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
return pFileState->usedBuffs; return pFileState->usedBuffs;
} }
void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); }
void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
*pLen = sizeof(TSKEY);
(*pVal) = taosMemoryCalloc(1, *pLen);
void* buff = *pVal;
taosEncodeFixedI64(&buff, *pKey);
}
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) { int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SListIter iter = {0}; SListIter iter = {0};
@ -538,6 +550,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
continue; continue;
} }
pPos->beFlushed = true; pPos->beFlushed = true;
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey)); qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) { if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
@ -565,24 +578,12 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
pFileState->id, numOfElems, BATCH_LIMIT, elapsed); pFileState->id, numOfElems, BATCH_LIMIT, elapsed);
if (flushState) { if (flushState) {
const char* taskKey = "streamFileState";
{
char keyBuf[128] = {0};
void* valBuf = NULL; void* valBuf = NULL;
int32_t len = 0; int32_t len = 0;
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, len, 0);
taosMemoryFree(valBuf); taosMemoryFree(valBuf);
}
{
char keyBuf[128] = {0};
char valBuf[64] = {0};
int32_t len = 0;
memcpy(keyBuf, taskKey, strlen(taskKey));
len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
}
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
} }
@ -591,26 +592,23 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
} }
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) { int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
const char* taskKey = "streamFileState";
char keyBuf[128] = {0}; char keyBuf[128] = {0};
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, checkpointId); sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, checkpointId);
return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
} }
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) {
const char* taskKey = "streamFileState"; return streamDefaultIterGet_rocksdb(pFileState->pFileStore, TASK_KEY, NULL, list);
return streamDefaultIterGet_rocksdb(pFileState->pFileStore, taskKey, NULL, list);
} }
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
const char* taskKey = "streamFileState";
int64_t maxCheckPointId = 0; int64_t maxCheckPointId = 0;
{ {
char buf[128] = {0}; char buf[128] = {0};
void* val = NULL; void* val = NULL;
int32_t len = 0; int32_t len = 0;
memcpy(buf, taskKey, strlen(taskKey)); memcpy(buf, TASK_KEY, strlen(TASK_KEY));
code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
if (code != 0 || len == 0 || val == NULL) { if (code != 0 || len == 0 || val == NULL) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
@ -624,7 +622,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
char buf[128] = {0}; char buf[128] = {0};
void* val = 0; void* val = 0;
int32_t len = 0; int32_t len = 0;
sprintf(buf, "%s:%" PRId64 "", taskKey, i); sprintf(buf, "%s:%" PRId64 "", TASK_KEY, i);
code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
if (code != 0) { if (code != 0) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;

View File

@ -1181,6 +1181,7 @@ e
,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim ,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim ,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
,,y,script,./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim
,,n,script,./test.sh -f tsim/valgrind/checkError1.sim ,,n,script,./test.sh -f tsim/valgrind/checkError1.sim
,,n,script,./test.sh -f tsim/valgrind/checkError2.sim ,,n,script,./test.sh -f tsim/valgrind/checkError2.sim
,,n,script,./test.sh -f tsim/valgrind/checkError3.sim ,,n,script,./test.sh -f tsim/valgrind/checkError3.sim

View File

@ -5,7 +5,7 @@ sleep 50
sql connect sql connect
#todo wait for streamState checkpoint #todo wait for streamState checkpoint
return 1 #return 1
print =============== create database with retentions print =============== create database with retentions
sql create database d0 retentions -:7d,5m:21d,15m:365d; sql create database d0 retentions -:7d,5m:21d,15m:365d;

View File

@ -114,7 +114,7 @@ endi
vg_ready: vg_ready:
print ====> create stable/child table print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum) watermark 3s,3s max_delay 3s,3s sql create table stb (ts timestamp, c1 float, c2 float, c3 double) tags (t1 int) rollup(sum) watermark 3s,3s max_delay 3s,3s
sql show stables sql show stables
if $rows != 1 then if $rows != 1 then
@ -167,9 +167,6 @@ system sh/exec.sh -n dnode4 -s start
sleep 3000 sleep 3000
print =============== query data of level 1 print =============== query data of level 1
sql connect sql connect
sql use db sql use db
@ -181,12 +178,21 @@ if $rows != 100 then
return -1 return -1
endi endi
print =============== sleep 5s to wait the result
sleep 5000
print =============== query data of level 2 print =============== query data of level 2
sql select * from ct1 where ts > now - 10d sql select * from ct1 where ts > now - 10d
print rows of level 2: $rows
print $data00 $data01 $data02 print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows != 100 then
print rows of level 2: $rows
endi
print =============== query data of level 3 print =============== query data of level 3
sql select * from ct1 sql select * from ct1
print rows of level 3: $rows
print $data00 $data01 $data02 print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows != 100 then
print rows of level 3: $rows
endi

View File

@ -320,6 +320,7 @@
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim ./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim
./test.sh -f tsim/valgrind/checkError1.sim ./test.sh -f tsim/valgrind/checkError1.sim
./test.sh -f tsim/valgrind/checkError2.sim ./test.sh -f tsim/valgrind/checkError2.sim
./test.sh -f tsim/valgrind/checkError3.sim ./test.sh -f tsim/valgrind/checkError3.sim