From c86eeb3938f09b2020c96f56cad33864a78685b0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Sep 2023 10:58:33 +0800 Subject: [PATCH 01/28] fix(stream): add task status check when waiting for the table create. --- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 5 ++++- source/dnode/vnode/src/tq/tqSink.c | 14 +++++++++++--- source/dnode/vnode/src/tq/tqStreamTask.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 1 - source/libs/stream/src/streamQueue.c | 10 +++++----- 7 files changed, 24 insertions(+), 14 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 3355e771e2..536273c044 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -250,7 +250,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, SRpcMsg* pMsg); +int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4aee98148b..5d47237fb4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -890,9 +890,10 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId); } -int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { +int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t vgId = pTq->pStreamMeta->vgId; int32_t code; SStreamTaskCheckRsp rsp; @@ -901,7 +902,9 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { tDecoderInit(&decoder, (uint8_t*)pReq, len); code = tDecodeStreamTaskCheckRsp(&decoder, &rsp); if (code < 0) { + terrno = TSDB_CODE_INVALID_MSG; tDecoderClear(&decoder); + tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno)); return -1; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index f7132ff6c4..a738befec9 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -487,10 +487,18 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, pTableSinkInfo->uid = mr.me.uid; metaReaderClear(&mr); } - } else { // not exist, wait and retry + } else { // not exist, wait and retry metaReaderClear(&mr); - taosMsleep(100); - tqDebug("s-task:%s wait for the table:%s ready before insert data", id, dstTableName); + if (streamTaskShouldStop(&pTask->status)) { + tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName); + taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pVals); + + return TSDB_CODE_SUCCESS; + } else { + taosMsleep(100); + tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName); + } } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index eb587b8be2..553ef06974 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -112,7 +112,7 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { - tqInfo("vgId:%d no stream tasks exist", vgId); + tqDebug("vgId:%d no stream tasks existed to run", vgId); taosWUnLockLatch(&pMeta->lock); return 0; } @@ -150,7 +150,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { - tqInfo("vgId:%d no stream tasks exist", vgId); + tqDebug("vgId:%d no stream tasks existed to run", vgId); taosWUnLockLatch(&pMeta->lock); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0b7f969ed7..62822021a6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -756,7 +756,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) case TDMT_VND_STREAM_TASK_CHECK: return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: - return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg); + return tqProcessStreamTaskCheckRsp(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE: return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_RSP: diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 82fa21ea40..8a80d74c63 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1591,7 +1591,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t return 0; } int streamStateOpenBackend(void* backend, SStreamState* pState) { - // qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); taosAcquireRef(streamBackendId, pState->streamBackendRid); SBackendWrapper* handle = backend; SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 34b0a00639..78901a5ab1 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -264,9 +264,9 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) { - qError( - "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); +// qError( +// "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", +// pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); taosFreeQitem(pItem); return -1; @@ -288,8 +288,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { if (streamQueueIsFull(pQueue)) { - qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); +// qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", +// pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); destroyStreamDataBlock((SStreamDataBlock*)pItem); return -1; } From e607a0a5cbe88aea45945818a60c6968d6862e12 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Sep 2023 11:39:07 +0800 Subject: [PATCH 02/28] fix(stream): add some logs. --- source/dnode/vnode/src/tq/tqRead.c | 6 +++--- source/libs/transport/src/transSvr.c | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 0839a2bfa3..4f5701524d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -491,11 +491,11 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); if (ret != NULL) { - tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr); + tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pSubmitTbData->uid, idstr); return true; } else { - tqInfo("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid, - taosHashGetSize(pReader->tbIdHash), idstr); + tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid, + taosHashGetSize(pReader->tbIdHash), idstr); } pReader->nextBlk++; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a53830723c..3117bbf00e 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -395,7 +395,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { if (status == 0) { tTrace("success to dispatch conn to work thread"); } else { - tError("fail to dispatch conn to work thread"); + tError("fail to dispatch conn to work thread, code:%s", uv_strerror(status)); } if (!uv_is_closing((uv_handle_t*)req->data)) { uv_close((uv_handle_t*)req->data, uvFreeCb); From 8783ebcb02b2840371d9ead18ff149f565b0fe40 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Sep 2023 14:34:36 +0800 Subject: [PATCH 03/28] enh(stream): add some logs and record the submit package performance. --- include/libs/stream/tstream.h | 8 ++ source/dnode/vnode/src/tq/tqSink.c | 109 +++++++++++++++++----------- source/libs/stream/src/streamTask.c | 2 +- 3 files changed, 76 insertions(+), 43 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ea3524d12d..86e79835ac 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -312,10 +312,17 @@ typedef struct STaskSchedInfo { void* pTimer; } STaskSchedInfo; +typedef struct SSinkTaskRecorder { + int64_t numOfPackages; + int64_t numOfRows; +} SSinkTaskRecorder; + typedef struct { + int64_t created; int64_t init; int64_t step1Start; int64_t step2Start; + int64_t sinkStart; } STaskTimestamp; struct SStreamTask { @@ -345,6 +352,7 @@ struct SStreamTask { STaskSinkSma smaSink; STaskSinkFetch fetchSink; }; + SSinkTaskRecorder sinkRecorder; void* launchTaskTimer; SMsgCb* pMsgCb; // msg handle diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index a738befec9..5596c958e9 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -133,7 +133,6 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { return TSDB_CODE_SUCCESS; } - void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; @@ -144,6 +143,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { int32_t numOfBlocks = taosArrayGetSize(pBlocks); int32_t code = TSDB_CODE_SUCCESS; + if (pTask->tsInfo.sinkStart == 0) { + pTask->tsInfo.sinkStart = taosGetTimestampMs(); + } + tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, pTask->id.idStr, numOfBlocks); SArray* tagArray = NULL; @@ -259,10 +262,17 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { continue; } else { + pTask->sinkRecorder.numOfPackages += 1; code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask); } } + if ((pTask->sinkRecorder.numOfPackages % 5000) == 0) { + SSinkTaskRecorder* pRec = &pTask->sinkRecorder; + tqInfo("s-task:%s vgId:d write %d blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.", pTask->id.idStr, + vgId, pRec->numOfPackages, pRec->numOfRows, (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0); + } + tqDebug("vgId:%d, s-task:%s write results completed", vgId, pTask->id.idStr); _end: @@ -399,6 +409,55 @@ static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSi return code; } +static int32_t tqBuildSubmitReq(SSubmitTbData* pTableData, int32_t vgId, void** pMsg, int32_t* msgLen) { + int32_t code = 0; + void* pBuf = NULL; + *msgLen = 0; + + SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; + if (submitReq.aSubmitTbData == NULL) { + tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); + taosArrayDestroy(pTableData->aRowP); + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(submitReq.aSubmitTbData, pTableData); + + // encode + int32_t len = 0; + tEncodeSize(tEncodeSubmitReq, &submitReq, len, code); + + SEncoder encoder; + len += sizeof(SSubmitReq2Msg); + + pBuf = rpcMallocCont(len); + if (NULL == pBuf) { + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId; + ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); + ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); + + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); + if (tEncodeSubmitReq(&encoder, &submitReq) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("failed to encode submit req, code:%s, ignore and continue", terrstr()); + tEncoderClear(&encoder); + rpcFreeCont(pBuf); + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + return code; + } + + tEncoderClear(&encoder); + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + + *msgLen = len; + *pMsg = pBuf; + return TSDB_CODE_SUCCESS; +} + int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock, SStreamTask* pTask) { int32_t numOfRows = pDataBlock->info.rows; @@ -406,7 +465,6 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, uint64_t groupId = pDataBlock->info.id.groupId; STSchema* pTSchema = pTask->tbSink.pTSchema; int32_t code = TSDB_CODE_SUCCESS; - void* pBuf = NULL; SArray* pVals = NULL; const char* id = pTask->id.idStr; @@ -573,7 +631,7 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, void* colData = colDataGetData(pColData, j); if (IS_STR_DATA_TYPE(pCol->type)) { // address copy, no value - SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; + SValue sv = (SValue){.nData = varDataLen(colData), .pData = (uint8_t*) varDataVal(colData)}; SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); taosArrayPush(pVals, &cv); } else { @@ -601,53 +659,20 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, taosArrayPush(tbData.aRowP, &pRow); } - SSubmitReq2 submitReq = {0}; - if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { - tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); - - taosArrayDestroy(tbData.aRowP); - taosArrayDestroy(pVals); - return TSDB_CODE_OUT_OF_MEMORY; - } - - taosArrayPush(submitReq.aSubmitTbData, &tbData); - - // encode int32_t len = 0; - tEncodeSize(tEncodeSubmitReq, &submitReq, len, code); - - SEncoder encoder; - len += sizeof(SSubmitReq2Msg); - - pBuf = rpcMallocCont(len); - if (NULL == pBuf) { - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - taosArrayDestroy(tbData.aRowP); + void* pBuf = NULL; + code = tqBuildSubmitReq(&tbData, vgId, &pBuf, &len); + if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pVals); - } - - ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId; - ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); - ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); - - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); - if (tEncodeSubmitReq(&encoder, &submitReq) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to encode submit req, code:%s, ignore and continue", terrstr()); - tEncoderClear(&encoder); - rpcFreeCont(pBuf); - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - return code; } - tEncoderClear(&encoder); - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + pTask->sinkRecorder.numOfRows += numOfRows; - SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len }; + SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len}; code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); - if(code == TSDB_CODE_SUCCESS) { + if (code == TSDB_CODE_SUCCESS) { tqDebug("s-task:%s send submit msg to dstTable:%s, numOfRows:%d", id, pTableSinkInfo->name.data, numOfRows); } else { tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 711dbf65e7..7ce5040257 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -375,7 +375,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i return -1; } - pTask->tsInfo.init = taosGetTimestampMs(); + pTask->tsInfo.created = taosGetTimestampMs(); pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta; From 236b363996e49f0b88b8a9c32075b713fbfba129 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Sep 2023 14:37:11 +0800 Subject: [PATCH 04/28] fix(stream): fix syntax error. --- source/dnode/vnode/src/tq/tqSink.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 5596c958e9..82703260ed 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -269,8 +269,9 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { if ((pTask->sinkRecorder.numOfPackages % 5000) == 0) { SSinkTaskRecorder* pRec = &pTask->sinkRecorder; - tqInfo("s-task:%s vgId:d write %d blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.", pTask->id.idStr, - vgId, pRec->numOfPackages, pRec->numOfRows, (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0); + tqInfo("s-task:%s vgId:d write %" PRId64 " blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.", + pTask->id.idStr, vgId, pRec->numOfPackages, pRec->numOfRows, + (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0); } tqDebug("vgId:%d, s-task:%s write results completed", vgId, pTask->id.idStr); From b382cba58ce6fe17327ef0d063c43e34ef9c5df5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Sep 2023 14:37:58 +0800 Subject: [PATCH 05/28] fix(stream): fix syntax error. --- source/dnode/vnode/src/tq/tqSink.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 82703260ed..2df1dbe24f 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -269,7 +269,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { if ((pTask->sinkRecorder.numOfPackages % 5000) == 0) { SSinkTaskRecorder* pRec = &pTask->sinkRecorder; - tqInfo("s-task:%s vgId:d write %" PRId64 " blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.", + tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.", pTask->id.idStr, vgId, pRec->numOfPackages, pRec->numOfRows, (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0); } From 926cc01cff614763f14629393861334db5affdd7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Sep 2023 16:48:43 +0800 Subject: [PATCH 06/28] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqSink.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 2df1dbe24f..ee073e6bc4 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -147,7 +147,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { pTask->tsInfo.sinkStart = taosGetTimestampMs(); } - tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, pTask->id.idStr, numOfBlocks); + tqInfo("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, pTask->id.idStr, numOfBlocks); SArray* tagArray = NULL; SArray* pVals = NULL; @@ -401,7 +401,6 @@ static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSi int32_t code = tqPutTableInfo(pSinkTableMap, groupId, pTableSinkInfo); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pTableSinkInfo); - tqError("s-task:%s failed to put tableSinkInfo in to cache, code:%s", id, tstrerror(code)); } else { tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data, pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap)); @@ -674,7 +673,7 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); if (code == TSDB_CODE_SUCCESS) { - tqDebug("s-task:%s send submit msg to dstTable:%s, numOfRows:%d", id, pTableSinkInfo->name.data, numOfRows); + tqDebug("s-task:%s send submit msg to dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows); } else { tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); } From a82027f4925e5ccc662450d1f27efded351ed7fe Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Sep 2023 18:58:47 +0800 Subject: [PATCH 07/28] enh(stream): pack multi-datablock into one submit message. --- source/dnode/vnode/src/tq/tqSink.c | 155 +++++++++++++++-------------- 1 file changed, 82 insertions(+), 73 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index ee073e6bc4..512007d578 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -25,9 +25,10 @@ typedef struct STableSinkInfo { } STableSinkInfo; static int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, - SSDataBlock* pDataBlock, SStreamTask* pTask); + SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData); static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid); +static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { @@ -142,17 +143,25 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { int32_t vgId = TD_VID(pVnode); int32_t numOfBlocks = taosArrayGetSize(pBlocks); int32_t code = TSDB_CODE_SUCCESS; + const char* id = pTask->id.idStr; if (pTask->tsInfo.sinkStart == 0) { pTask->tsInfo.sinkStart = taosGetTimestampMs(); } - tqInfo("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, pTask->id.idStr, numOfBlocks); + tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, id, numOfBlocks); + bool hasSubmit = false; SArray* tagArray = NULL; SArray* pVals = NULL; SArray* crTblArray = NULL; + SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; + if (submitReq.aSubmitTbData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); int32_t rows = pDataBlock->info.rows; @@ -262,19 +271,46 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { continue; } else { + hasSubmit = true; pTask->sinkRecorder.numOfPackages += 1; - code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask); + + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; + code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); + + taosArrayPush(submitReq.aSubmitTbData, &tbData); + pTask->sinkRecorder.numOfRows += pDataBlock->info.rows; } } - if ((pTask->sinkRecorder.numOfPackages % 5000) == 0) { - SSinkTaskRecorder* pRec = &pTask->sinkRecorder; - tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.", - pTask->id.idStr, vgId, pRec->numOfPackages, pRec->numOfRows, - (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0); - } + if (hasSubmit) { + int32_t len = 0; + void* pBuf = NULL; + code = tqBuildSubmitReq(&submitReq, vgId, &pBuf, &len); - tqDebug("vgId:%d, s-task:%s write results completed", vgId, pTask->id.idStr); + if (code != TSDB_CODE_SUCCESS) { + tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code)); + goto _end; + } + + SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len}; + code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); + + if (code == TSDB_CODE_SUCCESS) { + tqDebug("s-task:%s vgId:%d send submit %d blocks(%d rows) into dstTables completed", id, vgId); + } else { + tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); + } + + if ((pTask->sinkRecorder.numOfPackages % 5000) == 0) { + SSinkTaskRecorder* pRec = &pTask->sinkRecorder; + tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.", + pTask->id.idStr, vgId, pRec->numOfPackages, pRec->numOfRows, + (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0); + } + } else { + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + tqDebug("vgId:%d, s-task:%s write results completed", vgId, id); + } _end: taosArrayDestroy(tagArray); @@ -409,30 +445,21 @@ static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSi return code; } -static int32_t tqBuildSubmitReq(SSubmitTbData* pTableData, int32_t vgId, void** pMsg, int32_t* msgLen) { +int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) { int32_t code = 0; void* pBuf = NULL; *msgLen = 0; - SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; - if (submitReq.aSubmitTbData == NULL) { - tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); - taosArrayDestroy(pTableData->aRowP); - return TSDB_CODE_OUT_OF_MEMORY; - } - - taosArrayPush(submitReq.aSubmitTbData, pTableData); - // encode int32_t len = 0; - tEncodeSize(tEncodeSubmitReq, &submitReq, len, code); + tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code); SEncoder encoder; len += sizeof(SSubmitReq2Msg); pBuf = rpcMallocCont(len); if (NULL == pBuf) { - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE); return TSDB_CODE_OUT_OF_MEMORY; } @@ -441,17 +468,17 @@ static int32_t tqBuildSubmitReq(SSubmitTbData* pTableData, int32_t vgId, void** ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); - if (tEncodeSubmitReq(&encoder, &submitReq) < 0) { + if (tEncodeSubmitReq(&encoder, pSubmitReq) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to encode submit req, code:%s, ignore and continue", terrstr()); tEncoderClear(&encoder); rpcFreeCont(pBuf); - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE); return code; } tEncoderClear(&encoder); - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE); *msgLen = len; *pMsg = pBuf; @@ -459,7 +486,7 @@ static int32_t tqBuildSubmitReq(SSubmitTbData* pTableData, int32_t vgId, void** } int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock, - SStreamTask* pTask) { + SStreamTask* pTask, SSubmitTbData* pTableData) { int32_t numOfRows = pDataBlock->info.rows; int32_t vgId = TD_VID(pVnode); uint64_t groupId = pDataBlock->info.id.groupId; @@ -468,15 +495,14 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, SArray* pVals = NULL; const char* id = pTask->id.idStr; - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; - tqDebug("s-task:%s sink data pipeline, build submit msg from %d-th resBlock, including %d rows, dst suid:%" PRId64, + tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id, blockIndex + 1, numOfRows, suid); - tbData.aRowP = taosArrayInit(numOfRows, sizeof(SRow*)); + pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*)); pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); - if (tbData.aRowP == NULL || pVals == NULL) { - taosArrayDestroy(tbData.aRowP); + if (pTableData->aRowP == NULL || pVals == NULL) { + taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); code = TSDB_CODE_OUT_OF_MEMORY; @@ -517,13 +543,21 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, } if (exist) { - tbData.uid = pTableSinkInfo->uid; + pTableData->uid = pTableSinkInfo->uid; - if (tbData.uid == 0) { + if (pTableData->uid == 0) { tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); } while (pTableSinkInfo->uid == 0) { + if (streamTaskShouldStop(&pTask->status)) { + tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName); + taosArrayDestroy(pTableData->aRowP); + taosArrayDestroy(pVals); + + return TSDB_CODE_SUCCESS; + } + // wait for the table to be created SMetaReader mr = {0}; metaReaderDoInit(&mr, pVnode->pMeta, 0); @@ -534,29 +568,21 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, if (!isValid) { // not valid table, ignore it metaReaderClear(&mr); - taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return TSDB_CODE_SUCCESS; } else { tqDebug("s-task:%s set uid:%"PRIu64" for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data); - tbData.uid = mr.me.uid; + pTableData->uid = mr.me.uid; pTableSinkInfo->uid = mr.me.uid; metaReaderClear(&mr); } } else { // not exist, wait and retry metaReaderClear(&mr); - if (streamTaskShouldStop(&pTask->status)) { - tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName); - taosArrayDestroy(tbData.aRowP); - taosArrayDestroy(pVals); - - return TSDB_CODE_SUCCESS; - } else { - taosMsleep(100); - tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName); - } + taosMsleep(100); + tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName); } } @@ -576,12 +602,12 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName); - tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; - tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock); - if (tbData.pCreateTbReq == NULL) { + pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock); + if (pTableData->pCreateTbReq == NULL) { tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); - taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return terrno; @@ -593,14 +619,14 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, if (!isValid) { metaReaderClear(&mr); taosMemoryFree(pTableSinkInfo); - taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return TSDB_CODE_SUCCESS; } else { - tbData.uid = mr.me.uid; + pTableData->uid = mr.me.uid; metaReaderClear(&mr); - doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, tbData.uid, id); + doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, pTableData->uid, id); } } } @@ -648,35 +674,18 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, SRow* pRow = NULL; code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow); if (code != TSDB_CODE_SUCCESS) { - tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); + tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); - taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return code; } ASSERT(pRow); - taosArrayPush(tbData.aRowP, &pRow); + taosArrayPush(pTableData->aRowP, &pRow); } - int32_t len = 0; - void* pBuf = NULL; - code = tqBuildSubmitReq(&tbData, vgId, &pBuf, &len); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pVals); - return code; - } - - pTask->sinkRecorder.numOfRows += numOfRows; - - SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len}; - code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); - - if (code == TSDB_CODE_SUCCESS) { - tqDebug("s-task:%s send submit msg to dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows); - } else { - tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); - } + tqDebug("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows); taosArrayDestroy(pVals); return code; From 5ba6a68e7a9303de72afc5637b7e4832211f57be Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Sep 2023 19:00:39 +0800 Subject: [PATCH 08/28] fix(stream): fix syntax error. --- source/dnode/vnode/src/tq/tqSink.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 512007d578..6347b9cadd 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -294,9 +294,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len}; code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); - if (code == TSDB_CODE_SUCCESS) { - tqDebug("s-task:%s vgId:%d send submit %d blocks(%d rows) into dstTables completed", id, vgId); + tqDebug("s-task:%s vgId:%d send submit %d blocks into dstTables completed", id, vgId, numOfBlocks); } else { tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); } From 8edf86b3bb8a823dc699403972c1c78f3209a4f3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Sep 2023 19:08:09 +0800 Subject: [PATCH 09/28] fix(stream): add some logs. --- include/libs/stream/tstream.h | 3 ++- source/dnode/vnode/src/tq/tqSink.c | 11 +++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 86e79835ac..d36231c018 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -313,7 +313,8 @@ typedef struct STaskSchedInfo { } STaskSchedInfo; typedef struct SSinkTaskRecorder { - int64_t numOfPackages; + int64_t numOfSubmit; + int64_t numOfBlocks; int64_t numOfRows; } SSinkTaskRecorder; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 6347b9cadd..45991d83c8 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -272,7 +272,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { continue; } else { hasSubmit = true; - pTask->sinkRecorder.numOfPackages += 1; + pTask->sinkRecorder.numOfBlocks += 1; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); @@ -300,10 +300,13 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); } - if ((pTask->sinkRecorder.numOfPackages % 5000) == 0) { + pTask->sinkRecorder.numOfSubmit += 1; + + if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) { SSinkTaskRecorder* pRec = &pTask->sinkRecorder; - tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.", - pTask->id.idStr, vgId, pRec->numOfPackages, pRec->numOfRows, + tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 + " submit into dst table, duration:%.2f Sec.", + pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0); } } else { From 2b84e0b02e3f192d008ef28752013c9435df619e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Sep 2023 19:39:00 +0800 Subject: [PATCH 10/28] refactor: check if the put data into queue is success or failed. --- source/dnode/vnode/src/tq/tqSink.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 45991d83c8..449d460b02 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -24,7 +24,7 @@ typedef struct STableSinkInfo { tstr name; } STableSinkInfo; -static int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, +static int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData); static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid); @@ -275,7 +275,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { pTask->sinkRecorder.numOfBlocks += 1; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; - code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); + code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); taosArrayPush(submitReq.aSubmitTbData, &tbData); pTask->sinkRecorder.numOfRows += pDataBlock->info.rows; @@ -297,6 +297,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { if (code == TSDB_CODE_SUCCESS) { tqDebug("s-task:%s vgId:%d send submit %d blocks into dstTables completed", id, vgId, numOfBlocks); } else { + ASSERT(0); tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); } @@ -487,7 +488,7 @@ int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int return TSDB_CODE_SUCCESS; } -int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock, +int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData) { int32_t numOfRows = pDataBlock->info.rows; int32_t vgId = TD_VID(pVnode); From b240e91c911f9ce0b4c57f99bd08bbd8d6038ceb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 7 Sep 2023 09:58:48 +0800 Subject: [PATCH 11/28] enh(stream): add token bucket to limit the rate of sink operation. --- include/libs/stream/tstream.h | 8 +++++ source/dnode/vnode/src/tq/tqSink.c | 6 ++-- source/libs/stream/inc/streamInt.h | 2 ++ source/libs/stream/src/streamQueue.c | 49 ++++++++++++++++++++++++++++ source/libs/stream/src/streamTask.c | 1 + 5 files changed, 64 insertions(+), 2 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d36231c018..79d427e5c7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -326,6 +326,13 @@ typedef struct { int64_t sinkStart; } STaskTimestamp; +typedef struct STokenBucket { + int32_t capacity; // total capacity + int64_t fillTimestamp;// fill timestamp + int32_t numOfToken; // total available tokens + int32_t rate; // number of token per second +} STokenBucket; + struct SStreamTask { int64_t ver; SStreamTaskId id; @@ -354,6 +361,7 @@ struct SStreamTask { STaskSinkFetch fetchSink; }; SSinkTaskRecorder sinkRecorder; + STokenBucket tokenBucket; void* launchTaskTimer; SMsgCb* pMsgCb; // msg handle diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 449d460b02..1e714a8cad 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -25,10 +25,12 @@ typedef struct STableSinkInfo { } STableSinkInfo; static int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, - SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData); + SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData); static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid); static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); +static void fillBucket(STokenBucket* pBucket); +static bool hasAvailableToken(STokenBucket* pBucket); int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { @@ -692,4 +694,4 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF taosArrayDestroy(pVals); return code; -} \ No newline at end of file +} diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index bb81582a2d..bbb7595e5a 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -77,6 +77,8 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 78901a5ab1..0a0573cc90 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -29,6 +29,8 @@ typedef struct SQueueReader { int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms } SQueueReader; +static bool streamTaskHasAvailableToken(STokenBucket* pBucket); + static void streamQueueCleanup(SStreamQueue* pQueue) { void* qItem = NULL; while ((qItem = streamQueueNextItem(pQueue)) != NULL) { @@ -175,6 +177,14 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu return TSDB_CODE_SUCCESS; } + STokenBucket* pBucket = &pTask->tokenBucket; + bool has = streamTaskHasAvailableToken(pBucket); + if (!has) { // no available token in th bucket, ignore this execution + qInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr, + pBucket->capacity, pBucket->rate); + return TSDB_CODE_SUCCESS; + } + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); if (qItem == NULL) { qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); @@ -320,3 +330,42 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) return 0; } + +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) { + if (cap < 100 || rate < 50 || pBucket == NULL) { + qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate); + return TSDB_CODE_INVALID_PARA; + } + + pBucket->capacity = cap; + pBucket->rate = rate; + pBucket->numOfToken = cap; + pBucket->fillTimestamp = taosGetTimestampMs(); + return TSDB_CODE_SUCCESS; +} + +static void fillBucket(STokenBucket* pBucket) { + int64_t now = taosGetTimestampMs(); + int64_t delta = now - pBucket->fillTimestamp; + + int32_t inc = (delta / 1000.0) * pBucket->rate; + if (inc > 0) { + if ((pBucket->numOfToken + inc) < pBucket->capacity) { + pBucket->numOfToken += inc; + } else { + pBucket->numOfToken = pBucket->capacity; + } + + pBucket->fillTimestamp = now; + } +} + +bool streamTaskHasAvailableToken(STokenBucket* pBucket) { + fillBucket(pBucket); + bool hasToken = pBucket->numOfToken > 0; + if (hasToken) { + return true; + } else { + return false; + } +} \ No newline at end of file diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7ce5040257..24b8933387 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -385,6 +385,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->dataRange.range.minVer = ver; pTask->pMsgCb = pMsgCb; + streamTaskInitTokenBucket(&pTask->tokenBucket, 400, 200); taosThreadMutexInit(&pTask->lock, NULL); streamTaskOpenAllUpstreamInput(pTask); From 956047a441be733f435724ce23e4d6fc465f9fee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 7 Sep 2023 10:02:13 +0800 Subject: [PATCH 12/28] fix(stream): count down the token in bucket. --- source/libs/stream/src/streamQueue.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 0a0573cc90..202afa5854 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -362,8 +362,9 @@ static void fillBucket(STokenBucket* pBucket) { bool streamTaskHasAvailableToken(STokenBucket* pBucket) { fillBucket(pBucket); - bool hasToken = pBucket->numOfToken > 0; + bool hasToken = (--pBucket->numOfToken) > 0; if (hasToken) { + qInfo("remain token:%d", pBucket->numOfToken); return true; } else { return false; From e407d7dc6bbd2a5752c578e9d3af1b83559e7c86 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 7 Sep 2023 10:09:05 +0800 Subject: [PATCH 13/28] fix(stream): update log. --- source/libs/stream/src/streamQueue.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 202afa5854..acd34132d5 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -180,8 +180,8 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu STokenBucket* pBucket = &pTask->tokenBucket; bool has = streamTaskHasAvailableToken(pBucket); if (!has) { // no available token in th bucket, ignore this execution - qInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr, - pBucket->capacity, pBucket->rate); +// qInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr, +// pBucket->capacity, pBucket->rate); return TSDB_CODE_SUCCESS; } @@ -357,6 +357,7 @@ static void fillBucket(STokenBucket* pBucket) { } pBucket->fillTimestamp = now; + qDebug("new token available, ts:%"PRId64, now); } } From dc821057e5275a673d10864f4238128385066393 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 7 Sep 2023 10:33:38 +0800 Subject: [PATCH 14/28] fix(stream): update the log. --- source/libs/stream/src/streamQueue.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index acd34132d5..f2b50a024f 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -357,7 +357,7 @@ static void fillBucket(STokenBucket* pBucket) { } pBucket->fillTimestamp = now; - qDebug("new token available, ts:%"PRId64, now); + qInfo("new token available, ts:%"PRId64, now); } } @@ -365,7 +365,7 @@ bool streamTaskHasAvailableToken(STokenBucket* pBucket) { fillBucket(pBucket); bool hasToken = (--pBucket->numOfToken) > 0; if (hasToken) { - qInfo("remain token:%d", pBucket->numOfToken); + qDebug("remain token:%d", pBucket->numOfToken); return true; } else { return false; From 82222f1ab8fb06ab1fc0c345419921938262ee37 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 7 Sep 2023 11:43:43 +0800 Subject: [PATCH 15/28] other: disable detail display. --- cmake/cmake.define | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index edc5dd601a..53026df4ef 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.0) -set(CMAKE_VERBOSE_MAKEFILE ON) +set(CMAKE_VERBOSE_MAKEFILE FALSE) set(TD_BUILD_TAOSA_INTERNAL FALSE) #set output directory From b317c8ebff51c998f9d6a2ac3a0cf929de37df3d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 8 Sep 2023 12:46:15 +0800 Subject: [PATCH 16/28] fix(stream): disable merge submit blocks. --- include/libs/stream/tstream.h | 1 - source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqSink.c | 454 +++++++++++------- source/dnode/vnode/src/vnd/vnodeSvr.c | 5 +- source/libs/executor/src/scanoperator.c | 2 +- source/libs/stream/src/streamRecover.c | 19 +- tests/system-test/8-stream/scalar_function.py | 3 +- 7 files changed, 300 insertions(+), 186 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 79d427e5c7..fc825b54dd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -700,7 +700,6 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); // agg level -int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask); int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pRpcInfo); int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5d47237fb4..df645db7f1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1126,7 +1126,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); // set the fill-history task to be normal - if (pTask->info.fillHistory == 1) { + if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) { streamSetStatusNormal(pTask); } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 1e714a8cad..2fc0f41a3b 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -26,7 +26,7 @@ typedef struct STableSinkInfo { static int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData); -static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, +static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid); static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); static void fillBucket(STokenBucket* pBucket); @@ -136,6 +136,149 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { return TSDB_CODE_SUCCESS; } +static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, + int64_t suid) { + tqDebug("s-task:%s build create table msg", pTask->id.idStr); + + STSchema* pTSchema = pTask->tbSink.pTSchema; + int32_t rows = pDataBlock->info.rows; + SArray* tagArray = NULL; + int32_t code = 0; + + SVCreateTbBatchReq reqs = {0}; + + SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); + if (NULL == reqs.pArray) { + goto _end; + } + + for (int32_t rowId = 0; rowId < rows; rowId++) { + SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0}); + + // set const + pCreateTbReq->flags = 0; + pCreateTbReq->type = TSDB_CHILD_TABLE; + pCreateTbReq->ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); + + // set tag content + int32_t size = taosArrayGetSize(pDataBlock->pDataBlock); + if (size == 2) { + tagArray = taosArrayInit(1, sizeof(STagVal)); + if (!tagArray) { + tdDestroySVCreateTbReq(pCreateTbReq); + goto _end; + } + + STagVal tagVal = { + .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; + + taosArrayPush(tagArray, &tagVal); + + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = "group_id"; + taosArrayPush(tagName, tagNameStr); + pCreateTbReq->ctb.tagName = tagName; + } else { + tagArray = taosArrayInit(size - 1, sizeof(STagVal)); + if (!tagArray) { + tdDestroySVCreateTbReq(pCreateTbReq); + goto _end; + } + + for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { + SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); + + STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type}; + void* pData = colDataGetData(pTagData, rowId); + if (colDataIsNull_s(pTagData, rowId)) { + continue; + } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) { + tagVal.nData = varDataLen(pData); + tagVal.pData = (uint8_t*)varDataVal(pData); + } else { + memcpy(&tagVal.i64, pData, pTagData->info.bytes); + } + taosArrayPush(tagArray, &tagVal); + } + } + pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1); + + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + tagArray = taosArrayDestroy(tagArray); + if (pTag == NULL) { + tdDestroySVCreateTbReq(pCreateTbReq); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + + pCreateTbReq->ctb.pTag = (uint8_t*)pTag; + + // set table name + if (!pDataBlock->info.parTbName[0]) { + SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); + + void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); + pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); + } else { + pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName); + } + + taosArrayPush(reqs.pArray, pCreateTbReq); + tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name); + } + + reqs.nReqs = taosArrayGetSize(reqs.pArray); + code = tqPutReqToQueue(pVnode, &reqs); + if (code != TSDB_CODE_SUCCESS) { + tqError("s-task:%s failed to send create table msg", pTask->id.idStr); + } + + _end: + taosArrayDestroy(tagArray); + taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); + return code; +} + +static int32_t doBuildSubmitAndSendMsg(SVnode* pVnode, SStreamTask* pTask, int32_t numOfBlocks, SSubmitReq2* pReq) { + const char* id = pTask->id.idStr; + int32_t vgId = TD_VID(pVnode); + int32_t len = 0; + void* pBuf = NULL; + + int32_t code = tqBuildSubmitReq(pReq, vgId, &pBuf, &len); + if (code != TSDB_CODE_SUCCESS) { + tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code)); + return code; + } + + SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len}; + code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); + if (code == TSDB_CODE_SUCCESS) { + tqDebug("s-task:%s vgId:%d send submit %d blocks into dstTables completed", id, vgId, numOfBlocks); + } else { + tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); + } + + pTask->sinkRecorder.numOfSubmit += 1; + + if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) { + SSinkTaskRecorder* pRec = &pTask->sinkRecorder; + double el = (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0; + tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 + " submit into dst table, duration:%.2f Sec.", + pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el); + } + + return TSDB_CODE_SUCCESS; +} + void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; @@ -151,180 +294,141 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { pTask->tsInfo.sinkStart = taosGetTimestampMs(); } - tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, id, numOfBlocks); - - bool hasSubmit = false; - SArray* tagArray = NULL; - SArray* pVals = NULL; - SArray* crTblArray = NULL; - - SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; - if (submitReq.aSubmitTbData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - - for (int32_t i = 0; i < numOfBlocks; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - int32_t rows = pDataBlock->info.rows; - - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - code = doSinkDeleteBlock(pVnode, stbFullName, pDataBlock, pTask, suid); - } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { - tqDebug("s-task:%s build create table msg", pTask->id.idStr); - - SVCreateTbBatchReq reqs = {0}; - crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); - if (NULL == reqs.pArray) { - goto _end; - } - - for (int32_t rowId = 0; rowId < rows; rowId++) { - SVCreateTbReq* pCreateTbReq = &((SVCreateTbReq){0}); - - // set const - pCreateTbReq->flags = 0; - pCreateTbReq->type = TSDB_CHILD_TABLE; - pCreateTbReq->ctb.suid = suid; - - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); - - // set tag content - int32_t size = taosArrayGetSize(pDataBlock->pDataBlock); - if (size == 2) { - tagArray = taosArrayInit(1, sizeof(STagVal)); - - if (!tagArray) { - tdDestroySVCreateTbReq(pCreateTbReq); - goto _end; - } - - STagVal tagVal = { - .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId}; - - taosArrayPush(tagArray, &tagVal); - - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = "group_id"; - taosArrayPush(tagName, tagNameStr); - pCreateTbReq->ctb.tagName = tagName; - } else { - tagArray = taosArrayInit(size - 1, sizeof(STagVal)); - if (!tagArray) { - tdDestroySVCreateTbReq(pCreateTbReq); - goto _end; - } - - for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { - SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); - - STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type}; - void* pData = colDataGetData(pTagData, rowId); - if (colDataIsNull_s(pTagData, rowId)) { - continue; - } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) { - tagVal.nData = varDataLen(pData); - tagVal.pData = (uint8_t*) varDataVal(pData); - } else { - memcpy(&tagVal.i64, pData, pTagData->info.bytes); - } - taosArrayPush(tagArray, &tagVal); - } - - } - pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1); - - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - tagArray = taosArrayDestroy(tagArray); - if (pTag == NULL) { - tdDestroySVCreateTbReq(pCreateTbReq); - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - - pCreateTbReq->ctb.pTag = (uint8_t*)pTag; - - // set table name - if (!pDataBlock->info.parTbName[0]) { - SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); - void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); - pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); - } else { - pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName); - } - - taosArrayPush(reqs.pArray, pCreateTbReq); - tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name); - } - - reqs.nReqs = taosArrayGetSize(reqs.pArray); - if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) { - goto _end; - } - - tagArray = taosArrayDestroy(tagArray); - taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); - crTblArray = NULL; - } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { - continue; - } else { - hasSubmit = true; - pTask->sinkRecorder.numOfBlocks += 1; - - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; - code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); - - taosArrayPush(submitReq.aSubmitTbData, &tbData); - pTask->sinkRecorder.numOfRows += pDataBlock->info.rows; + bool isMixBlocks = true; + for(int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* p = taosArrayGet(pBlocks, i); + if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) { + isMixBlocks = true; + break; } } - if (hasSubmit) { - int32_t len = 0; - void* pBuf = NULL; - code = tqBuildSubmitReq(&submitReq, vgId, &pBuf, &len); + if (isMixBlocks) { + tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id, + numOfBlocks); - if (code != TSDB_CODE_SUCCESS) { - tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code)); - goto _end; - } + for(int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid); + } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid); + } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { + continue; + } else { + pTask->sinkRecorder.numOfBlocks += 1; - SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len}; - code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); - if (code == TSDB_CODE_SUCCESS) { - tqDebug("s-task:%s vgId:%d send submit %d blocks into dstTables completed", id, vgId, numOfBlocks); - } else { - ASSERT(0); - tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); - } + SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; + if (submitReq.aSubmitTbData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code)); + return; + } - pTask->sinkRecorder.numOfSubmit += 1; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; + code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); + taosArrayPush(submitReq.aSubmitTbData, &tbData); - if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) { - SSinkTaskRecorder* pRec = &pTask->sinkRecorder; - tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 - " submit into dst table, duration:%.2f Sec.", - pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, - (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0); + code = doBuildSubmitAndSendMsg(pVnode, pTask, numOfBlocks, &submitReq); + } } } else { - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - tqDebug("vgId:%d, s-task:%s write results completed", vgId, id); + tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks); + SHashObj* pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + + SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; + if (submitReq.aSubmitTbData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code)); + taosHashCleanup(pTableIndexMap); + return; + } + + bool hasSubmit = false; + + for (int32_t i = 0; i < numOfBlocks; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid); + } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid); + } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { + continue; + } else { + hasSubmit = true; + pTask->sinkRecorder.numOfBlocks += 1; + + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; + code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); + + int32_t* index = taosHashGet(pTableIndexMap, &tbData.uid, sizeof(tbData.uid)); + if (index == NULL) { // no data yet, append it + taosArrayPush(submitReq.aSubmitTbData, &tbData); + + int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; + taosHashPut(pTableIndexMap, &tbData.uid, sizeof(tbData.uid), &size, sizeof(size)); + } else { + SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index); + // merge the new submit table block with the existed blocks + // if ts in the new data block overlap with existed one, replace it + int32_t oldLen = taosArrayGetSize(pExisted->aRowP); + int32_t newLen = taosArrayGetSize(tbData.aRowP); + + int32_t j = 0, k = 0; + SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES); + while (j < newLen && k < oldLen) { + SRow* pNewRow = taosArrayGetP(tbData.aRowP, j); + SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k); + if (pNewRow->ts <= pOldRow->ts) { + taosArrayPush(pFinal, &pNewRow); + if (pNewRow->ts < pOldRow->ts) { + j += 1; + } else { + j += 1; + k += 1; + } + } else { + taosArrayPush(pFinal, &pOldRow); + k += 1; + } + } + + while (j < newLen) { + SRow* pRow = taosArrayGetP(tbData.aRowP, j++); + taosArrayPush(pFinal, &pRow); + } + + while (k < oldLen) { + SRow* pRow = taosArrayGetP(pExisted->aRowP, k++); + taosArrayPush(pFinal, &pRow); + } + + taosArrayDestroy(tbData.aRowP); + taosArrayDestroy(pExisted->aRowP); + pExisted->aRowP = pFinal; + + tqDebug("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id, + (int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), (tbData.pCreateTbReq != NULL)); + } + + pTask->sinkRecorder.numOfRows += pDataBlock->info.rows; + } + } + + taosHashCleanup(pTableIndexMap); + + if (hasSubmit) { + doBuildSubmitAndSendMsg(pVnode, pTask, numOfBlocks, &submitReq); + } else { + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + tqDebug("vgId:%d, s-task:%s write results completed", vgId, id); + } } -_end: - taosArrayDestroy(tagArray); - taosArrayDestroy(pVals); - taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq); // TODO: change } -int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, +int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid) { SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; @@ -490,8 +594,19 @@ int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int return TSDB_CODE_SUCCESS; } -int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock, - SStreamTask* pTask, SSubmitTbData* pTableData) { +static int32_t tsAscendingSortFn(const void* p1, const void* p2) { + SRow* pRow1 = *(SRow**) p1; + SRow* pRow2 = *(SRow**) p2; + + if (pRow1->ts == pRow2->ts) { + return 0; + } else { + return pRow1->ts > pRow2->ts? 1:-1; + } +} + +int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, + SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData) { int32_t numOfRows = pDataBlock->info.rows; int32_t vgId = TD_VID(pVnode); uint64_t groupId = pDataBlock->info.id.groupId; @@ -507,7 +622,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); if (pTableData->aRowP == NULL || pVals == NULL) { - taosArrayDestroy(pTableData->aRowP); + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); code = TSDB_CODE_OUT_OF_MEMORY; @@ -557,7 +672,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF while (pTableSinkInfo->uid == 0) { if (streamTaskShouldStop(&pTask->status)) { tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName); - taosArrayDestroy(pTableData->aRowP); + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return TSDB_CODE_SUCCESS; @@ -573,7 +688,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF if (!isValid) { // not valid table, ignore it metaReaderClear(&mr); - taosArrayDestroy(pTableData->aRowP); + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return TSDB_CODE_SUCCESS; @@ -612,7 +727,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF if (pTableData->pCreateTbReq == NULL) { tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); - taosArrayDestroy(pTableData->aRowP); + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return terrno; @@ -624,7 +739,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF if (!isValid) { metaReaderClear(&mr); taosMemoryFree(pTableSinkInfo); - taosArrayDestroy(pTableData->aRowP); + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return TSDB_CODE_SUCCESS; } else { @@ -681,7 +796,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF if (code != TSDB_CODE_SUCCESS) { tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); - taosArrayDestroy(pTableData->aRowP); + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); return code; } @@ -690,6 +805,7 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF taosArrayPush(pTableData->aRowP, &pRow); } + taosArraySort(pTableData->aRowP, tsAscendingSortFn); tqDebug("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows); taosArrayDestroy(pVals); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 62822021a6..8b094132f3 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1443,6 +1443,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in SColData *pColData = (SColData *)taosArrayGet(pSubmitTbData->aCol, 0); TSKEY *aKey = (TSKEY *)(pColData->pData); + vDebug("vgId:%d submit %d rows data, uid:%"PRId64, TD_VID(pVnode), pColData->nVal, pSubmitTbData->uid); for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) { if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) { @@ -1459,7 +1460,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in for (int32_t iRow = 0; iRow < nRow; ++iRow) { if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) { code = TSDB_CODE_INVALID_MSG; - vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver); + vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver); goto _exit; } } @@ -1564,6 +1565,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in } else { // create table failed if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { code = terrno; + vError("vgId:%d failed to create table:%s, code:%s", TD_VID(pVnode), pSubmitTbData->pCreateTbReq->name, + tstrerror(terrno)); goto _exit; } terrno = 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 12ec63b4cb..474128007a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2257,7 +2257,7 @@ FETCH_NEXT_BLOCK: int32_t current = pInfo->validBlockIndex++; SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); - qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id); + qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, id); if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id); continue; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 743d87e938..5c03e3a95e 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -30,6 +30,13 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { + if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); + qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", + pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, + streamGetTaskStatusStr(pTask->status.taskStatus)); + } + ASSERT(pTask->status.downstreamReady == 0); pTask->status.downstreamReady = 1; @@ -97,11 +104,8 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { streamSetParamForScanHistory(pTask); streamTaskEnablePause(pTask); } - - streamTaskScanHistoryPrepare(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); - streamTaskScanHistoryPrepare(pTask); } return 0; } @@ -402,15 +406,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -// agg -int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) { - pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); - qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", - pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, - streamGetTaskStatusStr(pTask->status.taskStatus)); - return 0; -} - int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) { diff --git a/tests/system-test/8-stream/scalar_function.py b/tests/system-test/8-stream/scalar_function.py index 56537e2f54..3bc44a7dc7 100644 --- a/tests/system-test/8-stream/scalar_function.py +++ b/tests/system-test/8-stream/scalar_function.py @@ -6,7 +6,8 @@ from util.cases import * from util.common import * class TDTestCase: - updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + updatecfgDict = {'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135, + 'asynclog': 0} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__) From 328377c25fc2884aa38fbe427ca6e99628c26f01 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 8 Sep 2023 23:04:39 +0800 Subject: [PATCH 17/28] refactor: add some logs. --- source/libs/stream/src/streamQueue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index f2b50a024f..609a67fb95 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -357,7 +357,7 @@ static void fillBucket(STokenBucket* pBucket) { } pBucket->fillTimestamp = now; - qInfo("new token available, ts:%"PRId64, now); + qInfo("new token available, current:%d, inc:%d ts:%"PRId64, pBucket->numOfToken, inc, now); } } From 35b97cbd19edac6746c28e7fa658ff44bf42947b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 8 Sep 2023 23:50:41 +0800 Subject: [PATCH 18/28] fix(stream): set correct token in bucket. --- source/libs/stream/src/streamDispatch.c | 13 ++++++++++--- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamQueue.c | 8 ++++---- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 916cc6e9ee..d04f55628c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1001,10 +1001,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // so the TASK_INPUT_STATUS_BLOCKED is rsp if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream - pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time + double el = 0; + if (pTask->msgInfo.blockingTs == 0) { + pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time + } else { + el = (taosGetTimestampMs() - pTask->msgInfo.blockingTs) / 1000.0; + } + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); - qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d", - id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref); + qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 + " wait for %dms and retry dispatch data, total wait:%.2fSec ref:%d", + id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, el, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // pipeline send data in output queue // this message has been sent successfully, let's try next one. diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ff667fa778..158727efea 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -84,7 +84,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i } if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { - qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr); + qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry exec task", pTask->id.idStr); taosMsleep(1000); continue; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 609a67fb95..b65063f49a 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -347,6 +347,7 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t ra static void fillBucket(STokenBucket* pBucket) { int64_t now = taosGetTimestampMs(); int64_t delta = now - pBucket->fillTimestamp; + ASSERT(pBucket->numOfToken >= 0); int32_t inc = (delta / 1000.0) * pBucket->rate; if (inc > 0) { @@ -363,10 +364,9 @@ static void fillBucket(STokenBucket* pBucket) { bool streamTaskHasAvailableToken(STokenBucket* pBucket) { fillBucket(pBucket); - bool hasToken = (--pBucket->numOfToken) > 0; - if (hasToken) { - qDebug("remain token:%d", pBucket->numOfToken); - return true; + if (pBucket->numOfToken > 0) { + qDebug("remain token:%d", pBucket->numOfToken-1); + return --pBucket->numOfToken; } else { return false; } From bf79c2c9939a01bf3669690a461acb962989dfc0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 9 Sep 2023 00:11:13 +0800 Subject: [PATCH 19/28] fix(stream): limit the sink rate --- source/libs/stream/src/streamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 24b8933387..58f7721c92 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -385,7 +385,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->dataRange.range.minVer = ver; pTask->pMsgCb = pMsgCb; - streamTaskInitTokenBucket(&pTask->tokenBucket, 400, 200); + streamTaskInitTokenBucket(&pTask->tokenBucket, 300, 200); taosThreadMutexInit(&pTask->lock, NULL); streamTaskOpenAllUpstreamInput(pTask); From e4918e85eb49cf6ac2cda3724e3ce5b757770dab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 9 Sep 2023 00:22:22 +0800 Subject: [PATCH 20/28] fix(stream): limit the sink rate --- source/libs/stream/src/streamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 58f7721c92..050fa39397 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -385,7 +385,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->dataRange.range.minVer = ver; pTask->pMsgCb = pMsgCb; - streamTaskInitTokenBucket(&pTask->tokenBucket, 300, 200); + streamTaskInitTokenBucket(&pTask->tokenBucket, 200, 100); taosThreadMutexInit(&pTask->lock, NULL); streamTaskOpenAllUpstreamInput(pTask); From 47fd14444705680e5ed95604b6273fe3990568c1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 9 Sep 2023 11:19:51 +0800 Subject: [PATCH 21/28] other: add some logs. --- source/dnode/vnode/src/vnd/vnodeSvr.c | 5 +++++ source/libs/stream/src/streamTask.c | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8b094132f3..6ff33c4f6e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1446,6 +1446,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in vDebug("vgId:%d submit %d rows data, uid:%"PRId64, TD_VID(pVnode), pColData->nVal, pSubmitTbData->uid); for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) { + vDebug("vgId:%d uid:%"PRId64" ts:%"PRId64, TD_VID(pVnode), pSubmitTbData->uid, aKey[iRow]); + if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) { code = TSDB_CODE_INVALID_MSG; vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver); @@ -1457,7 +1459,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP); SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP); + vDebug("vgId:%d submit %d rows data, uid:%"PRId64, TD_VID(pVnode), nRow, pSubmitTbData->uid); for (int32_t iRow = 0; iRow < nRow; ++iRow) { + vDebug("vgId:%d uid:%"PRId64" ts:%"PRId64, TD_VID(pVnode), pSubmitTbData->uid, aRow[iRow]->ts); + if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) { code = TSDB_CODE_INVALID_MSG; vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 050fa39397..162ae3f097 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -385,7 +385,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->dataRange.range.minVer = ver; pTask->pMsgCb = pMsgCb; - streamTaskInitTokenBucket(&pTask->tokenBucket, 200, 100); + streamTaskInitTokenBucket(&pTask->tokenBucket, 150, 100); taosThreadMutexInit(&pTask->lock, NULL); streamTaskOpenAllUpstreamInput(pTask); From 6f5c855a4baef69c4162ffe4f0e7b0f8afb90d88 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 9 Sep 2023 15:08:36 +0800 Subject: [PATCH 22/28] fix(stream): fix msg lost bug. --- source/dnode/vnode/src/tq/tqSink.c | 2 +- source/dnode/vnode/src/tq/tqStreamTask.c | 22 ++++++++++------------ source/libs/stream/src/streamExec.c | 4 ++-- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 2fc0f41a3b..88bdc8a4d9 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -329,7 +329,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); taosArrayPush(submitReq.aSubmitTbData, &tbData); - code = doBuildSubmitAndSendMsg(pVnode, pTask, numOfBlocks, &submitReq); + code = doBuildSubmitAndSendMsg(pVnode, pTask, 1, &submitReq); } } } else { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 553ef06974..4a2d5a2ea6 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -396,25 +396,23 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t numOfItems = streamTaskGetInputQItems(pTask); int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; + taosThreadMutexLock(&pTask->lock); + + pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { + tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + SStreamQueueItem* pItem = NULL; code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr); if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); streamMetaReleaseTask(pStreamMeta, pTask); - continue; - } - - taosThreadMutexLock(&pTask->lock); - pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - - if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { - tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); taosThreadMutexUnlock(&pTask->lock); - streamMetaReleaseTask(pStreamMeta, pTask); - if (pItem != NULL) { - streamFreeQitem(pItem); - } continue; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 158727efea..6f0357324a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -566,8 +566,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver >= pTask->chkInfo.checkpointVer); if (ver != pTask->chkInfo.checkpointVer) { - qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64, pTask->id.idStr, - pTask->chkInfo.checkpointVer, ver); + qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 " , currentVer:%" PRId64, + pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.currentVer); pTask->chkInfo.checkpointVer = ver; } From aa8909b267a649c743a3aebad39e659324cc1608 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 9 Sep 2023 15:18:54 +0800 Subject: [PATCH 23/28] other: add some logs. --- include/libs/wal/wal.h | 2 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqMeta.c | 4 ++-- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/libs/sync/src/syncRaftLog.c | 2 +- source/libs/wal/src/walRead.c | 34 ++++++++++++++++-------------- 6 files changed, 24 insertions(+), 22 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index cfe70a186c..a56a5567eb 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -192,7 +192,7 @@ int32_t walApplyVer(SWal *, int64_t ver); // int32_t walDataCorrupted(SWal*); // wal reader -SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond); +SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id); void walCloseReader(SWalReader *pRead); void walReadReset(SWalReader *pReader); int32_t walReadVer(SWalReader *pRead, int64_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index df645db7f1..af08a0dfc4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -819,7 +819,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files - pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); + pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId); } // reset the task status from unfinished transaction diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 85151c6e19..154ac1e8c1 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -312,14 +312,14 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ return -1; } } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) { - handle->pWalReader = walOpenReader(pVnode->pWal, NULL); + handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0); handle->execHandle.pTqReader = tqReaderOpen(pVnode); buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta, (SSnapContext**)(&reader.sContext)); handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - handle->pWalReader = walOpenReader(pVnode->pWal, NULL); + handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0); if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) { if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 4f5701524d..cf5d160d75 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -250,7 +250,7 @@ STqReader* tqReaderOpen(SVnode* pVnode) { return NULL; } - pReader->pWalReader = walOpenReader(pVnode->pWal, NULL); + pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0); if (pReader->pWalReader == NULL) { taosMemoryFree(pReader); return NULL; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 9299651999..b167f2ecb6 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -59,7 +59,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { ASSERT(pData->pWal != NULL); taosThreadMutexInit(&(pData->mutex), NULL); - pData->pWalHandle = walOpenReader(pData->pWal, NULL); + pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0); ASSERT(pData->pWalHandle != NULL); pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index d9e43e4324..9db81111f9 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -16,7 +16,7 @@ #include "taoserror.h" #include "walInt.h" -SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { +SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) { SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader)); if (pReader == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -24,7 +24,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { } pReader->pWal = pWal; - pReader->readerId = tGenIdPI64(); + pReader->readerId = (id != 0)? id:tGenIdPI64(); pReader->pIdxFile = NULL; pReader->pLogFile = NULL; pReader->curVersion = -1; @@ -257,9 +257,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { bool seeked = false; wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 - ", applied ver:%" PRId64, + ", applied ver:%" PRId64", %"PRIx64, pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, - pRead->pWal->vers.appliedVer); + pRead->pWal->vers.appliedVer, pRead->readerId); // TODO: valid ver if (ver > pRead->pWal->vers.commitVer) { @@ -297,7 +297,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { code = walValidHeadCksum(pRead->pHead); if (code != 0) { - wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver); + wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, %"PRIx64, pRead->pWal->cfg.vgId, ver, + pRead->readerId); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -324,11 +325,13 @@ int32_t walSkipFetchBody(SWalReader *pRead) { int32_t walFetchBody(SWalReader *pRead) { SWalCont *pReadHead = &pRead->pHead->head; int64_t ver = pReadHead->version; + int32_t vgId = pRead->pWal->cfg.vgId; + int64_t id = pRead->readerId; wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 - ", applied ver:%" PRId64, - pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, - pRead->pWal->vers.appliedVer); + ", applied ver:%" PRId64 ", %" PRIx64, + vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, + pRead->pWal->vers.appliedVer, id); if (pRead->capacity < pReadHead->bodyLen) { SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); @@ -344,26 +347,25 @@ int32_t walFetchBody(SWalReader *pRead) { if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { if (pReadHead->bodyLen < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s", - pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno)); + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, %"PRIx64, + vgId, pReadHead->version, ver, tstrerror(terrno), pRead->readerId); } else { - wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted", - pRead->pWal->cfg.vgId, pReadHead->version, ver); + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted, %"PRIx64, + vgId, pReadHead->version, ver, pRead->readerId); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } return -1; } if (pReadHead->version != ver) { - wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, - pReadHead->version, ver); + wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64", %"PRIx64, vgId, + pReadHead->version, ver, id); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } if (walValidBodyCksum(pRead->pHead) != 0) { - wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, - ver); + wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, %" PRIx64, vgId, ver, id); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } From 9a30573b8bdb62fb7830f86b057531491f408457 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 9 Sep 2023 16:12:08 +0800 Subject: [PATCH 24/28] fix(stream): disable scan wal when halt is set. --- source/dnode/vnode/src/tq/tq.c | 7 +++++-- source/libs/wal/src/walRead.c | 10 +++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index af08a0dfc4..ece29a492e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1090,14 +1090,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // now we can stop the stream task execution - streamTaskHalt(pStreamTask); + int64_t latestVer = 0; + taosThreadMutexLock(&pStreamTask->lock); + streamTaskHalt(pStreamTask); tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); + latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + taosThreadMutexUnlock(&pStreamTask->lock); // if it's an source task, extract the last version in wal. pRange = &pTask->dataRange.range; - int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); if (done) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 9db81111f9..a45b1eb85d 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -308,9 +308,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { int32_t walSkipFetchBody(SWalReader *pRead) { wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 - ", applied ver:%" PRId64, + ", applied ver:%" PRId64", 0x%"PRIx64, pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, - pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer); + pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId); int64_t code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR); if (code < 0) { @@ -329,7 +329,7 @@ int32_t walFetchBody(SWalReader *pRead) { int64_t id = pRead->readerId; wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 - ", applied ver:%" PRId64 ", %" PRIx64, + ", applied ver:%" PRId64 ", 0x%" PRIx64, vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, id); @@ -358,14 +358,14 @@ int32_t walFetchBody(SWalReader *pRead) { } if (pReadHead->version != ver) { - wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64", %"PRIx64, vgId, + wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64", 0x%"PRIx64, vgId, pReadHead->version, ver, id); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } if (walValidBodyCksum(pRead->pHead) != 0) { - wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, %" PRIx64, vgId, ver, id); + wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, 0x%" PRIx64, vgId, ver, id); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } From 85a5c45098a1fba40b19e4b339fdaa30d9285af7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 10 Sep 2023 00:01:25 +0800 Subject: [PATCH 25/28] fix(stream): limit the already processed data. --- include/libs/stream/tstream.h | 2 +- source/dnode/snode/src/snode.c | 14 ++++++------- source/dnode/vnode/src/tq/tq.c | 18 ++++++++--------- source/dnode/vnode/src/tq/tqRead.c | 13 ++++++------ source/dnode/vnode/src/tq/tqStreamTask.c | 24 +++++++++++------------ source/libs/stream/src/streamCheckpoint.c | 2 +- source/libs/stream/src/streamExec.c | 4 ++-- source/libs/stream/src/streamRecover.c | 3 ++- source/libs/stream/src/streamTask.c | 2 +- source/libs/wal/src/walRead.c | 13 ++++++------ 10 files changed, 49 insertions(+), 46 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index fc825b54dd..30c60bcf0d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -260,7 +260,7 @@ typedef struct SStreamTaskId { typedef struct SCheckpointInfo { int64_t checkpointId; int64_t checkpointVer; // latest checkpointId version - int64_t currentVer; // current offset in WAL, not serialize it + int64_t nextProcessVer; // current offset in WAL, not serialize it } SCheckpointInfo; typedef struct SStreamStatus { diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index c1a59416f6..2b1885fb0e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -86,18 +86,18 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { SCheckpointInfo* pChkInfo = &pTask->chkInfo; // checkpoint ver is the kept version, handled data should be the next version. if (pTask->chkInfo.checkpointId != 0) { - pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1; - qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, - pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer); + pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; + qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, + pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } else { - if (pTask->chkInfo.currentVer == -1) { - pTask->chkInfo.currentVer = 0; + if (pTask->chkInfo.nextProcessVer == -1) { + pTask->chkInfo.nextProcessVer = 0; } } - qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 + qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", - SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, + SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->info.fillHistory, pTask->info.triggerParam); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ece29a492e..3c4ad50a20 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -834,14 +834,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { // checkpoint ver is the kept version, handled data should be the next version. if (pTask->chkInfo.checkpointId != 0) { - pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1; + pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, - pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer); + pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", - vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, + vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->info.fillHistory, pTask->info.triggerParam); @@ -1121,7 +1121,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int64_t dstVer = pTask->dataRange.range.minVer - 1; - pTask->chkInfo.currentVer = dstVer; + pTask->chkInfo.nextProcessVer = dstVer; walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); @@ -1154,7 +1154,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug( "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start " "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, - id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); + id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey); } code = streamTaskScanHistoryDataComplete(pTask); @@ -1289,8 +1289,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // even in halt status, the data in inputQ must be processed int8_t st = pTask->status.taskStatus; if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) { - tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.currentVer); + tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, + pTask->chkInfo.nextProcessVer); streamProcessRunReq(pTask); } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); @@ -1429,10 +1429,10 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion); tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64 ", schedStatus:%d", - vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); + vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus); } else { // from the previous paused version and go on tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", - vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); + vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus); } if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index cf5d160d75..cadbc70c6f 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -187,25 +187,26 @@ end: int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) { int32_t code = -1; int32_t vgId = TD_VID(pTq->pVnode); + int64_t id = pHandle->pWalReader->readerId; int64_t offset = *fetchOffset; int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal); int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal); int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal); - wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64, - vgId, offset, lastVer, committedVer, appliedVer); + wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64", 0x%"PRIx64, + vgId, offset, lastVer, committedVer, appliedVer, id); while (offset <= appliedVer) { if (walFetchHead(pHandle->pWalReader, offset) < 0) { tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 - ", no more log to return, reqId:0x%" PRIx64, - pHandle->consumerId, pHandle->epoch, vgId, offset, reqId); + ", no more log to return, reqId:0x%" PRIx64 " 0x%" PRIx64, + pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id); goto END; } - tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId, - pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId); + tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64" 0x%"PRIx64, vgId, + pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id); if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) { code = walFetchBody(pHandle->pWalReader); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 4a2d5a2ea6..955614297b 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -256,36 +256,36 @@ int32_t tqStartStreamTasks(STQ* pTq) { int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // seek the stored version and extract data from WAL int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); - if (pTask->chkInfo.currentVer < firstVer) { + if (pTask->chkInfo.nextProcessVer < firstVer) { tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, - vgId, pTask->id.idStr, pTask->chkInfo.currentVer, firstVer, firstVer); + vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, firstVer, firstVer); - pTask->chkInfo.currentVer = firstVer; + pTask->chkInfo.nextProcessVer = firstVer; // todo need retry if failed - int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); if (code != TSDB_CODE_SUCCESS) { return code; } // append the data for the stream - tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer); } else { int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); if (currentVer == -1) { // we only seek the read for the first time - int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit return code; } // append the data for the stream tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.currentVer); + pTask->chkInfo.nextProcessVer); } } int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader); - if (skipToVer != 0 && skipToVer > pTask->chkInfo.currentVer) { + if (skipToVer != 0 && skipToVer > pTask->chkInfo.nextProcessVer) { int32_t code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer); if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit return code; @@ -304,7 +304,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { if (!pTask->status.appendTranstateBlock) { - qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 + qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal anymore, add transfer-state block into inputQ", id, ver, maxVer); @@ -313,7 +313,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); /*int32_t code = */ streamSchedExec(pTask); } else { - qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal", + qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal", id, ver, maxVer); } } @@ -421,12 +421,12 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { code = streamTaskPutDataIntoInputQ(pTask, pItem); if (code == TSDB_CODE_SUCCESS) { int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); - pTask->chkInfo.currentVer = ver; + pTask->chkInfo.nextProcessVer = ver; handleFillhistoryScanComplete(pTask, ver); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); } else { tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, - pTask->chkInfo.currentVer); + pTask->chkInfo.nextProcessVer); } } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index baf319d014..361602fac9 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -287,7 +287,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks qDebug("vgId:%d s-task:%s level:%d commit task status after checkpoint completed, checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", - pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer, + pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, streamGetTaskStatusStr(prev)); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6f0357324a..8d282696c1 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -563,11 +563,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { SIZE_IN_MB(resSize), totalBlocks); // update the currentVer if processing the submit blocks. - ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver >= pTask->chkInfo.checkpointVer); + ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer); if (ver != pTask->chkInfo.checkpointVer) { qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 " , currentVer:%" PRId64, - pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.currentVer); + pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer); pTask->chkInfo.checkpointVer = ver; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 5c03e3a95e..cf04659ac1 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -504,7 +504,8 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) { pHTask->dataRange.range.minVer = 0; - pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer; + // the query version range should be limited to the already processed data + pHTask->dataRange.range.maxVer = pTask->chkInfo.nextProcessVer - 1; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 162ae3f097..ea4c2e71bc 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -380,7 +380,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta; - pTask->chkInfo.currentVer = ver; + pTask->chkInfo.nextProcessVer = ver; pTask->dataRange.range.maxVer = ver; pTask->dataRange.range.minVer = ver; pTask->pMsgCb = pMsgCb; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index a45b1eb85d..2eee04a27a 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -75,6 +75,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } + while (fetchVer <= appliedVer) { if (walFetchHead(pReader, fetchVer) < 0) { return -1; @@ -257,7 +258,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { bool seeked = false; wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 - ", applied ver:%" PRId64", %"PRIx64, + ", applied ver:%" PRId64", 0x%"PRIx64, pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId); @@ -297,7 +298,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { code = walValidHeadCksum(pRead->pHead); if (code != 0) { - wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, %"PRIx64, pRead->pWal->cfg.vgId, ver, + wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%"PRIx64, pRead->pWal->cfg.vgId, ver, pRead->readerId); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; @@ -347,11 +348,11 @@ int32_t walFetchBody(SWalReader *pRead) { if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { if (pReadHead->bodyLen < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, %"PRIx64, - vgId, pReadHead->version, ver, tstrerror(terrno), pRead->readerId); + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, 0x%"PRIx64, + vgId, pReadHead->version, ver, tstrerror(terrno), id); } else { - wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted, %"PRIx64, - vgId, pReadHead->version, ver, pRead->readerId); + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted, 0x%"PRIx64, + vgId, pReadHead->version, ver, id); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } return -1; From 4ffb4a5b1f4a380fbede56fabcca2ce987a85921 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 10 Sep 2023 11:02:32 +0800 Subject: [PATCH 26/28] refactor: disable some logs. --- source/libs/stream/src/streamQueue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index b65063f49a..19c75bcb03 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -358,7 +358,7 @@ static void fillBucket(STokenBucket* pBucket) { } pBucket->fillTimestamp = now; - qInfo("new token available, current:%d, inc:%d ts:%"PRId64, pBucket->numOfToken, inc, now); + qDebug("new token available, current:%d, inc:%d ts:%"PRId64, pBucket->numOfToken, inc, now); } } From 10271aee26c337bc97444a7148148c36a1a9a4c6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 10 Sep 2023 14:12:03 +0800 Subject: [PATCH 27/28] fix(stream): fix syntax error in unit test. --- source/libs/wal/test/walMetaTest.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 0784db917a..70d8921be3 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -326,7 +326,7 @@ TEST_F(WalCleanDeleteEnv, roll) { TEST_F(WalKeepEnv, readHandleRead) { walResetEnv(); int code; - SWalReader* pRead = walOpenReader(pWal, NULL); + SWalReader* pRead = walOpenReader(pWal, NULL, 0); ASSERT(pRead != NULL); int i; @@ -387,7 +387,7 @@ TEST_F(WalRetentionEnv, repairMeta1) { ASSERT_EQ(pWal->vers.lastVer, 99); - SWalReader* pRead = walOpenReader(pWal, NULL); + SWalReader* pRead = walOpenReader(pWal, NULL, 0); ASSERT(pRead != NULL); for (int i = 0; i < 1000; i++) { From a9321248a959cdad8f98d227fc78c7daf63b26fb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 10 Sep 2023 18:34:03 +0800 Subject: [PATCH 28/28] fix(stream): set the correct flag when there is one token existed. --- source/libs/stream/src/streamQueue.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 19c75bcb03..a667ec2371 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -365,8 +365,9 @@ static void fillBucket(STokenBucket* pBucket) { bool streamTaskHasAvailableToken(STokenBucket* pBucket) { fillBucket(pBucket); if (pBucket->numOfToken > 0) { - qDebug("remain token:%d", pBucket->numOfToken-1); - return --pBucket->numOfToken; +// qDebug("current token:%d", pBucket->numOfToken); + --pBucket->numOfToken; + return true; } else { return false; }