Merge pull request #21002 from taosdata/fix/liaohj_main
fix(stream): set the correct number of tasks.
This commit is contained in:
commit
2b74583b87
|
@ -273,6 +273,7 @@ typedef struct SStreamId {
|
||||||
typedef struct SCheckpointInfo {
|
typedef struct SCheckpointInfo {
|
||||||
int64_t id;
|
int64_t id;
|
||||||
int64_t version; // offset in WAL
|
int64_t version; // offset in WAL
|
||||||
|
int64_t currentVer;// current offset in WAL, not serialize it
|
||||||
} SCheckpointInfo;
|
} SCheckpointInfo;
|
||||||
|
|
||||||
typedef struct SStreamStatus {
|
typedef struct SStreamStatus {
|
||||||
|
@ -537,6 +538,7 @@ void streamTaskInputFail(SStreamTask* pTask);
|
||||||
int32_t streamTryExec(SStreamTask* pTask);
|
int32_t streamTryExec(SStreamTask* pTask);
|
||||||
int32_t streamSchedExec(SStreamTask* pTask);
|
int32_t streamSchedExec(SStreamTask* pTask);
|
||||||
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
||||||
|
bool streamTaskShouldStop(const SStreamStatus* pStatus);
|
||||||
|
|
||||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
||||||
|
|
||||||
|
|
|
@ -119,6 +119,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
||||||
pVnode->pFetchQ->threadId);
|
pVnode->pFetchQ->threadId);
|
||||||
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
|
||||||
|
|
||||||
|
tqNotifyClose(pVnode->pImpl->pTq);
|
||||||
dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
|
dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
|
||||||
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
|
||||||
|
|
||||||
|
@ -141,7 +142,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
||||||
dInfo("vgId:%d, vnode is closed", pVnode->vgId);
|
dInfo("vgId:%d, vnode is closed", pVnode->vgId);
|
||||||
|
|
||||||
if (commitAndRemoveWal) {
|
if (commitAndRemoveWal) {
|
||||||
char path[TSDB_FILENAME_LEN] = {0};
|
|
||||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
|
||||||
dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
|
dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
|
||||||
tfsRmdir(pMgmt->pTfs, path);
|
tfsRmdir(pMgmt->pTfs, path);
|
||||||
|
|
|
@ -180,16 +180,9 @@ int32_t tqStreamTasksScanWal(STQ* pTq);
|
||||||
|
|
||||||
// tq util
|
// tq util
|
||||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
||||||
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId);
|
|
||||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
|
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
|
||||||
int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset);
|
|
||||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||||
|
|
||||||
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver);
|
|
||||||
void saveOffsetForAllTasks(STQ* pTq, int64_t ver);
|
|
||||||
void initOffsetForAllRestoreTasks(STQ* pTq);
|
|
||||||
int32_t transferToWalReadTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -190,6 +190,7 @@ int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
||||||
int tqInit();
|
int tqInit();
|
||||||
void tqCleanUp();
|
void tqCleanUp();
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode);
|
STQ* tqOpen(const char* path, SVnode* pVnode);
|
||||||
|
void tqNotifyClose(STQ*);
|
||||||
void tqClose(STQ*);
|
void tqClose(STQ*);
|
||||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||||
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
|
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
|
||||||
|
|
|
@ -154,6 +154,31 @@ void tqClose(STQ* pTq) {
|
||||||
taosMemoryFree(pTq);
|
taosMemoryFree(pTq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tqNotifyClose(STQ* pTq) {
|
||||||
|
if (pTq != NULL) {
|
||||||
|
taosWLockLatch(&pTq->pStreamMeta->lock);
|
||||||
|
|
||||||
|
void* pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
||||||
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||||
|
tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
|
||||||
|
pTask->status.taskStatus = TASK_STATUS__STOP;
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampMs();
|
||||||
|
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||||
|
int64_t el = taosGetTimestampMs() - st;
|
||||||
|
tqDebug("vgId:%d s-task:%s is closed in %" PRId64 "ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
|
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
|
||||||
int64_t consumerId, int32_t type) {
|
int64_t consumerId, int32_t type) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -573,6 +598,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
||||||
pTask->pMeta = pTq->pStreamMeta;
|
pTask->pMeta = pTq->pStreamMeta;
|
||||||
pTask->chkInfo.version = ver;
|
pTask->chkInfo.version = ver;
|
||||||
|
pTask->chkInfo.currentVer = ver;
|
||||||
|
|
||||||
// expand executor
|
// expand executor
|
||||||
if (pTask->fillHistory) {
|
if (pTask->fillHistory) {
|
||||||
|
@ -966,14 +992,21 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
|
int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
|
||||||
*pRef = 1;
|
*pRef = 1;
|
||||||
|
|
||||||
|
taosWLockLatch(&pTq->pStreamMeta->lock);
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) {
|
||||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
break;
|
||||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
|
}
|
||||||
|
|
||||||
qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->id.taskId, ver);
|
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||||
|
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("s-task:%s delete req enqueue, ver: %" PRId64, pTask->id.idStr, ver);
|
||||||
|
|
||||||
if (!failed) {
|
if (!failed) {
|
||||||
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
||||||
|
@ -983,15 +1016,13 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
atomic_add_fetch_32(pRefBlock->dataRef, 1);
|
atomic_add_fetch_32(pRefBlock->dataRef, 1);
|
||||||
|
|
||||||
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
|
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
|
||||||
qError("stream task input del failed, task id %d", pTask->id.taskId);
|
|
||||||
|
|
||||||
atomic_sub_fetch_32(pRef, 1);
|
atomic_sub_fetch_32(pRef, 1);
|
||||||
taosFreeQitem(pRefBlock);
|
taosFreeQitem(pRefBlock);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamSchedExec(pTask) < 0) {
|
if (streamSchedExec(pTask) < 0) {
|
||||||
qError("stream task launch failed, task id %d", pTask->id.taskId);
|
qError("s-task:%s stream task launch failed", pTask->id.idStr);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1000,8 +1031,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
||||||
|
|
||||||
int32_t ref = atomic_sub_fetch_32(pRef, 1);
|
int32_t ref = atomic_sub_fetch_32(pRef, 1);
|
||||||
/*A(ref >= 0);*/
|
|
||||||
if (ref == 0) {
|
if (ref == 0) {
|
||||||
blockDataDestroy(pDelBlock);
|
blockDataDestroy(pDelBlock);
|
||||||
taosMemoryFree(pRef);
|
taosMemoryFree(pRef);
|
||||||
|
@ -1032,23 +1064,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
}
|
}
|
||||||
blockDataDestroy(pDelBlock);
|
blockDataDestroy(pDelBlock);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTask* pTask, SStreamDataSubmit2* pSubmit,
|
|
||||||
const char* key, int64_t ver) {
|
|
||||||
doSaveTaskOffset(pOffsetStore, key, ver);
|
|
||||||
int32_t code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pSubmit, ver);
|
|
||||||
|
|
||||||
// remove the offset, if all functions are completed successfully.
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
|
||||||
tqOffsetDelete(pOffsetStore, key);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
|
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
|
||||||
#if 0
|
#if 0
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
|
@ -1310,8 +1328,6 @@ int32_t tqStartStreamTasks(STQ* pTq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
|
tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
|
||||||
initOffsetForAllRestoreTasks(pTq);
|
|
||||||
|
|
||||||
pRunReq->head.vgId = vgId;
|
pRunReq->head.vgId = vgId;
|
||||||
pRunReq->streamId = 0;
|
pRunReq->streamId = 0;
|
||||||
pRunReq->taskId = WAL_READ_TASKS_ID;
|
pRunReq->taskId = WAL_READ_TASKS_ID;
|
||||||
|
|
|
@ -1023,6 +1023,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the table list handle for each stream scanner/wal reader
|
// update the table list handle for each stream scanner/wal reader
|
||||||
|
taosWLockLatch(&pTq->pStreamMeta->lock);
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
|
@ -1039,5 +1040,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,12 +15,12 @@
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
static int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle);
|
static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
||||||
|
|
||||||
// this function should be executed by stream threads.
|
// this function should be executed by stream threads.
|
||||||
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
|
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
|
||||||
// will not stop eventually.
|
// will not stop eventually.
|
||||||
int tqStreamTasksScanWal(STQ* pTq) {
|
int32_t tqStreamTasksScanWal(STQ* pTq) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
@ -31,7 +31,7 @@ int tqStreamTasksScanWal(STQ* pTq) {
|
||||||
|
|
||||||
// check all restore tasks
|
// check all restore tasks
|
||||||
bool shouldIdle = true;
|
bool shouldIdle = true;
|
||||||
doCreateReqsByScanWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle);
|
createStreamRunReq(pTq->pStreamMeta, &shouldIdle);
|
||||||
|
|
||||||
int32_t times = 0;
|
int32_t times = 0;
|
||||||
|
|
||||||
|
@ -76,7 +76,7 @@ static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) {
|
||||||
return pTaskIdList;
|
return pTaskIdList;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) {
|
int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
*pScanIdle = true;
|
*pScanIdle = true;
|
||||||
bool noNewDataInWal = true;
|
bool noNewDataInWal = true;
|
||||||
int32_t vgId = pStreamMeta->vgId;
|
int32_t vgId = pStreamMeta->vgId;
|
||||||
|
@ -89,6 +89,8 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
|
||||||
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
|
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
|
||||||
SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks);
|
SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks);
|
||||||
|
|
||||||
|
// update the new task number
|
||||||
|
numOfTasks = taosArrayGetSize(pTaskIdList);
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
int32_t* pTaskId = taosArrayGet(pTaskIdList, i);
|
int32_t* pTaskId = taosArrayGet(pTaskIdList, i);
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId);
|
||||||
|
@ -97,21 +99,19 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t status = pTask->status.taskStatus;
|
int32_t status = pTask->status.taskStatus;
|
||||||
if ((pTask->taskLevel != TASK_LEVEL__SOURCE) || (status == TASK_STATUS__DROPPING)) {
|
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
|
||||||
|
tqDebug("s-task:%s not source task, no need to start", pTask->id.idStr);
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) {
|
if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE ||
|
||||||
|
status == TASK_STATUS__WAIT_DOWNSTREAM) {
|
||||||
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status);
|
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status);
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if offset value exists
|
|
||||||
char key[128] = {0};
|
|
||||||
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
|
|
||||||
|
|
||||||
if (tInputQueueIsFull(pTask)) {
|
if (tInputQueueIsFull(pTask)) {
|
||||||
tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
|
tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
|
@ -120,19 +120,15 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
|
||||||
|
|
||||||
*pScanIdle = false;
|
*pScanIdle = false;
|
||||||
|
|
||||||
// check if offset value exists
|
|
||||||
STqOffset* pOffset = tqOffsetRead(pOffsetStore, key);
|
|
||||||
ASSERT(pOffset != NULL);
|
|
||||||
|
|
||||||
// seek the stored version and extract data from WAL
|
// seek the stored version and extract data from WAL
|
||||||
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version);
|
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
|
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// append the data for the stream
|
// append the data for the stream
|
||||||
tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pOffset->val.version, pTask->id.idStr);
|
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||||
|
|
||||||
SPackedData packData = {0};
|
SPackedData packData = {0};
|
||||||
code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
|
code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
|
||||||
|
@ -151,14 +147,13 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
|
||||||
|
|
||||||
noNewDataInWal = false;
|
noNewDataInWal = false;
|
||||||
|
|
||||||
tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr);
|
|
||||||
code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
|
code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||||
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
|
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
|
||||||
pOffset->val.version);
|
pTask->chkInfo.currentVer);
|
||||||
} else {
|
} else {
|
||||||
// do nothing
|
tqError("s-task:%s append input queue failed, ver:%"PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamDataSubmitDestroy(p);
|
streamDataSubmitDestroy(p);
|
||||||
|
|
|
@ -25,21 +25,6 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
|
||||||
return taosStrdup(buf);
|
return taosStrdup(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// stream_task:stream_id:task_id
|
|
||||||
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) {
|
|
||||||
int32_t n = 12;
|
|
||||||
char* p = dst;
|
|
||||||
|
|
||||||
memcpy(p, "stream_task:", n);
|
|
||||||
p += n;
|
|
||||||
|
|
||||||
int32_t inc = tintToHex(streamId, p);
|
|
||||||
p += inc;
|
|
||||||
|
|
||||||
*(p++) = ':';
|
|
||||||
tintToHex(taskId, p);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) {
|
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) {
|
||||||
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
|
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
|
@ -55,75 +40,6 @@ int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueI
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void initOffsetForAllRestoreTasks(STQ* pTq) {
|
|
||||||
void* pIter = NULL;
|
|
||||||
|
|
||||||
while(1) {
|
|
||||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
|
||||||
if (pIter == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
|
||||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
|
||||||
tqDebug("s-task:%s skip push data, since not ready, status %d", pTask->id.idStr, pTask->status.taskStatus);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
char key[128] = {0};
|
|
||||||
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
|
|
||||||
|
|
||||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
|
|
||||||
if (pOffset == NULL) {
|
|
||||||
doSaveTaskOffset(pTq->pOffsetStore, key, pTask->chkInfo.version);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void saveOffsetForAllTasks(STQ* pTq, int64_t ver) {
|
|
||||||
void* pIter = NULL;
|
|
||||||
|
|
||||||
while(1) {
|
|
||||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
|
||||||
if (pIter == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
|
||||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
|
||||||
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
|
|
||||||
pTask->status.taskStatus);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
char key[128] = {0};
|
|
||||||
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
|
|
||||||
|
|
||||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
|
|
||||||
if (pOffset == NULL) {
|
|
||||||
doSaveTaskOffset(pTq->pOffsetStore, key, ver);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) {
|
|
||||||
STqOffset offset = {0};
|
|
||||||
tqOffsetResetToLog(&offset.val, ver);
|
|
||||||
|
|
||||||
tstrncpy(offset.subKey, pKey, tListLen(offset.subKey));
|
|
||||||
|
|
||||||
// keep the offset info in the offset store
|
|
||||||
tqOffsetWrite(pOffsetStore, &offset);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
|
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
|
||||||
pRsp->reqOffset = pReq->reqOffset;
|
pRsp->reqOffset = pReq->reqOffset;
|
||||||
|
|
||||||
|
|
|
@ -2541,6 +2541,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
|
||||||
|
if (pInfo->pUpdated != NULL) {
|
||||||
|
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->pUpdatedMap != NULL) {
|
||||||
|
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
||||||
|
pInfo->pUpdatedMap = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
@ -2635,6 +2649,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
|
||||||
taosArrayPush(pInfo->pUpdated, pIte);
|
taosArrayPush(pInfo->pUpdated, pIte);
|
||||||
}
|
}
|
||||||
|
|
||||||
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
||||||
pInfo->pUpdatedMap = NULL;
|
pInfo->pUpdatedMap = NULL;
|
||||||
taosArraySort(pInfo->pUpdated, winKeyCmprImpl);
|
taosArraySort(pInfo->pUpdated, winKeyCmprImpl);
|
||||||
|
|
|
@ -17,6 +17,11 @@
|
||||||
|
|
||||||
#define STREAM_EXEC_MAX_BATCH_NUM 100
|
#define STREAM_EXEC_MAX_BATCH_NUM 100
|
||||||
|
|
||||||
|
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
|
||||||
|
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
|
||||||
|
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
|
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
void* pExecutor = pTask->exec.pExecutor;
|
void* pExecutor = pTask->exec.pExecutor;
|
||||||
|
@ -66,7 +71,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
||||||
|
|
||||||
// pExecutor
|
// pExecutor
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
|
if (streamTaskShouldStop(&pTask->status)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,7 +111,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("task %d(child %d) executed and get block", pTask->id.taskId, pTask->selfChildId);
|
qDebug("s-task:%s (child %d) executed and get block", pTask->id.idStr, pTask->selfChildId);
|
||||||
|
|
||||||
SSDataBlock block = {0};
|
SSDataBlock block = {0};
|
||||||
assignOneDataBlock(&block, output);
|
assignOneDataBlock(&block, output);
|
||||||
|
@ -134,7 +139,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
||||||
|
|
||||||
int32_t batchCnt = 0;
|
int32_t batchCnt = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
|
if (streamTaskShouldStop(&pTask->status)) {
|
||||||
taosArrayDestroy(pRes);
|
taosArrayDestroy(pRes);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -270,7 +275,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
|
if (streamTaskShouldStop(&pTask->status)) {
|
||||||
if (pInput) {
|
if (pInput) {
|
||||||
streamFreeQitem(pInput);
|
streamFreeQitem(pInput);
|
||||||
}
|
}
|
||||||
|
@ -301,7 +306,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
", checkPoint id:%" PRId64 " -> %" PRId64,
|
", checkPoint id:%" PRId64 " -> %" PRId64,
|
||||||
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
|
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
|
||||||
|
|
||||||
pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId};
|
pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
|
||||||
|
|
||||||
taosWLockLatch(&pTask->pMeta->lock);
|
taosWLockLatch(&pTask->pMeta->lock);
|
||||||
streamMetaSaveTask(pTask->pMeta, pTask);
|
streamMetaSaveTask(pTask->pMeta, pTask);
|
||||||
|
@ -368,7 +373,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
qDebug("s-task:%s exec completed", pTask->id.idStr);
|
qDebug("s-task:%s exec completed", pTask->id.idStr);
|
||||||
|
|
||||||
if (!taosQueueEmpty(pTask->inputQueue->queue) && (pTask->status.taskStatus != TASK_STATUS__DROPPING)) {
|
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) {
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -191,10 +191,12 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
taosRLockLatch(&pMeta->lock);
|
taosRLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||||
if (ppTask != NULL && (atomic_load_8(&((*ppTask)->status.taskStatus)) != TASK_STATUS__DROPPING)) {
|
if (ppTask != NULL) {
|
||||||
atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
|
if (!streamTaskShouldStop(&(*ppTask)->status)) {
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
|
||||||
return *ppTask;
|
taosRUnLockLatch(&pMeta->lock);
|
||||||
|
return *ppTask;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
taosRUnLockLatch(&pMeta->lock);
|
||||||
|
@ -205,7 +207,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
|
int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
|
||||||
ASSERT(left >= 0);
|
ASSERT(left >= 0);
|
||||||
if (left == 0) {
|
if (left == 0) {
|
||||||
ASSERT(atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING);
|
ASSERT(streamTaskShouldStop(&pTask->status));
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -216,11 +218,8 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
SStreamTask* pTask = *ppTask;
|
SStreamTask* pTask = *ppTask;
|
||||||
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
|
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||||
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
|
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
|
||||||
/*if (pTask->timer) {
|
|
||||||
* taosTmrStop(pTask->timer);*/
|
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);
|
||||||
/*pTask->timer = NULL;*/
|
|
||||||
/*}*/
|
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
|
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
|
@ -37,7 +37,7 @@ if $loop_count == 20 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $rows != 4 then
|
if $rows != 4 then
|
||||||
print =====rows=$rows, expect 4
|
print =====rows=$rows expect 4
|
||||||
goto loop0
|
goto loop0
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue