refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-10-01 22:27:29 +08:00
parent 19042cdea6
commit 583ac1fc45
11 changed files with 120 additions and 60 deletions

View File

@ -44,7 +44,8 @@ extern "C" {
#define NODE_ROLE_LEADER 0x2 #define NODE_ROLE_LEADER 0x2
#define NODE_ROLE_FOLLOWER 0x3 #define NODE_ROLE_FOLLOWER 0x3
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
#define SSTREAM_TASK_VER 2 #define SSTREAM_TASK_VER 2
enum { enum {
@ -190,13 +191,6 @@ int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem);
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue); SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue);
#endif #endif
typedef struct {
STaosQueue* pQueue;
STaosQall* qall;
void* qItem;
int8_t status;
} SStreamQueue;
int32_t streamInit(); int32_t streamInit();
void streamCleanUp(); void streamCleanUp();
@ -314,7 +308,7 @@ typedef struct STaskOutputInfo {
} STaskOutputInfo; } STaskOutputInfo;
typedef struct STaskInputInfo { typedef struct STaskInputInfo {
int8_t status; int8_t status;
SStreamQueue* queue; SStreamQueue* queue;
} STaskInputInfo; } STaskInputInfo;
@ -406,7 +400,8 @@ struct SStreamTask {
}; };
typedef struct STaskStartInfo { typedef struct STaskStartInfo {
int64_t ts; int64_t startTs;
int64_t readyTs;
int32_t startedAfterNodeUpdate; int32_t startedAfterNodeUpdate;
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
int32_t elapsedTime; int32_t elapsedTime;
@ -463,7 +458,7 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId);
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock);
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask); int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask);
bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ); bool streamQueueIsFull(const SStreamQueue* pQueue, bool inputQ);
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
@ -602,6 +597,13 @@ typedef struct STaskStatusEntry {
int32_t status; int32_t status;
int32_t stage; int32_t stage;
int32_t nodeId; int32_t nodeId;
int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task
int64_t offset; // only valid for source task
double inputQUsed; // in MiB
double inputQCap;
double outputQUsed; // in MiB
double outputQCap;
} STaskStatusEntry; } STaskStatusEntry;
typedef struct SStreamHbMsg { typedef struct SStreamHbMsg {

View File

@ -161,9 +161,11 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_type", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "node_type", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "stage", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "stage", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "in_queue", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_queue", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
}; };
static const SSysDbTableSchema userTblsSchema[] = { static const SSysDbTableSchema userTblsSchema[] = {

View File

@ -353,7 +353,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL); SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL);
code = addSourceTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, code = addSourceTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid,
pEpset, true, hasExtraSink, nextWindowSkey, false); pEpset, true, hasExtraSink, nextWindowSkey, true);
} }
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);

View File

@ -1563,7 +1563,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
} }
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&level, false); colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
// status // status
char status[20 + VARSTR_HEADER_SIZE] = {0}; char status[20 + VARSTR_HEADER_SIZE] = {0};
@ -1577,12 +1577,31 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
const char* pStatus = streamGetTaskStatusStr(pe->status); const char* pStatus = streamGetTaskStatusStr(pe->status);
STR_TO_VARSTR(status, pStatus); STR_TO_VARSTR(status, pStatus);
// status
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&status, false); colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
// stage
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
// input queue
char vbuf[30] = {0};
char buf[25] = {0};
const char* queueInfoStr = "%.2fMiB (%.2f%, %.2fMiB)";
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputQUsed/pe->inputQCap, pe->inputQCap);
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
// output queue
sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputQUsed/pe->outputQCap, pe->outputQCap);
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
numOfRows++; numOfRows++;
} }
} }
@ -2429,6 +2448,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
} else { } else {
pEntry->stage = p->stage; pEntry->stage = p->stage;
pEntry->inputQUsed = p->inputQUsed;
pEntry->inputQCap = p->inputQCap;
pEntry->outputQUsed = p->outputQUsed;
pEntry->outputQCap = p->outputQCap;
pEntry->offset = p->offset;
} }
pEntry->status = p->status; pEntry->status = p->status;

View File

@ -53,9 +53,9 @@ FAIL:
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamInfoList) != 0); ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamInfoList) != 0);
int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, ver); int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }

View File

@ -724,11 +724,11 @@ end:
void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("s-task:0x%x start to expand task", pTask->id.taskId); tqDebug("s-task:0x%x start to expand task", pTask->id.taskId);
int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver); int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, nextProcessVer);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -1421,8 +1421,8 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId); tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId);
} }
streamMetaReleaseTask(pMeta, pTask);
} }
streamMetaReleaseTask(pMeta, pTask);
// drop the stream task now // drop the stream task now
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
@ -1510,7 +1510,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory &&
pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
streamStartScanHistoryAsync(pTask, igUntreated); streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputInfo.queue->pQueue) == 0)) { } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0)) {
tqScanWalAsync(pTq, false); tqScanWalAsync(pTq, false);
} else { } else {
streamSchedExec(pTask); streamSchedExec(pTask);

View File

@ -73,7 +73,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL); pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet); taosHashClear(pMeta->startInfo.pReadyTaskSet);
pMeta->startInfo.ts = taosGetTimestampMs(); pMeta->startInfo.startTs = taosGetTimestampMs();
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
// broadcast the check downstream tasks msg // broadcast the check downstream tasks msg
@ -370,7 +370,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue; continue;
} }
if (streamQueueIsFull(pTask->inputInfo.queue->pQueue, true)) { if (streamQueueGetNumOfItems(pTask->inputInfo.queue)) {
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;

View File

@ -40,6 +40,10 @@ extern "C" {
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
#define STREAM_TASK_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50)
// clang-format off // clang-format off
#define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define stError(...) do { if (stDebugFlag & DEBUG_ERROR) { taosPrintLog("STM ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) #define stError(...) do { if (stDebugFlag & DEBUG_ERROR) { taosPrintLog("STM ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
@ -70,6 +74,13 @@ struct STokenBucket {
int64_t fillTimestamp; // fill timestamp int64_t fillTimestamp; // fill timestamp
}; };
struct SStreamQueue {
STaosQueue* pQueue;
STaosQall* qall;
void* qItem;
int8_t status;
};
extern SStreamGlobalEnv streamEnv; extern SStreamGlobalEnv streamEnv;
extern int32_t streamBackendId; extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId; extern int32_t streamBackendCfWrapperId;
@ -100,7 +111,6 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize); int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize);
int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue);
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
const char* streamQueueItemGetTypeStr(int32_t type); const char* streamQueueItemGetTypeStr(int32_t type);
@ -118,14 +128,15 @@ STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
SStreamQueue* streamQueueOpen(int64_t cap); SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue); void streamQueueProcessSuccess(SStreamQueue* queue);
void streamQueueProcessFail(SStreamQueue* queue); void streamQueueProcessFail(SStreamQueue* queue);
void* streamQueueNextItem(SStreamQueue* pQueue); void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data); void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -406,15 +406,15 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
taosArrayPush(pMeta->pTaskList, &pTask->id); taosArrayPush(pMeta->pTaskList, &pTask->id);
// if (streamMetaSaveTask(pMeta, pTask) < 0) { if (streamMetaSaveTask(pMeta, pTask) < 0) {
// tFreeStreamTask(pTask); tFreeStreamTask(pTask);
// return -1; return -1;
// } }
//
// if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// tFreeStreamTask(pTask); tFreeStreamTask(pTask);
// return -1; return -1;
// } }
taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
@ -706,9 +706,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) { if (p == NULL) {
// pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) {
// In this case, we try not to start fill-history task anymore.
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) {
doClear(pKey, pVal, pCur, pRecycleList); doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
return -1; return -1;
@ -776,6 +774,10 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI32(pEncoder, ps->status) < 0) return -1; if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
if (tEncodeI32(pEncoder, ps->stage) < 0) return -1; if (tEncodeI32(pEncoder, ps->stage) < 0) return -1;
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputQCap) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->outputQUsed) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->outputQCap) < 0) return -1;
} }
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
@ -796,6 +798,10 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI32(pDecoder, &entry.status) < 0) return -1; if (tDecodeI32(pDecoder, &entry.status) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1; if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputQCap) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.outputQUsed) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.outputQCap) < 0) return -1;
entry.id.taskId = taskId; entry.id.taskId = taskId;
taosArrayPush(pReq->pTaskStatus, &entry); taosArrayPush(pReq->pTaskStatus, &entry);
@ -863,12 +869,23 @@ void metaHbToMnode(void* param, void* tmrId) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i); STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
// not report the status of fill-history task
if ((*pTask)->info.fillHistory == 1) { if ((*pTask)->info.fillHistory == 1) {
continue; continue;
} }
STaskStatusEntry entry = { STaskStatusEntry entry = {
.id = *pId, .status = (*pTask)->status.taskStatus, .nodeId = pMeta->vgId, .stage = pMeta->stage}; .id = *pId,
.status = (*pTask)->status.taskStatus,
.nodeId = pMeta->vgId,
.stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)),
.inputQCap = STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE,
.outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)),
.outputQCap = STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE,
};
taosArrayPush(hbMsg.pTaskStatus, &entry); taosArrayPush(hbMsg.pTaskStatus, &entry);
if (!hasValEpset) { if (!hasValEpset) {
@ -1004,4 +1021,9 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
void streamMetaInitForSnode(SStreamMeta* pMeta) { void streamMetaInitForSnode(SStreamMeta* pMeta) {
pMeta->stage = 0; pMeta->stage = 0;
pMeta->role = NODE_ROLE_LEADER; pMeta->role = NODE_ROLE_LEADER;
}
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
taosHashClear(pStartInfo->pReadyTaskSet);
pStartInfo->startedAfterNodeUpdate = 0;
} }

View File

@ -17,9 +17,6 @@
#define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4 #define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_TASK_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50)
#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec #define MAX_SMOOTH_BURST_RATIO 5 // 20 sec
// todo refactor: // todo refactor:
@ -105,15 +102,14 @@ void streamQueueProcessFail(SStreamQueue* queue) {
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
} }
bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ) { bool streamQueueIsFull(const SStreamQueue* pQueue, bool inputQ) {
bool isFull = taosQueueItemSize((STaosQueue*)pQueue) >= STREAM_TASK_QUEUE_CAPACITY; int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
if (isFull) { if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
return true; return true;
} }
int32_t threahold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE; int32_t threshold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE;
double size = SIZE_IN_MiB(taosQueueMemorySize((STaosQueue*)pQueue)); return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= threshold);
return (size >= threahold);
} }
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
@ -123,8 +119,9 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
return numOfItems1 + numOfItems2; return numOfItems1 + numOfItems2;
} }
int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue) { // todo: fix it: data in Qall is not included here
return taosQueueItemSize(pQueue->pQueue); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
return taosQueueMemorySize(pQueue->pQueue);
} }
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) { int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
@ -267,7 +264,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue, true)) { if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputInfo.queue, true)) {
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stTrace( stTrace(
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
@ -294,7 +291,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
msgLen, ver, total, size + SIZE_IN_MiB(msgLen)); msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
if (streamQueueIsFull(pQueue, true)) { if (streamQueueIsFull(pTask->inputInfo.queue, true)) {
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
@ -348,7 +345,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
STaosQueue* pQueue = pTask->outputInfo.queue->pQueue; STaosQueue* pQueue = pTask->outputInfo.queue->pQueue;
while (streamQueueIsFull(pQueue, false)) { while (streamQueueIsFull(pTask->inputInfo.queue, false)) {
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status)) {
stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr); stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
return TSDB_CODE_STREAM_EXEC_CANCELLED; return TSDB_CODE_STREAM_EXEC_CANCELLED;

View File

@ -58,16 +58,18 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
STaskId id = streamTaskExtractKey(pTask); STaskId id = streamTaskExtractKey(pTask);
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) { if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) {
// reset value for next time start STaskStartInfo* pStartInfo = &pMeta->startInfo;
taosHashClear(pMeta->startInfo.pReadyTaskSet); pStartInfo->readyTs = pTask->execInfo.start;
pMeta->startInfo.startedAfterNodeUpdate = 0; pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs;
pMeta->startInfo.elapsedTime = pTask->execInfo.start - pMeta->startInfo.ts; streamMetaResetStartInfo(pStartInfo);
stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, total elapsed time:%.2fs", stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64
vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pMeta->startInfo.elapsedTime / 1000.0); ", readyTs:%" PRId64 " total elapsed time:%.2fs",
vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0);
} }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
} }