refactor: do some internal refactor.
This commit is contained in:
parent
78a5680cf4
commit
e457d94ac6
|
@ -111,14 +111,12 @@ typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
|
|||
typedef struct {
|
||||
int8_t type;
|
||||
int64_t ver;
|
||||
int32_t* dataRef;
|
||||
SPackedData submit;
|
||||
} SStreamDataSubmit;
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
int64_t ver;
|
||||
SArray* dataRefs; // SArray<int32_t*>
|
||||
SArray* submits; // SArray<SPackedSubmit>
|
||||
} SStreamMergedSubmit;
|
||||
|
||||
|
@ -672,7 +670,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
|||
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
||||
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
|
||||
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
||||
int32_t streamTaskGetInputQItems(const SStreamTask* pTask);
|
||||
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
||||
|
||||
// common
|
||||
int32_t streamRestoreParam(SStreamTask* pTask);
|
||||
|
@ -696,7 +694,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
|||
// source level
|
||||
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
|
||||
int32_t streamScanHistoryData(SStreamTask* pTask);
|
||||
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
||||
|
||||
// agg level
|
||||
|
|
|
@ -1048,7 +1048,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
ASSERT(pTask->status.pauseAllowed == true);
|
||||
}
|
||||
|
||||
streamSourceScanHistoryData(pTask);
|
||||
streamScanHistoryData(pTask);
|
||||
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
|
||||
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
|
||||
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el,
|
||||
|
|
|
@ -537,7 +537,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock
|
|||
if (k == 0) {
|
||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
||||
void* colData = colDataGetData(pColData, j);
|
||||
tqDebug("s-task:%s tq sink pipe2, row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
|
||||
tqDebug("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
|
||||
}
|
||||
|
||||
if (IS_SET_NULL(pCol)) {
|
||||
|
|
|
@ -396,7 +396,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
continue;
|
||||
}
|
||||
|
||||
int32_t numOfItems = streamTaskGetInputQItems(pTask);
|
||||
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputInfo.queue);
|
||||
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
|
||||
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
|
|
|
@ -177,7 +177,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
|
|||
}
|
||||
|
||||
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock));
|
||||
int8_t status = TASK_INPUT_STATUS__NORMAL;
|
||||
|
||||
// enqueue
|
||||
|
|
|
@ -115,28 +115,16 @@ SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
|
||||
if (pDataSubmit->dataRef == NULL) {
|
||||
taosFreeQitem(pDataSubmit);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pDataSubmit->ver = pData->ver;
|
||||
pDataSubmit->submit = *pData;
|
||||
*pDataSubmit->dataRef = 1; // initialize the reference count to be 1
|
||||
pDataSubmit->type = type;
|
||||
|
||||
return pDataSubmit;
|
||||
}
|
||||
|
||||
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
|
||||
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
|
||||
ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
|
||||
|
||||
if (ref == 0) {
|
||||
taosMemoryFree(pDataSubmit->submit.msgStr);
|
||||
taosMemoryFree(pDataSubmit->dataRef);
|
||||
}
|
||||
ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
|
||||
taosMemoryFree(pDataSubmit->submit.msgStr);
|
||||
}
|
||||
|
||||
SStreamMergedSubmit* streamMergedSubmitNew() {
|
||||
|
@ -146,11 +134,8 @@ SStreamMergedSubmit* streamMergedSubmitNew() {
|
|||
}
|
||||
|
||||
pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
|
||||
pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
|
||||
|
||||
if (pMerged->dataRefs == NULL || pMerged->submits == NULL) {
|
||||
if (pMerged->submits == NULL) {
|
||||
taosArrayDestroy(pMerged->submits);
|
||||
taosArrayDestroy(pMerged->dataRefs);
|
||||
taosFreeQitem(pMerged);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -160,9 +145,10 @@ SStreamMergedSubmit* streamMergedSubmitNew() {
|
|||
}
|
||||
|
||||
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
|
||||
taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
|
||||
taosArrayPush(pMerged->submits, &pSubmit->submit);
|
||||
pMerged->ver = pSubmit->ver;
|
||||
if (pSubmit->ver > pMerged->ver) {
|
||||
pMerged->ver = pSubmit->ver;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -222,18 +208,10 @@ void streamFreeQitem(SStreamQueueItem* data) {
|
|||
|
||||
int32_t sz = taosArrayGetSize(pMerge->submits);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
|
||||
int32_t ref = atomic_sub_fetch_32(pRef, 1);
|
||||
ASSERT(ref >= 0);
|
||||
|
||||
if (ref == 0) {
|
||||
SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
|
||||
taosMemoryFree(pSubmit->msgStr);
|
||||
taosMemoryFree(pRef);
|
||||
}
|
||||
SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
|
||||
taosMemoryFree(pSubmit->msgStr);
|
||||
}
|
||||
taosArrayDestroy(pMerge->submits);
|
||||
taosArrayDestroy(pMerge->dataRefs);
|
||||
taosFreeQitem(pMerge);
|
||||
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
|
||||
|
|
|
@ -498,9 +498,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
|
||||
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->pQueue);
|
||||
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue);
|
||||
int32_t size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue));
|
||||
if (numOfElems > 0) {
|
||||
qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", id, numOfElems);
|
||||
qDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
|
||||
}
|
||||
|
||||
// to make sure only one dispatch is running
|
||||
|
|
|
@ -164,11 +164,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
|
||||
int32_t streamScanHistoryData(SStreamTask* pTask) {
|
||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
void* exec = pTask->exec.pExecutor;
|
||||
bool finished = false;
|
||||
int32_t outputBatchSize = 100;
|
||||
|
||||
qSetStreamOpOpen(exec);
|
||||
|
||||
|
@ -217,8 +219,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
|
|||
block.info.childId = pTask->info.selfChildId;
|
||||
taosArrayPush(pRes, &block);
|
||||
|
||||
if ((++numOfBlocks) >= batchSize) {
|
||||
qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, batchSize);
|
||||
if ((++numOfBlocks) >= outputBatchSize) {
|
||||
qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, outputBatchSize);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -248,13 +250,6 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskGetInputQItems(const SStreamTask* pTask) {
|
||||
int32_t numOfItems1 = taosQueueItemSize(pTask->inputInfo.queue->pQueue);
|
||||
int32_t numOfItems2 = taosQallItemSize(pTask->inputInfo.queue->qall);
|
||||
|
||||
return numOfItems1 + numOfItems2;
|
||||
}
|
||||
|
||||
// wait for the stream task to be idle
|
||||
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
||||
const char* id = pTask->id.idStr;
|
||||
|
@ -576,7 +571,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
// todo other thread may change the status
|
||||
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
|
||||
if (type == STREAM_INPUT__CHECKPOINT) {
|
||||
// ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
|
||||
qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr,
|
||||
streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||
streamTaskBuildCheckpoint(pTask);
|
||||
|
@ -608,8 +602,6 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// streamTaskBuildCheckpoint(pTask);
|
||||
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||
pTask->status.schedStatus);
|
||||
|
|
|
@ -165,6 +165,13 @@ bool streamQueueIsFull(const STaosQueue* pQueue) {
|
|||
return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE);
|
||||
}
|
||||
|
||||
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
|
||||
int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
|
||||
int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
|
||||
|
||||
return numOfItems1 + numOfItems2;
|
||||
}
|
||||
|
||||
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
|
||||
int32_t retryTimes = 0;
|
||||
int32_t MAX_RETRY_TIMES = 5;
|
||||
|
@ -269,11 +276,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
|||
int8_t type = pItem->type;
|
||||
STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
|
||||
int32_t total = taosQueueItemSize(pQueue) + 1;
|
||||
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||
|
||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
|
||||
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) {
|
||||
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||
qTrace(
|
||||
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
|
||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
||||
|
@ -292,24 +299,30 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
|||
return code;
|
||||
}
|
||||
|
||||
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||
|
||||
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
|
||||
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
|
||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
if (streamQueueIsFull(pQueue)) {
|
||||
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||
|
||||
qTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
||||
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
||||
return -1;
|
||||
}
|
||||
|
||||
qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
||||
int32_t code = taosWriteQitem(pQueue, pItem);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
||||
return code;
|
||||
}
|
||||
|
||||
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||
qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
||||
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
||||
type == STREAM_INPUT__TRANS_STATE) {
|
||||
int32_t code = taosWriteQitem(pQueue, pItem);
|
||||
|
@ -317,6 +330,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
|||
taosFreeQitem(pItem);
|
||||
return code;
|
||||
}
|
||||
|
||||
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||
pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
|
||||
} else if (type == STREAM_INPUT__GET_RES) {
|
||||
|
@ -326,6 +341,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
|||
taosFreeQitem(pItem);
|
||||
return code;
|
||||
}
|
||||
|
||||
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
|
|
|
@ -371,10 +371,6 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
|
||||
return streamScanExec(pTask, 100);
|
||||
}
|
||||
|
||||
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
|
||||
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
|
||||
if (pTranstate == NULL) {
|
||||
|
|
Loading…
Reference in New Issue